diff options
author | Avra Sengupta <asengupt@redhat.com> | 2013-06-01 16:17:57 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2013-07-26 13:18:57 -0700 |
commit | b13c483dca20e4015b958f8959328e665a357f60 (patch) | |
tree | 2af62fc50bae39e930fcbe09101d3e51c76eb6fc /geo-replication/syncdaemon/monitor.py | |
parent | 4944fc943efc41df1841e4e559180171f6541112 (diff) |
gsyncd: distribute the crawling load
* also consume changelog for change detection.
* Status fixes
* Use new libgfchangelog done API
* process (and sync) one changelog at a time
Change-Id: I24891615bb762e0741b1819ddfdef8802326cb16
BUG: 847839
Original Author: Csaba Henk <csaba@redhat.com>
Original Author: Aravinda VK <avishwan@redhat.com>
Original Author: Venky Shankar <vshankar@redhat.com>
Original Author: Amar Tumballi <amarts@redhat.com>
Original Author: Avra Sengupta <asengupt@redhat.com>
Signed-off-by: Avra Sengupta <asengupt@redhat.com>
Reviewed-on: http://review.gluster.org/5131
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Tested-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 216 |
1 files changed, 174 insertions, 42 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index b8956dcc2b9..badd0d9c5f8 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -3,26 +3,94 @@ import sys import time import signal import logging +import uuid +import xml.etree.ElementTree as XET +from subprocess import PIPE +from resource import Popen, FILE, GLUSTER, SSH +from threading import Lock from gconf import gconf -from syncdutils import update_file, select, waitpid, set_term_handler +from syncdutils import update_file, select, waitpid, set_term_handler, is_host_local, GsyncdError +from syncdutils import escape, Thread, finalize, memoize + +class Volinfo(object): + def __init__(self, vol, host='localhost', prelude=[]): + po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, 'volume', 'info', vol], + stdout=PIPE, stderr=PIPE) + vix = po.stdout.read() + po.wait() + po.terminate_geterr() + vi = XET.fromstring(vix) + if vi.find('opRet').text != '0': + if prelude: + via = '(via %s) ' % prelude.join(' ') + else: + via = ' ' + raise GsyncdError('getting volume info of %s%s failed with errorcode %s', + (vol, via, vi.find('opErrno').text)) + self.tree = vi + self.volume = vol + self.host = host + + def get(self, elem): + return self.tree.findall('.//' + elem) + + @property + @memoize + def bricks(self): + def bparse(b): + host, dirp = b.text.split(':', 2) + return {'host': host, 'dir': dirp} + return [ bparse(b) for b in self.get('brick') ] + + @property + @memoize + def uuid(self): + ids = self.get('id') + if len(ids) != 1: + raise GsyncdError("volume info of %s obtained from %s: ambiguous uuid", + self.volume, self.host) + return ids[0].text + class Monitor(object): """class which spawns and manages gsyncd workers""" + ST_INIT = 'Initializing...' + ST_STABLE = 'Stable' + ST_FAULTY = 'faulty' + ST_INCON = 'inconsistent' + _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON] + def __init__(self): - self.state = None + self.lock = Lock() + self.state = {} - def set_state(self, state): + def set_state(self, state, w=None): """set the state that can be used by external agents like glusterd for status reporting""" - if state == self.state: - return - self.state = state - logging.info('new state: %s' % state) - if getattr(gconf, 'state_file', None): - update_file(gconf.state_file, lambda f: f.write(state + '\n')) - - def monitor(self): + computestate = lambda: self.state and self._ST_ORD[max(self._ST_ORD.index(s) for s in self.state.values())] + if w: + self.lock.acquire() + old_state = computestate() + self.state[w] = state + state = computestate() + self.lock.release() + if state != old_state: + self.set_state(state) + else: + logging.info('new state: %s' % state) + if getattr(gconf, 'state_file', None): + update_file(gconf.state_file, lambda f: f.write(state + '\n')) + + @staticmethod + def terminate(): + # relax one SIGTERM by setting a handler that sets back + # standard handler + set_term_handler(lambda *a: set_term_handler()) + # give a chance to graceful exit + os.kill(-os.getpid(), signal.SIGTERM) + + def monitor(self, w, argv, cpids): """the monitor loop Basic logic is a blantantly simple blunt heuristics: @@ -41,27 +109,8 @@ class Monitor(object): blown worker blows up on EPIPE if the net goes down, due to the keep-alive thread) """ - def sigcont_handler(*a): - """ - Re-init logging and send group kill signal - """ - md = gconf.log_metadata - logging.shutdown() - lcls = logging.getLoggerClass() - lcls.setup(label=md.get('saved_label'), **md) - pid = os.getpid() - os.kill(-pid, signal.SIGUSR1) - signal.signal(signal.SIGUSR1, lambda *a: ()) - signal.signal(signal.SIGCONT, sigcont_handler) - - argv = sys.argv[:] - for o in ('-N', '--no-daemon', '--monitor'): - while o in argv: - argv.remove(o) - argv.extend(('-N', '-p', '')) - argv.insert(0, os.path.basename(sys.executable)) - self.set_state('starting...') + self.set_state(self.ST_INIT, w) ret = 0 def nwait(p, o=0): p2, r = waitpid(p, o) @@ -83,7 +132,13 @@ class Monitor(object): cpid = os.fork() if cpid == 0: os.close(pr) - os.execv(sys.executable, argv + ['--feedback-fd', str(pw)]) + os.execv(sys.executable, argv + ['--feedback-fd', str(pw), + '--local-path', w[0], + '--local-id', '.' + escape(w[0]), + '--resource-remote', w[1]]) + self.lock.acquire() + cpids.add(cpid) + self.lock.release() os.close(pw) t0 = time.time() so = select((pr,), (), (), conn_timeout)[0] @@ -103,27 +158,104 @@ class Monitor(object): else: logging.debug("worker not confirmed in %d sec, aborting it" % \ conn_timeout) - # relax one SIGTERM by setting a handler that sets back - # standard handler - set_term_handler(lambda *a: set_term_handler()) - # give a chance to graceful exit - os.kill(-os.getpid(), signal.SIGTERM) + self.terminate() time.sleep(1) os.kill(cpid, signal.SIGKILL) ret = nwait(cpid) if ret == None: - self.set_state('OK') + self.set_state(self.ST_STABLE, w) ret = nwait(cpid) if exit_signalled(ret): ret = 0 else: ret = exit_status(ret) if ret in (0,1): - self.set_state('faulty') + self.set_state(self.ST_FAULTY, w) time.sleep(10) - self.set_state('inconsistent') + self.set_state(self.ST_INCON, w) return ret -def monitor(): + def multiplex(self, wspx, suuid): + def sigcont_handler(*a): + """ + Re-init logging and send group kill signal + """ + md = gconf.log_metadata + logging.shutdown() + lcls = logging.getLoggerClass() + lcls.setup(label=md.get('saved_label'), **md) + pid = os.getpid() + os.kill(-pid, signal.SIGUSR1) + signal.signal(signal.SIGUSR1, lambda *a: ()) + signal.signal(signal.SIGCONT, sigcont_handler) + + argv = sys.argv[:] + for o in ('-N', '--no-daemon', '--monitor'): + while o in argv: + argv.remove(o) + argv.extend(('-N', '-p', '', '--slave-id', suuid)) + argv.insert(0, os.path.basename(sys.executable)) + + cpids = set() + ta = [] + for wx in wspx: + def wmon(w): + cpid, _ = self.monitor(w, argv, cpids) + terminate() + time.sleep(1) + self.lock.acquire() + for cpid in cpids: + os.kill(cpid, signal.SIGKILL) + self.lock.release() + finalize(exval=1) + t = Thread(target = wmon, args=[wx]) + t.start() + ta.append(t) + for t in ta: + t.join() + +def distribute(*resources): + master, slave = resources + mvol = Volinfo(master.volume, master.host) + logging.debug('master bricks: ' + repr(mvol.bricks)) + locmbricks = [ b['dir'] for b in mvol.bricks if is_host_local(b['host']) ] + prelude = [] + si = slave + if isinstance(slave, SSH): + prelude = gconf.ssh_command.split() + [slave.remote_addr] + si = slave.inner_rsc + logging.debug('slave SSH gateway: ' + slave.remote_addr) + if isinstance(si, FILE): + sbricks = {'host': 'localhost', 'dir': si.path} + suuid = uuid.uuid5(uuid.NAMESPACE_URL, slave.get_url(canonical=True)) + elif isinstance(si, GLUSTER): + svol = Volinfo(si.volume, si.host, prelude) + sbricks = svol.bricks + suuid = svol.uuid + else: + raise GsyncdError("unkown slave type " + slave.url) + logging.info('slave bricks: ' + repr(sbricks)) + if isinstance(si, FILE): + slaves = [ slave.url ] + else: + slavenodes = set(b['host'] for b in sbricks) + if isinstance(slave, SSH) and not gconf.isolated_slave: + rap = SSH.parse_ssh_address(slave.remote_addr) + slaves = [ 'ssh://' + rap['user'] + '@' + h + ':' + si.url for h in slavenodes ] + else: + slavevols = [ h + ':' + si.volume for h in slavenodes ] + if isinstance(slave, SSH): + slaves = [ 'ssh://' + rap.remote_addr + ':' + v for v in slavevols ] + else: + slaves = slavevols + locmbricks.sort() + slaves.sort() + workerspex = [] + for i in range(len(locmbricks)): + workerspex.append((locmbricks[i], slaves[i % len(slaves)])) + logging.info('worker specs: ' + repr(workerspex)) + return workerspex, suuid + +def monitor(*resources): """oh yeah, actually Monitor is used as singleton, too""" - return Monitor().monitor() + return Monitor().multiplex(*distribute(*resources)) |