summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/src/changelog.c')
-rw-r--r--xlators/features/changelog/src/changelog.c373
1 files changed, 225 insertions, 148 deletions
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c
index 35a523316ed..6a6e5af859e 100644
--- a/xlators/features/changelog/src/changelog.c
+++ b/xlators/features/changelog/src/changelog.c
@@ -8,11 +8,11 @@
cases as published by the Free Software Foundation.
*/
-#include "xlator.h"
-#include "defaults.h"
-#include "syscall.h"
-#include "logging.h"
-#include "iobuf.h"
+#include <glusterfs/xlator.h>
+#include <glusterfs/defaults.h>
+#include <glusterfs/syscall.h>
+#include <glusterfs/logging.h>
+#include <glusterfs/iobuf.h>
#include "changelog-rt.h"
@@ -34,6 +34,12 @@ static struct changelog_bootstrap cb_bootstrap[] = {
},
};
+static int
+changelog_init_rpc(xlator_t *this, changelog_priv_t *priv);
+
+static int
+changelog_init(xlator_t *this, changelog_priv_t *priv);
+
/* Entry operations - TYPE III */
/**
@@ -149,9 +155,8 @@ changelog_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags,
goto out;
}
if (barrier_enabled && !stub) {
- gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
- "Failed to barrier FOPs, disabling changelog barrier",
- "fop=rmdir", NULL);
+ gf_smsg(this->name, GF_LOG_ERROR, ENOMEM,
+ CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=rmdir", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
@@ -298,9 +303,8 @@ changelog_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags,
goto out;
}
if (barrier_enabled && !stub) {
- gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
- "Failed to barrier FOPs, disabling changelog barrier",
- "fop=unlink", NULL);
+ gf_smsg(this->name, GF_LOG_ERROR, ENOMEM,
+ CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=unlink", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
@@ -418,9 +422,8 @@ changelog_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
goto out;
}
if (barrier_enabled && !stub) {
- gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
- "Failed to barrier FOPs, disabling changelog barrier",
- "fop=rename", NULL);
+ gf_smsg(this->name, GF_LOG_ERROR, ENOMEM,
+ CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=rename", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
/* changelog barrier */
@@ -531,8 +534,7 @@ changelog_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
}
if (barrier_enabled && !stub) {
- gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_NO_MEMORY,
- "Failed to barrier FOPs, disabling changelog barrier",
+ gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_BARRIER_FOP_FAILED,
"fop=link", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
@@ -660,9 +662,8 @@ changelog_mkdir(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
}
if (barrier_enabled && !stub) {
- gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
- "Failed to barrier FOPs, disabling changelog barrier",
- "fop=mkdir", NULL);
+ gf_smsg(this->name, GF_LOG_ERROR, ENOMEM,
+ CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=mkdir", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
@@ -782,9 +783,8 @@ changelog_symlink(call_frame_t *frame, xlator_t *this, const char *linkname,
}
if (barrier_enabled && !stub) {
- gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
- "Failed to barrier FOPs, disabling changelog barrier",
- "fop=symlink", NULL);
+ gf_smsg(this->name, GF_LOG_ERROR, ENOMEM,
+ CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=symlink", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
@@ -929,9 +929,8 @@ changelog_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
}
if (barrier_enabled && !stub) {
- gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
- "Failed to barrier FOPs, disabling changelog barrier",
- "fop=mknod", NULL);
+ gf_smsg(this->name, GF_LOG_ERROR, ENOMEM,
+ CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=mknod", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
@@ -972,8 +971,8 @@ changelog_create_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
CHANGELOG_OP_TYPE_RELEASE)) {
ret = fd_ctx_set(fd, this, (uint64_t)(long)0x1);
if (ret)
- gf_msg(this->name, GF_LOG_WARNING, 0, CHANGELOG_MSG_SET_FD_CONTEXT,
- "could not set fd context (for release cbk)");
+ gf_smsg(this->name, GF_LOG_WARNING, 0, CHANGELOG_MSG_SET_FD_CONTEXT,
+ NULL);
}
changelog_update(this, priv, local, CHANGELOG_TYPE_ENTRY);
@@ -1083,9 +1082,8 @@ changelog_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
}
if (barrier_enabled && !stub) {
- gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
- "Failed to barrier FOPs, disabling changelog barrier",
- "fop=create", NULL);
+ gf_smsg(this->name, GF_LOG_ERROR, ENOMEM,
+ CHANGELOG_MSG_BARRIER_FOP_FAILED, "fop=create", NULL);
chlog_barrier_dequeue_all(this, &queue);
}
@@ -1388,9 +1386,6 @@ changelog_handle_virtual_xattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
ret = changelog_fill_entry_buf(frame, this, loc, &local);
if (ret) {
gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_ENTRY_BUF_INFO,
- "Entry cannot be"
- " captured for gfid, Capturing DATA"
- " entry.",
"gfid=%s", uuid_utoa(loc->inode->gfid), NULL);
goto unwind;
}
@@ -1806,8 +1801,8 @@ changelog_open_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
CHANGELOG_OP_TYPE_RELEASE)) {
ret = fd_ctx_set(fd, this, (uint64_t)(long)0x1);
if (ret)
- gf_msg(this->name, GF_LOG_WARNING, 0, CHANGELOG_MSG_SET_FD_CONTEXT,
- "could not set fd context (for release cbk)");
+ gf_smsg(this->name, GF_LOG_WARNING, 0, CHANGELOG_MSG_SET_FD_CONTEXT,
+ NULL);
}
unwind:
@@ -2004,6 +1999,15 @@ notify(xlator_t *this, int event, void *data, ...)
struct list_head queue = {
0,
};
+ uint64_t xprtcnt = 0;
+ uint64_t clntcnt = 0;
+ changelog_clnt_t *conn = NULL;
+ gf_boolean_t cleanup_notify = _gf_false;
+ char sockfile[UNIX_PATH_MAX] = {
+ 0,
+ };
+ rpcsvc_listener_t *listener = NULL;
+ rpcsvc_listener_t *next = NULL;
INIT_LIST_HEAD(&queue);
@@ -2011,6 +2015,50 @@ notify(xlator_t *this, int event, void *data, ...)
if (!priv)
goto out;
+ if (event == GF_EVENT_PARENT_DOWN) {
+ priv->victim = data;
+ gf_log(this->name, GF_LOG_INFO,
+ "cleanup changelog rpc connection of brick %s",
+ priv->victim->name);
+
+ if (priv->rpc_active) {
+ this->cleanup_starting = 1;
+ changelog_destroy_rpc_listner(this, priv);
+ conn = &priv->connections;
+ if (conn)
+ changelog_ev_cleanup_connections(this, conn);
+ xprtcnt = GF_ATOMIC_GET(priv->xprtcnt);
+ clntcnt = GF_ATOMIC_GET(priv->clntcnt);
+ if (!xprtcnt && !clntcnt) {
+ LOCK(&priv->lock);
+ {
+ cleanup_notify = priv->notify_down;
+ priv->notify_down = _gf_true;
+ }
+ UNLOCK(&priv->lock);
+ if (priv->rpc) {
+ list_for_each_entry_safe(listener, next,
+ &priv->rpc->listeners, list)
+ {
+ if (listener->trans) {
+ rpc_transport_unref(listener->trans);
+ }
+ }
+ rpcsvc_destroy(priv->rpc);
+ priv->rpc = NULL;
+ }
+ CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile,
+ UNIX_PATH_MAX);
+ sys_unlink(sockfile);
+ if (!cleanup_notify)
+ default_notify(this, GF_EVENT_PARENT_DOWN, data);
+ }
+ } else {
+ default_notify(this, GF_EVENT_PARENT_DOWN, data);
+ }
+ goto out;
+ }
+
if (event == GF_EVENT_TRANSLATOR_OP) {
dict = data;
@@ -2018,15 +2066,15 @@ notify(xlator_t *this, int event, void *data, ...)
switch (barrier) {
case DICT_ERROR:
- gf_msg(this->name, GF_LOG_ERROR, 0,
- CHANGELOG_MSG_DICT_GET_FAILED,
- "Barrier dict_get_str_boolean failed");
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_MSG_DICT_GET_FAILED, "dict_get_str_boolean",
+ NULL);
ret = -1;
goto out;
case BARRIER_OFF:
- gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_INFO,
- "Barrier off notification");
+ gf_smsg(this->name, GF_LOG_INFO, 0,
+ CHANGELOG_MSG_BARRIER_STATE_NOTIFY, "off", NULL);
CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out);
LOCK(&priv->c_snap_lock);
@@ -2043,10 +2091,8 @@ notify(xlator_t *this, int event, void *data, ...)
UNLOCK(&priv->bflags.lock);
if (ret == -1) {
- gf_msg(this->name, GF_LOG_ERROR, 0,
- CHANGELOG_MSG_BARRIER_ERROR,
- "Received another barrier off"
- " notification while already off");
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_MSG_BARRIER_ERROR, NULL);
goto out;
}
@@ -2064,13 +2110,11 @@ notify(xlator_t *this, int event, void *data, ...)
*/
if (ret == 0) {
chlog_barrier_dequeue_all(this, &queue);
- gf_msg(this->name, GF_LOG_INFO, 0,
- CHANGELOG_MSG_BARRIER_INFO,
- "Disabled changelog barrier");
+ gf_smsg(this->name, GF_LOG_INFO, 0,
+ CHANGELOG_MSG_BARRIER_DISABLED, NULL);
} else {
- gf_msg(this->name, GF_LOG_ERROR, 0,
- CHANGELOG_MSG_BARRIER_ERROR,
- "Changelog barrier already disabled");
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_MSG_BARRIER_ALREADY_DISABLED, NULL);
}
LOCK(&priv->bflags.lock);
@@ -2082,8 +2126,8 @@ notify(xlator_t *this, int event, void *data, ...)
goto out;
case BARRIER_ON:
- gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_INFO,
- "Barrier on notification");
+ gf_smsg(this->name, GF_LOG_INFO, 0,
+ CHANGELOG_MSG_BARRIER_STATE_NOTIFY, "on", NULL);
CHANGELOG_NOT_ON_THEN_GOTO(priv, ret, out);
LOCK(&priv->c_snap_lock);
@@ -2102,11 +2146,8 @@ notify(xlator_t *this, int event, void *data, ...)
UNLOCK(&priv->bflags.lock);
if (ret == -1) {
- gf_msg(this->name, GF_LOG_ERROR, 0,
- CHANGELOG_MSG_BARRIER_ERROR,
- "Received another barrier on"
- "notification when last one is"
- "not served yet");
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_MSG_BARRIER_ON_ERROR, NULL);
goto out;
}
@@ -2129,14 +2170,14 @@ notify(xlator_t *this, int event, void *data, ...)
goto out;
}
- gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BARRIER_INFO,
- "Enabled changelog barrier");
+ gf_smsg(this->name, GF_LOG_INFO, 0,
+ CHANGELOG_MSG_BARRIER_ENABLE, NULL);
ret = changelog_barrier_notify(priv, buf);
if (ret) {
- gf_msg(this->name, GF_LOG_ERROR, 0,
- CHANGELOG_MSG_WRITE_FAILED,
- "Explicit roll over: write failed");
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_MSG_WRITE_FAILED, "Explicit roll over",
+ NULL);
changelog_barrier_cleanup(this, priv, &queue);
ret = -1;
goto out;
@@ -2160,21 +2201,20 @@ notify(xlator_t *this, int event, void *data, ...)
}
ret1 = pthread_mutex_unlock(&priv->bn.bnotify_mutex);
CHANGELOG_PTHREAD_ERROR_HANDLE_1(ret1, out, bclean_req);
- gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_BNOTIFY_INFO,
- "Woke up: bnotify conditional wait");
+ gf_smsg(this->name, GF_LOG_INFO, 0,
+ CHANGELOG_MSG_BNOTIFY_COND_INFO, NULL);
goto out;
case DICT_DEFAULT:
- gf_msg(this->name, GF_LOG_ERROR, 0,
- CHANGELOG_MSG_DICT_GET_FAILED, "barrier key not found");
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ CHANGELOG_MSG_BARRIER_KEY_NOT_FOUND, NULL);
ret = -1;
goto out;
default:
- gf_msg(this->name, GF_LOG_ERROR, EINVAL,
- CHANGELOG_MSG_DICT_GET_FAILED,
- "Something went bad in dict_get_str_boolean");
+ gf_smsg(this->name, GF_LOG_ERROR, EINVAL,
+ CHANGELOG_MSG_ERROR_IN_DICT_GET, NULL);
ret = -1;
goto out;
}
@@ -2200,9 +2240,8 @@ mem_acct_init(xlator_t *this)
ret = xlator_mem_acct_init(this, gf_changelog_mt_end + 1);
if (ret != 0) {
- gf_msg(this->name, GF_LOG_WARNING, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
- "Memory accounting"
- " init failed");
+ gf_smsg(this->name, GF_LOG_WARNING, ENOMEM,
+ CHANGELOG_MSG_MEMORY_INIT_FAILED, NULL);
return ret;
}
@@ -2213,23 +2252,11 @@ static int
changelog_init(xlator_t *this, changelog_priv_t *priv)
{
int i = 0;
- int ret = -1;
- struct timeval tv = {
- 0,
- };
+ int ret = 0;
changelog_log_data_t cld = {
0,
};
- ret = gettimeofday(&tv, NULL);
- if (ret) {
- gf_msg(this->name, GF_LOG_ERROR, errno,
- CHANGELOG_MSG_GET_TIME_OP_FAILED, "gettimeofday() failure");
- goto out;
- }
-
- priv->slice.tv_start = tv;
-
priv->maps[CHANGELOG_TYPE_DATA] = "D ";
priv->maps[CHANGELOG_TYPE_METADATA] = "M ";
priv->maps[CHANGELOG_TYPE_METADATA_XATTR] = "M ";
@@ -2248,9 +2275,7 @@ changelog_init(xlator_t *this, changelog_priv_t *priv)
* in case there was an encoding change. so... things are kept
* simple here.
*/
- ret = changelog_fill_rollover_data(&cld, _gf_false);
- if (ret)
- goto out;
+ changelog_fill_rollover_data(&cld, _gf_false);
ret = htime_open(this, priv, cld.cld_roll_time);
/* call htime open with cld's rollover_time */
@@ -2288,8 +2313,8 @@ changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv)
if ((ret = pthread_mutex_init(&priv->bn.bnotify_mutex, NULL)) != 0) {
gf_smsg(this->name, GF_LOG_ERROR, errno,
- CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED,
- "bnotify pthread_mutex_init failed", "ret=%d", ret, NULL);
+ CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED, "name=bnotify",
+ "ret=%d", ret, NULL);
ret = -1;
goto out;
}
@@ -2297,8 +2322,8 @@ changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv)
if ((ret = pthread_cond_init(&priv->bn.bnotify_cond, NULL)) != 0) {
gf_smsg(this->name, GF_LOG_ERROR, errno,
- CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED,
- "bnotify pthread_cond_init failed", "ret=%d", ret, NULL);
+ CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED, "name=bnotify",
+ "ret=%d", ret, NULL);
ret = -1;
goto out;
}
@@ -2306,8 +2331,8 @@ changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv)
if ((ret = pthread_mutex_init(&priv->dm.drain_black_mutex, NULL)) != 0) {
gf_smsg(this->name, GF_LOG_ERROR, errno,
- CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED,
- "drain_black pthread_mutex_init failed", "ret=%d", ret, NULL);
+ CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED, "name=drain_black",
+ "ret=%d", ret, NULL);
ret = -1;
goto out;
}
@@ -2315,8 +2340,8 @@ changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv)
if ((ret = pthread_cond_init(&priv->dm.drain_black_cond, NULL)) != 0) {
gf_smsg(this->name, GF_LOG_ERROR, errno,
- CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED,
- "drain_black pthread_cond_init failed", "ret=%d", ret, NULL);
+ CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED, "name=drain_black",
+ "ret=%d", ret, NULL);
ret = -1;
goto out;
}
@@ -2324,8 +2349,8 @@ changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv)
if ((ret = pthread_mutex_init(&priv->dm.drain_white_mutex, NULL)) != 0) {
gf_smsg(this->name, GF_LOG_ERROR, errno,
- CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED,
- "drain_white pthread_mutex_init failed", "ret=%d", ret, NULL);
+ CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED, "name=drain_white",
+ "ret=%d", ret, NULL);
ret = -1;
goto out;
}
@@ -2333,8 +2358,8 @@ changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv)
if ((ret = pthread_cond_init(&priv->dm.drain_white_cond, NULL)) != 0) {
gf_smsg(this->name, GF_LOG_ERROR, errno,
- CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED,
- "drain_white pthread_cond_init failed", "ret=%d", ret, NULL);
+ CHANGELOG_MSG_PTHREAD_COND_INIT_FAILED, "name=drain_white",
+ "ret=%d", ret, NULL);
ret = -1;
goto out;
}
@@ -2343,7 +2368,7 @@ changelog_barrier_pthread_init(xlator_t *this, changelog_priv_t *priv)
if ((pthread_mutex_init(&priv->cr.lock, NULL)) != 0) {
gf_smsg(this->name, GF_LOG_ERROR, errno,
CHANGELOG_MSG_PTHREAD_MUTEX_INIT_FAILED,
- "changelog_rollover lock init failed", "ret=%d", ret, NULL);
+ "name=changelog_rollover", "ret=%d", ret, NULL);
ret = -1;
goto out;
}
@@ -2394,6 +2419,22 @@ changelog_barrier_pthread_destroy(changelog_priv_t *priv)
LOCK_DESTROY(&priv->bflags.lock);
}
+static void
+changelog_cleanup_rpc(xlator_t *this, changelog_priv_t *priv)
+{
+ /* terminate rpc server */
+ if (!this->cleanup_starting)
+ changelog_destroy_rpc_listner(this, priv);
+
+ (void)changelog_cleanup_rpc_threads(this, priv);
+ /* cleanup rot buffs */
+ rbuf_dtor(priv->rbuf);
+
+ /* cleanup poller thread */
+ if (priv->poller)
+ (void)changelog_thread_cleanup(this, priv->poller);
+}
+
int
reconfigure(xlator_t *this, dict_t *options)
{
@@ -2402,6 +2443,9 @@ reconfigure(xlator_t *this, dict_t *options)
changelog_priv_t *priv = NULL;
gf_boolean_t active_earlier = _gf_true;
gf_boolean_t active_now = _gf_true;
+ gf_boolean_t rpc_active_earlier = _gf_true;
+ gf_boolean_t rpc_active_now = _gf_true;
+ gf_boolean_t iniate_rpc = _gf_false;
changelog_time_slice_t *slice = NULL;
changelog_log_data_t cld = {
0,
@@ -2412,9 +2456,6 @@ reconfigure(xlator_t *this, dict_t *options)
char csnap_dir[PATH_MAX] = {
0,
};
- struct timeval tv = {
- 0,
- };
uint32_t timeout = 0;
priv = this->private;
@@ -2423,14 +2464,15 @@ reconfigure(xlator_t *this, dict_t *options)
ret = -1;
active_earlier = priv->active;
+ rpc_active_earlier = priv->rpc_active;
/* first stop the rollover and the fsync thread */
changelog_cleanup_helper_threads(this, priv);
GF_OPTION_RECONF("changelog-dir", tmp, options, str, out);
if (!tmp) {
- gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_DIR_OPTIONS_NOT_SET,
- "\"changelog-dir\" option is not set");
+ gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_DIR_OPTIONS_NOT_SET,
+ NULL);
goto out;
}
@@ -2456,6 +2498,29 @@ reconfigure(xlator_t *this, dict_t *options)
goto out;
GF_OPTION_RECONF("changelog", active_now, options, bool, out);
+ GF_OPTION_RECONF("changelog-notification", rpc_active_now, options, bool,
+ out);
+
+ /* If journalling is enabled, enable rpc notifications */
+ if (active_now && !active_earlier) {
+ if (!rpc_active_earlier)
+ iniate_rpc = _gf_true;
+ }
+
+ if (rpc_active_now && !rpc_active_earlier) {
+ iniate_rpc = _gf_true;
+ }
+
+ /* TODO: Disable of changelog-notifications is not supported for now
+ * as there is no clean way of cleaning up of rpc resources
+ */
+
+ if (iniate_rpc) {
+ ret = changelog_init_rpc(this, priv);
+ if (ret)
+ goto out;
+ priv->rpc_active = _gf_true;
+ }
/**
* changelog_handle_change() handles changes that could possibly
@@ -2482,9 +2547,7 @@ reconfigure(xlator_t *this, dict_t *options)
out);
if (active_now || active_earlier) {
- ret = changelog_fill_rollover_data(&cld, !active_now);
- if (ret)
- goto out;
+ changelog_fill_rollover_data(&cld, !active_now);
slice = &priv->slice;
@@ -2501,15 +2564,9 @@ reconfigure(xlator_t *this, dict_t *options)
if (active_now) {
if (!active_earlier) {
- gf_msg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_HTIME_INFO,
- "Reconfigure: Changelog Enable");
- if (gettimeofday(&tv, NULL)) {
- gf_msg(this->name, GF_LOG_ERROR, 0,
- CHANGELOG_MSG_HTIME_ERROR, "unable to fetch htime");
- ret = -1;
- goto out;
- }
- htime_create(this, priv, tv.tv_sec);
+ gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_RECONFIGURE,
+ NULL);
+ htime_create(this, priv, gf_time());
}
ret = changelog_spawn_helper_threads(this, priv);
}
@@ -2534,8 +2591,7 @@ changelog_freeup_options(xlator_t *this, changelog_priv_t *priv)
ret = priv->cb->dtor(this, &priv->cd);
if (ret)
- gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_FREEUP_FAILED,
- "could not cleanup bootstrapper");
+ gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_FREEUP_FAILED, NULL);
GF_FREE(priv->changelog_brick);
GF_FREE(priv->changelog_dir);
}
@@ -2587,6 +2643,7 @@ changelog_init_options(xlator_t *this, changelog_priv_t *priv)
goto dealloc_2;
GF_OPTION_INIT("changelog", priv->active, bool, dealloc_2);
+ GF_OPTION_INIT("changelog-notification", priv->rpc_active, bool, dealloc_2);
GF_OPTION_INIT("capture-del-path", priv->capture_del_path, bool, dealloc_2);
GF_OPTION_INIT("op-mode", tmp, str, dealloc_2);
@@ -2625,20 +2682,6 @@ error_return:
return -1;
}
-static void
-changelog_cleanup_rpc(xlator_t *this, changelog_priv_t *priv)
-{
- /* terminate rpc server */
- changelog_destroy_rpc_listner(this, priv);
-
- /* cleanup rot buffs */
- rbuf_dtor(priv->rbuf);
-
- /* cleanup poller thread */
- if (priv->poller)
- (void)changelog_thread_cleanup(this, priv->poller);
-}
-
static int
changelog_init_rpc(xlator_t *this, changelog_priv_t *priv)
{
@@ -2679,14 +2722,14 @@ init(xlator_t *this)
GF_VALIDATE_OR_GOTO("changelog", this, error_return);
if (!this->children || this->children->next) {
- gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_CHILD_MISCONFIGURED,
- "translator needs a single subvolume");
+ gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_CHILD_MISCONFIGURED,
+ NULL);
goto error_return;
}
if (!this->parents) {
- gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_VOL_MISCONFIGURED,
- "dangling volume. please check volfile");
+ gf_smsg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_VOL_MISCONFIGURED,
+ NULL);
goto error_return;
}
@@ -2696,13 +2739,18 @@ init(xlator_t *this)
this->local_pool = mem_pool_new(changelog_local_t, 64);
if (!this->local_pool) {
- gf_msg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
- "failed to create local memory pool");
+ gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, CHANGELOG_MSG_NO_MEMORY,
+ NULL);
goto cleanup_priv;
}
LOCK_INIT(&priv->lock);
LOCK_INIT(&priv->c_snap_lock);
+ GF_ATOMIC_INIT(priv->listnercnt, 0);
+ GF_ATOMIC_INIT(priv->clntcnt, 0);
+ GF_ATOMIC_INIT(priv->xprtcnt, 0);
+ INIT_LIST_HEAD(&priv->xprt_list);
+ priv->htime_fd = -1;
ret = changelog_init_options(this, priv);
if (ret)
@@ -2730,10 +2778,13 @@ init(xlator_t *this)
INIT_LIST_HEAD(&priv->queue);
priv->barrier_enabled = _gf_false;
- /* RPC ball rolling.. */
- ret = changelog_init_rpc(this, priv);
- if (ret)
- goto cleanup_barrier;
+ if (priv->rpc_active || priv->active) {
+ /* RPC ball rolling.. */
+ ret = changelog_init_rpc(this, priv);
+ if (ret)
+ goto cleanup_barrier;
+ priv->rpc_active = _gf_true;
+ }
ret = changelog_init(this, priv);
if (ret)
@@ -2745,13 +2796,16 @@ init(xlator_t *this)
return 0;
cleanup_rpc:
- changelog_cleanup_rpc(this, priv);
+ if (priv->rpc_active) {
+ changelog_cleanup_rpc(this, priv);
+ }
cleanup_barrier:
changelog_barrier_pthread_destroy(priv);
cleanup_options:
changelog_freeup_options(this, priv);
cleanup_mempool:
mem_pool_destroy(this->local_pool);
+ this->local_pool = NULL;
cleanup_priv:
GF_FREE(priv);
error_return:
@@ -2770,9 +2824,11 @@ fini(xlator_t *this)
priv = this->private;
if (priv) {
- /* terminate RPC server/threads */
- changelog_cleanup_rpc(this, priv);
-
+ if (priv->active || priv->rpc_active) {
+ /* terminate RPC server/threads */
+ changelog_cleanup_rpc(this, priv);
+ GF_FREE(priv->ev_dispatcher);
+ }
/* call barrier_disable to cancel timer */
if (priv->barrier_enabled)
__chlog_barrier_disable(this, &queue);
@@ -2841,6 +2897,13 @@ struct volume_options options[] = {
.flags = OPT_FLAG_SETTABLE,
.level = OPT_STATUS_BASIC,
.tags = {"journal", "georep", "glusterfind"}},
+ {.key = {"changelog-notification"},
+ .type = GF_OPTION_TYPE_BOOL,
+ .default_value = "off",
+ .description = "enable/disable changelog live notification",
+ .op_version = {3},
+ .level = OPT_STATUS_BASIC,
+ .tags = {"bitrot", "georep"}},
{.key = {"changelog-brick"},
.type = GF_OPTION_TYPE_PATH,
.description = "brick path to generate unique socket file name."
@@ -2910,3 +2973,17 @@ struct volume_options options[] = {
.tags = {"journal", "glusterfind"}},
{.key = {NULL}},
};
+
+xlator_api_t xlator_api = {
+ .init = init,
+ .fini = fini,
+ .notify = notify,
+ .reconfigure = reconfigure,
+ .mem_acct_init = mem_acct_init,
+ .op_version = {1}, /* Present from the initial version */
+ .fops = &fops,
+ .cbks = &cbks,
+ .options = options,
+ .identifier = "changelog",
+ .category = GF_MAINTAINED,
+};