summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/monitor.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r--geo-replication/syncdaemon/monitor.py240
1 files changed, 138 insertions, 102 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index c6fa1076a85..a193b57caff 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -13,21 +13,19 @@ import sys
import time
import signal
import logging
-import uuid
import xml.etree.ElementTree as XET
from subprocess import PIPE
-from resource import FILE, GLUSTER, SSH
from threading import Lock
from errno import ECHILD, ESRCH
-import re
import random
-from gconf import gconf
-from syncdutils import select, waitpid, errno_wrap, lf
+
+from resource import SSH
+import gsyncdconfig as gconf
+from rconf import rconf
+from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile
from syncdutils import set_term_handler, is_host_local, GsyncdError
-from syncdutils import escape, Thread, finalize
+from syncdutils import Thread, finalize, Popen, Volinfo
from syncdutils import gf_event, EVENT_GEOREP_FAULTY
-from syncdutils import Volinfo, Popen
-
from gsyncdstatus import GeorepStatus, set_monitor_status
@@ -82,7 +80,8 @@ def get_slave_bricks_status(host, vol):
try:
for el in vi.findall('volStatus/volumes/volume/node'):
if el.find('status').text == '1':
- up_hosts.add(el.find('hostname').text)
+ up_hosts.add((el.find('hostname').text,
+ el.find('peerid').text))
except (ParseError, AttributeError, ValueError) as e:
logging.info(lf("Parsing failed to get list of up nodes, "
"returning empty list",
@@ -116,7 +115,8 @@ class Monitor(object):
# give a chance to graceful exit
errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])
- def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master):
+ def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master,
+ suuid):
"""the monitor loop
Basic logic is a blatantly simple blunt heuristic:
@@ -136,7 +136,7 @@ class Monitor(object):
due to the keep-alive thread)
"""
if not self.status.get(w[0]['dir'], None):
- self.status[w[0]['dir']] = GeorepStatus(gconf.state_file,
+ self.status[w[0]['dir']] = GeorepStatus(gconf.get("state-file"),
w[0]['host'],
w[0]['dir'],
w[0]['uuid'],
@@ -144,7 +144,7 @@ class Monitor(object):
"%s::%s" % (slave_host,
slave_vol))
- set_monitor_status(gconf.state_file, self.ST_STARTED)
+ set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
ret = 0
@@ -172,26 +172,22 @@ class Monitor(object):
return os.WEXITSTATUS(s)
return 1
- conn_timeout = int(gconf.connection_timeout)
+ conn_timeout = gconf.get("connection-timeout")
while ret in (0, 1):
- remote_host = w[1]
+ remote_user, remote_host = w[1][0].split("@")
+ remote_id = w[1][1]
# Check the status of the connected slave node
# If the connected slave node is down then try to connect to
# different up node.
- m = re.match("(ssh|gluster|file):\/\/(.+)@([^:]+):(.+)",
- remote_host)
- if m:
- current_slave_host = m.group(3)
- slave_up_hosts = get_slave_bricks_status(
- slave_host, slave_vol)
-
- if current_slave_host not in slave_up_hosts:
- if len(slave_up_hosts) > 0:
- remote_host = "%s://%s@%s:%s" % (m.group(1),
- m.group(2),
- random.choice(
- slave_up_hosts),
- m.group(4))
+ current_slave_host = remote_host
+ slave_up_hosts = get_slave_bricks_status(
+ slave_host, slave_vol)
+
+ if (current_slave_host, remote_id) not in slave_up_hosts:
+ if len(slave_up_hosts) > 0:
+ remote_new = random.choice(slave_up_hosts)
+ remote_host = "%s@%s" % (remote_user, remote_new[0])
+ remote_id = remote_new[1]
# Spawn the worker and agent in lock to avoid fd leak
self.lock.acquire()
@@ -213,33 +209,58 @@ class Monitor(object):
if apid == 0:
os.close(rw)
os.close(ww)
- os.execv(sys.executable, argv + ['--local-path', w[0]['dir'],
- '--local-node', w[0]['host'],
- '--local-node-id',
- w[0]['uuid'],
- '--agent',
- '--rpc-fd',
- ','.join([str(ra), str(wa),
- str(rw), str(ww)])])
+ args_to_agent = argv + [
+ 'agent',
+ rconf.args.master,
+ rconf.args.slave,
+ '--local-path', w[0]['dir'],
+ '--local-node', w[0]['host'],
+ '--local-node-id', w[0]['uuid'],
+ '--slave-id', suuid,
+ '--rpc-fd', ','.join([str(ra), str(wa), str(rw), str(ww)])
+ ]
+
+ if rconf.args.config_file is not None:
+ args_to_agent += ['-c', rconf.args.config_file]
+
+ if rconf.args.debug:
+ args_to_agent.append("--debug")
+
+ os.execv(sys.executable, args_to_agent)
+
pr, pw = os.pipe()
cpid = os.fork()
if cpid == 0:
os.close(pr)
os.close(ra)
os.close(wa)
- os.execv(sys.executable, argv + ['--feedback-fd', str(pw),
- '--local-path', w[0]['dir'],
- '--local-node', w[0]['host'],
- '--local-node-id',
- w[0]['uuid'],
- '--local-id',
- '.' + escape(w[0]['dir']),
- '--rpc-fd',
- ','.join([str(rw), str(ww),
- str(ra), str(wa)]),
- '--subvol-num', str(w[2])] +
- (['--is-hottier'] if w[3] else []) +
- ['--resource-remote', remote_host])
+
+ args_to_worker = argv + [
+ 'worker',
+ rconf.args.master,
+ rconf.args.slave,
+ '--feedback-fd', str(pw),
+ '--local-path', w[0]['dir'],
+ '--local-node', w[0]['host'],
+ '--local-node-id', w[0]['uuid'],
+ '--slave-id', suuid,
+ '--rpc-fd',
+ ','.join([str(rw), str(ww), str(ra), str(wa)]),
+ '--subvol-num', str(w[2]),
+ '--resource-remote', remote_host,
+ '--resource-remote-id', remote_id
+ ]
+
+ if rconf.args.config_file is not None:
+ args_to_worker += ['-c', rconf.args.config_file]
+
+ if w[3]:
+ args_to_worker.append("--is-hottier")
+
+ if rconf.args.debug:
+ args_to_worker.append("--debug")
+
+ os.execv(sys.executable, args_to_worker)
cpids.add(cpid)
agents.add(apid)
@@ -290,7 +311,8 @@ class Monitor(object):
logging.info(lf("Changelog Agent died, Aborting "
"Worker",
brick=w[0]['dir']))
- errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
+ errno_wrap(os.kill, [cpid, signal.SIGKILL],
+ [ESRCH])
nwait(cpid)
nwait(apid)
break
@@ -333,12 +355,7 @@ class Monitor(object):
return ret
def multiplex(self, wspx, suuid, slave_vol, slave_host, master):
- argv = sys.argv[:]
- for o in ('-N', '--no-daemon', '--monitor'):
- while o in argv:
- argv.remove(o)
- argv.extend(('-N', '-p', '', '--slave-id', suuid))
- argv.insert(0, os.path.basename(sys.executable))
+ argv = [os.path.basename(sys.executable), sys.argv[0]]
cpids = set()
agents = set()
@@ -346,7 +363,7 @@ class Monitor(object):
for wx in wspx:
def wmon(w):
cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
- slave_host, master)
+ slave_host, master, suuid)
time.sleep(1)
self.lock.acquire()
for cpid in cpids:
@@ -362,55 +379,39 @@ class Monitor(object):
t.join()
-def distribute(*resources):
- master, slave = resources
+def distribute(master, slave):
mvol = Volinfo(master.volume, master.host)
logging.debug('master bricks: ' + repr(mvol.bricks))
prelude = []
- si = slave
slave_host = None
slave_vol = None
- if isinstance(slave, SSH):
- prelude = gconf.ssh_command.split() + [slave.remote_addr]
- si = slave.inner_rsc
- logging.debug('slave SSH gateway: ' + slave.remote_addr)
- if isinstance(si, FILE):
- sbricks = {'host': 'localhost', 'dir': si.path}
- suuid = uuid.uuid5(uuid.NAMESPACE_URL, slave.get_url(canonical=True))
- elif isinstance(si, GLUSTER):
- svol = Volinfo(si.volume, slave.remote_addr.split('@')[-1])
- sbricks = svol.bricks
- suuid = svol.uuid
- slave_host = slave.remote_addr.split('@')[-1]
- slave_vol = si.volume
-
- # save this xattr for the session delete command
- old_stime_xattr_name = getattr(gconf, "master.stime_xattr_name", None)
- new_stime_xattr_name = "trusted.glusterfs." + mvol.uuid + "." + \
- svol.uuid + ".stime"
- if not old_stime_xattr_name or \
- old_stime_xattr_name != new_stime_xattr_name:
- gconf.configinterface.set("master.stime_xattr_name",
- new_stime_xattr_name)
- else:
- raise GsyncdError("unknown slave type " + slave.url)
+ prelude = [gconf.get("ssh-command")] + \
+ gconf.get("ssh-options").split() + \
+ ["-p", str(gconf.get("ssh-port"))] + \
+ [slave.remote_addr]
+
+ logging.debug('slave SSH gateway: ' + slave.remote_addr)
+
+ svol = Volinfo(slave.volume, "localhost", prelude)
+ sbricks = svol.bricks
+ suuid = svol.uuid
+ slave_host = slave.remote_addr.split('@')[-1]
+ slave_vol = slave.volume
+
+ # save this xattr for the session delete command
+ old_stime_xattr_prefix = gconf.get("stime-xattr-prefix", None)
+ new_stime_xattr_prefix = "trusted.glusterfs." + mvol.uuid + "." + \
+ svol.uuid
+ if not old_stime_xattr_prefix or \
+ old_stime_xattr_prefix != new_stime_xattr_prefix:
+ gconf.setconfig("stime-xattr-prefix", new_stime_xattr_prefix)
+
logging.debug('slave bricks: ' + repr(sbricks))
- if isinstance(si, FILE):
- slaves = [slave.url]
- else:
- slavenodes = set(b['host'] for b in sbricks)
- if isinstance(slave, SSH) and not gconf.isolated_slave:
- rap = SSH.parse_ssh_address(slave)
- slaves = ['ssh://' + rap['user'] + '@' + h + ':' + si.url
- for h in slavenodes]
- else:
- slavevols = [h + ':' + si.volume for h in slavenodes]
- if isinstance(slave, SSH):
- slaves = ['ssh://' + rap.remote_addr + ':' + v
- for v in slavevols]
- else:
- slaves = slavevols
+
+ slavenodes = set((b['host'], b["uuid"]) for b in sbricks)
+ rap = SSH.parse_ssh_address(slave)
+ slaves = [(rap['user'] + '@' + h[0], h[1]) for h in slavenodes]
workerspex = []
for idx, brick in enumerate(mvol.bricks):
@@ -424,12 +425,47 @@ def distribute(*resources):
return workerspex, suuid, slave_vol, slave_host, master
-def monitor(*resources):
+def monitor(local, remote):
# Check if gsyncd restarted in pause state. If
# yes, send SIGSTOP to negative of monitor pid
# to go back to pause state.
- if gconf.pause_on_start:
+ if rconf.args.pause_on_start:
errno_wrap(os.kill, [-os.getpid(), signal.SIGSTOP], [ESRCH])
"""oh yeah, actually Monitor is used as singleton, too"""
- return Monitor().multiplex(*distribute(*resources))
+ return Monitor().multiplex(*distribute(local, remote))
+
+
+def startup(go_daemon=True):
+ """set up logging, pidfile grabbing, daemonization"""
+ pid_file = gconf.get("pid-file")
+ if not grabpidfile():
+ sys.stderr.write("pidfile is taken, exiting.\n")
+ sys.exit(2)
+ rconf.pid_file_owned = True
+
+ if not go_daemon:
+ return
+
+ x, y = os.pipe()
+ cpid = os.fork()
+ if cpid:
+ os.close(x)
+ sys.exit()
+ os.close(y)
+ os.setsid()
+ dn = os.open(os.devnull, os.O_RDWR)
+ for f in (sys.stdin, sys.stdout, sys.stderr):
+ os.dup2(dn, f.fileno())
+
+ if not grabpidfile(pid_file + '.tmp'):
+ raise GsyncdError("cannot grab temporary pidfile")
+
+ os.rename(pid_file + '.tmp', pid_file)
+
+ # wait for parent to terminate
+ # so we can start up with
+ # no messing from the dirty
+ # ol' bustard
+ select((x,), (), ())
+ os.close(x)