diff options
author | Raghavan P <rpichai@redhat.com> | 2014-01-03 16:09:04 +0530 |
---|---|---|
committer | Raghavan P <rpichai@redhat.com> | 2014-01-08 14:48:21 +0530 |
commit | e0cce4cf7c22d5cd8ab6c2aff4ecf28c18c6a469 (patch) | |
tree | 5e30d20eaf43c77f77d5aa9d4351492af659b39f /xlators/cluster/nsr-server | |
parent | 82ce8acfdfb141c6b34b6b6b43ef78eee891f9e8 (diff) |
Changes to NSR reconciliation code.
Following is the list of changes:
1) Simulation of etcd using a file as a registry protected using locks.
2) Implement notifications for child up and child down.
3) Join a new brick into quorum.
4) Add support for proper fencing and draining of IO required for reconciliation.
5) Miscellaneous changes and fixes addressing review comments.
Change-Id: Iddd1137ad6205252ed03301888bb1e83fa2221e0
Signed-off-by: Raghavan P <rpichai@redhat.com>
Diffstat (limited to 'xlators/cluster/nsr-server')
-rw-r--r-- | xlators/cluster/nsr-server/src/Makefile.am | 8 | ||||
-rw-r--r-- | xlators/cluster/nsr-server/src/all-templates.c | 8 | ||||
-rw-r--r-- | xlators/cluster/nsr-server/src/etcd-sim.c | 222 | ||||
-rw-r--r-- | xlators/cluster/nsr-server/src/leader.c | 173 | ||||
-rw-r--r-- | xlators/cluster/nsr-server/src/nsr-internal.h | 20 | ||||
-rw-r--r-- | xlators/cluster/nsr-server/src/nsr.c | 45 | ||||
-rw-r--r-- | xlators/cluster/nsr-server/src/recon_notify.c | 345 |
7 files changed, 648 insertions, 173 deletions
diff --git a/xlators/cluster/nsr-server/src/Makefile.am b/xlators/cluster/nsr-server/src/Makefile.am index df0d68539..85a560d09 100644 --- a/xlators/cluster/nsr-server/src/Makefile.am +++ b/xlators/cluster/nsr-server/src/Makefile.am @@ -4,9 +4,15 @@ xlator_LTLIBRARIES = nsr.la xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/cluster nsr_la_LDFLAGS = -module -avoid-version -lgfapi -lcurl -nsr_la_SOURCES = nsr.c leader.c etcd-api.c \ + +if ENABLE_ETCD_SIM +nsr_la_SOURCES = nsr.c leader.c recon_notify.c etcd-sim.c +else +nsr_la_SOURCES = nsr.c leader.c recon_notify.c etcd-api.c \ yajl.c yajl_alloc.c yajl_buf.c yajl_encode.c yajl_gen.c \ yajl_lex.c yajl_parser.c yajl_tree.c yajl_version.c +endif + nsr_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la diff --git a/xlators/cluster/nsr-server/src/all-templates.c b/xlators/cluster/nsr-server/src/all-templates.c index 541653029..7300973d5 100644 --- a/xlators/cluster/nsr-server/src/all-templates.c +++ b/xlators/cluster/nsr-server/src/all-templates.c @@ -83,17 +83,20 @@ nsr_$NAME$ (call_frame_t *frame, xlator_t *this, // follower/recon path // just send it to local node if (from_leader || from_recon) { + atomic_inc(&priv->ops_in_flight); STACK_WIND (frame, nsr_$NAME$_complete, FIRST_CHILD(this), FIRST_CHILD(this)->fops->$NAME$, $ARGS_SHORT$); return 0; } + if (!priv->leader || priv->fence_io) { op_errno = EREMOTE; goto err; } + if (!xdata) { xdata = dict_new(); if (!xdata) { @@ -115,6 +118,7 @@ nsr_$NAME$ (call_frame_t *frame, xlator_t *this, goto err; } + #if defined(NSR_CG_QUEUE) nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this,fd->inode); if (!ictx) { @@ -188,6 +192,8 @@ nsr_$NAME$_dispatch (call_frame_t *frame, xlator_t *this, nsr_private_t *priv = this->private; xlator_list_t *trav; + atomic_inc(&priv->ops_in_flight); + /* * TBD: unblock pending request(s) if we fail after this point but * before we get to nsr_$NAME$_complete (where that code currently @@ -246,6 +252,7 @@ nsr_$NAME$_complete 
(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, $ARGS_LONG$) { + nsr_private_t *priv = this->private; #if defined(NSR_CG_NEED_FD) nsr_local_t *local = frame->local; #endif @@ -294,6 +301,7 @@ nsr_$NAME$_complete (call_frame_t *frame, void *cookie, xlator_t *this, STACK_UNWIND_STRICT ($NAME$, frame, op_ret, op_errno, $ARGS_SHORT$); + atomic_dec(&priv->ops_in_flight); return 0; } diff --git a/xlators/cluster/nsr-server/src/etcd-sim.c b/xlators/cluster/nsr-server/src/etcd-sim.c new file mode 100644 index 000000000..5c5cdcec0 --- /dev/null +++ b/xlators/cluster/nsr-server/src/etcd-sim.c @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2013, Red Hat + * All rights reserved. + + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. Redistributions in binary + * form must reproduce the above copyright notice, this list of conditions and + * the following disclaimer in the documentation and/or other materials + * provided with the distribution. + + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include "call-stub.h" +#include "defaults.h" +#include "xlator.h" +#include "api/src/glfs.h" +#include "api/src/glfs-internal.h" +#include "run.h" + + +/* + * Mock implementation of etcd + * The etcd file is simulated in /tmp/<server-names> + * Writes from Multiple writers are protected using file lock. +*/ + +#include "etcd-api.h" +#define T_FORMAT "%d-%b-%Y,%H:%M:%S" +#define MAX_KEY_LEN 64 +#define MAX_VALUE_LEN 64 +#define MAX_TTL_LEN 12 + +etcd_session +etcd_open (etcd_server *server_list) +{ + return NULL; +} + +typedef struct _etcd_sim_s { + int fd; + FILE *stream; +} etcd_sim_t; + +void +etcd_close (etcd_session this) +{ + etcd_sim_t *sim = (etcd_sim_t *)this; + fflush(sim->stream); + fclose(sim->stream); + close(sim->fd); + free(this); +} + + +char * +etcd_get (etcd_session this, char *key) +{ + char *str = NULL; + size_t len; + etcd_sim_t *sim = (etcd_sim_t *)this; + struct tm tm; + time_t old, new; + lockf(sim->fd, F_LOCK, 0 ); + if (fseek(sim->stream, 0, SEEK_SET) == -1) { + lockf(sim->fd, F_ULOCK, 0 ); + return NULL; + } + // Read the file + while(1) { + if(str) { + free(str); + str = NULL; + } + if (getline((char **)&str, &len,sim->stream) == -1) { + break; + } + if (!strncmp(str, key, strlen(key))) { + char k[256], s[256], *ret, past[256]; + unsigned int ttl; + double delta; + 
sscanf(str,"%s %s %d %s",k, s, &ttl, past); + strptime(past, T_FORMAT, &tm); + old = mktime(&tm); + new = time(NULL); + delta = difftime(new, old); + // check if key is expired. + // If ttl is 0, it means key has infinite ttk=l. + if ((!ttl) || ((delta >= 0) && (delta < ttl))) { + ret = calloc(1, strlen(s) + 1); + strcpy(ret,s); + free(str); + lockf(sim->fd, F_ULOCK, 0 ); + return(ret); + } + } + } + lockf(sim->fd, F_ULOCK, 0 ); + return NULL; +} + + +etcd_result +etcd_set (etcd_session this, char *key, char *value, + char *precond, unsigned int ttl) +{ + char *str = NULL; + char buf[255]; + char tp[255]; + char s[255]; + size_t len; + etcd_sim_t *sim = (etcd_sim_t *)this; + struct tm tm; + time_t old, new; + lockf(sim->fd, F_LOCK, 0 ); + if (fseek(sim->stream, 0, SEEK_SET) == -1) { + lockf(sim->fd, F_ULOCK, 0 ); + return ETCD_WTF; + } + while(1) { + if(str) { + free(str); + str = NULL; + } + if (getline((char **)&str, &len,sim->stream) == -1) { + break; + } + if (!strncmp(str, key, strlen(key))) { + char k[256], s[256], past[256]; + unsigned int ttl; + double delta; + sscanf(str,"%s %s %d %s",k, s, &ttl, past); + strptime(past, T_FORMAT, &tm); + old = mktime(&tm); + new = time(NULL); + delta = difftime(new, old); + // check if the present key is expired + if ( (!ttl) || ((delta >= 0) && (delta < ttl))) { + // present key not expired. In case of precondition, + // check if it matches. If not return with error + // In case of no precond, return error since + // present key not yet expired. 
+ if ((!precond) || (strcmp(precond, s))) { + free(str); + lockf(sim->fd, F_ULOCK, 0 ); + return ETCD_WTF; + } + } + fseek(sim->stream, -strlen(str), SEEK_CUR); + free(str); + goto here; + } + } +here: + memset(tp, 0, 255); + new = time(NULL); + memcpy(&tm, localtime(&new), sizeof(struct tm)); + strftime(buf, sizeof(buf), T_FORMAT, &tm); + // what we want to print in the file is something like this + // key value(at offset of 64) ttl(offset to 128) time(left offset to 140) + // hence we would want to create a format buf as follows: + // "%-64s%-64s%-16d%-18s" + // Hence we construct this first (in string s) and use that to print into tp + // which gets written to the registry file. + sprintf(s,"%%-%ds%%-%ds%%-%dd%%s\n", + MAX_KEY_LEN, MAX_VALUE_LEN, MAX_TTL_LEN); + sprintf(tp,s,key, value, ttl, buf); + if (fwrite(tp, 1,strlen(tp), sim->stream) != strlen(tp)) { + lockf(sim->fd, F_ULOCK, 0 ); + return ETCD_WTF; + } + fflush(sim->stream); + lockf(sim->fd, F_ULOCK, 0 ); + return ETCD_OK; +} + + + +etcd_session +etcd_open_str (char *server_names) +{ + etcd_sim_t *sim; + char name[256]; + + sim = calloc(1, sizeof(etcd_sim_t)); + sprintf(name, "/tmp/%s", server_names); + sim->fd = open(name, O_RDWR | O_CREAT); + if (sim->fd == -1) + return NULL; + sim->stream = fopen(name, "r+"); + if (sim->stream == NULL) + return NULL; + + return ((void *)sim); +} + + +void +etcd_close_str (etcd_session this) +{ + etcd_close(this); +} diff --git a/xlators/cluster/nsr-server/src/leader.c b/xlators/cluster/nsr-server/src/leader.c index bb0dbabe7..319f99317 100644 --- a/xlators/cluster/nsr-server/src/leader.c +++ b/xlators/cluster/nsr-server/src/leader.c @@ -23,11 +23,14 @@ #include "api/src/glfs.h" #include "api/src/glfs-internal.h" +#ifndef NSR_SIM_ETCD #include "etcd-api.h" +#endif #include "nsr-internal.h" #include "../../nsr-recon/src/recon_driver.h" #include "../../nsr-recon/src/recon_xlator.h" + /* Vote format: UUID,vote_status,fitness,term_number */ #define VOTE_ELEMS 4 /* Whole 
match plus four actual pieces. */ #define DEFAULT_FITNESS 42 @@ -39,6 +42,10 @@ enum { NO_LEADER, TENTATIVE, CONFIRMED }; regex_t vote_re; +// Simulation of etcd routines +#ifndef NSR_SIM_ETCD +#endif + long nsr_get_fitness (xlator_t *this) { @@ -46,69 +53,14 @@ nsr_get_fitness (xlator_t *this) return 42; } -long -nsr_get_term (xlator_t *this) -{ - nsr_private_t *priv = this->private; - char *text = NULL; - etcd_session etcd = priv->etcd; - - text = etcd_get(etcd, priv->term_uuid); - // first time and hence no key at all. - // this should ideally be done at vol creation time - // by glusterd. Move it there later - if(text == NULL) { - gf_log (this->name, GF_LOG_TRACE, "nsr_get_term returns 1"); - return 0; - } else { - gf_log (this->name, GF_LOG_TRACE, - "nsr_get_term returns %ld", strtol(text, NULL, 10)); - return (strtol(text, NULL, 10)); - } -} - - -// in etcd-api-master. -// send a patch to this package to expose this -extern size_t -parse_get_response (void *ptr, size_t size, size_t nmemb, void *stream); -typedef struct { - etcd_server *servers; -} _etcd_session; -typedef size_t curl_callback_t (void *, size_t, size_t, void *); -extern etcd_result etcd_get_one (_etcd_session *this, char *key, etcd_server *srv, char *prefix, - char *post, curl_callback_t cb, char **stream); - - - -void -nsr_leader_cb(glfs_fd_t *fd, ssize_t ret, void *data) -{ - xlator_t *this = (xlator_t *) data; - nsr_private_t *priv = this->private; - - gf_log (this->name, GF_LOG_INFO, - "nsr_leader_cb arrived with return value %d", (int)ret); - - // TBD - error handling; look at ret - atomic_fetch_and(&(priv->fence_io), 0); - - return; -} - -void -nsr_set_leader (xlator_t *this) +static void +nsr_set_leader (xlator_t *this, etcd_session etcd) { long term = 0; - etcd_server *srv; etcd_result res; - char *value = NULL; nsr_private_t *priv = this->private; - _etcd_session *etcd = priv->etcd; char *term_key = priv->term_uuid; - char *master_key = priv->vol_uuid; char n_t[sizeof(long)+1]; - 
nsr_recon_role_t role; char *text = NULL; gf_log (this->name, GF_LOG_INFO, "Just became leader"); @@ -134,45 +86,12 @@ nsr_set_leader (xlator_t *this) priv->current_term = term + 1; + // Move this inside recon notify??? atomic_fetch_or(&(priv->fence_io), 1); - role.num = 0; - role.role = leader; - // Get the rest of nodes for this term. - // TBD: fix this so that it uses per-brick keys instead of violating - // modularity and making bad assumptions about etcd behavior. - for (srv = etcd->servers; srv->host; ++srv) { - res = etcd_get_one(etcd,master_key,srv,"keys/",NULL, - parse_get_response,&value); - gf_log (this->name, GF_LOG_INFO, - "Probing for %s, got %d, value:%s", - srv->host, res, value); - if ((res == ETCD_OK) && value) { - gf_log (this->name, GF_LOG_INFO, - "Found for %s", srv->host); - strcpy(role.info[role.num].name, srv->host); - (role.num)++; - } - value = NULL; - } - gf_log (this->name, GF_LOG_INFO, - "Discovered %d nodes that has key %s", role.num, master_key); - - gf_log (this->name, GF_LOG_INFO, - "setting current term as %ld", term + 1); - role.current_term = term + 1; - ENDIAN_CONVERSION_RR(role, _gf_false); - - // inform the reconciliator that this is leader - // in the callback (once reconciliation is done), - // we will unfence the IOs. - // TBD - error handling later. 
- glfs_lseek(priv->fd, nsr_recon_xlator_sector_1, SEEK_SET); - gf_log (this->name, GF_LOG_INFO, - "Writing to local node to set leader"); - glfs_write_async(priv->fd, &role, - sizeof(role),nsr_recon_xlator_sector_1, - nsr_leader_cb, this); + nsr_recon_notify_event_set_leader(priv); + + return; } @@ -218,7 +137,7 @@ nsr_get_leader (xlator_t *this, etcd_session etcd, char *key) gf_log (this->name, GF_LOG_TRACE, "leader is %s\n",nominee); if (strcmp(nominee,priv->brick_uuid) == 0) { - nsr_set_leader(this); + nsr_set_leader(this, etcd); retval = LS_SUCCESS; } else { @@ -316,60 +235,10 @@ nsr_init_re (xlator_t *this) } -uint32_t -nsr_leader_setup_recon (xlator_t *this) -{ - nsr_private_t *priv = this->private; - xlator_t *old = this; - uint32_t ret = 0; - - if (priv->nsr_recon_start == _gf_false) - return 0; - - priv->fs = glfs_new(priv->vol_uuid); - if (!priv->fs) { - ret = 1; - gf_log (this->name, GF_LOG_ERROR, "failed to initialise glfs \n"); - goto done; - } - - glusterfs_this_set(old); - ret = glfs_set_volfile(priv->fs, priv->vol_file); - if (ret != 0) { - gf_log (this->name, GF_LOG_ERROR, "failed to set volfile \n"); - goto done; - } - - glusterfs_this_set(old); - /* - * REVIEW - * Logs belong in /var/log not /tmp. 
- */ - glfs_set_logging (priv->fs,"/tmp/glfs-log", 7); - if (glfs_init(priv->fs) < 0) { - gf_log (this->name, GF_LOG_ERROR, "failed to init volfile \n"); - ret = 1; - goto done; - } - - glusterfs_this_set(old); - priv->fd = glfs_open (priv->fs, "/", O_RDWR); - if (priv->fd == NULL) { - ret = 1; - gf_log (this->name, GF_LOG_ERROR, - "failed to open fd to communicate with recon process \n"); - goto done; - } - - -done: - glusterfs_this_set(old); - return ret; -} - void * -nsr_leader_thread (xlator_t *this) +nsr_leader_thread (void *arg) { + xlator_t *this = (xlator_t *) arg; leader_retval_t retval; nsr_private_t *priv = this->private; @@ -378,14 +247,6 @@ nsr_leader_thread (xlator_t *this) return NULL; } - if (nsr_leader_setup_recon(this)) { - gf_log (this->name, GF_LOG_ERROR, - "failed to do glfs initialisation inside leader thread"); - return NULL; - } - - priv->leader_inited = 1; - gf_log (this->name, GF_LOG_INFO, "calling glfs_opens_str on servers %s", priv->etcd_servers); @@ -396,6 +257,8 @@ nsr_leader_thread (xlator_t *this) return NULL; } + priv->leader_inited = 1; + for (;;) { if (nsr_get_leader(this,priv->etcd,priv->vol_uuid) == LS_ERROR) { break; diff --git a/xlators/cluster/nsr-server/src/nsr-internal.h b/xlators/cluster/nsr-server/src/nsr-internal.h index 282247a47..4382f5426 100644 --- a/xlators/cluster/nsr-server/src/nsr-internal.h +++ b/xlators/cluster/nsr-server/src/nsr-internal.h @@ -22,6 +22,16 @@ enum { gf_mt_nsr_end }; +typedef enum nsr_recon_notify_ev_id_t { + NSR_RECON_SET_LEADER = 1, + NSR_RECON_ADD_CHILD = 2 +} nsr_recon_notify_ev_id_t; + +typedef struct _nsr_recon_notify_ev_s { + nsr_recon_notify_ev_id_t id; + uint32_t index; // in case of add + struct list_head list; +} nsr_recon_notify_ev_t; typedef struct { char *etcd_servers; @@ -29,21 +39,23 @@ typedef struct { char *term_uuid; char *brick_uuid; gf_boolean_t leader; + uint8_t up_children; uint8_t n_children; char *vol_file; - glfs_t *fs; etcd_session etcd; volatile unsigned int fence_io; 
- glfs_fd_t *fd; uint32_t current_term; #ifdef NSR_DEBUG uint32_t leader_log_fd; #endif + volatile int recon_notify_inited; volatile int leader_inited; uint32_t kid_state; gf_lock_t dirty_lock; struct list_head dirty_fds; gf_boolean_t nsr_recon_start; + void * recon_ctx; + volatile uint32_t ops_in_flight; } nsr_private_t; typedef struct { @@ -79,3 +91,7 @@ typedef struct { struct list_head pqueue; } nsr_inode_ctx_t; +void nsr_recon_notify_event_set_leader(nsr_private_t *priv); +void nsr_recon_notify_event_add_child(nsr_private_t *priv, uint32_t index); +void* nsr_recon_notify_thread (void *this); + diff --git a/xlators/cluster/nsr-server/src/nsr.c b/xlators/cluster/nsr-server/src/nsr.c index 3707b3003..f85368456 100644 --- a/xlators/cluster/nsr-server/src/nsr.c +++ b/xlators/cluster/nsr-server/src/nsr.c @@ -258,7 +258,6 @@ nsr_getxattr_special (call_frame_t *frame, xlator_t *this, loc_t *loc, const char *name, dict_t *xdata) { dict_t *result; - uint8_t up; nsr_private_t *priv = this->private; if (!priv->leader) { @@ -279,8 +278,8 @@ nsr_getxattr_special (call_frame_t *frame, xlator_t *this, loc_t *loc, goto dn_failed; } - up = nsr_count_up_kids(this->private); - if (dict_set_uint32(result,NSR_REP_COUNT_XATTR,up) != 0) { + priv->up_children = nsr_count_up_kids(this->private); + if (dict_set_uint32(result,NSR_REP_COUNT_XATTR,priv->up_children) != 0) { goto dsu_failed; } @@ -399,6 +398,8 @@ nsr_reconfigure (xlator_t *this, dict_t *options) nsr_private_t *priv = this->private; GF_OPTION_RECONF ("leader", priv->leader, options, bool, err); + gf_log (this->name, GF_LOG_INFO, + "reconfigure called. setting priv->leader to %d\n", priv->leader); return 0; err: @@ -440,20 +441,25 @@ nsr_notify (xlator_t *this, int event, void *data, ...) 
index = nsr_get_child_index(this,data); if (index >= 0) { priv->kid_state |= (1 << index); + priv->up_children = nsr_count_up_kids(priv); gf_log (this->name, GF_LOG_INFO, "got CHILD_UP for %s, now %u kids", ((xlator_t *)data)->name, - nsr_count_up_kids(priv)); + priv->up_children); + if (priv->nsr_recon_start == _gf_true) { + nsr_recon_notify_event_add_child(priv, index); + } } break; case GF_EVENT_CHILD_DOWN: index = nsr_get_child_index(this,data); if (index >= 0) { priv->kid_state &= ~(1 << index); + priv->up_children = nsr_count_up_kids(priv); gf_log (this->name, GF_LOG_INFO, "got CHILD_DOWN for %s, now %u kids", ((xlator_t *)data)->name, - nsr_count_up_kids(priv)); + priv->up_children); } break; default: @@ -475,7 +481,7 @@ nsr_init (xlator_t *this) xlator_list_t *trav; pthread_t kid; uuid_t tmp_uuid; - char *my_name = NULL, *recon_file = NULL, *recon_pid_file = NULL, *ptr = NULL; + char *my_name = NULL, *morph_name = NULL, *recon_file = NULL, *recon_pid_file = NULL, *ptr = NULL; char *volname; extern xlator_t global_xlator; glusterfs_ctx_t *oldctx = global_xlator.ctx; @@ -552,34 +558,36 @@ nsr_init (xlator_t *this) gf_log (this->name, GF_LOG_ERROR, "vol name not generated. 
???"); goto err; } - - recon_file = GF_CALLOC (1,PATH_MAX + strlen(my_name) + strlen("con") +1, gf_mt_nsr_private_t); - recon_pid_file = GF_CALLOC (1,PATH_MAX + strlen(my_name) + strlen("recon") +1, gf_mt_nsr_private_t); + + morph_name = GF_CALLOC (1, strlen(my_name) + 1, gf_mt_nsr_private_t); + strcpy(morph_name, my_name); + recon_file = GF_CALLOC (1,PATH_MAX + strlen(morph_name) + strlen("con") +1, gf_mt_nsr_private_t); + recon_pid_file = GF_CALLOC (1,PATH_MAX + strlen(morph_name) + strlen("recon") +1, gf_mt_nsr_private_t); if ((!recon_file) || (!recon_pid_file)) { gf_log (this->name, GF_LOG_ERROR, "could not allocate reconciliation file name"); goto err; } - ptr = strchr (my_name, '/'); + ptr = strchr (morph_name, '/'); while (ptr) { *ptr = '-'; - ptr = strchr (my_name, '/'); + ptr = strchr (morph_name, '/'); } sprintf(recon_file,"/%s/%s/%s/%s/",GLUSTERD_DEFAULT_WORKDIR, GLUSTERD_VOLUME_DIR_PREFIX, volname, GLUSTERD_BRICK_INFO_DIR); - strcat(recon_file, my_name); + strcat(recon_file, morph_name); strcat(recon_file, "-nsr-recon.vol"); sprintf(recon_pid_file,"/%s/%s/%s/%s/",GLUSTERD_DEFAULT_WORKDIR, GLUSTERD_VOLUME_DIR_PREFIX, volname, "run"); - strcat(recon_pid_file, my_name); + strcat(recon_pid_file, morph_name); strcat(recon_pid_file, "-recon.pid"); - priv->vol_file = GF_CALLOC (1,PATH_MAX + strlen(my_name) + strlen("con") +1, gf_mt_nsr_private_t); + priv->vol_file = GF_CALLOC (1,PATH_MAX + strlen(morph_name) + strlen("con") +1, gf_mt_nsr_private_t); if (!priv->vol_file) { gf_log (this->name, GF_LOG_ERROR, "could not allocate reconciliation file name"); goto err; @@ -590,7 +598,7 @@ nsr_init (xlator_t *this) volname, GLUSTERD_BRICK_INFO_DIR); strcat(priv->vol_file, "con:"); - strcat(priv->vol_file, my_name); + strcat(priv->vol_file, morph_name); if (pthread_create(&kid,NULL,nsr_flush_thread,this) != 0) { gf_log (this->name, GF_LOG_ERROR, @@ -622,10 +630,17 @@ nsr_init (xlator_t *this) } + (void)pthread_create(&kid,NULL,nsr_recon_notify_thread,this); + while 
(priv->recon_notify_inited == 0) { + sleep(1); + } + (void)pthread_create(&kid,NULL,nsr_leader_thread,this); while (priv->leader_inited == 0) { sleep(1); } + + /* * Calling glfs_new changes old->ctx, even if THIS still points * to global_xlator. That causes problems later in the main diff --git a/xlators/cluster/nsr-server/src/recon_notify.c b/xlators/cluster/nsr-server/src/recon_notify.c new file mode 100644 index 000000000..9cf2fce5d --- /dev/null +++ b/xlators/cluster/nsr-server/src/recon_notify.c @@ -0,0 +1,345 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include <string.h> + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "call-stub.h" +#include "defaults.h" +#include "xlator.h" +#include "api/src/glfs.h" +#include "api/src/glfs-internal.h" +#include "etcd-api.h" +#include "nsr-internal.h" +#include "../../nsr-recon/src/recon_driver.h" +#include "../../nsr-recon/src/recon_xlator.h" + + + +typedef struct _nsr_recon_notify_ctx_t { + nsr_recon_notify_ev_t recon_head; + pthread_mutex_t recon_mutex; + pthread_cond_t recon_cv; + char **hosts; // list of hosts ordered depending on child indices + uint32_t current_term; + uint32_t last_reconciled_term; + glfs_t *fs; + glfs_fd_t *fd; +} nsr_recon_notify_ctx_t; + +static int +xlator_get_option (xlator_t *xl, char *key, char **value) +{ + GF_ASSERT (xl); + return dict_get_str (xl->options, key, value); +} + +void nsr_recon_notify_event_set_leader(nsr_private_t *priv) +{ + nsr_recon_notify_ev_t *ev; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + + ev = GF_CALLOC (1, sizeof (nsr_recon_notify_ev_t), 0); + ev->id = NSR_RECON_SET_LEADER; + 
INIT_LIST_HEAD(&(ev->list)); + pthread_mutex_lock(&ctx->recon_mutex); + list_add_tail(&ev->list, &ctx->recon_head.list); + pthread_cond_signal(&ctx->recon_cv); + pthread_mutex_unlock(&ctx->recon_mutex); +} + +void nsr_recon_notify_event_add_child(nsr_private_t *priv, uint32_t index) +{ + nsr_recon_notify_ev_t *ev; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + + ev = GF_CALLOC (1, sizeof (nsr_recon_notify_ev_t), 0); + ev->id = NSR_RECON_ADD_CHILD; + ev->index = index; + INIT_LIST_HEAD(&(ev->list)); + pthread_mutex_lock(&ctx->recon_mutex); + list_add_tail(&ev->list, &ctx->recon_head.list); + pthread_cond_signal(&ctx->recon_cv); + pthread_mutex_unlock(&ctx->recon_mutex); +} + + +static void +nsr_recon_set_leader (xlator_t *this) +{ + + nsr_private_t *priv = this->private; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + nsr_recon_role_t role; + xlator_t *old = this; + uint32_t i=0; + + if (priv->leader != _gf_true) + return; + + if (ctx->last_reconciled_term == priv->current_term) + return; + + // No majority as of yet + if (priv->up_children <= (priv->n_children / 2)) + return; + + gf_log (this->name, GF_LOG_INFO, + "Sending message to do recon with %d nodes\n", + priv->up_children); + + role.num = 0; + role.role = leader; + for (i = 0; i < priv->n_children; ++i) { + if (priv->kid_state & (1 << i)) { + gf_log (this->name, GF_LOG_INFO, + "Recon using host %s", + ctx->hosts[i]); + strcpy(role.info[role.num].name, ctx->hosts[i]); + (role.num)++; + } + } + + gf_log (this->name, GF_LOG_INFO, + "setting current term as %d", priv->current_term); + role.current_term = priv->current_term; + ENDIAN_CONVERSION_RR(role, _gf_false); + + // inform the reconciliator that this is leader + // in the callback (once reconciliation is done), + // we will unfence the IOs. + // TBD - error handling later. 
+ glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET); + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_INFO, + "Writing to local node to set leader"); + glfs_write(ctx->fd, &role, + sizeof(role), 0); + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_INFO, + "glfs_write returned. unfencing IO\n"); + + // TBD - error handling + + ctx->last_reconciled_term = priv->current_term; + atomic_fetch_and(&(priv->fence_io), 0); + + return; +} + +static void +nsr_recon_add_child (xlator_t *this, uint32_t index) +{ + nsr_private_t *priv = this->private; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + nsr_recon_role_t role; + xlator_t *old = this; + + if (priv->leader != _gf_true) + return; + + // reconciliation still pending. + // Check if we have majority + if (ctx->last_reconciled_term != priv->current_term) { + nsr_recon_set_leader(this); + } else { + // Reconciliation done. + // new child joining the majority/ + // Do reconciliation only fot this child but after fencing new IO and draining old IO + role.num = 1; + role.role = joiner; + + atomic_fetch_or(&(priv->fence_io), 1); + while(priv->ops_in_flight) { + sleep(1); + } + + strcpy(role.info[0].name, ctx->hosts[index]); + role.current_term = priv->current_term; + ENDIAN_CONVERSION_RR(role, _gf_false); + glfs_lseek(ctx->fd, nsr_recon_xlator_sector_1, SEEK_SET); + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_INFO, + "Writing to local node to join %s\n", role.info[0].name); + glfs_write(ctx->fd, &role, + sizeof(role), 0); + glusterfs_this_set(old); + gf_log (this->name, GF_LOG_INFO, + "Write to local node to set joiner returned\n"); + + // TBD - error handling + atomic_fetch_and(&(priv->fence_io), 0); + } + + return; +} + +static uint32_t +nsr_setup_recon (xlator_t *this) +{ + nsr_private_t *priv = this->private; + xlator_t *old = this; + uint32_t ret = 0; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + + if (priv->nsr_recon_start == 
_gf_false) + return 0; + + ctx->fs = glfs_new(priv->vol_uuid); + if (!ctx->fs) { + ret = 1; + gf_log (this->name, GF_LOG_ERROR, "failed to initialise glfs \n"); + goto done; + } + + glusterfs_this_set(old); + ret = glfs_set_volfile(ctx->fs, priv->vol_file); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, "failed to set volfile \n"); + goto done; + } + + glusterfs_this_set(old); + /* + * REVIEW + * Logs belong in /var/log not /tmp. + */ + glfs_set_logging (ctx->fs,"/tmp/glfs-log", 7); + if (glfs_init(ctx->fs) < 0) { + gf_log (this->name, GF_LOG_ERROR, "failed to init volfile \n"); + ret = 1; + goto done; + } + + glusterfs_this_set(old); + ctx->fd = glfs_open (ctx->fs, "/", O_RDWR); + if (ctx->fd == NULL) { + ret = 1; + gf_log (this->name, GF_LOG_ERROR, + "failed to open fd to communicate with recon process \n"); + goto done; + } + + +done: + glusterfs_this_set(old); + return ret; +} + + +static void +nsr_setup_hosts(xlator_t *this) +{ + xlator_list_t *trav; + nsr_private_t *priv = this->private; + uint32_t i = 0; + nsr_recon_notify_ctx_t *ctx = (nsr_recon_notify_ctx_t *)priv->recon_ctx; + + ctx->hosts = GF_CALLOC(sizeof(char *), priv->n_children, gf_mt_nsr_private_t); + // Iterate thru all the children + for (trav = this->children; trav; trav = trav->next) { + char *hostname = NULL, *vol = NULL; + int ret1 = 0, ret2 = 0, ret = 0; + xlator_t *xl = trav->xlator; + // If the child type is that of protocol/client + if (!strcmp(trav->xlator->type, "protocol/client")) { + ret1 = xlator_get_option (xl, "remote-host", &hostname); + ret2 = xlator_get_option (xl, "remote-subvolume", &vol); + if (!ret1 && !ret2) { + // add the name of that host to the hosts + ctx->hosts[i] = GF_CALLOC(sizeof(char), strlen(hostname) + strlen(vol) + 2, 0); + strcpy(ctx->hosts[i], hostname); + strcat(ctx->hosts[i], ":"); + strcat(ctx->hosts[i], vol); + gf_log (this->name, GF_LOG_INFO, + "adding hosts %s to recon notfiy list", ctx->hosts[i]); + } else { + gf_log (this->name, GF_LOG_ERROR, + 
"CANNOT FIND HOSTNAME FOR A CHILD"); + GF_ASSERT(0); + } + // local brick + } else { + ret = xlator_get_option (this, "my-name", &hostname); + if (!ret) { + uint32_t len = strlen(hostname); + ctx->hosts[i] = GF_CALLOC(sizeof(char), + len+1, + gf_mt_nsr_private_t); + strcpy(ctx->hosts[i], hostname); + gf_log (this->name, GF_LOG_INFO, + "adding my host %s to recon notfiy list", ctx->hosts[i]); + } else { + gf_log (this->name, GF_LOG_ERROR, + "CANNOT FIND MY HOSTNAME"); + GF_ASSERT(0); + } + } + i++; + } +} + +void * +nsr_recon_notify_thread (void *arg) +{ + xlator_t *this = (xlator_t *)arg; + nsr_private_t *priv = this->private; + nsr_recon_notify_ev_t *ev; + nsr_recon_notify_ctx_t *ctx; + + priv->recon_ctx = GF_CALLOC(1, sizeof(nsr_recon_notify_ctx_t), gf_mt_nsr_private_t); + if (!priv->recon_ctx) { + gf_log (this->name, GF_LOG_ERROR, "calloc error"); + return NULL; + } + ctx = priv->recon_ctx; + + pthread_mutex_init(&(ctx->recon_mutex), NULL); + pthread_cond_init(&(ctx->recon_cv), NULL); + INIT_LIST_HEAD(&(ctx->recon_head.list)); + + nsr_setup_hosts(this); + + if (nsr_setup_recon(this)) { + gf_log (this->name, GF_LOG_ERROR, "recon notify thread : initing glfs error"); + return NULL; + } + + priv->recon_notify_inited = 1; + + while(1) { + pthread_mutex_lock(&ctx->recon_mutex); + while (list_empty(&(ctx->recon_head.list))) { + pthread_cond_wait(&ctx->recon_cv, &ctx->recon_mutex); + } + pthread_mutex_unlock(&ctx->recon_mutex); + + list_for_each_entry(ev, &(ctx->recon_head.list), list) { + + if (ev->id == NSR_RECON_SET_LEADER) { + gf_log (this->name, GF_LOG_INFO, + "got add leader notfiy event"); + nsr_recon_set_leader(this); + } else if (ev->id == NSR_RECON_ADD_CHILD) { + gf_log (this->name, GF_LOG_INFO, + "got add child notify event"); + nsr_recon_add_child(this, ev->index); + } + } + list_del_init (&ev->list); + } + + return NULL; +} + |