diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog-helpers.c')
-rw-r--r-- | xlators/features/changelog/src/changelog-helpers.c | 211 |
1 files changed, 94 insertions, 117 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c index ad4fe4013..91c43a16c 100644 --- a/xlators/features/changelog/src/changelog-helpers.c +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -21,6 +21,7 @@ #include "changelog-helpers.h" #include "changelog-mem-types.h" +#include "changelog-encoders.h" #include <pthread.h> void @@ -52,45 +53,51 @@ changelog_thread_cleanup (xlator_t *this, pthread_t thr_id) inline void * changelog_get_usable_buffer (changelog_local_t *local) { - changelog_write_data_t *cwd = &local->cld.cld_wdata; + changelog_log_data_t *cld = NULL; - if (!cwd->cwd_iobuf) + if (!local) + return NULL; + + cld = &local->cld; + if (!cld->cld_iobuf) return NULL; - return cwd->cwd_ptr; + return cld->cld_iobuf->ptr; } inline void changelog_set_usable_record_and_length (changelog_local_t *local, size_t len, int xr) { - changelog_write_data_t *cwd = &local->cld.cld_wdata; + changelog_log_data_t *cld = NULL; - cwd->cwd_ptr_len = len; - cwd->cwd_xtra_records = xr; + cld = &local->cld; + + cld->cld_ptr_len = len; + cld->cld_xtra_records = xr; } void changelog_local_cleanup (xlator_t *xl, changelog_local_t *local) { - int i = 0; - changelog_opt_t *co = NULL; - changelog_write_data_t *cwd = NULL; + int i = 0; + changelog_opt_t *co = NULL; + changelog_log_data_t *cld = NULL; if (!local) return; - cwd = &local->cld.cld_wdata; + cld = &local->cld; /* cleanup dynamic allocation for extra records */ - if (cwd->cwd_xtra_records) { - co = (changelog_opt_t *) cwd->cwd_ptr; - for (; i < cwd->cwd_xtra_records; i++, co++) + if (cld->cld_xtra_records) { + co = (changelog_opt_t *) cld->cld_ptr; + for (; i < cld->cld_xtra_records; i++, co++) if (co->co_free) co->co_free (co); } - CHANGELOG_IOBUF_UNREF (cwd->cwd_iobuf); + CHANGELOG_IOBUF_UNREF (cld->cld_iobuf); if (local->inode) inode_unref (local->inode); @@ -118,8 +125,7 @@ changelog_write (int fd, char *buffer, size_t len) static int changelog_rollover_changelog (xlator_t *this, - changelog_priv_t *priv, - changelog_rollover_data_t *crd) + changelog_priv_t *priv, unsigned long ts) { int ret = -1; int notify = 0; @@ -132,22 +138,11 @@ changelog_rollover_changelog (xlator_t *this, priv->changelog_fd = -1; } - /** - * no rolling-over of changelogs, policy implementer choose - * to do the heavy-lifting of having distinct changelog name. - * - * NOTE: This implies libgfchangelog would not be notified - (well, we could, but lets not do that now...) - */ - if (!crd->crd_use_suffix) - return 0; - (void) snprintf (ofile, PATH_MAX, - "%s/%s", priv->changelog_dir, - crd->crd_changelog_oname); - (void) snprintf (nfile, PATH_MAX, "%s/%s.%lu", - priv->changelog_dir, - crd->crd_changelog_name, crd->crd_roll_key); + "%s/"CHANGELOG_FILE_NAME, priv->changelog_dir); + (void) snprintf (nfile, PATH_MAX, + "%s/"CHANGELOG_FILE_NAME".%lu", + priv->changelog_dir, ts); ret = rename (ofile, nfile); if (!ret) @@ -179,8 +174,7 @@ changelog_rollover_changelog (xlator_t *this, int changelog_open (xlator_t *this, - changelog_priv_t *priv, - changelog_local_t *local, changelog_rollover_data_t *crd) + changelog_priv_t *priv) { int fd = 0; int ret = -1; @@ -189,12 +183,12 @@ changelog_open (xlator_t *this, char changelog_path[PATH_MAX] = {0,}; (void) snprintf (changelog_path, PATH_MAX, - "%s/%s", priv->changelog_dir, - crd->crd_changelog_name); + "%s/"CHANGELOG_FILE_NAME, + priv->changelog_dir); flags |= (O_CREAT | O_RDWR); if (priv->fsync_interval == 0) - flags |= O_SYNC; + flags |= O_SYNC; fd = open (changelog_path, flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); @@ -207,25 +201,12 @@ changelog_open (xlator_t *this, } priv->changelog_fd = fd; - CHANGELOG_INVOKE_CFOP (this, priv, reset_offset, local); - - /* preallocate if required */ - if (crd->crd_prealloc_size > 0) { - ret = posix_fallocate (priv->changelog_fd, - 0, crd->crd_prealloc_size); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "failed to preallocate %llu bytes", - (unsigned long long) crd->crd_prealloc_size); - } - } (void) snprintf (buffer, 1024, CHANGELOG_HEADER, CHANGELOG_VERSION_MAJOR, CHANGELOG_VERSION_MINOR, - priv->encode_mode); - ret = changelog_write_change (this, priv, - local, buffer, strlen (buffer)); + priv->ce->encoder); + ret = changelog_write_change (priv, buffer, strlen (buffer)); if (ret) { close (priv->changelog_fd); priv->changelog_fd = -1; @@ -238,19 +219,18 @@ changelog_open (xlator_t *this, return ret; } -static int +int changelog_start_next_change (xlator_t *this, changelog_priv_t *priv, - changelog_local_t *local, - changelog_log_data_t *cld) + unsigned long ts, gf_boolean_t finale) { - int ret = 0; - changelog_rollover_data_t *crd = &cld->cld_roll; + int ret = -1; - ret = changelog_rollover_changelog (this, priv, crd); + ret = changelog_rollover_changelog (this, priv, ts); + + if (!ret && !finale) + ret = changelog_open (this, priv); - if (!ret && !crd->crd_finale) - ret = changelog_open (this, priv, local, crd); return ret; } @@ -264,42 +244,37 @@ changelog_entry_length () } int -changelog_write_change (xlator_t *this, changelog_priv_t *priv, - changelog_local_t *local, char *buffer, size_t len) +changelog_fill_rollover_data (changelog_log_data_t *cld, gf_boolean_t is_last) { - int ret = -1; - off_t offset = 0; - ssize_t size = 0; - size_t writen = 0; - - offset = CHANGELOG_INVOKE_CFOP (this, priv, get_offset, local); + struct timeval tv = {0,}; - while (writen < len) { - size = pwrite (priv->changelog_fd, - buffer + writen, len - writen, offset + writen); - if (size <= 0) - break; + cld->cld_type = CHANGELOG_TYPE_ROLLOVER; - writen += size; - } + if (gettimeofday (&tv, NULL)) + return -1; - if (writen == len) { - ret = 0; - CHANGELOG_INVOKE_CFOP (this, priv, set_offset, local, writen); - } + cld->cld_roll_time = (unsigned long) tv.tv_sec; + cld->cld_finale = is_last; + return 0; +} - return ret; +int +changelog_write_change (changelog_priv_t *priv, char *buffer, size_t len) +{ + return changelog_write (priv->changelog_fd, buffer, len); } inline int changelog_handle_change (xlator_t *this, - changelog_priv_t *priv, - changelog_local_t *local, changelog_log_data_t *cld) + changelog_priv_t *priv, changelog_log_data_t *cld) { int ret = 0; if (CHANGELOG_TYPE_IS_ROLLOVER (cld->cld_type)) { - ret = changelog_start_next_change (this, priv, local, cld); + changelog_encode_change(priv); + ret = changelog_start_next_change (this, priv, + cld->cld_roll_time, + cld->cld_finale); if (ret) gf_log (this->name, GF_LOG_ERROR, "Problem rolling over changelog(s)"); @@ -323,7 +298,7 @@ changelog_handle_change (xlator_t *this, goto out; } - ret = priv->ce->encode (this, local, cld); + ret = priv->ce->encode (this, cld); if (ret) { gf_log (this->name, GF_LOG_ERROR, "error writing changelog to disk"); @@ -333,17 +308,6 @@ changelog_handle_change (xlator_t *this, return ret; } -static inline void -changelog_local_init_defaults (changelog_local_t *local, - uuid_t gfid, struct iobuf *iobuf) -{ - changelog_write_data_t *cwd = &(local->cld.cld_wdata); - - uuid_copy (cwd->cwd_gfid, gfid); - cwd->cwd_iobuf = iobuf; - cwd->cwd_xtra_records = 0; /* set by the caller */ -} - changelog_local_t * changelog_local_init (xlator_t *this, inode_t *inode, uuid_t gfid, int xtra_records, @@ -353,7 +317,7 @@ changelog_local_init (xlator_t *this, inode_t *inode, struct iobuf *iobuf = NULL; /** - * Relax the presence of inode if @update_flag is true. + * We relax the presence of inode if @update_flag is true. * The caller (implmentation of the fop) needs to be careful to * not blindly use local->inode. */ @@ -378,7 +342,10 @@ changelog_local_init (xlator_t *this, inode_t *inode, local->update_no_check = update_flag; - (void) changelog_local_init_defaults (local, gfid, iobuf); + uuid_copy (local->cld.cld_gfid, gfid); + + local->cld.cld_iobuf = iobuf; + local->cld.cld_xtra_records = 0; /* set by the caller */ if (inode) local->inode = inode_ref (inode); @@ -406,11 +373,9 @@ changelog_forget (xlator_t *this, inode_t *inode) int changelog_inject_single_event (xlator_t *this, changelog_priv_t *priv, - changelog_local_t *local, changelog_log_data_t *cld) { - return priv->cd.dispatchfn (this, priv, - priv->cd.cd_data, local, cld); + return priv->cd.dispatchfn (this, priv, priv->cd.cd_data, cld, NULL); } /** @@ -421,9 +386,9 @@ void * changelog_rollover (void *data) { int ret = 0; - char *cname = NULL; xlator_t *this = NULL; struct timeval tv = {0,}; + changelog_log_data_t cld = {0,}; changelog_time_slice_t *slice = NULL; changelog_priv_t *priv = data; @@ -438,11 +403,16 @@ changelog_rollover (void *data) if (ret) continue; + ret = changelog_fill_rollover_data (&cld, _gf_false); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "failed to fill rollover data"); + continue; + } + LOCK (&priv->lock); { - cname = CHANGELOG_FNAME_FROM_POLICY (priv->cp); - ret = CHANGELOG_INVOKE_CFOP (this, priv, rollover, - cname, _gf_false); + ret = changelog_inject_single_event (this, priv, &cld); if (!ret) SLICE_VERSION_UPDATE (slice); } @@ -458,9 +428,11 @@ changelog_fsync_thread (void *data) int ret = 0; xlator_t *this = NULL; struct timeval tv = {0,}; + changelog_log_data_t cld = {0,}; changelog_priv_t *priv = data; this = priv->cf.this; + cld.cld_type = CHANGELOG_TYPE_FSYNC; while (1) { tv.tv_sec = priv->fsync_interval; @@ -470,7 +442,7 @@ changelog_fsync_thread (void *data) if (ret) continue; - ret = CHANGELOG_INVOKE_CFOP (this, priv, sync); + ret = changelog_inject_single_event (this, priv, &cld); if (ret) gf_log (this->name, GF_LOG_ERROR, "failed to inject fsync event"); @@ -668,19 +640,19 @@ changelog_inode_ctx_get (xlator_t *this, * signifies an update was recorded in the current time slice). */ inline void -changelog_update (xlator_t *this, - changelog_priv_t *priv, - changelog_local_t *local, - changelog_log_type type) +changelog_update (xlator_t *this, changelog_priv_t *priv, + changelog_local_t *local, changelog_log_type type) { - int ret = 0; - unsigned long *iver = NULL; - unsigned long version = 0; - inode_t *inode = NULL; - changelog_time_slice_t *slice = NULL; - changelog_inode_ctx_t *ctx = NULL; - changelog_log_data_t *cld_0 = NULL; - gf_boolean_t need_upd = _gf_true; + int ret = 0; + unsigned long *iver = NULL; + unsigned long version = 0; + inode_t *inode = NULL; + changelog_time_slice_t *slice = NULL; + changelog_inode_ctx_t *ctx = NULL; + changelog_log_data_t *cld_0 = NULL; + changelog_log_data_t *cld_1 = NULL; + changelog_local_t *next_local = NULL; + gf_boolean_t need_upd = _gf_true; slice = &priv->slice; @@ -704,8 +676,13 @@ changelog_update (xlator_t *this, cld_0 = &local->cld; cld_0->cld_type = type; + if ( (next_local = local->prev_entry) != NULL ) { + cld_1 = &next_local->cld; + cld_1->cld_type = type; + } + ret = priv->cd.dispatchfn (this, priv, - priv->cd.cd_data, local, cld_0); + priv->cd.cd_data, cld_0, cld_1); /** * update after the dispatcher has successfully done |