Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r--  geo-replication/syncdaemon/monitor.py | 68
1 file changed, 18 insertions(+), 50 deletions(-)
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index a193b57caff..257d34a743b 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -14,7 +14,6 @@ import time
 import signal
 import logging
 import xml.etree.ElementTree as XET
-from subprocess import PIPE
 from threading import Lock
 from errno import ECHILD, ESRCH
 import random
@@ -23,9 +22,9 @@ from resource import SSH
 import gsyncdconfig as gconf
 from rconf import rconf
 from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile
-from syncdutils import set_term_handler, is_host_local, GsyncdError
-from syncdutils import Thread, finalize, Popen, Volinfo
-from syncdutils import gf_event, EVENT_GEOREP_FAULTY
+from syncdutils import set_term_handler, GsyncdError
+from syncdutils import Thread, finalize, Volinfo, VolinfoFromGconf
+from syncdutils import gf_event, EVENT_GEOREP_FAULTY, get_up_nodes
 from gsyncdstatus import GeorepStatus, set_monitor_status
 
 
@@ -54,43 +53,6 @@ def get_subvol_num(brick_idx, vol, hot):
         return str(cnt)
 
 
-def get_slave_bricks_status(host, vol):
-    po = Popen(['gluster', '--xml', '--remote-host=' + host,
-                'volume', 'status', vol, "detail"],
-               stdout=PIPE, stderr=PIPE)
-    vix = po.stdout.read()
-    po.wait()
-    po.terminate_geterr(fail_on_err=False)
-    if po.returncode != 0:
-        logging.info(lf("Volume status command failed, unable to get "
-                        "list of up nodes, returning empty list",
-                        volume=vol,
-                        error=po.returncode))
-        return []
-    vi = XET.fromstring(vix)
-    if vi.find('opRet').text != '0':
-        logging.info(lf("Unable to get list of up nodes, "
-                        "returning empty list",
-                        volume=vol,
-                        error=vi.find('opErrstr').text))
-        return []
-
-    up_hosts = set()
-
-    try:
-        for el in vi.findall('volStatus/volumes/volume/node'):
-            if el.find('status').text == '1':
-                up_hosts.add((el.find('hostname').text,
-                              el.find('peerid').text))
-    except (ParseError, AttributeError, ValueError) as e:
-        logging.info(lf("Parsing failed to get list of up nodes, "
-                        "returning empty list",
-                        volume=vol,
-                        error=e))
-
-    return list(up_hosts)
-
-
 class Monitor(object):
 
     """class which spawns and manages gsyncd workers"""
@@ -116,7 +78,7 @@ class Monitor(object):
         errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])
 
     def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master,
-                suuid):
+                suuid, slavenodes):
         """the monitor loop
 
         Basic logic is a blantantly simple blunt heuristics:
@@ -180,8 +142,7 @@ class Monitor(object):
             # If the connected slave node is down then try to connect to
             # different up node.
             current_slave_host = remote_host
-            slave_up_hosts = get_slave_bricks_status(
-                slave_host, slave_vol)
+            slave_up_hosts = get_up_nodes(slavenodes, gconf.get("ssh-port"))
 
             if (current_slave_host, remote_id) not in slave_up_hosts:
                 if len(slave_up_hosts) > 0:
@@ -354,7 +315,7 @@ class Monitor(object):
         self.status[w[0]['dir']].set_worker_status(self.ST_INCON)
         return ret
 
-    def multiplex(self, wspx, suuid, slave_vol, slave_host, master):
+    def multiplex(self, wspx, suuid, slave_vol, slave_host, master, slavenodes):
         argv = [os.path.basename(sys.executable), sys.argv[0]]
 
         cpids = set()
@@ -363,7 +324,7 @@ class Monitor(object):
         for wx in wspx:
             def wmon(w):
                 cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
-                                       slave_host, master, suuid)
+                                       slave_host, master, suuid, slavenodes)
                 time.sleep(1)
                 self.lock.acquire()
                 for cpid in cpids:
@@ -380,7 +341,10 @@
 
 
 def distribute(master, slave):
-    mvol = Volinfo(master.volume, master.host)
+    if rconf.args.use_gconf_volinfo:
+        mvol = VolinfoFromGconf(master.volume, master=True)
+    else:
+        mvol = Volinfo(master.volume, master.host)
     logging.debug('master bricks: ' + repr(mvol.bricks))
     prelude = []
     slave_host = None
@@ -393,7 +357,11 @@ def distribute(master, slave):
 
     logging.debug('slave SSH gateway: ' + slave.remote_addr)
 
-    svol = Volinfo(slave.volume, "localhost", prelude)
+    if rconf.args.use_gconf_volinfo:
+        svol = VolinfoFromGconf(slave.volume, master=False)
+    else:
+        svol = Volinfo(slave.volume, "localhost", prelude)
+
     sbricks = svol.bricks
     suuid = svol.uuid
     slave_host = slave.remote_addr.split('@')[-1]
@@ -415,14 +383,14 @@ def distribute(master, slave):
 
     workerspex = []
     for idx, brick in enumerate(mvol.bricks):
-        if is_host_local(brick['uuid']):
+        if rconf.args.local_node_id == brick['uuid']:
            is_hot = mvol.is_hot(":".join([brick['host'], brick['dir']]))
             workerspex.append((brick,
                                slaves[idx % len(slaves)],
                                get_subvol_num(idx, mvol, is_hot),
                                is_hot))
     logging.debug('worker specs: ' + repr(workerspex))
-    return workerspex, suuid, slave_vol, slave_host, master
+    return workerspex, suuid, slave_vol, slave_host, master, slavenodes
 
 
 def monitor(local, remote):
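Notes on the change follow.

The deleted get_slave_bricks_status() learned which slave nodes were up by shelling out to `gluster volume status <vol> detail --xml` on the remote host and parsing the reply. The replacement call, get_up_nodes(slavenodes, gconf.get("ssh-port")), works from the slave node list that is now threaded through monitor() and multiplex(), so the monitor no longer depends on a reachable gluster CLI. The body of get_up_nodes lives in syncdutils and is not part of this diff; a minimal sketch of such a check, assuming slavenodes is a list of (hostname, uuid) pairs and reachability is probed by connecting to each node's SSH port, could look like:

import socket

def get_up_nodes(nodes, port, timeout=5):
    # Sketch only: a node counts as "up" if a TCP connection to its
    # SSH port succeeds within the timeout. `nodes` is assumed to be
    # a list of (hostname, uuid) tuples, matching what the monitor
    # loop compares against (current_slave_host, remote_id).
    up_nodes = []
    for host, node_uuid in nodes:
        try:
            conn = socket.create_connection((host, port), timeout=timeout)
            conn.close()
            up_nodes.append((host, node_uuid))
        except socket.error:
            # Refused, timed out or unreachable: treat the node as down.
            pass
    return up_nodes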
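Both volume-info lookups in distribute() now follow one pattern: when gsyncd is launched with use_gconf_volinfo set (i.e. driven by glusterd), brick layout comes from the session config via VolinfoFromGconf rather than from a gluster CLI query via Volinfo. A hypothetical helper collapsing the two branches, purely for illustration (load_volinfo is not part of the change, and the optional prelude argument to Volinfo is assumed):

def load_volinfo(volume, host, master, prelude=None):
    # Prefer volume info handed down through gsyncd's own config
    # (no gluster CLI call); otherwise query the volume directly.
    if rconf.args.use_gconf_volinfo:
        return VolinfoFromGconf(volume, master=master)
    return Volinfo(volume, host, prelude)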
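Finally, worker placement drops is_host_local() in favor of a direct UUID comparison: every node runs the same distribute() loop but only spawns workers for bricks whose node UUID equals rconf.args.local_node_id, pairing each such brick with a slave endpoint chosen round-robin by brick index. A simplified, hypothetical illustration of that rule (pick_local_workers is not a real function in the tree):

def pick_local_workers(bricks, slaves, local_node_id):
    # Each brick dict carries the uuid of its hosting node; spawn a
    # worker only where that uuid matches this node's id.
    specs = []
    for idx, brick in enumerate(bricks):
        if brick['uuid'] == local_node_id:
            specs.append((brick, slaves[idx % len(slaves)]))
    return specs

Matching on node UUID avoids the hostname resolution that is_host_local() needed, which is presumably why that helper could be removed along with the CLI-based status check.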
