summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--geo-replication/syncdaemon/monitor.py69
1 files changed, 64 insertions, 5 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index dbe9c0b0d40..3e0360332bd 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -18,12 +18,44 @@ import xml.etree.ElementTree as XET
from subprocess import PIPE
from resource import Popen, FILE, GLUSTER, SSH
from threading import Lock
+import re
+import random
from gconf import gconf
from syncdutils import update_file, select, waitpid
from syncdutils import set_term_handler, is_host_local, GsyncdError
from syncdutils import escape, Thread, finalize, memoize
+ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
+
+
+def get_slave_bricks_status(host, vol):
+ po = Popen(['gluster', '--xml', '--remote-host=' + host,
+ 'volume', 'status', vol, "detail"],
+ stdout=PIPE, stderr=PIPE)
+ vix = po.stdout.read()
+ po.wait()
+ po.terminate_geterr()
+ vi = XET.fromstring(vix)
+ if vi.find('opRet').text != '0':
+ logging.info("Unable to get list of up nodes of %s, "
+ "returning empty list: %s" %
+ (vol, vi.find('opErrstr').text))
+ return []
+
+ up_hosts = set()
+
+ try:
+ for el in vi.findall('volStatus/volumes/volume/node'):
+ if el.find('status').text == '1':
+ up_hosts.add(el.find('hostname').text)
+ except (ParseError, AttributeError, ValueError) as e:
+ logging.info("Parsing failed to get list of up nodes of %s, "
+ "returning empty list: %s" % (vol, e))
+
+ return list(up_hosts)
+
+
class Volinfo(object):
def __init__(self, vol, host='localhost', prelude=[]):
@@ -117,7 +149,7 @@ class Monitor(object):
# give a chance to graceful exit
os.kill(-os.getpid(), signal.SIGTERM)
- def monitor(self, w, argv, cpids, agents):
+ def monitor(self, w, argv, cpids, agents, slave_vol, slave_host):
"""the monitor loop
Basic logic is a blantantly simple blunt heuristics:
@@ -155,8 +187,28 @@ class Monitor(object):
if os.WIFEXITED(s):
return os.WEXITSTATUS(s)
return 1
+
conn_timeout = int(gconf.connection_timeout)
while ret in (0, 1):
+ remote_host = w[1]
+ # Check the status of the connected slave node
+ # If the connected slave node is down then try to connect to
+ # different up node.
+ m = re.match("(ssh|gluster|file):\/\/(.+)@([^:]+):(.+)",
+ remote_host)
+ if m:
+ current_slave_host = m.group(3)
+ slave_up_hosts = get_slave_bricks_status(
+ slave_host, slave_vol)
+
+ if current_slave_host not in slave_up_hosts:
+ if len(slave_up_hosts) > 0:
+ remote_host = "%s://%s@%s:%s" % (m.group(1),
+ m.group(2),
+ random.choice(
+ slave_up_hosts),
+ m.group(4))
+
# Spawn the worker and agent in lock to avoid fd leak
self.lock.acquire()
@@ -190,7 +242,8 @@ class Monitor(object):
'--rpc-fd',
','.join([str(rw), str(ww),
str(ra), str(wa)]),
- '--resource-remote', w[1]])
+ '--resource-remote',
+ remote_host])
cpids.add(cpid)
agents.add(apid)
@@ -245,7 +298,7 @@ class Monitor(object):
self.set_state(self.ST_INCON, w)
return ret
- def multiplex(self, wspx, suuid):
+ def multiplex(self, wspx, suuid, slave_vol, slave_host):
argv = sys.argv[:]
for o in ('-N', '--no-daemon', '--monitor'):
while o in argv:
@@ -258,7 +311,8 @@ class Monitor(object):
ta = []
for wx in wspx:
def wmon(w):
- cpid, _ = self.monitor(w, argv, cpids, agents)
+ cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
+ slave_host)
time.sleep(1)
self.lock.acquire()
for cpid in cpids:
@@ -280,6 +334,9 @@ def distribute(*resources):
logging.debug('master bricks: ' + repr(mvol.bricks))
prelude = []
si = slave
+ slave_host = None
+ slave_vol = None
+
if isinstance(slave, SSH):
prelude = gconf.ssh_command.split() + [slave.remote_addr]
si = slave.inner_rsc
@@ -291,6 +348,8 @@ def distribute(*resources):
svol = Volinfo(si.volume, slave.remote_addr.split('@')[-1])
sbricks = svol.bricks
suuid = svol.uuid
+ slave_host = slave.remote_addr.split('@')[-1]
+ slave_vol = si.volume
else:
raise GsyncdError("unkown slave type " + slave.url)
logging.info('slave bricks: ' + repr(sbricks))
@@ -314,7 +373,7 @@ def distribute(*resources):
for idx, brick in enumerate(mvol.bricks)
if is_host_local(brick['host'])]
logging.info('worker specs: ' + repr(workerspex))
- return workerspex, suuid
+ return workerspex, suuid, slave_vol, slave_host
def monitor(*resources):