summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/nsr-server/src/recon_notify.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/nsr-server/src/recon_notify.c')
-rw-r--r--xlators/cluster/nsr-server/src/recon_notify.c389
1 files changed, 389 insertions, 0 deletions
diff --git a/xlators/cluster/nsr-server/src/recon_notify.c b/xlators/cluster/nsr-server/src/recon_notify.c
new file mode 100644
index 000000000..1c50de234
--- /dev/null
+++ b/xlators/cluster/nsr-server/src/recon_notify.c
@@ -0,0 +1,389 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <string.h>
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "call-stub.h"
+#include "defaults.h"
+#include "xlator.h"
+#include "api/src/glfs.h"
+#include "api/src/glfs-internal.h"
+#include "etcd-api.h"
+#include "nsr-internal.h"
+#include "../../nsr-recon/src/recon_driver.h"
+#include "../../nsr-recon/src/recon_xlator.h"
+
+
+
+typedef struct _nsr_recon_notify_ctx_t {
+ nsr_recon_notify_ev_t recon_head;
+ pthread_mutex_t recon_mutex;
+ pthread_cond_t recon_cv;
+ char **hosts; // list of hosts ordered depending on child indices
+ uint32_t current_term;
+ uint32_t last_reconciled_term;
+ glfs_t *fs;
+ glfs_fd_t *fd;
+} nsr_recon_notify_ctx_t;
+
+static int
+xlator_get_option (xlator_t *xl, char *key, char **value)
+{
+ GF_ASSERT (xl);
+ return dict_get_str (xl->options, key, value);
+}
+
+void nsr_recon_notify_event_set_leader(nsr_private_t *priv)
+{
+ nsr_recon_notify_ev_t *ev;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+
+ ev = GF_CALLOC (1, sizeof (nsr_recon_notify_ev_t), 0);
+ ev->id = NSR_RECON_SET_LEADER;
+ INIT_LIST_HEAD(&(ev->list));
+ pthread_mutex_lock(&ctx->recon_mutex);
+ list_add_tail(&ev->list, &ctx->recon_head.list);
+ pthread_cond_signal(&ctx->recon_cv);
+ pthread_mutex_unlock(&ctx->recon_mutex);
+}
+
+void nsr_recon_notify_event_add_child(nsr_private_t *priv, uint32_t index)
+{
+ nsr_recon_notify_ev_t *ev;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+
+ ev = GF_CALLOC (1, sizeof (nsr_recon_notify_ev_t), 0);
+ ev->id = NSR_RECON_ADD_CHILD;
+ ev->index = index;
+ INIT_LIST_HEAD(&(ev->list));
+ pthread_mutex_lock(&ctx->recon_mutex);
+ list_add_tail(&ev->list, &ctx->recon_head.list);
+ pthread_cond_signal(&ctx->recon_cv);
+ pthread_mutex_unlock(&ctx->recon_mutex);
+}
+
+
+static void
+nsr_recon_set_leader (xlator_t *this)
+{
+
+ nsr_private_t *priv = this->private;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+ nsr_recon_role_t role;
+ xlator_t *old = this;
+ uint32_t i=0;
+
+ if (priv->leader != _gf_true)
+ return;
+
+ if (ctx->last_reconciled_term == priv->current_term)
+ return;
+
+ /*
+ * Quorum for reconciliation is not the same as quorum for I/O. Here,
+ * we require a true majority. The +1 is because we don't count
+ * ourselves as part of n_children or up_children.
+ *
+ * TBD: re-evaluate when to reconcile (including partial)
+ */
+ if (priv->up_children <= (priv->n_children / 2))
+ return;
+
+ gf_log (this->name, GF_LOG_INFO,
+ "Sending message to do recon with %d nodes\n",
+ priv->up_children);
+
+ role.num = 0;
+ role.role = leader;
+ for (i = 0; i < priv->n_children; ++i) {
+ if (priv->kid_state & (1 << i)) {
+ gf_log (this->name, GF_LOG_INFO,
+ "Recon using host %s",
+ ctx->hosts[i]);
+ strcpy(role.info[role.num].name, ctx->hosts[i]);
+ (role.num)++;
+ }
+ }
+
+ gf_log (this->name, GF_LOG_INFO,
+ "setting current term as %d", priv->current_term);
+ role.current_term = priv->current_term;
+ ENDIAN_CONVERSION_RR(role, _gf_false);
+
+ // inform the reconciliator that this is leader
+ // in the callback (once reconciliation is done),
+ // we will unfence the IOs.
+ // TBD - error handling later.
+ if (glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET) == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "doing lseek failed\n");
+ return;
+ }
+
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_INFO,
+ "Writing to local node to set leader");
+ do {
+ if (priv->leader != _gf_true) {
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_ERROR, "no longer leader\n");
+ return;
+ }
+ if (glfs_write(ctx->fd, &role, sizeof(role), 0) == -1) {
+ if (errno == EAGAIN) {
+ // Wait for old reconciliation to bail out.
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_ERROR,
+ "write failed with retry. retrying after some time\n");
+ sleep(5);
+ continue;
+ }
+ else{
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_ERROR,
+ "doing write failed\n");
+ // This is because reconciliation has returned with error
+ // because some node has died in between.
+ // What should be done? Either we retry being leader
+ // or hook to CHILD_DOWN notification.
+ // Put that logic later. As of now we will just retry.
+ // This is easier.
+ sleep(5);
+ continue;
+ }
+ } else {
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_INFO, "doing write with success\n");
+ break;
+ }
+ } while(1);
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_INFO,
+ "glfs_write returned. unfencing IO\n");
+
+ // TBD - error handling
+
+ ctx->last_reconciled_term = priv->current_term;
+ priv->index = 0; // reset changelog index
+ atomic_fetch_and(&(priv->fence_io), 0);
+
+ return;
+}
+
+static void
+nsr_recon_add_child (xlator_t *this, uint32_t index)
+{
+ nsr_private_t *priv = this->private;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+ nsr_recon_role_t role;
+ xlator_t *old = this;
+
+ if (priv->leader != _gf_true)
+ return;
+
+ // reconciliation still pending.
+ // Check if we have majority
+ if (ctx->last_reconciled_term != priv->current_term) {
+ nsr_recon_set_leader(this);
+ } else {
+ // Reconciliation done.
+ // new child joining the majority/
+ // Do reconciliation only fot this child but after fencing new IO and draining old IO
+ role.num = 1;
+ role.role = joiner;
+
+ atomic_fetch_or(&(priv->fence_io), 1);
+ while(priv->ops_in_flight) {
+ sleep(1);
+ }
+
+ strcpy(role.info[0].name, ctx->hosts[index]);
+ role.current_term = priv->current_term;
+ ENDIAN_CONVERSION_RR(role, _gf_false);
+ glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET);
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_INFO,
+ "Writing to local node to join %s\n", role.info[0].name);
+ glfs_write(ctx->fd, &role,
+ sizeof(role), 0);
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_INFO,
+ "Write to local node to set joiner returned\n");
+
+ // TBD - error handling
+ atomic_fetch_and(&(priv->fence_io), 0);
+ }
+
+ return;
+}
+
+static uint32_t
+nsr_setup_recon (xlator_t *this)
+{
+ nsr_private_t *priv = this->private;
+ xlator_t *old = this;
+ uint32_t ret = 0;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+
+ if (priv->nsr_recon_start == _gf_false)
+ return 0;
+
+ ctx->fs = glfs_new(priv->subvol_uuid);
+ if (!ctx->fs) {
+ ret = 1;
+ gf_log (this->name, GF_LOG_ERROR, "failed to initialise glfs \n");
+ goto done;
+ }
+
+ glusterfs_this_set(old);
+ ret = glfs_set_volfile(ctx->fs, priv->vol_file);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to set volfile \n");
+ goto done;
+ }
+
+ glusterfs_this_set(old);
+ /*
+ * REVIEW
+ * Logs belong in /var/log not /tmp.
+ */
+ glfs_set_logging (ctx->fs,"/tmp/glfs-log", 7);
+ if (glfs_init(ctx->fs) < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to init volfile \n");
+ ret = 1;
+ goto done;
+ }
+
+ glusterfs_this_set(old);
+ ctx->fd = glfs_open (ctx->fs, "/", O_RDWR);
+ if (ctx->fd == NULL) {
+ ret = 1;
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to open fd to communicate with recon process \n");
+ goto done;
+ }
+
+
+done:
+ glusterfs_this_set(old);
+ return ret;
+}
+
+
+static void
+nsr_setup_hosts(xlator_t *this)
+{
+ xlator_list_t *trav;
+ nsr_private_t *priv = this->private;
+ uint32_t i = 0;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+
+ ctx->hosts = GF_CALLOC(sizeof(char *), priv->n_children, gf_mt_nsr_private_t);
+ // Iterate thru all the children
+ for (trav = this->children; trav; trav = trav->next) {
+ char *hostname = NULL, *vol = NULL;
+ int ret1 = 0, ret2 = 0, ret = 0;
+ xlator_t *xl = trav->xlator;
+ // If the child type is that of protocol/client
+ if (!strcmp(trav->xlator->type, "protocol/client")) {
+ ret1 = xlator_get_option (xl, "remote-host", &hostname);
+ ret2 = xlator_get_option (xl, "remote-subvolume", &vol);
+ if (!ret1 && !ret2) {
+ // add the name of that host to the hosts
+ ctx->hosts[i] = GF_CALLOC(sizeof(char), strlen(hostname) + strlen(vol) + 2, 0);
+ strcpy(ctx->hosts[i], hostname);
+ strcat(ctx->hosts[i], ":");
+ strcat(ctx->hosts[i], vol);
+ gf_log (this->name, GF_LOG_INFO,
+ "adding hosts %s to recon notfiy list", ctx->hosts[i]);
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "CANNOT FIND HOSTNAME FOR A CHILD");
+ GF_ASSERT(0);
+ }
+ // local brick
+ } else {
+ ret = xlator_get_option (this, "my-name", &hostname);
+ if (!ret) {
+ uint32_t len = strlen(hostname);
+ ctx->hosts[i] = GF_CALLOC(sizeof(char),
+ len+1,
+ gf_mt_nsr_private_t);
+ strcpy(ctx->hosts[i], hostname);
+ gf_log (this->name, GF_LOG_INFO,
+ "adding my host %s to recon notfiy list", ctx->hosts[i]);
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "CANNOT FIND MY HOSTNAME");
+ GF_ASSERT(0);
+ }
+ }
+ i++;
+ }
+}
+
+void *
+nsr_recon_notify_thread (void *arg)
+{
+ xlator_t *this = (xlator_t *)arg;
+ nsr_private_t *priv = this->private;
+ nsr_recon_notify_ev_t *ev;
+ nsr_recon_notify_ctx_t *ctx;
+
+ priv->recon_ctx = GF_CALLOC(1, sizeof(nsr_recon_notify_ctx_t), gf_mt_nsr_private_t);
+ if (!priv->recon_ctx) {
+ gf_log (this->name, GF_LOG_ERROR, "calloc error");
+ return NULL;
+ }
+ ctx = priv->recon_ctx;
+
+ pthread_mutex_init(&(ctx->recon_mutex), NULL);
+ pthread_cond_init(&(ctx->recon_cv), NULL);
+ INIT_LIST_HEAD(&(ctx->recon_head.list));
+
+ nsr_setup_hosts(this);
+
+ if (nsr_setup_recon(this)) {
+ gf_log (this->name, GF_LOG_ERROR, "recon notify thread : initing glfs error");
+ return NULL;
+ }
+
+ priv->recon_notify_inited = 1;
+
+ while(1) {
+ pthread_mutex_lock(&ctx->recon_mutex);
+ while (list_empty(&(ctx->recon_head.list))) {
+ pthread_cond_wait(&ctx->recon_cv, &ctx->recon_mutex);
+ }
+ pthread_mutex_unlock(&ctx->recon_mutex);
+
+ list_for_each_entry(ev, &(ctx->recon_head.list), list) {
+
+ if (ev->id == NSR_RECON_SET_LEADER) {
+ gf_log (this->name, GF_LOG_INFO,
+ "got add leader notfiy event");
+ nsr_recon_set_leader(this);
+ } else if (ev->id == NSR_RECON_ADD_CHILD) {
+ gf_log (this->name, GF_LOG_INFO,
+ "got add child notify event");
+ nsr_recon_add_child(this, ev->index);
+ }
+ }
+ list_del_init (&ev->list);
+ }
+
+ return NULL;
+}
+