diff options
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog-journal-handler.c')
-rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog-journal-handler.c | 985 |
1 files changed, 985 insertions, 0 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c new file mode 100644 index 00000000000..6ee6f9f074f --- /dev/null +++ b/xlators/features/changelog/lib/src/gf-changelog-journal-handler.c @@ -0,0 +1,985 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "uuid.h" +#include "globals.h" +#include "glusterfs.h" + +#include "gf-changelog-helpers.h" + +/* from the changelog translator */ +#include "changelog-misc.h" +#include "changelog-mem-types.h" + +#include "gf-changelog-journal.h" + +extern int byebye; + +/** + * number of gfid records after fop number + */ +int nr_gfids[] = { + [GF_FOP_MKNOD] = 1, + [GF_FOP_MKDIR] = 1, + [GF_FOP_UNLINK] = 1, + [GF_FOP_RMDIR] = 1, + [GF_FOP_SYMLINK] = 1, + [GF_FOP_RENAME] = 2, + [GF_FOP_LINK] = 1, + [GF_FOP_CREATE] = 1, +}; + +int nr_extra_recs[] = { + [GF_FOP_MKNOD] = 3, + [GF_FOP_MKDIR] = 3, + [GF_FOP_UNLINK] = 0, + [GF_FOP_RMDIR] = 0, + [GF_FOP_SYMLINK] = 0, + [GF_FOP_RENAME] = 0, + [GF_FOP_LINK] = 0, + [GF_FOP_CREATE] = 3, +}; + +static char * +binary_to_ascii (uuid_t uuid) +{ + return uuid_utoa (uuid); +} + +static char * +conv_noop (char *ptr) { return ptr; } + +#define VERIFY_SEPARATOR(ptr, plen, perr) \ + { \ + if (*(ptr + plen) != '\0') { \ + perr = 1; \ + break; \ + } \ + } + +#define MOVER_MOVE(mover, nleft, bytes) \ + { \ + mover += bytes; \ + nleft -= bytes; \ + } \ + +#define PARSE_GFID(mov, ptr, le, fn, perr) \ + { \ + VERIFY_SEPARATOR (mov, le, perr); \ + ptr = fn (mov); \ + if (!ptr) { \ + perr = 1; \ + break; \ + } \ + } + +#define FILL_AND_MOVE(pt, buf, of, mo, nl, le) \ + { \ + GF_CHANGELOG_FILL_BUFFER (pt, buf, of, strlen (pt)); \ + MOVER_MOVE (mo, nl, le); \ + } + + +#define PARSE_GFID_MOVE(ptr, uuid, mover, nleft, perr) \ + { \ + memcpy (uuid, mover, sizeof (uuid_t)); \ + ptr = binary_to_ascii (uuid); \ + if (!ptr) { \ + perr = 1; \ + break; \ + } \ + MOVER_MOVE (mover, nleft, sizeof (uuid_t)); \ + } \ + +#define LINE_BUFSIZE (3*PATH_MAX) /* enough buffer for extra chars too */ + +/** + * using mmap() makes parsing easy. fgets() cannot be used here as + * the binary gfid could contain a line-feed (0x0A), in that case fgets() + * would read an incomplete line and parsing would fail. using POSIX fds + * would result is additional code to maintain state in case of partial + * reads of data (where multiple entries do not fit extirely in the buffer). + * + * mmap() gives the flexibility of pointing to an offset in the file + * without us worrying about reading it in memory (VM does that for us for + * free). + */ + +static int +gf_changelog_parse_binary (xlator_t *this, + gf_changelog_journal_t *jnl, + int from_fd, int to_fd, + size_t start_offset, struct stat *stbuf) + +{ + int ret = -1; + off_t off = 0; + off_t nleft = 0; + uuid_t uuid = {0,}; + char *ptr = NULL; + char *bname_start = NULL; + char *bname_end = NULL; + char *mover = NULL; + void *start = NULL; + char current_mover = ' '; + size_t blen = 0; + int parse_err = 0; + char ascii[LINE_BUFSIZE] = {0,}; + + nleft = stbuf->st_size; + + start = mmap (NULL, nleft, PROT_READ, MAP_PRIVATE, from_fd, 0); + if (start == MAP_FAILED) { + gf_log (this->name, GF_LOG_ERROR, + "mmap() error (reason: %s)", strerror (errno)); + goto out; + } + + mover = start; + + MOVER_MOVE (mover, nleft, start_offset); + + while (nleft > 0) { + + off = blen = 0; + ptr = bname_start = bname_end = NULL; + + current_mover = *mover; + + switch (current_mover) { + case 'D': + case 'M': + MOVER_MOVE (mover, nleft, 1); + PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err); + + break; + + case 'E': + MOVER_MOVE (mover, nleft, 1); + PARSE_GFID_MOVE (ptr, uuid, mover, nleft, parse_err); + + bname_start = mover; + bname_end = strchr (mover, '\n'); + if (bname_end == NULL) { + parse_err = 1; + break; + } + + blen = bname_end - bname_start; + MOVER_MOVE (mover, nleft, blen); + + break; + + default: + parse_err = 1; + } + + if (parse_err) + break; + + GF_CHANGELOG_FILL_BUFFER (¤t_mover, ascii, off, 1); + GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1); + GF_CHANGELOG_FILL_BUFFER (ptr, ascii, off, strlen (ptr)); + if (blen) + GF_CHANGELOG_FILL_BUFFER (bname_start, + ascii, off, blen); + GF_CHANGELOG_FILL_BUFFER ("\n", ascii, off, 1); + + if (gf_changelog_write (to_fd, ascii, off) != off) { + gf_log (this->name, GF_LOG_ERROR, + "processing binary changelog failed due to " + " error in writing ascii change (reason: %s)", + strerror (errno)); + break; + } + + MOVER_MOVE (mover, nleft, 1); + } + + if ((nleft == 0) && (!parse_err)) + ret = 0; + + if (munmap (start, stbuf->st_size)) + gf_log (this->name, GF_LOG_ERROR, + "munmap() error (reason: %s)", strerror (errno)); + out: + return ret; +} + +/** + * ascii decoder: + * - separate out one entry from another + * - use fop name rather than fop number + */ +static int +gf_changelog_parse_ascii (xlator_t *this, + gf_changelog_journal_t *jnl, + int from_fd, int to_fd, + size_t start_offset, struct stat *stbuf) +{ + int ng = 0; + int ret = -1; + int fop = 0; + int len = 0; + off_t off = 0; + off_t nleft = 0; + char *ptr = NULL; + char *eptr = NULL; + void *start = NULL; + char *mover = NULL; + int parse_err = 0; + char current_mover = ' '; + char ascii[LINE_BUFSIZE] = {0,}; + const char *fopname = NULL; + + nleft = stbuf->st_size; + + start = mmap (NULL, nleft, PROT_READ, MAP_PRIVATE, from_fd, 0); + if (start == MAP_FAILED) { + gf_log (this->name, GF_LOG_ERROR, + "mmap() error (reason: %s)", strerror (errno)); + goto out; + } + + mover = start; + + MOVER_MOVE (mover, nleft, start_offset); + + while (nleft > 0) { + off = 0; + current_mover = *mover; + + GF_CHANGELOG_FILL_BUFFER (¤t_mover, ascii, off, 1); + GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1); + + switch (current_mover) { + case 'D': + MOVER_MOVE (mover, nleft, 1); + + /* target gfid */ + PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN, + conv_noop, parse_err); + FILL_AND_MOVE(ptr, ascii, off, + mover, nleft, UUID_CANONICAL_FORM_LEN); + break; + case 'M': + MOVER_MOVE (mover, nleft, 1); + + /* target gfid */ + PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN, + conv_noop, parse_err); + FILL_AND_MOVE (ptr, ascii, off, + mover, nleft, UUID_CANONICAL_FORM_LEN); + FILL_AND_MOVE (" ", ascii, off, mover, nleft, 1); + + /* fop */ + len = strlen (mover); + VERIFY_SEPARATOR (mover, len, parse_err); + + fop = atoi (mover); + fopname = gf_fop_list[fop]; + if (fopname == NULL) { + parse_err = 1; + break; + } + + MOVER_MOVE (mover, nleft, len); + + len = strlen (fopname); + GF_CHANGELOG_FILL_BUFFER (fopname, ascii, off, len); + + break; + + case 'E': + MOVER_MOVE (mover, nleft, 1); + + /* target gfid */ + PARSE_GFID (mover, ptr, UUID_CANONICAL_FORM_LEN, + conv_noop, parse_err); + FILL_AND_MOVE (ptr, ascii, off, + mover, nleft, UUID_CANONICAL_FORM_LEN); + FILL_AND_MOVE (" ", ascii, off, + mover, nleft, 1); + + /* fop */ + len = strlen (mover); + VERIFY_SEPARATOR (mover, len, parse_err); + + fop = atoi (mover); + fopname = gf_fop_list[fop]; + if (fopname == NULL) { + parse_err = 1; + break; + } + + MOVER_MOVE (mover, nleft, len); + + len = strlen (fopname); + GF_CHANGELOG_FILL_BUFFER (fopname, ascii, off, len); + + ng = nr_extra_recs[fop]; + for (; ng > 0; ng--) { + MOVER_MOVE (mover, nleft, 1); + len = strlen (mover); + VERIFY_SEPARATOR (mover, len, parse_err); + + GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1); + FILL_AND_MOVE (mover, ascii, + off, mover, nleft, len); + } + + /* pargfid + bname */ + ng = nr_gfids[fop]; + while (ng-- > 0) { + MOVER_MOVE (mover, nleft, 1); + len = strlen (mover); + GF_CHANGELOG_FILL_BUFFER (" ", ascii, off, 1); + + PARSE_GFID (mover, ptr, len, + conv_noop, parse_err); + eptr = calloc (3, strlen (ptr)); + if (!eptr) { + parse_err = 1; + break; + } + + gf_rfc3986_encode ((unsigned char *) ptr, + eptr, jnl->rfc3986); + FILL_AND_MOVE (eptr, ascii, off, + mover, nleft, len); + free (eptr); + } + + break; + default: + parse_err = 1; + } + + if (parse_err) + break; + + GF_CHANGELOG_FILL_BUFFER ("\n", ascii, off, 1); + + if (gf_changelog_write (to_fd, ascii, off) != off) { + gf_log (this->name, GF_LOG_ERROR, + "processing ascii changelog failed due to " + " error in writing change (reason: %s)", + strerror (errno)); + break; + } + + MOVER_MOVE (mover, nleft, 1); + + } + + if ((nleft == 0) && (!parse_err)) + ret = 0; + + if (munmap (start, stbuf->st_size)) + gf_log (this->name, GF_LOG_ERROR, + "munmap() error (reason: %s)", strerror (errno)); + + out: + return ret; +} + +#define COPY_BUFSIZE 8192 +static int +gf_changelog_copy (xlator_t *this, int from_fd, int to_fd) +{ + ssize_t size = 0; + char buffer[COPY_BUFSIZE+1] = {0,}; + + while (1) { + size = read (from_fd, buffer, COPY_BUFSIZE); + if (size <= 0) + break; + + if (gf_changelog_write (to_fd, + buffer, size) != size) { + gf_log (this->name, GF_LOG_ERROR, + "error processing ascii changlog"); + size = -1; + break; + } + } + + return (size < 0 ? -1 : 0); +} + +static int +gf_changelog_decode (xlator_t *this, gf_changelog_journal_t *jnl, + int from_fd, int to_fd, struct stat *stbuf, int *zerob) +{ + int ret = -1; + int encoding = -1; + size_t elen = 0; + char buffer[1024] = {0,}; + + CHANGELOG_GET_ENCODING (from_fd, buffer, 1024, encoding, elen); + if (encoding == -1) /* unknown encoding */ + goto out; + + if (!CHANGELOG_VALID_ENCODING (encoding)) + goto out; + + if (elen == stbuf->st_size) { + *zerob = 1; + goto out; + } + + /** + * start processing after the header + */ + lseek (from_fd, elen, SEEK_SET); + + switch (encoding) { + case CHANGELOG_ENCODE_BINARY: + /** + * this ideally should have been a part of changelog-encoders.c + * (ie. part of the changelog translator). + */ + ret = gf_changelog_parse_binary (this, jnl, from_fd, + to_fd, elen, stbuf); + break; + + case CHANGELOG_ENCODE_ASCII: + ret = gf_changelog_parse_ascii (this, jnl, from_fd, + to_fd, elen, stbuf); + break; + default: + ret = gf_changelog_copy (this, from_fd, to_fd); + } + + out: + return ret; +} + +int +gf_changelog_publish (xlator_t *this, + gf_changelog_journal_t *jnl, char *from_path) +{ + int ret = 0; + char dest[PATH_MAX] = {0,}; + char to_path[PATH_MAX] = {0,}; + struct stat stbuf = {0,}; + + (void) snprintf (to_path, PATH_MAX, "%s%s", + jnl->jnl_current_dir, basename (from_path)); + + /* handle zerob file that wont exist in current */ + ret = stat (to_path, &stbuf); + if (ret) { + if (errno == ENOENT) + ret = 0; + goto out; + } + + (void) snprintf (dest, PATH_MAX, "%s%s", + jnl->jnl_processing_dir, basename (from_path)); + + ret = rename (to_path, dest); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "error moving %s to processing dir" + " (reason: %s)", to_path, strerror (errno)); + } + +out: + return ret; +} + +int +gf_changelog_consume (xlator_t *this, + gf_changelog_journal_t *jnl, + char *from_path, gf_boolean_t no_publish) +{ + int ret = -1; + int fd1 = 0; + int fd2 = 0; + int zerob = 0; + struct stat stbuf = {0,}; + char dest[PATH_MAX] = {0,}; + char to_path[PATH_MAX] = {0,}; + + ret = stat (from_path, &stbuf); + if (ret || !S_ISREG(stbuf.st_mode)) { + ret = -1; + gf_log (this->name, GF_LOG_ERROR, + "stat failed on changelog file: %s", from_path); + goto out; + } + + fd1 = open (from_path, O_RDONLY); + if (fd1 < 0) { + gf_log (this->name, GF_LOG_ERROR, + "cannot open changelog file: %s (reason: %s)", + from_path, strerror (errno)); + goto out; + } + + (void) snprintf (to_path, PATH_MAX, "%s%s", + jnl->jnl_current_dir, basename (from_path)); + (void) snprintf (dest, PATH_MAX, "%s%s", + jnl->jnl_processing_dir, basename (from_path)); + + fd2 = open (to_path, O_CREAT | O_TRUNC | O_RDWR, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if (fd2 < 0) { + gf_log (this->name, GF_LOG_ERROR, + "cannot create ascii changelog file %s (reason %s)", + to_path, strerror (errno)); + goto close_fd; + } else { + ret = gf_changelog_decode (this, jnl, fd1, + fd2, &stbuf, &zerob); + + close (fd2); + + if (!ret) { + /* move it to processing on a successful + decode */ + if (no_publish == _gf_true) + goto close_fd; + ret = rename (to_path, dest); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "error moving %s to processing dir" + " (reason: %s)", to_path, + strerror (errno)); + } + + /* remove it from .current if it's an empty file */ + if (zerob) { + /* zerob changelogs must be unlinked */ + ret = unlink (to_path); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "could not unlink %s (reason: %s)", + to_path, strerror (errno)); + } + } + + close_fd: + close (fd1); + + out: + return ret; +} + +void * +gf_changelog_process (void *data) +{ + int ret = 0; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_entry_t *entry = NULL; + gf_changelog_processor_t *jnl_proc = NULL; + + this = THIS; + + jnl = data; + jnl_proc = jnl->jnl_proc; + + while (1) { + pthread_mutex_lock (&jnl_proc->lock); + { + while (list_empty (&jnl_proc->entries)) { + jnl_proc->waiting = _gf_true; + pthread_cond_wait + (&jnl_proc->cond, &jnl_proc->lock); + } + + entry = list_first_entry (&jnl_proc->entries, + gf_changelog_entry_t, list); + list_del (&entry->list); + jnl_proc->waiting = _gf_false; + } + pthread_mutex_unlock (&jnl_proc->lock); + + if (entry) { + ret = gf_changelog_consume (this, jnl, + entry->path, _gf_false); + GF_FREE (entry); + } + } + + return NULL; +} + +inline void +gf_changelog_queue_journal (gf_changelog_processor_t *jnl_proc, + changelog_event_t *event) +{ + size_t len = 0; + gf_changelog_entry_t *entry = NULL; + + entry = GF_CALLOC (1, sizeof (gf_changelog_entry_t), + gf_changelog_mt_libgfchangelog_entry_t); + if (!entry) + return; + INIT_LIST_HEAD (&entry->list); + + len = strlen (event->u.journal.path); + (void)memcpy (entry->path, event->u.journal.path, len+1); + + pthread_mutex_lock (&jnl_proc->lock); + { + list_add_tail (&entry->list, &jnl_proc->entries); + if (jnl_proc->waiting) + pthread_cond_signal (&jnl_proc->cond); + } + pthread_mutex_unlock (&jnl_proc->lock); + + return; +} + +void +gf_changelog_handle_journal (void *xl, char *brick, + void *cbkdata, changelog_event_t *event) +{ + int ret = 0; + gf_changelog_journal_t *jnl = NULL; + gf_changelog_processor_t *jnl_proc = NULL; + + jnl = cbkdata; + jnl_proc = jnl->jnl_proc; + + gf_changelog_queue_journal (jnl_proc, event); +} + +void +gf_changelog_journal_disconnect (void *xl, char *brick, void *data) +{ + gf_changelog_journal_t *jnl = NULL; + + jnl = data; + + pthread_spin_lock (&jnl->lock); + { + JNL_SET_API_STATE (jnl, JNL_API_DISCONNECTED); + }; + pthread_spin_unlock (&jnl->lock); +} + +void +gf_changelog_journal_connect (void *xl, char *brick, void *data) +{ + gf_changelog_journal_t *jnl = NULL; + + jnl = data; + + pthread_spin_lock (&jnl->lock); + { + JNL_SET_API_STATE (jnl, JNL_API_CONNECTED); + }; + pthread_spin_unlock (&jnl->lock); + + return; +} + +void +gf_changelog_cleanup_processor (gf_changelog_journal_t *jnl) +{ + int ret = 0; + xlator_t *this = NULL; + gf_changelog_processor_t *jnl_proc = NULL; + + this = THIS; + if (!this || !jnl || !jnl->jnl_proc) + goto error_return; + + jnl_proc = jnl->jnl_proc; + + ret = gf_thread_cleanup (this, jnl_proc->processor); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to cleanup processor thread"); + goto error_return; + } + + (void)pthread_mutex_destroy (&jnl_proc->lock); + (void)pthread_cond_destroy (&jnl_proc->cond); + + GF_FREE (jnl_proc); + + error_return: + return; +} + +int +gf_changelog_init_processor (gf_changelog_journal_t *jnl) +{ + int ret = -1; + gf_changelog_processor_t *jnl_proc = NULL; + + jnl_proc = GF_CALLOC (1, sizeof (gf_changelog_processor_t), + gf_changelog_mt_libgfchangelog_t); + if (!jnl_proc) + goto error_return; + + ret = pthread_mutex_init (&jnl_proc->lock, NULL); + if (ret != 0) + goto free_jnl_proc; + ret = pthread_cond_init (&jnl_proc->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + + INIT_LIST_HEAD (&jnl_proc->entries); + ret = pthread_create (&jnl_proc->processor, + NULL, gf_changelog_process, jnl); + if (ret != 0) + goto cleanup_cond; + jnl_proc->waiting = _gf_false; + + jnl->jnl_proc = jnl_proc; + return 0; + + cleanup_cond: + (void) pthread_cond_destroy (&jnl_proc->cond); + cleanup_mutex: + (void) pthread_mutex_destroy (&jnl_proc->lock); + free_jnl_proc: + GF_FREE (jnl_proc); + error_return: + return -1; +} + +static void +gf_changelog_cleanup_fds (gf_changelog_journal_t *jnl) +{ + /* tracker fd */ + if (jnl->jnl_fd != -1) + close (jnl->jnl_fd); + /* processing dir */ + if (jnl->jnl_dir) + closedir (jnl->jnl_dir); + + if (jnl->jnl_working_dir) + free (jnl->jnl_working_dir); /* allocated by realpath */ +} + +static int +gf_changelog_open_dirs (xlator_t *this, gf_changelog_journal_t *jnl) +{ + int ret = -1; + DIR *dir = NULL; + int tracker_fd = 0; + char tracker_path[PATH_MAX] = {0,}; + + /* .current */ + (void) snprintf (jnl->jnl_current_dir, PATH_MAX, + "%s/"GF_CHANGELOG_CURRENT_DIR"/", + jnl->jnl_working_dir); + ret = recursive_rmdir (jnl->jnl_current_dir); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Failed to rmdir: %s, err: %s", + jnl->jnl_current_dir, strerror (errno)); + goto out; + } + ret = mkdir_p (jnl->jnl_current_dir, 0600, _gf_false); + if (ret) + goto out; + + /* .processed */ + (void) snprintf (jnl->jnl_processed_dir, PATH_MAX, + "%s/"GF_CHANGELOG_PROCESSED_DIR"/", + jnl->jnl_working_dir); + ret = mkdir_p (jnl->jnl_processed_dir, 0600, _gf_false); + if (ret) + goto out; + + /* .processing */ + (void) snprintf (jnl->jnl_processing_dir, PATH_MAX, + "%s/"GF_CHANGELOG_PROCESSING_DIR"/", + jnl->jnl_working_dir); + ret = recursive_rmdir (jnl->jnl_processing_dir); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Failed to rmdir: %s, err: %s", + jnl->jnl_processing_dir, strerror (errno)); + goto out; + } + + ret = mkdir_p (jnl->jnl_processing_dir, 0600, _gf_false); + if (ret) + goto out; + + dir = opendir (jnl->jnl_processing_dir); + if (!dir) { + gf_log ("", GF_LOG_ERROR, + "opendir() error [reason: %s]", strerror (errno)); + goto out; + } + + jnl->jnl_dir = dir; + + (void) snprintf (tracker_path, PATH_MAX, + "%s/"GF_CHANGELOG_TRACKER, jnl->jnl_working_dir); + + tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if (tracker_fd < 0) { + closedir (jnl->jnl_dir); + ret = -1; + goto out; + } + + jnl->jnl_fd = tracker_fd; + ret = 0; + out: + return ret; +} + +int +gf_changelog_init_history (xlator_t *this, + gf_changelog_journal_t *jnl, + char *brick_path, char *scratch_dir) +{ + int i = 0; + int ret = 0; + char hist_scratch_dir[PATH_MAX] = {0,}; + + jnl->hist_jnl = GF_CALLOC (1, sizeof (*jnl), + gf_changelog_mt_libgfchangelog_t); + if (!jnl->hist_jnl) + goto error_return; + + jnl->hist_jnl->jnl_dir = NULL; + jnl->hist_jnl->jnl_fd = -1; + + (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX); + (void) snprintf (hist_scratch_dir, PATH_MAX, + "%s/"GF_CHANGELOG_HISTORY_DIR"/", + jnl->jnl_working_dir); + + ret = mkdir_p (hist_scratch_dir, 0600, _gf_false); + if (ret) + goto dealloc_hist; + + jnl->hist_jnl->jnl_working_dir = realpath (hist_scratch_dir, NULL); + if (!jnl->hist_jnl->jnl_working_dir) + goto dealloc_hist; + + ret = gf_changelog_open_dirs (this, jnl->hist_jnl); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "could not create entries in history scratch dir"); + goto dealloc_hist; + } + + (void) strncpy (jnl->hist_jnl->jnl_brickpath, brick_path, PATH_MAX); + + for (i = 0; i < 256; i++) { + jnl->hist_jnl->rfc3986[i] = + (isalnum(i) || i == '~' || + i == '-' || i == '.' || i == '_') ? i : 0; + } + + return 0; + + dealloc_hist: + GF_FREE (jnl->hist_jnl); + jnl->hist_jnl = NULL; + error_return: + return -1; +} + +void +gf_changelog_journal_fini (void *xl, char *brick, void *data) +{ + int ret = 0; + xlator_t *this = NULL; + gf_changelog_journal_t *jnl = NULL; + + this = xl; + jnl = data; + + gf_changelog_cleanup_processor (jnl); + + gf_changelog_cleanup_fds (jnl); + if (jnl->hist_jnl) + gf_changelog_cleanup_fds (jnl->hist_jnl); + + GF_FREE (jnl); +} + +void * +gf_changelog_journal_init (void *xl, struct gf_brick_spec *brick) +{ + int i = 0; + int ret = 0; + xlator_t *this = NULL; + struct stat buf = {0,}; + char *scratch_dir = NULL; + gf_changelog_journal_t *jnl = NULL; + + this = xl; + scratch_dir = (char *) brick->ptr; + + jnl = GF_CALLOC (1, sizeof (gf_changelog_journal_t), + gf_changelog_mt_libgfchangelog_t); + if (!jnl) + goto error_return; + + if (stat (scratch_dir, &buf) && errno == ENOENT) { + ret = mkdir_p (scratch_dir, 0600, _gf_true); + if (ret) + goto dealloc_private; + } + + jnl->jnl_working_dir = realpath (scratch_dir, NULL); + if (!jnl->jnl_working_dir) + goto dealloc_private; + + ret = gf_changelog_open_dirs (this, jnl); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "could not create entries in scratch dir"); + goto dealloc_private; + } + + (void) strncpy (jnl->jnl_brickpath, brick->brick_path, PATH_MAX); + + /* RFC 3986 {de,en}coding */ + for (i = 0; i < 256; i++) { + jnl->rfc3986[i] = + (isalnum(i) || i == '~' || + i == '-' || i == '.' || i == '_') ? i : 0; + } + + ret = gf_changelog_init_history (this, jnl, + brick->brick_path, scratch_dir); + if (ret) + goto cleanup_fds; + + /* initialize journal processor */ + ret = gf_changelog_init_processor (jnl); + if (ret) + goto cleanup_fds; + + JNL_SET_API_STATE (jnl, JNL_API_CONN_INPROGESS); + ret = pthread_spin_init (&jnl->lock, 0); + if (ret != 0) + goto cleanup_processor; + return jnl; + + cleanup_processor: + gf_changelog_cleanup_processor (jnl); + cleanup_fds: + gf_changelog_cleanup_fds (jnl); + if (jnl->hist_jnl) + gf_changelog_cleanup_fds (jnl->hist_jnl); + dealloc_private: + GF_FREE (jnl); + error_return: + return NULL; +} |