summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--geo-replication/gsyncd.conf.in21
-rw-r--r--geo-replication/syncdaemon/gsyncd.py1
-rw-r--r--geo-replication/syncdaemon/monitor.py68
-rw-r--r--geo-replication/syncdaemon/syncdutils.py88
4 files changed, 128 insertions, 50 deletions
diff --git a/geo-replication/gsyncd.conf.in b/geo-replication/gsyncd.conf.in
index 53cc76b842a..80a9e4a8e8b 100644
--- a/geo-replication/gsyncd.conf.in
+++ b/geo-replication/gsyncd.conf.in
@@ -1,6 +1,26 @@
[__meta__]
version = 4.0
+[master-bricks]
+configurable=false
+
+[slave-bricks]
+configurable=false
+
+[master-volume-id]
+configurable=false
+
+[slave-volume-id]
+configurable=false
+
+[master-replica-count]
+configurable=false
+type=int
+
+[master-disperse_count]
+configurable=false
+type=int
+
[glusterd-workdir]
value = @GLUSTERD_WORKDIR@
@@ -234,6 +254,7 @@ allowed_values=ERROR,INFO,WARNING,DEBUG
value=22
validation=int
help=Set SSH port
+type=int
[ssh-command]
value=ssh
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 04ceb435bf7..3458898646e 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -56,6 +56,7 @@ def main():
help="Start with Paused state")
p.add_argument("--local-node-id", help="Local Node ID")
p.add_argument("--debug", action="store_true")
+ p.add_argument("--use-gconf-volinfo", action="store_true")
# Worker
p = sp.add_parser("worker")
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index a193b57caff..257d34a743b 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -14,7 +14,6 @@ import time
import signal
import logging
import xml.etree.ElementTree as XET
-from subprocess import PIPE
from threading import Lock
from errno import ECHILD, ESRCH
import random
@@ -23,9 +22,9 @@ from resource import SSH
import gsyncdconfig as gconf
from rconf import rconf
from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile
-from syncdutils import set_term_handler, is_host_local, GsyncdError
-from syncdutils import Thread, finalize, Popen, Volinfo
-from syncdutils import gf_event, EVENT_GEOREP_FAULTY
+from syncdutils import set_term_handler, GsyncdError
+from syncdutils import Thread, finalize, Volinfo, VolinfoFromGconf
+from syncdutils import gf_event, EVENT_GEOREP_FAULTY, get_up_nodes
from gsyncdstatus import GeorepStatus, set_monitor_status
@@ -54,43 +53,6 @@ def get_subvol_num(brick_idx, vol, hot):
return str(cnt)
-def get_slave_bricks_status(host, vol):
- po = Popen(['gluster', '--xml', '--remote-host=' + host,
- 'volume', 'status', vol, "detail"],
- stdout=PIPE, stderr=PIPE)
- vix = po.stdout.read()
- po.wait()
- po.terminate_geterr(fail_on_err=False)
- if po.returncode != 0:
- logging.info(lf("Volume status command failed, unable to get "
- "list of up nodes, returning empty list",
- volume=vol,
- error=po.returncode))
- return []
- vi = XET.fromstring(vix)
- if vi.find('opRet').text != '0':
- logging.info(lf("Unable to get list of up nodes, "
- "returning empty list",
- volume=vol,
- error=vi.find('opErrstr').text))
- return []
-
- up_hosts = set()
-
- try:
- for el in vi.findall('volStatus/volumes/volume/node'):
- if el.find('status').text == '1':
- up_hosts.add((el.find('hostname').text,
- el.find('peerid').text))
- except (ParseError, AttributeError, ValueError) as e:
- logging.info(lf("Parsing failed to get list of up nodes, "
- "returning empty list",
- volume=vol,
- error=e))
-
- return list(up_hosts)
-
-
class Monitor(object):
"""class which spawns and manages gsyncd workers"""
@@ -116,7 +78,7 @@ class Monitor(object):
errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])
def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master,
- suuid):
+ suuid, slavenodes):
"""the monitor loop
Basic logic is a blantantly simple blunt heuristics:
@@ -180,8 +142,7 @@ class Monitor(object):
# If the connected slave node is down then try to connect to
# different up node.
current_slave_host = remote_host
- slave_up_hosts = get_slave_bricks_status(
- slave_host, slave_vol)
+ slave_up_hosts = get_up_nodes(slavenodes, gconf.get("ssh-port"))
if (current_slave_host, remote_id) not in slave_up_hosts:
if len(slave_up_hosts) > 0:
@@ -354,7 +315,7 @@ class Monitor(object):
self.status[w[0]['dir']].set_worker_status(self.ST_INCON)
return ret
- def multiplex(self, wspx, suuid, slave_vol, slave_host, master):
+ def multiplex(self, wspx, suuid, slave_vol, slave_host, master, slavenodes):
argv = [os.path.basename(sys.executable), sys.argv[0]]
cpids = set()
@@ -363,7 +324,7 @@ class Monitor(object):
for wx in wspx:
def wmon(w):
cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
- slave_host, master, suuid)
+ slave_host, master, suuid, slavenodes)
time.sleep(1)
self.lock.acquire()
for cpid in cpids:
@@ -380,7 +341,10 @@ class Monitor(object):
def distribute(master, slave):
- mvol = Volinfo(master.volume, master.host)
+ if rconf.args.use_gconf_volinfo:
+ mvol = VolinfoFromGconf(master.volume, master=True)
+ else:
+ mvol = Volinfo(master.volume, master.host)
logging.debug('master bricks: ' + repr(mvol.bricks))
prelude = []
slave_host = None
@@ -393,7 +357,11 @@ def distribute(master, slave):
logging.debug('slave SSH gateway: ' + slave.remote_addr)
- svol = Volinfo(slave.volume, "localhost", prelude)
+ if rconf.args.use_gconf_volinfo:
+ svol = VolinfoFromGconf(slave.volume, master=False)
+ else:
+ svol = Volinfo(slave.volume, "localhost", prelude)
+
sbricks = svol.bricks
suuid = svol.uuid
slave_host = slave.remote_addr.split('@')[-1]
@@ -415,14 +383,14 @@ def distribute(master, slave):
workerspex = []
for idx, brick in enumerate(mvol.bricks):
- if is_host_local(brick['uuid']):
+ if rconf.args.local_node_id == brick['uuid']:
is_hot = mvol.is_hot(":".join([brick['host'], brick['dir']]))
workerspex.append((brick,
slaves[idx % len(slaves)],
get_subvol_num(idx, mvol, is_hot),
is_hot))
logging.debug('worker specs: ' + repr(workerspex))
- return workerspex, suuid, slave_vol, slave_host, master
+ return workerspex, suuid, slave_vol, slave_host, master, slavenodes
def monitor(local, remote):
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 1f2692254db..e546f558265 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -18,6 +18,7 @@ import logging
import errno
import threading
import subprocess
+import socket
from subprocess import PIPE
from threading import Lock, Thread as baseThread
from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
@@ -871,3 +872,90 @@ class Volinfo(object):
return int(self.get('hotBricks/hotbrickCount')[0].text)
else:
return 0
+
+
+class VolinfoFromGconf(object):
+ # Glusterd will generate following config items before Geo-rep start
+ # So that Geo-rep need not run gluster commands from inside
+ # Volinfo object API/interface kept as is so that caller need not
+ # change anything exept calling this instead of Volinfo()
+ #
+ # master-bricks=
+ # master-bricks=NODEID:HOSTNAME:PATH,..
+ # slave-bricks=NODEID:HOSTNAME,..
+ # master-volume-id=
+ # slave-volume-id=
+ # master-replica-count=
+ # master-disperse_count=
+ def __init__(self, vol, host='localhost', master=True):
+ self.volume = vol
+ self.host = host
+ self.master = master
+
+ def is_tier(self):
+ return False
+
+ def is_hot(self, brickpath):
+ return False
+
+ @property
+ @memoize
+ def bricks(self):
+ pfx = "master-" if self.master else "slave-"
+ bricks_data = gconf.get(pfx + "bricks")
+ if bricks_data is None:
+ return []
+
+ bricks_data = bricks_data.split(",")
+ bricks_data = [b.strip() for b in bricks_data]
+ out = []
+ for b in bricks_data:
+ parts = b.split(":")
+ bpath = parts[2] if len(parts) == 3 else ""
+ out.append({"host": parts[1], "dir": bpath, "uuid": parts[0]})
+
+ return out
+
+ @property
+ @memoize
+ def uuid(self):
+ if self.master:
+ return gconf.get("master-volume-id")
+ else:
+ return gconf.get("slave-volume-id")
+
+ def replica_count(self, tier, hot):
+ return gconf.get("master-replica-count")
+
+ def disperse_count(self, tier, hot):
+ return gconf.get("master-disperse-count")
+
+ @property
+ @memoize
+ def hot_bricks(self):
+ return []
+
+ def get_hot_bricks_count(self, tier):
+ return 0
+
+
+def can_ssh(host, port=22):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ s.connect((host, port))
+ flag = True
+ except socket.error:
+ flag = False
+
+ s.close()
+ return flag
+
+
+def get_up_nodes(hosts, port):
+ # List of hosts with Hostname/IP and UUID
+ up_nodes = []
+ for h in hosts:
+ if can_ssh(h[0], port):
+ up_nodes.append(h)
+
+ return up_nodes