Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r--   geo-replication/syncdaemon/monitor.py   240
1 file changed, 138 insertions, 102 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index c6fa1076a85..a193b57caff 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -13,21 +13,19 @@ import sys
 import time
 import signal
 import logging
-import uuid
 import xml.etree.ElementTree as XET
 from subprocess import PIPE
-from resource import FILE, GLUSTER, SSH
 from threading import Lock
 from errno import ECHILD, ESRCH
-import re
 import random
-from gconf import gconf
-from syncdutils import select, waitpid, errno_wrap, lf
+
+from resource import SSH
+import gsyncdconfig as gconf
+from rconf import rconf
+from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile
 from syncdutils import set_term_handler, is_host_local, GsyncdError
-from syncdutils import escape, Thread, finalize
+from syncdutils import Thread, finalize, Popen, Volinfo
 from syncdutils import gf_event, EVENT_GEOREP_FAULTY
-from syncdutils import Volinfo, Popen
-
 from gsyncdstatus import GeorepStatus, set_monitor_status


@@ -82,7 +80,8 @@ def get_slave_bricks_status(host, vol):
     try:
         for el in vi.findall('volStatus/volumes/volume/node'):
             if el.find('status').text == '1':
-                up_hosts.add(el.find('hostname').text)
+                up_hosts.add((el.find('hostname').text,
+                              el.find('peerid').text))
     except (ParseError, AttributeError, ValueError) as e:
         logging.info(lf("Parsing failed to get list of up nodes, "
                         "returning empty list",
@@ -116,7 +115,8 @@ class Monitor(object):
         # give a chance to graceful exit
         errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])

-    def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master):
+    def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master,
+                suuid):
         """the monitor loop

         Basic logic is a blantantly simple blunt heuristics:
@@ -136,7 +136,7 @@ class Monitor(object):
         due to the keep-alive thread)
         """
         if not self.status.get(w[0]['dir'], None):
-            self.status[w[0]['dir']] = GeorepStatus(gconf.state_file,
+            self.status[w[0]['dir']] = GeorepStatus(gconf.get("state-file"),
                                                     w[0]['host'],
                                                     w[0]['dir'],
                                                     w[0]['uuid'],
@@ -144,7 +144,7 @@ class Monitor(object):
                                                     "%s::%s" % (slave_host,
                                                                 slave_vol))

-        set_monitor_status(gconf.state_file, self.ST_STARTED)
+        set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
         self.status[w[0]['dir']].set_worker_status(self.ST_INIT)

         ret = 0
@@ -172,26 +172,22 @@ class Monitor(object):
                     return os.WEXITSTATUS(s)
                 return 1

-        conn_timeout = int(gconf.connection_timeout)
+        conn_timeout = gconf.get("connection-timeout")
         while ret in (0, 1):
-            remote_host = w[1]
+            remote_user, remote_host = w[1][0].split("@")
+            remote_id = w[1][1]
             # Check the status of the connected slave node
             # If the connected slave node is down then try to connect to
             # different up node.
-            m = re.match("(ssh|gluster|file):\/\/(.+)@([^:]+):(.+)",
-                         remote_host)
-            if m:
-                current_slave_host = m.group(3)
-                slave_up_hosts = get_slave_bricks_status(
-                    slave_host, slave_vol)
-
-                if current_slave_host not in slave_up_hosts:
-                    if len(slave_up_hosts) > 0:
-                        remote_host = "%s://%s@%s:%s" % (m.group(1),
-                                                         m.group(2),
-                                                         random.choice(
-                                                             slave_up_hosts),
-                                                         m.group(4))
+            current_slave_host = remote_host
+            slave_up_hosts = get_slave_bricks_status(
+                slave_host, slave_vol)
+
+            if (current_slave_host, remote_id) not in slave_up_hosts:
+                if len(slave_up_hosts) > 0:
+                    remote_new = random.choice(slave_up_hosts)
+                    remote_host = "%s@%s" % (remote_user, remote_new[0])
+                    remote_id = remote_new[1]

             # Spawn the worker and agent in lock to avoid fd leak
             self.lock.acquire()
@@ -213,33 +209,58 @@ class Monitor(object):
             if apid == 0:
                 os.close(rw)
                 os.close(ww)
-                os.execv(sys.executable, argv + ['--local-path', w[0]['dir'],
-                                                 '--local-node', w[0]['host'],
-                                                 '--local-node-id',
-                                                 w[0]['uuid'],
-                                                 '--agent',
-                                                 '--rpc-fd',
-                                                 ','.join([str(ra), str(wa),
-                                                           str(rw), str(ww)])])
+                args_to_agent = argv + [
+                    'agent',
+                    rconf.args.master,
+                    rconf.args.slave,
+                    '--local-path', w[0]['dir'],
+                    '--local-node', w[0]['host'],
+                    '--local-node-id', w[0]['uuid'],
+                    '--slave-id', suuid,
+                    '--rpc-fd', ','.join([str(ra), str(wa), str(rw), str(ww)])
+                ]
+
+                if rconf.args.config_file is not None:
+                    args_to_agent += ['-c', rconf.args.config_file]
+
+                if rconf.args.debug:
+                    args_to_agent.append("--debug")
+
+                os.execv(sys.executable, args_to_agent)
+
             pr, pw = os.pipe()
             cpid = os.fork()
             if cpid == 0:
                 os.close(pr)
                 os.close(ra)
                 os.close(wa)
-                os.execv(sys.executable, argv + ['--feedback-fd', str(pw),
-                                                 '--local-path', w[0]['dir'],
-                                                 '--local-node', w[0]['host'],
-                                                 '--local-node-id',
-                                                 w[0]['uuid'],
-                                                 '--local-id',
-                                                 '.' + escape(w[0]['dir']),
-                                                 '--rpc-fd',
-                                                 ','.join([str(rw), str(ww),
-                                                           str(ra), str(wa)]),
-                                                 '--subvol-num', str(w[2])] +
-                         (['--is-hottier'] if w[3] else []) +
-                         ['--resource-remote', remote_host])
+
+                args_to_worker = argv + [
+                    'worker',
+                    rconf.args.master,
+                    rconf.args.slave,
+                    '--feedback-fd', str(pw),
+                    '--local-path', w[0]['dir'],
+                    '--local-node', w[0]['host'],
+                    '--local-node-id', w[0]['uuid'],
+                    '--slave-id', suuid,
+                    '--rpc-fd',
+                    ','.join([str(rw), str(ww), str(ra), str(wa)]),
+                    '--subvol-num', str(w[2]),
+                    '--resource-remote', remote_host,
+                    '--resource-remote-id', remote_id
+                ]
+
+                if rconf.args.config_file is not None:
+                    args_to_worker += ['-c', rconf.args.config_file]
+
+                if w[3]:
+                    args_to_worker.append("--is-hottier")
+
+                if rconf.args.debug:
+                    args_to_worker.append("--debug")
+
+                os.execv(sys.executable, args_to_worker)

             cpids.add(cpid)
             agents.add(apid)
@@ -290,7 +311,8 @@ class Monitor(object):
                             logging.info(lf("Changelog Agent died, Aborting "
                                             "Worker",
                                             brick=w[0]['dir']))
-                            errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
+                            errno_wrap(os.kill, [cpid, signal.SIGKILL],
+                                       [ESRCH])
                             nwait(cpid)
                             nwait(apid)
                             break
@@ -333,12 +355,7 @@ class Monitor(object):
         return ret

     def multiplex(self, wspx, suuid, slave_vol, slave_host, master):
-        argv = sys.argv[:]
-        for o in ('-N', '--no-daemon', '--monitor'):
-            while o in argv:
-                argv.remove(o)
-        argv.extend(('-N', '-p', '', '--slave-id', suuid))
-        argv.insert(0, os.path.basename(sys.executable))
+        argv = [os.path.basename(sys.executable), sys.argv[0]]

         cpids = set()
         agents = set()
@@ -346,7 +363,7 @@ class Monitor(object):
         for wx in wspx:
             def wmon(w):
                 cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
-                                       slave_host, master)
+                                       slave_host, master, suuid)
                 time.sleep(1)
                 self.lock.acquire()
                 for cpid in cpids:
@@ -362,55 +379,39 @@ class Monitor(object):
             t.join()


-def distribute(*resources):
-    master, slave = resources
+def distribute(master, slave):
     mvol = Volinfo(master.volume, master.host)
     logging.debug('master bricks: ' + repr(mvol.bricks))
     prelude = []
-    si = slave
     slave_host = None
     slave_vol = None

-    if isinstance(slave, SSH):
-        prelude = gconf.ssh_command.split() + [slave.remote_addr]
-        si = slave.inner_rsc
-        logging.debug('slave SSH gateway: ' + slave.remote_addr)
-    if isinstance(si, FILE):
-        sbricks = {'host': 'localhost', 'dir': si.path}
-        suuid = uuid.uuid5(uuid.NAMESPACE_URL, slave.get_url(canonical=True))
-    elif isinstance(si, GLUSTER):
-        svol = Volinfo(si.volume, slave.remote_addr.split('@')[-1])
-        sbricks = svol.bricks
-        suuid = svol.uuid
-        slave_host = slave.remote_addr.split('@')[-1]
-        slave_vol = si.volume
-
-        # save this xattr for the session delete command
-        old_stime_xattr_name = getattr(gconf, "master.stime_xattr_name", None)
-        new_stime_xattr_name = "trusted.glusterfs." + mvol.uuid + "." + \
-            svol.uuid + ".stime"
-        if not old_stime_xattr_name or \
-           old_stime_xattr_name != new_stime_xattr_name:
-            gconf.configinterface.set("master.stime_xattr_name",
-                                      new_stime_xattr_name)
-    else:
-        raise GsyncdError("unknown slave type " + slave.url)
+    prelude = [gconf.get("ssh-command")] + \
+        gconf.get("ssh-options").split() + \
+        ["-p", str(gconf.get("ssh-port"))] + \
+        [slave.remote_addr]
+
+    logging.debug('slave SSH gateway: ' + slave.remote_addr)
+
+    svol = Volinfo(slave.volume, "localhost", prelude)
+    sbricks = svol.bricks
+    suuid = svol.uuid
+    slave_host = slave.remote_addr.split('@')[-1]
+    slave_vol = slave.volume
+
+    # save this xattr for the session delete command
+    old_stime_xattr_prefix = gconf.get("stime-xattr-prefix", None)
+    new_stime_xattr_prefix = "trusted.glusterfs." + mvol.uuid + "." + \
+        svol.uuid
+    if not old_stime_xattr_prefix or \
+       old_stime_xattr_prefix != new_stime_xattr_prefix:
+        gconf.setconfig("stime-xattr-prefix", new_stime_xattr_prefix)
+
     logging.debug('slave bricks: ' + repr(sbricks))
-    if isinstance(si, FILE):
-        slaves = [slave.url]
-    else:
-        slavenodes = set(b['host'] for b in sbricks)
-        if isinstance(slave, SSH) and not gconf.isolated_slave:
-            rap = SSH.parse_ssh_address(slave)
-            slaves = ['ssh://' + rap['user'] + '@' + h + ':' + si.url
-                      for h in slavenodes]
-        else:
-            slavevols = [h + ':' + si.volume for h in slavenodes]
-            if isinstance(slave, SSH):
-                slaves = ['ssh://' + rap.remote_addr + ':' + v
-                          for v in slavevols]
-            else:
-                slaves = slavevols
+
+    slavenodes = set((b['host'], b["uuid"]) for b in sbricks)
+    rap = SSH.parse_ssh_address(slave)
+    slaves = [(rap['user'] + '@' + h[0], h[1]) for h in slavenodes]

     workerspex = []
     for idx, brick in enumerate(mvol.bricks):
@@ -424,12 +425,47 @@ def distribute(*resources):
     return workerspex, suuid, slave_vol, slave_host, master


-def monitor(*resources):
+def monitor(local, remote):
     # Check if gsyncd restarted in pause state. If
     # yes, send SIGSTOP to negative of monitor pid
     # to go back to pause state.
-    if gconf.pause_on_start:
+    if rconf.args.pause_on_start:
         errno_wrap(os.kill, [-os.getpid(), signal.SIGSTOP], [ESRCH])

     """oh yeah, actually Monitor is used as singleton, too"""
-    return Monitor().multiplex(*distribute(*resources))
+    return Monitor().multiplex(*distribute(local, remote))
+
+
+def startup(go_daemon=True):
+    """set up logging, pidfile grabbing, daemonization"""
+    pid_file = gconf.get("pid-file")
+    if not grabpidfile():
+        sys.stderr.write("pidfile is taken, exiting.\n")
+        sys.exit(2)
+    rconf.pid_file_owned = True
+
+    if not go_daemon:
+        return
+
+    x, y = os.pipe()
+    cpid = os.fork()
+    if cpid:
+        os.close(x)
+        sys.exit()
+    os.close(y)
+    os.setsid()
+    dn = os.open(os.devnull, os.O_RDWR)
+    for f in (sys.stdin, sys.stdout, sys.stderr):
+        os.dup2(dn, f.fileno())
+
+    if not grabpidfile(pid_file + '.tmp'):
+        raise GsyncdError("cannot grab temporary pidfile")
+
+    os.rename(pid_file + '.tmp', pid_file)
+
+    # wait for parent to terminate
+    # so we can start up with
+    # no messing from the dirty
+    # ol' bustard
+    select((x,), (), ())
+    os.close(x)
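
Note on the reworked failover logic above: get_slave_bricks_status() now reports slave nodes as (hostname, peerid) pairs, and the monitor loop re-picks a random up node when the currently connected (host, id) pair is no longer listed, instead of regex-parsing an ssh:// URL. A minimal sketch of that selection step, not part of the patch; pick_slave_node and the example hosts are illustrative only:

import random

def pick_slave_node(current, up_nodes):
    # current and up_nodes hold (hostname, peerid) tuples, the shape
    # get_slave_bricks_status() returns after this patch.
    if current in up_nodes or not up_nodes:
        return current                        # still up (or nothing better known): keep it
    return random.choice(list(up_nodes))      # otherwise fail over to a random up node

# Example: node1 is down, so one of the remaining up nodes is chosen.
up = {("node2.example.com", "peerid-2"), ("node3.example.com", "peerid-3")}
print(pick_slave_node(("node1.example.com", "peerid-1"), up))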
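
Note on the new startup() helper added at the end of the patch: it grabs the pid file, daemonizes with fork() and setsid(), points stdin/stdout/stderr at /dev/null, re-grabs the pid file under a temporary name and renames it into place, and finally waits on a pipe that returns EOF once the parent has exited. A self-contained sketch of the daemonization part only; daemonize() is a hypothetical helper, not the patch's API, and pid-file handling is omitted:

import os
import select
import sys

def daemonize():
    r, w = os.pipe()
    if os.fork():                      # parent: keeps the write end open
        os.close(r)                    # and exits, which closes it
        sys.exit(0)
    os.close(w)                        # child keeps only the read end
    os.setsid()                        # new session, detach from the terminal
    dn = os.open(os.devnull, os.O_RDWR)
    for f in (sys.stdin, sys.stdout, sys.stderr):
        os.dup2(dn, f.fileno())        # silence stdio
    select.select([r], [], [])         # EOF here means the parent is gone
    os.close(r)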