Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 194
1 file changed, 59 insertions, 135 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index a193b57caff..6aa7b9dfc99 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -14,20 +14,22 @@ import time
 import signal
 import logging
 import xml.etree.ElementTree as XET
-from subprocess import PIPE
 from threading import Lock
 from errno import ECHILD, ESRCH
 import random
 
 from resource import SSH
 import gsyncdconfig as gconf
+import libgfchangelog
 from rconf import rconf
-from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile
-from syncdutils import set_term_handler, is_host_local, GsyncdError
-from syncdutils import Thread, finalize, Popen, Volinfo
-from syncdutils import gf_event, EVENT_GEOREP_FAULTY
+from syncdutils import (select, waitpid, errno_wrap, lf, grabpidfile,
+                        set_term_handler, GsyncdError,
+                        Thread, finalize, Volinfo, VolinfoFromGconf,
+                        gf_event, EVENT_GEOREP_FAULTY, get_up_nodes,
+                        unshare_propagation_supported)
 from gsyncdstatus import GeorepStatus, set_monitor_status
-
+import py2py3
+from py2py3 import pipe
 
 ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
 
@@ -36,6 +38,8 @@ def get_subvol_num(brick_idx, vol, hot):
     tier = vol.is_tier()
     disperse_count = vol.disperse_count(tier, hot)
     replica_count = vol.replica_count(tier, hot)
+    distribute_count = vol.distribution_count(tier, hot)
+    gconf.setconfig("master-distribution-count", distribute_count)
 
     if (tier and not hot):
         brick_idx = brick_idx - vol.get_hot_bricks_count(tier)
@@ -54,43 +58,6 @@ def get_subvol_num(brick_idx, vol, hot):
         return str(cnt)
 
 
-def get_slave_bricks_status(host, vol):
-    po = Popen(['gluster', '--xml', '--remote-host=' + host,
-                'volume', 'status', vol, "detail"],
-               stdout=PIPE, stderr=PIPE)
-    vix = po.stdout.read()
-    po.wait()
-    po.terminate_geterr(fail_on_err=False)
-    if po.returncode != 0:
-        logging.info(lf("Volume status command failed, unable to get "
-                        "list of up nodes, returning empty list",
-                        volume=vol,
-                        error=po.returncode))
-        return []
-    vi = XET.fromstring(vix)
-    if vi.find('opRet').text != '0':
-        logging.info(lf("Unable to get list of up nodes, "
-                        "returning empty list",
-                        volume=vol,
-                        error=vi.find('opErrstr').text))
-        return []
-
-    up_hosts = set()
-
-    try:
-        for el in vi.findall('volStatus/volumes/volume/node'):
-            if el.find('status').text == '1':
-                up_hosts.add((el.find('hostname').text,
-                              el.find('peerid').text))
-    except (ParseError, AttributeError, ValueError) as e:
-        logging.info(lf("Parsing failed to get list of up nodes, "
-                        "returning empty list",
-                        volume=vol,
-                        error=e))
-
-    return list(up_hosts)
-
-
 class Monitor(object):
 
     """class which spawns and manages gsyncd workers"""
@@ -115,8 +82,8 @@ class Monitor(object):
         # give a chance to graceful exit
         errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])
 
-    def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master,
-                suuid):
+    def monitor(self, w, argv, cpids, slave_vol, slave_host, master,
+                suuid, slavenodes):
         """the monitor loop
 
         Basic logic is a blantantly simple blunt heuristics:
@@ -143,10 +110,6 @@ class Monitor(object):
                                                     master,
                                                     "%s::%s" % (slave_host,
                                                                 slave_vol))
-
-        set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
-        self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
-
         ret = 0
 
         def nwait(p, o=0):
@@ -164,7 +127,7 @@ class Monitor(object):
                     raise
 
         def exit_signalled(s):
-            """ child teminated due to receipt of SIGUSR1 """
+            """ child terminated due to receipt of SIGUSR1 """
             return (os.WIFSIGNALED(s) and
                     (os.WTERMSIG(s) == signal.SIGUSR1))
         def exit_status(s):
@@ -180,8 +143,7 @@ class Monitor(object):
                 # If the connected slave node is down then try to connect to
                 # different up node.
                 current_slave_host = remote_host
-                slave_up_hosts = get_slave_bricks_status(
-                    slave_host, slave_vol)
+                slave_up_hosts = get_up_nodes(slavenodes, gconf.get("ssh-port"))
 
                 if (current_slave_host, remote_id) not in slave_up_hosts:
                     if len(slave_up_hosts) > 0:
@@ -189,51 +151,18 @@ class Monitor(object):
                         remote_host = "%s@%s" % (remote_user, remote_new[0])
                         remote_id = remote_new[1]
 
-            # Spawn the worker and agent in lock to avoid fd leak
             self.lock.acquire()
 
+            self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
+
             logging.info(lf('starting gsyncd worker',
                             brick=w[0]['dir'],
                             slave_node=remote_host))
 
-            # Couple of pipe pairs for RPC communication b/w
-            # worker and changelog agent.
-
-            # read/write end for agent
-            (ra, ww) = os.pipe()
-            # read/write end for worker
-            (rw, wa) = os.pipe()
-
-            # spawn the agent process
-            apid = os.fork()
-            if apid == 0:
-                os.close(rw)
-                os.close(ww)
-                args_to_agent = argv + [
-                    'agent',
-                    rconf.args.master,
-                    rconf.args.slave,
-                    '--local-path', w[0]['dir'],
-                    '--local-node', w[0]['host'],
-                    '--local-node-id', w[0]['uuid'],
-                    '--slave-id', suuid,
-                    '--rpc-fd', ','.join([str(ra), str(wa), str(rw), str(ww)])
-                ]
-
-                if rconf.args.config_file is not None:
-                    args_to_agent += ['-c', rconf.args.config_file]
-
-                if rconf.args.debug:
-                    args_to_agent.append("--debug")
-
-                os.execv(sys.executable, args_to_agent)
-
-            pr, pw = os.pipe()
+            pr, pw = pipe()
             cpid = os.fork()
             if cpid == 0:
                 os.close(pr)
-                os.close(ra)
-                os.close(wa)
 
                 args_to_worker = argv + [
                     'worker',
@@ -244,8 +173,6 @@ class Monitor(object):
                     '--local-node', w[0]['host'],
                     '--local-node-id', w[0]['uuid'],
                     '--slave-id', suuid,
-                    '--rpc-fd',
-                    ','.join([str(rw), str(ww), str(ra), str(wa)]),
                     '--subvol-num', str(w[2]),
                     '--resource-remote', remote_host,
                     '--resource-remote-id', remote_id
@@ -260,17 +187,24 @@ class Monitor(object):
                 if rconf.args.debug:
                     args_to_worker.append("--debug")
 
-                os.execv(sys.executable, args_to_worker)
+                access_mount = gconf.get("access-mount")
+                if access_mount:
+                    os.execv(sys.executable, args_to_worker)
+                else:
+                    if unshare_propagation_supported():
+                        logging.debug("Worker would mount volume privately")
+                        unshare_cmd = ['unshare', '-m', '--propagation',
+                                       'private']
+                        cmd = unshare_cmd + args_to_worker
+                        os.execvp("unshare", cmd)
+                    else:
+                        logging.debug("Mount is not private. It would be lazy"
+                                      " umounted")
+                        os.execv(sys.executable, args_to_worker)
 
             cpids.add(cpid)
-            agents.add(apid)
             os.close(pw)
 
-            # close all RPC pipes in monitor
-            os.close(ra)
-            os.close(wa)
-            os.close(rw)
-            os.close(ww)
             self.lock.release()
 
             t0 = time.time()
@@ -279,42 +213,19 @@ class Monitor(object):
 
             if so:
                 ret = nwait(cpid, os.WNOHANG)
-                ret_agent = nwait(apid, os.WNOHANG)
-
-                if ret_agent is not None:
-                    # Agent is died Kill Worker
-                    logging.info(lf("Changelog Agent died, Aborting Worker",
-                                    brick=w[0]['dir']))
-                    errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
-                    nwait(cpid)
-                    nwait(apid)
 
                 if ret is not None:
                     logging.info(lf("worker died before establishing "
                                     "connection",
                                     brick=w[0]['dir']))
-                    nwait(apid)  # wait for agent
                 else:
                     logging.debug("worker(%s) connected" % w[0]['dir'])
 
                     while time.time() < t0 + conn_timeout:
                         ret = nwait(cpid, os.WNOHANG)
-                        ret_agent = nwait(apid, os.WNOHANG)
                         if ret is not None:
                             logging.info(lf("worker died in startup phase",
                                             brick=w[0]['dir']))
-                            nwait(apid)  # wait for agent
-                            break
-
-                        if ret_agent is not None:
-                            # Agent is died Kill Worker
-                            logging.info(lf("Changelog Agent died, Aborting "
                                            "Worker",
-                                            brick=w[0]['dir']))
-                            errno_wrap(os.kill, [cpid, signal.SIGKILL],
-                                       [ESRCH])
-                            nwait(cpid)
-                            nwait(apid)
                             break
 
                         time.sleep(1)
@@ -329,12 +240,8 @@ class Monitor(object):
                        brick=w[0]['dir'],
                        timeout=conn_timeout))
                 errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
-                nwait(apid)  # wait for agent
             ret = nwait(cpid)
             if ret is None:
-                # If worker dies, agent terminates on EOF.
-                # So lets wait for agent first.
-                nwait(apid)
                 ret = nwait(cpid)
             if exit_signalled(ret):
                 ret = 0
@@ -354,33 +261,46 @@ class Monitor(object):
         self.status[w[0]['dir']].set_worker_status(self.ST_INCON)
         return ret
 
-    def multiplex(self, wspx, suuid, slave_vol, slave_host, master):
+    def multiplex(self, wspx, suuid, slave_vol, slave_host, master, slavenodes):
         argv = [os.path.basename(sys.executable), sys.argv[0]]
 
         cpids = set()
-        agents = set()
         ta = []
         for wx in wspx:
            def wmon(w):
-                cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
-                                       slave_host, master, suuid)
+                cpid, _ = self.monitor(w, argv, cpids, slave_vol,
+                                       slave_host, master, suuid, slavenodes)
                time.sleep(1)
                self.lock.acquire()
                for cpid in cpids:
                    errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
-                for apid in agents:
-                    errno_wrap(os.kill, [apid, signal.SIGKILL], [ESRCH])
                self.lock.release()
                finalize(exval=1)
            t = Thread(target=wmon, args=[wx])
            t.start()
            ta.append(t)
+
+        # monitor status was being updated in each monitor thread. It
+        # should not be done as it can cause deadlock for a worker start.
+        # set_monitor_status uses flock to synchronize multple instances
+        # updating the file. Since each monitor thread forks worker,
+        # these processes can hold the reference to fd of status
+        # file causing deadlock to workers which starts later as flock
+        # will not be release until all references to same fd is closed.
+        # It will also cause fd leaks.
+
+        self.lock.acquire()
+        set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
+        self.lock.release()
        for t in ta:
            t.join()
 
 
 def distribute(master, slave):
-    mvol = Volinfo(master.volume, master.host)
+    if rconf.args.use_gconf_volinfo:
+        mvol = VolinfoFromGconf(master.volume, master=True)
+    else:
+        mvol = Volinfo(master.volume, master.host, master=True)
     logging.debug('master bricks: ' + repr(mvol.bricks))
     prelude = []
     slave_host = None
@@ -393,7 +313,11 @@ def distribute(master, slave):
 
     logging.debug('slave SSH gateway: ' + slave.remote_addr)
 
-    svol = Volinfo(slave.volume, "localhost", prelude)
+    if rconf.args.use_gconf_volinfo:
+        svol = VolinfoFromGconf(slave.volume, master=False)
+    else:
+        svol = Volinfo(slave.volume, "localhost", prelude, master=False)
+
     sbricks = svol.bricks
     suuid = svol.uuid
     slave_host = slave.remote_addr.split('@')[-1]
@@ -415,14 +339,14 @@ def distribute(master, slave):
 
     workerspex = []
     for idx, brick in enumerate(mvol.bricks):
-        if is_host_local(brick['uuid']):
+        if rconf.args.local_node_id == brick['uuid']:
            is_hot = mvol.is_hot(":".join([brick['host'], brick['dir']]))
            workerspex.append((brick, slaves[idx % len(slaves)],
                               get_subvol_num(idx, mvol, is_hot),
                               is_hot))
 
     logging.debug('worker specs: ' + repr(workerspex))
-    return workerspex, suuid, slave_vol, slave_host, master
+    return workerspex, suuid, slave_vol, slave_host, master, slavenodes
 
 
 def monitor(local, remote):
@@ -447,7 +371,7 @@ def startup(go_daemon=True):
     if not go_daemon:
         return
 
-    x, y = os.pipe()
+    x, y = pipe()
     cpid = os.fork()
     if cpid:
         os.close(x)
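Note on the worker spawn change above: when "access-mount" is off, the patch re-execs the worker under "unshare -m --propagation private" so the worker's auxiliary mount lives only in its own mount namespace. The following is a minimal sketch of that pattern, not the gsyncd code itself; the unshare_propagation_supported() probe below is an assumption standing in for the syncdutils helper imported by the patch.

# Sketch only: fork a child and exec it inside a private mount namespace,
# falling back to a plain exec when util-linux unshare lacks --propagation.
import os
import subprocess
import sys


def unshare_propagation_supported():
    # Assumption: checking "unshare --help" for the --propagation option is
    # enough for the sketch; the real syncdutils check may differ.
    try:
        out = subprocess.check_output(["unshare", "--help"],
                                      stderr=subprocess.STDOUT)
    except (OSError, subprocess.CalledProcessError):
        return False
    return b"--propagation" in out


def spawn_worker(args_to_worker, access_mount=False):
    """Fork and exec a worker process; return the child pid to the caller."""
    pid = os.fork()
    if pid == 0:
        if access_mount or not unshare_propagation_supported():
            # Mount should stay visible to the host (or unshare is too old):
            # exec the interpreter directly.
            os.execv(sys.executable, args_to_worker)
        # Private mount namespace: mounts made by the worker disappear when
        # the worker exits, so nothing is left to lazy-umount on the host.
        cmd = ["unshare", "-m", "--propagation", "private"] + args_to_worker
        os.execvp("unshare", cmd)
    return pid

As in the monitor, args_to_worker[0] is conventionally the interpreter name followed by the script and its flags; the choice between the two branches mirrors the access-mount option handled in the diff.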
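The comment block added in multiplex() reasons about flock(2) semantics: the lock belongs to the open file description, so a worker forked while a monitor thread still has the status file open inherits that descriptor and keeps the lock alive until every copy is closed. A small sketch of that failure mode follows; set_monitor_status here is a simplified stand-in for the gsyncdstatus helper, written only to illustrate the deadlock the patch avoids.

# Sketch only: why the monitor-status update moved out of the per-brick
# monitor threads and now happens once, serialized under the monitor's lock.
import fcntl
import os


def set_monitor_status(status_file, status):
    # Simplified stand-in: take an exclusive flock, rewrite the file, and
    # release the lock by closing the descriptor.
    fd = os.open(status_file, os.O_CREAT | os.O_RDWR, 0o644)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)
        os.ftruncate(fd, 0)
        os.write(fd, status.encode())
    finally:
        os.close(fd)


# If one thread is inside set_monitor_status() while another thread forks a
# worker, the worker inherits the still-locked descriptor. Closing the
# parent's copy no longer releases the flock (all duplicates must be closed),
# so the next caller blocks until that long-lived worker exits; the inherited
# descriptor is also an fd leak. Updating the status once from multiplex(),
# before the per-brick threads fork anything, sidesteps both problems.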
