diff options
Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 69 |
1 files changed, 64 insertions, 5 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index dbe9c0b0d40..3e0360332bd 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -18,12 +18,44 @@ import xml.etree.ElementTree as XET from subprocess import PIPE from resource import Popen, FILE, GLUSTER, SSH from threading import Lock +import re +import random from gconf import gconf from syncdutils import update_file, select, waitpid from syncdutils import set_term_handler, is_host_local, GsyncdError from syncdutils import escape, Thread, finalize, memoize +ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError + + +def get_slave_bricks_status(host, vol): + po = Popen(['gluster', '--xml', '--remote-host=' + host, + 'volume', 'status', vol, "detail"], + stdout=PIPE, stderr=PIPE) + vix = po.stdout.read() + po.wait() + po.terminate_geterr() + vi = XET.fromstring(vix) + if vi.find('opRet').text != '0': + logging.info("Unable to get list of up nodes of %s, " + "returning empty list: %s" % + (vol, vi.find('opErrstr').text)) + return [] + + up_hosts = set() + + try: + for el in vi.findall('volStatus/volumes/volume/node'): + if el.find('status').text == '1': + up_hosts.add(el.find('hostname').text) + except (ParseError, AttributeError, ValueError) as e: + logging.info("Parsing failed to get list of up nodes of %s, " + "returning empty list: %s" % (vol, e)) + + return list(up_hosts) + + class Volinfo(object): def __init__(self, vol, host='localhost', prelude=[]): @@ -117,7 +149,7 @@ class Monitor(object): # give a chance to graceful exit os.kill(-os.getpid(), signal.SIGTERM) - def monitor(self, w, argv, cpids, agents): + def monitor(self, w, argv, cpids, agents, slave_vol, slave_host): """the monitor loop Basic logic is a blantantly simple blunt heuristics: @@ -155,8 +187,28 @@ class Monitor(object): if os.WIFEXITED(s): return os.WEXITSTATUS(s) return 1 + conn_timeout = int(gconf.connection_timeout) while ret in (0, 1): + remote_host = w[1] + # Check the status of the connected slave node + # If the connected slave node is down then try to connect to + # different up node. + m = re.match("(ssh|gluster|file):\/\/(.+)@([^:]+):(.+)", + remote_host) + if m: + current_slave_host = m.group(3) + slave_up_hosts = get_slave_bricks_status( + slave_host, slave_vol) + + if current_slave_host not in slave_up_hosts: + if len(slave_up_hosts) > 0: + remote_host = "%s://%s@%s:%s" % (m.group(1), + m.group(2), + random.choice( + slave_up_hosts), + m.group(4)) + # Spawn the worker and agent in lock to avoid fd leak self.lock.acquire() @@ -190,7 +242,8 @@ class Monitor(object): '--rpc-fd', ','.join([str(rw), str(ww), str(ra), str(wa)]), - '--resource-remote', w[1]]) + '--resource-remote', + remote_host]) cpids.add(cpid) agents.add(apid) @@ -245,7 +298,7 @@ class Monitor(object): self.set_state(self.ST_INCON, w) return ret - def multiplex(self, wspx, suuid): + def multiplex(self, wspx, suuid, slave_vol, slave_host): argv = sys.argv[:] for o in ('-N', '--no-daemon', '--monitor'): while o in argv: @@ -258,7 +311,8 @@ class Monitor(object): ta = [] for wx in wspx: def wmon(w): - cpid, _ = self.monitor(w, argv, cpids, agents) + cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol, + slave_host) time.sleep(1) self.lock.acquire() for cpid in cpids: @@ -280,6 +334,9 @@ def distribute(*resources): logging.debug('master bricks: ' + repr(mvol.bricks)) prelude = [] si = slave + slave_host = None + slave_vol = None + if isinstance(slave, SSH): prelude = gconf.ssh_command.split() + [slave.remote_addr] si = slave.inner_rsc @@ -291,6 +348,8 @@ def distribute(*resources): svol = Volinfo(si.volume, slave.remote_addr.split('@')[-1]) sbricks = svol.bricks suuid = svol.uuid + slave_host = slave.remote_addr.split('@')[-1] + slave_vol = si.volume else: raise GsyncdError("unkown slave type " + slave.url) logging.info('slave bricks: ' + repr(sbricks)) @@ -314,7 +373,7 @@ def distribute(*resources): for idx, brick in enumerate(mvol.bricks) if is_host_local(brick['host'])] logging.info('worker specs: ' + repr(workerspex)) - return workerspex, suuid + return workerspex, suuid, slave_vol, slave_host def monitor(*resources): |