diff options
-rw-r--r-- | geo-replication/gsyncd.conf.in | 21 | ||||
-rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 1 | ||||
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 68 | ||||
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 88 |
4 files changed, 128 insertions, 50 deletions
diff --git a/geo-replication/gsyncd.conf.in b/geo-replication/gsyncd.conf.in index 53cc76b842a..80a9e4a8e8b 100644 --- a/geo-replication/gsyncd.conf.in +++ b/geo-replication/gsyncd.conf.in @@ -1,6 +1,26 @@ [__meta__] version = 4.0 +[master-bricks] +configurable=false + +[slave-bricks] +configurable=false + +[master-volume-id] +configurable=false + +[slave-volume-id] +configurable=false + +[master-replica-count] +configurable=false +type=int + +[master-disperse_count] +configurable=false +type=int + [glusterd-workdir] value = @GLUSTERD_WORKDIR@ @@ -234,6 +254,7 @@ allowed_values=ERROR,INFO,WARNING,DEBUG value=22 validation=int help=Set SSH port +type=int [ssh-command] value=ssh diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 04ceb435bf7..3458898646e 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -56,6 +56,7 @@ def main(): help="Start with Paused state") p.add_argument("--local-node-id", help="Local Node ID") p.add_argument("--debug", action="store_true") + p.add_argument("--use-gconf-volinfo", action="store_true") # Worker p = sp.add_parser("worker") diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index a193b57caff..257d34a743b 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -14,7 +14,6 @@ import time import signal import logging import xml.etree.ElementTree as XET -from subprocess import PIPE from threading import Lock from errno import ECHILD, ESRCH import random @@ -23,9 +22,9 @@ from resource import SSH import gsyncdconfig as gconf from rconf import rconf from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile -from syncdutils import set_term_handler, is_host_local, GsyncdError -from syncdutils import Thread, finalize, Popen, Volinfo -from syncdutils import gf_event, EVENT_GEOREP_FAULTY +from syncdutils import set_term_handler, GsyncdError +from syncdutils import Thread, finalize, Volinfo, VolinfoFromGconf +from syncdutils import gf_event, EVENT_GEOREP_FAULTY, get_up_nodes from gsyncdstatus import GeorepStatus, set_monitor_status @@ -54,43 +53,6 @@ def get_subvol_num(brick_idx, vol, hot): return str(cnt) -def get_slave_bricks_status(host, vol): - po = Popen(['gluster', '--xml', '--remote-host=' + host, - 'volume', 'status', vol, "detail"], - stdout=PIPE, stderr=PIPE) - vix = po.stdout.read() - po.wait() - po.terminate_geterr(fail_on_err=False) - if po.returncode != 0: - logging.info(lf("Volume status command failed, unable to get " - "list of up nodes, returning empty list", - volume=vol, - error=po.returncode)) - return [] - vi = XET.fromstring(vix) - if vi.find('opRet').text != '0': - logging.info(lf("Unable to get list of up nodes, " - "returning empty list", - volume=vol, - error=vi.find('opErrstr').text)) - return [] - - up_hosts = set() - - try: - for el in vi.findall('volStatus/volumes/volume/node'): - if el.find('status').text == '1': - up_hosts.add((el.find('hostname').text, - el.find('peerid').text)) - except (ParseError, AttributeError, ValueError) as e: - logging.info(lf("Parsing failed to get list of up nodes, " - "returning empty list", - volume=vol, - error=e)) - - return list(up_hosts) - - class Monitor(object): """class which spawns and manages gsyncd workers""" @@ -116,7 +78,7 @@ class Monitor(object): errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH]) def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master, - suuid): + suuid, slavenodes): """the monitor loop Basic logic is a blantantly simple blunt heuristics: @@ -180,8 +142,7 @@ class Monitor(object): # If the connected slave node is down then try to connect to # different up node. current_slave_host = remote_host - slave_up_hosts = get_slave_bricks_status( - slave_host, slave_vol) + slave_up_hosts = get_up_nodes(slavenodes, gconf.get("ssh-port")) if (current_slave_host, remote_id) not in slave_up_hosts: if len(slave_up_hosts) > 0: @@ -354,7 +315,7 @@ class Monitor(object): self.status[w[0]['dir']].set_worker_status(self.ST_INCON) return ret - def multiplex(self, wspx, suuid, slave_vol, slave_host, master): + def multiplex(self, wspx, suuid, slave_vol, slave_host, master, slavenodes): argv = [os.path.basename(sys.executable), sys.argv[0]] cpids = set() @@ -363,7 +324,7 @@ class Monitor(object): for wx in wspx: def wmon(w): cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol, - slave_host, master, suuid) + slave_host, master, suuid, slavenodes) time.sleep(1) self.lock.acquire() for cpid in cpids: @@ -380,7 +341,10 @@ class Monitor(object): def distribute(master, slave): - mvol = Volinfo(master.volume, master.host) + if rconf.args.use_gconf_volinfo: + mvol = VolinfoFromGconf(master.volume, master=True) + else: + mvol = Volinfo(master.volume, master.host) logging.debug('master bricks: ' + repr(mvol.bricks)) prelude = [] slave_host = None @@ -393,7 +357,11 @@ def distribute(master, slave): logging.debug('slave SSH gateway: ' + slave.remote_addr) - svol = Volinfo(slave.volume, "localhost", prelude) + if rconf.args.use_gconf_volinfo: + svol = VolinfoFromGconf(slave.volume, master=False) + else: + svol = Volinfo(slave.volume, "localhost", prelude) + sbricks = svol.bricks suuid = svol.uuid slave_host = slave.remote_addr.split('@')[-1] @@ -415,14 +383,14 @@ def distribute(master, slave): workerspex = [] for idx, brick in enumerate(mvol.bricks): - if is_host_local(brick['uuid']): + if rconf.args.local_node_id == brick['uuid']: is_hot = mvol.is_hot(":".join([brick['host'], brick['dir']])) workerspex.append((brick, slaves[idx % len(slaves)], get_subvol_num(idx, mvol, is_hot), is_hot)) logging.debug('worker specs: ' + repr(workerspex)) - return workerspex, suuid, slave_vol, slave_host, master + return workerspex, suuid, slave_vol, slave_host, master, slavenodes def monitor(local, remote): diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 1f2692254db..e546f558265 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -18,6 +18,7 @@ import logging import errno import threading import subprocess +import socket from subprocess import PIPE from threading import Lock, Thread as baseThread from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED @@ -871,3 +872,90 @@ class Volinfo(object): return int(self.get('hotBricks/hotbrickCount')[0].text) else: return 0 + + +class VolinfoFromGconf(object): + # Glusterd will generate following config items before Geo-rep start + # So that Geo-rep need not run gluster commands from inside + # Volinfo object API/interface kept as is so that caller need not + # change anything exept calling this instead of Volinfo() + # + # master-bricks= + # master-bricks=NODEID:HOSTNAME:PATH,.. + # slave-bricks=NODEID:HOSTNAME,.. + # master-volume-id= + # slave-volume-id= + # master-replica-count= + # master-disperse_count= + def __init__(self, vol, host='localhost', master=True): + self.volume = vol + self.host = host + self.master = master + + def is_tier(self): + return False + + def is_hot(self, brickpath): + return False + + @property + @memoize + def bricks(self): + pfx = "master-" if self.master else "slave-" + bricks_data = gconf.get(pfx + "bricks") + if bricks_data is None: + return [] + + bricks_data = bricks_data.split(",") + bricks_data = [b.strip() for b in bricks_data] + out = [] + for b in bricks_data: + parts = b.split(":") + bpath = parts[2] if len(parts) == 3 else "" + out.append({"host": parts[1], "dir": bpath, "uuid": parts[0]}) + + return out + + @property + @memoize + def uuid(self): + if self.master: + return gconf.get("master-volume-id") + else: + return gconf.get("slave-volume-id") + + def replica_count(self, tier, hot): + return gconf.get("master-replica-count") + + def disperse_count(self, tier, hot): + return gconf.get("master-disperse-count") + + @property + @memoize + def hot_bricks(self): + return [] + + def get_hot_bricks_count(self, tier): + return 0 + + +def can_ssh(host, port=22): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.connect((host, port)) + flag = True + except socket.error: + flag = False + + s.close() + return flag + + +def get_up_nodes(hosts, port): + # List of hosts with Hostname/IP and UUID + up_nodes = [] + for h in hosts: + if can_ssh(h[0], port): + up_nodes.append(h) + + return up_nodes |