Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r--    geo-replication/syncdaemon/monitor.py    194
1 file changed, 59 insertions(+), 135 deletions(-)
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index a193b57caff..6aa7b9dfc99 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -14,20 +14,22 @@ import time
import signal
import logging
import xml.etree.ElementTree as XET
-from subprocess import PIPE
from threading import Lock
from errno import ECHILD, ESRCH
import random
from resource import SSH
import gsyncdconfig as gconf
+import libgfchangelog
from rconf import rconf
-from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile
-from syncdutils import set_term_handler, is_host_local, GsyncdError
-from syncdutils import Thread, finalize, Popen, Volinfo
-from syncdutils import gf_event, EVENT_GEOREP_FAULTY
+from syncdutils import (select, waitpid, errno_wrap, lf, grabpidfile,
+ set_term_handler, GsyncdError,
+ Thread, finalize, Volinfo, VolinfoFromGconf,
+ gf_event, EVENT_GEOREP_FAULTY, get_up_nodes,
+ unshare_propagation_supported)
from gsyncdstatus import GeorepStatus, set_monitor_status
-
+import py2py3
+from py2py3 import pipe
ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
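On Python 3, os.pipe() returns non-inheritable descriptors, which would be closed across the worker's execv; the pipe() wrapper imported from py2py3 above papers over that difference. A minimal sketch of such a compatibility shim, assuming it only needs to mark both ends inheritable (the real py2py3 module may do more):

    import os
    import sys

    if sys.version_info >= (3,):
        def pipe():
            # Python 3 fds are non-inheritable by default; mark both ends
            # inheritable so a forked-and-exec'd worker keeps them open.
            (r, w) = os.pipe()
            os.set_inheritable(r, True)
            os.set_inheritable(w, True)
            return (r, w)
    else:
        def pipe():
            # Python 2 descriptors are already inheritable.
            return os.pipe()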
@@ -36,6 +38,8 @@ def get_subvol_num(brick_idx, vol, hot):
tier = vol.is_tier()
disperse_count = vol.disperse_count(tier, hot)
replica_count = vol.replica_count(tier, hot)
+ distribute_count = vol.distribution_count(tier, hot)
+ gconf.setconfig("master-distribution-count", distribute_count)
if (tier and not hot):
brick_idx = brick_idx - vol.get_hot_bricks_count(tier)
@@ -54,43 +58,6 @@ def get_subvol_num(brick_idx, vol, hot):
return str(cnt)
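For illustration, with the distribution count now recorded in gconf: in a distributed-replicate volume, bricks are grouped replica_count at a time into subvolumes, so the subvolume number derives from the brick index. A hypothetical simplification of the counting above (the real function also handles tier and disperse layouts):

    # Hypothetical simplification: replica sets of size replica_count
    # partition the brick list into numbered subvolumes.
    def subvol_num(brick_idx, replica_count):
        return brick_idx // replica_count + 1

    # e.g. a 2x3 volume (6 bricks, replica 3): bricks 0-2 are subvol 1,
    # bricks 3-5 are subvol 2.
    assert subvol_num(4, 3) == 2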
-def get_slave_bricks_status(host, vol):
- po = Popen(['gluster', '--xml', '--remote-host=' + host,
- 'volume', 'status', vol, "detail"],
- stdout=PIPE, stderr=PIPE)
- vix = po.stdout.read()
- po.wait()
- po.terminate_geterr(fail_on_err=False)
- if po.returncode != 0:
- logging.info(lf("Volume status command failed, unable to get "
- "list of up nodes, returning empty list",
- volume=vol,
- error=po.returncode))
- return []
- vi = XET.fromstring(vix)
- if vi.find('opRet').text != '0':
- logging.info(lf("Unable to get list of up nodes, "
- "returning empty list",
- volume=vol,
- error=vi.find('opErrstr').text))
- return []
-
- up_hosts = set()
-
- try:
- for el in vi.findall('volStatus/volumes/volume/node'):
- if el.find('status').text == '1':
- up_hosts.add((el.find('hostname').text,
- el.find('peerid').text))
- except (ParseError, AttributeError, ValueError) as e:
- logging.info(lf("Parsing failed to get list of up nodes, "
- "returning empty list",
- volume=vol,
- error=e))
-
- return list(up_hosts)
-
-
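The gluster-CLI probe removed above is superseded by the get_up_nodes() call used later in this diff, which checks the slave nodes directly over the network. A minimal sketch of such a reachability check, assuming (hostname, uuid) tuples and a plain TCP connect to the SSH port (the actual syncdutils helper may differ):

    import socket

    def get_up_nodes(nodes, port):
        # A node is treated as up if its SSH port accepts a TCP
        # connection within the timeout.
        up_nodes = []
        for host, uuid in nodes:
            try:
                conn = socket.create_connection((host, port), timeout=5)
                conn.close()
                up_nodes.append((host, uuid))
            except (socket.error, OSError):
                pass
        return up_nodes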
class Monitor(object):
"""class which spawns and manages gsyncd workers"""
@@ -115,8 +82,8 @@ class Monitor(object):
# give a chance to graceful exit
errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])
- def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master,
- suuid):
+ def monitor(self, w, argv, cpids, slave_vol, slave_host, master,
+ suuid, slavenodes):
"""the monitor loop
Basic logic is a blatantly simple, blunt heuristic:
@@ -143,10 +110,6 @@ class Monitor(object):
master,
"%s::%s" % (slave_host,
slave_vol))
-
- set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
- self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
-
ret = 0
def nwait(p, o=0):
@@ -164,7 +127,7 @@ class Monitor(object):
raise
def exit_signalled(s):
- """ child teminated due to receipt of SIGUSR1 """
+ """ child terminated due to receipt of SIGUSR1 """
return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1))
def exit_status(s):
@@ -180,8 +143,7 @@ class Monitor(object):
# If the connected slave node is down then try to connect to
# different up node.
current_slave_host = remote_host
- slave_up_hosts = get_slave_bricks_status(
- slave_host, slave_vol)
+ slave_up_hosts = get_up_nodes(slavenodes, gconf.get("ssh-port"))
if (current_slave_host, remote_id) not in slave_up_hosts:
if len(slave_up_hosts) > 0:
@@ -189,51 +151,18 @@ class Monitor(object):
remote_host = "%s@%s" % (remote_user, remote_new[0])
remote_id = remote_new[1]
- # Spawn the worker and agent in lock to avoid fd leak
+ # Spawn the worker in lock to avoid fd leak
self.lock.acquire()
+ self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
logging.info(lf('starting gsyncd worker',
brick=w[0]['dir'],
slave_node=remote_host))
- # Couple of pipe pairs for RPC communication b/w
- # worker and changelog agent.
-
- # read/write end for agent
- (ra, ww) = os.pipe()
- # read/write end for worker
- (rw, wa) = os.pipe()
-
- # spawn the agent process
- apid = os.fork()
- if apid == 0:
- os.close(rw)
- os.close(ww)
- args_to_agent = argv + [
- 'agent',
- rconf.args.master,
- rconf.args.slave,
- '--local-path', w[0]['dir'],
- '--local-node', w[0]['host'],
- '--local-node-id', w[0]['uuid'],
- '--slave-id', suuid,
- '--rpc-fd', ','.join([str(ra), str(wa), str(rw), str(ww)])
- ]
-
- if rconf.args.config_file is not None:
- args_to_agent += ['-c', rconf.args.config_file]
-
- if rconf.args.debug:
- args_to_agent.append("--debug")
-
- os.execv(sys.executable, args_to_agent)
-
- pr, pw = os.pipe()
+ pr, pw = pipe()
cpid = os.fork()
if cpid == 0:
os.close(pr)
- os.close(ra)
- os.close(wa)
args_to_worker = argv + [
'worker',
@@ -244,8 +173,6 @@ class Monitor(object):
'--local-node', w[0]['host'],
'--local-node-id', w[0]['uuid'],
'--slave-id', suuid,
- '--rpc-fd',
- ','.join([str(rw), str(ww), str(ra), str(wa)]),
'--subvol-num', str(w[2]),
'--resource-remote', remote_host,
'--resource-remote-id', remote_id
@@ -260,17 +187,24 @@ class Monitor(object):
if rconf.args.debug:
args_to_worker.append("--debug")
- os.execv(sys.executable, args_to_worker)
+ access_mount = gconf.get("access-mount")
+ if access_mount:
+ os.execv(sys.executable, args_to_worker)
+ else:
+ if unshare_propagation_supported():
+ logging.debug("Worker would mount volume privately")
+ unshare_cmd = ['unshare', '-m', '--propagation',
+ 'private']
+ cmd = unshare_cmd + args_to_worker
+ os.execvp("unshare", cmd)
+ else:
+ logging.debug("Mount is not private. It would be lazy"
+ " umounted")
+ os.execv(sys.executable, args_to_worker)
cpids.add(cpid)
- agents.add(apid)
os.close(pw)
- # close all RPC pipes in monitor
- os.close(ra)
- os.close(wa)
- os.close(rw)
- os.close(ww)
self.lock.release()
t0 = time.time()
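The exec logic above prefers running the worker under unshare -m --propagation private, so its aux mount stays invisible to the host namespace; otherwise it falls back to a lazily umounted shared mount. A sketch of how the unshare_propagation_supported() probe could be implemented, assuming a scan of unshare(1)'s help text (the real helper lives in syncdutils):

    import subprocess

    def unshare_propagation_supported():
        # Older util-linux releases lack --propagation; probe the
        # installed unshare(1) by scanning its help output.
        p = subprocess.Popen(["unshare", "--help"],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        out, _ = p.communicate()
        return p.returncode == 0 and b"propagation" in out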
@@ -279,42 +213,19 @@ class Monitor(object):
if so:
ret = nwait(cpid, os.WNOHANG)
- ret_agent = nwait(apid, os.WNOHANG)
-
- if ret_agent is not None:
- # Agent is died Kill Worker
- logging.info(lf("Changelog Agent died, Aborting Worker",
- brick=w[0]['dir']))
- errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
- nwait(cpid)
- nwait(apid)
if ret is not None:
logging.info(lf("worker died before establishing "
"connection",
brick=w[0]['dir']))
- nwait(apid) # wait for agent
else:
logging.debug("worker(%s) connected" % w[0]['dir'])
while time.time() < t0 + conn_timeout:
ret = nwait(cpid, os.WNOHANG)
- ret_agent = nwait(apid, os.WNOHANG)
if ret is not None:
logging.info(lf("worker died in startup phase",
brick=w[0]['dir']))
- nwait(apid) # wait for agent
- break
-
- if ret_agent is not None:
- # Agent is died Kill Worker
- logging.info(lf("Changelog Agent died, Aborting "
- "Worker",
- brick=w[0]['dir']))
- errno_wrap(os.kill, [cpid, signal.SIGKILL],
- [ESRCH])
- nwait(cpid)
- nwait(apid)
break
time.sleep(1)
@@ -329,12 +240,8 @@ class Monitor(object):
brick=w[0]['dir'],
timeout=conn_timeout))
errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
- nwait(apid) # wait for agent
ret = nwait(cpid)
if ret is None:
- # If worker dies, agent terminates on EOF.
- # So lets wait for agent first.
- nwait(apid)
ret = nwait(cpid)
if exit_signalled(ret):
ret = 0
@@ -354,33 +261,46 @@ class Monitor(object):
self.status[w[0]['dir']].set_worker_status(self.ST_INCON)
return ret
- def multiplex(self, wspx, suuid, slave_vol, slave_host, master):
+ def multiplex(self, wspx, suuid, slave_vol, slave_host, master, slavenodes):
argv = [os.path.basename(sys.executable), sys.argv[0]]
cpids = set()
- agents = set()
ta = []
for wx in wspx:
def wmon(w):
- cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
- slave_host, master, suuid)
+ cpid, _ = self.monitor(w, argv, cpids, slave_vol,
+ slave_host, master, suuid, slavenodes)
time.sleep(1)
self.lock.acquire()
for cpid in cpids:
errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
- for apid in agents:
- errno_wrap(os.kill, [apid, signal.SIGKILL], [ESRCH])
self.lock.release()
finalize(exval=1)
t = Thread(target=wmon, args=[wx])
t.start()
ta.append(t)
+
+ # Monitor status was being updated in each monitor thread. It
+ # should not be done that way, as it can cause a deadlock at
+ # worker start. set_monitor_status uses flock to synchronize
+ # multiple instances updating the file. Since each monitor
+ # thread forks a worker, those processes can hold a reference
+ # to the fd of the status file, deadlocking workers which start
+ # later, because the flock is not released until all references
+ # to the same fd are closed. It also causes fd leaks.
+
+ self.lock.acquire()
+ set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
+ self.lock.release()
for t in ta:
t.join()
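The comment above points at a genuine flock pitfall: flock(2) locks belong to the open file description, which forked workers inherit, so the lock can outlive the monitor's own close(). A standalone demonstration (hypothetical path; a second process attempting the same lock is needed to observe the block):

    import fcntl
    import os

    f = open("/tmp/status-demo", "w")         # hypothetical status file
    fcntl.flock(f, fcntl.LOCK_EX)             # lock the open file description
    os.set_inheritable(f.fileno(), True)      # let an exec'd child keep it
    if os.fork() == 0:
        os.execv("/bin/sleep", ["sleep", "60"])   # child holds the fd open
    f.close()
    # The parent's close() does NOT release the lock: the child's inherited
    # fd still references the same open file description, so any other
    # process calling flock(LOCK_EX) on the file blocks until sleep exits.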
def distribute(master, slave):
- mvol = Volinfo(master.volume, master.host)
+ if rconf.args.use_gconf_volinfo:
+ mvol = VolinfoFromGconf(master.volume, master=True)
+ else:
+ mvol = Volinfo(master.volume, master.host, master=True)
logging.debug('master bricks: ' + repr(mvol.bricks))
prelude = []
slave_host = None
@@ -393,7 +313,11 @@ def distribute(master, slave):
logging.debug('slave SSH gateway: ' + slave.remote_addr)
- svol = Volinfo(slave.volume, "localhost", prelude)
+ if rconf.args.use_gconf_volinfo:
+ svol = VolinfoFromGconf(slave.volume, master=False)
+ else:
+ svol = Volinfo(slave.volume, "localhost", prelude, master=False)
+
sbricks = svol.bricks
suuid = svol.uuid
slave_host = slave.remote_addr.split('@')[-1]
@@ -415,14 +339,14 @@ def distribute(master, slave):
workerspex = []
for idx, brick in enumerate(mvol.bricks):
- if is_host_local(brick['uuid']):
+ if rconf.args.local_node_id == brick['uuid']:
is_hot = mvol.is_hot(":".join([brick['host'], brick['dir']]))
workerspex.append((brick,
slaves[idx % len(slaves)],
get_subvol_num(idx, mvol, is_hot),
is_hot))
logging.debug('worker specs: ' + repr(workerspex))
- return workerspex, suuid, slave_vol, slave_host, master
+ return workerspex, suuid, slave_vol, slave_host, master, slavenodes
def monitor(local, remote):
@@ -447,7 +371,7 @@ def startup(go_daemon=True):
if not go_daemon:
return
- x, y = os.pipe()
+ x, y = pipe()
cpid = os.fork()
if cpid:
os.close(x)