diff options
Diffstat (limited to 'xlators/cluster/nsr-server/src/recon_notify.c')
-rw-r--r-- | xlators/cluster/nsr-server/src/recon_notify.c | 389 |
1 files changed, 389 insertions, 0 deletions
diff --git a/xlators/cluster/nsr-server/src/recon_notify.c b/xlators/cluster/nsr-server/src/recon_notify.c new file mode 100644 index 000000000..1c50de234 --- /dev/null +++ b/xlators/cluster/nsr-server/src/recon_notify.c @@ -0,0 +1,389 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include <string.h> + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "call-stub.h" +#include "defaults.h" +#include "xlator.h" +#include "api/src/glfs.h" +#include "api/src/glfs-internal.h" +#include "etcd-api.h" +#include "nsr-internal.h" +#include "../../nsr-recon/src/recon_driver.h" +#include "../../nsr-recon/src/recon_xlator.h" + + + +typedef struct _nsr_recon_notify_ctx_t { + nsr_recon_notify_ev_t recon_head; + pthread_mutex_t recon_mutex; + pthread_cond_t recon_cv; + char **hosts; // list of hosts ordered depending on child indices + uint32_t current_term; + uint32_t last_reconciled_term; + glfs_t *fs; + glfs_fd_t *fd; +} nsr_recon_notify_ctx_t; + +static int +xlator_get_option (xlator_t *xl, char *key, char **value) +{ + GF_ASSERT (xl); + return dict_get_str (xl->options, key, value); +} + +void nsr_recon_notify_event_set_leader(nsr_private_t *priv) +{ + nsr_recon_notify_ev_t *ev; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + + ev = GF_CALLOC (1, sizeof (nsr_recon_notify_ev_t), 0); + ev->id = NSR_RECON_SET_LEADER; + INIT_LIST_HEAD(&(ev->list)); + pthread_mutex_lock(&ctx->recon_mutex); + list_add_tail(&ev->list, &ctx->recon_head.list); + pthread_cond_signal(&ctx->recon_cv); + pthread_mutex_unlock(&ctx->recon_mutex); +} + +void nsr_recon_notify_event_add_child(nsr_private_t *priv, uint32_t index) +{ + nsr_recon_notify_ev_t *ev; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + + ev = GF_CALLOC (1, sizeof (nsr_recon_notify_ev_t), 0); + ev->id = NSR_RECON_ADD_CHILD; + ev->index = index; + INIT_LIST_HEAD(&(ev->list)); + pthread_mutex_lock(&ctx->recon_mutex); + list_add_tail(&ev->list, &ctx->recon_head.list); + pthread_cond_signal(&ctx->recon_cv); + pthread_mutex_unlock(&ctx->recon_mutex); +} + + +static void +nsr_recon_set_leader (xlator_t *this) +{ + + nsr_private_t *priv = this->private; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + nsr_recon_role_t role; + xlator_t *old = this; + uint32_t i=0; + + if (priv->leader != _gf_true) + return; + + if (ctx->last_reconciled_term == priv->current_term) + return; + + /* + * Quorum for reconciliation is not the same as quorum for I/O. Here, + * we require a true majority. The +1 is because we don't count + * ourselves as part of n_children or up_children. + * + * TBD: re-evaluate when to reconcile (including partial) + */ + if (priv->up_children <= (priv->n_children / 2)) + return; + + gf_log (this->name, GF_LOG_INFO, + "Sending message to do recon with %d nodes\n", + priv->up_children); + + role.num = 0; + role.role = leader; + for (i = 0; i < priv->n_children; ++i) { + if (priv->kid_state & (1 << i)) { + gf_log (this->name, GF_LOG_INFO, + "Recon using host %s", + ctx->hosts[i]); + strcpy(role.info[role.num].name, ctx->hosts[i]); + (role.num)++; + } + } + + gf_log (this->name, GF_LOG_INFO, + "setting current term as %d", priv->current_term); + role.current_term = priv->current_term; + ENDIAN_CONVERSION_RR(role, _gf_false); + + // inform the reconciliator that this is leader + // in the callback (once reconciliation is done), + // we will unfence the IOs. + // TBD - error handling later. + if (glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET) == -1) { + gf_log (this->name, GF_LOG_ERROR, + "doing lseek failed\n"); + return; + } + + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_INFO, + "Writing to local node to set leader"); + do { + if (priv->leader != _gf_true) { + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_ERROR, "no longer leader\n"); + return; + } + if (glfs_write(ctx->fd, &role, sizeof(role), 0) == -1) { + if (errno == EAGAIN) { + // Wait for old reconciliation to bail out. + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_ERROR, + "write failed with retry. retrying after some time\n"); + sleep(5); + continue; + } + else{ + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_ERROR, + "doing write failed\n"); + // This is because reconciliation has returned with error + // because some node has died in between. + // What should be done? Either we retry being leader + // or hook to CHILD_DOWN notification. + // Put that logic later. As of now we will just retry. + // This is easier. + sleep(5); + continue; + } + } else { + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_INFO, "doing write with success\n"); + break; + } + } while(1); + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_INFO, + "glfs_write returned. unfencing IO\n"); + + // TBD - error handling + + ctx->last_reconciled_term = priv->current_term; + priv->index = 0; // reset changelog index + atomic_fetch_and(&(priv->fence_io), 0); + + return; +} + +static void +nsr_recon_add_child (xlator_t *this, uint32_t index) +{ + nsr_private_t *priv = this->private; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + nsr_recon_role_t role; + xlator_t *old = this; + + if (priv->leader != _gf_true) + return; + + // reconciliation still pending. + // Check if we have majority + if (ctx->last_reconciled_term != priv->current_term) { + nsr_recon_set_leader(this); + } else { + // Reconciliation done. + // new child joining the majority/ + // Do reconciliation only fot this child but after fencing new IO and draining old IO + role.num = 1; + role.role = joiner; + + atomic_fetch_or(&(priv->fence_io), 1); + while(priv->ops_in_flight) { + sleep(1); + } + + strcpy(role.info[0].name, ctx->hosts[index]); + role.current_term = priv->current_term; + ENDIAN_CONVERSION_RR(role, _gf_false); + glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET); + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_INFO, + "Writing to local node to join %s\n", role.info[0].name); + glfs_write(ctx->fd, &role, + sizeof(role), 0); + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_INFO, + "Write to local node to set joiner returned\n"); + + // TBD - error handling + atomic_fetch_and(&(priv->fence_io), 0); + } + + return; +} + +static uint32_t +nsr_setup_recon (xlator_t *this) +{ + nsr_private_t *priv = this->private; + xlator_t *old = this; + uint32_t ret = 0; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + + if (priv->nsr_recon_start == _gf_false) + return 0; + + ctx->fs = glfs_new(priv->subvol_uuid); + if (!ctx->fs) { + ret = 1; + gf_log (this->name, GF_LOG_ERROR, "failed to initialise glfs \n"); + goto done; + } + + glusterfs_this_set(old); + ret = glfs_set_volfile(ctx->fs, priv->vol_file); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, "failed to set volfile \n"); + goto done; + } + + glusterfs_this_set(old); + /* + * REVIEW + * Logs belong in /var/log not /tmp. + */ + glfs_set_logging (ctx->fs,"/tmp/glfs-log", 7); + if (glfs_init(ctx->fs) < 0) { + gf_log (this->name, GF_LOG_ERROR, "failed to init volfile \n"); + ret = 1; + goto done; + } + + glusterfs_this_set(old); + ctx->fd = glfs_open (ctx->fs, "/", O_RDWR); + if (ctx->fd == NULL) { + ret = 1; + gf_log (this->name, GF_LOG_ERROR, + "failed to open fd to communicate with recon process \n"); + goto done; + } + + +done: + glusterfs_this_set(old); + return ret; +} + + +static void +nsr_setup_hosts(xlator_t *this) +{ + xlator_list_t *trav; + nsr_private_t *priv = this->private; + uint32_t i = 0; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + + ctx->hosts = GF_CALLOC(sizeof(char *), priv->n_children, gf_mt_nsr_private_t); + // Iterate thru all the children + for (trav = this->children; trav; trav = trav->next) { + char *hostname = NULL, *vol = NULL; + int ret1 = 0, ret2 = 0, ret = 0; + xlator_t *xl = trav->xlator; + // If the child type is that of protocol/client + if (!strcmp(trav->xlator->type, "protocol/client")) { + ret1 = xlator_get_option (xl, "remote-host", &hostname); + ret2 = xlator_get_option (xl, "remote-subvolume", &vol); + if (!ret1 && !ret2) { + // add the name of that host to the hosts + ctx->hosts[i] = GF_CALLOC(sizeof(char), strlen(hostname) + strlen(vol) + 2, 0); + strcpy(ctx->hosts[i], hostname); + strcat(ctx->hosts[i], ":"); + strcat(ctx->hosts[i], vol); + gf_log (this->name, GF_LOG_INFO, + "adding hosts %s to recon notfiy list", ctx->hosts[i]); + } else { + gf_log (this->name, GF_LOG_ERROR, + "CANNOT FIND HOSTNAME FOR A CHILD"); + GF_ASSERT(0); + } + // local brick + } else { + ret = xlator_get_option (this, "my-name", &hostname); + if (!ret) { + uint32_t len = strlen(hostname); + ctx->hosts[i] = GF_CALLOC(sizeof(char), + len+1, + gf_mt_nsr_private_t); + strcpy(ctx->hosts[i], hostname); + gf_log (this->name, GF_LOG_INFO, + "adding my host %s to recon notfiy list", ctx->hosts[i]); + } else { + gf_log (this->name, GF_LOG_ERROR, + "CANNOT FIND MY HOSTNAME"); + GF_ASSERT(0); + } + } + i++; + } +} + +void * +nsr_recon_notify_thread (void *arg) +{ + xlator_t *this = (xlator_t *)arg; + nsr_private_t *priv = this->private; + nsr_recon_notify_ev_t *ev; + nsr_recon_notify_ctx_t *ctx; + + priv->recon_ctx = GF_CALLOC(1, sizeof(nsr_recon_notify_ctx_t), gf_mt_nsr_private_t); + if (!priv->recon_ctx) { + gf_log (this->name, GF_LOG_ERROR, "calloc error"); + return NULL; + } + ctx = priv->recon_ctx; + + pthread_mutex_init(&(ctx->recon_mutex), NULL); + pthread_cond_init(&(ctx->recon_cv), NULL); + INIT_LIST_HEAD(&(ctx->recon_head.list)); + + nsr_setup_hosts(this); + + if (nsr_setup_recon(this)) { + gf_log (this->name, GF_LOG_ERROR, "recon notify thread : initing glfs error"); + return NULL; + } + + priv->recon_notify_inited = 1; + + while(1) { + pthread_mutex_lock(&ctx->recon_mutex); + while (list_empty(&(ctx->recon_head.list))) { + pthread_cond_wait(&ctx->recon_cv, &ctx->recon_mutex); + } + pthread_mutex_unlock(&ctx->recon_mutex); + + list_for_each_entry(ev, &(ctx->recon_head.list), list) { + + if (ev->id == NSR_RECON_SET_LEADER) { + gf_log (this->name, GF_LOG_INFO, + "got add leader notfiy event"); + nsr_recon_set_leader(this); + } else if (ev->id == NSR_RECON_ADD_CHILD) { + gf_log (this->name, GF_LOG_INFO, + "got add child notify event"); + nsr_recon_add_child(this, ev->index); + } + } + list_del_init (&ev->list); + } + + return NULL; +} + |