Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r--  geo-replication/syncdaemon/monitor.py | 496
1 file changed, 252 insertions(+), 244 deletions(-)
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 3e0360332bd..6aa7b9dfc99 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -13,91 +13,49 @@
 import sys
 import time
 import signal
 import logging
-import uuid
 import xml.etree.ElementTree as XET
-from subprocess import PIPE
-from resource import Popen, FILE, GLUSTER, SSH
 from threading import Lock
-import re
+from errno import ECHILD, ESRCH
 import random
-from gconf import gconf
-from syncdutils import update_file, select, waitpid
-from syncdutils import set_term_handler, is_host_local, GsyncdError
-from syncdutils import escape, Thread, finalize, memoize
+from resource import SSH
+import gsyncdconfig as gconf
+import libgfchangelog
+from rconf import rconf
+from syncdutils import (select, waitpid, errno_wrap, lf, grabpidfile,
+                        set_term_handler, GsyncdError,
+                        Thread, finalize, Volinfo, VolinfoFromGconf,
+                        gf_event, EVENT_GEOREP_FAULTY, get_up_nodes,
+                        unshare_propagation_supported)
+from gsyncdstatus import GeorepStatus, set_monitor_status
+import py2py3
+from py2py3 import pipe
 
 ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
 
 
-def get_slave_bricks_status(host, vol):
-    po = Popen(['gluster', '--xml', '--remote-host=' + host,
-                'volume', 'status', vol, "detail"],
-               stdout=PIPE, stderr=PIPE)
-    vix = po.stdout.read()
-    po.wait()
-    po.terminate_geterr()
-    vi = XET.fromstring(vix)
-    if vi.find('opRet').text != '0':
-        logging.info("Unable to get list of up nodes of %s, "
-                     "returning empty list: %s" %
-                     (vol, vi.find('opErrstr').text))
-        return []
-
-    up_hosts = set()
-
-    try:
-        for el in vi.findall('volStatus/volumes/volume/node'):
-            if el.find('status').text == '1':
-                up_hosts.add(el.find('hostname').text)
-    except (ParseError, AttributeError, ValueError) as e:
-        logging.info("Parsing failed to get list of up nodes of %s, "
-                     "returning empty list: %s" % (vol, e))
-
-    return list(up_hosts)
-
-
-class Volinfo(object):
-
-    def __init__(self, vol, host='localhost', prelude=[]):
-        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
-                              'volume', 'info', vol],
-                   stdout=PIPE, stderr=PIPE)
-        vix = po.stdout.read()
-        po.wait()
-        po.terminate_geterr()
-        vi = XET.fromstring(vix)
-        if vi.find('opRet').text != '0':
-            if prelude:
-                via = '(via %s) ' % prelude.join(' ')
-            else:
-                via = ' '
-            raise GsyncdError('getting volume info of %s%s '
-                              'failed with errorcode %s',
-                              (vol, via, vi.find('opErrno').text))
-        self.tree = vi
-        self.volume = vol
-        self.host = host
-
-    def get(self, elem):
-        return self.tree.findall('.//' + elem)
-
-    @property
-    @memoize
-    def bricks(self):
-        def bparse(b):
-            host, dirp = b.text.split(':', 2)
-            return {'host': host, 'dir': dirp}
-        return [bparse(b) for b in self.get('brick')]
-
-    @property
-    @memoize
-    def uuid(self):
-        ids = self.get('id')
-        if len(ids) != 1:
-            raise GsyncdError("volume info of %s obtained from %s: "
-                              "ambiguous uuid",
-                              self.volume, self.host)
-        return ids[0].text
+def get_subvol_num(brick_idx, vol, hot):
+    tier = vol.is_tier()
+    disperse_count = vol.disperse_count(tier, hot)
+    replica_count = vol.replica_count(tier, hot)
+    distribute_count = vol.distribution_count(tier, hot)
+    gconf.setconfig("master-distribution-count", distribute_count)
+
+    if (tier and not hot):
+        brick_idx = brick_idx - vol.get_hot_bricks_count(tier)
+
+    subvol_size = disperse_count if disperse_count > 0 else replica_count
+    cnt = int((brick_idx + 1) / subvol_size)
+    rem = (brick_idx + 1) % subvol_size
+    if rem > 0:
+        cnt = cnt + 1
+
+    if (tier and hot):
+        return "hot_" + str(cnt)
+    elif (tier and not hot):
+        return "cold_" + str(cnt)
+    else:
+        return str(cnt)
 
 
 class Monitor(object):
@@ -105,41 +63,16 @@ class Monitor(object):
 
     """class which spawns and manages gsyncd workers"""
 
     ST_INIT = 'Initializing...'
-    ST_STABLE = 'Stable'
-    ST_FAULTY = 'faulty'
+    ST_STARTED = 'Started'
+    ST_STABLE = 'Active'
+    ST_FAULTY = 'Faulty'
     ST_INCON = 'inconsistent'
     _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]
 
     def __init__(self):
         self.lock = Lock()
         self.state = {}
-
-    def set_state(self, state, w=None):
-        """set the state that can be used by external agents
-        like glusterd for status reporting"""
-        computestate = lambda: self.state and self._ST_ORD[
-            max(self._ST_ORD.index(s) for s in self.state.values())]
-        if w:
-            self.lock.acquire()
-            old_state = computestate()
-            self.state[w] = state
-            state = computestate()
-            self.lock.release()
-            if state != old_state:
-                self.set_state(state)
-        else:
-            if getattr(gconf, 'state_file', None):
-                # If previous state is paused, suffix the
-                # new state with '(Paused)'
-                try:
-                    with open(gconf.state_file, "r") as f:
-                        content = f.read()
-                        if "paused" in content.lower():
-                            state = state + '(Paused)'
-                except IOError:
-                    pass
-            logging.info('new state: %s' % state)
-            update_file(gconf.state_file, lambda f: f.write(state + '\n'))
+        self.status = {}
 
     @staticmethod
     def terminate():
@@ -147,9 +80,10 @@ class Monitor(object):
         # standard handler
        set_term_handler(lambda *a: set_term_handler())
         # give a chance to graceful exit
-        os.kill(-os.getpid(), signal.SIGTERM)
+        errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])
 
-    def monitor(self, w, argv, cpids, agents, slave_vol, slave_host):
+    def monitor(self, w, argv, cpids, slave_vol, slave_host, master,
+                suuid, slavenodes):
         """the monitor loop
 
         Basic logic is a blatantly simple blunt heuristic:
@@ -168,19 +102,32 @@ class Monitor(object):
         blown worker blows up on EPIPE if the net goes down,
         due to the keep-alive thread)
         """
-
-        self.set_state(self.ST_INIT, w)
-
+        if not self.status.get(w[0]['dir'], None):
+            self.status[w[0]['dir']] = GeorepStatus(gconf.get("state-file"),
+                                                    w[0]['host'],
+                                                    w[0]['dir'],
+                                                    w[0]['uuid'],
+                                                    master,
+                                                    "%s::%s" % (slave_host,
+                                                                slave_vol))
         ret = 0
 
         def nwait(p, o=0):
-            p2, r = waitpid(p, o)
-            if not p2:
-                return
-            return r
+            try:
+                p2, r = waitpid(p, o)
+                if not p2:
+                    return
+                return r
+            except OSError as e:
+                # no child process; this happens if the child process
+                # already died and has been cleaned up
+                if e.errno == ECHILD:
+                    return -1
+                else:
+                    raise
 
         def exit_signalled(s):
-            """ child teminated due to receipt of SIGUSR1 """
+            """ child terminated due to receipt of SIGUSR1 """
             return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1))
 
         def exit_status(s):
@@ -188,72 +135,76 @@
             if os.WIFEXITED(s):
                 return os.WEXITSTATUS(s)
             return 1
 
-        conn_timeout = int(gconf.connection_timeout)
+        conn_timeout = gconf.get("connection-timeout")
 
         while ret in (0, 1):
-            remote_host = w[1]
+            remote_user, remote_host = w[1][0].split("@")
+            remote_id = w[1][1]
             # Check the status of the connected slave node.
             # If the connected slave node is down, try to connect to
             # a different up node.
-            m = re.match("(ssh|gluster|file):\/\/(.+)@([^:]+):(.+)",
-                         remote_host)
-            if m:
-                current_slave_host = m.group(3)
-                slave_up_hosts = get_slave_bricks_status(
-                    slave_host, slave_vol)
-
-                if current_slave_host not in slave_up_hosts:
-                    if len(slave_up_hosts) > 0:
-                        remote_host = "%s://%s@%s:%s" % (m.group(1),
-                                                         m.group(2),
-                                                         random.choice(
-                                                             slave_up_hosts),
-                                                         m.group(4))
-
-            # Spawn the worker and agent in lock to avoid fd leak
+            current_slave_host = remote_host
+            slave_up_hosts = get_up_nodes(slavenodes, gconf.get("ssh-port"))
+
+            if (current_slave_host, remote_id) not in slave_up_hosts:
+                if len(slave_up_hosts) > 0:
+                    remote_new = random.choice(slave_up_hosts)
+                    remote_host = "%s@%s" % (remote_user, remote_new[0])
+                    remote_id = remote_new[1]
+
+            # Spawn the worker in lock to avoid fd leak
             self.lock.acquire()
 
-            logging.info('-' * conn_timeout)
-            logging.info('starting gsyncd worker')
-
-            # Couple of pipe pairs for RPC communication b/w
-            # worker and changelog agent.
-
-            # read/write end for agent
-            (ra, ww) = os.pipe()
-            # read/write end for worker
-            (rw, wa) = os.pipe()
-
-            # spawn the agent process
-            apid = os.fork()
-            if apid == 0:
-                os.execv(sys.executable, argv + ['--local-path', w[0],
-                                                 '--agent',
-                                                 '--rpc-fd',
-                                                 ','.join([str(ra), str(wa),
-                                                           str(rw), str(ww)])])
-            pr, pw = os.pipe()
+            self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
+            logging.info(lf('starting gsyncd worker',
+                            brick=w[0]['dir'],
+                            slave_node=remote_host))
+
+            pr, pw = pipe()
             cpid = os.fork()
             if cpid == 0:
                 os.close(pr)
-                os.execv(sys.executable, argv + ['--feedback-fd', str(pw),
-                                                 '--local-path', w[0],
-                                                 '--local-id',
-                                                 '.' + escape(w[0]),
-                                                 '--rpc-fd',
-                                                 ','.join([str(rw), str(ww),
-                                                           str(ra), str(wa)]),
-                                                 '--resource-remote',
-                                                 remote_host])
+
+                args_to_worker = argv + [
+                    'worker',
+                    rconf.args.master,
+                    rconf.args.slave,
+                    '--feedback-fd', str(pw),
+                    '--local-path', w[0]['dir'],
+                    '--local-node', w[0]['host'],
+                    '--local-node-id', w[0]['uuid'],
+                    '--slave-id', suuid,
+                    '--subvol-num', str(w[2]),
+                    '--resource-remote', remote_host,
+                    '--resource-remote-id', remote_id
+                ]
+
+                if rconf.args.config_file is not None:
+                    args_to_worker += ['-c', rconf.args.config_file]
+
+                if w[3]:
+                    args_to_worker.append("--is-hottier")
+
+                if rconf.args.debug:
+                    args_to_worker.append("--debug")
+
+                access_mount = gconf.get("access-mount")
+                if access_mount:
+                    os.execv(sys.executable, args_to_worker)
+                else:
+                    if unshare_propagation_supported():
+                        logging.debug("Worker would mount volume privately")
+                        unshare_cmd = ['unshare', '-m', '--propagation',
+                                       'private']
+                        cmd = unshare_cmd + args_to_worker
+                        os.execvp("unshare", cmd)
+                    else:
+                        logging.debug("Mount is not private. It would be lazy"
+                                      " umounted")
+                        os.execv(sys.executable, args_to_worker)
 
             cpids.add(cpid)
-            agents.add(apid)
             os.close(pw)
 
-            # close all RPC pipes in monitor
-            os.close(ra)
-            os.close(wa)
-            os.close(rw)
-            os.close(ww)
             self.lock.release()
 
             t0 = time.time()
@@ -262,126 +213,183 @@
             if so:
                 ret = nwait(cpid, os.WNOHANG)
+
                 if ret is not None:
-                    logging.info("worker(%s) died before establishing "
-                                 "connection" % w[0])
-                    nwait(apid) #wait for agent
+                    logging.info(lf("worker died before establishing "
+                                    "connection",
+                                    brick=w[0]['dir']))
                 else:
-                    logging.debug("worker(%s) connected" % w[0])
+                    logging.debug("worker(%s) connected" % w[0]['dir'])
                     while time.time() < t0 + conn_timeout:
                         ret = nwait(cpid, os.WNOHANG)
+
                         if ret is not None:
-                            logging.info("worker(%s) died in startup "
-                                         "phase" % w[0])
-                            nwait(apid) #wait for agent
+                            logging.info(lf("worker died in startup phase",
+                                            brick=w[0]['dir']))
                             break
+
                        time.sleep(1)
             else:
-                logging.info("worker(%s) not confirmed in %d sec, "
-                             "aborting it" % (w[0], conn_timeout))
-                os.kill(cpid, signal.SIGKILL)
-                nwait(apid) #wait for agent
+                logging.info(
+                    lf("Worker not confirmed after wait, aborting it. "
+                       "Gsyncd invocation on remote slave via SSH or "
+                       "gluster master mount might have hung. Please "
+                       "check the above logs for exact issue and check "
+                       "master or slave volume for errors. Restarting "
+                       "master/slave volume accordingly might help.",
+                       brick=w[0]['dir'],
+                       timeout=conn_timeout))
+                errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
                 ret = nwait(cpid)
             if ret is None:
-                self.set_state(self.ST_STABLE, w)
-                #If worker dies, agent terminates on EOF.
-                #So lets wait for agent first.
-                nwait(apid)
                 ret = nwait(cpid)
             if exit_signalled(ret):
                 ret = 0
             else:
                 ret = exit_status(ret)
                 if ret in (0, 1):
-                    self.set_state(self.ST_FAULTY, w)
+                    self.status[w[0]['dir']].set_worker_status(self.ST_FAULTY)
+                    gf_event(EVENT_GEOREP_FAULTY,
+                             master_volume=master.volume,
+                             master_node=w[0]['host'],
+                             master_node_id=w[0]['uuid'],
+                             slave_host=slave_host,
+                             slave_volume=slave_vol,
+                             current_slave_host=current_slave_host,
+                             brick_path=w[0]['dir'])
             time.sleep(10)
-        self.set_state(self.ST_INCON, w)
+        self.status[w[0]['dir']].set_worker_status(self.ST_INCON)
         return ret
 
-    def multiplex(self, wspx, suuid, slave_vol, slave_host):
-        argv = sys.argv[:]
-        for o in ('-N', '--no-daemon', '--monitor'):
-            while o in argv:
-                argv.remove(o)
-        argv.extend(('-N', '-p', '', '--slave-id', suuid))
-        argv.insert(0, os.path.basename(sys.executable))
+    def multiplex(self, wspx, suuid, slave_vol, slave_host, master,
+                  slavenodes):
+        argv = [os.path.basename(sys.executable), sys.argv[0]]
 
         cpids = set()
-        agents = set()
         ta = []
         for wx in wspx:
             def wmon(w):
-                cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
-                                       slave_host)
+                cpid, _ = self.monitor(w, argv, cpids, slave_vol,
+                                       slave_host, master, suuid, slavenodes)
                time.sleep(1)
                 self.lock.acquire()
                 for cpid in cpids:
-                    os.kill(cpid, signal.SIGKILL)
-                for apid in agents:
-                    os.kill(apid, signal.SIGKILL)
+                    errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
                 self.lock.release()
                 finalize(exval=1)
 
             t = Thread(target=wmon, args=[wx])
             t.start()
             ta.append(t)
+
+        # Monitor status was being updated in each monitor thread. It
+        # should not be done there, as it can cause a deadlock for a
+        # worker start. set_monitor_status uses flock to synchronize
+        # multiple instances updating the file. Since each monitor
+        # thread forks the worker, those processes can hold a reference
+        # to the fd of the status file, deadlocking workers which start
+        # later because the flock is not released until all references
+        # to the same fd are closed. It also causes fd leaks.
+
+        self.lock.acquire()
+        set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
+        self.lock.release()
 
         for t in ta:
             t.join()
 
 
-def distribute(*resources):
-    master, slave = resources
-    mvol = Volinfo(master.volume, master.host)
+def distribute(master, slave):
+    if rconf.args.use_gconf_volinfo:
+        mvol = VolinfoFromGconf(master.volume, master=True)
+    else:
+        mvol = Volinfo(master.volume, master.host, master=True)
     logging.debug('master bricks: ' + repr(mvol.bricks))
     prelude = []
-    si = slave
     slave_host = None
     slave_vol = None
 
-    if isinstance(slave, SSH):
-        prelude = gconf.ssh_command.split() + [slave.remote_addr]
-        si = slave.inner_rsc
-        logging.debug('slave SSH gateway: ' + slave.remote_addr)
-    if isinstance(si, FILE):
-        sbricks = {'host': 'localhost', 'dir': si.path}
-        suuid = uuid.uuid5(uuid.NAMESPACE_URL, slave.get_url(canonical=True))
-    elif isinstance(si, GLUSTER):
-        svol = Volinfo(si.volume, slave.remote_addr.split('@')[-1])
-        sbricks = svol.bricks
-        suuid = svol.uuid
-        slave_host = slave.remote_addr.split('@')[-1]
-        slave_vol = si.volume
-    else:
-        raise GsyncdError("unkown slave type " + slave.url)
-    logging.info('slave bricks: ' + repr(sbricks))
-    if isinstance(si, FILE):
-        slaves = [slave.url]
-    else:
-        slavenodes = set(b['host'] for b in sbricks)
-        if isinstance(slave, SSH) and not gconf.isolated_slave:
-            rap = SSH.parse_ssh_address(slave)
-            slaves = ['ssh://' + rap['user'] + '@' + h + ':' + si.url
-                      for h in slavenodes]
-        else:
-            slavevols = [h + ':' + si.volume for h in slavenodes]
-            if isinstance(slave, SSH):
-                slaves = ['ssh://' + rap.remote_addr + ':' + v
-                          for v in slavevols]
-            else:
-                slaves = slavevols
-
-    workerspex = [(brick['dir'], slaves[idx % len(slaves)])
-                  for idx, brick in enumerate(mvol.bricks)
-                  if is_host_local(brick['host'])]
-    logging.info('worker specs: ' + repr(workerspex))
-    return workerspex, suuid, slave_vol, slave_host
+    prelude = [gconf.get("ssh-command")] + \
+        gconf.get("ssh-options").split() + \
+        ["-p", str(gconf.get("ssh-port"))] + \
+        [slave.remote_addr]
+
+    logging.debug('slave SSH gateway: ' + slave.remote_addr)
 
-def monitor(*resources):
+    if rconf.args.use_gconf_volinfo:
+        svol = VolinfoFromGconf(slave.volume, master=False)
+    else:
+        svol = Volinfo(slave.volume, "localhost", prelude, master=False)
+
+    sbricks = svol.bricks
+    suuid = svol.uuid
+    slave_host = slave.remote_addr.split('@')[-1]
+    slave_vol = slave.volume
+
+    # save this xattr for the session delete command
+    old_stime_xattr_prefix = gconf.get("stime-xattr-prefix", None)
+    new_stime_xattr_prefix = "trusted.glusterfs." + mvol.uuid + "." + \
+        svol.uuid
+    if not old_stime_xattr_prefix or \
+       old_stime_xattr_prefix != new_stime_xattr_prefix:
+        gconf.setconfig("stime-xattr-prefix", new_stime_xattr_prefix)
+
+    logging.debug('slave bricks: ' + repr(sbricks))
+
+    slavenodes = set((b['host'], b["uuid"]) for b in sbricks)
+    rap = SSH.parse_ssh_address(slave)
+    slaves = [(rap['user'] + '@' + h[0], h[1]) for h in slavenodes]
+
+    workerspex = []
+    for idx, brick in enumerate(mvol.bricks):
+        if rconf.args.local_node_id == brick['uuid']:
+            is_hot = mvol.is_hot(":".join([brick['host'], brick['dir']]))
+            workerspex.append((brick,
+                               slaves[idx % len(slaves)],
+                               get_subvol_num(idx, mvol, is_hot),
+                               is_hot))
+
+    logging.debug('worker specs: ' + repr(workerspex))
+    return workerspex, suuid, slave_vol, slave_host, master, slavenodes
+
+
+def monitor(local, remote):
     # Check if gsyncd restarted in pause state. If
     # yes, send SIGSTOP to negative of monitor pid
     # to go back to pause state.
-    if gconf.pause_on_start:
-        os.kill(-os.getpid(), signal.SIGSTOP)
+    if rconf.args.pause_on_start:
+        errno_wrap(os.kill, [-os.getpid(), signal.SIGSTOP], [ESRCH])
 
     """oh yeah, actually Monitor is used as singleton, too"""
-    return Monitor().multiplex(*distribute(*resources))
+    return Monitor().multiplex(*distribute(local, remote))
+
+
+def startup(go_daemon=True):
+    """set up logging, pidfile grabbing, daemonization"""
+    pid_file = gconf.get("pid-file")
+    if not grabpidfile():
+        sys.stderr.write("pidfile is taken, exiting.\n")
+        sys.exit(2)
+    rconf.pid_file_owned = True
+
+    if not go_daemon:
+        return
+
+    x, y = pipe()
+    cpid = os.fork()
+    if cpid:
+        os.close(x)
+        sys.exit()
+    os.close(y)
+    os.setsid()
+    dn = os.open(os.devnull, os.O_RDWR)
+    for f in (sys.stdin, sys.stdout, sys.stderr):
+        os.dup2(dn, f.fileno())
+
+    if not grabpidfile(pid_file + '.tmp'):
+        raise GsyncdError("cannot grab temporary pidfile")
+
+    os.rename(pid_file + '.tmp', pid_file)
+
+    # wait for parent to terminate
+    # so we can start up with
+    # no messing from the dirty
+    # ol' bustard
+    select((x,), (), ())
+    os.close(x)
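Note on the new get_subvol_num(): it maps a brick index to a subvolume label by grouping bricks into subvolumes of disperse-count bricks (dispersed volumes) or replica-count bricks (replicated volumes), i.e. the subvolume number is the ceiling of (brick_idx + 1) / group size. A minimal standalone sketch of just that arithmetic, with the counts passed in as plain parameters instead of being read from a Volinfo object (hypothetical helper, not part of the patch):

# Sketch: the subvolume-numbering arithmetic from get_subvol_num(),
# minus tiering and the Volinfo API.
def subvol_num(brick_idx, replica_count, disperse_count=0):
    # A subvolume spans disperse_count bricks if the volume is
    # dispersed, otherwise replica_count bricks.
    subvol_size = disperse_count if disperse_count > 0 else replica_count
    cnt = (brick_idx + 1) // subvol_size
    if (brick_idx + 1) % subvol_size > 0:
        cnt += 1
    return str(cnt)

# Bricks 0..2 of a replica-3 volume form subvolume 1, bricks 3..5
# form subvolume 2, and so on.
assert [subvol_num(i, 3) for i in range(6)] == ['1', '1', '1', '2', '2', '2']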
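The reworked nwait() also tolerates a child that was already reaped elsewhere: os.waitpid() raises OSError with errno set to ECHILD in that case, and returning a sentinel keeps the monitor loop alive instead of crashing it. Sketched here against plain os.waitpid (the patch goes through syncdutils.waitpid, which has the same shape):

import os
from errno import ECHILD

def safe_wait(pid, flags=0):
    try:
        p, status = os.waitpid(pid, flags)
        if not p:
            return None  # still running (only possible with WNOHANG)
        return status
    except OSError as e:
        if e.errno == ECHILD:
            return -1  # child already died and was cleaned up
        raise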
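The liveness heuristic around the spawn is unchanged in spirit: the worker inherits the write end of a feedback pipe, and the monitor select()s on the read end for connection-timeout seconds, SIGKILLing the child if it never reports in. A condensed, self-contained model of that handshake; the timeout constant and the sleeping child are stand-ins for the real gsyncd worker, not the patch's code:

import os
import signal
import time
from select import select

CONN_TIMEOUT = 60  # stand-in for the connection-timeout config value

def spawn_and_confirm():
    pr, pw = os.pipe()
    cpid = os.fork()
    if cpid == 0:
        # Child: pretend to be a worker that connects, then reports
        # readiness on the feedback fd.
        os.close(pr)
        time.sleep(1)
        os.write(pw, b"ok")
        os._exit(0)
    # Monitor side keeps only the read end open.
    os.close(pw)
    ready, _, _ = select([pr], [], [], CONN_TIMEOUT)
    os.close(pr)
    if not ready:
        # No feedback within the timeout: assume the worker hung.
        os.kill(cpid, signal.SIGKILL)
    _, status = os.waitpid(cpid, 0)
    return status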
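Worker spawning can now wrap the exec in unshare(1) so the worker's auxiliary glusterfs mount lives in a private mount namespace and cannot leak into, or be torn down from, the host namespace. Roughly, with the support probe reduced to a boolean parameter (the patch calls syncdutils.unshare_propagation_supported()):

import os
import sys

def exec_worker(args_to_worker, private_mounts_ok):
    if private_mounts_ok:
        # unshare -m gives the worker its own mount namespace;
        # --propagation private keeps mount events from crossing
        # the namespace boundary in either direction.
        os.execvp("unshare",
                  ["unshare", "-m", "--propagation", "private"] +
                  args_to_worker)
    else:
        # Shared-namespace fallback: the aux mount is lazily
        # unmounted when the worker exits.
        os.execv(sys.executable, args_to_worker)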
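Finally, startup() moves into this module. Its daemonization is a single fork plus setsid(), with a pipe serving as a parent-death signal: the parent exits while holding the write end, and the child's select() on the read end only returns (EOF) once the parent is fully gone. A condensed model of that sequence:

import os
import sys
from select import select

def daemonize():
    r, w = os.pipe()
    if os.fork():
        # Parent: drop the read end and exit; exiting closes the
        # write end, which wakes the child below.
        os.close(r)
        sys.exit()
    os.close(w)
    os.setsid()  # detach from the controlling terminal
    devnull = os.open(os.devnull, os.O_RDWR)
    for f in (sys.stdin, sys.stdout, sys.stderr):
        os.dup2(devnull, f.fileno())
    # Block until the parent has terminated: its exit closes the
    # write end and the read end becomes readable at EOF.
    select([r], [], [])
    os.close(r)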
