summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/nsr-server/src/leader.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/nsr-server/src/leader.c')
-rw-r--r--xlators/cluster/nsr-server/src/leader.c420
1 files changed, 420 insertions, 0 deletions
diff --git a/xlators/cluster/nsr-server/src/leader.c b/xlators/cluster/nsr-server/src/leader.c
new file mode 100644
index 000000000..bb0dbabe7
--- /dev/null
+++ b/xlators/cluster/nsr-server/src/leader.c
@@ -0,0 +1,420 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <regex.h>
+//#include <stdlib.h>
+#include <string.h>
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "call-stub.h"
+#include "defaults.h"
+#include "xlator.h"
+#include "api/src/glfs.h"
+#include "api/src/glfs-internal.h"
+
+#include "etcd-api.h"
+#include "nsr-internal.h"
+#include "../../nsr-recon/src/recon_driver.h"
+#include "../../nsr-recon/src/recon_xlator.h"
+
+/* Vote format: UUID,vote_status,fitness,term_number */
+#define VOTE_ELEMS 4 /* Whole match plus four actual pieces. */
+#define DEFAULT_FITNESS 42
+#define DEFAULT_KEY "nsr"
+#define LEADER_TTL 5 /* TBD: make this tunable */
+
+typedef enum { LS_SUCCESS, LS_FAILURE, LS_ERROR } leader_retval_t;
+enum { NO_LEADER, TENTATIVE, CONFIRMED };
+
+regex_t vote_re;
+
+long
+nsr_get_fitness (xlator_t *this)
+{
+ /* TBD: calculate based on presence/absence from terms */
+ return 42;
+}
+
+long
+nsr_get_term (xlator_t *this)
+{
+ nsr_private_t *priv = this->private;
+ char *text = NULL;
+ etcd_session etcd = priv->etcd;
+
+ text = etcd_get(etcd, priv->term_uuid);
+ // first time and hence no key at all.
+ // this should ideally be done at vol creation time
+ // by glusterd. Move it there later
+ if(text == NULL) {
+ gf_log (this->name, GF_LOG_TRACE, "nsr_get_term returns 1");
+ return 0;
+ } else {
+ gf_log (this->name, GF_LOG_TRACE,
+ "nsr_get_term returns %ld", strtol(text, NULL, 10));
+ return (strtol(text, NULL, 10));
+ }
+}
+
+
+// in etcd-api-master.
+// send a patch to this package to expose this
+extern size_t
+parse_get_response (void *ptr, size_t size, size_t nmemb, void *stream);
+typedef struct {
+ etcd_server *servers;
+} _etcd_session;
+typedef size_t curl_callback_t (void *, size_t, size_t, void *);
+extern etcd_result etcd_get_one (_etcd_session *this, char *key, etcd_server *srv, char *prefix,
+ char *post, curl_callback_t cb, char **stream);
+
+
+
+void
+nsr_leader_cb(glfs_fd_t *fd, ssize_t ret, void *data)
+{
+ xlator_t *this = (xlator_t *) data;
+ nsr_private_t *priv = this->private;
+
+ gf_log (this->name, GF_LOG_INFO,
+ "nsr_leader_cb arrived with return value %d", (int)ret);
+
+ // TBD - error handling; look at ret
+ atomic_fetch_and(&(priv->fence_io), 0);
+
+ return;
+}
+
+void
+nsr_set_leader (xlator_t *this)
+{
+ long term = 0;
+ etcd_server *srv;
+ etcd_result res;
+ char *value = NULL;
+ nsr_private_t *priv = this->private;
+ _etcd_session *etcd = priv->etcd;
+ char *term_key = priv->term_uuid;
+ char *master_key = priv->vol_uuid;
+ char n_t[sizeof(long)+1];
+ nsr_recon_role_t role;
+ char *text = NULL;
+
+ gf_log (this->name, GF_LOG_INFO, "Just became leader");
+
+ text = etcd_get(etcd, priv->term_uuid);
+ if(text == NULL) {
+ term = 0;
+ } else {
+ term = strtol(text, NULL, 10);
+ }
+ sprintf(n_t,"%ld",term+1);
+ res = etcd_set(etcd, term_key,n_t,text,0);
+ if(res != ETCD_OK) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to set term");
+ return;
+ }
+ priv->leader = _gf_true;
+
+ if (priv->nsr_recon_start == _gf_false) {
+ atomic_fetch_and(&(priv->fence_io), 0);
+ return;
+ }
+
+ priv->current_term = term + 1;
+
+ atomic_fetch_or(&(priv->fence_io), 1);
+
+ role.num = 0;
+ role.role = leader;
+ // Get the rest of nodes for this term.
+ // TBD: fix this so that it uses per-brick keys instead of violating
+ // modularity and making bad assumptions about etcd behavior.
+ for (srv = etcd->servers; srv->host; ++srv) {
+ res = etcd_get_one(etcd,master_key,srv,"keys/",NULL,
+ parse_get_response,&value);
+ gf_log (this->name, GF_LOG_INFO,
+ "Probing for %s, got %d, value:%s",
+ srv->host, res, value);
+ if ((res == ETCD_OK) && value) {
+ gf_log (this->name, GF_LOG_INFO,
+ "Found for %s", srv->host);
+ strcpy(role.info[role.num].name, srv->host);
+ (role.num)++;
+ }
+ value = NULL;
+ }
+ gf_log (this->name, GF_LOG_INFO,
+ "Discovered %d nodes that has key %s", role.num, master_key);
+
+ gf_log (this->name, GF_LOG_INFO,
+ "setting current term as %ld", term + 1);
+ role.current_term = term + 1;
+ ENDIAN_CONVERSION_RR(role, _gf_false);
+
+ // inform the reconciliator that this is leader
+ // in the callback (once reconciliation is done),
+ // we will unfence the IOs.
+ // TBD - error handling later.
+ glfs_lseek(priv->fd, nsr_recon_xlator_sector_1, SEEK_SET);
+ gf_log (this->name, GF_LOG_INFO,
+ "Writing to local node to set leader");
+ glfs_write_async(priv->fd, &role,
+ sizeof(role),nsr_recon_xlator_sector_1,
+ nsr_leader_cb, this);
+}
+
+
+leader_retval_t
+nsr_get_leader (xlator_t *this, etcd_session etcd, char *key)
+{
+ char *text = NULL;
+ regmatch_t matches[VOTE_ELEMS];
+ char *nominee;
+ long state;
+ long fitness;
+ char *vote = NULL;
+ int retval = LS_ERROR;
+ nsr_private_t *priv = this->private;
+
+ for (;;sleep(1)) {
+
+ if (text) {
+ free(text);
+ }
+
+ text = etcd_get(etcd,key);
+ if (text) {
+ if (regexec(&vote_re,text,VOTE_ELEMS,matches,0) != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "got malformed vote %s\n", text);
+ continue;
+ }
+ /* We can be destructive here, so convert commas. */
+ text[matches[1].rm_eo] = '\0';
+ text[matches[2].rm_eo] = '\0';
+ nominee = text + matches[1].rm_so;
+ state = strtol(text+matches[2].rm_so,NULL,10);
+ fitness = strtol(text+matches[3].rm_so,NULL,10);
+ }
+ else {
+ nominee = NULL;
+ state = NO_LEADER;
+ fitness = 0;
+ }
+
+ if (state == CONFIRMED) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "leader is %s\n",nominee);
+ if (strcmp(nominee,priv->brick_uuid) == 0) {
+ nsr_set_leader(this);
+ retval = LS_SUCCESS;
+ }
+ else {
+ priv->leader = _gf_false;
+ retval = LS_FAILURE;
+ }
+ break;
+ }
+
+ /* TBD: override based on fitness */
+ if ((state >= TENTATIVE) && (strcmp(nominee,
+ priv->brick_uuid) != 0)) {
+ continue;
+ }
+
+ if (vote) {
+ free(vote);
+ }
+
+ fitness = nsr_get_fitness(this);
+ if (asprintf(&vote,"%s,%ld,%ld",priv->brick_uuid,
+ state+1,fitness) < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to construct vote\n");
+ break;
+ }
+
+ if (text) {
+ text[matches[1].rm_eo] = ',';
+ text[matches[2].rm_eo] = ',';
+ }
+ if (etcd_set(etcd,key,vote,text,LEADER_TTL) != ETCD_OK) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to cast vote\n");
+ continue;
+ }
+
+ }
+
+ if (text) {
+ free(text);
+ }
+ if (vote) {
+ free(vote);
+ }
+ return retval;
+}
+
+leader_retval_t
+nsr_confirm (xlator_t *this, etcd_session etcd, char *key)
+{
+ char *vote;
+ long fitness;
+ nsr_private_t *priv = this->private;
+
+ fitness = nsr_get_fitness(this);
+ if (asprintf(&vote,"%s,%ld,%ld",priv->brick_uuid,(long)CONFIRMED,
+ fitness) < 0) {
+ fprintf (stderr, "%s: failed to construct confirmation\n",
+ __func__);
+ return LS_ERROR;
+ }
+
+ if (etcd_set(etcd,key,vote,vote,LEADER_TTL) != ETCD_OK) {
+ fprintf (stderr, "%s: failed to confirm\n", __func__);
+ free(vote);
+ return LS_FAILURE;
+ }
+
+ free(vote);
+ return LS_SUCCESS;
+}
+
+gf_boolean_t
+nsr_init_re (xlator_t *this)
+{
+ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+ static int was_inited = 0;
+ static char *vote_re_str = "([^,]+),([^,]+),([^,]+)";
+ gf_boolean_t retval = _gf_false;
+
+ pthread_mutex_lock(&mutex);
+ if (!was_inited) {
+ if (regcomp(&vote_re,vote_re_str,REG_EXTENDED) == 0) {
+ retval = _gf_true;
+ }
+ else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set up vote regex\n");
+ }
+ }
+ pthread_mutex_unlock(&mutex);
+
+ return retval;
+}
+
+
+uint32_t
+nsr_leader_setup_recon (xlator_t *this)
+{
+ nsr_private_t *priv = this->private;
+ xlator_t *old = this;
+ uint32_t ret = 0;
+
+ if (priv->nsr_recon_start == _gf_false)
+ return 0;
+
+ priv->fs = glfs_new(priv->vol_uuid);
+ if (!priv->fs) {
+ ret = 1;
+ gf_log (this->name, GF_LOG_ERROR, "failed to initialise glfs \n");
+ goto done;
+ }
+
+ glusterfs_this_set(old);
+ ret = glfs_set_volfile(priv->fs, priv->vol_file);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to set volfile \n");
+ goto done;
+ }
+
+ glusterfs_this_set(old);
+ /*
+ * REVIEW
+ * Logs belong in /var/log not /tmp.
+ */
+ glfs_set_logging (priv->fs,"/tmp/glfs-log", 7);
+ if (glfs_init(priv->fs) < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to init volfile \n");
+ ret = 1;
+ goto done;
+ }
+
+ glusterfs_this_set(old);
+ priv->fd = glfs_open (priv->fs, "/", O_RDWR);
+ if (priv->fd == NULL) {
+ ret = 1;
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to open fd to communicate with recon process \n");
+ goto done;
+ }
+
+
+done:
+ glusterfs_this_set(old);
+ return ret;
+}
+
+void *
+nsr_leader_thread (xlator_t *this)
+{
+ leader_retval_t retval;
+ nsr_private_t *priv = this->private;
+
+ if (!nsr_init_re(this)) {
+ gf_log (this->name, GF_LOG_ERROR, "could not init regex");
+ return NULL;
+ }
+
+ if (nsr_leader_setup_recon(this)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to do glfs initialisation inside leader thread");
+ return NULL;
+ }
+
+ priv->leader_inited = 1;
+
+ gf_log (this->name, GF_LOG_INFO,
+ "calling glfs_opens_str on servers %s", priv->etcd_servers);
+
+ priv->etcd = etcd_open_str(priv->etcd_servers);
+ if (!(priv->etcd)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to open etcd session\n");
+ return NULL;
+ }
+
+ for (;;) {
+ if (nsr_get_leader(this,priv->etcd,priv->vol_uuid) == LS_ERROR) {
+ break;
+ }
+ if (priv->leader) {
+ do {
+ sleep(1);
+ retval = nsr_confirm(this,priv->etcd,priv->vol_uuid);
+ } while (retval == LS_SUCCESS);
+ if (retval == LS_ERROR) {
+ break;
+ }
+ }
+ else {
+ sleep(1);
+ }
+ }
+
+ etcd_close_str(priv->etcd);
+ return NULL;
+}
+