Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
| -rw-r--r-- | geo-replication/syncdaemon/monitor.py | 483 |
1 file changed, 199 insertions, 284 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index a26de0c9cf5..6aa7b9dfc99 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -13,21 +13,23 @@
 import sys
 import time
 import signal
 import logging
-import uuid
 import xml.etree.ElementTree as XET
-from subprocess import PIPE
-from resource import Popen, FILE, GLUSTER, SSH
 from threading import Lock
 from errno import ECHILD, ESRCH
-import re
 import random
-from gconf import gconf
-from syncdutils import select, waitpid, errno_wrap
-from syncdutils import set_term_handler, is_host_local, GsyncdError
-from syncdutils import escape, Thread, finalize, memoize
+from resource import SSH
+import gsyncdconfig as gconf
+import libgfchangelog
+from rconf import rconf
+from syncdutils import (select, waitpid, errno_wrap, lf, grabpidfile,
+                        set_term_handler, GsyncdError,
+                        Thread, finalize, Volinfo, VolinfoFromGconf,
+                        gf_event, EVENT_GEOREP_FAULTY, get_up_nodes,
+                        unshare_propagation_supported)

 from gsyncdstatus import GeorepStatus, set_monitor_status
-
+import py2py3
+from py2py3 import pipe

 ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
@@ -36,6 +38,8 @@ def get_subvol_num(brick_idx, vol, hot):
     tier = vol.is_tier()
     disperse_count = vol.disperse_count(tier, hot)
     replica_count = vol.replica_count(tier, hot)
+    distribute_count = vol.distribution_count(tier, hot)
+    gconf.setconfig("master-distribution-count", distribute_count)

     if (tier and not hot):
         brick_idx = brick_idx - vol.get_hot_bricks_count(tier)
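The hunk above also stores the master volume's distribution count in the session config; get_subvol_num() itself derives a brick's subvolume number from the replica and disperse counts. The body of that calculation sits outside this diff, so the following is only an illustrative sketch of the idea, not the gsyncd implementation:

    # Illustrative sketch: map a 0-based brick index to a 1-based
    # subvolume number for a distributed-replicate/disperse volume.
    def subvol_number(brick_idx, replica_count, disperse_count):
        # bricks belonging to one subvolume are listed consecutively
        subvol_size = disperse_count if disperse_count else replica_count
        return (brick_idx // subvol_size) + 1

    # Example: a 2 x 3 distributed-replicate volume has 6 bricks;
    # bricks 0-2 form subvolume 1, bricks 3-5 form subvolume 2.
    assert subvol_number(4, replica_count=3, disperse_count=0) == 2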
@@ -54,118 +58,6 @@ def get_subvol_num(brick_idx, vol, hot):
     return str(cnt)


-def get_slave_bricks_status(host, vol):
-    po = Popen(['gluster', '--xml', '--remote-host=' + host,
-                'volume', 'status', vol, "detail"],
-               stdout=PIPE, stderr=PIPE)
-    vix = po.stdout.read()
-    po.wait()
-    po.terminate_geterr(fail_on_err=False)
-    if po.returncode != 0:
-        logging.info("Volume status command failed, unable to get "
-                     "list of up nodes of %s, returning empty list: %s" %
-                     (vol, po.returncode))
-        return []
-    vi = XET.fromstring(vix)
-    if vi.find('opRet').text != '0':
-        logging.info("Unable to get list of up nodes of %s, "
-                     "returning empty list: %s" %
-                     (vol, vi.find('opErrstr').text))
-        return []
-
-    up_hosts = set()
-
-    try:
-        for el in vi.findall('volStatus/volumes/volume/node'):
-            if el.find('status').text == '1':
-                up_hosts.add(el.find('hostname').text)
-    except (ParseError, AttributeError, ValueError) as e:
-        logging.info("Parsing failed to get list of up nodes of %s, "
-                     "returning empty list: %s" % (vol, e))
-
-    return list(up_hosts)
-
-
-class Volinfo(object):
-
-    def __init__(self, vol, host='localhost', prelude=[]):
-        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
-                              'volume', 'info', vol],
-                   stdout=PIPE, stderr=PIPE)
-        vix = po.stdout.read()
-        po.wait()
-        po.terminate_geterr()
-        vi = XET.fromstring(vix)
-        if vi.find('opRet').text != '0':
-            if prelude:
-                via = '(via %s) ' % prelude.join(' ')
-            else:
-                via = ' '
-            raise GsyncdError('getting volume info of %s%s '
-                              'failed with errorcode %s' %
-                              (vol, via, vi.find('opErrno').text))
-        self.tree = vi
-        self.volume = vol
-        self.host = host
-
-    def get(self, elem):
-        return self.tree.findall('.//' + elem)
-
-    def is_tier(self):
-        return (self.get('typeStr')[0].text == 'Tier')
-
-    def is_hot(self, brickpath):
-        logging.debug('brickpath: ' + repr(brickpath))
-        return brickpath in self.hot_bricks
-
-    @property
-    @memoize
-    def bricks(self):
-        def bparse(b):
-            host, dirp = b.text.split(':', 2)
-            return {'host': host, 'dir': dirp}
-        return [bparse(b) for b in self.get('brick')]
-
-    @property
-    @memoize
-    def uuid(self):
-        ids = self.get('id')
-        if len(ids) != 1:
-            raise GsyncdError("volume info of %s obtained from %s: "
-                              "ambiguous uuid" % (self.volume, self.host))
-        return ids[0].text
-
-    def replica_count(self, tier, hot):
-        if (tier and hot):
-            return int(self.get('hotBricks/hotreplicaCount')[0].text)
-        elif (tier and not hot):
-            return int(self.get('coldBricks/coldreplicaCount')[0].text)
-        else:
-            return int(self.get('replicaCount')[0].text)
-
-    def disperse_count(self, tier, hot):
-        if (tier and hot):
-            # Tiering doesn't support disperse volume as hot brick,
-            # hence no xml output, so returning 0. In case, if it's
-            # supported later, we should change here.
-            return 0
-        elif (tier and not hot):
-            return int(self.get('coldBricks/colddisperseCount')[0].text)
-        else:
-            return int(self.get('disperseCount')[0].text)
-
-    @property
-    @memoize
-    def hot_bricks(self):
-        return [b.text for b in self.get('hotBricks/brick')]
-
-    def get_hot_bricks_count(self, tier):
-        if (tier):
-            return int(self.get('hotBricks/hotbrickCount')[0].text)
-        else:
-            return 0
-
-
 class Monitor(object):

     """class which spawns and manages gsyncd workers"""
@@ -190,7 +82,8 @@ class Monitor(object):
         # give a chance to graceful exit
         errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])

-    def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master):
+    def monitor(self, w, argv, cpids, slave_vol, slave_host, master,
+                suuid, slavenodes):
         """the monitor loop

         Basic logic is a blantantly simple blunt heuristics:
@@ -209,12 +102,14 @@
         blown worker blows up on EPIPE if the net goes down,
         due to the keep-alive thread)
         """
-        if not self.status.get(w[0], None):
-            self.status[w[0]] = GeorepStatus(gconf.state_file, w[0])
-
-        set_monitor_status(gconf.state_file, self.ST_STARTED)
-        self.status[w[0]].set_worker_status(self.ST_INIT)
-
+        if not self.status.get(w[0]['dir'], None):
+            self.status[w[0]['dir']] = GeorepStatus(gconf.get("state-file"),
+                                                    w[0]['host'],
+                                                    w[0]['dir'],
+                                                    w[0]['uuid'],
+                                                    master,
+                                                    "%s::%s" % (slave_host,
+                                                                slave_vol))
         ret = 0

         def nwait(p, o=0):
@@ -232,7 +127,7 @@
                     raise

         def exit_signalled(s):
-            """ child teminated due to receipt of SIGUSR1 """
+            """ child terminated due to receipt of SIGUSR1 """
             return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1))

         def exit_status(s):
@@ -240,77 +135,76 @@
                 return os.WEXITSTATUS(s)
             return 1

-        conn_timeout = int(gconf.connection_timeout)
+        conn_timeout = gconf.get("connection-timeout")
         while ret in (0, 1):
-            remote_host = w[1]
+            remote_user, remote_host = w[1][0].split("@")
+            remote_id = w[1][1]
             # Check the status of the connected slave node
             # If the connected slave node is down then try to connect to
             # different up node.
-            m = re.match("(ssh|gluster|file):\/\/(.+)@([^:]+):(.+)",
-                         remote_host)
-            if m:
-                current_slave_host = m.group(3)
-                slave_up_hosts = get_slave_bricks_status(
-                    slave_host, slave_vol)
-
-                if current_slave_host not in slave_up_hosts:
-                    if len(slave_up_hosts) > 0:
-                        remote_host = "%s://%s@%s:%s" % (m.group(1),
-                                                         m.group(2),
-                                                         random.choice(
-                                                             slave_up_hosts),
-                                                         m.group(4))
-
-            # Spawn the worker and agent in lock to avoid fd leak
+            current_slave_host = remote_host
+            slave_up_hosts = get_up_nodes(slavenodes, gconf.get("ssh-port"))
+
+            if (current_slave_host, remote_id) not in slave_up_hosts:
+                if len(slave_up_hosts) > 0:
+                    remote_new = random.choice(slave_up_hosts)
+                    remote_host = "%s@%s" % (remote_user, remote_new[0])
+                    remote_id = remote_new[1]
+
+            # Spawn the worker in lock to avoid fd leak
             self.lock.acquire()
-            logging.info('-' * conn_timeout)
-            logging.info('starting gsyncd worker')
-
-            # Couple of pipe pairs for RPC communication b/w
-            # worker and changelog agent.
-
-            # read/write end for agent
-            (ra, ww) = os.pipe()
-            # read/write end for worker
-            (rw, wa) = os.pipe()
-
-            # spawn the agent process
-            apid = os.fork()
-            if apid == 0:
-                os.close(rw)
-                os.close(ww)
-                os.execv(sys.executable, argv + ['--local-path', w[0],
-                                                 '--agent',
-                                                 '--rpc-fd',
-                                                 ','.join([str(ra), str(wa),
-                                                           str(rw), str(ww)])])
-            pr, pw = os.pipe()
+
+            self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
+            logging.info(lf('starting gsyncd worker',
+                            brick=w[0]['dir'],
+                            slave_node=remote_host))
+
+            pr, pw = pipe()
             cpid = os.fork()
             if cpid == 0:
                 os.close(pr)
-                os.close(ra)
-                os.close(wa)
-                os.execv(sys.executable, argv + ['--feedback-fd', str(pw),
-                                                 '--local-path', w[0],
-                                                 '--local-id',
-                                                 '.' + escape(w[0]),
-                                                 '--rpc-fd',
-                                                 ','.join([str(rw), str(ww),
-                                                           str(ra), str(wa)]),
-                                                 '--subvol-num', str(w[2])] +
-                         (['--is-hottier'] if w[3] else []) +
-                         ['--resource-remote', remote_host])
+
+                args_to_worker = argv + [
+                    'worker',
+                    rconf.args.master,
+                    rconf.args.slave,
+                    '--feedback-fd', str(pw),
+                    '--local-path', w[0]['dir'],
+                    '--local-node', w[0]['host'],
+                    '--local-node-id', w[0]['uuid'],
+                    '--slave-id', suuid,
+                    '--subvol-num', str(w[2]),
+                    '--resource-remote', remote_host,
+                    '--resource-remote-id', remote_id
+                ]
+
+                if rconf.args.config_file is not None:
+                    args_to_worker += ['-c', rconf.args.config_file]
+
+                if w[3]:
+                    args_to_worker.append("--is-hottier")
+
+                if rconf.args.debug:
+                    args_to_worker.append("--debug")
+
+                access_mount = gconf.get("access-mount")
+                if access_mount:
+                    os.execv(sys.executable, args_to_worker)
+                else:
+                    if unshare_propagation_supported():
+                        logging.debug("Worker would mount volume privately")
+                        unshare_cmd = ['unshare', '-m', '--propagation',
+                                       'private']
+                        cmd = unshare_cmd + args_to_worker
+                        os.execvp("unshare", cmd)
+                    else:
+                        logging.debug("Mount is not private. It would be lazy"
+                                      " umounted")
+                        os.execv(sys.executable, args_to_worker)

             cpids.add(cpid)
-            agents.add(apid)
             os.close(pw)
-
-            # close all RPC pipes in monitor
-            os.close(ra)
-            os.close(wa)
-            os.close(rw)
-            os.close(ww)
             self.lock.release()

             t0 = time.time()
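The rewritten spawn path above no longer forks a changelog-agent process: it execs the worker directly and, when the access-mount option is off and mount propagation is available, wraps the worker in `unshare -m --propagation private` so its auxiliary mount lives in a private mount namespace. Below is a standalone sketch of that pattern; `propagation_supported()` is a stand-in for the real unshare_propagation_supported() helper, and the probe it uses is an assumption, not gluster's implementation.

    import os
    import subprocess

    def propagation_supported():
        # crude probe: can unshare create a private mount namespace here?
        try:
            return subprocess.call(["unshare", "-m", "--propagation",
                                    "private", "true"]) == 0
        except OSError:
            return False

    def exec_privately(cmd):
        # meant to run in a forked child, as monitor.py does
        if propagation_supported():
            # mounts made by cmd stay invisible outside its namespace
            os.execvp("unshare",
                      ["unshare", "-m", "--propagation", "private"] + cmd)
        # fall back to the shared namespace; the mount must then be
        # lazy-umounted later
        os.execvp(cmd[0], cmd)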
@@ -319,162 +213,183 @@

             if so:
                 ret = nwait(cpid, os.WNOHANG)
-                ret_agent = nwait(apid, os.WNOHANG)
-
-                if ret_agent is not None:
-                    # Agent is died Kill Worker
-                    logging.info("Changelog Agent died, "
-                                 "Aborting Worker(%s)" % w[0])
-                    errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
-                    nwait(cpid)
-                    nwait(apid)

                 if ret is not None:
-                    logging.info("worker(%s) died before establishing "
-                                 "connection" % w[0])
-                    nwait(apid)  # wait for agent
+                    logging.info(lf("worker died before establishing "
+                                    "connection",
+                                    brick=w[0]['dir']))
                 else:
-                    logging.debug("worker(%s) connected" % w[0])
+                    logging.debug("worker(%s) connected" % w[0]['dir'])
                     while time.time() < t0 + conn_timeout:
                         ret = nwait(cpid, os.WNOHANG)
-                        ret_agent = nwait(apid, os.WNOHANG)

                         if ret is not None:
-                            logging.info("worker(%s) died in startup "
-                                         "phase" % w[0])
-                            nwait(apid)  # wait for agent
-                            break
-
-                        if ret_agent is not None:
-                            # Agent is died Kill Worker
-                            logging.info("Changelog Agent died, Aborting "
-                                         "Worker(%s)" % w[0])
-                            errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
-                            nwait(cpid)
-                            nwait(apid)
+                            logging.info(lf("worker died in startup phase",
+                                            brick=w[0]['dir']))
                             break

                         time.sleep(1)
             else:
-                logging.info("worker(%s) not confirmed in %d sec, "
-                             "aborting it" % (w[0], conn_timeout))
+                logging.info(
+                    lf("Worker not confirmed after wait, aborting it. "
+                       "Gsyncd invocation on remote slave via SSH or "
+                       "gluster master mount might have hung. Please "
+                       "check the above logs for exact issue and check "
+                       "master or slave volume for errors. Restarting "
+                       "master/slave volume accordingly might help.",
+                       brick=w[0]['dir'],
+                       timeout=conn_timeout))
                 errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
-                nwait(apid)  # wait for agent
                 ret = nwait(cpid)

             if ret is None:
-                self.status[w[0]].set_worker_status(self.ST_STABLE)
-                # If worker dies, agent terminates on EOF.
-                # So lets wait for agent first.
-                nwait(apid)
                 ret = nwait(cpid)

             if exit_signalled(ret):
                 ret = 0
             else:
                 ret = exit_status(ret)

             if ret in (0, 1):
-                self.status[w[0]].set_worker_status(self.ST_FAULTY)
+                self.status[w[0]['dir']].set_worker_status(self.ST_FAULTY)
+                gf_event(EVENT_GEOREP_FAULTY,
+                         master_volume=master.volume,
+                         master_node=w[0]['host'],
+                         master_node_id=w[0]['uuid'],
+                         slave_host=slave_host,
+                         slave_volume=slave_vol,
+                         current_slave_host=current_slave_host,
+                         brick_path=w[0]['dir'])
                 time.sleep(10)

-            self.status[w[0]].set_worker_status(self.ST_INCON)
+            self.status[w[0]['dir']].set_worker_status(self.ST_INCON)
             return ret
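The loop above is the monitor's startup handshake: the worker inherits the write end of the pipe via --feedback-fd, and the monitor waits on the read end for up to connection-timeout seconds before deciding the worker is stuck and killing it. A simplified, self-contained version of that handshake is sketched below; the names are illustrative and not the gsyncd API.

    import os
    import select

    def wait_for_child(conn_timeout=60):
        pr, pw = os.pipe()
        cpid = os.fork()
        if cpid == 0:
            os.close(pr)
            # ... child does its setup, then reports readiness ...
            os.write(pw, b"ok")
            os._exit(0)

        os.close(pw)
        # block until the child writes something or the timeout expires
        readable, _, _ = select.select([pr], [], [], conn_timeout)
        os.close(pr)
        return bool(readable)  # False means: assume the child hung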

-    def multiplex(self, wspx, suuid, slave_vol, slave_host, master):
-        argv = sys.argv[:]
-        for o in ('-N', '--no-daemon', '--monitor'):
-            while o in argv:
-                argv.remove(o)
-        argv.extend(('-N', '-p', '', '--slave-id', suuid))
-        argv.insert(0, os.path.basename(sys.executable))
+    def multiplex(self, wspx, suuid, slave_vol, slave_host, master, slavenodes):
+        argv = [os.path.basename(sys.executable), sys.argv[0]]

         cpids = set()
-        agents = set()
         ta = []
         for wx in wspx:
             def wmon(w):
-                cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
-                                       slave_host, master)
+                cpid, _ = self.monitor(w, argv, cpids, slave_vol,
+                                       slave_host, master, suuid, slavenodes)
                 time.sleep(1)
                 self.lock.acquire()
                 for cpid in cpids:
                     errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
-                for apid in agents:
-                    errno_wrap(os.kill, [apid, signal.SIGKILL], [ESRCH])
                 self.lock.release()
                 finalize(exval=1)

             t = Thread(target=wmon, args=[wx])
             t.start()
             ta.append(t)
+
+        # monitor status was being updated in each monitor thread. It
+        # should not be done as it can cause deadlock for a worker start.
+        # set_monitor_status uses flock to synchronize multple instances
+        # updating the file. Since each monitor thread forks worker,
+        # these processes can hold the reference to fd of status
+        # file causing deadlock to workers which starts later as flock
+        # will not be release until all references to same fd is closed.
+        # It will also cause fd leaks.
+
+        self.lock.acquire()
+        set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
+        self.lock.release()
+
         for t in ta:
             t.join()
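The comment added to multiplex() above explains why set_monitor_status() is now called once, under the monitor lock, instead of from every per-brick monitor thread: the status file is serialized with flock(2), and a forked worker that inherits an fd holding the lock keeps the lock alive until every copy of that fd is closed. A minimal sketch of that kind of flock-guarded update follows; the real locking lives in gsyncdstatus and is not shown in this diff.

    import fcntl

    def set_status(path, value):
        # exclusive flock; a forked child inheriting this fd would keep
        # the lock held until its copy of the fd is closed as well
        with open(path, "a+") as f:
            fcntl.flock(f.fileno(), fcntl.LOCK_EX)
            try:
                f.seek(0)
                f.truncate()
                f.write(value + "\n")
                f.flush()
            finally:
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)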


-def distribute(*resources):
-    master, slave = resources
-    mvol = Volinfo(master.volume, master.host)
+def distribute(master, slave):
+    if rconf.args.use_gconf_volinfo:
+        mvol = VolinfoFromGconf(master.volume, master=True)
+    else:
+        mvol = Volinfo(master.volume, master.host, master=True)
     logging.debug('master bricks: ' + repr(mvol.bricks))
     prelude = []
-    si = slave
     slave_host = None
     slave_vol = None
-    if isinstance(slave, SSH):
-        prelude = gconf.ssh_command.split() + [slave.remote_addr]
-        si = slave.inner_rsc
-        logging.debug('slave SSH gateway: ' + slave.remote_addr)
-    if isinstance(si, FILE):
-        sbricks = {'host': 'localhost', 'dir': si.path}
-        suuid = uuid.uuid5(uuid.NAMESPACE_URL, slave.get_url(canonical=True))
-    elif isinstance(si, GLUSTER):
-        svol = Volinfo(si.volume, slave.remote_addr.split('@')[-1])
-        sbricks = svol.bricks
-        suuid = svol.uuid
-        slave_host = slave.remote_addr.split('@')[-1]
-        slave_vol = si.volume
-
-        # save this xattr for the session delete command
-        old_stime_xattr_name = getattr(gconf, "master.stime_xattr_name", None)
-        new_stime_xattr_name = "trusted.glusterfs." + mvol.uuid + "." + \
-            svol.uuid + ".stime"
-        if not old_stime_xattr_name or \
-           old_stime_xattr_name != new_stime_xattr_name:
-            gconf.configinterface.set("master.stime_xattr_name",
-                                      new_stime_xattr_name)
-    else:
-        raise GsyncdError("unknown slave type " + slave.url)
-    logging.info('slave bricks: ' + repr(sbricks))
-    if isinstance(si, FILE):
-        slaves = [slave.url]
+    prelude = [gconf.get("ssh-command")] + \
+        gconf.get("ssh-options").split() + \
+        ["-p", str(gconf.get("ssh-port"))] + \
+        [slave.remote_addr]
+
+    logging.debug('slave SSH gateway: ' + slave.remote_addr)
+
+    if rconf.args.use_gconf_volinfo:
+        svol = VolinfoFromGconf(slave.volume, master=False)
     else:
-        slavenodes = set(b['host'] for b in sbricks)
-        if isinstance(slave, SSH) and not gconf.isolated_slave:
-            rap = SSH.parse_ssh_address(slave)
-            slaves = ['ssh://' + rap['user'] + '@' + h + ':' + si.url
-                      for h in slavenodes]
-        else:
-            slavevols = [h + ':' + si.volume for h in slavenodes]
-            if isinstance(slave, SSH):
-                slaves = ['ssh://' + rap.remote_addr + ':' + v
-                          for v in slavevols]
-            else:
-                slaves = slavevols
+        svol = Volinfo(slave.volume, "localhost", prelude, master=False)
+
+    sbricks = svol.bricks
+    suuid = svol.uuid
+    slave_host = slave.remote_addr.split('@')[-1]
+    slave_vol = slave.volume
+
+    # save this xattr for the session delete command
+    old_stime_xattr_prefix = gconf.get("stime-xattr-prefix", None)
+    new_stime_xattr_prefix = "trusted.glusterfs." + mvol.uuid + "." + \
+        svol.uuid
+    if not old_stime_xattr_prefix or \
+       old_stime_xattr_prefix != new_stime_xattr_prefix:
+        gconf.setconfig("stime-xattr-prefix", new_stime_xattr_prefix)
+
+    logging.debug('slave bricks: ' + repr(sbricks))
+
+    slavenodes = set((b['host'], b["uuid"]) for b in sbricks)
+    rap = SSH.parse_ssh_address(slave)
+    slaves = [(rap['user'] + '@' + h[0], h[1]) for h in slavenodes]

     workerspex = []
     for idx, brick in enumerate(mvol.bricks):
-        if is_host_local(brick['host']):
+        if rconf.args.local_node_id == brick['uuid']:
             is_hot = mvol.is_hot(":".join([brick['host'], brick['dir']]))
-            workerspex.append((brick['dir'],
+            workerspex.append((brick,
                                slaves[idx % len(slaves)],
                                get_subvol_num(idx, mvol, is_hot),
                                is_hot))
-    logging.info('worker specs: ' + repr(workerspex))
-    return workerspex, suuid, slave_vol, slave_host, master
+    logging.debug('worker specs: ' + repr(workerspex))
+    return workerspex, suuid, slave_vol, slave_host, master, slavenodes


-def monitor(*resources):
+def monitor(local, remote):
     # Check if gsyncd restarted in pause state. If
     # yes, send SIGSTOP to negative of monitor pid
     # to go back to pause state.
-    if gconf.pause_on_start:
+    if rconf.args.pause_on_start:
         errno_wrap(os.kill, [-os.getpid(), signal.SIGSTOP], [ESRCH])

     """oh yeah, actually Monitor is used as singleton, too"""
-    return Monitor().multiplex(*distribute(*resources))
+    return Monitor().multiplex(*distribute(local, remote))
+
+
+def startup(go_daemon=True):
+    """set up logging, pidfile grabbing, daemonization"""
+    pid_file = gconf.get("pid-file")
+    if not grabpidfile():
+        sys.stderr.write("pidfile is taken, exiting.\n")
+        sys.exit(2)
+    rconf.pid_file_owned = True
+
+    if not go_daemon:
+        return
+
+    x, y = pipe()
+    cpid = os.fork()
+    if cpid:
+        os.close(x)
+        sys.exit()
+    os.close(y)
+    os.setsid()
+    dn = os.open(os.devnull, os.O_RDWR)
+    for f in (sys.stdin, sys.stdout, sys.stderr):
+        os.dup2(dn, f.fileno())
+
+    if not grabpidfile(pid_file + '.tmp'):
+        raise GsyncdError("cannot grab temporary pidfile")
+
+    os.rename(pid_file + '.tmp', pid_file)
+
+    # wait for parent to terminate
+    # so we can start up with
+    # no messing from the dirty
+    # ol' bustard
+    select((x,), (), ())
+    os.close(x)
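Both the worker spawn and the new startup() take pipe() from the py2py3 compatibility module instead of calling os.pipe() directly. That module is not part of this diff; a common reason for such a wrapper is that os.pipe() descriptors are non-inheritable on Python 3.4+ (PEP 446), so they would not survive the exec of the worker that receives --feedback-fd. The sketch below is a plausible shape for such a helper, offered as an assumption rather than the actual py2py3 code.

    import os
    import sys

    def pipe():
        # like os.pipe(), but keep the fds usable across exec on Python 3
        r, w = os.pipe()
        if sys.version_info >= (3, 4):
            os.set_inheritable(r, True)
            os.set_inheritable(w, True)
        return r, w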
