From 2d92565e60485c81bbe6b3ebde60aeb623eda36a Mon Sep 17 00:00:00 2001 From: Aravinda VK Date: Fri, 10 Oct 2014 16:13:25 +0530 Subject: geo-rep: Failover when a Slave node goes down When a slave node goes down, worker in master node can connect to different slave node and resume the operation. Existing georep was not checking the status of slave node before worker restart. With this patch, geo-rep worker will check the node status using `gluster volume status` when it goes faulty. BUG: 1151412 Change-Id: If3ab7fdcf47f5b3f3ba383c515703c5f1f9dd668 Signed-off-by: Aravinda VK Reviewed-on: http://review.gluster.org/8921 Reviewed-by: Kotresh HR Tested-by: Gluster Build System Reviewed-by: Venky Shankar Tested-by: Venky Shankar --- geo-replication/syncdaemon/monitor.py | 69 ++++++++++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 5 deletions(-) (limited to 'geo-replication/syncdaemon') diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index dbe9c0b0d40..3e0360332bd 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -18,12 +18,44 @@ import xml.etree.ElementTree as XET from subprocess import PIPE from resource import Popen, FILE, GLUSTER, SSH from threading import Lock +import re +import random from gconf import gconf from syncdutils import update_file, select, waitpid from syncdutils import set_term_handler, is_host_local, GsyncdError from syncdutils import escape, Thread, finalize, memoize +ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError + + +def get_slave_bricks_status(host, vol): + po = Popen(['gluster', '--xml', '--remote-host=' + host, + 'volume', 'status', vol, "detail"], + stdout=PIPE, stderr=PIPE) + vix = po.stdout.read() + po.wait() + po.terminate_geterr() + vi = XET.fromstring(vix) + if vi.find('opRet').text != '0': + logging.info("Unable to get list of up nodes of %s, " + "returning empty list: %s" % + (vol, vi.find('opErrstr').text)) + return [] + + up_hosts = set() + + try: + for el in vi.findall('volStatus/volumes/volume/node'): + if el.find('status').text == '1': + up_hosts.add(el.find('hostname').text) + except (ParseError, AttributeError, ValueError) as e: + logging.info("Parsing failed to get list of up nodes of %s, " + "returning empty list: %s" % (vol, e)) + + return list(up_hosts) + + class Volinfo(object): def __init__(self, vol, host='localhost', prelude=[]): @@ -117,7 +149,7 @@ class Monitor(object): # give a chance to graceful exit os.kill(-os.getpid(), signal.SIGTERM) - def monitor(self, w, argv, cpids, agents): + def monitor(self, w, argv, cpids, agents, slave_vol, slave_host): """the monitor loop Basic logic is a blantantly simple blunt heuristics: @@ -155,8 +187,28 @@ class Monitor(object): if os.WIFEXITED(s): return os.WEXITSTATUS(s) return 1 + conn_timeout = int(gconf.connection_timeout) while ret in (0, 1): + remote_host = w[1] + # Check the status of the connected slave node + # If the connected slave node is down then try to connect to + # different up node. + m = re.match("(ssh|gluster|file):\/\/(.+)@([^:]+):(.+)", + remote_host) + if m: + current_slave_host = m.group(3) + slave_up_hosts = get_slave_bricks_status( + slave_host, slave_vol) + + if current_slave_host not in slave_up_hosts: + if len(slave_up_hosts) > 0: + remote_host = "%s://%s@%s:%s" % (m.group(1), + m.group(2), + random.choice( + slave_up_hosts), + m.group(4)) + # Spawn the worker and agent in lock to avoid fd leak self.lock.acquire() @@ -190,7 +242,8 @@ class Monitor(object): '--rpc-fd', ','.join([str(rw), str(ww), str(ra), str(wa)]), - '--resource-remote', w[1]]) + '--resource-remote', + remote_host]) cpids.add(cpid) agents.add(apid) @@ -245,7 +298,7 @@ class Monitor(object): self.set_state(self.ST_INCON, w) return ret - def multiplex(self, wspx, suuid): + def multiplex(self, wspx, suuid, slave_vol, slave_host): argv = sys.argv[:] for o in ('-N', '--no-daemon', '--monitor'): while o in argv: @@ -258,7 +311,8 @@ class Monitor(object): ta = [] for wx in wspx: def wmon(w): - cpid, _ = self.monitor(w, argv, cpids, agents) + cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol, + slave_host) time.sleep(1) self.lock.acquire() for cpid in cpids: @@ -280,6 +334,9 @@ def distribute(*resources): logging.debug('master bricks: ' + repr(mvol.bricks)) prelude = [] si = slave + slave_host = None + slave_vol = None + if isinstance(slave, SSH): prelude = gconf.ssh_command.split() + [slave.remote_addr] si = slave.inner_rsc @@ -291,6 +348,8 @@ def distribute(*resources): svol = Volinfo(si.volume, slave.remote_addr.split('@')[-1]) sbricks = svol.bricks suuid = svol.uuid + slave_host = slave.remote_addr.split('@')[-1] + slave_vol = si.volume else: raise GsyncdError("unkown slave type " + slave.url) logging.info('slave bricks: ' + repr(sbricks)) @@ -314,7 +373,7 @@ def distribute(*resources): for idx, brick in enumerate(mvol.bricks) if is_host_local(brick['host'])] logging.info('worker specs: ' + repr(workerspex)) - return workerspex, suuid + return workerspex, suuid, slave_vol, slave_host def monitor(*resources): -- cgit