diff options
Diffstat (limited to 'xlators/cluster/nsr-server/src/leader.c')
-rw-r--r-- | xlators/cluster/nsr-server/src/leader.c | 420 |
1 files changed, 420 insertions, 0 deletions
diff --git a/xlators/cluster/nsr-server/src/leader.c b/xlators/cluster/nsr-server/src/leader.c new file mode 100644 index 000000000..bb0dbabe7 --- /dev/null +++ b/xlators/cluster/nsr-server/src/leader.c @@ -0,0 +1,420 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include <regex.h> +//#include <stdlib.h> +#include <string.h> + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "call-stub.h" +#include "defaults.h" +#include "xlator.h" +#include "api/src/glfs.h" +#include "api/src/glfs-internal.h" + +#include "etcd-api.h" +#include "nsr-internal.h" +#include "../../nsr-recon/src/recon_driver.h" +#include "../../nsr-recon/src/recon_xlator.h" + +/* Vote format: UUID,vote_status,fitness,term_number */ +#define VOTE_ELEMS 4 /* Whole match plus four actual pieces. */ +#define DEFAULT_FITNESS 42 +#define DEFAULT_KEY "nsr" +#define LEADER_TTL 5 /* TBD: make this tunable */ + +typedef enum { LS_SUCCESS, LS_FAILURE, LS_ERROR } leader_retval_t; +enum { NO_LEADER, TENTATIVE, CONFIRMED }; + +regex_t vote_re; + +long +nsr_get_fitness (xlator_t *this) +{ + /* TBD: calculate based on presence/absence from terms */ + return 42; +} + +long +nsr_get_term (xlator_t *this) +{ + nsr_private_t *priv = this->private; + char *text = NULL; + etcd_session etcd = priv->etcd; + + text = etcd_get(etcd, priv->term_uuid); + // first time and hence no key at all. + // this should ideally be done at vol creation time + // by glusterd. Move it there later + if(text == NULL) { + gf_log (this->name, GF_LOG_TRACE, "nsr_get_term returns 1"); + return 0; + } else { + gf_log (this->name, GF_LOG_TRACE, + "nsr_get_term returns %ld", strtol(text, NULL, 10)); + return (strtol(text, NULL, 10)); + } +} + + +// in etcd-api-master. +// send a patch to this package to expose this +extern size_t +parse_get_response (void *ptr, size_t size, size_t nmemb, void *stream); +typedef struct { + etcd_server *servers; +} _etcd_session; +typedef size_t curl_callback_t (void *, size_t, size_t, void *); +extern etcd_result etcd_get_one (_etcd_session *this, char *key, etcd_server *srv, char *prefix, + char *post, curl_callback_t cb, char **stream); + + + +void +nsr_leader_cb(glfs_fd_t *fd, ssize_t ret, void *data) +{ + xlator_t *this = (xlator_t *) data; + nsr_private_t *priv = this->private; + + gf_log (this->name, GF_LOG_INFO, + "nsr_leader_cb arrived with return value %d", (int)ret); + + // TBD - error handling; look at ret + atomic_fetch_and(&(priv->fence_io), 0); + + return; +} + +void +nsr_set_leader (xlator_t *this) +{ + long term = 0; + etcd_server *srv; + etcd_result res; + char *value = NULL; + nsr_private_t *priv = this->private; + _etcd_session *etcd = priv->etcd; + char *term_key = priv->term_uuid; + char *master_key = priv->vol_uuid; + char n_t[sizeof(long)+1]; + nsr_recon_role_t role; + char *text = NULL; + + gf_log (this->name, GF_LOG_INFO, "Just became leader"); + + text = etcd_get(etcd, priv->term_uuid); + if(text == NULL) { + term = 0; + } else { + term = strtol(text, NULL, 10); + } + sprintf(n_t,"%ld",term+1); + res = etcd_set(etcd, term_key,n_t,text,0); + if(res != ETCD_OK) { + gf_log (this->name, GF_LOG_ERROR, "failed to set term"); + return; + } + priv->leader = _gf_true; + + if (priv->nsr_recon_start == _gf_false) { + atomic_fetch_and(&(priv->fence_io), 0); + return; + } + + priv->current_term = term + 1; + + atomic_fetch_or(&(priv->fence_io), 1); + + role.num = 0; + role.role = leader; + // Get the rest of nodes for this term. + // TBD: fix this so that it uses per-brick keys instead of violating + // modularity and making bad assumptions about etcd behavior. + for (srv = etcd->servers; srv->host; ++srv) { + res = etcd_get_one(etcd,master_key,srv,"keys/",NULL, + parse_get_response,&value); + gf_log (this->name, GF_LOG_INFO, + "Probing for %s, got %d, value:%s", + srv->host, res, value); + if ((res == ETCD_OK) && value) { + gf_log (this->name, GF_LOG_INFO, + "Found for %s", srv->host); + strcpy(role.info[role.num].name, srv->host); + (role.num)++; + } + value = NULL; + } + gf_log (this->name, GF_LOG_INFO, + "Discovered %d nodes that has key %s", role.num, master_key); + + gf_log (this->name, GF_LOG_INFO, + "setting current term as %ld", term + 1); + role.current_term = term + 1; + ENDIAN_CONVERSION_RR(role, _gf_false); + + // inform the reconciliator that this is leader + // in the callback (once reconciliation is done), + // we will unfence the IOs. + // TBD - error handling later. + glfs_lseek(priv->fd, nsr_recon_xlator_sector_1, SEEK_SET); + gf_log (this->name, GF_LOG_INFO, + "Writing to local node to set leader"); + glfs_write_async(priv->fd, &role, + sizeof(role),nsr_recon_xlator_sector_1, + nsr_leader_cb, this); +} + + +leader_retval_t +nsr_get_leader (xlator_t *this, etcd_session etcd, char *key) +{ + char *text = NULL; + regmatch_t matches[VOTE_ELEMS]; + char *nominee; + long state; + long fitness; + char *vote = NULL; + int retval = LS_ERROR; + nsr_private_t *priv = this->private; + + for (;;sleep(1)) { + + if (text) { + free(text); + } + + text = etcd_get(etcd,key); + if (text) { + if (regexec(&vote_re,text,VOTE_ELEMS,matches,0) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "got malformed vote %s\n", text); + continue; + } + /* We can be destructive here, so convert commas. */ + text[matches[1].rm_eo] = '\0'; + text[matches[2].rm_eo] = '\0'; + nominee = text + matches[1].rm_so; + state = strtol(text+matches[2].rm_so,NULL,10); + fitness = strtol(text+matches[3].rm_so,NULL,10); + } + else { + nominee = NULL; + state = NO_LEADER; + fitness = 0; + } + + if (state == CONFIRMED) { + gf_log (this->name, GF_LOG_TRACE, + "leader is %s\n",nominee); + if (strcmp(nominee,priv->brick_uuid) == 0) { + nsr_set_leader(this); + retval = LS_SUCCESS; + } + else { + priv->leader = _gf_false; + retval = LS_FAILURE; + } + break; + } + + /* TBD: override based on fitness */ + if ((state >= TENTATIVE) && (strcmp(nominee, + priv->brick_uuid) != 0)) { + continue; + } + + if (vote) { + free(vote); + } + + fitness = nsr_get_fitness(this); + if (asprintf(&vote,"%s,%ld,%ld",priv->brick_uuid, + state+1,fitness) < 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to construct vote\n"); + break; + } + + if (text) { + text[matches[1].rm_eo] = ','; + text[matches[2].rm_eo] = ','; + } + if (etcd_set(etcd,key,vote,text,LEADER_TTL) != ETCD_OK) { + gf_log (this->name, GF_LOG_ERROR, + "failed to cast vote\n"); + continue; + } + + } + + if (text) { + free(text); + } + if (vote) { + free(vote); + } + return retval; +} + +leader_retval_t +nsr_confirm (xlator_t *this, etcd_session etcd, char *key) +{ + char *vote; + long fitness; + nsr_private_t *priv = this->private; + + fitness = nsr_get_fitness(this); + if (asprintf(&vote,"%s,%ld,%ld",priv->brick_uuid,(long)CONFIRMED, + fitness) < 0) { + fprintf (stderr, "%s: failed to construct confirmation\n", + __func__); + return LS_ERROR; + } + + if (etcd_set(etcd,key,vote,vote,LEADER_TTL) != ETCD_OK) { + fprintf (stderr, "%s: failed to confirm\n", __func__); + free(vote); + return LS_FAILURE; + } + + free(vote); + return LS_SUCCESS; +} + +gf_boolean_t +nsr_init_re (xlator_t *this) +{ + static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + static int was_inited = 0; + static char *vote_re_str = "([^,]+),([^,]+),([^,]+)"; + gf_boolean_t retval = _gf_false; + + pthread_mutex_lock(&mutex); + if (!was_inited) { + if (regcomp(&vote_re,vote_re_str,REG_EXTENDED) == 0) { + retval = _gf_true; + } + else { + gf_log (this->name, GF_LOG_ERROR, + "failed to set up vote regex\n"); + } + } + pthread_mutex_unlock(&mutex); + + return retval; +} + + +uint32_t +nsr_leader_setup_recon (xlator_t *this) +{ + nsr_private_t *priv = this->private; + xlator_t *old = this; + uint32_t ret = 0; + + if (priv->nsr_recon_start == _gf_false) + return 0; + + priv->fs = glfs_new(priv->vol_uuid); + if (!priv->fs) { + ret = 1; + gf_log (this->name, GF_LOG_ERROR, "failed to initialise glfs \n"); + goto done; + } + + glusterfs_this_set(old); + ret = glfs_set_volfile(priv->fs, priv->vol_file); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, "failed to set volfile \n"); + goto done; + } + + glusterfs_this_set(old); + /* + * REVIEW + * Logs belong in /var/log not /tmp. + */ + glfs_set_logging (priv->fs,"/tmp/glfs-log", 7); + if (glfs_init(priv->fs) < 0) { + gf_log (this->name, GF_LOG_ERROR, "failed to init volfile \n"); + ret = 1; + goto done; + } + + glusterfs_this_set(old); + priv->fd = glfs_open (priv->fs, "/", O_RDWR); + if (priv->fd == NULL) { + ret = 1; + gf_log (this->name, GF_LOG_ERROR, + "failed to open fd to communicate with recon process \n"); + goto done; + } + + +done: + glusterfs_this_set(old); + return ret; +} + +void * +nsr_leader_thread (xlator_t *this) +{ + leader_retval_t retval; + nsr_private_t *priv = this->private; + + if (!nsr_init_re(this)) { + gf_log (this->name, GF_LOG_ERROR, "could not init regex"); + return NULL; + } + + if (nsr_leader_setup_recon(this)) { + gf_log (this->name, GF_LOG_ERROR, + "failed to do glfs initialisation inside leader thread"); + return NULL; + } + + priv->leader_inited = 1; + + gf_log (this->name, GF_LOG_INFO, + "calling glfs_opens_str on servers %s", priv->etcd_servers); + + priv->etcd = etcd_open_str(priv->etcd_servers); + if (!(priv->etcd)) { + gf_log (this->name, GF_LOG_ERROR, + "failed to open etcd session\n"); + return NULL; + } + + for (;;) { + if (nsr_get_leader(this,priv->etcd,priv->vol_uuid) == LS_ERROR) { + break; + } + if (priv->leader) { + do { + sleep(1); + retval = nsr_confirm(this,priv->etcd,priv->vol_uuid); + } while (retval == LS_SUCCESS); + if (retval == LS_ERROR) { + break; + } + } + else { + sleep(1); + } + } + + etcd_close_str(priv->etcd); + return NULL; +} + |