-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 69
1 file changed, 64 insertions, 5 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index dbe9c0b0d40..3e0360332bd 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -18,12 +18,44 @@ import xml.etree.ElementTree as XET
 from subprocess import PIPE
 from resource import Popen, FILE, GLUSTER, SSH
 from threading import Lock
+import re
+import random
 
 from gconf import gconf
 from syncdutils import update_file, select, waitpid
 from syncdutils import set_term_handler, is_host_local, GsyncdError
 from syncdutils import escape, Thread, finalize, memoize
 
+ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
+
+
+def get_slave_bricks_status(host, vol):
+    po = Popen(['gluster', '--xml', '--remote-host=' + host,
+                'volume', 'status', vol, "detail"],
+               stdout=PIPE, stderr=PIPE)
+    vix = po.stdout.read()
+    po.wait()
+    po.terminate_geterr()
+    vi = XET.fromstring(vix)
+    if vi.find('opRet').text != '0':
+        logging.info("Unable to get list of up nodes of %s, "
+                     "returning empty list: %s" %
+                     (vol, vi.find('opErrstr').text))
+        return []
+
+    up_hosts = set()
+
+    try:
+        for el in vi.findall('volStatus/volumes/volume/node'):
+            if el.find('status').text == '1':
+                up_hosts.add(el.find('hostname').text)
+    except (ParseError, AttributeError, ValueError) as e:
+        logging.info("Parsing failed to get list of up nodes of %s, "
+                     "returning empty list: %s" % (vol, e))
+
+    return list(up_hosts)
+
+
 class Volinfo(object):
 
     def __init__(self, vol, host='localhost', prelude=[]):
@@ -117,7 +149,7 @@ class Monitor(object):
         # give a chance to graceful exit
         os.kill(-os.getpid(), signal.SIGTERM)
 
-    def monitor(self, w, argv, cpids, agents):
+    def monitor(self, w, argv, cpids, agents, slave_vol, slave_host):
         """the monitor loop
 
         Basic logic is a blantantly simple blunt heuristics:
@@ -155,8 +187,28 @@ class Monitor(object):
             if os.WIFEXITED(s):
                 return os.WEXITSTATUS(s)
             return 1
+
         conn_timeout = int(gconf.connection_timeout)
         while ret in (0, 1):
+            remote_host = w[1]
+            # Check the status of the connected slave node
+            # If the connected slave node is down then try to connect to
+            # different up node.
+            m = re.match("(ssh|gluster|file):\/\/(.+)@([^:]+):(.+)",
+                         remote_host)
+            if m:
+                current_slave_host = m.group(3)
+                slave_up_hosts = get_slave_bricks_status(
+                    slave_host, slave_vol)
+
+                if current_slave_host not in slave_up_hosts:
+                    if len(slave_up_hosts) > 0:
+                        remote_host = "%s://%s@%s:%s" % (m.group(1),
+                                                         m.group(2),
+                                                         random.choice(
+                                                             slave_up_hosts),
+                                                         m.group(4))
+
             # Spawn the worker and agent in lock to avoid fd leak
             self.lock.acquire()
@@ -190,7 +242,8 @@ class Monitor(object):
                                                  '--rpc-fd',
                                                  ','.join([str(rw), str(ww),
                                                            str(ra), str(wa)]),
-                                                 '--resource-remote', w[1]])
+                                                 '--resource-remote',
+                                                 remote_host])
             cpids.add(cpid)
             agents.add(apid)
@@ -245,7 +298,7 @@ class Monitor(object):
         self.set_state(self.ST_INCON, w)
         return ret
 
-    def multiplex(self, wspx, suuid):
+    def multiplex(self, wspx, suuid, slave_vol, slave_host):
         argv = sys.argv[:]
         for o in ('-N', '--no-daemon', '--monitor'):
             while o in argv:
@@ -258,7 +311,8 @@ class Monitor(object):
         ta = []
         for wx in wspx:
             def wmon(w):
-                cpid, _ = self.monitor(w, argv, cpids, agents)
+                cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
+                                       slave_host)
                 time.sleep(1)
                 self.lock.acquire()
                 for cpid in cpids:
@@ -280,6 +334,9 @@ def distribute(*resources):
     logging.debug('master bricks: ' + repr(mvol.bricks))
     prelude = []
     si = slave
+    slave_host = None
+    slave_vol = None
+
     if isinstance(slave, SSH):
         prelude = gconf.ssh_command.split() + [slave.remote_addr]
         si = slave.inner_rsc
@@ -291,6 +348,8 @@ def distribute(*resources):
         svol = Volinfo(si.volume, slave.remote_addr.split('@')[-1])
         sbricks = svol.bricks
         suuid = svol.uuid
+        slave_host = slave.remote_addr.split('@')[-1]
+        slave_vol = si.volume
     else:
         raise GsyncdError("unkown slave type " + slave.url)
     logging.info('slave bricks: ' + repr(sbricks))
@@ -314,7 +373,7 @@ def distribute(*resources):
                   for idx, brick in enumerate(mvol.bricks)
                   if is_host_local(brick['host'])]
     logging.info('worker specs: ' + repr(workerspex))
-    return workerspex, suuid
+    return workerspex, suuid, slave_vol, slave_host
 
 
 def monitor(*resources):
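
The new get_slave_bricks_status() shells out to "gluster --xml --remote-host=<slave_host> volume status <vol> detail" and keeps the hostname of every node whose status element is '1' (up). Below is a minimal sketch of just that parsing step, run against hand-written XML: only the element paths it queries (opRet, opErrstr, volStatus/volumes/volume/node) come from the patch, while the root tag, the sample host names and the helper name up_hosts_from_xml are illustrative assumptions rather than real gluster CLI output.

# Sketch of the XML-parsing step in get_slave_bricks_status(), against
# hand-written sample output. Only the element paths queried here are
# taken from the patch; the root tag, host names and helper name are
# assumptions for illustration.
import logging
import xml.etree.ElementTree as XET

SAMPLE = """<cliOutput>
  <opRet>0</opRet>
  <opErrstr/>
  <volStatus><volumes><volume>
    <node><hostname>slave-node-1</hostname><status>1</status></node>
    <node><hostname>slave-node-2</hostname><status>0</status></node>
    <node><hostname>slave-node-3</hostname><status>1</status></node>
  </volume></volumes></volStatus>
</cliOutput>"""


def up_hosts_from_xml(vix, vol):
    """Return hostnames whose status is '1' (up), or [] if the CLI failed."""
    vi = XET.fromstring(vix)
    if vi.find('opRet').text != '0':
        logging.info("volume status of %s failed: %s",
                     vol, vi.find('opErrstr').text)
        return []
    up_hosts = set()
    for el in vi.findall('volStatus/volumes/volume/node'):
        if el.find('status').text == '1':
            up_hosts.add(el.find('hostname').text)
    return list(up_hosts)


print(sorted(up_hosts_from_xml(SAMPLE, 'slavevol')))
# -> ['slave-node-1', 'slave-node-3']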
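
In the monitor loop itself, the worker's remote URL (w[1]) is re-pointed at a randomly chosen up node whenever the node it currently names is missing from that list; if the status query fails and the list comes back empty, the URL is left untouched, so a failed query never makes things worse. A standalone sketch of that decision is shown next, assuming the same URL shape the patch's regex expects; the host and volume names are made up for illustration.

# Sketch of the failover decision added to Monitor.monitor(). The URL
# format mirrors what the patch's regex expects; host and volume names
# are made up for illustration.
import random
import re


def pick_remote_host(remote_host, slave_up_hosts):
    m = re.match(r"(ssh|gluster|file)://(.+)@([^:]+):(.+)", remote_host)
    if not m:
        return remote_host
    current_slave_host = m.group(3)
    if current_slave_host in slave_up_hosts or not slave_up_hosts:
        # Connected node is still up, or no status available: keep it.
        return remote_host
    # Re-point the worker at a randomly chosen up node.
    return "%s://%s@%s:%s" % (m.group(1), m.group(2),
                              random.choice(slave_up_hosts), m.group(4))


# slave-node-2 is not in the up list, so the worker gets redirected.
print(pick_remote_host("ssh://root@slave-node-2:gluster://localhost:slavevol",
                       ["slave-node-1", "slave-node-3"]))

Picking with random.choice rather than the first entry spreads workers from several master bricks across the remaining up slave nodes instead of piling them all onto one.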
