diff options
Diffstat (limited to 'xlators/features/changelog/src')
| -rw-r--r-- | xlators/features/changelog/src/changelog-barrier.c | 17 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-encoders.h | 4 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-ev-handle.c | 23 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-ev-handle.h | 8 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-helpers.c | 316 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-helpers.h | 65 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-mem-types.h | 2 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-messages.h | 125 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-misc.h | 4 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-rpc-common.c | 73 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-rpc-common.h | 4 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-rpc.c | 185 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-rpc.h | 2 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-rt.c | 6 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog-rt.h | 4 | ||||
| -rw-r--r-- | xlators/features/changelog/src/changelog.c | 373 |
16 files changed, 745 insertions, 466 deletions
diff --git a/xlators/features/changelog/src/changelog-barrier.c b/xlators/features/changelog/src/changelog-barrier.c index e8d742404df..0fb89ddb127 100644 --- a/xlators/features/changelog/src/changelog-barrier.c +++ b/xlators/features/changelog/src/changelog-barrier.c @@ -10,7 +10,7 @@ #include "changelog-helpers.h" #include "changelog-messages.h" -#include "call-stub.h" +#include <glusterfs/call-stub.h> /* Enqueue a stub*/ void @@ -53,14 +53,14 @@ chlog_barrier_dequeue_all(xlator_t *this, struct list_head *queue) { call_stub_t *stub = NULL; - gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_INFO, - "Dequeuing all the changelog barriered fops"); + gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_DEQUEUING_BARRIER_FOPS, + NULL); while ((stub = __chlog_barrier_dequeue(this, queue))) call_resume(stub); - gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_INFO, - "Dequeuing changelog barriered fops is finished"); + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_MSG_DEQUEUING_BARRIER_FOPS_FINISHED, NULL); return; } @@ -80,8 +80,7 @@ chlog_barrier_timeout(void *data) INIT_LIST_HEAD(&queue); - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_BARRIER_ERROR, - "Disabling changelog barrier because of the timeout."); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_BARRIER_TIMEOUT, NULL); LOCK(&priv->lock); { @@ -120,8 +119,8 @@ __chlog_barrier_enable(xlator_t *this, changelog_priv_t *priv) priv->timer = gf_timer_call_after(this->ctx, priv->timeout, chlog_barrier_timeout, (void *)this); if (!priv->timer) { - gf_msg(this->name, GF_LOG_CRITICAL, 0, CHANGELOG_MSG_BARRIER_ERROR, - "Couldn't add changelog barrier timeout event."); + gf_smsg(this->name, GF_LOG_CRITICAL, 0, + CHANGELOG_MSG_TIMEOUT_ADD_FAILED, NULL); goto out; } diff --git a/xlators/features/changelog/src/changelog-encoders.h b/xlators/features/changelog/src/changelog-encoders.h index ca42c4c4fe0..26252696d56 100644 --- a/xlators/features/changelog/src/changelog-encoders.h +++ b/xlators/features/changelog/src/changelog-encoders.h @@ -11,8 +11,8 @@ #ifndef _CHANGELOG_ENCODERS_H #define _CHANGELOG_ENCODERS_H -#include "xlator.h" -#include "defaults.h" +#include <glusterfs/xlator.h> +#include <glusterfs/defaults.h> #include "changelog-helpers.h" diff --git a/xlators/features/changelog/src/changelog-ev-handle.c b/xlators/features/changelog/src/changelog-ev-handle.c index 3ed6ff821d9..aa94459de5a 100644 --- a/xlators/features/changelog/src/changelog-ev-handle.c +++ b/xlators/features/changelog/src/changelog-ev-handle.c @@ -134,6 +134,8 @@ changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event, changelog_clnt_t *c_clnt = NULL; changelog_priv_t *priv = NULL; changelog_ev_selector_t *selection = NULL; + uint64_t clntcnt = 0; + uint64_t xprtcnt = 0; crpc = mydata; this = crpc->this; @@ -144,6 +146,7 @@ changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event, switch (event) { case RPC_CLNT_CONNECT: selection = &priv->ev_selection; + GF_ATOMIC_INC(priv->clntcnt); LOCK(&c_clnt->wait_lock); { @@ -176,12 +179,23 @@ changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event, changelog_set_disconnect_flag(crpc, _gf_true); } UNLOCK(&crpc->lock); + LOCK(&c_clnt->active_lock); + { + list_del_init(&crpc->list); + } + UNLOCK(&c_clnt->active_lock); break; case RPC_CLNT_MSG: case RPC_CLNT_DESTROY: /* Free up mydata */ changelog_rpc_clnt_unref(crpc); + clntcnt = GF_ATOMIC_DEC(priv->clntcnt); + xprtcnt = GF_ATOMIC_GET(priv->xprtcnt); + if (this->cleanup_starting) { + if (!clntcnt && !xprtcnt) + changelog_process_cleanup_event(this); + } break; case RPC_CLNT_PING: break; @@ -211,8 +225,8 @@ changelog_ev_connector(void *data) changelog_rpc_notify); if (!crpc->rpc) { gf_smsg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_RPC_CONNECT_ERROR, - "failed to connect back", "path=%s", crpc->sock, NULL); + CHANGELOG_MSG_RPC_CONNECT_ERROR, "path=%s", crpc->sock, + NULL); crpc->cleanup(crpc); goto mutex_unlock; } @@ -364,9 +378,8 @@ changelog_ev_dispatch(void *data) ret = rbuf_wait_for_completion(c_clnt->rbuf, opaque, _dispatcher, c_clnt); if (ret) - gf_msg(this->name, GF_LOG_WARNING, 0, - CHANGELOG_MSG_PUT_BUFFER_FAILED, - "failed to put buffer after consumption"); + gf_smsg(this->name, GF_LOG_WARNING, 0, + CHANGELOG_MSG_PUT_BUFFER_FAILED, NULL); } return NULL; diff --git a/xlators/features/changelog/src/changelog-ev-handle.h b/xlators/features/changelog/src/changelog-ev-handle.h index 7e543a0edb3..cc1af58a276 100644 --- a/xlators/features/changelog/src/changelog-ev-handle.h +++ b/xlators/features/changelog/src/changelog-ev-handle.h @@ -11,11 +11,11 @@ #ifndef __CHANGELOG_EV_HANDLE_H #define __CHANGELOG_EV_HANDLE_H -#include "list.h" -#include "xlator.h" +#include <glusterfs/list.h> +#include <glusterfs/xlator.h> #include "rpc-clnt.h" -#include "rot-buffs.h" +#include <glusterfs/rot-buffs.h> struct changelog_clnt; @@ -131,4 +131,6 @@ changelog_ev_queue_connection(changelog_clnt_t *, changelog_rpc_clnt_t *); void changelog_ev_cleanup_connections(xlator_t *, changelog_clnt_t *); +void +changelog_process_cleanup_event(xlator_t *); #endif diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c index 9ff9115c40d..e561997d858 100644 --- a/xlators/features/changelog/src/changelog-helpers.c +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -8,11 +8,11 @@ cases as published by the Free Software Foundation. */ -#include "xlator.h" -#include "defaults.h" -#include "logging.h" -#include "iobuf.h" -#include "syscall.h" +#include <glusterfs/xlator.h> +#include <glusterfs/defaults.h> +#include <glusterfs/logging.h> +#include <glusterfs/iobuf.h> +#include <glusterfs/syscall.h> #include "changelog-helpers.h" #include "changelog-encoders.h" @@ -22,6 +22,7 @@ #include "changelog-encoders.h" #include "changelog-rpc-common.h" #include <pthread.h> +#include <time.h> static void changelog_cleanup_free_mutex(void *arg_mutex) @@ -41,16 +42,15 @@ changelog_thread_cleanup(xlator_t *this, pthread_t thr_id) /* send a cancel request to the thread */ ret = pthread_cancel(thr_id); if (ret != 0) { - gf_msg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_PTHREAD_CANCEL_FAILED, "could not cancel thread"); + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_MSG_PTHREAD_CANCEL_FAILED, NULL); goto out; } ret = pthread_join(thr_id, &retval); if ((ret != 0) || (retval != PTHREAD_CANCELED)) { - gf_msg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_PTHREAD_CANCEL_FAILED, - "cancel request not adhered as expected"); + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_MSG_PTHREAD_CANCEL_FAILED, NULL); } out: @@ -153,27 +153,6 @@ changelog_init_event_selection(xlator_t *this, return 0; } -int -changelog_cleanup_event_selection(xlator_t *this, - changelog_ev_selector_t *selection) -{ - int j = CHANGELOG_EV_SELECTION_RANGE; - - LOCK(&selection->reflock); - { - while (j--) { - if (selection->ref[j] > 0) - gf_msg(this->name, GF_LOG_WARNING, 0, - CHANGELOG_MSG_CLEANUP_ON_ACTIVE_REF, - "changelog event selection cleaning up " - " on active references"); - } - } - UNLOCK(&selection->reflock); - - return LOCK_DESTROY(&selection->reflock); -} - static void changelog_perform_dispatch(xlator_t *this, changelog_priv_t *priv, void *mem, size_t size) @@ -263,8 +242,7 @@ changelog_write(int fd, char *buffer, size_t len) } int -htime_update(xlator_t *this, changelog_priv_t *priv, unsigned long ts, - char *buffer) +htime_update(xlator_t *this, changelog_priv_t *priv, time_t ts, char *buffer) { char changelog_path[PATH_MAX + 1] = { 0, @@ -277,8 +255,8 @@ htime_update(xlator_t *this, changelog_priv_t *priv, unsigned long ts, int ret = 0; if (priv->htime_fd == -1) { - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_HTIME_ERROR, - "Htime fd not available for updation"); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_HTIME_ERROR, + "reason=fd not available", NULL); ret = -1; goto out; } @@ -288,13 +266,13 @@ htime_update(xlator_t *this, changelog_priv_t *priv, unsigned long ts, goto out; } if (changelog_write(priv->htime_fd, (void *)changelog_path, len + 1) < 0) { - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_HTIME_ERROR, - "Htime file content write failed"); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_HTIME_ERROR, + "reason=write failed", NULL); ret = -1; goto out; } - len = snprintf(x_value, sizeof(x_value), "%lu:%d", ts, + len = snprintf(x_value, sizeof(x_value), "%ld:%d", ts, priv->rollover_count); if (len >= sizeof(x_value)) { ret = -1; @@ -303,12 +281,12 @@ htime_update(xlator_t *this, changelog_priv_t *priv, unsigned long ts, if (sys_fsetxattr(priv->htime_fd, HTIME_KEY, x_value, len, XATTR_REPLACE)) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_HTIME_ERROR, - "Htime xattr updation failed with XATTR_REPLACE", + "reason=xattr updation failed", "XATTR_REPLACE=true", "changelog=%s", changelog_path, NULL); if (sys_fsetxattr(priv->htime_fd, HTIME_KEY, x_value, len, 0)) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_HTIME_ERROR, - "Htime xattr updation failed", "changelog=%s", + "reason=xattr updation failed", "changelog=%s", changelog_path, NULL); ret = -1; goto out; @@ -346,15 +324,15 @@ cl_is_empty(xlator_t *this, int fd) ret = sys_fstat(fd, &stbuf); if (ret) { - gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSTAT_OP_FAILED, - "Could not stat (CHANGELOG)"); + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSTAT_OP_FAILED, + NULL); goto out; } ret = sys_lseek(fd, 0, SEEK_SET); if (ret == -1) { - gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_LSEEK_OP_FAILED, - "Could not lseek (CHANGELOG)"); + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_LSEEK_OP_FAILED, + NULL); goto out; } @@ -390,8 +368,8 @@ update_path(xlator_t *this, char *cl_path) found = strstr(cl_path, up_cl); if (found == NULL) { - gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_LSEEK_OP_FAILED, - "Could not find CHANGELOG in changelog path"); + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_PATH_NOT_FOUND, + NULL); goto out; } else { memcpy(found, low_cl, sizeof(low_cl) - 1); @@ -403,18 +381,22 @@ out: } static int -changelog_rollover_changelog(xlator_t *this, changelog_priv_t *priv, - unsigned long ts) +changelog_rollover_changelog(xlator_t *this, changelog_priv_t *priv, time_t ts) { int ret = -1; int notify = 0; int cl_empty_flag = 0; + struct tm *gmt; + char yyyymmdd[40]; char ofile[PATH_MAX] = { 0, }; char nfile[PATH_MAX] = { 0, }; + char nfile_dir[PATH_MAX] = { + 0, + }; changelog_event_t ev = { 0, }; @@ -422,33 +404,37 @@ changelog_rollover_changelog(xlator_t *this, changelog_priv_t *priv, if (priv->changelog_fd != -1) { ret = sys_fsync(priv->changelog_fd); if (ret < 0) { - gf_msg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_FSYNC_OP_FAILED, "fsync failed"); + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_MSG_FSYNC_OP_FAILED, NULL); } ret = cl_is_empty(this, priv->changelog_fd); if (ret == 1) { cl_empty_flag = 1; } else if (ret == -1) { /* Log error but proceed as usual */ - gf_msg(this->name, GF_LOG_WARNING, 0, - CHANGELOG_MSG_DETECT_EMPTY_CHANGELOG_FAILED, - "Error detecting empty changelog"); + gf_smsg(this->name, GF_LOG_WARNING, 0, + CHANGELOG_MSG_DETECT_EMPTY_CHANGELOG_FAILED, NULL); } sys_close(priv->changelog_fd); priv->changelog_fd = -1; } + /* Get GMT time. */ + gmt = gmtime(&ts); + + strftime(yyyymmdd, sizeof(yyyymmdd), "%Y/%m/%d", gmt); + (void)snprintf(ofile, PATH_MAX, "%s/" CHANGELOG_FILE_NAME, priv->changelog_dir); - (void)snprintf(nfile, PATH_MAX, "%s/" CHANGELOG_FILE_NAME ".%lu", - priv->changelog_dir, ts); + (void)snprintf(nfile, PATH_MAX, "%s/%s/" CHANGELOG_FILE_NAME ".%ld", + priv->changelog_dir, yyyymmdd, ts); + (void)snprintf(nfile_dir, PATH_MAX, "%s/%s", priv->changelog_dir, yyyymmdd); if (cl_empty_flag == 1) { ret = sys_unlink(ofile); if (ret) { gf_smsg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_UNLINK_OP_FAILED, - "error unlinking empty changelog", "path=%s", ofile, NULL); + CHANGELOG_MSG_UNLINK_OP_FAILED, "path=%s", ofile, NULL); ret = 0; /* Error in unlinking empty changelog should not break further changelog operation, so reset return value to 0*/ @@ -456,13 +442,26 @@ changelog_rollover_changelog(xlator_t *this, changelog_priv_t *priv, } else { ret = sys_rename(ofile, nfile); + /* Changelog file rename gets ENOENT when parent dir doesn't exist */ + if (errno == ENOENT) { + ret = mkdir_p(nfile_dir, 0600, _gf_true); + + if ((ret == -1) && (EEXIST != errno)) { + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_MSG_MKDIR_ERROR, "%s", nfile_dir, NULL); + goto out; + } + + ret = sys_rename(ofile, nfile); + } + if (ret && (errno == ENOENT)) { ret = 0; goto out; } if (ret) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_RENAME_ERROR, - "error renaming", "from=%s", ofile, "to=%s", nfile, NULL); + "from=%s", ofile, "to=%s", nfile, NULL); } } @@ -476,8 +475,8 @@ changelog_rollover_changelog(xlator_t *this, changelog_priv_t *priv, } ret = htime_update(this, priv, ts, nfile); if (ret == -1) { - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_HTIME_ERROR, - "could not update htime file"); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_HTIME_ERROR, + NULL); goto out; } } @@ -501,15 +500,10 @@ out: { if (ret) { priv->bn.bnotify_error = _gf_true; - gf_msg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_EXPLICIT_ROLLOVER_FAILED, - "Fail snapshot because of " - "previous errors"); + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_EXPLICIT_ROLLOVER_FAILED, NULL); } else { gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BNOTIFY_INFO, - "Explicit " - "rollover changelog signaling " - "bnotify", "changelog=%s", nfile, NULL); } priv->bn.bnotify = _gf_false; @@ -556,8 +550,8 @@ find_current_htime(int ht_dir_fd, const char *ht_dir_path, char *ht_file_bname) cnt = scandir(ht_dir_path, &namelist, filter_cur_par_dirs, alphasort); if (cnt < 0) { - gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_SCAN_DIR_FAILED, - "scandir failed"); + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_SCAN_DIR_FAILED, + NULL); } else if (cnt > 0) { if (snprintf(ht_file_bname, NAME_MAX, "%s", namelist[cnt - 1]->d_name) >= NAME_MAX) { @@ -566,16 +560,15 @@ find_current_htime(int ht_dir_fd, const char *ht_dir_path, char *ht_file_bname) } if (sys_fsetxattr(ht_dir_fd, HTIME_CURRENT, ht_file_bname, strlen(ht_file_bname), 0)) { - gf_msg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_FSETXATTR_FAILED, - "fsetxattr failed: HTIME_CURRENT"); + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_MSG_FSETXATTR_FAILED, "HTIME_CURRENT", NULL); ret = -1; goto out; } if (sys_fsync(ht_dir_fd) < 0) { - gf_msg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_FSYNC_OP_FAILED, "fsync failed"); + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_MSG_FSYNC_OP_FAILED, NULL); ret = -1; goto out; } @@ -596,7 +589,7 @@ out: * returns -1 on failure or error */ int -htime_open(xlator_t *this, changelog_priv_t *priv, unsigned long ts) +htime_open(xlator_t *this, changelog_priv_t *priv, time_t ts) { int ht_file_fd = -1; int ht_dir_fd = -1; @@ -632,7 +625,7 @@ htime_open(xlator_t *this, changelog_priv_t *priv, unsigned long ts) ht_dir_fd = open(ht_dir_path, O_RDONLY); if (ht_dir_fd == -1) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_OPEN_FAILED, - "open failed", "path=%s", ht_dir_path, NULL); + "path=%s", ht_dir_path, NULL); ret = -1; goto out; } @@ -640,9 +633,8 @@ htime_open(xlator_t *this, changelog_priv_t *priv, unsigned long ts) size = sys_fgetxattr(ht_dir_fd, HTIME_CURRENT, ht_file_bname, sizeof(ht_file_bname)); if (size < 0) { - gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FGETXATTR_FAILED, - "Error extracting" - " HTIME_CURRENT."); + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FGETXATTR_FAILED, + "name=HTIME_CURRENT", NULL); /* If upgrade scenario, find the latest HTIME.TSTAMP file * and use the same. If error, create a new HTIME.TSTAMP @@ -650,20 +642,18 @@ htime_open(xlator_t *this, changelog_priv_t *priv, unsigned long ts) */ cnt = find_current_htime(ht_dir_fd, ht_dir_path, ht_file_bname); if (cnt <= 0) { - gf_msg(this->name, GF_LOG_INFO, errno, CHANGELOG_MSG_HTIME_INFO, - "HTIME_CURRENT not found. Changelog enabled" - " before init"); + gf_smsg(this->name, GF_LOG_INFO, errno, + CHANGELOG_MSG_NO_HTIME_CURRENT, NULL); sys_close(ht_dir_fd); return htime_create(this, priv, ts); } - gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_HTIME_ERROR, - "Error extracting" - " HTIME_CURRENT."); + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_MSG_HTIME_CURRENT_ERROR, NULL); } - gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_HTIME_INFO, - "HTIME_CURRENT", "path=%s", ht_file_bname, NULL); + gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_HTIME_CURRENT, "path=%s", + ht_file_bname, NULL); len = snprintf(ht_file_path, PATH_MAX, "%s/%s", ht_dir_path, ht_file_bname); if ((len < 0) || (len >= PATH_MAX)) { ret = -1; @@ -676,7 +666,7 @@ htime_open(xlator_t *this, changelog_priv_t *priv, unsigned long ts) S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); if (ht_file_fd < 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_OPEN_FAILED, - "unable to open htime file", "path=%s", ht_file_path, NULL); + "path=%s", ht_file_path, NULL); ret = -1; goto out; } @@ -686,8 +676,8 @@ htime_open(xlator_t *this, changelog_priv_t *priv, unsigned long ts) ret = sys_fstat(ht_file_fd, &stat_buf); if (ret < 0) { - gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_HTIME_ERROR, - "unable to stat htime file", "path=%s", ht_file_path, NULL); + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_HTIME_STAT_ERROR, + "path=%s", ht_file_path, NULL); ret = -1; goto out; } @@ -696,9 +686,7 @@ htime_open(xlator_t *this, changelog_priv_t *priv, unsigned long ts) size = sys_fgetxattr(ht_file_fd, HTIME_KEY, x_value, sizeof(x_value)); if (size < 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FGETXATTR_FAILED, - "error extracting max" - " timstamp from htime file", - "path=%s", ht_file_path, NULL); + "name=%s", HTIME_KEY, "path=%s", ht_file_path, NULL); ret = -1; goto out; } @@ -710,14 +698,11 @@ htime_open(xlator_t *this, changelog_priv_t *priv, unsigned long ts) total1 = stat_buf.st_size / record_len; if (total != total1) { gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_TOTAL_LOG_INFO, - "Mismatch of changelog count. " - "INIT CASE", "xattr_total=%lu", total, "size_total=%lu", total1, NULL); } - gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_TOTAL_LOG_INFO, - "INIT CASE", "min=%lu", min_ts, "max=%lu", max_ts, - "total_changelogs=%lu", total, NULL); + gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_TOTAL_LOG_INFO, "min=%lu", + min_ts, "max=%lu", max_ts, "total_changelogs=%lu", total, NULL); if (total < total1) priv->rollover_count = total1 + 1; @@ -734,7 +719,7 @@ out: * returns -1 on failure or error */ int -htime_create(xlator_t *this, changelog_priv_t *priv, unsigned long ts) +htime_create(xlator_t *this, changelog_priv_t *priv, time_t ts) { int ht_file_fd = -1; int ht_dir_fd = -1; @@ -751,15 +736,13 @@ htime_create(xlator_t *this, changelog_priv_t *priv, unsigned long ts) int flags = 0; int32_t len = 0; - gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_HTIME_INFO, - "Changelog enable: Creating new " - "HTIME file", - "name=%lu", ts, NULL); + gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_NEW_HTIME_FILE, + "name=%ld", ts, NULL); CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, ht_dir_path); /* get the htime file name in ht_file_path */ - len = snprintf(ht_file_path, PATH_MAX, "%s/%s.%lu", ht_dir_path, + len = snprintf(ht_file_path, PATH_MAX, "%s/%s.%ld", ht_dir_path, HTIME_FILE_NAME, ts); if ((len < 0) || (len >= PATH_MAX)) { ret = -1; @@ -771,23 +754,23 @@ htime_create(xlator_t *this, changelog_priv_t *priv, unsigned long ts) S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); if (ht_file_fd < 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_OPEN_FAILED, - "unable to create htime file", "path=%s", ht_file_path, NULL); + "path=%s", ht_file_path, NULL); ret = -1; goto out; } if (sys_fsetxattr(ht_file_fd, HTIME_KEY, HTIME_INITIAL_VALUE, sizeof(HTIME_INITIAL_VALUE) - 1, 0)) { - gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSETXATTR_FAILED, - "Htime xattr initialization failed"); + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_MSG_XATTR_INIT_FAILED, NULL); ret = -1; goto out; } ret = sys_fsync(ht_file_fd); if (ret < 0) { - gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSYNC_OP_FAILED, - "fsync failed"); + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSYNC_OP_FAILED, + NULL); goto out; } @@ -800,26 +783,25 @@ htime_create(xlator_t *this, changelog_priv_t *priv, unsigned long ts) ht_dir_fd = open(ht_dir_path, O_RDONLY); if (ht_dir_fd == -1) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_OPEN_FAILED, - "open failed", "path=%s", ht_dir_path, NULL); + "path=%s", ht_dir_path, NULL); ret = -1; goto out; } - (void)snprintf(ht_file_bname, sizeof(ht_file_bname), "%s.%lu", + (void)snprintf(ht_file_bname, sizeof(ht_file_bname), "%s.%ld", HTIME_FILE_NAME, ts); if (sys_fsetxattr(ht_dir_fd, HTIME_CURRENT, ht_file_bname, strlen(ht_file_bname), 0)) { - gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSETXATTR_FAILED, - "fsetxattr failed:" - " HTIME_CURRENT"); + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSETXATTR_FAILED, + " HTIME_CURRENT", NULL); ret = -1; goto out; } ret = sys_fsync(ht_dir_fd); if (ret < 0) { - gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSYNC_OP_FAILED, - "fsync failed"); + gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_FSYNC_OP_FAILED, + NULL); goto out; } @@ -873,7 +855,7 @@ changelog_snap_open(xlator_t *this, changelog_priv_t *priv) fd = open(c_snap_path, flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); if (fd < 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_OPEN_FAILED, - "unable to open file", "path=%s", c_snap_path, NULL); + "path=%s", c_snap_path, NULL); ret = -1; goto out; } @@ -905,8 +887,8 @@ changelog_snap_logging_start(xlator_t *this, changelog_priv_t *priv) int ret = 0; ret = changelog_snap_open(this, priv); - gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_SNAP_INFO, - "Now starting to log in call path"); + gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_SNAP_INFO, "starting", + NULL); return ret; } @@ -926,8 +908,8 @@ changelog_snap_logging_stop(xlator_t *this, changelog_priv_t *priv) sys_close(priv->c_snap_fd); priv->c_snap_fd = -1; - gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_SNAP_INFO, - "Stopped to log in call path"); + gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_SNAP_INFO, "Stopped", + NULL); return ret; } @@ -955,9 +937,6 @@ changelog_open_journal(xlator_t *this, changelog_priv_t *priv) fd = open(changelog_path, flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); if (fd < 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_OPEN_FAILED, - "unable to open/create changelog file." - " change-logging will be" - " inactive", "path=%s", changelog_path, NULL); goto out; } @@ -980,8 +959,8 @@ out: } int -changelog_start_next_change(xlator_t *this, changelog_priv_t *priv, - unsigned long ts, gf_boolean_t finale) +changelog_start_next_change(xlator_t *this, changelog_priv_t *priv, time_t ts, + gf_boolean_t finale) { int ret = -1; @@ -1002,21 +981,12 @@ changelog_entry_length() return sizeof(changelog_log_data_t); } -int +void changelog_fill_rollover_data(changelog_log_data_t *cld, gf_boolean_t is_last) { - struct timeval tv = { - 0, - }; - cld->cld_type = CHANGELOG_TYPE_ROLLOVER; - - if (gettimeofday(&tv, NULL)) - return -1; - - cld->cld_roll_time = (unsigned long)tv.tv_sec; + cld->cld_roll_time = gf_time(); cld->cld_finale = is_last; - return 0; } int @@ -1074,11 +1044,10 @@ changelog_snap_handle_ascii_change(xlator_t *this, changelog_log_data_t *cld) ret = changelog_snap_write_change(priv, buffer, off); if (ret < 0) { - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_WRITE_FAILED, - "error writing csnap to disk"); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_WRITE_FAILED, + "csnap", NULL); } - gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_SNAP_INFO, - "Successfully wrote to csnap"); + gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_WROTE_TO_CSNAP, NULL); ret = 0; out: return ret; @@ -1095,9 +1064,8 @@ changelog_handle_change(xlator_t *this, changelog_priv_t *priv, ret = changelog_start_next_change(this, priv, cld->cld_roll_time, cld->cld_finale); if (ret) - gf_msg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_GET_TIME_OP_FAILED, - "Problem rolling over changelog(s)"); + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_GET_TIME_OP_FAILED, NULL); goto out; } @@ -1111,16 +1079,16 @@ changelog_handle_change(xlator_t *this, changelog_priv_t *priv, if (CHANGELOG_TYPE_IS_FSYNC(cld->cld_type)) { ret = sys_fsync(priv->changelog_fd); if (ret < 0) { - gf_msg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_FSYNC_OP_FAILED, "fsync failed"); + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_MSG_FSYNC_OP_FAILED, NULL); } goto out; } ret = priv->ce->encode(this, cld); if (ret) { - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_WRITE_FAILED, - "error writing changelog to disk"); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_WRITE_FAILED, + "changelog", NULL); } out: @@ -1143,6 +1111,7 @@ changelog_local_init(xlator_t *this, inode_t *inode, uuid_t gfid, gf_msg_callingfn(this->name, GF_LOG_WARNING, 0, CHANGELOG_MSG_INODE_NOT_FOUND, "inode needed for version checking !!!"); + goto out; } @@ -1211,7 +1180,7 @@ changelog_drain_black_fops(xlator_t *this, changelog_priv_t *priv) ret = pthread_mutex_lock(&priv->dm.drain_black_mutex); if (ret) gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_PTHREAD_ERROR, - "pthread error", "error=%d", ret, NULL); + "error=%d", ret, NULL); while (priv->dm.black_fop_cnt > 0) { gf_msg_debug(this->name, 0, "Conditional wait on black fops: %ld", priv->dm.black_fop_cnt); @@ -1220,14 +1189,14 @@ changelog_drain_black_fops(xlator_t *this, changelog_priv_t *priv) &priv->dm.drain_black_mutex); if (ret) gf_smsg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_PTHREAD_COND_WAIT_FAILED, - "pthread cond wait failed", "error=%d", ret, NULL); + CHANGELOG_MSG_PTHREAD_COND_WAIT_FAILED, "error=%d", ret, + NULL); } priv->dm.drain_wait_black = _gf_false; ret = pthread_mutex_unlock(&priv->dm.drain_black_mutex); if (ret) gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_PTHREAD_ERROR, - "pthread error", "error=%d", ret, NULL); + "error=%d", ret, NULL); pthread_cleanup_pop(0); gf_msg_debug(this->name, 0, "Woke up: Conditional wait on black fops"); } @@ -1247,7 +1216,7 @@ changelog_drain_white_fops(xlator_t *this, changelog_priv_t *priv) ret = pthread_mutex_lock(&priv->dm.drain_white_mutex); if (ret) gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_PTHREAD_ERROR, - "pthread error", "error=%d", ret, NULL); + "error=%d", ret, NULL); while (priv->dm.white_fop_cnt > 0) { gf_msg_debug(this->name, 0, "Conditional wait on white fops : %ld", priv->dm.white_fop_cnt); @@ -1256,14 +1225,14 @@ changelog_drain_white_fops(xlator_t *this, changelog_priv_t *priv) &priv->dm.drain_white_mutex); if (ret) gf_smsg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_PTHREAD_COND_WAIT_FAILED, - "pthread cond wait failed", "error=%d", ret, NULL); + CHANGELOG_MSG_PTHREAD_COND_WAIT_FAILED, "error=%d", ret, + NULL); } priv->dm.drain_wait_white = _gf_false; ret = pthread_mutex_unlock(&priv->dm.drain_white_mutex); if (ret) gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_PTHREAD_ERROR, - "pthread error", "error=%d", ret, NULL); + "error=%d", ret, NULL); pthread_cleanup_pop(0); gf_msg_debug(this->name, 0, "Woke up: Conditional wait on white fops"); } @@ -1292,7 +1261,7 @@ changelog_rollover(void *data) while (1) { (void)pthread_testcancel(); - tv.tv_sec = time(NULL) + priv->rollover_time; + tv.tv_sec = gf_time() + priv->rollover_time; tv.tv_nsec = 0; ret = 0; /* Reset ret to zero */ @@ -1315,12 +1284,12 @@ changelog_rollover(void *data) pthread_cleanup_pop(0); if (ret == 0) { - gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_INFO, - "Explicit wakeup on barrier notify"); + gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_INFO, + NULL); priv->explicit_rollover = _gf_true; } else if (ret && ret != ETIMEDOUT) { - gf_msg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_SELECT_FAILED, - "pthread_cond_timedwait failed"); + gf_smsg(this->name, GF_LOG_ERROR, errno, + CHANGELOG_MSG_SELECT_FAILED, NULL); continue; } else if (ret && ret == ETIMEDOUT) { gf_msg_debug(this->name, 0, "Wokeup on timeout"); @@ -1373,13 +1342,7 @@ changelog_rollover(void *data) if (priv->explicit_rollover == _gf_true) sleep(1); - ret = changelog_fill_rollover_data(&cld, _gf_false); - if (ret) { - gf_msg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_GET_TIME_OP_FAILED, - "failed to fill rollover data"); - continue; - } + changelog_fill_rollover_data(&cld, _gf_false); _mask_cancellation(); @@ -1427,9 +1390,8 @@ changelog_fsync_thread(void *data) ret = changelog_inject_single_event(this, priv, &cld); if (ret) - gf_msg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_INJECT_FSYNC_FAILED, - "failed to inject fsync event"); + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_INJECT_FSYNC_FAILED, NULL); _unmask_cancellation(); } @@ -1851,23 +1813,21 @@ changelog_fill_entry_buf(call_frame_t *frame, xlator_t *this, loc_t *loc, parent = inode_parent(loc->inode, 0, 0); if (!parent) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_INODE_NOT_FOUND, - "Parent inode not found", "gfid=%s", - uuid_utoa(loc->inode->gfid), NULL); + "type=parent", "gfid=%s", uuid_utoa(loc->inode->gfid), NULL); goto err; } CHANGELOG_INIT_NOCHECK(this, *local, loc->inode, loc->inode->gfid, 5); if (!(*local)) { - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_LOCAL_INIT_FAILED, - "changelog local" - " initiatilization failed"); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_LOCAL_INIT_FAILED, + NULL); goto err; } co = changelog_get_usable_buffer(*local); if (!co) { - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_NO_MEMORY, - "Failed to get buffer"); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_GET_BUFFER_FAILED, + NULL); goto err; } diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h index 10d457e8cf5..38fa7590c32 100644 --- a/xlators/features/changelog/src/changelog-helpers.h +++ b/xlators/features/changelog/src/changelog-helpers.h @@ -11,14 +11,14 @@ #ifndef _CHANGELOG_HELPERS_H #define _CHANGELOG_HELPERS_H -#include "locking.h" -#include "timer.h" +#include <glusterfs/locking.h> +#include <glusterfs/timer.h> #include "pthread.h" -#include "iobuf.h" -#include "rot-buffs.h" +#include <glusterfs/iobuf.h> +#include <glusterfs/rot-buffs.h> #include "changelog-misc.h" -#include "call-stub.h" +#include <glusterfs/call-stub.h> #include "rpcsvc.h" #include "changelog-ev-handle.h" @@ -31,7 +31,7 @@ */ typedef struct changelog_log_data { /* rollover related */ - unsigned long cld_roll_time; + time_t cld_roll_time; /* reopen changelog? */ gf_boolean_t cld_finale; @@ -97,12 +97,6 @@ struct changelog_encoder { typedef struct changelog_time_slice { /** - * just in case we need nanosecond granularity some day. - * field is unused as of now (maybe we'd need it later). - */ - struct timeval tv_start; - - /** * version of changelog file, incremented each time changes * rollover. */ @@ -190,8 +184,12 @@ typedef struct changelog_ev_selector { /* changelog's private structure */ struct changelog_priv { + /* changelog journalling */ gf_boolean_t active; + /* changelog live notifications */ + gf_boolean_t rpc_active; + /* to generate unique socket file per brick */ char *changelog_brick; @@ -307,6 +305,24 @@ struct changelog_priv { /* glusterfind dependency to capture paths on deleted entries*/ gf_boolean_t capture_del_path; + + /* Save total no. of listners */ + gf_atomic_t listnercnt; + + /* Save total no. of xprt are associated with listner */ + gf_atomic_t xprtcnt; + + /* Save xprt list */ + struct list_head xprt_list; + + /* Save total no. of client connection */ + gf_atomic_t clntcnt; + + /* Save cleanup brick in victim */ + xlator_t *victim; + + /* Status to save cleanup notify status */ + gf_boolean_t notify_down; }; struct changelog_local { @@ -401,11 +417,11 @@ changelog_local_t * changelog_local_init(xlator_t *this, inode_t *inode, uuid_t gfid, int xtra_records, gf_boolean_t update_flag); int -changelog_start_next_change(xlator_t *this, changelog_priv_t *priv, - unsigned long ts, gf_boolean_t finale); +changelog_start_next_change(xlator_t *this, changelog_priv_t *priv, time_t ts, + gf_boolean_t finale); int changelog_open_journal(xlator_t *this, changelog_priv_t *priv); -int +void changelog_fill_rollover_data(changelog_log_data_t *cld, gf_boolean_t is_last); int changelog_inject_single_event(xlator_t *this, changelog_priv_t *priv, @@ -429,12 +445,11 @@ changelog_fsync_thread(void *data); int changelog_forget(xlator_t *this, inode_t *inode); int -htime_update(xlator_t *this, changelog_priv_t *priv, unsigned long ts, - char *buffer); +htime_update(xlator_t *this, changelog_priv_t *priv, time_t ts, char *buffer); int -htime_open(xlator_t *this, changelog_priv_t *priv, unsigned long ts); +htime_open(xlator_t *this, changelog_priv_t *priv, time_t ts); int -htime_create(xlator_t *this, changelog_priv_t *priv, unsigned long ts); +htime_create(xlator_t *this, changelog_priv_t *priv, time_t ts); /* Geo-Rep snapshot dependency changes */ void @@ -492,8 +507,6 @@ changelog_deselect_event(xlator_t *, changelog_ev_selector_t *, unsigned int); int changelog_init_event_selection(xlator_t *, changelog_ev_selector_t *); int -changelog_cleanup_event_selection(xlator_t *, changelog_ev_selector_t *); -int changelog_ev_selected(xlator_t *, changelog_ev_selector_t *, unsigned int); void changelog_dispatch_event(xlator_t *, changelog_priv_t *, changelog_event_t *); @@ -656,8 +669,8 @@ resolve_pargfid_to_path(xlator_t *this, const uuid_t gfid, char **path, #define CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, label) \ do { \ if (!priv->active) { \ - gf_msg(this->name, GF_LOG_WARNING, 0, CHANGELOG_MSG_NOT_ACTIVE, \ - "Changelog is not active, return success"); \ + gf_smsg(this->name, GF_LOG_WARNING, 0, \ + CHANGELOG_MSG_CHANGELOG_NOT_ACTIVE, NULL); \ ret = 0; \ goto label; \ } \ @@ -668,7 +681,7 @@ resolve_pargfid_to_path(xlator_t *this, const uuid_t gfid, char **path, do { \ if (ret) { \ gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_PTHREAD_ERROR, \ - "pthread error", "error=%d", ret, NULL); \ + "error=%d", ret, NULL); \ ret = -1; \ goto label; \ } \ @@ -679,7 +692,7 @@ resolve_pargfid_to_path(xlator_t *this, const uuid_t gfid, char **path, do { \ if (ret) { \ gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_PTHREAD_ERROR, \ - "pthread error", "error=%d", ret, NULL); \ + "error=%d", ret, NULL); \ ret = -1; \ flag = _gf_true; \ goto label; \ @@ -691,7 +704,7 @@ resolve_pargfid_to_path(xlator_t *this, const uuid_t gfid, char **path, do { \ if (ret) { \ gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_PTHREAD_ERROR, \ - "pthread error", "error=%d", ret, NULL); \ + "error=%d", ret, NULL); \ ret = -1; \ pthread_mutex_unlock(&mutex); \ goto label; \ diff --git a/xlators/features/changelog/src/changelog-mem-types.h b/xlators/features/changelog/src/changelog-mem-types.h index 1e3786c6298..a2d8a9cbe93 100644 --- a/xlators/features/changelog/src/changelog-mem-types.h +++ b/xlators/features/changelog/src/changelog-mem-types.h @@ -11,7 +11,7 @@ #ifndef _CHANGELOG_MEM_TYPES_H #define _CHANGELOG_MEM_TYPES_H -#include "mem-types.h" +#include <glusterfs/mem-types.h> enum gf_changelog_mem_types { gf_changelog_mt_priv_t = gf_common_mt_end + 1, diff --git a/xlators/features/changelog/src/changelog-messages.h b/xlators/features/changelog/src/changelog-messages.h index dbf133ec836..cb0e16c85d8 100644 --- a/xlators/features/changelog/src/changelog-messages.h +++ b/xlators/features/changelog/src/changelog-messages.h @@ -11,7 +11,7 @@ #ifndef _CHANGELOG_MESSAGES_H_ #define _CHANGELOG_MESSAGES_H_ -#include "glfs-message-id.h" +#include <glusterfs/glfs-message-id.h> /* To add new message IDs, append new identifiers at the end of the list. * @@ -24,7 +24,7 @@ */ GLFS_MSGID( - CHANGELOG, CHANGELOG_MSG_OPEN_FAILED, CHANGELOG_MSG_NO_MEMORY, + CHANGELOG, CHANGELOG_MSG_OPEN_FAILED, CHANGELOG_MSG_BARRIER_FOP_FAILED, CHANGELOG_MSG_VOL_MISCONFIGURED, CHANGELOG_MSG_RENAME_ERROR, CHANGELOG_MSG_READ_ERROR, CHANGELOG_MSG_HTIME_ERROR, CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED, @@ -37,11 +37,11 @@ GLFS_MSGID( CHANGELOG_MSG_FSYNC_OP_FAILED, CHANGELOG_MSG_TOTAL_LOG_INFO, CHANGELOG_MSG_SNAP_INFO, CHANGELOG_MSG_SELECT_FAILED, CHANGELOG_MSG_FCNTL_FAILED, CHANGELOG_MSG_BNOTIFY_INFO, - CHANGELOG_MSG_ENTRY_BUF_INFO, CHANGELOG_MSG_NOT_ACTIVE, + CHANGELOG_MSG_ENTRY_BUF_INFO, CHANGELOG_MSG_CHANGELOG_NOT_ACTIVE, CHANGELOG_MSG_LOCAL_INIT_FAILED, CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, CHANGELOG_MSG_PROGRAM_NAME_REG_FAILED, CHANGELOG_MSG_HANDLE_PROBE_ERROR, CHANGELOG_MSG_SET_FD_CONTEXT, CHANGELOG_MSG_FREEUP_FAILED, - CHANGELOG_MSG_HTIME_INFO, CHANGELOG_MSG_RPC_SUBMIT_REPLY_FAILED, + CHANGELOG_MSG_RECONFIGURE, CHANGELOG_MSG_RPC_SUBMIT_REPLY_FAILED, CHANGELOG_MSG_RPC_BUILD_ERROR, CHANGELOG_MSG_RPC_CONNECT_ERROR, CHANGELOG_MSG_RPC_START_ERROR, CHANGELOG_MSG_BUFFER_STARVATION_ERROR, CHANGELOG_MSG_SCAN_DIR_FAILED, CHANGELOG_MSG_FSETXATTR_FAILED, @@ -52,6 +52,121 @@ GLFS_MSGID( CHANGELOG_MSG_FSTAT_OP_FAILED, CHANGELOG_MSG_LSEEK_OP_FAILED, CHANGELOG_MSG_STRSTR_OP_FAILED, CHANGELOG_MSG_UNLINK_OP_FAILED, CHANGELOG_MSG_DETECT_EMPTY_CHANGELOG_FAILED, - CHANGELOG_MSG_READLINK_OP_FAILED, CHANGELOG_MSG_EXPLICIT_ROLLOVER_FAILED); + CHANGELOG_MSG_READLINK_OP_FAILED, CHANGELOG_MSG_EXPLICIT_ROLLOVER_FAILED, + CHANGELOG_MSG_RPCSVC_NOTIFY_FAILED, CHANGELOG_MSG_MEMORY_INIT_FAILED, + CHANGELOG_MSG_NO_MEMORY, CHANGELOG_MSG_HTIME_STAT_ERROR, + CHANGELOG_MSG_HTIME_CURRENT_ERROR, CHANGELOG_MSG_BNOTIFY_COND_INFO, + CHANGELOG_MSG_NO_HTIME_CURRENT, CHANGELOG_MSG_HTIME_CURRENT, + CHANGELOG_MSG_NEW_HTIME_FILE, CHANGELOG_MSG_MKDIR_ERROR, + CHANGELOG_MSG_PATH_NOT_FOUND, CHANGELOG_MSG_XATTR_INIT_FAILED, + CHANGELOG_MSG_WROTE_TO_CSNAP, CHANGELOG_MSG_UNUSED_0, + CHANGELOG_MSG_GET_BUFFER_FAILED, CHANGELOG_MSG_BARRIER_STATE_NOTIFY, + CHANGELOG_MSG_BARRIER_DISABLED, CHANGELOG_MSG_BARRIER_ALREADY_DISABLED, + CHANGELOG_MSG_BARRIER_ON_ERROR, CHANGELOG_MSG_BARRIER_ENABLE, + CHANGELOG_MSG_BARRIER_KEY_NOT_FOUND, CHANGELOG_MSG_ERROR_IN_DICT_GET, + CHANGELOG_MSG_UNUSED_1, CHANGELOG_MSG_UNUSED_2, + CHANGELOG_MSG_DEQUEUING_BARRIER_FOPS, + CHANGELOG_MSG_DEQUEUING_BARRIER_FOPS_FINISHED, + CHANGELOG_MSG_BARRIER_TIMEOUT, CHANGELOG_MSG_TIMEOUT_ADD_FAILED, + CHANGELOG_MSG_CLEANUP_ALREADY_SET); +#define CHANGELOG_MSG_BARRIER_FOP_FAILED_STR \ + "failed to barrier FOPs, disabling changelog barrier" +#define CHANGELOG_MSG_MEMORY_INIT_FAILED_STR "memory accounting init failed" +#define CHANGELOG_MSG_NO_MEMORY_STR "failed to create local memory pool" +#define CHANGELOG_MSG_ENTRY_BUF_INFO_STR \ + "Entry cannot be captured for gfid, Capturing DATA entry." +#define CHANGELOG_MSG_PTHREAD_ERROR_STR "pthread error" +#define CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED_STR "pthread_mutex_init failed" +#define CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED_STR "pthread_cond_init failed" +#define CHANGELOG_MSG_HTIME_ERROR_STR "failed to update HTIME file" +#define CHANGELOG_MSG_HTIME_STAT_ERROR_STR "unable to stat htime file" +#define CHANGELOG_MSG_HTIME_CURRENT_ERROR_STR "Error extracting HTIME_CURRENT." +#define CHANGELOG_MSG_UNLINK_OP_FAILED_STR "error unlinking empty changelog" +#define CHANGELOG_MSG_RENAME_ERROR_STR "error renaming" +#define CHANGELOG_MSG_MKDIR_ERROR_STR "unable to create directory" +#define CHANGELOG_MSG_BNOTIFY_INFO_STR \ + "Explicit rollover changelog signaling bnotify" +#define CHANGELOG_MSG_BNOTIFY_COND_INFO_STR "Woke up: bnotify conditional wait" +#define CHANGELOG_MSG_RECONFIGURE_STR "Reconfigure: Changelog Enable" +#define CHANGELOG_MSG_NO_HTIME_CURRENT_STR \ + "HTIME_CURRENT not found. Changelog enabled before init" +#define CHANGELOG_MSG_HTIME_CURRENT_STR "HTIME_CURRENT" +#define CHANGELOG_MSG_NEW_HTIME_FILE_STR \ + "Changelog enable: Creating new HTIME file" +#define CHANGELOG_MSG_FGETXATTR_FAILED_STR "fgetxattr failed" +#define CHANGELOG_MSG_TOTAL_LOG_INFO_STR "changelog info" +#define CHANGELOG_MSG_PTHREAD_COND_WAIT_FAILED_STR "pthread cond wait failed" +#define CHANGELOG_MSG_INODE_NOT_FOUND_STR "inode not found" +#define CHANGELOG_MSG_READLINK_OP_FAILED_STR \ + "could not read the link from the gfid handle" +#define CHANGELOG_MSG_OPEN_FAILED_STR "unable to open file" +#define CHANGELOG_MSG_RPC_CONNECT_ERROR_STR "failed to connect back" +#define CHANGELOG_MSG_BUFFER_STARVATION_ERROR_STR \ + "Failed to get buffer for RPC dispatch" +#define CHANGELOG_MSG_PTHREAD_CANCEL_FAILED_STR "could not cancel thread" +#define CHANGELOG_MSG_FSTAT_OP_FAILED_STR "Could not stat (CHANGELOG)" +#define CHANGELOG_MSG_LSEEK_OP_FAILED_STR "Could not lseek (changelog)" +#define CHANGELOG_MSG_PATH_NOT_FOUND_STR \ + "Could not find CHANGELOG in changelog path" +#define CHANGELOG_MSG_FSYNC_OP_FAILED_STR "fsync failed" +#define CHANGELOG_MSG_DETECT_EMPTY_CHANGELOG_FAILED_STR \ + "Error detecting empty changelog" +#define CHANGELOG_MSG_EXPLICIT_ROLLOVER_FAILED_STR \ + "Fail snapshot because of previous errors" +#define CHANGELOG_MSG_SCAN_DIR_FAILED_STR "scandir failed" +#define CHANGELOG_MSG_FSETXATTR_FAILED_STR "fsetxattr failed" +#define CHANGELOG_MSG_XATTR_INIT_FAILED_STR "Htime xattr initialization failed" +#define CHANGELOG_MSG_SNAP_INFO_STR "log in call path" +#define CHANGELOG_MSG_WRITE_FAILED_STR "error writing to disk" +#define CHANGELOG_MSG_WROTE_TO_CSNAP_STR "Successfully wrote to csnap" +#define CHANGELOG_MSG_GET_TIME_OP_FAILED_STR "Problem rolling over changelog(s)" +#define CHANGELOG_MSG_BARRIER_INFO_STR "Explicit wakeup on barrier notify" +#define CHANGELOG_MSG_SELECT_FAILED_STR "pthread_cond_timedwait failed" +#define CHANGELOG_MSG_INJECT_FSYNC_FAILED_STR "failed to inject fsync event" +#define CHANGELOG_MSG_LOCAL_INIT_FAILED_STR \ + "changelog local initialization failed" +#define CHANGELOG_MSG_GET_BUFFER_FAILED_STR "Failed to get buffer" +#define CHANGELOG_MSG_SET_FD_CONTEXT_STR \ + "could not set fd context(for release cbk)" +#define CHANGELOG_MSG_DICT_GET_FAILED_STR "Barrier failed" +#define CHANGELOG_MSG_BARRIER_STATE_NOTIFY_STR "Barrier notification" +#define CHANGELOG_MSG_BARRIER_ERROR_STR \ + "Received another barrier off notification while already off" +#define CHANGELOG_MSG_BARRIER_DISABLED_STR "disabled changelog barrier" +#define CHANGELOG_MSG_BARRIER_ALREADY_DISABLED_STR \ + "Changelog barrier already disabled" +#define CHANGELOG_MSG_BARRIER_ON_ERROR_STR \ + "Received another barrier on notification when last one is not served yet" +#define CHANGELOG_MSG_BARRIER_ENABLE_STR "Enabled changelog barrier" +#define CHANGELOG_MSG_BARRIER_KEY_NOT_FOUND_STR "barrier key not found" +#define CHANGELOG_MSG_ERROR_IN_DICT_GET_STR \ + "Something went wrong in dict_get_str_boolean" +#define CHANGELOG_MSG_DIR_OPTIONS_NOT_SET_STR "changelog-dir option is not set" +#define CHANGELOG_MSG_FREEUP_FAILED_STR "could not cleanup bootstrapper" +#define CHANGELOG_MSG_CHILD_MISCONFIGURED_STR \ + "translator needs a single subvolume" +#define CHANGELOG_MSG_VOL_MISCONFIGURED_STR \ + "dangling volume. please check volfile" +#define CHANGELOG_MSG_DEQUEUING_BARRIER_FOPS_STR \ + "Dequeuing all the changelog barriered fops" +#define CHANGELOG_MSG_DEQUEUING_BARRIER_FOPS_FINISHED_STR \ + "Dequeuing changelog barriered fops is finished" +#define CHANGELOG_MSG_BARRIER_TIMEOUT_STR \ + "Disabling changelog barrier because of the timeout" +#define CHANGELOG_MSG_TIMEOUT_ADD_FAILED_STR \ + "Couldn't add changelog barrier timeout event" +#define CHANGELOG_MSG_RPC_BUILD_ERROR_STR "failed to build rpc options" +#define CHANGELOG_MSG_NOTIFY_REGISTER_FAILED_STR "failed to register notify" +#define CHANGELOG_MSG_RPC_START_ERROR_STR "failed to start rpc" +#define CHANGELOG_MSG_CREATE_FRAME_FAILED_STR "failed to create frame" +#define CHANGELOG_MSG_RPC_SUBMIT_REPLY_FAILED_STR "failed to serialize reply" +#define CHANGELOG_MSG_PROGRAM_NAME_REG_FAILED_STR "cannot register program" +#define CHANGELOG_MSG_CHANGELOG_NOT_ACTIVE_STR \ + "Changelog is not active, return success" +#define CHANGELOG_MSG_PUT_BUFFER_FAILED_STR \ + "failed to put buffer after consumption" +#define CHANGELOG_MSG_CLEANUP_ALREADY_SET_STR \ + "cleanup_starting flag is already set for xl" +#define CHANGELOG_MSG_HANDLE_PROBE_ERROR_STR "xdr decoding error" #endif /* !_CHANGELOG_MESSAGES_H_ */ diff --git a/xlators/features/changelog/src/changelog-misc.h b/xlators/features/changelog/src/changelog-misc.h index 04d1bdeba03..e2addc09414 100644 --- a/xlators/features/changelog/src/changelog-misc.h +++ b/xlators/features/changelog/src/changelog-misc.h @@ -11,8 +11,8 @@ #ifndef _CHANGELOG_MISC_H #define _CHANGELOG_MISC_H -#include "glusterfs.h" -#include "common-utils.h" +#include <glusterfs/glusterfs.h> +#include <glusterfs/common-utils.h> #define CHANGELOG_MAX_TYPE 4 #define CHANGELOG_FILE_NAME "CHANGELOG" diff --git a/xlators/features/changelog/src/changelog-rpc-common.c b/xlators/features/changelog/src/changelog-rpc-common.c index 91d6581836a..125246a17e1 100644 --- a/xlators/features/changelog/src/changelog-rpc-common.c +++ b/xlators/features/changelog/src/changelog-rpc-common.c @@ -11,7 +11,7 @@ #include "changelog-rpc-common.h" #include "changelog-messages.h" -#include "syscall.h" +#include <glusterfs/syscall.h> /** ***************************************************** Client Interface @@ -28,7 +28,7 @@ changelog_rpc_poller(void *arg) { xlator_t *this = arg; - (void)event_dispatch(this->ctx->event_pool); + (void)gf_event_dispatch(this->ctx->event_pool); return NULL; } @@ -47,10 +47,10 @@ changelog_rpc_client_init(xlator_t *this, void *cbkdata, char *sockfile, if (!options) goto error_return; - ret = rpc_transport_unix_options_build(&options, sockfile, 0); + ret = rpc_transport_unix_options_build(options, sockfile, 0); if (ret) { - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_BUILD_ERROR, - "failed to build rpc options"); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_BUILD_ERROR, + NULL); goto dealloc_dict; } @@ -60,19 +60,19 @@ changelog_rpc_client_init(xlator_t *this, void *cbkdata, char *sockfile, ret = rpc_clnt_register_notify(rpc, fn, cbkdata); if (ret) { - gf_msg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, - "failed to register notify"); + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, NULL); goto dealloc_rpc_clnt; } ret = rpc_clnt_start(rpc); if (ret) { - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR, - "failed to start rpc"); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR, + NULL); goto dealloc_rpc_clnt; } + dict_unref(options); return rpc; dealloc_rpc_clnt: @@ -164,8 +164,8 @@ changelog_invoke_rpc(xlator_t *this, struct rpc_clnt *rpc, frame = create_frame(this, this->ctx->pool); if (!frame) { - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_CREATE_FRAME_FAILED, - "failed to create frame"); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_CREATE_FRAME_FAILED, + NULL); goto error_return; } @@ -238,8 +238,8 @@ changelog_rpc_sumbit_reply(rpcsvc_request_t *req, void *arg, iob = __changelog_rpc_serialize_reply(req, arg, &iov, xdrproc); if (!iob) - gf_msg("", GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_SUBMIT_REPLY_FAILED, - "failed to serialize reply"); + gf_smsg("", GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_SUBMIT_REPLY_FAILED, + NULL); else iobref_add(iobref, iob); @@ -260,6 +260,10 @@ changelog_rpc_server_destroy(xlator_t *this, rpcsvc_t *rpc, char *sockfile, rpcsvc_listener_t *listener = NULL; rpcsvc_listener_t *next = NULL; struct rpcsvc_program *prog = NULL; + rpc_transport_t *trans = NULL; + + if (!rpc) + return; while (*progs) { prog = *progs; @@ -269,22 +273,25 @@ changelog_rpc_server_destroy(xlator_t *this, rpcsvc_t *rpc, char *sockfile, list_for_each_entry_safe(listener, next, &rpc->listeners, list) { - rpcsvc_listener_destroy(listener); + if (listener->trans) { + trans = listener->trans; + rpc_transport_disconnect(trans, _gf_false); + } } (void)rpcsvc_unregister_notify(rpc, fn, this); - sys_unlink(sockfile); - if (rpc->rxpool) { - mem_pool_destroy(rpc->rxpool); - rpc->rxpool = NULL; - } /* TODO Avoid freeing rpc object in case of brick multiplex after freeing rpc object svc->rpclock corrupted and it takes more time to detach a brick */ - if (!this->cleanup_starting) + if (!this->cleanup_starting) { + if (rpc->rxpool) { + mem_pool_destroy(rpc->rxpool); + rpc->rxpool = NULL; + } GF_FREE(rpc); + } } rpcsvc_t * @@ -299,22 +306,25 @@ changelog_rpc_server_init(xlator_t *this, char *sockfile, void *cbkdata, if (!cbkdata) cbkdata = this; - ret = rpcsvc_transport_unix_options_build(&options, sockfile); + options = dict_new(); + if (!options) + return NULL; + + ret = rpcsvc_transport_unix_options_build(options, sockfile); if (ret) goto dealloc_dict; rpc = rpcsvc_init(this, this->ctx, options, 8); if (rpc == NULL) { - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR, - "failed to init rpc"); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR, + NULL); goto dealloc_dict; } ret = rpcsvc_register_notify(rpc, fn, cbkdata); if (ret) { - gf_msg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, - "failed to register notify function"); + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, NULL); goto dealloc_rpc; } @@ -328,11 +338,10 @@ changelog_rpc_server_init(xlator_t *this, char *sockfile, void *cbkdata, prog = *progs; ret = rpcsvc_program_register(rpc, prog, _gf_false); if (ret) { - gf_msg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_PROGRAM_NAME_REG_FAILED, - "cannot register program " - "(name: %s, prognum: %d, pogver: %d)", - prog->progname, prog->prognum, prog->progver); + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_PROGRAM_NAME_REG_FAILED, "name%s", + prog->progname, "prognum=%d", prog->prognum, "pogver=%d", + prog->progver, NULL); goto dealloc_rpc; } diff --git a/xlators/features/changelog/src/changelog-rpc-common.h b/xlators/features/changelog/src/changelog-rpc-common.h index 2d3f06e60c0..4d9aa2c694b 100644 --- a/xlators/features/changelog/src/changelog-rpc-common.h +++ b/xlators/features/changelog/src/changelog-rpc-common.h @@ -13,8 +13,8 @@ #include "rpcsvc.h" #include "rpc-clnt.h" -#include "gf-event.h" -#include "call-stub.h" +#include <glusterfs/gf-event.h> +#include <glusterfs/call-stub.h> #include "changelog-xdr.h" #include "xdr-generic.h" diff --git a/xlators/features/changelog/src/changelog-rpc.c b/xlators/features/changelog/src/changelog-rpc.c index 828f85e8e45..440b88091a6 100644 --- a/xlators/features/changelog/src/changelog-rpc.c +++ b/xlators/features/changelog/src/changelog-rpc.c @@ -8,12 +8,12 @@ cases as published by the Free Software Foundation. */ -#include "syscall.h" +#include <glusterfs/syscall.h> #include "changelog-rpc.h" #include "changelog-mem-types.h" #include "changelog-ev-handle.h" -struct rpcsvc_program *changelog_programs[]; +static struct rpcsvc_program *changelog_programs[]; static void changelog_cleanup_dispatchers(xlator_t *this, changelog_priv_t *priv, int count) @@ -43,9 +43,6 @@ changelog_cleanup_rpc_threads(xlator_t *this, changelog_priv_t *priv) /** terminate dispatcher thread(s) */ changelog_cleanup_dispatchers(this, priv, priv->nr_dispatchers); - /* TODO: what about pending and waiting connections? */ - changelog_ev_cleanup_connections(this, conn); - /* destroy locks */ ret = pthread_mutex_destroy(&conn->pending_lock); if (ret != 0) @@ -72,9 +69,6 @@ changelog_init_rpc_threads(xlator_t *this, changelog_priv_t *priv, rbuf_t *rbuf, int j = 0; int ret = 0; changelog_clnt_t *conn = NULL; - char thread_name[GF_THREAD_NAMEMAX] = { - 0, - }; conn = &priv->connections; @@ -114,9 +108,9 @@ changelog_init_rpc_threads(xlator_t *this, changelog_priv_t *priv, rbuf_t *rbuf, /* spawn dispatcher threads */ for (; j < nr_dispatchers; j++) { - snprintf(thread_name, sizeof(thread_name), "clogd%03hx", (j & 0x3ff)); ret = gf_thread_create(&priv->ev_dispatcher[j], NULL, - changelog_ev_dispatch, conn, thread_name); + changelog_ev_dispatch, conn, "clogd%03hx", + j & 0x3ff); if (ret != 0) { changelog_cleanup_dispatchers(this, priv, j); break; @@ -147,48 +141,146 @@ int changelog_rpcsvc_notify(rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, void *data) { + xlator_t *this = NULL; + rpc_transport_t *trans = NULL; + rpc_transport_t *xprt = NULL; + rpc_transport_t *xp_next = NULL; + changelog_priv_t *priv = NULL; + uint64_t listnercnt = 0; + uint64_t xprtcnt = 0; + uint64_t clntcnt = 0; + rpcsvc_listener_t *listener = NULL; + rpcsvc_listener_t *next = NULL; + gf_boolean_t listner_found = _gf_false; + socket_private_t *sockpriv = NULL; + + if (!xl || !data || !rpc) { + gf_msg_callingfn("changelog", GF_LOG_WARNING, 0, + CHANGELOG_MSG_RPCSVC_NOTIFY_FAILED, + "Calling rpc_notify without initializing"); + goto out; + } + + this = xl; + trans = data; + priv = this->private; + + if (!priv) { + gf_msg_callingfn("changelog", GF_LOG_WARNING, 0, + CHANGELOG_MSG_RPCSVC_NOTIFY_FAILED, + "Calling rpc_notify without priv initializing"); + goto out; + } + + if (event == RPCSVC_EVENT_ACCEPT) { + GF_ATOMIC_INC(priv->xprtcnt); + LOCK(&priv->lock); + { + list_add_tail(&trans->list, &priv->xprt_list); + } + UNLOCK(&priv->lock); + goto out; + } + + if (event == RPCSVC_EVENT_DISCONNECT) { + list_for_each_entry_safe(listener, next, &rpc->listeners, list) + { + if (listener && listener->trans) { + if (listener->trans == trans) { + listnercnt = GF_ATOMIC_DEC(priv->listnercnt); + listner_found = _gf_true; + rpcsvc_listener_destroy(listener); + } + } + } + + if (listnercnt > 0) { + goto out; + } + if (listner_found) { + LOCK(&priv->lock); + list_for_each_entry_safe(xprt, xp_next, &priv->xprt_list, list) + { + sockpriv = (socket_private_t *)(xprt->private); + gf_log("changelog", GF_LOG_INFO, + "Send disconnect" + " on socket %d", + sockpriv->sock); + rpc_transport_disconnect(xprt, _gf_false); + } + UNLOCK(&priv->lock); + goto out; + } + LOCK(&priv->lock); + { + list_del_init(&trans->list); + } + UNLOCK(&priv->lock); + + xprtcnt = GF_ATOMIC_DEC(priv->xprtcnt); + clntcnt = GF_ATOMIC_GET(priv->clntcnt); + if (!xprtcnt && !clntcnt) { + changelog_process_cleanup_event(this); + } + } + +out: return 0; } void +changelog_process_cleanup_event(xlator_t *this) +{ + gf_boolean_t cleanup_notify = _gf_false; + changelog_priv_t *priv = NULL; + char sockfile[UNIX_PATH_MAX] = { + 0, + }; + + if (!this) + return; + priv = this->private; + if (!priv) + return; + + LOCK(&priv->lock); + { + cleanup_notify = priv->notify_down; + priv->notify_down = _gf_true; + } + UNLOCK(&priv->lock); + + if (priv->victim && !cleanup_notify) { + default_notify(this, GF_EVENT_PARENT_DOWN, priv->victim); + + if (priv->rpc) { + /* sockfile path could have been saved to avoid this */ + CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile, + UNIX_PATH_MAX); + sys_unlink(sockfile); + (void)rpcsvc_unregister_notify(priv->rpc, changelog_rpcsvc_notify, + this); + if (priv->rpc->rxpool) { + mem_pool_destroy(priv->rpc->rxpool); + priv->rpc->rxpool = NULL; + } + GF_FREE(priv->rpc); + priv->rpc = NULL; + } + } +} + +void changelog_destroy_rpc_listner(xlator_t *this, changelog_priv_t *priv) { char sockfile[UNIX_PATH_MAX] = { 0, }; - changelog_clnt_t *c_clnt = &priv->connections; - changelog_rpc_clnt_t *crpc = NULL; - int nofconn = 0; /* sockfile path could have been saved to avoid this */ CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile, UNIX_PATH_MAX); changelog_rpc_server_destroy(this, priv->rpc, sockfile, changelog_rpcsvc_notify, changelog_programs); - - /* TODO Below approach is not perfect to wait for cleanup - all active connections without this code brick process - can be crash in case of brick multiplexing if any in-progress - request process on rpc by changelog xlator after - cleanup resources - */ - - if (c_clnt) { - do { - nofconn = 0; - LOCK(&c_clnt->active_lock); - list_for_each_entry(crpc, &c_clnt->active, list) { nofconn++; } - UNLOCK(&c_clnt->active_lock); - LOCK(&c_clnt->wait_lock); - list_for_each_entry(crpc, &c_clnt->waitq, list) { nofconn++; } - UNLOCK(&c_clnt->wait_lock); - pthread_mutex_lock(&c_clnt->pending_lock); - list_for_each_entry(crpc, &c_clnt->pending, list) { nofconn++; } - pthread_mutex_unlock(&c_clnt->pending_lock); - - } while (nofconn); /* Wait for all connection cleanup */ - } - - (void)changelog_cleanup_rpc_threads(this, priv); } rpcsvc_t * @@ -287,16 +379,15 @@ changelog_handle_probe(rpcsvc_request_t *req) this = req->trans->xl; if (this->cleanup_starting) { - gf_msg(this->name, GF_LOG_DEBUG, 0, CHANGELOG_MSG_HANDLE_PROBE_ERROR, - "cleanup_starting flag is already set for xl"); + gf_smsg(this->name, GF_LOG_DEBUG, 0, CHANGELOG_MSG_CLEANUP_ALREADY_SET, + NULL); return 0; } ret = xdr_to_generic(req->msg[0], &rpc_req, (xdrproc_t)xdr_changelog_probe_req); if (ret < 0) { - gf_msg("", GF_LOG_ERROR, 0, CHANGELOG_MSG_HANDLE_PROBE_ERROR, - "xdr decoding error"); + gf_smsg("", GF_LOG_ERROR, 0, CHANGELOG_MSG_HANDLE_PROBE_ERROR, NULL); req->rpc_err = GARBAGE_ARGS; goto handle_xdr_error; } @@ -328,13 +419,13 @@ submit_rpc: * RPC declarations */ -rpcsvc_actor_t changelog_svc_actors[CHANGELOG_RPC_PROC_MAX] = { +static rpcsvc_actor_t changelog_svc_actors[CHANGELOG_RPC_PROC_MAX] = { [CHANGELOG_RPC_PROBE_FILTER] = {"CHANGELOG PROBE FILTER", - CHANGELOG_RPC_PROBE_FILTER, - changelog_handle_probe, NULL, 0, DRC_NA}, + changelog_handle_probe, NULL, + CHANGELOG_RPC_PROBE_FILTER, DRC_NA, 0}, }; -struct rpcsvc_program changelog_svc_prog = { +static struct rpcsvc_program changelog_svc_prog = { .progname = CHANGELOG_RPC_PROGNAME, .prognum = CHANGELOG_RPC_PROGNUM, .progver = CHANGELOG_RPC_PROGVER, @@ -343,7 +434,7 @@ struct rpcsvc_program changelog_svc_prog = { .synctask = _gf_true, }; -struct rpcsvc_program *changelog_programs[] = { +static struct rpcsvc_program *changelog_programs[] = { &changelog_svc_prog, NULL, }; diff --git a/xlators/features/changelog/src/changelog-rpc.h b/xlators/features/changelog/src/changelog-rpc.h index 8002cea5091..b1707565249 100644 --- a/xlators/features/changelog/src/changelog-rpc.h +++ b/xlators/features/changelog/src/changelog-rpc.h @@ -11,7 +11,7 @@ #ifndef __CHANGELOG_RPC_H #define __CHANGELOG_RPC_H -#include "xlator.h" +#include <glusterfs/xlator.h> #include "changelog-helpers.h" /* one time */ diff --git a/xlators/features/changelog/src/changelog-rt.c b/xlators/features/changelog/src/changelog-rt.c index 968c76b8b20..841545ae359 100644 --- a/xlators/features/changelog/src/changelog-rt.c +++ b/xlators/features/changelog/src/changelog-rt.c @@ -8,9 +8,9 @@ cases as published by the Free Software Foundation. */ -#include "xlator.h" -#include "defaults.h" -#include "logging.h" +#include <glusterfs/xlator.h> +#include <glusterfs/defaults.h> +#include <glusterfs/logging.h> #include "changelog-rt.h" #include "changelog-mem-types.h" diff --git a/xlators/features/changelog/src/changelog-rt.h b/xlators/features/changelog/src/changelog-rt.h index df0d5b03487..28b9827d85b 100644 --- a/xlators/features/changelog/src/changelog-rt.h +++ b/xlators/features/changelog/src/changelog-rt.h @@ -11,8 +11,8 @@ #ifndef _CHANGELOG_RT_H #define _CHANGELOG_RT_H -#include "locking.h" -#include "timer.h" +#include <glusterfs/locking.h> +#include <glusterfs/timer.h> #include "pthread.h" #include "changelog-helpers.h" diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c index 35a523316ed..6a6e5af859e 100644 --- a/xlators/features/changelog/src/changelog.c +++ b/xlators/features/changelog/src/changelog.c @@ -8,11 +8,11 @@ cases as published by the Free Software Foundation. */ -#include "xlator.h" -#include "defaults.h" -#include "syscall.h" -#include "logging.h" -#include "iobuf.h" +#include <glusterfs/xlator.h> +#include <glusterfs/defaults.h> +#include <glusterfs/syscall.h> +#include <glusterfs/logging.h> +#include <glusterfs/iobuf.h> #include "changelog-rt.h" @@ -34,6 +34,12 @@ static struct changelog_bootstrap cb_bootstrap[] = { }, }; +static int +changelog_init_rpc(xlator_t *this, changelog_priv_t *priv); + +static int +changelog_init(xlator_t *this, changelog_priv_t *priv); + /* Entry operations - TYPE III */ /** @@ -149,9 +155,8 @@ changelog_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags, goto out; } if (barrier_enabled && !stub) { - gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY, - "Failed to barrier FOPs, disabling changelog barrier", - "fop=rmdir", NULL); + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, + CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=rmdir", NULL); chlog_barrier_dequeue_all(this, &queue); } @@ -298,9 +303,8 @@ changelog_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags, goto out; } if (barrier_enabled && !stub) { - gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY, - "Failed to barrier FOPs, disabling changelog barrier", - "fop=unlink", NULL); + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, + CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=unlink", NULL); chlog_barrier_dequeue_all(this, &queue); } @@ -418,9 +422,8 @@ changelog_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, goto out; } if (barrier_enabled && !stub) { - gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY, - "Failed to barrier FOPs, disabling changelog barrier", - "fop=rename", NULL); + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, + CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=rename", NULL); chlog_barrier_dequeue_all(this, &queue); } /* changelog barrier */ @@ -531,8 +534,7 @@ changelog_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, } if (barrier_enabled && !stub) { - gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_NO_MEMORY, - "Failed to barrier FOPs, disabling changelog barrier", + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=link", NULL); chlog_barrier_dequeue_all(this, &queue); } @@ -660,9 +662,8 @@ changelog_mkdir(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, } if (barrier_enabled && !stub) { - gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY, - "Failed to barrier FOPs, disabling changelog barrier", - "fop=mkdir", NULL); + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, + CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=mkdir", NULL); chlog_barrier_dequeue_all(this, &queue); } @@ -782,9 +783,8 @@ changelog_symlink(call_frame_t *frame, xlator_t *this, const char *linkname, } if (barrier_enabled && !stub) { - gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY, - "Failed to barrier FOPs, disabling changelog barrier", - "fop=symlink", NULL); + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, + CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=symlink", NULL); chlog_barrier_dequeue_all(this, &queue); } @@ -929,9 +929,8 @@ changelog_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, } if (barrier_enabled && !stub) { - gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY, - "Failed to barrier FOPs, disabling changelog barrier", - "fop=mknod", NULL); + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, + CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=mknod", NULL); chlog_barrier_dequeue_all(this, &queue); } @@ -972,8 +971,8 @@ changelog_create_cbk(call_frame_t *frame, void *cookie, xlator_t *this, CHANGELOG_OP_TYPE_RELEASE)) { ret = fd_ctx_set(fd, this, (uint64_t)(long)0x1); if (ret) - gf_msg(this->name, GF_LOG_WARNING, 0, CHANGELOG_MSG_SET_FD_CONTEXT, - "could not set fd context (for release cbk)"); + gf_smsg(this->name, GF_LOG_WARNING, 0, CHANGELOG_MSG_SET_FD_CONTEXT, + NULL); } changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY); @@ -1083,9 +1082,8 @@ changelog_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, } if (barrier_enabled && !stub) { - gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY, - "Failed to barrier FOPs, disabling changelog barrier", - "fop=create", NULL); + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, + CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=create", NULL); chlog_barrier_dequeue_all(this, &queue); } @@ -1388,9 +1386,6 @@ changelog_handle_virtual_xattr(call_frame_t *frame, xlator_t *this, loc_t *loc, ret = changelog_fill_entry_buf(frame, this, loc, &local); if (ret) { gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_ENTRY_BUF_INFO, - "Entry cannot be" - " captured for gfid, Capturing DATA" - " entry.", "gfid=%s", uuid_utoa(loc->inode->gfid), NULL); goto unwind; } @@ -1806,8 +1801,8 @@ changelog_open_cbk(call_frame_t *frame, void *cookie, xlator_t *this, CHANGELOG_OP_TYPE_RELEASE)) { ret = fd_ctx_set(fd, this, (uint64_t)(long)0x1); if (ret) - gf_msg(this->name, GF_LOG_WARNING, 0, CHANGELOG_MSG_SET_FD_CONTEXT, - "could not set fd context (for release cbk)"); + gf_smsg(this->name, GF_LOG_WARNING, 0, CHANGELOG_MSG_SET_FD_CONTEXT, + NULL); } unwind: @@ -2004,6 +1999,15 @@ notify(xlator_t *this, int event, void *data, ...) struct list_head queue = { 0, }; + uint64_t xprtcnt = 0; + uint64_t clntcnt = 0; + changelog_clnt_t *conn = NULL; + gf_boolean_t cleanup_notify = _gf_false; + char sockfile[UNIX_PATH_MAX] = { + 0, + }; + rpcsvc_listener_t *listener = NULL; + rpcsvc_listener_t *next = NULL; INIT_LIST_HEAD(&queue); @@ -2011,6 +2015,50 @@ notify(xlator_t *this, int event, void *data, ...) if (!priv) goto out; + if (event == GF_EVENT_PARENT_DOWN) { + priv->victim = data; + gf_log(this->name, GF_LOG_INFO, + "cleanup changelog rpc connection of brick %s", + priv->victim->name); + + if (priv->rpc_active) { + this->cleanup_starting = 1; + changelog_destroy_rpc_listner(this, priv); + conn = &priv->connections; + if (conn) + changelog_ev_cleanup_connections(this, conn); + xprtcnt = GF_ATOMIC_GET(priv->xprtcnt); + clntcnt = GF_ATOMIC_GET(priv->clntcnt); + if (!xprtcnt && !clntcnt) { + LOCK(&priv->lock); + { + cleanup_notify = priv->notify_down; + priv->notify_down = _gf_true; + } + UNLOCK(&priv->lock); + if (priv->rpc) { + list_for_each_entry_safe(listener, next, + &priv->rpc->listeners, list) + { + if (listener->trans) { + rpc_transport_unref(listener->trans); + } + } + rpcsvc_destroy(priv->rpc); + priv->rpc = NULL; + } + CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile, + UNIX_PATH_MAX); + sys_unlink(sockfile); + if (!cleanup_notify) + default_notify(this, GF_EVENT_PARENT_DOWN, data); + } + } else { + default_notify(this, GF_EVENT_PARENT_DOWN, data); + } + goto out; + } + if (event == GF_EVENT_TRANSLATOR_OP) { dict = data; @@ -2018,15 +2066,15 @@ notify(xlator_t *this, int event, void *data, ...) switch (barrier) { case DICT_ERROR: - gf_msg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_DICT_GET_FAILED, - "Barrier dict_get_str_boolean failed"); + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_DICT_GET_FAILED, "dict_get_str_boolean", + NULL); ret = -1; goto out; case BARRIER_OFF: - gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_INFO, - "Barrier off notification"); + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_MSG_BARRIER_STATE_NOTIFY, "off", NULL); CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out); LOCK(&priv->c_snap_lock); @@ -2043,10 +2091,8 @@ notify(xlator_t *this, int event, void *data, ...) UNLOCK(&priv->bflags.lock); if (ret == -1) { - gf_msg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_BARRIER_ERROR, - "Received another barrier off" - " notification while already off"); + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_BARRIER_ERROR, NULL); goto out; } @@ -2064,13 +2110,11 @@ notify(xlator_t *this, int event, void *data, ...) */ if (ret == 0) { chlog_barrier_dequeue_all(this, &queue); - gf_msg(this->name, GF_LOG_INFO, 0, - CHANGELOG_MSG_BARRIER_INFO, - "Disabled changelog barrier"); + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_MSG_BARRIER_DISABLED, NULL); } else { - gf_msg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_BARRIER_ERROR, - "Changelog barrier already disabled"); + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_BARRIER_ALREADY_DISABLED, NULL); } LOCK(&priv->bflags.lock); @@ -2082,8 +2126,8 @@ notify(xlator_t *this, int event, void *data, ...) goto out; case BARRIER_ON: - gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_INFO, - "Barrier on notification"); + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_MSG_BARRIER_STATE_NOTIFY, "on", NULL); CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out); LOCK(&priv->c_snap_lock); @@ -2102,11 +2146,8 @@ notify(xlator_t *this, int event, void *data, ...) UNLOCK(&priv->bflags.lock); if (ret == -1) { - gf_msg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_BARRIER_ERROR, - "Received another barrier on" - "notification when last one is" - "not served yet"); + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_BARRIER_ON_ERROR, NULL); goto out; } @@ -2129,14 +2170,14 @@ notify(xlator_t *this, int event, void *data, ...) goto out; } - gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_INFO, - "Enabled changelog barrier"); + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_MSG_BARRIER_ENABLE, NULL); ret = changelog_barrier_notify(priv, buf); if (ret) { - gf_msg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_WRITE_FAILED, - "Explicit roll over: write failed"); + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_WRITE_FAILED, "Explicit roll over", + NULL); changelog_barrier_cleanup(this, priv, &queue); ret = -1; goto out; @@ -2160,21 +2201,20 @@ notify(xlator_t *this, int event, void *data, ...) } ret1 = pthread_mutex_unlock(&priv->bn.bnotify_mutex); CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret1, out, bclean_req); - gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BNOTIFY_INFO, - "Woke up: bnotify conditional wait"); + gf_smsg(this->name, GF_LOG_INFO, 0, + CHANGELOG_MSG_BNOTIFY_COND_INFO, NULL); goto out; case DICT_DEFAULT: - gf_msg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_DICT_GET_FAILED, "barrier key not found"); + gf_smsg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_BARRIER_KEY_NOT_FOUND, NULL); ret = -1; goto out; default: - gf_msg(this->name, GF_LOG_ERROR, EINVAL, - CHANGELOG_MSG_DICT_GET_FAILED, - "Something went bad in dict_get_str_boolean"); + gf_smsg(this->name, GF_LOG_ERROR, EINVAL, + CHANGELOG_MSG_ERROR_IN_DICT_GET, NULL); ret = -1; goto out; } @@ -2200,9 +2240,8 @@ mem_acct_init(xlator_t *this) ret = xlator_mem_acct_init(this, gf_changelog_mt_end + 1); if (ret != 0) { - gf_msg(this->name, GF_LOG_WARNING, ENOMEM, CHANGELOG_MSG_NO_MEMORY, - "Memory accounting" - " init failed"); + gf_smsg(this->name, GF_LOG_WARNING, ENOMEM, + CHANGELOG_MSG_MEMORY_INIT_FAILED, NULL); return ret; } @@ -2213,23 +2252,11 @@ static int changelog_init(xlator_t *this, changelog_priv_t *priv) { int i = 0; - int ret = -1; - struct timeval tv = { - 0, - }; + int ret = 0; changelog_log_data_t cld = { 0, }; - ret = gettimeofday(&tv, NULL); - if (ret) { - gf_msg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_GET_TIME_OP_FAILED, "gettimeofday() failure"); - goto out; - } - - priv->slice.tv_start = tv; - priv->maps[CHANGELOG_TYPE_DATA] = "D "; priv->maps[CHANGELOG_TYPE_METADATA] = "M "; priv->maps[CHANGELOG_TYPE_METADATA_XATTR] = "M "; @@ -2248,9 +2275,7 @@ changelog_init(xlator_t *this, changelog_priv_t *priv) * in case there was an encoding change. so... things are kept * simple here. */ - ret = changelog_fill_rollover_data(&cld, _gf_false); - if (ret) - goto out; + changelog_fill_rollover_data(&cld, _gf_false); ret = htime_open(this, priv, cld.cld_roll_time); /* call htime open with cld's rollover_time */ @@ -2288,8 +2313,8 @@ changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv) if ((ret = pthread_mutex_init(&priv->bn.bnotify_mutex, NULL)) != 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED, - "bnotify pthread_mutex_init failed", "ret=%d", ret, NULL); + CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED, "name=bnotify", + "ret=%d", ret, NULL); ret = -1; goto out; } @@ -2297,8 +2322,8 @@ changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv) if ((ret = pthread_cond_init(&priv->bn.bnotify_cond, NULL)) != 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED, - "bnotify pthread_cond_init failed", "ret=%d", ret, NULL); + CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED, "name=bnotify", + "ret=%d", ret, NULL); ret = -1; goto out; } @@ -2306,8 +2331,8 @@ changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv) if ((ret = pthread_mutex_init(&priv->dm.drain_black_mutex, NULL)) != 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED, - "drain_black pthread_mutex_init failed", "ret=%d", ret, NULL); + CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED, "name=drain_black", + "ret=%d", ret, NULL); ret = -1; goto out; } @@ -2315,8 +2340,8 @@ changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv) if ((ret = pthread_cond_init(&priv->dm.drain_black_cond, NULL)) != 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED, - "drain_black pthread_cond_init failed", "ret=%d", ret, NULL); + CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED, "name=drain_black", + "ret=%d", ret, NULL); ret = -1; goto out; } @@ -2324,8 +2349,8 @@ changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv) if ((ret = pthread_mutex_init(&priv->dm.drain_white_mutex, NULL)) != 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED, - "drain_white pthread_mutex_init failed", "ret=%d", ret, NULL); + CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED, "name=drain_white", + "ret=%d", ret, NULL); ret = -1; goto out; } @@ -2333,8 +2358,8 @@ changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv) if ((ret = pthread_cond_init(&priv->dm.drain_white_cond, NULL)) != 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, - CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED, - "drain_white pthread_cond_init failed", "ret=%d", ret, NULL); + CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED, "name=drain_white", + "ret=%d", ret, NULL); ret = -1; goto out; } @@ -2343,7 +2368,7 @@ changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv) if ((pthread_mutex_init(&priv->cr.lock, NULL)) != 0) { gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED, - "changelog_rollover lock init failed", "ret=%d", ret, NULL); + "name=changelog_rollover", "ret=%d", ret, NULL); ret = -1; goto out; } @@ -2394,6 +2419,22 @@ changelog_barrier_pthread_destroy(changelog_priv_t *priv) LOCK_DESTROY(&priv->bflags.lock); } +static void +changelog_cleanup_rpc(xlator_t *this, changelog_priv_t *priv) +{ + /* terminate rpc server */ + if (!this->cleanup_starting) + changelog_destroy_rpc_listner(this, priv); + + (void)changelog_cleanup_rpc_threads(this, priv); + /* cleanup rot buffs */ + rbuf_dtor(priv->rbuf); + + /* cleanup poller thread */ + if (priv->poller) + (void)changelog_thread_cleanup(this, priv->poller); +} + int reconfigure(xlator_t *this, dict_t *options) { @@ -2402,6 +2443,9 @@ reconfigure(xlator_t *this, dict_t *options) changelog_priv_t *priv = NULL; gf_boolean_t active_earlier = _gf_true; gf_boolean_t active_now = _gf_true; + gf_boolean_t rpc_active_earlier = _gf_true; + gf_boolean_t rpc_active_now = _gf_true; + gf_boolean_t iniate_rpc = _gf_false; changelog_time_slice_t *slice = NULL; changelog_log_data_t cld = { 0, @@ -2412,9 +2456,6 @@ reconfigure(xlator_t *this, dict_t *options) char csnap_dir[PATH_MAX] = { 0, }; - struct timeval tv = { - 0, - }; uint32_t timeout = 0; priv = this->private; @@ -2423,14 +2464,15 @@ reconfigure(xlator_t *this, dict_t *options) ret = -1; active_earlier = priv->active; + rpc_active_earlier = priv->rpc_active; /* first stop the rollover and the fsync thread */ changelog_cleanup_helper_threads(this, priv); GF_OPTION_RECONF("changelog-dir", tmp, options, str, out); if (!tmp) { - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_DIR_OPTIONS_NOT_SET, - "\"changelog-dir\" option is not set"); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_DIR_OPTIONS_NOT_SET, + NULL); goto out; } @@ -2456,6 +2498,29 @@ reconfigure(xlator_t *this, dict_t *options) goto out; GF_OPTION_RECONF("changelog", active_now, options, bool, out); + GF_OPTION_RECONF("changelog-notification", rpc_active_now, options, bool, + out); + + /* If journalling is enabled, enable rpc notifications */ + if (active_now && !active_earlier) { + if (!rpc_active_earlier) + iniate_rpc = _gf_true; + } + + if (rpc_active_now && !rpc_active_earlier) { + iniate_rpc = _gf_true; + } + + /* TODO: Disable of changelog-notifications is not supported for now + * as there is no clean way of cleaning up of rpc resources + */ + + if (iniate_rpc) { + ret = changelog_init_rpc(this, priv); + if (ret) + goto out; + priv->rpc_active = _gf_true; + } /** * changelog_handle_change() handles changes that could possibly @@ -2482,9 +2547,7 @@ reconfigure(xlator_t *this, dict_t *options) out); if (active_now || active_earlier) { - ret = changelog_fill_rollover_data(&cld, !active_now); - if (ret) - goto out; + changelog_fill_rollover_data(&cld, !active_now); slice = &priv->slice; @@ -2501,15 +2564,9 @@ reconfigure(xlator_t *this, dict_t *options) if (active_now) { if (!active_earlier) { - gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_HTIME_INFO, - "Reconfigure: Changelog Enable"); - if (gettimeofday(&tv, NULL)) { - gf_msg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_HTIME_ERROR, "unable to fetch htime"); - ret = -1; - goto out; - } - htime_create(this, priv, tv.tv_sec); + gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_RECONFIGURE, + NULL); + htime_create(this, priv, gf_time()); } ret = changelog_spawn_helper_threads(this, priv); } @@ -2534,8 +2591,7 @@ changelog_freeup_options(xlator_t *this, changelog_priv_t *priv) ret = priv->cb->dtor(this, &priv->cd); if (ret) - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_FREEUP_FAILED, - "could not cleanup bootstrapper"); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_FREEUP_FAILED, NULL); GF_FREE(priv->changelog_brick); GF_FREE(priv->changelog_dir); } @@ -2587,6 +2643,7 @@ changelog_init_options(xlator_t *this, changelog_priv_t *priv) goto dealloc_2; GF_OPTION_INIT("changelog", priv->active, bool, dealloc_2); + GF_OPTION_INIT("changelog-notification", priv->rpc_active, bool, dealloc_2); GF_OPTION_INIT("capture-del-path", priv->capture_del_path, bool, dealloc_2); GF_OPTION_INIT("op-mode", tmp, str, dealloc_2); @@ -2625,20 +2682,6 @@ error_return: return -1; } -static void -changelog_cleanup_rpc(xlator_t *this, changelog_priv_t *priv) -{ - /* terminate rpc server */ - changelog_destroy_rpc_listner(this, priv); - - /* cleanup rot buffs */ - rbuf_dtor(priv->rbuf); - - /* cleanup poller thread */ - if (priv->poller) - (void)changelog_thread_cleanup(this, priv->poller); -} - static int changelog_init_rpc(xlator_t *this, changelog_priv_t *priv) { @@ -2679,14 +2722,14 @@ init(xlator_t *this) GF_VALIDATE_OR_GOTO("changelog", this, error_return); if (!this->children || this->children->next) { - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_CHILD_MISCONFIGURED, - "translator needs a single subvolume"); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_CHILD_MISCONFIGURED, + NULL); goto error_return; } if (!this->parents) { - gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_VOL_MISCONFIGURED, - "dangling volume. please check volfile"); + gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_VOL_MISCONFIGURED, + NULL); goto error_return; } @@ -2696,13 +2739,18 @@ init(xlator_t *this) this->local_pool = mem_pool_new(changelog_local_t, 64); if (!this->local_pool) { - gf_msg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY, - "failed to create local memory pool"); + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY, + NULL); goto cleanup_priv; } LOCK_INIT(&priv->lock); LOCK_INIT(&priv->c_snap_lock); + GF_ATOMIC_INIT(priv->listnercnt, 0); + GF_ATOMIC_INIT(priv->clntcnt, 0); + GF_ATOMIC_INIT(priv->xprtcnt, 0); + INIT_LIST_HEAD(&priv->xprt_list); + priv->htime_fd = -1; ret = changelog_init_options(this, priv); if (ret) @@ -2730,10 +2778,13 @@ init(xlator_t *this) INIT_LIST_HEAD(&priv->queue); priv->barrier_enabled = _gf_false; - /* RPC ball rolling.. */ - ret = changelog_init_rpc(this, priv); - if (ret) - goto cleanup_barrier; + if (priv->rpc_active || priv->active) { + /* RPC ball rolling.. */ + ret = changelog_init_rpc(this, priv); + if (ret) + goto cleanup_barrier; + priv->rpc_active = _gf_true; + } ret = changelog_init(this, priv); if (ret) @@ -2745,13 +2796,16 @@ init(xlator_t *this) return 0; cleanup_rpc: - changelog_cleanup_rpc(this, priv); + if (priv->rpc_active) { + changelog_cleanup_rpc(this, priv); + } cleanup_barrier: changelog_barrier_pthread_destroy(priv); cleanup_options: changelog_freeup_options(this, priv); cleanup_mempool: mem_pool_destroy(this->local_pool); + this->local_pool = NULL; cleanup_priv: GF_FREE(priv); error_return: @@ -2770,9 +2824,11 @@ fini(xlator_t *this) priv = this->private; if (priv) { - /* terminate RPC server/threads */ - changelog_cleanup_rpc(this, priv); - + if (priv->active || priv->rpc_active) { + /* terminate RPC server/threads */ + changelog_cleanup_rpc(this, priv); + GF_FREE(priv->ev_dispatcher); + } /* call barrier_disable to cancel timer */ if (priv->barrier_enabled) __chlog_barrier_disable(this, &queue); @@ -2841,6 +2897,13 @@ struct volume_options options[] = { .flags = OPT_FLAG_SETTABLE, .level = OPT_STATUS_BASIC, .tags = {"journal", "georep", "glusterfind"}}, + {.key = {"changelog-notification"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "off", + .description = "enable/disable changelog live notification", + .op_version = {3}, + .level = OPT_STATUS_BASIC, + .tags = {"bitrot", "georep"}}, {.key = {"changelog-brick"}, .type = GF_OPTION_TYPE_PATH, .description = "brick path to generate unique socket file name." @@ -2910,3 +2973,17 @@ struct volume_options options[] = { .tags = {"journal", "glusterfind"}}, {.key = {NULL}}, }; + +xlator_api_t xlator_api = { + .init = init, + .fini = fini, + .notify = notify, + .reconfigure = reconfigure, + .mem_acct_init = mem_acct_init, + .op_version = {1}, /* Present from the initial version */ + .fops = &fops, + .cbks = &cbks, + .options = options, + .identifier = "changelog", + .category = GF_MAINTAINED, +}; |
