summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/nsr-server
diff options
context:
space:
mode:
authorRaghavan P <rpichai@redhat.com>2014-01-03 16:09:04 +0530
committerRaghavan P <rpichai@redhat.com>2014-01-08 14:48:21 +0530
commite0cce4cf7c22d5cd8ab6c2aff4ecf28c18c6a469 (patch)
tree5e30d20eaf43c77f77d5aa9d4351492af659b39f /xlators/cluster/nsr-server
parent82ce8acfdfb141c6b34b6b6b43ef78eee891f9e8 (diff)
Changes to NSR reconciliation code.
Following is list of changes: 1) Simulation of etcd using a file as a registry protected using locks. 2) Implement notifications for child up and child down. 3) Join a new brick into quorum. 4) add support for proper fencing and draining of IO required for reconciliaiton 5) misc changes and addressed review comments. Change-Id: Iddd1137ad6205252ed03301888bb1e83fa2221e0 Signed-off-by: Raghavan P <rpichai@redhat.com>
Diffstat (limited to 'xlators/cluster/nsr-server')
-rw-r--r--xlators/cluster/nsr-server/src/Makefile.am8
-rw-r--r--xlators/cluster/nsr-server/src/all-templates.c8
-rw-r--r--xlators/cluster/nsr-server/src/etcd-sim.c222
-rw-r--r--xlators/cluster/nsr-server/src/leader.c173
-rw-r--r--xlators/cluster/nsr-server/src/nsr-internal.h20
-rw-r--r--xlators/cluster/nsr-server/src/nsr.c45
-rw-r--r--xlators/cluster/nsr-server/src/recon_notify.c345
7 files changed, 648 insertions, 173 deletions
diff --git a/xlators/cluster/nsr-server/src/Makefile.am b/xlators/cluster/nsr-server/src/Makefile.am
index df0d68539..85a560d09 100644
--- a/xlators/cluster/nsr-server/src/Makefile.am
+++ b/xlators/cluster/nsr-server/src/Makefile.am
@@ -4,9 +4,15 @@ xlator_LTLIBRARIES = nsr.la
xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/cluster
nsr_la_LDFLAGS = -module -avoid-version -lgfapi -lcurl
-nsr_la_SOURCES = nsr.c leader.c etcd-api.c \
+
+if ENABLE_ETCD_SIM
+nsr_la_SOURCES = nsr.c leader.c recon_notify.c etcd-sim.c
+else
+nsr_la_SOURCES = nsr.c leader.c recon_notify.c etcd-api.c \
yajl.c yajl_alloc.c yajl_buf.c yajl_encode.c yajl_gen.c \
yajl_lex.c yajl_parser.c yajl_tree.c yajl_version.c
+endif
+
nsr_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
diff --git a/xlators/cluster/nsr-server/src/all-templates.c b/xlators/cluster/nsr-server/src/all-templates.c
index 541653029..7300973d5 100644
--- a/xlators/cluster/nsr-server/src/all-templates.c
+++ b/xlators/cluster/nsr-server/src/all-templates.c
@@ -83,17 +83,20 @@ nsr_$NAME$ (call_frame_t *frame, xlator_t *this,
// follower/recon path
// just send it to local node
if (from_leader || from_recon) {
+ atomic_inc(&priv->ops_in_flight);
STACK_WIND (frame, nsr_$NAME$_complete,
FIRST_CHILD(this), FIRST_CHILD(this)->fops->$NAME$,
$ARGS_SHORT$);
return 0;
}
+
if (!priv->leader || priv->fence_io) {
op_errno = EREMOTE;
goto err;
}
+
if (!xdata) {
xdata = dict_new();
if (!xdata) {
@@ -115,6 +118,7 @@ nsr_$NAME$ (call_frame_t *frame, xlator_t *this,
goto err;
}
+
#if defined(NSR_CG_QUEUE)
nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd->inode);
if (!ictx) {
@@ -188,6 +192,8 @@ nsr_$NAME$_dispatch (call_frame_t *frame, xlator_t *this,
nsr_private_t *priv = this->private;
xlator_list_t *trav;
+ atomic_inc(&priv->ops_in_flight);
+
/*
* TBD: unblock pending request(s) if we fail after this point but
* before we get to nsr_$NAME$_complete (where that code currently
@@ -246,6 +252,7 @@ nsr_$NAME$_complete (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
$ARGS_LONG$)
{
+ nsr_private_t *priv = this->private;
#if defined(NSR_CG_NEED_FD)
nsr_local_t *local = frame->local;
#endif
@@ -294,6 +301,7 @@ nsr_$NAME$_complete (call_frame_t *frame, void *cookie, xlator_t *this,
STACK_UNWIND_STRICT ($NAME$, frame, op_ret, op_errno,
$ARGS_SHORT$);
+ atomic_dec(&priv->ops_in_flight);
return 0;
}
diff --git a/xlators/cluster/nsr-server/src/etcd-sim.c b/xlators/cluster/nsr-server/src/etcd-sim.c
new file mode 100644
index 000000000..5c5cdcec0
--- /dev/null
+++ b/xlators/cluster/nsr-server/src/etcd-sim.c
@@ -0,0 +1,222 @@
+/*
+ * Copyright (c) 2013, Red Hat
+ * All rights reserved.
+
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer. Redistributions in binary
+ * form must reproduce the above copyright notice, this list of conditions and
+ * the following disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "call-stub.h"
+#include "defaults.h"
+#include "xlator.h"
+#include "api/src/glfs.h"
+#include "api/src/glfs-internal.h"
+#include "run.h"
+
+
+/*
+ * Mock implementation of etcd
+ * The etcd file is simulated in /tmp/<server-names>
+ * Writes from Multiple writers are protected using file lock.
+*/
+
+#include "etcd-api.h"
+#define T_FORMAT "%d-%b-%Y,%H:%M:%S"
+#define MAX_KEY_LEN 64
+#define MAX_VALUE_LEN 64
+#define MAX_TTL_LEN 12
+
+etcd_session
+etcd_open (etcd_server *server_list)
+{
+ return NULL;
+}
+
+typedef struct _etcd_sim_s {
+ int fd;
+ FILE *stream;
+} etcd_sim_t;
+
+void
+etcd_close (etcd_session this)
+{
+ etcd_sim_t *sim = (etcd_sim_t *)this;
+ fflush(sim->stream);
+ fclose(sim->stream);
+ close(sim->fd);
+ free(this);
+}
+
+
+char *
+etcd_get (etcd_session this, char *key)
+{
+ char *str = NULL;
+ size_t len;
+ etcd_sim_t *sim = (etcd_sim_t *)this;
+ struct tm tm;
+ time_t old, new;
+ lockf(sim->fd, F_LOCK, 0 );
+ if (fseek(sim->stream, 0, SEEK_SET) == -1) {
+ lockf(sim->fd, F_ULOCK, 0 );
+ return NULL;
+ }
+ // Read the file
+ while(1) {
+ if(str) {
+ free(str);
+ str = NULL;
+ }
+ if (getline((char **)&str, &len,sim->stream) == -1) {
+ break;
+ }
+ if (!strncmp(str, key, strlen(key))) {
+ char k[256], s[256], *ret, past[256];
+ unsigned int ttl;
+ double delta;
+ sscanf(str,"%s %s %d %s",k, s, &ttl, past);
+ strptime(past, T_FORMAT, &tm);
+ old = mktime(&tm);
+ new = time(NULL);
+ delta = difftime(new, old);
+ // check if key is expired.
+ // If ttl is 0, it means key has infinite ttk=l.
+ if ((!ttl) || ((delta >= 0) && (delta < ttl))) {
+ ret = calloc(1, strlen(s) + 1);
+ strcpy(ret,s);
+ free(str);
+ lockf(sim->fd, F_ULOCK, 0 );
+ return(ret);
+ }
+ }
+ }
+ lockf(sim->fd, F_ULOCK, 0 );
+ return NULL;
+}
+
+
+etcd_result
+etcd_set (etcd_session this, char *key, char *value,
+ char *precond, unsigned int ttl)
+{
+ char *str = NULL;
+ char buf[255];
+ char tp[255];
+ char s[255];
+ size_t len;
+ etcd_sim_t *sim = (etcd_sim_t *)this;
+ struct tm tm;
+ time_t old, new;
+ lockf(sim->fd, F_LOCK, 0 );
+ if (fseek(sim->stream, 0, SEEK_SET) == -1) {
+ lockf(sim->fd, F_ULOCK, 0 );
+ return ETCD_WTF;
+ }
+ while(1) {
+ if(str) {
+ free(str);
+ str = NULL;
+ }
+ if (getline((char **)&str, &len,sim->stream) == -1) {
+ break;
+ }
+ if (!strncmp(str, key, strlen(key))) {
+ char k[256], s[256], past[256];
+ unsigned int ttl;
+ double delta;
+ sscanf(str,"%s %s %d %s",k, s, &ttl, past);
+ strptime(past, T_FORMAT, &tm);
+ old = mktime(&tm);
+ new = time(NULL);
+ delta = difftime(new, old);
+ // check if the present key is expired
+ if ( (!ttl) || ((delta >= 0) && (delta < ttl))) {
+ // present key not expired. In case of precondition,
+ // check if it matches. If not return with error
+ // In case of no precond, return error since
+ // present key not yet expired.
+ if ((!precond) || (strcmp(precond, s))) {
+ free(str);
+ lockf(sim->fd, F_ULOCK, 0 );
+ return ETCD_WTF;
+ }
+ }
+ fseek(sim->stream, -strlen(str), SEEK_CUR);
+ free(str);
+ goto here;
+ }
+ }
+here:
+ memset(tp, 0, 255);
+ new = time(NULL);
+ memcpy(&tm, localtime(&new), sizeof(struct tm));
+ strftime(buf, sizeof(buf), T_FORMAT, &tm);
+ // what we want to print in the file is something like this
+ // key value(at offset of 64) ttl(offset to 128) time(left offset to 140)
+ // hence we would want to create a format buf as follows:
+ // "%-64s%-64s%-16d%-18s"
+ // Hence we construct this first (in string s) and use that to print into tp
+ // which gets written to the registry file.
+ sprintf(s,"%%-%ds%%-%ds%%-%dd%%s\n",
+ MAX_KEY_LEN, MAX_VALUE_LEN, MAX_TTL_LEN);
+ sprintf(tp,s,key, value, ttl, buf);
+ if (fwrite(tp, 1,strlen(tp), sim->stream) != strlen(tp)) {
+ lockf(sim->fd, F_ULOCK, 0 );
+ return ETCD_WTF;
+ }
+ fflush(sim->stream);
+ lockf(sim->fd, F_ULOCK, 0 );
+ return ETCD_OK;
+}
+
+
+
+etcd_session
+etcd_open_str (char *server_names)
+{
+ etcd_sim_t *sim;
+ char name[256];
+
+ sim = calloc(1, sizeof(etcd_sim_t));
+ sprintf(name, "/tmp/%s", server_names);
+ sim->fd = open(name, O_RDWR | O_CREAT);
+ if (sim->fd == -1)
+ return NULL;
+ sim->stream = fopen(name, "r+");
+ if (sim->stream == NULL)
+ return NULL;
+
+ return ((void *)sim);
+}
+
+
+void
+etcd_close_str (etcd_session this)
+{
+ etcd_close(this);
+}
diff --git a/xlators/cluster/nsr-server/src/leader.c b/xlators/cluster/nsr-server/src/leader.c
index bb0dbabe7..319f99317 100644
--- a/xlators/cluster/nsr-server/src/leader.c
+++ b/xlators/cluster/nsr-server/src/leader.c
@@ -23,11 +23,14 @@
#include "api/src/glfs.h"
#include "api/src/glfs-internal.h"
+#ifndef NSR_SIM_ETCD
#include "etcd-api.h"
+#endif
#include "nsr-internal.h"
#include "../../nsr-recon/src/recon_driver.h"
#include "../../nsr-recon/src/recon_xlator.h"
+
/* Vote format: UUID,vote_status,fitness,term_number */
#define VOTE_ELEMS 4 /* Whole match plus four actual pieces. */
#define DEFAULT_FITNESS 42
@@ -39,6 +42,10 @@ enum { NO_LEADER, TENTATIVE, CONFIRMED };
regex_t vote_re;
+// Simulation of etcd routines
+#ifndef NSR_SIM_ETCD
+#endif
+
long
nsr_get_fitness (xlator_t *this)
{
@@ -46,69 +53,14 @@ nsr_get_fitness (xlator_t *this)
return 42;
}
-long
-nsr_get_term (xlator_t *this)
-{
- nsr_private_t *priv = this->private;
- char *text = NULL;
- etcd_session etcd = priv->etcd;
-
- text = etcd_get(etcd, priv->term_uuid);
- // first time and hence no key at all.
- // this should ideally be done at vol creation time
- // by glusterd. Move it there later
- if(text == NULL) {
- gf_log (this->name, GF_LOG_TRACE, "nsr_get_term returns 1");
- return 0;
- } else {
- gf_log (this->name, GF_LOG_TRACE,
- "nsr_get_term returns %ld", strtol(text, NULL, 10));
- return (strtol(text, NULL, 10));
- }
-}
-
-
-// in etcd-api-master.
-// send a patch to this package to expose this
-extern size_t
-parse_get_response (void *ptr, size_t size, size_t nmemb, void *stream);
-typedef struct {
- etcd_server *servers;
-} _etcd_session;
-typedef size_t curl_callback_t (void *, size_t, size_t, void *);
-extern etcd_result etcd_get_one (_etcd_session *this, char *key, etcd_server *srv, char *prefix,
- char *post, curl_callback_t cb, char **stream);
-
-
-
-void
-nsr_leader_cb(glfs_fd_t *fd, ssize_t ret, void *data)
-{
- xlator_t *this = (xlator_t *) data;
- nsr_private_t *priv = this->private;
-
- gf_log (this->name, GF_LOG_INFO,
- "nsr_leader_cb arrived with return value %d", (int)ret);
-
- // TBD - error handling; look at ret
- atomic_fetch_and(&(priv->fence_io), 0);
-
- return;
-}
-
-void
-nsr_set_leader (xlator_t *this)
+static void
+nsr_set_leader (xlator_t *this, etcd_session etcd)
{
long term = 0;
- etcd_server *srv;
etcd_result res;
- char *value = NULL;
nsr_private_t *priv = this->private;
- _etcd_session *etcd = priv->etcd;
char *term_key = priv->term_uuid;
- char *master_key = priv->vol_uuid;
char n_t[sizeof(long)+1];
- nsr_recon_role_t role;
char *text = NULL;
gf_log (this->name, GF_LOG_INFO, "Just became leader");
@@ -134,45 +86,12 @@ nsr_set_leader (xlator_t *this)
priv->current_term = term + 1;
+ // Move this inside recon notify???
atomic_fetch_or(&(priv->fence_io), 1);
- role.num = 0;
- role.role = leader;
- // Get the rest of nodes for this term.
- // TBD: fix this so that it uses per-brick keys instead of violating
- // modularity and making bad assumptions about etcd behavior.
- for (srv = etcd->servers; srv->host; ++srv) {
- res = etcd_get_one(etcd,master_key,srv,"keys/",NULL,
- parse_get_response,&value);
- gf_log (this->name, GF_LOG_INFO,
- "Probing for %s, got %d, value:%s",
- srv->host, res, value);
- if ((res == ETCD_OK) && value) {
- gf_log (this->name, GF_LOG_INFO,
- "Found for %s", srv->host);
- strcpy(role.info[role.num].name, srv->host);
- (role.num)++;
- }
- value = NULL;
- }
- gf_log (this->name, GF_LOG_INFO,
- "Discovered %d nodes that has key %s", role.num, master_key);
-
- gf_log (this->name, GF_LOG_INFO,
- "setting current term as %ld", term + 1);
- role.current_term = term + 1;
- ENDIAN_CONVERSION_RR(role, _gf_false);
-
- // inform the reconciliator that this is leader
- // in the callback (once reconciliation is done),
- // we will unfence the IOs.
- // TBD - error handling later.
- glfs_lseek(priv->fd, nsr_recon_xlator_sector_1, SEEK_SET);
- gf_log (this->name, GF_LOG_INFO,
- "Writing to local node to set leader");
- glfs_write_async(priv->fd, &role,
- sizeof(role),nsr_recon_xlator_sector_1,
- nsr_leader_cb, this);
+ nsr_recon_notify_event_set_leader(priv);
+
+ return;
}
@@ -218,7 +137,7 @@ nsr_get_leader (xlator_t *this, etcd_session etcd, char *key)
gf_log (this->name, GF_LOG_TRACE,
"leader is %s\n",nominee);
if (strcmp(nominee,priv->brick_uuid) == 0) {
- nsr_set_leader(this);
+ nsr_set_leader(this, etcd);
retval = LS_SUCCESS;
}
else {
@@ -316,60 +235,10 @@ nsr_init_re (xlator_t *this)
}
-uint32_t
-nsr_leader_setup_recon (xlator_t *this)
-{
- nsr_private_t *priv = this->private;
- xlator_t *old = this;
- uint32_t ret = 0;
-
- if (priv->nsr_recon_start == _gf_false)
- return 0;
-
- priv->fs = glfs_new(priv->vol_uuid);
- if (!priv->fs) {
- ret = 1;
- gf_log (this->name, GF_LOG_ERROR, "failed to initialise glfs \n");
- goto done;
- }
-
- glusterfs_this_set(old);
- ret = glfs_set_volfile(priv->fs, priv->vol_file);
- if (ret != 0) {
- gf_log (this->name, GF_LOG_ERROR, "failed to set volfile \n");
- goto done;
- }
-
- glusterfs_this_set(old);
- /*
- * REVIEW
- * Logs belong in /var/log not /tmp.
- */
- glfs_set_logging (priv->fs,"/tmp/glfs-log", 7);
- if (glfs_init(priv->fs) < 0) {
- gf_log (this->name, GF_LOG_ERROR, "failed to init volfile \n");
- ret = 1;
- goto done;
- }
-
- glusterfs_this_set(old);
- priv->fd = glfs_open (priv->fs, "/", O_RDWR);
- if (priv->fd == NULL) {
- ret = 1;
- gf_log (this->name, GF_LOG_ERROR,
- "failed to open fd to communicate with recon process \n");
- goto done;
- }
-
-
-done:
- glusterfs_this_set(old);
- return ret;
-}
-
void *
-nsr_leader_thread (xlator_t *this)
+nsr_leader_thread (void *arg)
{
+ xlator_t *this = (xlator_t *) arg;
leader_retval_t retval;
nsr_private_t *priv = this->private;
@@ -378,14 +247,6 @@ nsr_leader_thread (xlator_t *this)
return NULL;
}
- if (nsr_leader_setup_recon(this)) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to do glfs initialisation inside leader thread");
- return NULL;
- }
-
- priv->leader_inited = 1;
-
gf_log (this->name, GF_LOG_INFO,
"calling glfs_opens_str on servers %s", priv->etcd_servers);
@@ -396,6 +257,8 @@ nsr_leader_thread (xlator_t *this)
return NULL;
}
+ priv->leader_inited = 1;
+
for (;;) {
if (nsr_get_leader(this,priv->etcd,priv->vol_uuid) == LS_ERROR) {
break;
diff --git a/xlators/cluster/nsr-server/src/nsr-internal.h b/xlators/cluster/nsr-server/src/nsr-internal.h
index 282247a47..4382f5426 100644
--- a/xlators/cluster/nsr-server/src/nsr-internal.h
+++ b/xlators/cluster/nsr-server/src/nsr-internal.h
@@ -22,6 +22,16 @@ enum {
gf_mt_nsr_end
};
+typedef enum nsr_recon_notify_ev_id_t {
+ NSR_RECON_SET_LEADER = 1,
+ NSR_RECON_ADD_CHILD = 2
+} nsr_recon_notify_ev_id_t;
+
+typedef struct _nsr_recon_notify_ev_s {
+ nsr_recon_notify_ev_id_t id;
+ uint32_t index; // in case of add
+ struct list_head list;
+} nsr_recon_notify_ev_t;
typedef struct {
char *etcd_servers;
@@ -29,21 +39,23 @@ typedef struct {
char *term_uuid;
char *brick_uuid;
gf_boolean_t leader;
+ uint8_t up_children;
uint8_t n_children;
char *vol_file;
- glfs_t *fs;
etcd_session etcd;
volatile unsigned int fence_io;
- glfs_fd_t *fd;
uint32_t current_term;
#ifdef NSR_DEBUG
uint32_t leader_log_fd;
#endif
+ volatile int recon_notify_inited;
volatile int leader_inited;
uint32_t kid_state;
gf_lock_t dirty_lock;
struct list_head dirty_fds;
gf_boolean_t nsr_recon_start;
+ void * recon_ctx;
+ volatile uint32_t ops_in_flight;
} nsr_private_t;
typedef struct {
@@ -79,3 +91,7 @@ typedef struct {
struct list_head pqueue;
} nsr_inode_ctx_t;
+void nsr_recon_notify_event_set_leader(nsr_private_t *priv);
+void nsr_recon_notify_event_add_child(nsr_private_t *priv, uint32_t index);
+void* nsr_recon_notify_thread (void *this);
+
diff --git a/xlators/cluster/nsr-server/src/nsr.c b/xlators/cluster/nsr-server/src/nsr.c
index 3707b3003..f85368456 100644
--- a/xlators/cluster/nsr-server/src/nsr.c
+++ b/xlators/cluster/nsr-server/src/nsr.c
@@ -258,7 +258,6 @@ nsr_getxattr_special (call_frame_t *frame, xlator_t *this, loc_t *loc,
const char *name, dict_t *xdata)
{
dict_t *result;
- uint8_t up;
nsr_private_t *priv = this->private;
if (!priv->leader) {
@@ -279,8 +278,8 @@ nsr_getxattr_special (call_frame_t *frame, xlator_t *this, loc_t *loc,
goto dn_failed;
}
- up = nsr_count_up_kids(this->private);
- if (dict_set_uint32(result,NSR_REP_COUNT_XATTR,up) != 0) {
+ priv->up_children = nsr_count_up_kids(this->private);
+ if (dict_set_uint32(result,NSR_REP_COUNT_XATTR,priv->up_children) != 0) {
goto dsu_failed;
}
@@ -399,6 +398,8 @@ nsr_reconfigure (xlator_t *this, dict_t *options)
nsr_private_t *priv = this->private;
GF_OPTION_RECONF ("leader", priv->leader, options, bool, err);
+ gf_log (this->name, GF_LOG_INFO,
+ "reconfigure called. setting priv->leader to %d\n", priv->leader);
return 0;
err:
@@ -440,20 +441,25 @@ nsr_notify (xlator_t *this, int event, void *data, ...)
index = nsr_get_child_index(this,data);
if (index >= 0) {
priv->kid_state |= (1 << index);
+ priv->up_children = nsr_count_up_kids(priv);
gf_log (this->name, GF_LOG_INFO,
"got CHILD_UP for %s, now %u kids",
((xlator_t *)data)->name,
- nsr_count_up_kids(priv));
+ priv->up_children);
+ if (priv->nsr_recon_start == _gf_true) {
+ nsr_recon_notify_event_add_child(priv, index);
+ }
}
break;
case GF_EVENT_CHILD_DOWN:
index = nsr_get_child_index(this,data);
if (index >= 0) {
priv->kid_state &= ~(1 << index);
+ priv->up_children = nsr_count_up_kids(priv);
gf_log (this->name, GF_LOG_INFO,
"got CHILD_DOWN for %s, now %u kids",
((xlator_t *)data)->name,
- nsr_count_up_kids(priv));
+ priv->up_children);
}
break;
default:
@@ -475,7 +481,7 @@ nsr_init (xlator_t *this)
xlator_list_t *trav;
pthread_t kid;
uuid_t tmp_uuid;
- char *my_name = NULL, *recon_file = NULL, *recon_pid_file = NULL, *ptr = NULL;
+ char *my_name = NULL, *morph_name = NULL, *recon_file = NULL, *recon_pid_file = NULL, *ptr = NULL;
char *volname;
extern xlator_t global_xlator;
glusterfs_ctx_t *oldctx = global_xlator.ctx;
@@ -552,34 +558,36 @@ nsr_init (xlator_t *this)
gf_log (this->name, GF_LOG_ERROR, "vol name not generated. ???");
goto err;
}
-
- recon_file = GF_CALLOC (1,PATH_MAX + strlen(my_name) + strlen("con") +1, gf_mt_nsr_private_t);
- recon_pid_file = GF_CALLOC (1,PATH_MAX + strlen(my_name) + strlen("recon") +1, gf_mt_nsr_private_t);
+
+ morph_name = GF_CALLOC (1, strlen(my_name) + 1, gf_mt_nsr_private_t);
+ strcpy(morph_name, my_name);
+ recon_file = GF_CALLOC (1,PATH_MAX + strlen(morph_name) + strlen("con") +1, gf_mt_nsr_private_t);
+ recon_pid_file = GF_CALLOC (1,PATH_MAX + strlen(morph_name) + strlen("recon") +1, gf_mt_nsr_private_t);
if ((!recon_file) || (!recon_pid_file)) {
gf_log (this->name, GF_LOG_ERROR, "could not allocate reconciliation file name");
goto err;
}
- ptr = strchr (my_name, '/');
+ ptr = strchr (morph_name, '/');
while (ptr) {
*ptr = '-';
- ptr = strchr (my_name, '/');
+ ptr = strchr (morph_name, '/');
}
sprintf(recon_file,"/%s/%s/%s/%s/",GLUSTERD_DEFAULT_WORKDIR,
GLUSTERD_VOLUME_DIR_PREFIX,
volname,
GLUSTERD_BRICK_INFO_DIR);
- strcat(recon_file, my_name);
+ strcat(recon_file, morph_name);
strcat(recon_file, "-nsr-recon.vol");
sprintf(recon_pid_file,"/%s/%s/%s/%s/",GLUSTERD_DEFAULT_WORKDIR,
GLUSTERD_VOLUME_DIR_PREFIX,
volname,
"run");
- strcat(recon_pid_file, my_name);
+ strcat(recon_pid_file, morph_name);
strcat(recon_pid_file, "-recon.pid");
- priv->vol_file = GF_CALLOC (1,PATH_MAX + strlen(my_name) + strlen("con") +1, gf_mt_nsr_private_t);
+ priv->vol_file = GF_CALLOC (1,PATH_MAX + strlen(morph_name) + strlen("con") +1, gf_mt_nsr_private_t);
if (!priv->vol_file) {
gf_log (this->name, GF_LOG_ERROR, "could not allocate reconciliation file name");
goto err;
@@ -590,7 +598,7 @@ nsr_init (xlator_t *this)
volname,
GLUSTERD_BRICK_INFO_DIR);
strcat(priv->vol_file, "con:");
- strcat(priv->vol_file, my_name);
+ strcat(priv->vol_file, morph_name);
if (pthread_create(&kid,NULL,nsr_flush_thread,this) != 0) {
gf_log (this->name, GF_LOG_ERROR,
@@ -622,10 +630,17 @@ nsr_init (xlator_t *this)
}
+ (void)pthread_create(&kid,NULL,nsr_recon_notify_thread,this);
+ while (priv->recon_notify_inited == 0) {
+ sleep(1);
+ }
+
(void)pthread_create(&kid,NULL,nsr_leader_thread,this);
while (priv->leader_inited == 0) {
sleep(1);
}
+
+
/*
* Calling glfs_new changes old->ctx, even if THIS still points
* to global_xlator. That causes problems later in the main
diff --git a/xlators/cluster/nsr-server/src/recon_notify.c b/xlators/cluster/nsr-server/src/recon_notify.c
new file mode 100644
index 000000000..9cf2fce5d
--- /dev/null
+++ b/xlators/cluster/nsr-server/src/recon_notify.c
@@ -0,0 +1,345 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <string.h>
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "call-stub.h"
+#include "defaults.h"
+#include "xlator.h"
+#include "api/src/glfs.h"
+#include "api/src/glfs-internal.h"
+#include "etcd-api.h"
+#include "nsr-internal.h"
+#include "../../nsr-recon/src/recon_driver.h"
+#include "../../nsr-recon/src/recon_xlator.h"
+
+
+
+typedef struct _nsr_recon_notify_ctx_t {
+ nsr_recon_notify_ev_t recon_head;
+ pthread_mutex_t recon_mutex;
+ pthread_cond_t recon_cv;
+ char **hosts; // list of hosts ordered depending on child indices
+ uint32_t current_term;
+ uint32_t last_reconciled_term;
+ glfs_t *fs;
+ glfs_fd_t *fd;
+} nsr_recon_notify_ctx_t;
+
+static int
+xlator_get_option (xlator_t *xl, char *key, char **value)
+{
+ GF_ASSERT (xl);
+ return dict_get_str (xl->options, key, value);
+}
+
+void nsr_recon_notify_event_set_leader(nsr_private_t *priv)
+{
+ nsr_recon_notify_ev_t *ev;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+
+ ev = GF_CALLOC (1, sizeof (nsr_recon_notify_ev_t), 0);
+ ev->id = NSR_RECON_SET_LEADER;
+ INIT_LIST_HEAD(&(ev->list));
+ pthread_mutex_lock(&ctx->recon_mutex);
+ list_add_tail(&ev->list, &ctx->recon_head.list);
+ pthread_cond_signal(&ctx->recon_cv);
+ pthread_mutex_unlock(&ctx->recon_mutex);
+}
+
+void nsr_recon_notify_event_add_child(nsr_private_t *priv, uint32_t index)
+{
+ nsr_recon_notify_ev_t *ev;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+
+ ev = GF_CALLOC (1, sizeof (nsr_recon_notify_ev_t), 0);
+ ev->id = NSR_RECON_ADD_CHILD;
+ ev->index = index;
+ INIT_LIST_HEAD(&(ev->list));
+ pthread_mutex_lock(&ctx->recon_mutex);
+ list_add_tail(&ev->list, &ctx->recon_head.list);
+ pthread_cond_signal(&ctx->recon_cv);
+ pthread_mutex_unlock(&ctx->recon_mutex);
+}
+
+
+static void
+nsr_recon_set_leader (xlator_t *this)
+{
+
+ nsr_private_t *priv = this->private;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+ nsr_recon_role_t role;
+ xlator_t *old = this;
+ uint32_t i=0;
+
+ if (priv->leader != _gf_true)
+ return;
+
+ if (ctx->last_reconciled_term == priv->current_term)
+ return;
+
+ // No majority as of yet
+ if (priv->up_children <= (priv->n_children / 2))
+ return;
+
+ gf_log (this->name, GF_LOG_INFO,
+ "Sending message to do recon with %d nodes\n",
+ priv->up_children);
+
+ role.num = 0;
+ role.role = leader;
+ for (i = 0; i < priv->n_children; ++i) {
+ if (priv->kid_state & (1 << i)) {
+ gf_log (this->name, GF_LOG_INFO,
+ "Recon using host %s",
+ ctx->hosts[i]);
+ strcpy(role.info[role.num].name, ctx->hosts[i]);
+ (role.num)++;
+ }
+ }
+
+ gf_log (this->name, GF_LOG_INFO,
+ "setting current term as %d", priv->current_term);
+ role.current_term = priv->current_term;
+ ENDIAN_CONVERSION_RR(role, _gf_false);
+
+ // inform the reconciliator that this is leader
+ // in the callback (once reconciliation is done),
+ // we will unfence the IOs.
+ // TBD - error handling later.
+ glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET);
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_INFO,
+ "Writing to local node to set leader");
+ glfs_write(ctx->fd, &role,
+ sizeof(role), 0);
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_INFO,
+ "glfs_write returned. unfencing IO\n");
+
+ // TBD - error handling
+
+ ctx->last_reconciled_term = priv->current_term;
+ atomic_fetch_and(&(priv->fence_io), 0);
+
+ return;
+}
+
+static void
+nsr_recon_add_child (xlator_t *this, uint32_t index)
+{
+ nsr_private_t *priv = this->private;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+ nsr_recon_role_t role;
+ xlator_t *old = this;
+
+ if (priv->leader != _gf_true)
+ return;
+
+ // reconciliation still pending.
+ // Check if we have majority
+ if (ctx->last_reconciled_term != priv->current_term) {
+ nsr_recon_set_leader(this);
+ } else {
+ // Reconciliation done.
+ // new child joining the majority/
+ // Do reconciliation only fot this child but after fencing new IO and draining old IO
+ role.num = 1;
+ role.role = joiner;
+
+ atomic_fetch_or(&(priv->fence_io), 1);
+ while(priv->ops_in_flight) {
+ sleep(1);
+ }
+
+ strcpy(role.info[0].name, ctx->hosts[index]);
+ role.current_term = priv->current_term;
+ ENDIAN_CONVERSION_RR(role, _gf_false);
+ glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET);
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_INFO,
+ "Writing to local node to join %s\n", role.info[0].name);
+ glfs_write(ctx->fd, &role,
+ sizeof(role), 0);
+ glusterfs_this_set(old);
+ gf_log (this->name, GF_LOG_INFO,
+ "Write to local node to set joiner returned\n");
+
+ // TBD - error handling
+ atomic_fetch_and(&(priv->fence_io), 0);
+ }
+
+ return;
+}
+
+static uint32_t
+nsr_setup_recon (xlator_t *this)
+{
+ nsr_private_t *priv = this->private;
+ xlator_t *old = this;
+ uint32_t ret = 0;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+
+ if (priv->nsr_recon_start == _gf_false)
+ return 0;
+
+ ctx->fs = glfs_new(priv->vol_uuid);
+ if (!ctx->fs) {
+ ret = 1;
+ gf_log (this->name, GF_LOG_ERROR, "failed to initialise glfs \n");
+ goto done;
+ }
+
+ glusterfs_this_set(old);
+ ret = glfs_set_volfile(ctx->fs, priv->vol_file);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to set volfile \n");
+ goto done;
+ }
+
+ glusterfs_this_set(old);
+ /*
+ * REVIEW
+ * Logs belong in /var/log not /tmp.
+ */
+ glfs_set_logging (ctx->fs,"/tmp/glfs-log", 7);
+ if (glfs_init(ctx->fs) < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to init volfile \n");
+ ret = 1;
+ goto done;
+ }
+
+ glusterfs_this_set(old);
+ ctx->fd = glfs_open (ctx->fs, "/", O_RDWR);
+ if (ctx->fd == NULL) {
+ ret = 1;
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to open fd to communicate with recon process \n");
+ goto done;
+ }
+
+
+done:
+ glusterfs_this_set(old);
+ return ret;
+}
+
+
+static void
+nsr_setup_hosts(xlator_t *this)
+{
+ xlator_list_t *trav;
+ nsr_private_t *priv = this->private;
+ uint32_t i = 0;
+ nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx;
+
+ ctx->hosts = GF_CALLOC(sizeof(char *), priv->n_children, gf_mt_nsr_private_t);
+ // Iterate thru all the children
+ for (trav = this->children; trav; trav = trav->next) {
+ char *hostname = NULL, *vol = NULL;
+ int ret1 = 0, ret2 = 0, ret = 0;
+ xlator_t *xl = trav->xlator;
+ // If the child type is that of protocol/client
+ if (!strcmp(trav->xlator->type, "protocol/client")) {
+ ret1 = xlator_get_option (xl, "remote-host", &hostname);
+ ret2 = xlator_get_option (xl, "remote-subvolume", &vol);
+ if (!ret1 && !ret2) {
+ // add the name of that host to the hosts
+ ctx->hosts[i] = GF_CALLOC(sizeof(char), strlen(hostname) + strlen(vol) + 2, 0);
+ strcpy(ctx->hosts[i], hostname);
+ strcat(ctx->hosts[i], ":");
+ strcat(ctx->hosts[i], vol);
+ gf_log (this->name, GF_LOG_INFO,
+ "adding hosts %s to recon notfiy list", ctx->hosts[i]);
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "CANNOT FIND HOSTNAME FOR A CHILD");
+ GF_ASSERT(0);
+ }
+ // local brick
+ } else {
+ ret = xlator_get_option (this, "my-name", &hostname);
+ if (!ret) {
+ uint32_t len = strlen(hostname);
+ ctx->hosts[i] = GF_CALLOC(sizeof(char),
+ len+1,
+ gf_mt_nsr_private_t);
+ strcpy(ctx->hosts[i], hostname);
+ gf_log (this->name, GF_LOG_INFO,
+ "adding my host %s to recon notfiy list", ctx->hosts[i]);
+ } else {
+ gf_log (this->name, GF_LOG_ERROR,
+ "CANNOT FIND MY HOSTNAME");
+ GF_ASSERT(0);
+ }
+ }
+ i++;
+ }
+}
+
+void *
+nsr_recon_notify_thread (void *arg)
+{
+ xlator_t *this = (xlator_t *)arg;
+ nsr_private_t *priv = this->private;
+ nsr_recon_notify_ev_t *ev;
+ nsr_recon_notify_ctx_t *ctx;
+
+ priv->recon_ctx = GF_CALLOC(1, sizeof(nsr_recon_notify_ctx_t), gf_mt_nsr_private_t);
+ if (!priv->recon_ctx) {
+ gf_log (this->name, GF_LOG_ERROR, "calloc error");
+ return NULL;
+ }
+ ctx = priv->recon_ctx;
+
+ pthread_mutex_init(&(ctx->recon_mutex), NULL);
+ pthread_cond_init(&(ctx->recon_cv), NULL);
+ INIT_LIST_HEAD(&(ctx->recon_head.list));
+
+ nsr_setup_hosts(this);
+
+ if (nsr_setup_recon(this)) {
+ gf_log (this->name, GF_LOG_ERROR, "recon notify thread : initing glfs error");
+ return NULL;
+ }
+
+ priv->recon_notify_inited = 1;
+
+ while(1) {
+ pthread_mutex_lock(&ctx->recon_mutex);
+ while (list_empty(&(ctx->recon_head.list))) {
+ pthread_cond_wait(&ctx->recon_cv, &ctx->recon_mutex);
+ }
+ pthread_mutex_unlock(&ctx->recon_mutex);
+
+ list_for_each_entry(ev, &(ctx->recon_head.list), list) {
+
+ if (ev->id == NSR_RECON_SET_LEADER) {
+ gf_log (this->name, GF_LOG_INFO,
+ "got add leader notfiy event");
+ nsr_recon_set_leader(this);
+ } else if (ev->id == NSR_RECON_ADD_CHILD) {
+ gf_log (this->name, GF_LOG_INFO,
+ "got add child notify event");
+ nsr_recon_add_child(this, ev->index);
+ }
+ }
+ list_del_init (&ev->list);
+ }
+
+ return NULL;
+}
+