summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--glusterfsd/src/glusterfsd-mgmt.c24
-rw-r--r--xlators/features/changelog/src/Makefile.am2
-rw-r--r--xlators/features/changelog/src/changelog-barrier.c70
-rw-r--r--xlators/features/changelog/src/changelog-helpers.c17
-rw-r--r--xlators/features/changelog/src/changelog-helpers.h17
-rw-r--r--xlators/features/changelog/src/changelog.c240
6 files changed, 334 insertions, 36 deletions
diff --git a/glusterfsd/src/glusterfsd-mgmt.c b/glusterfsd/src/glusterfsd-mgmt.c
index b1f00691c8c..0febaf20cbd 100644
--- a/glusterfsd/src/glusterfsd-mgmt.c
+++ b/glusterfsd/src/glusterfsd-mgmt.c
@@ -1219,6 +1219,8 @@ glusterfs_handle_barrier (rpcsvc_request_t *req)
xlator_t *old_THIS = NULL;
dict_t *dict = NULL;
char name[1024] = {0,};
+ gf_boolean_t barrier = _gf_true;
+ gf_boolean_t barrier_err = _gf_false;
GF_ASSERT (req);
@@ -1269,7 +1271,14 @@ glusterfs_handle_barrier (rpcsvc_request_t *req)
brick_rsp.op_ret = ret;
brick_rsp.op_errstr = gf_strdup ("Failed to reconfigure "
"barrier.");
- goto submit_reply;
+ /* This is to invoke changelog-barrier disable if barrier
+ * disable fails and don't invoke if barrier enable fails.
+ */
+ barrier = dict_get_str_boolean (dict, "barrier", _gf_true);
+ if (barrier)
+ goto submit_reply;
+ else
+ barrier_err = _gf_true;
}
/* Reset THIS so that we have it correct in case of an error below
@@ -1277,7 +1286,6 @@ glusterfs_handle_barrier (rpcsvc_request_t *req)
THIS = old_THIS;
/* Send barrier request to changelog as well */
- /* Commenting out the below code till the changelog changes are merged
memset (name, 0, sizeof (name));
snprintf (name, sizeof (name), "%s-changelog", brick_req.name);
@@ -1290,16 +1298,16 @@ glusterfs_handle_barrier (rpcsvc_request_t *req)
}
THIS = xlator;
- ret = xlator->reconfigure (xlator, dict);
-
-
+ ret = xlator->notify (xlator, GF_EVENT_TRANSLATOR_OP, dict);
if (ret) {
brick_rsp.op_ret = ret;
- brick_rsp.op_errstr = gf_strdup ("Failed to reconfigure "
- "changelog.");
+ brick_rsp.op_errstr = gf_strdup ("changelog notify failed");
goto submit_reply;
}
- */
+
+ if (barrier_err)
+ ret = -1;
+
submit_reply:
THIS = old_THIS;
diff --git a/xlators/features/changelog/src/Makefile.am b/xlators/features/changelog/src/Makefile.am
index c5d5e100e41..8e748f9dcde 100644
--- a/xlators/features/changelog/src/Makefile.am
+++ b/xlators/features/changelog/src/Makefile.am
@@ -8,7 +8,7 @@ noinst_HEADERS = changelog-helpers.h changelog-mem-types.h changelog-rt.h \
changelog_la_LDFLAGS = -module -avoid-version
changelog_la_SOURCES = changelog.c changelog-rt.c changelog-helpers.c \
- changelog-encoders.c changelog-notifier.c
+ changelog-encoders.c changelog-notifier.c changelog-barrier.c
changelog_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src -fPIC -D_FILE_OFFSET_BITS=64 \
diff --git a/xlators/features/changelog/src/changelog-barrier.c b/xlators/features/changelog/src/changelog-barrier.c
new file mode 100644
index 00000000000..c20eed85b1c
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-barrier.c
@@ -0,0 +1,70 @@
+/*
+ Copyright (c) 2014 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-helpers.h"
+#include "call-stub.h"
+
+/* Enqueue a stub*/
+void
+__chlog_barrier_enqueue (xlator_t *this, call_stub_t *stub)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+ GF_ASSERT (priv);
+
+ list_add_tail (&stub->list, &priv->queue);
+ priv->queue_size++;
+
+ return;
+}
+
+/* Dequeue a stub */
+call_stub_t *
+__chlog_barrier_dequeue (xlator_t *this, struct list_head *queue)
+{
+ call_stub_t *stub = NULL;
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+ GF_ASSERT (priv);
+
+ if (list_empty (queue))
+ goto out;
+
+ stub = list_entry (queue->next, call_stub_t, list);
+ list_del_init (&stub->list);
+
+out:
+ return stub;
+}
+
+/* Dequeue all the stubs and call corresponding resume functions */
+void
+chlog_barrier_dequeue_all (xlator_t *this, struct list_head *queue)
+{
+ call_stub_t *stub = NULL;
+
+ while ((stub = __chlog_barrier_dequeue (this, queue)))
+ call_resume (stub);
+
+ return;
+}
+
+/* Disable changelog barrier enable flag */
+void
+__chlog_barrier_disable (xlator_t *this, struct list_head *queue)
+{
+ changelog_priv_t *priv = this->private;
+
+ list_splice_init (&priv->queue, queue);
+ priv->queue_size = 0;
+ priv->barrier_enabled = _gf_false;
+}
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c
index c3661b9b76c..2d412e7b60d 100644
--- a/xlators/features/changelog/src/changelog-helpers.c
+++ b/xlators/features/changelog/src/changelog-helpers.c
@@ -988,9 +988,9 @@ changelog_barrier_notify (changelog_priv_t *priv, char *buf)
}
/* Clean up flags set on barrier notification */
-/*TODO: Add changelog barrier stop code with changelog barrier patch*/
inline void
-changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv)
+changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv,
+ struct list_head *queue)
{
int ret = 0;
@@ -1005,6 +1005,19 @@ changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv)
}
ret = pthread_mutex_unlock (&priv->bn.bnotify_mutex);
CHANGELOG_PTHREAD_ERROR_HANDLE_0 (ret, out);
+
+ /* Disable changelog barrier and dequeue fops */
+ LOCK (&priv->lock);
+ {
+ if (priv->barrier_enabled == _gf_true)
+ __chlog_barrier_disable (this, queue);
+ else
+ ret = -1;
+ }
+ UNLOCK (&priv->lock);
+ if (ret == 0)
+ chlog_barrier_dequeue_all(this, queue);
+
out:
return;
}
diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h
index 54577592c90..8e591e2f935 100644
--- a/xlators/features/changelog/src/changelog-helpers.h
+++ b/xlators/features/changelog/src/changelog-helpers.h
@@ -17,6 +17,7 @@
#include "iobuf.h"
#include "changelog-misc.h"
+#include "call-stub.h"
/**
* the changelog entry
@@ -186,7 +187,7 @@ typedef struct drain_mgmt {
gf_boolean_t drain_wait_white;
}drain_mgmt_t;
-/* Internal and External barrier on/off indicating flags */
+/* External barrier as a result of snap on/off indicating flag*/
typedef struct barrier_flags {
gf_lock_t lock;
gf_boolean_t barrier_ext;
@@ -265,6 +266,11 @@ struct changelog_priv {
/* barrier on/off indicating flags */
barrier_flags_t bflags;
+
+ /* changelog barrier on/off indicating flag */
+ gf_boolean_t barrier_enabled;
+ struct list_head queue;
+ uint32_t queue_size;
};
struct changelog_local {
@@ -401,12 +407,19 @@ changelog_dec_fop_cnt (xlator_t *this, changelog_priv_t *priv,
inline int
changelog_barrier_notify (changelog_priv_t *priv, char* buf);
inline void
-changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv);
+changelog_barrier_cleanup (xlator_t *this, changelog_priv_t *priv,
+ struct list_head *queue);
void
changelog_drain_white_fops (xlator_t *this, changelog_priv_t *priv);
void
changelog_drain_black_fops (xlator_t *this, changelog_priv_t *priv);
+/* Changelog barrier routines */
+void __chlog_barrier_enqueue (xlator_t *this, call_stub_t *stub);
+void __chlog_barrier_disable (xlator_t *this, struct list_head *queue);
+void chlog_barrier_dequeue_all (xlator_t *this, struct list_head *queue);
+call_stub_t *__chlog_barrier_dequeue (xlator_t *this, struct list_head *queue);
+
/* macros */
#define CHANGELOG_STACK_UNWIND(fop, frame, params ...) do { \
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c
index 0a491c5ac07..1253a1a6829 100644
--- a/xlators/features/changelog/src/changelog.c
+++ b/xlators/features/changelog/src/changelog.c
@@ -69,12 +69,33 @@ changelog_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int32_t
+changelog_rmdir_resume (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int xflags, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+
+ gf_log (this->name, GF_LOG_DEBUG, "Dequeue rmdir");
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
+ STACK_WIND (frame, changelog_rmdir_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->rmdir,
+ loc, xflags, xdata);
+ return 0;
+}
+
+int32_t
changelog_rmdir (call_frame_t *frame, xlator_t *this,
loc_t *loc, int xflags, dict_t *xdata)
{
- size_t xtra_len = 0;
- changelog_priv_t *priv = NULL;
- changelog_opt_t *co = NULL;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ call_stub_t *stub = NULL;
+ struct list_head queue = {0, };
+ gf_boolean_t barrier_enabled = _gf_false;
+
+ INIT_LIST_HEAD (&queue);
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind);
@@ -94,11 +115,49 @@ changelog_rmdir (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
+/* changelog barrier */
+ /* Color assignment and increment of fop_cnt for rmdir/unlink/rename
+ * should be made with in priv lock if changelog barrier is not enabled.
+ * Because if counter is not incremented yet, draining wakes up and
+ * publishes the changelog but later these fops might hit the disk and
+ * present in snapped volume but where as the intention is these fops
+ * should not be present in snapped volume.
+ */
+ LOCK (&priv->lock);
+ {
+ if ((barrier_enabled = priv->barrier_enabled)) {
+ stub = fop_rmdir_stub (frame, changelog_rmdir_resume,
+ loc, xflags, xdata);
+ if (!stub)
+ __chlog_barrier_disable (this, &queue);
+ else
+ __chlog_barrier_enqueue (this, stub);
+ } else {
+ ((changelog_local_t *)frame->local)->color
+ = priv->current_color;
+ changelog_inc_fop_cnt (this, priv, frame->local);
+ }
+ }
+ UNLOCK (&priv->lock);
+
+ if (barrier_enabled && stub) {
+ gf_log (this->name, GF_LOG_DEBUG, "Enqueue rmdir");
+ goto out;
+ }
+ if (barrier_enabled && !stub) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to barrier FOPs, disabling changelog barrier "
+ "FOP: rmdir, ERROR: %s", strerror (ENOMEM));
+ chlog_barrier_dequeue_all (this, &queue);
+ }
+
+/* changelog barrier */
+
wind:
- changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_rmdir_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->rmdir,
loc, xflags, xdata);
+ out:
return 0;
}
@@ -127,12 +186,33 @@ changelog_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int32_t
+changelog_unlink_resume (call_frame_t *frame, xlator_t *this,
+ loc_t *loc, int xflags, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+
+ gf_log (this->name, GF_LOG_DEBUG, "Dequeue unlink");
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
+ STACK_WIND (frame, changelog_unlink_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->unlink,
+ loc, xflags, xdata);
+ return 0;
+}
+
+int32_t
changelog_unlink (call_frame_t *frame, xlator_t *this,
loc_t *loc, int xflags, dict_t *xdata)
{
- size_t xtra_len = 0;
- changelog_priv_t *priv = NULL;
- changelog_opt_t *co = NULL;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ call_stub_t *stub = NULL;
+ struct list_head queue = {0, };
+ gf_boolean_t barrier_enabled = _gf_false;
+
+ INIT_LIST_HEAD (&queue);
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind);
@@ -152,11 +232,42 @@ changelog_unlink (call_frame_t *frame, xlator_t *this,
changelog_set_usable_record_and_length (frame->local, xtra_len, 2);
+/* changelog barrier */
+ LOCK (&priv->lock);
+ {
+ if ((barrier_enabled = priv->barrier_enabled)) {
+ stub = fop_unlink_stub (frame, changelog_unlink_resume,
+ loc, xflags, xdata);
+ if (!stub)
+ __chlog_barrier_disable (this, &queue);
+ else
+ __chlog_barrier_enqueue (this, stub);
+ } else {
+ ((changelog_local_t *)frame->local)->color
+ = priv->current_color;
+ changelog_inc_fop_cnt (this, priv, frame->local);
+ }
+ }
+ UNLOCK (&priv->lock);
+
+ if (barrier_enabled && stub) {
+ gf_log (this->name, GF_LOG_DEBUG, "Enqueue unlink");
+ goto out;
+ }
+ if (barrier_enabled && !stub) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to barrier FOPs, disabling changelog barrier "
+ "FOP: unlink, ERROR: %s", strerror (ENOMEM));
+ chlog_barrier_dequeue_all (this, &queue);
+ }
+
+/* changelog barrier */
+
wind:
- changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_unlink_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->unlink,
loc, xflags, xdata);
+ out:
return 0;
}
@@ -190,12 +301,33 @@ changelog_rename_cbk (call_frame_t *frame,
int32_t
+changelog_rename_resume (call_frame_t *frame, xlator_t *this,
+ loc_t *oldloc, loc_t *newloc, dict_t *xdata)
+{
+ changelog_priv_t *priv = NULL;
+
+ priv = this->private;
+
+ gf_log (this->name, GF_LOG_DEBUG, "Dequeue rename");
+ changelog_color_fop_and_inc_cnt (this, priv, frame->local);
+ STACK_WIND (frame, changelog_rename_cbk,
+ FIRST_CHILD (this), FIRST_CHILD (this)->fops->rename,
+ oldloc, newloc, xdata);
+ return 0;
+}
+
+int32_t
changelog_rename (call_frame_t *frame, xlator_t *this,
loc_t *oldloc, loc_t *newloc, dict_t *xdata)
{
- size_t xtra_len = 0;
- changelog_priv_t *priv = NULL;
- changelog_opt_t *co = NULL;
+ size_t xtra_len = 0;
+ changelog_priv_t *priv = NULL;
+ changelog_opt_t *co = NULL;
+ call_stub_t *stub = NULL;
+ struct list_head queue = {0, };
+ gf_boolean_t barrier_enabled = _gf_false;
+
+ INIT_LIST_HEAD (&queue);
priv = this->private;
CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind);
@@ -219,12 +351,41 @@ changelog_rename (call_frame_t *frame, xlator_t *this,
entry_fn, entry_free_fn, xtra_len, wind);
changelog_set_usable_record_and_length (frame->local, xtra_len, 3);
+/* changelog barrier */
+ LOCK (&priv->lock);
+ {
+ if ((barrier_enabled = priv->barrier_enabled)) {
+ stub = fop_rename_stub (frame, changelog_rename_resume,
+ oldloc, newloc, xdata);
+ if (!stub)
+ __chlog_barrier_disable (this, &queue);
+ else
+ __chlog_barrier_enqueue (this, stub);
+ } else {
+ ((changelog_local_t *)frame->local)->color
+ = priv->current_color;
+ changelog_inc_fop_cnt (this, priv, frame->local);
+ }
+ }
+ UNLOCK (&priv->lock);
+
+ if (barrier_enabled && stub) {
+ gf_log (this->name, GF_LOG_DEBUG, "Enqueue rename");
+ goto out;
+ }
+ if (barrier_enabled && !stub) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to barrier FOPs, disabling changelog barrier "
+ "FOP: rename, ERROR: %s", strerror (ENOMEM));
+ chlog_barrier_dequeue_all (this, &queue);
+ }
+/* changelog barrier */
wind:
- changelog_color_fop_and_inc_cnt (this, priv, frame->local);
STACK_WIND (frame, changelog_rename_cbk,
FIRST_CHILD (this), FIRST_CHILD (this)->fops->rename,
oldloc, newloc, xdata);
+ out:
return 0;
}
@@ -1265,12 +1426,15 @@ changelog_spawn_notifier (xlator_t *this, changelog_priv_t *priv)
int
notify (xlator_t *this, int event, void *data, ...)
{
- changelog_priv_t *priv = NULL;
- dict_t *dict = NULL;
- char buf[1] = {1};
- int barrier = DICT_DEFAULT;
- gf_boolean_t bclean_req = _gf_false;
- int ret = 0;
+ changelog_priv_t *priv = NULL;
+ dict_t *dict = NULL;
+ char buf[1] = {1};
+ int barrier = DICT_DEFAULT;
+ gf_boolean_t bclean_req = _gf_false;
+ int ret = 0;
+ struct list_head queue = {0, };
+
+ INIT_LIST_HEAD (&queue);
priv = this->private;
if (!priv)
@@ -1311,14 +1475,33 @@ notify (xlator_t *this, int event, void *data, ...)
goto out;
}
- /*TODO: STOP CHANGELOG BARRIER */
+ /* Stop changelog barrier and dequeue all fops */
+ LOCK (&priv->lock);
+ {
+ if (priv->barrier_enabled == _gf_true)
+ __chlog_barrier_disable (this, &queue);
+ else
+ ret = -1;
+ }
+ UNLOCK (&priv->lock);
+ /* If ret = -1, then changelog barrier is already
+ * disabled because of error or timeout.
+ */
+ if (ret == 0) {
+ chlog_barrier_dequeue_all(this, &queue);
+ gf_log(this->name, GF_LOG_DEBUG,
+ "Disabled changelog barrier");
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Changelog barrier already disabled");
+ }
+
LOCK (&priv->bflags.lock);
{
priv->bflags.barrier_ext = _gf_false;
}
UNLOCK (&priv->bflags.lock);
- ret = 0;
goto out;
case BARRIER_ON:
@@ -1353,13 +1536,20 @@ notify (xlator_t *this, int event, void *data, ...)
CHANGELOG_PTHREAD_ERROR_HANDLE_1 (ret, out,
bclean_req);
- /*TODO: START CHANGELOG BARRIER */
+ /* Start changelog barrier */
+ LOCK (&priv->lock);
+ {
+ priv->barrier_enabled = _gf_true;
+ }
+ UNLOCK (&priv->lock);
+ gf_log(this->name, GF_LOG_DEBUG,
+ "Enabled changelog barrier");
ret = changelog_barrier_notify(priv, buf);
if (ret) {
gf_log (this->name, GF_LOG_ERROR,
"Explicit roll over: write failed");
- changelog_barrier_cleanup (this, priv);
+ changelog_barrier_cleanup (this, priv, &queue);
ret = -1;
goto out;
}
@@ -1406,7 +1596,7 @@ notify (xlator_t *this, int event, void *data, ...)
out:
if (bclean_req)
- changelog_barrier_cleanup (this, priv);
+ changelog_barrier_cleanup (this, priv, &queue);
return ret;
}
@@ -1787,6 +1977,10 @@ init (xlator_t *this)
cond_lock_init = _gf_true;
priv->bflags.barrier_ext = _gf_false;
+ /* Changelog barrier init */
+ INIT_LIST_HEAD (&priv->queue);
+ priv->barrier_enabled = _gf_false;
+
ret = changelog_init (this, priv);
if (ret)
goto out;