diff options
Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
| -rw-r--r-- | geo-replication/syncdaemon/monitor.py | 78 | 
1 files changed, 53 insertions, 25 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index b0262ee30a8..8ed6f832618 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -1,3 +1,13 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# +  import os  import sys  import time @@ -9,12 +19,16 @@ from subprocess import PIPE  from resource import Popen, FILE, GLUSTER, SSH  from threading import Lock  from gconf import gconf -from syncdutils import update_file, select, waitpid, set_term_handler, is_host_local, GsyncdError +from syncdutils import update_file, select, waitpid +from syncdutils import set_term_handler, is_host_local, GsyncdError  from syncdutils import escape, Thread, finalize, memoize +  class Volinfo(object): +      def __init__(self, vol, host='localhost', prelude=[]): -        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, 'volume', 'info', vol], +        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, +                              'volume', 'info', vol],                     stdout=PIPE, stderr=PIPE)          vix = po.stdout.read()          po.wait() @@ -25,7 +39,8 @@ class Volinfo(object):                  via = '(via %s) ' % prelude.join(' ')              else:                  via = ' ' -            raise GsyncdError('getting volume info of %s%s failed with errorcode %s', +            raise GsyncdError('getting volume info of %s%s ' +                              'failed with errorcode %s',                                (vol, via, vi.find('opErrno').text))          self.tree = vi          self.volume = vol @@ -40,25 +55,27 @@ class Volinfo(object):          def bparse(b):              host, dirp = b.text.split(':', 2)              return {'host': host, 'dir': dirp} -        return [ bparse(b) for b in self.get('brick') ] +        return [bparse(b) for b in self.get('brick')]      @property      @memoize      def uuid(self):          ids = self.get('id')          if len(ids) != 1: -            raise GsyncdError("volume info of %s obtained from %s: ambiguous uuid", +            raise GsyncdError("volume info of %s obtained from %s: " +                              "ambiguous uuid",                                self.volume, self.host)          return ids[0].text  class Monitor(object): +      """class which spawns and manages gsyncd workers""" -    ST_INIT     = 'Initializing...' -    ST_STABLE   = 'Stable' -    ST_FAULTY   = 'faulty' -    ST_INCON    = 'inconsistent' +    ST_INIT = 'Initializing...' +    ST_STABLE = 'Stable' +    ST_FAULTY = 'faulty' +    ST_INCON = 'inconsistent'      _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]      def __init__(self): @@ -68,7 +85,8 @@ class Monitor(object):      def set_state(self, state, w=None):          """set the state that can be used by external agents             like glusterd for status reporting""" -        computestate = lambda: self.state and self._ST_ORD[max(self._ST_ORD.index(s) for s in self.state.values())] +        computestate = lambda: self.state and self._ST_ORD[ +            max(self._ST_ORD.index(s) for s in self.state.values())]          if w:              self.lock.acquire()              old_state = computestate() @@ -112,14 +130,17 @@ class Monitor(object):          self.set_state(self.ST_INIT, w)          ret = 0 +          def nwait(p, o=0):              p2, r = waitpid(p, o)              if not p2:                  return              return r +          def exit_signalled(s):              """ child teminated due to receipt of SIGUSR1 """              return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1)) +          def exit_status(s):              if os.WIFEXITED(s):                  return os.WEXITSTATUS(s) @@ -134,7 +155,8 @@ class Monitor(object):                  os.close(pr)                  os.execv(sys.executable, argv + ['--feedback-fd', str(pw),                                                   '--local-path', w[0], -                                                 '--local-id', '.' + escape(w[0]), +                                                 '--local-id', +                                                 '.' + escape(w[0]),                                                   '--resource-remote', w[1]])              self.lock.acquire()              cpids.add(cpid) @@ -145,31 +167,31 @@ class Monitor(object):              os.close(pr)              if so:                  ret = nwait(cpid, os.WNOHANG) -                if ret != None: -                    logging.info("worker(%s) died before establishing " \ +                if ret is not None: +                    logging.info("worker(%s) died before establishing "                                   "connection" % w[0])                  else:                      logging.debug("worker(%s) connected" % w[0])                      while time.time() < t0 + conn_timeout:                          ret = nwait(cpid, os.WNOHANG) -                        if ret != None: -                            logging.info("worker(%s) died in startup " \ +                        if ret is not None: +                            logging.info("worker(%s) died in startup "                                           "phase" % w[0])                              break                          time.sleep(1)              else: -                logging.info("worker(%s) not confirmed in %d sec, " \ +                logging.info("worker(%s) not confirmed in %d sec, "                               "aborting it" % (w[0], conn_timeout))                  os.kill(cpid, signal.SIGKILL)                  ret = nwait(cpid) -            if ret == None: +            if ret is None:                  self.set_state(self.ST_STABLE, w)                  ret = nwait(cpid)              if exit_signalled(ret):                  ret = 0              else:                  ret = exit_status(ret) -                if ret in (0,1): +                if ret in (0, 1):                      self.set_state(self.ST_FAULTY, w)              time.sleep(10)          self.set_state(self.ST_INCON, w) @@ -194,17 +216,18 @@ class Monitor(object):                      os.kill(cpid, signal.SIGKILL)                  self.lock.release()                  finalize(exval=1) -            t = Thread(target = wmon, args=[wx]) +            t = Thread(target=wmon, args=[wx])              t.start()              ta.append(t)          for t in ta:              t.join() +  def distribute(*resources):      master, slave = resources      mvol = Volinfo(master.volume, master.host)      logging.debug('master bricks: ' + repr(mvol.bricks)) -    prelude  = [] +    prelude = []      si = slave      if isinstance(slave, SSH):          prelude = gconf.ssh_command.split() + [slave.remote_addr] @@ -221,23 +244,28 @@ def distribute(*resources):          raise GsyncdError("unkown slave type " + slave.url)      logging.info('slave bricks: ' + repr(sbricks))      if isinstance(si, FILE): -        slaves = [ slave.url ] +        slaves = [slave.url]      else:          slavenodes = set(b['host'] for b in sbricks)          if isinstance(slave, SSH) and not gconf.isolated_slave:              rap = SSH.parse_ssh_address(slave) -            slaves = [ 'ssh://' + rap['user'] + '@' + h + ':' + si.url for h in slavenodes ] +            slaves = ['ssh://' + rap['user'] + '@' + h + ':' + si.url +                      for h in slavenodes]          else: -            slavevols = [ h + ':' + si.volume for h in slavenodes ] +            slavevols = [h + ':' + si.volume for h in slavenodes]              if isinstance(slave, SSH): -                slaves = [ 'ssh://' + rap.remote_addr + ':' + v for v in slavevols ] +                slaves = ['ssh://' + rap.remote_addr + ':' + v +                          for v in slavevols]              else:                  slaves = slavevols -    workerspex = [ (brick['dir'], slaves[idx % len(slaves)]) for idx, brick in enumerate(mvol.bricks) if is_host_local(brick['host']) ] +    workerspex = [(brick['dir'], slaves[idx % len(slaves)]) +                  for idx, brick in enumerate(mvol.bricks) +                  if is_host_local(brick['host'])]      logging.info('worker specs: ' + repr(workerspex))      return workerspex, suuid +  def monitor(*resources):      """oh yeah, actually Monitor is used as singleton, too"""      return Monitor().multiplex(*distribute(*resources))  | 
