diff options
-rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 4 | ||||
-rw-r--r-- | geo-replication/syncdaemon/master.py | 39 | ||||
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 45 | ||||
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 4 |
4 files changed, 88 insertions, 4 deletions
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index b38f19d33f7..1542810bcd7 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -255,6 +255,8 @@ def main_i(): type=int, default=1) op.add_option('--changelog-archive-format', metavar='N', type=str, default="%Y%m") + op.add_option('--meta-volume', metavar='N', + type=str, default="") op.add_option( '--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP) op.add_option('--allow-network', metavar='IPS', default='') @@ -297,6 +299,8 @@ def main_i(): op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local) op.add_option('--rpc-fd', dest='rpc_fd', type=str, help=SUPPRESS_HELP) + op.add_option('--subvol-num', dest='subvol_num', type=int, + help=SUPPRESS_HELP) op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True)) op.add_option('-N', '--no-daemon', dest="go_daemon", diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 51c26c76116..dfe65fe6709 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -14,11 +14,12 @@ import time import stat import json import logging +import fcntl import socket import string import errno import tarfile -from errno import ENOENT, ENODATA, EPIPE, EEXIST +from errno import ENOENT, ENODATA, EPIPE, EEXIST, EACCES, EAGAIN from threading import Condition, Lock from datetime import datetime from gconf import gconf @@ -452,8 +453,40 @@ class GMasterCommon(object): t = Thread(target=keep_alive) t.start() - def should_crawl(cls): - return gconf.glusterd_uuid in cls.master.server.node_uuid() + def mgmt_lock(self): + """Take management volume lock """ + bname = str(gconf.volume_id) + "_subvol_" + str(gconf.subvol_num) \ + + ".lock" + path = os.path.join(gconf.working_dir, gconf.meta_volume, bname) + logging.debug("lock_file_path: %s" % path) + fd = os.open(path, os.O_CREAT | os.O_RDWR) + try: + fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except: + ex = sys.exc_info()[1] + os.close(fd) + if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN): + # cannot grab, it's taken + logging.debug("Lock held by someother worker process") + return False + raise + logging.debug("Got the lock") + return True + + + def should_crawl(self): + if not gconf.meta_volume: + return gconf.glusterd_uuid in self.master.server.node_uuid() + + mgmt_mnt = os.path.join(gconf.working_dir, gconf.meta_volume) + if not os.path.ismount(mgmt_mnt): + po = Popen(["mount", "-t", "glusterfs", "localhost:%s" + % gconf.meta_volume, mgmt_mnt], stdout=PIPE, + stderr=PIPE) + po.wait() + po.terminate_geterr() + return self.mgmt_lock() + def register(self): self.register() diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 5e0698b8c46..e50893c793f 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -18,6 +18,7 @@ import xml.etree.ElementTree as XET from subprocess import PIPE from resource import Popen, FILE, GLUSTER, SSH from threading import Lock +from errno import EEXIST import re import random from gconf import gconf @@ -29,6 +30,16 @@ from syncdutils import escape, Thread, finalize, memoize ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError +def get_subvol_num(brick_idx, replica_count, disperse_count): + subvol_size = disperse_count if disperse_count > 0 else replica_count + cnt = int((brick_idx + 1) / subvol_size) + rem = (brick_idx + 1) % subvol_size + if rem > 0: + return cnt + 1 + else: + return cnt + + def get_slave_bricks_status(host, vol): po = Popen(['gluster', '--xml', '--remote-host=' + host, 'volume', 'status', vol, "detail"], @@ -104,6 +115,15 @@ class Volinfo(object): self.volume, self.host) return ids[0].text + @property + @memoize + def replica_count(self): + return int(self.get('replicaCount')[0].text) + + @property + @memoize + def disperse_count(self): + return int(self.get('disperseCount')[0].text) class Monitor(object): @@ -154,6 +174,7 @@ class Monitor(object): # give a chance to graceful exit os.kill(-os.getpid(), signal.SIGTERM) + def monitor(self, w, argv, cpids, agents, slave_vol, slave_host): """the monitor loop @@ -247,6 +268,7 @@ class Monitor(object): '--rpc-fd', ','.join([str(rw), str(ww), str(ra), str(wa)]), + '--subvol-num', str(w[2]), '--resource-remote', remote_host]) @@ -374,7 +396,8 @@ def distribute(*resources): else: slaves = slavevols - workerspex = [(brick['dir'], slaves[idx % len(slaves)]) + workerspex = [(brick['dir'], slaves[idx % len(slaves)], + get_subvol_num(idx, mvol.replica_count, mvol.disperse_count)) for idx, brick in enumerate(mvol.bricks) if is_host_local(brick['host'])] logging.info('worker specs: ' + repr(workerspex)) @@ -382,6 +405,26 @@ def distribute(*resources): def monitor(*resources): + # Mount geo-rep management volume + if gconf.meta_volume: + mgmt_mnt = os.path.join(gconf.working_dir, gconf.meta_volume) + if not os.path.exists(mgmt_mnt): + try: + os.makedirs(mgmt_mnt) + except OSError: + ex = sys.exc_info()[1] + if ex.errno == EEXIST: + pass + else: + raise + + if not os.path.ismount(mgmt_mnt): + po = Popen(["mount", "-t", "glusterfs", "localhost:%s" + % gconf.meta_volume, mgmt_mnt], stdout=PIPE, + stderr=PIPE) + po.wait() + po.terminate_geterr() + # Check if gsyncd restarted in pause state. If # yes, send SIGSTOP to negative of monitor pid # to go back to pause state. diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 6af957ddb4a..5037004d9a5 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -16,6 +16,7 @@ import fcntl import shutil import logging import socket +from subprocess import Popen, PIPE from threading import Lock, Thread as baseThread from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED from errno import EINTR, ENOENT, EPERM, ESTALE, errorcode @@ -215,6 +216,9 @@ def finalize(*a, **kw): except: if sys.exc_info()[0] == OSError: pass + + # TODO: Clean up mgmt volume mount point only monitor dies + if gconf.log_exit: logging.info("exiting.") sys.stdout.flush() |