diff options
Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r-- | geo-replication/syncdaemon/Makefile.am | 6 | ||||
-rw-r--r-- | geo-replication/syncdaemon/argsupgrade.py | 335 | ||||
-rw-r--r-- | geo-replication/syncdaemon/changelogagent.py | 1 | ||||
-rw-r--r-- | geo-replication/syncdaemon/conf.py.in | 2 | ||||
-rw-r--r-- | geo-replication/syncdaemon/configinterface.py | 434 | ||||
-rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 1077 | ||||
-rw-r--r-- | geo-replication/syncdaemon/gsyncdconfig.py | 365 | ||||
-rw-r--r-- | geo-replication/syncdaemon/libgfchangelog.py | 6 | ||||
-rw-r--r-- | geo-replication/syncdaemon/logutils.py | 66 | ||||
-rw-r--r-- | geo-replication/syncdaemon/master.py | 235 | ||||
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 240 | ||||
-rw-r--r-- | geo-replication/syncdaemon/rconf.py (renamed from geo-replication/syncdaemon/gconf.py) | 8 | ||||
-rw-r--r-- | geo-replication/syncdaemon/repce.py | 18 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 1389 | ||||
-rw-r--r-- | geo-replication/syncdaemon/subcmds.py | 306 | ||||
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 121 |
16 files changed, 2299 insertions, 2310 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index f80fb26c28a..5fcf9f27a34 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -1,8 +1,8 @@ syncdaemondir = $(GLUSTERFS_LIBEXECDIR)/python/syncdaemon -syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \ - resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ +syncdaemon_PYTHON = rconf.py gsyncd.py __init__.py master.py README.md repce.py \ + resource.py syncdutils.py monitor.py libcxattr.py gsyncdconfig.py \ $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py \ - gsyncdstatus.py conf.py + gsyncdstatus.py conf.py logutils.py subcmds.py argsupgrade.py CLEANFILES = diff --git a/geo-replication/syncdaemon/argsupgrade.py b/geo-replication/syncdaemon/argsupgrade.py new file mode 100644 index 00000000000..18edb6ba5b7 --- /dev/null +++ b/geo-replication/syncdaemon/argsupgrade.py @@ -0,0 +1,335 @@ +# Converts old style args into new style args + +import sys +from argparse import ArgumentParser +import socket +import os + +from syncdutils import GsyncdError +from conf import GLUSTERD_WORKDIR + + +def gethostbyname(hnam): + """gethostbyname wrapper""" + try: + return socket.gethostbyname(hnam) + except socket.gaierror: + ex = sys.exc_info()[1] + raise GsyncdError("failed to resolve %s: %s" % + (hnam, ex.strerror)) + + +def slave_url(urldata): + urldata = urldata.replace("ssh://", "") + host, vol = urldata.split("::") + vol = vol.split(":")[0] + return "%s::%s" % (host, vol) + + +def init_gsyncd_template_conf(): + path = GLUSTERD_WORKDIR + "/geo-replication/gsyncd_template.conf" + dname = os.path.dirname(path) + if not os.path.exists(dname): + try: + os.mkdir(dname) + except OSError: + pass + + if not os.path.exists(path): + fd = os.open(path, os.O_CREAT | os.O_RDWR) + os.close(fd) + + +def init_gsyncd_session_conf(master, slave): + slave = slave_url(slave) + master = master.strip(":") + slavehost, slavevol = slave.split("::") + slavehost = slavehost.split("@")[-1] + + # Session Config File + path = "%s/geo-replication/%s_%s_%s/gsyncd.conf" % ( + GLUSTERD_WORKDIR, master, slavehost, slavevol) + + if os.path.exists(os.path.dirname(path)) and not os.path.exists(path): + fd = os.open(path, os.O_CREAT | os.O_RDWR) + os.close(fd) + + +def init_gsyncd_conf(path): + dname = os.path.dirname(path) + if not os.path.exists(dname): + try: + os.mkdir(dname) + except OSError: + pass + + if os.path.exists(dname) and not os.path.exists(path): + fd = os.open(path, os.O_CREAT | os.O_RDWR) + os.close(fd) + + +def upgrade(): + # Create dummy template conf(empty), hack to avoid glusterd + # fail when it does stat to check the existence. + init_gsyncd_template_conf() + + if "--monitor" in sys.argv: + # python gsyncd.py --path=/bricks/b1 + # --monitor -c gsyncd.conf + # --iprefix=/var :gv1 + # --glusterd-uuid=f26ac7a8-eb1b-4ea7-959c-80b27d3e43d0 + # f241::gv2 + p = ArgumentParser() + p.add_argument("master") + p.add_argument("slave") + p.add_argument("--glusterd-uuid") + p.add_argument("-c") + p.add_argument("--iprefix") + p.add_argument("--path", action="append") + pargs = p.parse_known_args(sys.argv[1:])[0] + + # Overwrite the sys.argv after rearrange + init_gsyncd_session_conf(pargs.master, pargs.slave) + sys.argv = [ + sys.argv[0], + "monitor", + pargs.master.strip(":"), + slave_url(pargs.slave), + "--local-node-id", + pargs.glusterd_uuid + ] + elif "--status-get" in sys.argv: + # -c gsyncd.conf --iprefix=/var :gv1 f241::gv2 + # --status-get --path /bricks/b1 + p = ArgumentParser() + p.add_argument("master") + p.add_argument("slave") + p.add_argument("-c") + p.add_argument("--path") + p.add_argument("--iprefix") + pargs = p.parse_known_args(sys.argv[1:])[0] + + init_gsyncd_session_conf(pargs.master, pargs.slave) + + sys.argv = [ + sys.argv[0], + "status", + pargs.master.strip(":"), + slave_url(pargs.slave), + "--local-path", + pargs.path + ] + elif "--canonicalize-url" in sys.argv: + # This can accept multiple URLs and converts each URL to the + # format ssh://USER@IP:gluster://127.0.0.1:VOLUME + # This format not used in gsyncd, but added for glusterd compatibility + p = ArgumentParser() + p.add_argument("--canonicalize-url", nargs="+") + pargs = p.parse_known_args(sys.argv[1:])[0] + + for url in pargs.canonicalize_url: + host, vol = url.split("::") + host = host.replace("ssh://", "") + remote_addr = host + if "@" not in remote_addr: + remote_addr = "root@" + remote_addr + + user, hname = remote_addr.split("@") + + print("ssh://%s@%s:gluster://127.0.0.1:%s" % ( + user, gethostbyname(hname), vol)) + + sys.exit(0) + elif "--normalize-url" in sys.argv: + # Adds schema prefix as ssh:// + # This format not used in gsyncd, but added for glusterd compatibility + p = ArgumentParser() + p.add_argument("--normalize-url") + pargs = p.parse_known_args(sys.argv[1:])[0] + print("ssh://%s" % slave_url(pargs.normalize_url)) + sys.exit(0) + elif "--config-get-all" in sys.argv: + # -c gsyncd.conf --iprefix=/var :gv1 f241::gv2 --config-get-all + p = ArgumentParser() + p.add_argument("master") + p.add_argument("slave") + p.add_argument("-c") + p.add_argument("--iprefix") + pargs = p.parse_known_args(sys.argv[1:])[0] + + init_gsyncd_session_conf(pargs.master, pargs.slave) + + sys.argv = [ + sys.argv[0], + "config-get", + pargs.master.strip(":"), + slave_url(pargs.slave), + "--show-defaults", + "--use-underscore" + ] + elif "--verify" in sys.argv and "spawning" in sys.argv: + # Just checks that able to spawn gsyncd or not + sys.exit(0) + elif "--slavevoluuid-get" in sys.argv: + # --slavevoluuid-get f241::gv2 + p = ArgumentParser() + p.add_argument("--slavevoluuid-get") + p.add_argument("-c") + p.add_argument("--iprefix") + pargs = p.parse_known_args(sys.argv[1:])[0] + host, vol = pargs.slavevoluuid_get.split("::") + + # Modified sys.argv + sys.argv = [ + sys.argv[0], + "voluuidget", + host, + vol + ] + elif "--config-set-rx" in sys.argv: + # Not required since default conf is not generated + # and custom conf generated only when required + # -c gsyncd.conf --config-set-rx remote-gsyncd + # /usr/local/libexec/glusterfs/gsyncd . . + # Touch the gsyncd.conf file and create session + # directory if required + p = ArgumentParser() + p.add_argument("-c", dest="config_file") + pargs = p.parse_known_args(sys.argv[1:])[0] + + # If not template conf then it is trying to create + # session config, create a empty file instead + if pargs.config_file.endswith("gsyncd.conf"): + init_gsyncd_conf(pargs.config_file) + sys.exit(0) + elif "--create" in sys.argv: + # To update monitor status file + # --create Created -c gsyncd.conf + # --iprefix=/var :gv1 f241::gv2 + p = ArgumentParser() + p.add_argument("--create") + p.add_argument("master") + p.add_argument("slave") + p.add_argument("-c") + p.add_argument("--iprefix") + pargs = p.parse_known_args(sys.argv[1:])[0] + + init_gsyncd_session_conf(pargs.master, pargs.slave) + + # Modified sys.argv + sys.argv = [ + sys.argv[0], + "monitor-status", + pargs.master.strip(":"), + slave_url(pargs.slave), + pargs.create + ] + elif "--config-get" in sys.argv: + # -c gsyncd.conf --iprefix=/var :gv1 f241::gv2 --config-get pid-file + p = ArgumentParser() + p.add_argument("--config-get") + p.add_argument("master") + p.add_argument("slave") + p.add_argument("-c") + p.add_argument("--iprefix") + pargs = p.parse_known_args(sys.argv[1:])[0] + + init_gsyncd_session_conf(pargs.master, pargs.slave) + + # Modified sys.argv + sys.argv = [ + sys.argv[0], + "config-get", + pargs.master.strip(":"), + slave_url(pargs.slave), + "--only-value", + "--show-defaults", + "--name", + pargs.config_get.replace("_", "-") + ] + elif "--config-set" in sys.argv: + # ignore session-owner + if "session-owner" in sys.argv: + sys.exit(0) + + # --path=/bricks/b1 -c gsyncd.conf :gv1 f241::gv2 + # --config-set log_level DEBUG + p = ArgumentParser() + p.add_argument("master") + p.add_argument("slave") + p.add_argument("--config-set", nargs=2) + p.add_argument("-c") + pargs = p.parse_known_args(sys.argv[1:])[0] + + init_gsyncd_session_conf(pargs.master, pargs.slave) + + # Modified sys.argv + sys.argv = [ + sys.argv[0], + "config-set", + pargs.master.strip(":"), + slave_url(pargs.slave), + pargs.config_set[0], + pargs.config_set[1] + ] + elif "--config-check" in sys.argv: + # --config-check georep_session_working_dir + p = ArgumentParser() + p.add_argument("--config-check") + p.add_argument("-c") + pargs = p.parse_known_args(sys.argv[1:])[0] + + # Modified sys.argv + sys.argv = [ + sys.argv[0], + "config-check", + pargs.config_check.replace("_", "-") + ] + elif "--config-del" in sys.argv: + # -c gsyncd.conf --iprefix=/var :gv1 f241::gv2 --config-del log_level + p = ArgumentParser() + p.add_argument("--config-del") + p.add_argument("master") + p.add_argument("slave") + p.add_argument("-c") + p.add_argument("--iprefix") + pargs = p.parse_known_args(sys.argv[1:])[0] + + init_gsyncd_session_conf(pargs.master, pargs.slave) + + # Modified sys.argv + sys.argv = [ + sys.argv[0], + "config-reset", + pargs.master.strip(":"), + slave_url(pargs.slave), + pargs.config_del.replace("_", "-") + ] + elif "--delete" in sys.argv: + # --delete -c gsyncd.conf --iprefix=/var + # --path-list=--path=/bricks/b1 :gv1 f241::gv2 + p = ArgumentParser() + p.add_argument("--reset-sync-time", action="store_true") + p.add_argument("--path-list") + p.add_argument("master") + p.add_argument("slave") + p.add_argument("--iprefix") + p.add_argument("-c") + pargs = p.parse_known_args(sys.argv[1:])[0] + + init_gsyncd_session_conf(pargs.master, pargs.slave) + + paths = pargs.path_list.split("--path=") + paths = ["--path=%s" % x.strip() for x in paths if x.strip() != ""] + + # Modified sys.argv + sys.argv = [ + sys.argv[0], + "delete", + pargs.master.strip(":"), + slave_url(pargs.slave) + ] + sys.argv += paths + + if pargs.reset_sync_time: + sys.argv.append("--reset-sync-time") diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py index 731dbd06f57..4fb8d254dea 100644 --- a/geo-replication/syncdaemon/changelogagent.py +++ b/geo-replication/syncdaemon/changelogagent.py @@ -9,7 +9,6 @@ # cases as published by the Free Software Foundation. # -import os import logging import syncdutils from syncdutils import select, CHANGELOG_AGENT_SERVER_VERSION diff --git a/geo-replication/syncdaemon/conf.py.in b/geo-replication/syncdaemon/conf.py.in index 9b7c64df9d7..2042fa9cdfb 100644 --- a/geo-replication/syncdaemon/conf.py.in +++ b/geo-replication/syncdaemon/conf.py.in @@ -13,3 +13,5 @@ GLUSTERD_WORKDIR = "@GLUSTERD_WORKDIR@" LOCALSTATEDIR = "@localstatedir@" UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info" +GLUSTERFS_CONFDIR = "@SYSCONF_DIR@/glusterfs" +GCONF_VERSION = 4.0 diff --git a/geo-replication/syncdaemon/configinterface.py b/geo-replication/syncdaemon/configinterface.py deleted file mode 100644 index a59dee7a4a6..00000000000 --- a/geo-replication/syncdaemon/configinterface.py +++ /dev/null @@ -1,434 +0,0 @@ -# -# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> -# This file is part of GlusterFS. - -# This file is licensed to you under your choice of the GNU Lesser -# General Public License, version 3 or any later version (LGPLv3 or -# later), or the GNU General Public License, version 2 (GPLv2), in all -# cases as published by the Free Software Foundation. -# - -try: - import ConfigParser -except ImportError: - # py 3 - import configparser as ConfigParser -import re -from string import Template -import os -import errno -import sys -from stat import ST_DEV, ST_INO, ST_MTIME -import tempfile -import shutil - -from syncdutils import escape, unescape, norm, update_file, GsyncdError -from conf import GLUSTERD_WORKDIR, LOCALSTATEDIR - -SECT_ORD = '__section_order__' -SECT_META = '__meta__' -config_version = 2.0 - -re_type = type(re.compile('')) - -TMPL_CONFIG_FILE = GLUSTERD_WORKDIR + "/geo-replication/gsyncd_template.conf" - -# (SECTION, OPTION, OLD VALUE, NEW VALUE) -CONFIGS = ( - ("peersrx . .", - "georep_session_working_dir", - "", - GLUSTERD_WORKDIR + "/geo-replication/${mastervol}_${remotehost}_" - "${slavevol}/"), - ("peersrx .", - "gluster_params", - "aux-gfid-mount xlator-option=\*-dht.assert-no-child-down=true", - "aux-gfid-mount"), - ("peersrx .", - "gluster_params", - "aux-gfid-mount", - "aux-gfid-mount acl"), - ("peersrx . .", - "ssh_command_tar", - "", - "ssh -oPasswordAuthentication=no -oStrictHostKeyChecking=no " - "-i " + GLUSTERD_WORKDIR + "/geo-replication/tar_ssh.pem"), - ("peersrx . .", - "changelog_log_file", - "", - "${iprefix}/log/glusterfs/geo-replication/${mastervol}" - "/${eSlave}${local_id}-changes.log"), - ("peersrx . .", - "working_dir", - LOCALSTATEDIR + "/run/gluster/${mastervol}/${eSlave}", - "${iprefix}/lib/misc/glusterfsd/${mastervol}/${eSlave}"), - ("peersrx . .", - "ignore_deletes", - "true", - "false"), - ("peersrx . .", - "pid-file", - GLUSTERD_WORKDIR + "/geo-replication/${mastervol}_${remotehost}_" - "${slavevol}/${eSlave}.pid", - GLUSTERD_WORKDIR + "/geo-replication/${mastervol}_${remotehost}_" - "${slavevol}/monitor.pid"), - ("peersrx . .", - "state-file", - GLUSTERD_WORKDIR + "/geo-replication/${mastervol}_${remotehost}_" - "${slavevol}/${eSlave}.status", - GLUSTERD_WORKDIR + "/geo-replication/${mastervol}_${remotehost}_" - "${slavevol}/monitor.status"), - ("peersrx .", - "log_file", - "${iprefix}/log/glusterfs/geo-replication-slaves/${session_owner}:${eSlave}.log", - "${iprefix}/log/glusterfs/geo-replication-slaves/${session_owner}:${local_node}${local_id}.${slavevol}.log"), - ("peersrx .", - "log_file_mbr", - "${iprefix}/log/glusterfs/geo-replication-slaves/mbr/${session_owner}:${eSlave}.log", - "${iprefix}/log/glusterfs/geo-replication-slaves/mbr/${session_owner}:${local_node}${local_id}.${slavevol}.log"), - ("peersrx .", - "gluster_log_file", - "${iprefix}/log/glusterfs/geo-replication-slaves/${session_owner}:${eSlave}.gluster.log", - "${iprefix}/log/glusterfs/geo-replication-slaves/${session_owner}:${local_node}${local_id}.${slavevol}.gluster.log") -) - - -def upgrade_config_file(path, confdata): - config_change = False - config = ConfigParser.RawConfigParser() - # If confdata.rx present then glusterd is adding config values, - # it will create config file if not exists. config.read is fine in - # this case since any other error will be raised during write. - if getattr(confdata, "rx", False): - config.read(path) - else: - with open(path) as fp: - config.readfp(fp) - - for sec, opt, oldval, newval in CONFIGS: - try: - val = config.get(sec, opt) - except ConfigParser.NoOptionError: - # if new config opt not exists - config_change = True - config.set(sec, opt, newval) - continue - except ConfigParser.Error: - """ - When gsyncd invoked at the time of create, config file - will not be their. Ignore any ConfigParser errors - """ - continue - - if val == newval: - # value is same as new val - continue - - if val == oldval: - # config value needs update - config_change = True - config.set(sec, opt, newval) - - # To convert from old peers section format to new peers section format. - # Old format: peers gluster://<master ip>:<master vol> \ - # ssh://root@<slave ip>:gluster://<master ip>:<slave vol> - # New format: peers <master vol name> <slave vol name> - for old_sect in config.sections(): - if old_sect.startswith("peers "): - peers_data = old_sect.split(" ") - mvol = peers_data[1].split("%3A")[-1] - svol = peers_data[2].split("%3A")[-1] - new_sect = "peers {0} {1}".format(mvol, svol) - - if old_sect == new_sect: - # Already in new format "peers mastervol slavevol" - continue - - # Create new section if not exists - try: - config.add_section(new_sect) - except ConfigParser.DuplicateSectionError: - pass - - config_change = True - # Add all the items of old_sect to new_sect - for key, val in config.items(old_sect): - config.set(new_sect, key, val) - - # Delete old section - config.remove_section(old_sect) - - if config_change: - tempConfigFile = tempfile.NamedTemporaryFile(mode="wb", delete=False) - with open(tempConfigFile.name, 'wb') as configFile: - config.write(configFile) - - # If src and dst are two different file system, then os.rename - # fails, In this case if temp file created in /tmp and if /tmp is - # separate fs then os.rename gives following error, so use shutil - # OSError: [Errno 18] Invalid cross-device link - # mail.python.org/pipermail/python-list/2005-February/342893.html - shutil.move(tempConfigFile.name, path) - - -class MultiDict(object): - - """a virtual dict-like class which functions as the union - of underlying dicts""" - - def __init__(self, *dd): - self.dicts = dd - - def __getitem__(self, key): - val = None - for d in self.dicts: - if d.get(key) is not None: - val = d[key] - if val is None: - raise KeyError(key) - return val - - -class GConffile(object): - - """A high-level interface to ConfigParser which flattens the two-tiered - config layout by implenting automatic section dispatch based on initial - parameters. - - Also ensure section ordering in terms of their time of addition -- a compat - hack for Python < 2.7. - """ - - def _normconfig(self): - """normalize config keys by s/-/_/g""" - for n, s in self.config._sections.items(): - if n.find('__') == 0: - continue - s2 = type(s)() - for k, v in s.items(): - if k.find('__') != 0: - k = norm(k) - s2[k] = v - self.config._sections[n] = s2 - - def __init__(self, path, peers, confdata, *dd): - """ - - .path: location of config file - - .config: underlying ConfigParser instance - - .peers: on behalf of whom we flatten .config - (master, or master-slave url pair) - - .auxdicts: template subtituents - """ - self.peers = peers - self.path = path - self.auxdicts = dd - self.config = ConfigParser.RawConfigParser() - if getattr(confdata, "rx", False): - self.config.read(path) - else: - with open(path) as fp: - self.config.readfp(fp) - - self.dev, self.ino, self.mtime = -1, -1, -1 - self._normconfig() - - def _load(self): - try: - sres = os.stat(self.path) - self.dev = sres[ST_DEV] - self.ino = sres[ST_INO] - self.mtime = sres[ST_MTIME] - except (OSError, IOError): - if sys.exc_info()[1].errno == errno.ENOENT: - sres = None - - self.config = ConfigParser.RawConfigParser() - with open(self.path) as fp: - self.config.readfp(fp) - self._normconfig() - - def get_realtime(self, opt, default_value=None): - try: - sres = os.stat(self.path) - except (OSError, IOError): - if sys.exc_info()[1].errno == errno.ENOENT: - sres = None - else: - raise - - # compare file system stat with that of our stream file handle - if not sres or sres[ST_DEV] != self.dev or \ - sres[ST_INO] != self.ino or self.mtime != sres[ST_MTIME]: - self._load() - - return self.get(opt, printValue=False, default_value=default_value) - - def section(self, rx=False): - """get the section name of the section representing .peers - in .config""" - peers = self.peers - if not peers: - peers = ['.', '.'] - rx = True - if rx: - return ' '.join(['peersrx'] + [escape(u) for u in peers]) - else: - return ' '.join(['peers'] + [u.split(':')[-1] for u in peers]) - - @staticmethod - def parse_section(section): - """retrieve peers sequence encoded by section name - (as urls or regexen, depending on section type) - """ - sl = section.split() - st = sl.pop(0) - sl = [unescape(u) for u in sl] - if st == 'peersrx': - sl = [re.compile(u) for u in sl] - return sl - - def ord_sections(self): - """Return an ordered list of sections. - - Ordering happens based on the auxiliary - SECT_ORD section storing indices for each - section added through the config API. - - To not to go corrupt in case of manually - written config files, we take care to append - also those sections which are not registered - in SECT_ORD. - - Needed for python 2.{4,5,6} where ConfigParser - cannot yet order sections/options internally. - """ - so = {} - if self.config.has_section(SECT_ORD): - so = self.config._sections[SECT_ORD] - so2 = {} - for k, v in so.items(): - if k != '__name__': - so2[k] = int(v) - tv = 0 - if so2: - tv = max(so2.values()) + 1 - ss = [s for s in self.config.sections() if s.find('__') != 0] - for s in ss: - if s in so.keys(): - continue - so2[s] = tv - tv += 1 - - def scmp(x, y): - return cmp(*(so2[s] for s in (x, y))) - ss.sort(scmp) - return ss - - def update_to(self, dct, allow_unresolved=False): - """update @dct from key/values of ours. - - key/values are collected from .config by filtering the regexp sections - according to match, and from .section. The values are treated as - templates, which are substituted from .auxdicts and (in case of regexp - sections) match groups. - """ - if not self.peers: - raise GsyncdError('no peers given, cannot select matching options') - - def update_from_sect(sect, mud): - for k, v in self.config._sections[sect].items(): - # Template expects String to be passed - # if any config value is not string then it - # fails with ValueError - v = "{0}".format(v) - - if k == '__name__': - continue - if allow_unresolved: - dct[k] = Template(v).safe_substitute(mud) - else: - dct[k] = Template(v).substitute(mud) - - for sect in self.ord_sections(): - sp = self.parse_section(sect) - if isinstance(sp[0], re_type) and len(sp) == len(self.peers): - match = True - mad = {} - for i in range(len(sp)): - m = sp[i].search(self.peers[i]) - if not m: - match = False - break - for j in range(len(m.groups())): - mad['match%d_%d' % (i + 1, j + 1)] = m.groups()[j] - if match: - update_from_sect(sect, MultiDict(dct, mad, *self.auxdicts)) - if self.config.has_section(self.section()): - update_from_sect(self.section(), MultiDict(dct, *self.auxdicts)) - - def get(self, opt=None, printValue=True, default_value=None): - """print the matching key/value pairs from .config, - or if @opt given, the value for @opt (according to the - logic described in .update_to) - """ - d = {} - self.update_to(d, allow_unresolved=True) - if opt: - opt = norm(opt) - v = d.get(opt, default_value) - - if printValue: - if v is not None: - print(v) - else: - return v - else: - for k, v in d.iteritems(): - if k == '__name__': - continue - print("%s: %s" % (k, v)) - - def write(self, trfn, opt, *a, **kw): - """update on-disk config transactionally - - @trfn is the transaction function - """ - def mergeconf(f): - self.config = ConfigParser.RawConfigParser() - self.config.readfp(f) - self._normconfig() - if not self.config.has_section(SECT_META): - self.config.add_section(SECT_META) - self.config.set(SECT_META, 'version', config_version) - return trfn(norm(opt), *a, **kw) - - def updateconf(f): - self.config.write(f) - update_file(self.path, updateconf, mergeconf) - - def _set(self, opt, val, rx=False): - """set @opt to @val in .section""" - sect = self.section(rx) - if not self.config.has_section(sect): - self.config.add_section(sect) - # regarding SECT_ORD, cf. ord_sections - if not self.config.has_section(SECT_ORD): - self.config.add_section(SECT_ORD) - self.config.set( - SECT_ORD, sect, len(self.config._sections[SECT_ORD])) - self.config.set(sect, opt, val) - return True - - def set(self, opt, *a, **kw): - """perform ._set transactionally""" - self.write(self._set, opt, *a, **kw) - - def _delete(self, opt, rx=False): - """delete @opt from .section""" - sect = self.section(rx) - if self.config.has_section(sect): - return self.config.remove_option(sect, opt) - - def delete(self, opt, *a, **kw): - """perform ._delete transactionally""" - self.write(self._delete, opt, *a, **kw) diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index adca0374c6c..fbab5cbf386 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -1,805 +1,310 @@ -#!/usr/bin/env python -# -# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> -# This file is part of GlusterFS. - -# This file is licensed to you under your choice of the GNU Lesser -# General Public License, version 3 or any later version (LGPLv3 or -# later), or the GNU General Public License, version 2 (GPLv2), in all -# cases as published by the Free Software Foundation. -# - +#!/usr/bin/python +from argparse import ArgumentParser +import time import os -import os.path -import glob +from errno import EEXIST import sys -import time import logging -import shutil -import optparse -import fcntl -import fnmatch -from optparse import OptionParser, SUPPRESS_HELP -from logging import Logger, handlers -from errno import ENOENT - -from ipaddr import IPAddress, IPNetwork - -from gconf import gconf -from syncdutils import FreeObject, norm, grabpidfile, finalize -from syncdutils import log_raise_exception, privileged, boolify -from syncdutils import GsyncdError, select, set_term_handler -from configinterface import GConffile, upgrade_config_file, TMPL_CONFIG_FILE -import resource -from monitor import monitor -import xml.etree.ElementTree as XET -from subprocess import PIPE -import subprocess -from changelogagent import agent, Changelog -from gsyncdstatus import set_monitor_status, GeorepStatus, human_time_utc -from libcxattr import Xattr -import struct -from syncdutils import get_master_and_slave_data_from_args, lf, Popen - -ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError - - -class GLogger(Logger): - - """Logger customizations for gsyncd. - - It implements a log format similar to that of glusterfs. - """ - - def makeRecord(self, name, level, *a): - rv = Logger.makeRecord(self, name, level, *a) - rv.nsecs = (rv.created - int(rv.created)) * 1000000 - fr = sys._getframe(4) - callee = fr.f_locals.get('self') - if callee: - ctx = str(type(callee)).split("'")[1].split('.')[-1] - else: - ctx = '<top>' - if not hasattr(rv, 'funcName'): - rv.funcName = fr.f_code.co_name - rv.lvlnam = logging.getLevelName(level)[0] - rv.ctx = ctx - return rv - - @classmethod - def setup(cls, **kw): - lbl = kw.get('label', "") - if lbl: - lbl = '(' + lbl + ')' - lprm = {'datefmt': "%Y-%m-%d %H:%M:%S", - 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + - lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"} - lprm.update(kw) - lvl = kw.get('level', logging.INFO) - lprm['level'] = lvl - logging.root = cls("root", lvl) - logging.setLoggerClass(cls) - logging.getLogger().handlers = [] - logging.getLogger().setLevel(lprm['level']) - logging.Formatter.converter = time.gmtime # Log in GMT/UTC time - - if 'filename' in lprm: - try: - logging_handler = handlers.WatchedFileHandler(lprm['filename']) - formatter = logging.Formatter(fmt=lprm['format'], - datefmt=lprm['datefmt']) - logging_handler.setFormatter(formatter) - logging.getLogger().addHandler(logging_handler) - except AttributeError: - # Python version < 2.6 will not have WatchedFileHandler - # so fallback to logging without any handler. - # Note: logrotate will not work if Python version is < 2.6 - logging.basicConfig(**lprm) - else: - # If filename not passed(not available in lprm) then it may be - # streaming.(Ex: {"stream": "/dev/stdout"}) - logging.basicConfig(**lprm) - - @classmethod - def _gsyncd_loginit(cls, **kw): - lkw = {} - if gconf.log_level: - lkw['level'] = gconf.log_level - if kw.get('log_file'): - if kw['log_file'] in ('-', '/dev/stderr'): - lkw['stream'] = sys.stderr - elif kw['log_file'] == '/dev/stdout': - lkw['stream'] = sys.stdout - else: - lkw['filename'] = kw['log_file'] - - cls.setup(label=kw.get('label'), **lkw) - - lkw.update({'saved_label': kw.get('label')}) - gconf.log_metadata = lkw - gconf.log_exit = True - - -# Given slave host and its volume name, get corresponding volume uuid -def slave_vol_uuid_get(host, vol): - po = subprocess.Popen(['gluster', '--xml', '--remote-host=' + host, - 'volume', 'info', vol], bufsize=0, - stdin=None, stdout=PIPE, stderr=PIPE) - vix, err = po.communicate() - if po.returncode != 0: - logging.info(lf("Volume info failed, unable to get " - "volume uuid of slavevol, " - "returning empty string", - slavevol=vol, - slavehost=host, - error=po.returncode)) - return "" - vi = XET.fromstring(vix) - if vi.find('opRet').text != '0': - logging.info(lf("Unable to get volume uuid of slavevol, " - "returning empty string", - slavevol=vol, - slavehost=host, - error=vi.find('opErrstr').text)) - return "" - try: - voluuid = vi.find("volInfo/volumes/volume/id").text - except (ParseError, AttributeError, ValueError) as e: - logging.info(lf("Parsing failed to volume uuid of slavevol, " - "returning empty string", - slavevol=vol, - slavehost=host, - error=e)) - voluuid = "" - - return voluuid - - -def startup(**kw): - """set up logging, pidfile grabbing, daemonization""" - if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn': - if not grabpidfile(): - sys.stderr.write("pidfile is taken, exiting.\n") - sys.exit(2) - gconf.pid_file_owned = True - - if kw.get('go_daemon') == 'should': - x, y = os.pipe() - gconf.cpid = os.fork() - if gconf.cpid: - os.close(x) - sys.exit() - os.close(y) - os.setsid() - dn = os.open(os.devnull, os.O_RDWR) - for f in (sys.stdin, sys.stdout, sys.stderr): - os.dup2(dn, f.fileno()) - if getattr(gconf, 'pid_file', None): - if not grabpidfile(gconf.pid_file + '.tmp'): - raise GsyncdError("cannot grab temporary pidfile") - os.rename(gconf.pid_file + '.tmp', gconf.pid_file) - # wait for parent to terminate - # so we can start up with - # no messing from the dirty - # ol' bustard - select((x,), (), ()) - os.close(x) - - GLogger._gsyncd_loginit(**kw) - - -def _unlink(path): - try: - os.unlink(path) - except (OSError, IOError): - if sys.exc_info()[1].errno == ENOENT: - pass - else: - raise GsyncdError('Unlink error: %s' % path) +from logutils import setup_logging +import gsyncdconfig as gconf +from rconf import rconf +import subcmds +from conf import GLUSTERD_WORKDIR, GLUSTERFS_CONFDIR, GCONF_VERSION +from syncdutils import set_term_handler, finalize, lf +from syncdutils import log_raise_exception, FreeObject, escape +import argsupgrade + + +GSYNCD_VERSION = "gsyncd.py %s.0" % GCONF_VERSION def main(): - """main routine, signal/exception handling boilerplates""" - gconf.starttime = time.time() + rconf.starttime = time.time() + + # If old Glusterd sends commands in old format, below function + # converts the sys.argv to new format. This conversion is added + # temporarily for backward compatibility. This can be removed + # once integrated with Glusterd2 + # This modifies sys.argv globally, so rest of the code works as usual + argsupgrade.upgrade() + + # Default argparse version handler prints to stderr, which is fixed in + # 3.x series but not in 2.x, using custom parser to fix this issue + if "--version" in sys.argv: + print(GSYNCD_VERSION) + sys.exit(0) + + parser = ArgumentParser() + sp = parser.add_subparsers(dest="subcmd") + + # Monitor Status File update + p = sp.add_parser("monitor-status") + p.add_argument("master", help="Master Volume Name") + p.add_argument("slave", help="Slave details user@host::vol format") + p.add_argument("status", help="Update Monitor Status") + p.add_argument("-c", "--config-file", help="Config File") + p.add_argument("--debug", action="store_true") + + # Monitor + p = sp.add_parser("monitor") + p.add_argument("master", help="Master Volume Name") + p.add_argument("slave", help="Slave details user@host::vol format") + p.add_argument("-c", "--config-file", help="Config File") + p.add_argument("--pause-on-start", + action="store_true", + help="Start with Paused state") + p.add_argument("--local-node-id", help="Local Node ID") + p.add_argument("--debug", action="store_true") + + # Worker + p = sp.add_parser("worker") + p.add_argument("master", help="Master Volume Name") + p.add_argument("slave", help="Slave details user@host::vol format") + p.add_argument("--local-path", help="Local Brick Path") + p.add_argument("--feedback-fd", type=int, + help="feedback fd between monitor and worker") + p.add_argument("--local-node", help="Local master node") + p.add_argument("--local-node-id", help="Local Node ID") + p.add_argument("--rpc-fd", + help="Read and Write fds for worker-agent communication") + p.add_argument("--subvol-num", type=int, help="Subvolume number") + p.add_argument("--is-hottier", action="store_true", + help="Is this brick part of hot tier") + p.add_argument("--resource-remote", + help="Remote node to connect to Slave Volume") + p.add_argument("--resource-remote-id", + help="Remote node ID to connect to Slave Volume") + p.add_argument("--slave-id", help="Slave Volume ID") + p.add_argument("-c", "--config-file", help="Config File") + p.add_argument("--access-mount", action="store_true", + help="Do not umount the mount") + p.add_argument("--debug", action="store_true") + + # Agent + p = sp.add_parser("agent") + p.add_argument("master", help="Master Volume Name") + p.add_argument("slave", help="Slave details user@host::vol format") + p.add_argument("--local-path", help="Local brick path") + p.add_argument("--local-node", help="Local master node") + p.add_argument("--local-node-id", help="Local Node ID") + p.add_argument("--slave-id", help="Slave Volume ID") + p.add_argument("--rpc-fd", + help="Read and Write fds for worker-agent communication") + p.add_argument("-c", "--config-file", help="Config File") + p.add_argument("--debug", action="store_true") + + # Slave + p = sp.add_parser("slave") + p.add_argument("master", help="Master Volume Name") + p.add_argument("slave", help="Slave details user@host::vol format") + p.add_argument("--session-owner") + p.add_argument("--master-brick", + help="Master brick which is connected to the Slave") + p.add_argument("--master-node", + help="Master node which is connected to the Slave") + p.add_argument("--master-node-id", + help="Master node ID which is connected to the Slave") + p.add_argument("--local-node", help="Local Slave node") + p.add_argument("--local-node-id", help="Local Slave ID") + p.add_argument("-c", "--config-file", help="Config File") + p.add_argument("--debug", action="store_true") + + # All configurations which are configured via "slave-" options + # DO NOT add default values for these configurations, default values + # will be picked from template config file + p.add_argument("--slave-timeout", type=int, + help="Timeout to end gsyncd at Slave side") + p.add_argument("--use-rsync-xattrs", action="store_true") + p.add_argument("--slave-log-level", help="Slave Gsyncd Log level") + p.add_argument("--slave-gluster-log-level", + help="Slave Gluster mount Log level") + p.add_argument("--slave-access-mount", action="store_true", + help="Do not umount the mount") + + # Status + p = sp.add_parser("status") + p.add_argument("master", help="Master Volume Name") + p.add_argument("slave", help="Slave") + p.add_argument("-c", "--config-file", help="Config File") + p.add_argument("--local-path", help="Local Brick Path") + p.add_argument("--debug", action="store_true") + + # Config-check + p = sp.add_parser("config-check") + p.add_argument("name", help="Config Name") + p.add_argument("--value", help="Config Value") + p.add_argument("--debug", action="store_true") + + # Config-get + p = sp.add_parser("config-get") + p.add_argument("master", help="Master Volume Name") + p.add_argument("slave", help="Slave") + p.add_argument("--name", help="Config Name") + p.add_argument("-c", "--config-file", help="Config File") + p.add_argument("--debug", action="store_true") + p.add_argument("--show-defaults", action="store_true") + p.add_argument("--only-value", action="store_true") + p.add_argument("--use-underscore", action="store_true") + + # Config-set + p = sp.add_parser("config-set") + p.add_argument("master", help="Master Volume Name") + p.add_argument("slave", help="Slave") + p.add_argument("name", help="Config Name") + p.add_argument("value", help="Config Value") + p.add_argument("-c", "--config-file", help="Config File") + p.add_argument("--debug", action="store_true") + + # Config-reset + p = sp.add_parser("config-reset") + p.add_argument("master", help="Master Volume Name") + p.add_argument("slave", help="Slave") + p.add_argument("name", help="Config Name") + p.add_argument("-c", "--config-file", help="Config File") + p.add_argument("--debug", action="store_true") + + # voluuidget + p = sp.add_parser("voluuidget") + p.add_argument("host", help="Hostname") + p.add_argument("volname", help="Volume Name") + p.add_argument("--debug", action="store_true") + + # Delete + p = sp.add_parser("delete") + p.add_argument("master", help="Master Volume Name") + p.add_argument("slave", help="Slave") + p.add_argument("-c", "--config-file", help="Config File") + p.add_argument('--path', dest='paths', action="append") + p.add_argument("--reset-sync-time", action="store_true", + help="Reset Sync Time") + p.add_argument("--debug", action="store_true") + + # Parse arguments + args = parser.parse_args() + + # Extra template values, All arguments are already part of template + # variables, use this for adding extra variables + extra_tmpl_args = {} + + # Add First/Primary Slave host, user and volume + if getattr(args, "slave", None) is not None: + hostdata, slavevol = args.slave.split("::") + hostdata = hostdata.split("@") + slavehost = hostdata[-1] + slaveuser = "root" + if len(hostdata) == 2: + slaveuser = hostdata[0] + extra_tmpl_args["primary_slave_host"] = slavehost + extra_tmpl_args["slaveuser"] = slaveuser + extra_tmpl_args["slavevol"] = slavevol + + # Add Bricks encoded path + if getattr(args, "local_path", None) is not None: + extra_tmpl_args["local_id"] = escape(args.local_path) + + # Add Master Bricks encoded path(For Slave) + if getattr(args, "master_brick", None) is not None: + extra_tmpl_args["master_brick_id"] = escape(args.master_brick) + + # Load configurations + config_file = getattr(args, "config_file", None) + + # Subcmd accepts config file argument but not passed + # Set default path for config file in that case + # If an subcmd accepts config file then it also accepts + # master and Slave arguments. + if config_file is None and hasattr(args, "config_file"): + config_file = "%s/geo-replication/%s_%s_%s/gsyncd.conf" % ( + GLUSTERD_WORKDIR, + args.master, + extra_tmpl_args["primary_slave_host"], + extra_tmpl_args["slavevol"]) + + # If Config file path not exists, log error and continue using default conf + config_file_error_msg = None + if config_file is not None and not os.path.exists(config_file): + # Logging not yet initialized, create the error message to + # log later and reset the config_file to None + config_file_error_msg = lf( + "Session config file not exists, using the default config", + path=config_file) + config_file = None + + rconf.config_file = config_file + + # Load Config file + gconf.load(GLUSTERFS_CONFDIR + "/gsyncd.conf", + config_file, + vars(args), + extra_tmpl_args) + + # Default label to print in log file + label = args.subcmd + if args.subcmd in ("worker", "agent"): + # If Worker or agent, then add brick path also to label + label = "%s %s" % (args.subcmd, args.local_path) + elif args.subcmd == "slave": + # If Slave add Master node and Brick details + label = "%s %s%s" % (args.subcmd, args.master_node, args.master_brick) + + # Setup Logger + # Default log file + log_file = gconf.get("cli-log-file") + log_level = gconf.get("cli-log-level") + if getattr(args, "master", None) is not None and \ + getattr(args, "slave", None) is not None: + log_file = gconf.get("log-file") + log_level = gconf.get("log-level") + + # Use different log file location for Slave log file + if args.subcmd == "slave": + log_file = gconf.get("slave-log-file") + log_level = gconf.get("slave-log-level") + + if args.debug: + log_file = "-" + log_level = "DEBUG" + + # Create Logdir if not exists + try: + if log_file != "-": + os.mkdir(os.path.dirname(log_file)) + except OSError as e: + if e.errno != EEXIST: + raise + + setup_logging( + log_file=log_file, + level=log_level, + label=label + ) + + if config_file_error_msg is not None: + logging.warn(config_file_error_msg) + + # Log message for loaded config file + if config_file is not None: + logging.info(lf("Using session config file", path=config_file)) + set_term_handler() - GLogger.setup() excont = FreeObject(exval=0) + + # Gets the function name based on the input argument. For example + # if subcommand passed as argument is monitor then it looks for + # function with name "subcmd_monitor" in subcmds file + func = getattr(subcmds, "subcmd_" + args.subcmd.replace("-", "_"), None) + try: try: - main_i() + if func is not None: + rconf.args = args + func(args) except: log_raise_exception(excont) finally: finalize(exval=excont.exval) -def main_i(): - """internal main routine - - parse command line, decide what action will be taken; - we can either: - - query/manipulate configuration - - format gsyncd urls using gsyncd's url parsing engine - - start service in following modes, in given stages: - - agent: startup(), ChangelogAgent() - - monitor: startup(), monitor() - - master: startup(), connect_remote(), connect(), service_loop() - - slave: startup(), connect(), service_loop() - """ - rconf = {'go_daemon': 'should'} - - def store_abs(opt, optstr, val, parser): - if val and val != '-': - val = os.path.abspath(val) - setattr(parser.values, opt.dest, val) - - def store_local(opt, optstr, val, parser): - rconf[opt.dest] = val - - def store_local_curry(val): - return lambda o, oo, vx, p: store_local(o, oo, val, p) - - def store_local_obj(op, dmake): - return lambda o, oo, vx, p: store_local( - o, oo, FreeObject(op=op, **dmake(vx)), p) - - op = OptionParser( - usage="%prog [options...] <master> <slave>", version="%prog 0.0.1") - op.add_option('--gluster-command-dir', metavar='DIR', default='') - op.add_option('--gluster-log-file', metavar='LOGF', - default=os.devnull, type=str, action='callback', - callback=store_abs) - op.add_option('--gluster-log-level', metavar='LVL') - op.add_option('--changelog-log-level', metavar='LVL', default="INFO") - op.add_option('--gluster-params', metavar='PRMS', default='') - op.add_option( - '--glusterd-uuid', metavar='UUID', type=str, default='', - help=SUPPRESS_HELP) - op.add_option( - '--gluster-cli-options', metavar='OPTS', default='--log-file=-') - op.add_option('--mountbroker', metavar='LABEL') - op.add_option('-p', '--pid-file', metavar='PIDF', type=str, - action='callback', callback=store_abs) - op.add_option('-l', '--log-file', metavar='LOGF', type=str, - action='callback', callback=store_abs) - op.add_option('--iprefix', metavar='LOGD', type=str, - action='callback', callback=store_abs) - op.add_option('--changelog-log-file', metavar='LOGF', type=str, - action='callback', callback=store_abs) - op.add_option('--log-file-mbr', metavar='LOGF', type=str, - action='callback', callback=store_abs) - op.add_option('--state-file', metavar='STATF', type=str, - action='callback', callback=store_abs) - op.add_option('--state-detail-file', metavar='STATF', - type=str, action='callback', callback=store_abs) - op.add_option('--georep-session-working-dir', metavar='STATF', - type=str, action='callback', callback=store_abs) - op.add_option('--access-mount', default=False, action='store_true') - op.add_option('--ignore-deletes', default=False, action='store_true') - op.add_option('--isolated-slave', default=False, action='store_true') - op.add_option('--use-rsync-xattrs', default=False, action='store_true') - op.add_option('--sync-xattrs', default=True, action='store_true') - op.add_option('--sync-acls', default=True, action='store_true') - op.add_option('--log-rsync-performance', default=False, - action='store_true') - op.add_option('--max-rsync-retries', type=int, default=10) - # Max size of Changelogs to process per batch, Changelogs Processing is - # not limited by the number of changelogs but instead based on - # size of the changelog file, One sample changelog file size was 145408 - # with ~1000 CREATE and ~1000 DATA. 5 such files in one batch is 727040 - # If geo-rep worker crashes while processing a batch, it has to retry only - # that batch since stime will get updated after each batch. - op.add_option('--changelog-batch-size', type=int, default=727040) - op.add_option('--pause-on-start', default=False, action='store_true') - op.add_option('-L', '--log-level', metavar='LVL') - op.add_option('-r', '--remote-gsyncd', metavar='CMD', - default=os.path.abspath(sys.argv[0])) - op.add_option('--volume-id', metavar='UUID') - op.add_option('--slave-id', metavar='ID') - op.add_option('--session-owner', metavar='ID') - op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='') - op.add_option( - '--local-node', metavar='NODE', help=SUPPRESS_HELP, default='') - op.add_option( - '--local-node-id', metavar='NODEID', help=SUPPRESS_HELP, default='') - op.add_option( - '--local-path', metavar='PATH', help=SUPPRESS_HELP, default='') - op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh') - op.add_option('--ssh-port', metavar='PORT', type=int, default=22) - op.add_option('--ssh-command-tar', metavar='CMD', default='ssh') - op.add_option('--rsync-command', metavar='CMD', default='rsync') - op.add_option('--rsync-options', metavar='OPTS', default='') - op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress') - op.add_option('--rsync-opt-ignore-missing-args', default="true") - op.add_option('--rsync-opt-existing', default="true") - op.add_option('--timeout', metavar='SEC', type=int, default=120) - op.add_option('--connection-timeout', metavar='SEC', - type=int, default=60, help=SUPPRESS_HELP) - op.add_option('--sync-jobs', metavar='N', type=int, default=3) - op.add_option('--replica-failover-interval', metavar='N', - type=int, default=1) - op.add_option('--changelog-archive-format', metavar='N', - type=str, default="%Y%m") - op.add_option('--use-meta-volume', default=False, action='store_true') - op.add_option('--meta-volume-mnt', metavar='N', - type=str, default="/var/run/gluster/shared_storage") - op.add_option( - '--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP) - op.add_option('--allow-network', metavar='IPS', default='') - op.add_option('--socketdir', metavar='DIR') - op.add_option('--state-socket-unencoded', metavar='SOCKF', - type=str, action='callback', callback=store_abs) - op.add_option('--checkpoint', metavar='LABEL', default='0') - - # tunables for failover/failback mechanism: - # None - gsyncd behaves as normal - # blind - gsyncd works with xtime pairs to identify - # candidates for synchronization - # wrapup - same as normal mode but does not assign - # xtimes to orphaned files - # see crawl() for usage of the above tunables - op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP) - - # changelog or xtime? (TODO: Change the default) - op.add_option( - '--change-detector', metavar='MODE', type=str, default='xtime') - # sleep interval for change detection (xtime crawl uses a hardcoded 1 - # second sleep time) - op.add_option('--change-interval', metavar='SEC', type=int, default=3) - # working directory for changelog based mechanism - op.add_option('--working-dir', metavar='DIR', type=str, - action='callback', callback=store_abs) - op.add_option('--use-tarssh', default=False, action='store_true') - - op.add_option('-c', '--config-file', metavar='CONF', - type=str, action='callback', callback=store_local) - # duh. need to specify dest or value will be mapped to None :S - op.add_option('--monitor', dest='monitor', action='callback', - callback=store_local_curry(True)) - op.add_option('--agent', dest='agent', action='callback', - callback=store_local_curry(True)) - op.add_option('--resource-local', dest='resource_local', - type=str, action='callback', callback=store_local) - op.add_option('--resource-remote', dest='resource_remote', - type=str, action='callback', callback=store_local) - op.add_option('--feedback-fd', dest='feedback_fd', type=int, - help=SUPPRESS_HELP, action='callback', callback=store_local) - op.add_option('--rpc-fd', dest='rpc_fd', type=str, help=SUPPRESS_HELP) - op.add_option('--subvol-num', dest='subvol_num', type=str, - help=SUPPRESS_HELP) - op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, - action='callback', callback=store_local_curry(True)) - op.add_option('-N', '--no-daemon', dest="go_daemon", - action='callback', callback=store_local_curry('dont')) - op.add_option('--verify', type=str, dest="verify", - action='callback', callback=store_local) - op.add_option('--slavevoluuid-get', type=str, dest="slavevoluuid_get", - action='callback', callback=store_local) - op.add_option('--create', type=str, dest="create", - action='callback', callback=store_local) - op.add_option('--delete', dest='delete', action='callback', - callback=store_local_curry(True)) - op.add_option('--path-list', dest='path_list', action='callback', - type=str, callback=store_local) - op.add_option('--reset-sync-time', default=False, action='store_true') - op.add_option('--status-get', dest='status_get', action='callback', - callback=store_local_curry(True)) - op.add_option('--debug', dest="go_daemon", action='callback', - callback=lambda *a: (store_local_curry('dont')(*a), - setattr( - a[-1].values, 'log_file', '-'), - setattr(a[-1].values, 'log_level', - 'DEBUG'), - setattr(a[-1].values, - 'changelog_log_file', '-'))) - op.add_option('--path', type=str, action='append') - - for a in ('check', 'get'): - op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', - action='callback', - callback=store_local_obj(a, lambda vx: {'opt': vx})) - op.add_option('--config-get-all', dest='config', action='callback', - callback=store_local_obj('get', lambda vx: {'opt': None})) - for m in ('', '-rx', '-glob'): - # call this code 'Pythonic' eh? - # have to define a one-shot local function to be able - # to inject (a value depending on the) - # iteration variable into the inner lambda - def conf_mod_opt_regex_variant(rx): - op.add_option('--config-set' + m, metavar='OPT VAL', type=str, - nargs=2, dest='config', action='callback', - callback=store_local_obj('set', lambda vx: { - 'opt': vx[0], 'val': vx[1], 'rx': rx})) - op.add_option('--config-del' + m, metavar='OPT', type=str, - dest='config', action='callback', - callback=store_local_obj('del', lambda vx: { - 'opt': vx, 'rx': rx})) - conf_mod_opt_regex_variant(m and m[1:] or False) - - op.add_option('--normalize-url', dest='url_print', - action='callback', callback=store_local_curry('normal')) - op.add_option('--canonicalize-url', dest='url_print', - action='callback', callback=store_local_curry('canon')) - op.add_option('--canonicalize-escape-url', dest='url_print', - action='callback', callback=store_local_curry('canon_esc')) - op.add_option('--is-hottier', default=False, action='store_true') - - tunables = [norm(o.get_opt_string()[2:]) - for o in op.option_list - if (o.callback in (store_abs, 'store_true', None) and - o.get_opt_string() not in ('--version', '--help'))] - remote_tunables = ['listen', 'go_daemon', 'timeout', - 'session_owner', 'config_file', 'use_rsync_xattrs', - 'local_id', 'local_node', 'access_mount'] - rq_remote_tunables = {'listen': True} - - # precedence for sources of values: 1) commandline, 2) cfg file, 3) - # defaults for this to work out we need to tell apart defaults from - # explicitly set options... so churn out the defaults here and call - # the parser with virgin values container. - defaults = op.get_default_values() - opts, args = op.parse_args(values=optparse.Values()) - # slave url cleanup, if input comes with vol uuid as follows - # 'ssh://fvm1::gv2:07dfddca-94bb-4841-a051-a7e582811467' - temp_args = [] - for arg in args: - # Split based on :: - data = arg.split("::") - if len(data)>1: - slavevol_name = data[1].split(":")[0] - temp_args.append("%s::%s" % (data[0], slavevol_name)) - else: - temp_args.append(data[0]) - args = temp_args - args_orig = args[:] - - voluuid_get = rconf.get('slavevoluuid_get') - if voluuid_get: - slave_host, slave_vol = voluuid_get.split("::") - svol_uuid = slave_vol_uuid_get(slave_host, slave_vol) - print svol_uuid - return - - r = rconf.get('resource_local') - if r: - if len(args) == 0: - args.append(None) - args[0] = r - r = rconf.get('resource_remote') - if r: - if len(args) == 0: - raise GsyncdError('local resource unspecfied') - elif len(args) == 1: - args.append(None) - args[1] = r - confdata = rconf.get('config') - if not (len(args) == 2 or - (len(args) == 1 and rconf.get('listen')) or - (len(args) <= 2 and confdata) or - rconf.get('url_print')): - sys.stderr.write("error: incorrect number of arguments\n\n") - sys.stderr.write(op.get_usage() + "\n") - sys.exit(1) - - verify = rconf.get('verify') - if verify: - logging.info(verify) - logging.info("Able to spawn gsyncd.py") - return - - restricted = os.getenv('_GSYNCD_RESTRICTED_') - - if restricted: - allopts = {} - allopts.update(opts.__dict__) - allopts.update(rconf) - bannedtuns = set(allopts.keys()) - set(remote_tunables) - if bannedtuns: - raise GsyncdError('following tunables cannot be set with ' - 'restricted SSH invocaton: ' + - ', '.join(bannedtuns)) - for k, v in rq_remote_tunables.items(): - if not k in allopts or allopts[k] != v: - raise GsyncdError('tunable %s is not set to value %s required ' - 'for restricted SSH invocaton' % - (k, v)) - - confrx = getattr(confdata, 'rx', None) - - def makersc(aa, check=True): - if not aa: - return ([], None, None) - ra = [resource.parse_url(u) for u in aa] - local = ra[0] - remote = None - if len(ra) > 1: - remote = ra[1] - if check and not local.can_connect_to(remote): - raise GsyncdError("%s cannot work with %s" % - (local.path, remote and remote.path)) - return (ra, local, remote) - if confrx: - # peers are regexen, don't try to parse them - if confrx == 'glob': - args = ['\A' + fnmatch.translate(a) for a in args] - canon_peers = args - namedict = {} - else: - dc = rconf.get('url_print') - rscs, local, remote = makersc(args_orig, not dc) - if dc: - for r in rscs: - print(r.get_url(**{'normal': {}, - 'canon': {'canonical': True}, - 'canon_esc': {'canonical': True, - 'escaped': True}}[dc])) - return - pa = ([], [], []) - urlprms = ( - {}, {'canonical': True}, {'canonical': True, 'escaped': True}) - for x in rscs: - for i in range(len(pa)): - pa[i].append(x.get_url(**urlprms[i])) - _, canon_peers, canon_esc_peers = pa - # creating the namedict, a dict representing various ways of referring - # to / repreenting peers to be fillable in config templates - mods = (lambda x: x, lambda x: x[ - 0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:]) - if remote: - rmap = {local: ('local', 'master'), remote: ('remote', 'slave')} - else: - rmap = {local: ('local', 'slave')} - namedict = {} - for i in range(len(rscs)): - x = rscs[i] - for name in rmap[x]: - for j in range(3): - namedict[mods[j](name)] = pa[j][i] - namedict[name + 'vol'] = x.volume - if name == 'remote': - namedict['remotehost'] = x.remotehost - - if not 'config_file' in rconf: - rconf['config_file'] = TMPL_CONFIG_FILE - - # Upgrade Config File only if it is session conf file - if rconf['config_file'] != TMPL_CONFIG_FILE: - upgrade_config_file(rconf['config_file'], confdata) - - gcnf = GConffile( - rconf['config_file'], canon_peers, confdata, - defaults.__dict__, opts.__dict__, namedict) - - conf_change = False - if confdata: - opt_ok = norm(confdata.opt) in tunables + [None] - if confdata.op == 'check': - if opt_ok: - sys.exit(0) - else: - sys.exit(1) - elif not opt_ok: - raise GsyncdError("not a valid option: " + confdata.opt) - if confdata.op == 'get': - gcnf.get(confdata.opt) - elif confdata.op == 'set': - gcnf.set(confdata.opt, confdata.val, confdata.rx) - elif confdata.op == 'del': - gcnf.delete(confdata.opt, confdata.rx) - # when modifying checkpoint, it's important to make a log - # of that, so in that case we go on to set up logging even - # if its just config invocation - if confdata.op in ('set', 'del') and not confdata.rx: - conf_change = True - - if not conf_change: - return - - gconf.__dict__.update(defaults.__dict__) - gcnf.update_to(gconf.__dict__) - gconf.__dict__.update(opts.__dict__) - gconf.configinterface = gcnf - - delete = rconf.get('delete') - if delete: - logging.info('geo-replication delete') - # remove the stime xattr from all the brick paths so that - # a re-create of a session will start sync all over again - stime_xattr_name = getattr(gconf, 'master.stime_xattr_name', None) - - # Delete pid file, status file, socket file - cleanup_paths = [] - if getattr(gconf, 'pid_file', None): - cleanup_paths.append(gconf.pid_file) - - if getattr(gconf, 'state_file', None): - cleanup_paths.append(gconf.state_file) - - if getattr(gconf, 'state_detail_file', None): - cleanup_paths.append(gconf.state_detail_file) - - if getattr(gconf, 'state_socket_unencoded', None): - cleanup_paths.append(gconf.state_socket_unencoded) - - cleanup_paths.append(rconf['config_file'][:-11] + "*") - - # Cleanup changelog working dirs - if getattr(gconf, 'working_dir', None): - try: - shutil.rmtree(gconf.working_dir) - except (IOError, OSError): - if sys.exc_info()[1].errno == ENOENT: - pass - else: - raise GsyncdError( - 'Error while removing working dir: %s' % - gconf.working_dir) - - for path in cleanup_paths: - # To delete temp files - for f in glob.glob(path + "*"): - _unlink(f) - - reset_sync_time = boolify(gconf.reset_sync_time) - if reset_sync_time and stime_xattr_name: - path_list = rconf.get('path_list') - paths = [] - for p in path_list.split('--path='): - stripped_path = p.strip() - if stripped_path != "": - # set stime to (0,0) to trigger full volume content resync - # to slave on session recreation - # look at master.py::Xcrawl hint: zero_zero - Xattr.lsetxattr(stripped_path, stime_xattr_name, - struct.pack("!II", 0, 0)) - - return - - if restricted and gconf.allow_network: - ssh_conn = os.getenv('SSH_CONNECTION') - if not ssh_conn: - # legacy env var - ssh_conn = os.getenv('SSH_CLIENT') - if ssh_conn: - allowed_networks = [IPNetwork(a) - for a in gconf.allow_network.split(',')] - client_ip = IPAddress(ssh_conn.split()[0]) - allowed = False - for nw in allowed_networks: - if client_ip in nw: - allowed = True - break - if not allowed: - raise GsyncdError("client IP address is not allowed") - - ffd = rconf.get('feedback_fd') - if ffd: - fcntl.fcntl(ffd, fcntl.F_SETFD, fcntl.FD_CLOEXEC) - - # normalize loglevel - lvl0 = gconf.log_level - if isinstance(lvl0, str): - lvl1 = lvl0.upper() - lvl2 = logging.getLevelName(lvl1) - # I have _never_ _ever_ seen such an utterly braindead - # error condition - if lvl2 == "Level " + lvl1: - raise GsyncdError('cannot recognize log level "%s"' % lvl0) - gconf.log_level = lvl2 - - if not privileged() and gconf.log_file_mbr: - gconf.log_file = gconf.log_file_mbr - - if conf_change: - try: - GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf') - gconf.log_exit = False - - if confdata.op == 'set': - if confdata.opt == 'checkpoint': - logging.info(lf("Checkpoint Set", - time=human_time_utc(confdata.val))) - else: - logging.info(lf("Config Set", - config=confdata.opt, - value=confdata.val)) - elif confdata.op == 'del': - if confdata.opt == 'checkpoint': - logging.info("Checkpoint Reset") - else: - logging.info(lf("Config Reset", - config=confdata.opt)) - except IOError: - if sys.exc_info()[1].errno == ENOENT: - # directory of log path is not present, - # which happens if we get here from - # a peer-multiplexed "config-set checkpoint" - # (as that directory is created only on the - # original node) - pass - else: - raise - return - - create = rconf.get('create') - if create: - if getattr(gconf, 'state_file', None): - set_monitor_status(gconf.state_file, create) - - try: - GLogger._gsyncd_loginit(log_file=gconf.log_file, label='monitor') - gconf.log_exit = False - logging.info(lf("Monitor Status Change", - status=create)) - except IOError: - if sys.exc_info()[1].errno == ENOENT: - # If log dir not present - pass - else: - raise - return - - go_daemon = rconf['go_daemon'] - be_monitor = rconf.get('monitor') - be_agent = rconf.get('agent') - - rscs, local, remote = makersc(args) - - status_get = rconf.get('status_get') - if status_get: - master_name, slave_data = get_master_and_slave_data_from_args(args) - for brick in gconf.path: - brick_status = GeorepStatus(gconf.state_file, - gconf.local_node, - brick, - gconf.local_node_id, - master_name, - slave_data, - getattr(gconf, "pid_file", None)) - checkpoint_time = int(getattr(gconf, "checkpoint", "0")) - brick_status.print_status(checkpoint_time=checkpoint_time) - return - - if not be_monitor and isinstance(remote, resource.SSH) and \ - go_daemon == 'should': - go_daemon = 'postconn' - log_file = None - else: - log_file = gconf.log_file - if be_monitor: - label = 'monitor' - elif be_agent: - label = gconf.local_path - elif remote: - # master - label = gconf.local_path - else: - label = 'slave' - startup(go_daemon=go_daemon, log_file=log_file, label=label) - Popen.init_errhandler() - - if be_agent: - os.setsid() - logging.debug(lf("RPC FD", - rpc_fd=repr(gconf.rpc_fd))) - return agent(Changelog(), gconf.rpc_fd) - - if be_monitor: - return monitor(*rscs) - - if remote: - go_daemon = remote.connect_remote(go_daemon=go_daemon) - if go_daemon: - startup(go_daemon=go_daemon, log_file=gconf.log_file) - # complete remote connection in child - remote.connect_remote(go_daemon='done') - local.connect() - if ffd: - logging.info("Closing feedback fd, waking up the monitor") - os.close(ffd) - local.service_loop(*[r for r in [remote] if r]) - - if __name__ == "__main__": main() diff --git a/geo-replication/syncdaemon/gsyncdconfig.py b/geo-replication/syncdaemon/gsyncdconfig.py new file mode 100644 index 00000000000..5aa354b8e6a --- /dev/null +++ b/geo-replication/syncdaemon/gsyncdconfig.py @@ -0,0 +1,365 @@ +from ConfigParser import ConfigParser, NoSectionError +import os +from string import Template +from datetime import datetime + + +# Global object which can be used in other modules +# once load_config is called +_gconf = {} + + +class GconfNotConfigurable(Exception): + pass + + +class GconfInvalidValue(Exception): + pass + + +class Gconf(object): + def __init__(self, default_conf_file, custom_conf_file=None, + args={}, extra_tmpl_args={}): + self.default_conf_file = default_conf_file + self.custom_conf_file = custom_conf_file + self.tmp_conf_file = None + self.gconf = {} + self.gconfdata = {} + self.gconf_typecast = {} + self.template_conf = [] + self.non_configurable_configs = [] + self.prev_mtime = 0 + if custom_conf_file is not None: + self.tmp_conf_file = custom_conf_file + ".tmp" + + self.session_conf_items = [] + self.args = args + self.extra_tmpl_args = extra_tmpl_args + self._load() + + def _tmpl_substitute(self): + tmpl_values = {} + for k, v in self.gconf.items(): + tmpl_values[k.replace("-", "_")] = v + + # override the config file values with the one user passed + for k, v in self.args.items(): + # override the existing value only if set by user + if v is not None: + tmpl_values[k] = v + + for k, v in self.extra_tmpl_args.items(): + tmpl_values[k] = v + + for k, v in self.gconf.items(): + if k in self.template_conf and \ + (isinstance(v, str) or isinstance(v, unicode)): + self.gconf[k] = Template(v).safe_substitute(tmpl_values) + + def _do_typecast(self): + for k, v in self.gconf.items(): + cast_func = globals().get( + "to_" + self.gconf_typecast.get(k, "string"), None) + if cast_func is not None: + self.gconf[k] = cast_func(v) + + def reset(self, name): + # If custom conf file is not set then it is only read only configs + if self.custom_conf_file is None: + raise GconfNotConfigurable() + + # If a config can not be modified + if name != "all" and not self._is_configurable(name): + raise GconfNotConfigurable() + + cnf = ConfigParser() + with open(self.custom_conf_file) as f: + cnf.readfp(f) + + # Nothing to Reset, Not configured + if name != "all": + if not cnf.has_option("vars", name): + return True + + # Remove option from custom conf file + cnf.remove_option("vars", name) + else: + # Remove and add empty section, do not disturb if config file + # already has any other section + try: + cnf.remove_section("vars") + except NoSectionError: + pass + + cnf.add_section("vars") + + with open(self.tmp_conf_file, "w") as fw: + cnf.write(fw) + + os.rename(self.tmp_conf_file, self.custom_conf_file) + + self.reload() + + return True + + def set(self, name, value): + if self.custom_conf_file is None: + raise GconfNotConfigurable() + + if not self._is_configurable(name): + raise GconfNotConfigurable() + + if not self._is_valid_value(name, value): + raise GconfInvalidValue() + + curr_val = self.gconf.get(name, None) + if curr_val == value: + return True + + cnf = ConfigParser() + with open(self.custom_conf_file) as f: + cnf.readfp(f) + + if not cnf.has_section("vars"): + cnf.add_section("vars") + + cnf.set("vars", name, value) + with open(self.tmp_conf_file, "w") as fw: + cnf.write(fw) + + os.rename(self.tmp_conf_file, self.custom_conf_file) + + self.reload() + + return True + + def check(self, name, value=None, with_conffile=True): + if with_conffile and self.custom_conf_file is None: + raise GconfNotConfigurable() + + if not self._is_configurable(name): + raise GconfNotConfigurable() + + if value is not None and not self._is_valid_value(name, value): + raise GconfInvalidValue() + + def _load(self): + self.gconf = {} + self.template_conf = [] + self.gconf_typecast = {} + self.non_configurable_configs = [] + self.session_conf_items = [] + + conf = ConfigParser() + # Default Template config file + with open(self.default_conf_file) as f: + conf.readfp(f) + + # Custom Config file + if self.custom_conf_file is not None: + with open(self.custom_conf_file) as f: + conf.readfp(f) + + # Get version from default conf file + self.version = conf.get("__meta__", "version") + + # Populate default values + for sect in conf.sections(): + if sect in ["__meta__", "vars"]: + continue + + # Collect list of available options with help details + self.gconfdata[sect] = {} + for k, v in conf.items(sect): + self.gconfdata[sect][k] = v.strip() + + # Collect the Type cast information + if conf.has_option(sect, "type"): + self.gconf_typecast[sect] = conf.get(sect, "type") + + # Prepare list of configurable conf + if conf.has_option(sect, "configurable"): + if conf.get(sect, "configurable").lower() == "false": + self.non_configurable_configs.append(sect) + + # if it is a template conf value which needs to be substituted + if conf.has_option(sect, "template"): + if conf.get(sect, "template").lower().strip() == "true": + self.template_conf.append(sect) + + # Set default values + if conf.has_option(sect, "value"): + self.gconf[sect] = conf.get(sect, "value").strip() + + # Load the custom conf elements and overwrite + if conf.has_section("vars"): + for k, v in conf.items("vars"): + self.session_conf_items.append(k) + self.gconf[k] = v.strip() + + self._tmpl_substitute() + self._do_typecast() + + def reload(self): + if self._is_config_changed(): + self._load() + + def get(self, name, default_value=None): + return self.gconf.get(name, default_value) + + def getall(self, show_defaults=False, show_non_configurable=False): + cnf = {} + if not show_defaults: + for k in self.session_conf_items: + if k not in self.non_configurable_configs: + cnf[k] = self.get(k) + + return cnf + + # Show all configs including defaults + for k, v in self.gconf.items(): + if show_non_configurable: + cnf[k] = v + else: + if k not in self.non_configurable_configs: + cnf[k] = v + + return cnf + + def getr(self, name, default_value=None): + self.reload() + return self.get(name, default_value) + + def get_help(self, name=None): + pass + + def _is_configurable(self, name): + item = self.gconfdata.get(name, None) + if item is None: + return False + + return item.get("configurable", True) + + def _is_valid_value(self, name, value): + item = self.gconfdata.get(name, None) + if item is None: + return False + + # If validation func not defined + if item.get("validation", None) is None: + return True + + # minmax validation + if item["validation"] == "minmax": + return validate_minmax(value, item["min"], item["max"]) + + if item["validation"] == "choice": + return validate_choice(value, item["allowed_values"]) + + if item["validation"] == "bool": + return validate_bool(value) + + if item["validation"] == "execpath": + return validate_execpath(value) + + if item["validation"] == "unixtime": + return validate_unixtime(value) + + return False + + def _is_config_changed(self): + if self.custom_conf_file is not None and \ + os.path.exists(self.custom_conf_file): + st = os.lstat(self.custom_conf_file) + if st.st_mtime > self.prev_mtime: + self.prev_mtime = st.st_mtime + return True + + return False + + +def validate_unixtime(value): + try: + y = datetime.fromtimestamp(int(value)).strftime("%Y") + if y == "1970": + return False + + return True + except ValueError: + return False + + +def validate_minmax(value, minval, maxval): + value = int(value) + minval = int(minval) + maxval = int(maxval) + + return value >= minval and value <= maxval + + +def validate_choice(value, allowed_values): + allowed_values = allowed_values.split(",") + allowed_values = [v.strip() for v in allowed_values] + + return value in allowed_values + + +def validate_bool(value): + return value in ["true", "false"] + + +def validate_execpath(value): + return os.path.isfile(value) and os.access(value, os.X_OK) + + +def validate_filepath(value): + return os.path.isfile(value) + + +def validate_path(value): + return os.path.exists(value) + + +def to_int(value): + return int(value) + + +def to_float(value): + return float(value) + + +def to_bool(value): + return True if value == "true" else False + + +def get(name, default_value=None): + return _gconf.get(name, default_value) + + +def getall(show_defaults=False, show_non_configurable=False): + return _gconf.getall(show_defaults=show_defaults, + show_non_configurable=show_non_configurable) + + +def getr(name, default_value=None): + return _gconf.getr(name, default_value) + + +def load(default_conf, custom_conf=None, args={}, extra_tmpl_args={}): + global _gconf + _gconf = Gconf(default_conf, custom_conf, args, extra_tmpl_args) + + +def setconfig(name, value): + global _gconf + _gconf.set(name, value) + + +def resetconfig(name): + global _gconf + _gconf.reset(name) + + +def check(name, value=None, with_conffile=True): + global _gconf + _gconf.check(name, value=value, with_conffile=with_conffile) diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py index d87b56cd941..334f5e9eaf2 100644 --- a/geo-replication/syncdaemon/libgfchangelog.py +++ b/geo-replication/syncdaemon/libgfchangelog.py @@ -9,13 +9,15 @@ # import os -from ctypes import CDLL, RTLD_GLOBAL, create_string_buffer, get_errno, byref, c_ulong +from ctypes import CDLL, RTLD_GLOBAL, create_string_buffer, \ + get_errno, byref, c_ulong from ctypes.util import find_library from syncdutils import ChangelogException, ChangelogHistoryNotAvailable class Changes(object): - libgfc = CDLL(find_library("gfchangelog"), mode=RTLD_GLOBAL, use_errno=True) + libgfc = CDLL(find_library("gfchangelog"), mode=RTLD_GLOBAL, + use_errno=True) @classmethod def geterrno(cls): diff --git a/geo-replication/syncdaemon/logutils.py b/geo-replication/syncdaemon/logutils.py new file mode 100644 index 00000000000..f00685cd92c --- /dev/null +++ b/geo-replication/syncdaemon/logutils.py @@ -0,0 +1,66 @@ +import logging +from logging import Logger, handlers +import sys +import time + + +class GLogger(Logger): + + """Logger customizations for gsyncd. + + It implements a log format similar to that of glusterfs. + """ + + def makeRecord(self, name, level, *a): + rv = Logger.makeRecord(self, name, level, *a) + rv.nsecs = (rv.created - int(rv.created)) * 1000000 + fr = sys._getframe(4) + callee = fr.f_locals.get('self') + if callee: + ctx = str(type(callee)).split("'")[1].split('.')[-1] + else: + ctx = '<top>' + if not hasattr(rv, 'funcName'): + rv.funcName = fr.f_code.co_name + rv.lvlnam = logging.getLevelName(level)[0] + rv.ctx = ctx + return rv + + +LOGFMT = ("[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s{0}" + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s") + + +def setup_logging(level="INFO", label="", log_file=""): + if label: + label = "(" + label + ")" + + filename = None + stream = None + if log_file: + if log_file in ('-', '/dev/stderr'): + stream = sys.stderr + elif log_file == '/dev/stdout': + stream = sys.stdout + else: + filename = log_file + + datefmt = "%Y-%m-%d %H:%M:%S" + fmt = LOGFMT.format(label) + logging.root = GLogger("root", level) + logging.setLoggerClass(GLogger) + logging.Formatter.converter = time.gmtime # Log in GMT/UTC time + logging.getLogger().handlers = [] + logging.getLogger().setLevel(level) + + if filename is not None: + logging_handler = handlers.WatchedFileHandler(filename) + formatter = logging.Formatter(fmt=fmt, + datefmt=datefmt) + logging_handler.setFormatter(formatter) + logging.getLogger().addHandler(logging_handler) + else: + logging.basicConfig(stream=stream, + format=fmt, + datefmt=datefmt, + level=level) diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 4c1a529a3ed..552c4deec44 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -12,7 +12,6 @@ import os import sys import time import stat -import json import logging import fcntl import string @@ -21,9 +20,11 @@ import tarfile from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN, ESTALE, EINTR from threading import Condition, Lock from datetime import datetime -from gconf import gconf -from syncdutils import Thread, GsyncdError, boolify, escape_space_newline -from syncdutils import unescape_space_newline, gauxpfx, md5hex, selfkill + +import gsyncdconfig as gconf +from rconf import rconf +from syncdutils import Thread, GsyncdError, escape_space_newline +from syncdutils import unescape_space_newline, gauxpfx, escape from syncdutils import lstat, errno_wrap, FreeObject, lf, matching_disk_gfid from syncdutils import NoStimeAvailable, PartialHistoryAvailable @@ -85,24 +86,41 @@ def gmaster_builder(excrawl=None): """produce the GMaster class variant corresponding to sync mode""" this = sys.modules[__name__] - modemixin = gconf.special_sync_mode + modemixin = gconf.get("special-sync-mode") if not modemixin: modemixin = 'normal' - changemixin = 'xsync' if gconf.change_detector == 'xsync' \ - else excrawl or gconf.change_detector + + if gconf.get("change-detector") == 'xsync': + changemixin = 'xsync' + elif excrawl: + changemixin = excrawl + else: + changemixin = gconf.get("change-detector") + logging.debug(lf('setting up change detection mode', mode=changemixin)) modemixin = getattr(this, modemixin.capitalize() + 'Mixin') crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin') - sendmarkmixin = boolify( - gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin - purgemixin = boolify( - gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin - syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine + + if gconf.get("use-rsync-xattrs"): + sendmarkmixin = SendmarkRsyncMixin + else: + sendmarkmixin = SendmarkNormalMixin + + if gconf.get("ignore-deletes"): + purgemixin = PurgeNoopMixin + else: + purgemixin = PurgeNormalMixin + + if gconf.get("sync-method") == "tarssh": + syncengine = TarSSHEngine + else: + syncengine = RsyncEngine class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin, syncengine): pass + return _GMaster @@ -139,9 +157,9 @@ class NormalMixin(object): return xt0 >= xt1 def make_xtime_opts(self, is_master, opts): - if not 'create' in opts: + if 'create' not in opts: opts['create'] = is_master - if not 'default_xtime' in opts: + if 'default_xtime' not in opts: opts['default_xtime'] = URXTIME def xtime_low(self, rsc, path, **opts): @@ -212,9 +230,9 @@ class RecoverMixin(NormalMixin): @staticmethod def make_xtime_opts(is_master, opts): - if not 'create' in opts: + if 'create' not in opts: opts['create'] = False - if not 'default_xtime' in opts: + if 'default_xtime' not in opts: opts['default_xtime'] = URXTIME def keepalive_payload_hook(self, timo, gap): @@ -385,7 +403,7 @@ class GMasterCommon(object): self.master = master self.slave = slave self.jobtab = {} - if boolify(gconf.use_tarssh): + if gconf.get("sync-method") == "tarssh": self.syncer = Syncer(slave, self.slave.tarssh, [2]) else: # partial transfer (cf. rsync(1)), that's normal @@ -401,7 +419,7 @@ class GMasterCommon(object): # 0. self.crawls = 0 self.turns = 0 - self.total_turns = int(gconf.turns) + self.total_turns = rconf.turns self.crawl_start = datetime.now() self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0} self.start = None @@ -414,7 +432,7 @@ class GMasterCommon(object): def init_keep_alive(cls): """start the keep-alive thread """ - timo = int(gconf.timeout or 0) + timo = gconf.get("slave-timeout", 0) if timo > 0: def keep_alive(): while True: @@ -427,28 +445,28 @@ class GMasterCommon(object): def mgmt_lock(self): """Take management volume lock """ - if gconf.mgmt_lock_fd: + if rconf.mgmt_lock_fd: try: - fcntl.lockf(gconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) - if not gconf.active_earlier: - gconf.active_earlier = True + fcntl.lockf(rconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + if not rconf.active_earlier: + rconf.active_earlier = True logging.info(lf("Got lock Becoming ACTIVE", - brick=gconf.local_path)) + brick=rconf.args.local_path)) return True except: ex = sys.exc_info()[1] if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN): - if not gconf.passive_earlier: - gconf.passive_earlier = True + if not rconf.passive_earlier: + rconf.passive_earlier = True logging.info(lf("Didn't get lock Becoming PASSIVE", - brick=gconf.local_path)) + brick=rconf.local_path)) return False raise fd = None - bname = str(self.uuid) + "_" + str(gconf.slave_id) + "_subvol_" \ - + str(gconf.subvol_num) + ".lock" - mgmt_lock_dir = os.path.join(gconf.meta_volume_mnt, "geo-rep") + bname = str(self.uuid) + "_" + rconf.args.slave_id + "_subvol_" \ + + str(rconf.args.subvol_num) + ".lock" + mgmt_lock_dir = os.path.join(gconf.get("meta-volume-mnt"), "geo-rep") path = os.path.join(mgmt_lock_dir, bname) logging.debug(lf("lock file path", path=path)) try: @@ -471,30 +489,30 @@ class GMasterCommon(object): try: fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # Save latest FD for future use - gconf.mgmt_lock_fd = fd + rconf.mgmt_lock_fd = fd except: ex = sys.exc_info()[1] if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN): # cannot grab, it's taken - if not gconf.passive_earlier: - gconf.passive_earlier = True + if not rconf.passive_earlier: + rconf.passive_earlier = True logging.info(lf("Didn't get lock Becoming PASSIVE", - brick=gconf.local_path)) - gconf.mgmt_lock_fd = fd + brick=rconf.args.local_path)) + rconf.mgmt_lock_fd = fd return False raise - if not gconf.active_earlier: - gconf.active_earlier = True + if not rconf.active_earlier: + rconf.active_earlier = True logging.info(lf("Got lock Becoming ACTIVE", - brick=gconf.local_path)) + brick=rconf.args.local_path)) return True def should_crawl(self): - if not boolify(gconf.use_meta_volume): - return gconf.glusterd_uuid in self.master.server.node_uuid() + if not gconf.get("use-meta-volume"): + return rconf.args.local_node_id in self.master.server.node_uuid() - if not os.path.ismount(gconf.meta_volume_mnt): + if not os.path.ismount(gconf.get("meta-volume-mnt")): logging.error("Meta-volume is not mounted. Worker Exiting...") sys.exit(1) return self.mgmt_lock() @@ -532,7 +550,7 @@ class GMasterCommon(object): logging.debug("%s master with volume id %s ..." % (inter_master and "intermediate" or "primary", self.uuid)) - gconf.configinterface.set('volume_id', self.uuid) + rconf.volume_id = self.uuid if self.volinfo: if self.volinfo['retval']: logging.warn(lf("master cluster's info may not be valid", @@ -557,7 +575,7 @@ class GMasterCommon(object): turns=self.turns, time=self.start) t1 = time.time() - if int(t1 - t0) >= int(gconf.replica_failover_interval): + if int(t1 - t0) >= gconf.get("replica-failover-interval"): crawl = self.should_crawl() t0 = t1 self.update_worker_remote_node() @@ -567,7 +585,7 @@ class GMasterCommon(object): # which is min of cluster (but max of the replicas) brick_stime = self.xtime('.', self.slave) cluster_stime = self.master.server.aggregated.stime_mnt( - '.', '.'.join([str(self.uuid), str(gconf.slave_id)])) + '.', '.'.join([str(self.uuid), rconf.args.slave_id])) logging.debug(lf("Crawl info", cluster_stime=cluster_stime, brick_stime=brick_stime)) @@ -675,6 +693,7 @@ class XCrawlMetadata(object): self.st_atime = float(st_atime) self.st_mtime = float(st_mtime) + class GMasterChangelogMixin(GMasterCommon): """ changelog based change detection and syncing """ @@ -701,34 +720,34 @@ class GMasterChangelogMixin(GMasterCommon): def init_fop_batch_stats(self): self.batch_stats = { - "CREATE":0, - "MKNOD":0, - "UNLINK":0, - "MKDIR":0, - "RMDIR":0, - "LINK":0, - "SYMLINK":0, - "RENAME":0, - "SETATTR":0, - "SETXATTR":0, - "XATTROP":0, - "DATA":0, - "ENTRY_SYNC_TIME":0, - "META_SYNC_TIME":0, - "DATA_START_TIME":0 + "CREATE": 0, + "MKNOD": 0, + "UNLINK": 0, + "MKDIR": 0, + "RMDIR": 0, + "LINK": 0, + "SYMLINK": 0, + "RENAME": 0, + "SETATTR": 0, + "SETXATTR": 0, + "XATTROP": 0, + "DATA": 0, + "ENTRY_SYNC_TIME": 0, + "META_SYNC_TIME": 0, + "DATA_START_TIME": 0 } def update_fop_batch_stats(self, ty): if ty in ['FSETXATTR']: - ty = 'SETXATTR' - self.batch_stats[ty] = self.batch_stats.get(ty,0) + 1 + ty = 'SETXATTR' + self.batch_stats[ty] = self.batch_stats.get(ty, 0) + 1 def archive_and_purge_changelogs(self, changelogs): # Creates tar file instead of tar.gz, since changelogs will # be appended to existing tar. archive name is # archive_<YEAR><MONTH>.tar archive_name = "archive_%s.tar" % datetime.today().strftime( - gconf.changelog_archive_format) + gconf.get("changelog-archive-format")) try: tar = tarfile.open(os.path.join(self.processed_changelogs_dir, @@ -764,13 +783,9 @@ class GMasterChangelogMixin(GMasterCommon): else: raise - def fallback_xsync(self): - logging.info('falling back to xsync mode') - gconf.configinterface.set('change-detector', 'xsync') - selfkill() - def setup_working_dir(self): - workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path)) + workdir = os.path.join(gconf.get("working-dir"), + escape(rconf.args.local_path)) logging.debug('changelog working dir %s' % workdir) return workdir @@ -804,27 +819,30 @@ class GMasterChangelogMixin(GMasterCommon): logging.info(lf('Fixing gfid mismatch in slave. Deleting' ' the entry', retry_count=retry_count, entry=repr(failure))) - #Add deletion to fix_entry_ops list + # Add deletion to fix_entry_ops list if failure[2]['slave_isdir']: - fix_entry_ops.append(edct('RMDIR', - gfid=failure[2]['slave_gfid'], - entry=pbname)) + fix_entry_ops.append( + edct('RMDIR', + gfid=failure[2]['slave_gfid'], + entry=pbname)) else: - fix_entry_ops.append(edct('UNLINK', - gfid=failure[2]['slave_gfid'], - entry=pbname)) + fix_entry_ops.append( + edct('UNLINK', + gfid=failure[2]['slave_gfid'], + entry=pbname)) elif not isinstance(st, int): - #The file exists on master but with different name. - #Probabaly renamed and got missed during xsync crawl. + # The file exists on master but with different name. + # Probabaly renamed and got missed during xsync crawl. if failure[2]['slave_isdir']: logging.info(lf('Fixing gfid mismatch in slave', retry_count=retry_count, entry=repr(failure))) - realpath = os.readlink(os.path.join(gconf.local_path, - ".glusterfs", - slave_gfid[0:2], - slave_gfid[2:4], - slave_gfid)) + realpath = os.readlink(os.path.join( + rconf.args.local_path, + ".glusterfs", + slave_gfid[0:2], + slave_gfid[2:4], + slave_gfid)) dst_entry = os.path.join(pfx, realpath.split('/')[-2], realpath.split('/')[-1]) rename_dict = edct('RENAME', gfid=slave_gfid, @@ -840,19 +858,20 @@ class GMasterChangelogMixin(GMasterCommon): ' Deleting the entry', retry_count=retry_count, entry=repr(failure))) - fix_entry_ops.append(edct('UNLINK', - gfid=failure[2]['slave_gfid'], - entry=pbname)) + fix_entry_ops.append( + edct('UNLINK', + gfid=failure[2]['slave_gfid'], + entry=pbname)) logging.error(lf('Entry cannot be fixed in slave due ' 'to GFID mismatch, find respective ' 'path for the GFID and trigger sync', gfid=slave_gfid)) if fix_entry_ops: - #Process deletions of entries whose gfids are mismatched + # Process deletions of entries whose gfids are mismatched failures1 = self.slave.server.entry_ops(fix_entry_ops) if not failures1: - logging.info ("Sucessfully fixed entry ops with gfid mismatch") + logging.info("Sucessfully fixed entry ops with gfid mismatch") return failures1 @@ -880,12 +899,11 @@ class GMasterChangelogMixin(GMasterCommon): for failure in failures1: logging.error("Failed to fix entry ops %s", repr(failure)) else: - #Retry original entry list 5 times + # Retry original entry list 5 times failures = self.slave.server.entry_ops(entries) self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') - def process_change(self, change, done, retry): pfx = gauxpfx() clist = [] @@ -930,7 +948,7 @@ class GMasterChangelogMixin(GMasterCommon): # skip ENTRY operation if hot tier brick if self.name == 'live_changelog' or \ self.name == 'history_changelog': - if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY: + if rconf.args.is_hottier and et == self.TYPE_ENTRY: logging.debug(lf('skip ENTRY op if hot tier brick', op=ec[self.POS_TYPE])) continue @@ -978,7 +996,7 @@ class GMasterChangelogMixin(GMasterCommon): 'master', gfid=gfid, pgfid_bname=en)) continue - if not boolify(gconf.ignore_deletes): + if not gconf.get("ignore-deletes"): if not ignore_entry_ops: entries.append(edct(ty, gfid=gfid, entry=en)) elif ty in ['CREATE', 'MKDIR', 'MKNOD']: @@ -1084,12 +1102,11 @@ class GMasterChangelogMixin(GMasterCommon): st_mtime=ec[6]))) else: meta_gfid.add((os.path.join(pfx, ec[0]), )) - elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \ - ec[1] == 'FXATTROP': + elif ec[1] in ['SETXATTR', 'XATTROP', 'FXATTROP']: # To sync xattr/acls use rsync/tar, --xattrs and --acls # switch to rsync and tar - if not boolify(gconf.use_tarssh) and \ - (boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)): + if not gconf.get("sync-method") == "tarssh" and \ + (gconf.get("sync-xattrs") or gconf.get("sync-acls")): datas.add(os.path.join(pfx, ec[0])) else: logging.warn(lf('got invalid fop type', @@ -1102,8 +1119,8 @@ class GMasterChangelogMixin(GMasterCommon): self.status.inc_value("data", len(datas)) self.batch_stats["DATA"] += self.files_in_batch - \ - self.batch_stats["SETXATTR"] - \ - self.batch_stats["XATTROP"] + self.batch_stats["SETXATTR"] - \ + self.batch_stats["XATTROP"] entry_start_time = time.time() # sync namespace @@ -1185,7 +1202,7 @@ class GMasterChangelogMixin(GMasterCommon): # with data of other changelogs. if retry: - if tries == (int(gconf.max_rsync_retries) - 1): + if tries == (gconf.get("max-rsync-retries") - 1): # Enable Error logging if it is last retry self.syncer.enable_errorlog() @@ -1243,7 +1260,7 @@ class GMasterChangelogMixin(GMasterCommon): # We do not know which changelog transfer failed, retry everything. retry = True tries += 1 - if tries == int(gconf.max_rsync_retries): + if tries == gconf.get("max-rsync-retries"): logging.error(lf('changelogs could not be processed ' 'completely - moving on...', files=map(os.path.basename, changes))) @@ -1331,8 +1348,7 @@ class GMasterChangelogMixin(GMasterCommon): # Update last_synced_time in status file based on stime # only update stime if stime xattr set to Brick root if path == self.FLAT_DIR_HIERARCHY: - chkpt_time = gconf.configinterface.get_realtime( - "checkpoint") + chkpt_time = gconf.getr("checkpoint") checkpoint_time = 0 if chkpt_time is not None: checkpoint_time = int(chkpt_time) @@ -1340,7 +1356,7 @@ class GMasterChangelogMixin(GMasterCommon): self.status.set_last_synced(stime, checkpoint_time) def update_worker_remote_node(self): - node = sys.argv[-1] + node = rconf.args.resource_remote node_data = node.split("@") node = node_data[-1] remote_node_ip = node.split(":")[0] @@ -1351,7 +1367,7 @@ class GMasterChangelogMixin(GMasterCommon): current_size = 0 for c in changes: si = os.lstat(c).st_size - if (si + current_size) > int(gconf.changelog_batch_size): + if (si + current_size) > gconf.get("changelog-batch-size"): # Create new batch if single Changelog file greater than # Max Size! or current batch size exceeds Max size changelogs_batches.append([c]) @@ -1397,7 +1413,7 @@ class GMasterChangelogMixin(GMasterCommon): def register(self, register_time, changelog_agent, status): self.changelog_agent = changelog_agent - self.sleep_interval = int(gconf.change_interval) + self.sleep_interval = gconf.get("change-interval") self.changelog_done_func = self.changelog_agent.done self.tempdir = self.setup_working_dir() self.processed_changelogs_dir = os.path.join(self.tempdir, @@ -1437,13 +1453,13 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): # Changelogs backend path is hardcoded as # <BRICK_PATH>/.glusterfs/changelogs, if user configured to different # location then consuming history will not work(Known issue as of now) - changelog_path = os.path.join(gconf.local_path, + changelog_path = os.path.join(rconf.args.local_path, ".glusterfs/changelogs") ret, actual_end = self.changelog_agent.history( changelog_path, data_stime[0], end_time, - int(gconf.sync_jobs)) + gconf.get("sync-jobs")) # scan followed by getchanges till scan returns zero. # history_scan() is blocking call, till it gets the number @@ -1736,7 +1752,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin): [gfid, 'MKNOD', str(mo), str(0), str(0), escape_space_newline( - os.path.join(pargfid, bname))]) + os.path.join( + pargfid, bname))]) else: self.write_entry_change( "E", [gfid, 'LINK', escape_space_newline( @@ -1837,8 +1854,8 @@ class Syncer(object): self.pb = PostBox() self.sync_engine = sync_engine self.errnos_ok = resilient_errnos - for i in range(int(gconf.sync_jobs)): - t = Thread(target=self.syncjob, args=(i+1, )) + for i in range(gconf.get("sync-jobs")): + t = Thread(target=self.syncjob, args=(i + 1, )) t.start() def syncjob(self, job_id): diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index c6fa1076a85..a193b57caff 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -13,21 +13,19 @@ import sys import time import signal import logging -import uuid import xml.etree.ElementTree as XET from subprocess import PIPE -from resource import FILE, GLUSTER, SSH from threading import Lock from errno import ECHILD, ESRCH -import re import random -from gconf import gconf -from syncdutils import select, waitpid, errno_wrap, lf + +from resource import SSH +import gsyncdconfig as gconf +from rconf import rconf +from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile from syncdutils import set_term_handler, is_host_local, GsyncdError -from syncdutils import escape, Thread, finalize +from syncdutils import Thread, finalize, Popen, Volinfo from syncdutils import gf_event, EVENT_GEOREP_FAULTY -from syncdutils import Volinfo, Popen - from gsyncdstatus import GeorepStatus, set_monitor_status @@ -82,7 +80,8 @@ def get_slave_bricks_status(host, vol): try: for el in vi.findall('volStatus/volumes/volume/node'): if el.find('status').text == '1': - up_hosts.add(el.find('hostname').text) + up_hosts.add((el.find('hostname').text, + el.find('peerid').text)) except (ParseError, AttributeError, ValueError) as e: logging.info(lf("Parsing failed to get list of up nodes, " "returning empty list", @@ -116,7 +115,8 @@ class Monitor(object): # give a chance to graceful exit errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH]) - def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master): + def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master, + suuid): """the monitor loop Basic logic is a blantantly simple blunt heuristics: @@ -136,7 +136,7 @@ class Monitor(object): due to the keep-alive thread) """ if not self.status.get(w[0]['dir'], None): - self.status[w[0]['dir']] = GeorepStatus(gconf.state_file, + self.status[w[0]['dir']] = GeorepStatus(gconf.get("state-file"), w[0]['host'], w[0]['dir'], w[0]['uuid'], @@ -144,7 +144,7 @@ class Monitor(object): "%s::%s" % (slave_host, slave_vol)) - set_monitor_status(gconf.state_file, self.ST_STARTED) + set_monitor_status(gconf.get("state-file"), self.ST_STARTED) self.status[w[0]['dir']].set_worker_status(self.ST_INIT) ret = 0 @@ -172,26 +172,22 @@ class Monitor(object): return os.WEXITSTATUS(s) return 1 - conn_timeout = int(gconf.connection_timeout) + conn_timeout = gconf.get("connection-timeout") while ret in (0, 1): - remote_host = w[1] + remote_user, remote_host = w[1][0].split("@") + remote_id = w[1][1] # Check the status of the connected slave node # If the connected slave node is down then try to connect to # different up node. - m = re.match("(ssh|gluster|file):\/\/(.+)@([^:]+):(.+)", - remote_host) - if m: - current_slave_host = m.group(3) - slave_up_hosts = get_slave_bricks_status( - slave_host, slave_vol) - - if current_slave_host not in slave_up_hosts: - if len(slave_up_hosts) > 0: - remote_host = "%s://%s@%s:%s" % (m.group(1), - m.group(2), - random.choice( - slave_up_hosts), - m.group(4)) + current_slave_host = remote_host + slave_up_hosts = get_slave_bricks_status( + slave_host, slave_vol) + + if (current_slave_host, remote_id) not in slave_up_hosts: + if len(slave_up_hosts) > 0: + remote_new = random.choice(slave_up_hosts) + remote_host = "%s@%s" % (remote_user, remote_new[0]) + remote_id = remote_new[1] # Spawn the worker and agent in lock to avoid fd leak self.lock.acquire() @@ -213,33 +209,58 @@ class Monitor(object): if apid == 0: os.close(rw) os.close(ww) - os.execv(sys.executable, argv + ['--local-path', w[0]['dir'], - '--local-node', w[0]['host'], - '--local-node-id', - w[0]['uuid'], - '--agent', - '--rpc-fd', - ','.join([str(ra), str(wa), - str(rw), str(ww)])]) + args_to_agent = argv + [ + 'agent', + rconf.args.master, + rconf.args.slave, + '--local-path', w[0]['dir'], + '--local-node', w[0]['host'], + '--local-node-id', w[0]['uuid'], + '--slave-id', suuid, + '--rpc-fd', ','.join([str(ra), str(wa), str(rw), str(ww)]) + ] + + if rconf.args.config_file is not None: + args_to_agent += ['-c', rconf.args.config_file] + + if rconf.args.debug: + args_to_agent.append("--debug") + + os.execv(sys.executable, args_to_agent) + pr, pw = os.pipe() cpid = os.fork() if cpid == 0: os.close(pr) os.close(ra) os.close(wa) - os.execv(sys.executable, argv + ['--feedback-fd', str(pw), - '--local-path', w[0]['dir'], - '--local-node', w[0]['host'], - '--local-node-id', - w[0]['uuid'], - '--local-id', - '.' + escape(w[0]['dir']), - '--rpc-fd', - ','.join([str(rw), str(ww), - str(ra), str(wa)]), - '--subvol-num', str(w[2])] + - (['--is-hottier'] if w[3] else []) + - ['--resource-remote', remote_host]) + + args_to_worker = argv + [ + 'worker', + rconf.args.master, + rconf.args.slave, + '--feedback-fd', str(pw), + '--local-path', w[0]['dir'], + '--local-node', w[0]['host'], + '--local-node-id', w[0]['uuid'], + '--slave-id', suuid, + '--rpc-fd', + ','.join([str(rw), str(ww), str(ra), str(wa)]), + '--subvol-num', str(w[2]), + '--resource-remote', remote_host, + '--resource-remote-id', remote_id + ] + + if rconf.args.config_file is not None: + args_to_worker += ['-c', rconf.args.config_file] + + if w[3]: + args_to_worker.append("--is-hottier") + + if rconf.args.debug: + args_to_worker.append("--debug") + + os.execv(sys.executable, args_to_worker) cpids.add(cpid) agents.add(apid) @@ -290,7 +311,8 @@ class Monitor(object): logging.info(lf("Changelog Agent died, Aborting " "Worker", brick=w[0]['dir'])) - errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) + errno_wrap(os.kill, [cpid, signal.SIGKILL], + [ESRCH]) nwait(cpid) nwait(apid) break @@ -333,12 +355,7 @@ class Monitor(object): return ret def multiplex(self, wspx, suuid, slave_vol, slave_host, master): - argv = sys.argv[:] - for o in ('-N', '--no-daemon', '--monitor'): - while o in argv: - argv.remove(o) - argv.extend(('-N', '-p', '', '--slave-id', suuid)) - argv.insert(0, os.path.basename(sys.executable)) + argv = [os.path.basename(sys.executable), sys.argv[0]] cpids = set() agents = set() @@ -346,7 +363,7 @@ class Monitor(object): for wx in wspx: def wmon(w): cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol, - slave_host, master) + slave_host, master, suuid) time.sleep(1) self.lock.acquire() for cpid in cpids: @@ -362,55 +379,39 @@ class Monitor(object): t.join() -def distribute(*resources): - master, slave = resources +def distribute(master, slave): mvol = Volinfo(master.volume, master.host) logging.debug('master bricks: ' + repr(mvol.bricks)) prelude = [] - si = slave slave_host = None slave_vol = None - if isinstance(slave, SSH): - prelude = gconf.ssh_command.split() + [slave.remote_addr] - si = slave.inner_rsc - logging.debug('slave SSH gateway: ' + slave.remote_addr) - if isinstance(si, FILE): - sbricks = {'host': 'localhost', 'dir': si.path} - suuid = uuid.uuid5(uuid.NAMESPACE_URL, slave.get_url(canonical=True)) - elif isinstance(si, GLUSTER): - svol = Volinfo(si.volume, slave.remote_addr.split('@')[-1]) - sbricks = svol.bricks - suuid = svol.uuid - slave_host = slave.remote_addr.split('@')[-1] - slave_vol = si.volume - - # save this xattr for the session delete command - old_stime_xattr_name = getattr(gconf, "master.stime_xattr_name", None) - new_stime_xattr_name = "trusted.glusterfs." + mvol.uuid + "." + \ - svol.uuid + ".stime" - if not old_stime_xattr_name or \ - old_stime_xattr_name != new_stime_xattr_name: - gconf.configinterface.set("master.stime_xattr_name", - new_stime_xattr_name) - else: - raise GsyncdError("unknown slave type " + slave.url) + prelude = [gconf.get("ssh-command")] + \ + gconf.get("ssh-options").split() + \ + ["-p", str(gconf.get("ssh-port"))] + \ + [slave.remote_addr] + + logging.debug('slave SSH gateway: ' + slave.remote_addr) + + svol = Volinfo(slave.volume, "localhost", prelude) + sbricks = svol.bricks + suuid = svol.uuid + slave_host = slave.remote_addr.split('@')[-1] + slave_vol = slave.volume + + # save this xattr for the session delete command + old_stime_xattr_prefix = gconf.get("stime-xattr-prefix", None) + new_stime_xattr_prefix = "trusted.glusterfs." + mvol.uuid + "." + \ + svol.uuid + if not old_stime_xattr_prefix or \ + old_stime_xattr_prefix != new_stime_xattr_prefix: + gconf.setconfig("stime-xattr-prefix", new_stime_xattr_prefix) + logging.debug('slave bricks: ' + repr(sbricks)) - if isinstance(si, FILE): - slaves = [slave.url] - else: - slavenodes = set(b['host'] for b in sbricks) - if isinstance(slave, SSH) and not gconf.isolated_slave: - rap = SSH.parse_ssh_address(slave) - slaves = ['ssh://' + rap['user'] + '@' + h + ':' + si.url - for h in slavenodes] - else: - slavevols = [h + ':' + si.volume for h in slavenodes] - if isinstance(slave, SSH): - slaves = ['ssh://' + rap.remote_addr + ':' + v - for v in slavevols] - else: - slaves = slavevols + + slavenodes = set((b['host'], b["uuid"]) for b in sbricks) + rap = SSH.parse_ssh_address(slave) + slaves = [(rap['user'] + '@' + h[0], h[1]) for h in slavenodes] workerspex = [] for idx, brick in enumerate(mvol.bricks): @@ -424,12 +425,47 @@ def distribute(*resources): return workerspex, suuid, slave_vol, slave_host, master -def monitor(*resources): +def monitor(local, remote): # Check if gsyncd restarted in pause state. If # yes, send SIGSTOP to negative of monitor pid # to go back to pause state. - if gconf.pause_on_start: + if rconf.args.pause_on_start: errno_wrap(os.kill, [-os.getpid(), signal.SIGSTOP], [ESRCH]) """oh yeah, actually Monitor is used as singleton, too""" - return Monitor().multiplex(*distribute(*resources)) + return Monitor().multiplex(*distribute(local, remote)) + + +def startup(go_daemon=True): + """set up logging, pidfile grabbing, daemonization""" + pid_file = gconf.get("pid-file") + if not grabpidfile(): + sys.stderr.write("pidfile is taken, exiting.\n") + sys.exit(2) + rconf.pid_file_owned = True + + if not go_daemon: + return + + x, y = os.pipe() + cpid = os.fork() + if cpid: + os.close(x) + sys.exit() + os.close(y) + os.setsid() + dn = os.open(os.devnull, os.O_RDWR) + for f in (sys.stdin, sys.stdout, sys.stderr): + os.dup2(dn, f.fileno()) + + if not grabpidfile(pid_file + '.tmp'): + raise GsyncdError("cannot grab temporary pidfile") + + os.rename(pid_file + '.tmp', pid_file) + + # wait for parent to terminate + # so we can start up with + # no messing from the dirty + # ol' bustard + select((x,), (), ()) + os.close(x) diff --git a/geo-replication/syncdaemon/gconf.py b/geo-replication/syncdaemon/rconf.py index 97395b41b06..1b8f333c0c8 100644 --- a/geo-replication/syncdaemon/gconf.py +++ b/geo-replication/syncdaemon/rconf.py @@ -9,9 +9,9 @@ # -class GConf(object): +class RConf(object): - """singleton class to store globals + """singleton class to store runtime globals shared between gsyncd modules""" ssh_ctl_dir = None @@ -28,5 +28,7 @@ class GConf(object): active_earlier = False passive_earlier = False mgmt_lock_fd = None + args = None + turns = 0 -gconf = GConf() +rconf = RConf() diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py index 0ac144930db..b4d96b1de47 100644 --- a/geo-replication/syncdaemon/repce.py +++ b/geo-replication/syncdaemon/repce.py @@ -13,21 +13,9 @@ import sys import time import logging from threading import Condition -try: - import thread -except ImportError: - # py 3 - import _thread as thread -try: - from Queue import Queue -except ImportError: - # py 3 - from queue import Queue -try: - import cPickle as pickle -except ImportError: - # py 3 - import pickle +import thread +from Queue import Queue +import cPickle as pickle from syncdutils import Thread, select, lf diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index c617c7b312e..f5a19629e7a 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -13,25 +13,24 @@ import os import sys import stat import time -import signal import fcntl import types import struct -import socket import logging import tempfile import subprocess -import errno from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM -import shutil +import errno + +from rconf import rconf +import gsyncdconfig as gconf -from gconf import gconf import repce from repce import RepceServer, RepceClient from master import gmaster_builder import syncdutils -from syncdutils import GsyncdError, select, privileged, boolify, funcode +from syncdutils import GsyncdError, select, privileged, funcode from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat from syncdutils import NoStimeAvailable, PartialHistoryAvailable from syncdutils import ChangelogException, ChangelogHistoryNotAvailable @@ -39,13 +38,9 @@ from syncdutils import get_changelog_log_level, get_rsync_version from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION from syncdutils import GX_GFID_CANONICAL_LEN from gsyncdstatus import GeorepStatus -from syncdutils import get_master_and_slave_data_from_args from syncdutils import mntpt_list, lf, Popen, sup, Volinfo from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt -UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') -HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I) -UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') @@ -53,58 +48,6 @@ slv_volume = None slv_host = None slv_bricks = None -def desugar(ustr): - """transform sugared url strings to standard <scheme>://<urlbody> form - - parsing logic enforces the constraint that sugared forms should contatin - a ':' or a '/', which ensures that sugared urls do not conflict with - gluster volume names. - """ - m = re.match('([^:]*):(.*)', ustr) - if m: - if not m.groups()[0]: - return "gluster://localhost" + ustr - elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]): - return "ssh://" + ustr - else: - return "gluster://" + ustr - else: - if ustr[0] != '/': - raise GsyncdError("cannot resolve sugared url '%s'" % ustr) - ap = os.path.normpath(ustr) - if ap.startswith('//'): - ap = ap[1:] - return "file://" + ap - - -def gethostbyname(hnam): - """gethostbyname wrapper""" - try: - return socket.gethostbyname(hnam) - except socket.gaierror: - ex = sys.exc_info()[1] - raise GsyncdError("failed to resolve %s: %s" % - (hnam, ex.strerror)) - - -def parse_url(ustr): - """instantiate an url object by scheme-to-class dispatch - - The url classes taken into consideration are the ones in - this module whose names are full-caps. - """ - m = UrlRX.match(ustr) - if not m: - ustr = desugar(ustr) - m = UrlRX.match(ustr) - if not m: - raise GsyncdError("malformed url") - sch, path = m.groups() - this = sys.modules[__name__] - if not hasattr(this, sch.upper()): - raise GsyncdError("unknown url scheme " + sch) - return getattr(this, sch.upper())(path) - class Server(object): @@ -149,14 +92,14 @@ class Server(object): fc = funcode(f) pi = list(fc.co_varnames).index('path') - def ff(*a): - path = a[pi] + def ff(*args): + path = args[pi] ps = path.split('/') if path[0] == '/' or '..' in ps: raise ValueError('unsafe path') - a = list(a) - a[pi] = os.path.join(a[0].local_path, path) - return f(*a) + args = list(args) + args[pi] = os.path.join(args[0].local_path, path) + return f(*args) return ff @classmethod @@ -493,7 +436,8 @@ class Server(object): else: en = e['entry'] disk_gfid = get_gfid_from_mnt(en) - if isinstance(disk_gfid, basestring) and e['gfid'] != disk_gfid: + if isinstance(disk_gfid, basestring) and \ + e['gfid'] != disk_gfid: slv_entry_info['gfid_mismatch'] = True st = lstat(en) if not isinstance(st, int): @@ -560,7 +504,6 @@ class Server(object): [ENOENT, EEXIST], [ESTALE, EBUSY]) collect_failure(e, cmd_ret) - for e in entries: blob = None op = e['op'] @@ -636,19 +579,21 @@ class Server(object): global slv_volume global slv_host if not slv_bricks: - slv_info = Volinfo (slv_volume, slv_host) + slv_info = Volinfo(slv_volume, slv_host) slv_bricks = slv_info.bricks # Result of readlink would be of format as below. # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename" realpath = os.readlink(os.path.join(slv_bricks[0]['dir'], - ".glusterfs", gfid[0:2], - gfid[2:4], gfid)) + ".glusterfs", + gfid[0:2], + gfid[2:4], + gfid)) realpath_parts = realpath.split('/') src_pargfid = realpath_parts[-2] src_basename = realpath_parts[-1] src_entry = os.path.join(pfx, src_pargfid, src_basename) logging.info(lf("Special case: rename on mkdir", - gfid=gfid, entry=repr(entry))) + gfid=gfid, entry=repr(entry))) rename_with_disk_gfid_confirmation(gfid, src_entry, entry) elif op == 'LINK': slink = os.path.join(pfx, gfid) @@ -735,7 +680,7 @@ class Server(object): [pg, 'glusterfs.gfid.newfile', blob], [EEXIST, ENOENT], [ESTALE, EINVAL, EBUSY]) - failed = collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret) # If UID/GID is different than zero that means we are trying # create Entry with different UID/GID. Create Entry with @@ -852,274 +797,239 @@ class Server(object): return 1.0 -class SlaveLocal(object): +class Mounter(object): - """mix-in class to implement some factes of a slave server - - ("mix-in" is sort of like "abstract class", ie. it's not - instantiated just included in the ancesty DAG. I use "mix-in" - to indicate that it's not used as an abstract base class, - rather just taken in to implement additional functionality - on the basis of the assumed availability of certain interfaces.) - """ + """Abstract base class for mounter backends""" - def can_connect_to(self, remote): - """determine our position in the connectibility matrix""" - return not remote - - def service_loop(self): - """start a RePCe server serving self's server - - stop servicing if a timeout is configured and got no - keep-alime in that inteval - """ - - if boolify(gconf.use_rsync_xattrs) and not privileged(): - raise GsyncdError( - "using rsync for extended attributes is not supported") - - repce = RepceServer( - self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs)) - t = syncdutils.Thread(target=lambda: (repce.service_loop(), - syncdutils.finalize())) - t.start() - logging.info("slave listening") - if gconf.timeout and int(gconf.timeout) > 0: - while True: - lp = self.server.last_keep_alive - time.sleep(int(gconf.timeout)) - if lp == self.server.last_keep_alive: - logging.info( - lf("connection inactive, stopping", - timeout=int(gconf.timeout))) - break - else: - select((), (), ()) + def __init__(self, params): + self.params = params + self.mntpt = None + @classmethod + def get_glusterprog(cls): + return os.path.join(gconf.get("gluster-command-dir"), + cls.glusterprog) + + def umount_l(self, d): + """perform lazy umount""" + po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE) + po.wait() + return po -class SlaveRemote(object): + @classmethod + def make_umount_argv(cls, d): + raise NotImplementedError - """mix-in class to implement an interface to a remote slave""" + def make_mount_argv(self, label=None): + raise NotImplementedError - def connect_remote(self, rargs=[], **opts): - """connects to a remote slave + def cleanup_mntpt(self, *a): + pass - Invoke an auxiliary utility (slave gsyncd, possibly wrapped) - which sets up the connection and set up a RePCe client to - communicate throuh its stdio. - """ - slave = opts.get('slave', self.url) - extra_opts = [] - so = getattr(gconf, 'session_owner', None) - if so: - extra_opts += ['--session-owner', so] - li = getattr(gconf, 'local_id', None) - if li: - extra_opts += ['--local-id', li] - ln = getattr(gconf, 'local_node', None) - if ln: - extra_opts += ['--local-node', ln] - if boolify(gconf.use_rsync_xattrs): - extra_opts.append('--use-rsync-xattrs') - if boolify(gconf.access_mount): - extra_opts.append('--access-mount') - po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + - ['-N', '--listen', '--timeout', str(gconf.timeout), - slave], - stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - gconf.transport = po - return self.start_fd_client(po.stdout, po.stdin, **opts) + def handle_mounter(self, po): + po.wait() - def start_fd_client(self, i, o, **opts): - """set up RePCe client, handshake with server + def inhibit(self, label): + """inhibit a gluster filesystem - It's cut out as a separate method to let - subclasses hook into client startup + Mount glusterfs over a temporary mountpoint, + change into the mount, and lazy unmount the + filesystem. """ - self.server = RepceClient(i, o) - rv = self.server.__version__() - exrv = {'proto': repce.repce_version, 'object': Server.version()} - da0 = (rv, exrv) - da1 = ({}, {}) - for i in range(2): - for k, v in da0[i].iteritems(): - da1[i][k] = int(v) - if da1[0] != da1[1]: - raise GsyncdError( - "RePCe major version mismatch: local %s, remote %s" % - (exrv, rv)) - - def rsync(self, files, *args, **kw): - """invoke rsync""" - if not files: - raise GsyncdError("no files to sync") - logging.debug("files: " + ", ".join(files)) - - extra_rsync_flags = [] - # Performance flag, --ignore-missing-args, if rsync version is - # greater than 3.1.0 then include this flag. - if boolify(gconf.rsync_opt_ignore_missing_args) and \ - get_rsync_version(gconf.rsync_command) >= "3.1.0": - extra_rsync_flags = ["--ignore-missing-args"] - - argv = gconf.rsync_command.split() + \ - ['-aR0', '--inplace', '--files-from=-', '--super', - '--stats', '--numeric-ids', '--no-implied-dirs'] + \ - (boolify(gconf.rsync_opt_existing) and ['--existing'] or []) + \ - gconf.rsync_options.split() + \ - (boolify(gconf.sync_xattrs) and ['--xattrs'] or []) + \ - (boolify(gconf.sync_acls) and ['--acls'] or []) + \ - extra_rsync_flags + \ - ['.'] + list(args) - - log_rsync_performance = boolify(gconf.configinterface.get_realtime( - "log_rsync_performance", default_value=False)) + access_mount = gconf.get("access-mount") + if rconf.args.subcmd == "slave": + access_mount = gconf.get("slave-access-mount") + + mpi, mpo = os.pipe() + mh = Popen.fork() + if mh: + # Parent + os.close(mpi) + fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC) + d = None + margv = self.make_mount_argv(label) + if self.mntpt: + # mntpt is determined pre-mount + d = self.mntpt + os.write(mpo, d + '\0') + po = Popen(margv, **self.mountkw) + self.handle_mounter(po) + po.terminate_geterr() + logging.debug('auxiliary glusterfs mount in place') + if not d: + # mntpt is determined during mount + d = self.mntpt + os.write(mpo, d + '\0') + os.write(mpo, 'M') + t = syncdutils.Thread(target=lambda: os.chdir(d)) + t.start() + tlim = rconf.starttime + gconf.get("connection-timeout") + while True: + if not t.isAlive(): + break - if log_rsync_performance: - # use stdout=PIPE only when log_rsync_performance enabled - # Else rsync will write to stdout and nobody is their - # to consume. If PIPE is full rsync hangs. - po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + if time.time() >= tlim: + syncdutils.finalize(exval=1) + time.sleep(1) + os.close(mpo) + _, rv = syncdutils.waitpid(mh, 0) + if rv: + rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \ + (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0) + logging.warn(lf('stale mount possibly left behind', + path=d)) + raise GsyncdError("cleaning up temp mountpoint %s " + "failed with status %d" % + (d, rv)) else: - po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE) + rv = 0 + try: + os.setsid() + os.close(mpo) + mntdata = '' + while True: + c = os.read(mpi, 1) + if not c: + break + mntdata += c + if mntdata: + mounted = False + if mntdata[-1] == 'M': + mntdata = mntdata[:-1] + assert(mntdata) + mounted = True + assert(mntdata[-1] == '\0') + mntpt = mntdata[:-1] + assert(mntpt) + if mounted and not access_mount: + po = self.umount_l(mntpt) + po.terminate_geterr(fail_on_err=False) + if po.returncode != 0: + po.errlog() + rv = po.returncode + if not access_mount: + self.cleanup_mntpt(mntpt) + except: + logging.exception('mount cleanup failure:') + rv = 200 + os._exit(rv) + logging.debug('auxiliary glusterfs mount prepared') + + +class DirectMounter(Mounter): + + """mounter backend which calls mount(8), umount(8) directly""" + + mountkw = {'stderr': subprocess.PIPE} + glusterprog = 'glusterfs' - for f in files: - po.stdin.write(f) - po.stdin.write('\0') + @staticmethod + def make_umount_argv(d): + return ['umount', '-l', d] - stdout, stderr = po.communicate() + def make_mount_argv(self, label=None): + self.mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-') + mntpt_list.append(self.mntpt) + return [self.get_glusterprog()] + \ + ['--' + p for p in self.params] + [self.mntpt] - if kw.get("log_err", False): - for errline in stderr.strip().split("\n")[:-1]: - logging.error(lf("SYNC Error", - sync_engine="Rsync", - error=errline)) + def cleanup_mntpt(self, mntpt=None): + if not mntpt: + mntpt = self.mntpt + errno_wrap(os.rmdir, [mntpt], [ENOENT, EBUSY]) - if log_rsync_performance: - rsync_msg = [] - for line in stdout.split("\n"): - if line.startswith("Number of files:") or \ - line.startswith("Number of regular files transferred:") or \ - line.startswith("Total file size:") or \ - line.startswith("Total transferred file size:") or \ - line.startswith("Literal data:") or \ - line.startswith("Matched data:") or \ - line.startswith("Total bytes sent:") or \ - line.startswith("Total bytes received:") or \ - line.startswith("sent "): - rsync_msg.append(line) - logging.info(lf("rsync performance", - data=", ".join(rsync_msg))) - return po +class MountbrokerMounter(Mounter): - def tarssh(self, files, slaveurl, log_err=False): - """invoke tar+ssh - -z (compress) can be use if needed, but omitting it now - as it results in weird error (tar+ssh errors out (errcode: 2) - """ - if not files: - raise GsyncdError("no files to sync") - logging.debug("files: " + ", ".join(files)) - (host, rdir) = slaveurl.split(':') - tar_cmd = ["tar"] + \ - ["--sparse", "-cf", "-", "--files-from", "-"] - ssh_cmd = gconf.ssh_command_tar.split() + \ - ["-p", str(gconf.ssh_port)] + \ - [host, "tar"] + \ - ["--overwrite", "-xf", "-", "-C", rdir] - p0 = Popen(tar_cmd, stdout=subprocess.PIPE, - stdin=subprocess.PIPE, stderr=subprocess.PIPE) - p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE) - for f in files: - p0.stdin.write(f) - p0.stdin.write('\n') + """mounter backend using the mountbroker gluster service""" - p0.stdin.close() - p0.stdout.close() # Allow p0 to receive a SIGPIPE if p1 exits. - # wait for tar to terminate, collecting any errors, further - # waiting for transfer to complete - _, stderr1 = p1.communicate() + mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE} + glusterprog = 'gluster' - # stdin and stdout of p0 is already closed, Reset to None and - # wait for child process to complete - p0.stdin = None - p0.stdout = None - p0.communicate() - - if log_err: - for errline in stderr1.strip().split("\n")[:-1]: - logging.error(lf("SYNC Error", - sync_engine="Tarssh", - error=errline)) - - return p1 + @classmethod + def make_cli_argv(cls): + return [cls.get_glusterprog()] + ['--remote-host=localhost'] + \ + gconf.get("gluster-cli-options").split() + ['system::'] + @classmethod + def make_umount_argv(cls, d): + return cls.make_cli_argv() + ['umount', d, 'lazy'] -class AbstractUrl(object): + def make_mount_argv(self, label): + return self.make_cli_argv() + \ + ['mount', label, 'user-map-root=' + + syncdutils.getusername()] + self.params - """abstract base class for url scheme classes""" + def handle_mounter(self, po): + self.mntpt = po.stdout.readline()[:-1] + po.stdout.close() + sup(self, po) + if po.returncode != 0: + # if cli terminated with error due to being + # refused by glusterd, what it put + # out on stdout is a diagnostic message + logging.error(lf('glusterd answered', mnt=self.mntpt)) - def __init__(self, path, pattern): - m = re.search(pattern, path) - if not m: - raise GsyncdError("malformed path") - self.path = path - return m.groups() - @property - def scheme(self): - return type(self).__name__.lower() +class GLUSTERServer(Server): - def canonical_path(self): - return self.path + "server enhancements for a glusterfs backend""" - def get_url(self, canonical=False, escaped=False): - """format self's url in various styles""" - if canonical: - pa = self.canonical_path() + @classmethod + def _attr_unpack_dict(cls, xattr, extra_fields=''): + """generic volume mark fetching/parsing backed""" + fmt_string = cls.NTV_FMTSTR + extra_fields + buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) + vm = struct.unpack(fmt_string, buf) + m = re.match( + '(.{8})(.{4})(.{4})(.{4})(.{12})', + "".join(['%02x' % x for x in vm[2:18]])) + uuid = '-'.join(m.groups()) + volinfo = {'version': vm[0:2], + 'uuid': uuid, + 'retval': vm[18], + 'volume_mark': vm[19:21], + } + if extra_fields: + return volinfo, vm[-len(extra_fields):] else: - pa = self.path - u = "://".join((self.scheme, pa)) - if escaped: - u = syncdutils.escape(u) - return u - - @property - def url(self): - return self.get_url() - - -class FILE(AbstractUrl, SlaveLocal, SlaveRemote): - - """scheme class for file:// urls - - can be used to represent a file slave server - on slave side, or interface to a remote file - file server on master side - """ - - class FILEServer(Server): - - """included server flavor""" - pass - - server = FILEServer - - def __init__(self, path): - sup(self, path, '^/') + return volinfo - def connect(self): - """inhibit the resource beyond""" - os.chdir(self.path) + @classmethod + def foreign_volume_infos(cls): + """return list of valid (not expired) foreign volume marks""" + dict_list = [] + xattr_list = Xattr.llistxattr_buf('.') + for ele in xattr_list: + if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0: + d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) + now = int(time.time()) + if x[0] > now: + logging.debug("volinfo[%s] expires: %d " + "(%d sec later)" % + (d['uuid'], x[0], x[0] - now)) + d['timeout'] = x[0] + dict_list.append(d) + else: + try: + Xattr.lremovexattr('.', ele) + except OSError: + pass + return dict_list - def rsync(self, files, log_err=False): - return sup(self, files, self.path, log_err=log_err) + @classmethod + def native_volume_info(cls): + """get the native volume mark of the underlying gluster volume""" + try: + return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, + 'volume-mark'])) + except OSError: + ex = sys.exc_info()[1] + if ex.errno != ENODATA: + raise -class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): +class GLUSTER(object): """scheme class for gluster:// urls @@ -1129,247 +1039,18 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): (slave-ish features come from the mixins, master functionality is outsourced to GMaster from master) """ - - class GLUSTERServer(Server): - - "server enhancements for a glusterfs backend""" - - @classmethod - def _attr_unpack_dict(cls, xattr, extra_fields=''): - """generic volume mark fetching/parsing backed""" - fmt_string = cls.NTV_FMTSTR + extra_fields - buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) - vm = struct.unpack(fmt_string, buf) - m = re.match( - '(.{8})(.{4})(.{4})(.{4})(.{12})', - "".join(['%02x' % x for x in vm[2:18]])) - uuid = '-'.join(m.groups()) - volinfo = {'version': vm[0:2], - 'uuid': uuid, - 'retval': vm[18], - 'volume_mark': vm[19:21], - } - if extra_fields: - return volinfo, vm[-len(extra_fields):] - else: - return volinfo - - @classmethod - def foreign_volume_infos(cls): - """return list of valid (not expired) foreign volume marks""" - dict_list = [] - xattr_list = Xattr.llistxattr_buf('.') - for ele in xattr_list: - if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0: - d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) - now = int(time.time()) - if x[0] > now: - logging.debug("volinfo[%s] expires: %d " - "(%d sec later)" % - (d['uuid'], x[0], x[0] - now)) - d['timeout'] = x[0] - dict_list.append(d) - else: - try: - Xattr.lremovexattr('.', ele) - except OSError: - pass - return dict_list - - @classmethod - def native_volume_info(cls): - """get the native volume mark of the underlying gluster volume""" - try: - return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, - 'volume-mark'])) - except OSError: - ex = sys.exc_info()[1] - if ex.errno != ENODATA: - raise - server = GLUSTERServer - def __init__(self, path): - self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern) + def __init__(self, host, volume): + self.path = "%s:%s" % (host, volume) + self.host = host + self.volume = volume global slv_volume global slv_host slv_volume = self.volume slv_host = self.host - def canonical_path(self): - return ':'.join([gethostbyname(self.host), self.volume]) - - def can_connect_to(self, remote): - """determine our position in the connectibility matrix""" - return not remote or \ - (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER)) - - class Mounter(object): - - """Abstract base class for mounter backends""" - - def __init__(self, params): - self.params = params - self.mntpt = None - - @classmethod - def get_glusterprog(cls): - return os.path.join(gconf.gluster_command_dir, cls.glusterprog) - - def umount_l(self, d): - """perform lazy umount""" - po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE) - po.wait() - return po - - @classmethod - def make_umount_argv(cls, d): - raise NotImplementedError - - def make_mount_argv(self, *a): - raise NotImplementedError - - def cleanup_mntpt(self, *a): - pass - - def handle_mounter(self, po): - po.wait() - - def inhibit(self, *a): - """inhibit a gluster filesystem - - Mount glusterfs over a temporary mountpoint, - change into the mount, and lazy unmount the - filesystem. - """ - - mpi, mpo = os.pipe() - mh = Popen.fork() - if mh: - os.close(mpi) - fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC) - d = None - margv = self.make_mount_argv(*a) - if self.mntpt: - # mntpt is determined pre-mount - d = self.mntpt - os.write(mpo, d + '\0') - po = Popen(margv, **self.mountkw) - self.handle_mounter(po) - po.terminate_geterr() - logging.debug('auxiliary glusterfs mount in place') - if not d: - # mntpt is determined during mount - d = self.mntpt - os.write(mpo, d + '\0') - os.write(mpo, 'M') - t = syncdutils.Thread(target=lambda: os.chdir(d)) - t.start() - tlim = gconf.starttime + int(gconf.connection_timeout) - while True: - if not t.isAlive(): - break - if time.time() >= tlim: - syncdutils.finalize(exval=1) - time.sleep(1) - os.close(mpo) - _, rv = syncdutils.waitpid(mh, 0) - if rv: - rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \ - (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0) - logging.warn(lf('stale mount possibly left behind', - path=d)) - raise GsyncdError("cleaning up temp mountpoint %s " - "failed with status %d" % - (d, rv)) - else: - rv = 0 - try: - os.setsid() - os.close(mpo) - mntdata = '' - while True: - c = os.read(mpi, 1) - if not c: - break - mntdata += c - if mntdata: - mounted = False - if mntdata[-1] == 'M': - mntdata = mntdata[:-1] - assert(mntdata) - mounted = True - assert(mntdata[-1] == '\0') - mntpt = mntdata[:-1] - assert(mntpt) - if mounted and not boolify(gconf.access_mount): - po = self.umount_l(mntpt) - po.terminate_geterr(fail_on_err=False) - if po.returncode != 0: - po.errlog() - rv = po.returncode - if not boolify(gconf.access_mount): - self.cleanup_mntpt(mntpt) - except: - logging.exception('mount cleanup failure:') - rv = 200 - os._exit(rv) - logging.debug('auxiliary glusterfs mount prepared') - - class DirectMounter(Mounter): - - """mounter backend which calls mount(8), umount(8) directly""" - - mountkw = {'stderr': subprocess.PIPE} - glusterprog = 'glusterfs' - - @staticmethod - def make_umount_argv(d): - return ['umount', '-l', d] - - def make_mount_argv(self): - self.mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-') - mntpt_list.append(self.mntpt) - return [self.get_glusterprog()] + \ - ['--' + p for p in self.params] + [self.mntpt] - - def cleanup_mntpt(self, mntpt=None): - if not mntpt: - mntpt = self.mntpt - errno_wrap(os.rmdir, [mntpt], [ENOENT, EBUSY]) - - class MountbrokerMounter(Mounter): - - """mounter backend using the mountbroker gluster service""" - - mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE} - glusterprog = 'gluster' - - @classmethod - def make_cli_argv(cls): - return [cls.get_glusterprog()] + ['--remote-host=localhost'] + \ - gconf.gluster_cli_options.split() + ['system::'] - - @classmethod - def make_umount_argv(cls, d): - return cls.make_cli_argv() + ['umount', d, 'lazy'] - - def make_mount_argv(self, label): - return self.make_cli_argv() + \ - ['mount', label, 'user-map-root=' + - syncdutils.getusername()] + self.params - - def handle_mounter(self, po): - self.mntpt = po.stdout.readline()[:-1] - po.stdout.close() - sup(self, po) - if po.returncode != 0: - # if cli terminated with error due to being - # refused by glusterd, what it put - # out on stdout is a diagnostic message - logging.error(lf('glusterd answered', mnt=self.mntpt)) - def connect(self): """inhibit the resource beyond @@ -1380,23 +1061,29 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): logging.info("Mounting gluster volume locally...") t0 = time.time() - label = getattr(gconf, 'mountbroker', None) + label = gconf.get('mountbroker', None) if not label and not privileged(): label = syncdutils.getusername() - mounter = label and self.MountbrokerMounter or self.DirectMounter - params = gconf.gluster_params.split() + \ - (gconf.gluster_log_level and ['log-level=' + - gconf.gluster_log_level] or []) + \ - ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + - self.host, 'volfile-id=' + self.volume, 'client-pid=-1'] - mounter(params).inhibit(*[l for l in [label] if l]) + mounter = label and MountbrokerMounter or DirectMounter + + log_file = gconf.get("gluster-log-file") + if rconf.args.subcmd == "slave": + log_file = gconf.get("slave-gluster-log-file") + + log_level = gconf.get("gluster-log-level") + if rconf.args.subcmd == "slave": + log_level = gconf.get("slave-gluster-log-level") + + params = gconf.get("gluster-params").split() + \ + ['log-level=' + log_level] + \ + ['log-file=' + log_file, 'volfile-server=' + self.host] + \ + ['volfile-id=' + self.volume, 'client-pid=-1'] + + self.mounter = mounter(params) + self.mounter.inhibit(label) logging.info(lf("Mounted gluster volume", duration="%.4f" % (time.time() - t0))) - def connect_remote(self, *a, **kw): - sup(self, *a, **kw) - self.slavedir = "/proc/%d/cwd" % self.server.pid() - def gmaster_instantiate_tuple(self, slave): """return a tuple of the 'one shot' and the 'main crawl' class instance""" @@ -1404,7 +1091,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): gmaster_builder()(self, slave), gmaster_builder('changeloghistory')(self, slave)) - def service_loop(self, *args): + def service_loop(self, slave=None): """enter service loop - if slave given, instantiate GMaster and @@ -1412,171 +1099,183 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): master behavior - else do that's what's inherited """ - if args: - slave = args[0] - if gconf.local_path: - class brickserver(FILE.FILEServer): - local_path = gconf.local_path - aggregated = self.server - - @classmethod - def entries(cls, path): - e = super(brickserver, cls).entries(path) - # on the brick don't mess with /.glusterfs - if path == '.': - try: - e.remove('.glusterfs') - e.remove('.trashcan') - except ValueError: - pass - return e - - @classmethod - def lstat(cls, e): - """ path based backend stat """ - return super(brickserver, cls).lstat(e) - - @classmethod - def gfid(cls, e): - """ path based backend gfid fetch """ - return super(brickserver, cls).gfid(e) - - @classmethod - def linkto_check(cls, e): - return super(brickserver, cls).linkto_check(e) - if gconf.slave_id: - # define {,set_}xtime in slave, thus preempting - # the call to remote, so that it takes data from - # the local brick - slave.server.xtime = types.MethodType( - lambda _self, path, uuid: ( - brickserver.xtime(path, - uuid + '.' + gconf.slave_id) - ), - slave.server) - slave.server.stime = types.MethodType( - lambda _self, path, uuid: ( - brickserver.stime(path, - uuid + '.' + gconf.slave_id) - ), - slave.server) - slave.server.entry_stime = types.MethodType( - lambda _self, path, uuid: ( - brickserver.entry_stime( - path, - uuid + '.' + gconf.slave_id) - ), - slave.server) - slave.server.set_stime = types.MethodType( - lambda _self, path, uuid, mark: ( - brickserver.set_stime(path, - uuid + '.' + gconf.slave_id, - mark) - ), - slave.server) - slave.server.set_entry_stime = types.MethodType( - lambda _self, path, uuid, mark: ( - brickserver.set_entry_stime( - path, - uuid + '.' + gconf.slave_id, - mark) - ), - slave.server) - (g1, g2, g3) = self.gmaster_instantiate_tuple(slave) - g1.master.server = brickserver - g2.master.server = brickserver - g3.master.server = brickserver - else: - (g1, g2, g3) = self.gmaster_instantiate_tuple(slave) - g1.master.server.aggregated = gmaster.master.server - g2.master.server.aggregated = gmaster.master.server - g3.master.server.aggregated = gmaster.master.server - # bad bad bad: bad way to do things like this - # need to make this elegant - # register the crawlers and start crawling - # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default) - # g3 ==> changelog History - changelog_register_failed = False - (inf, ouf, ra, wa) = gconf.rpc_fd.split(',') - changelog_agent = RepceClient(int(inf), int(ouf)) - master_name, slave_data = get_master_and_slave_data_from_args( - sys.argv) - status = GeorepStatus(gconf.state_file, gconf.local_node, - gconf.local_path, - gconf.local_node_id, - master_name, slave_data) - status.reset_on_worker_start() - rv = changelog_agent.version() - if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: + if rconf.args.subcmd == "slave": + if gconf.get("use-rsync-xattrs") and not privileged(): raise GsyncdError( - "RePCe major version mismatch(changelog agent): " - "local %s, remote %s" % - (CHANGELOG_AGENT_CLIENT_VERSION, rv)) + "using rsync for extended attributes is not supported") + + repce = RepceServer( + self.server, sys.stdin, sys.stdout, gconf.get("sync-jobs")) + t = syncdutils.Thread(target=lambda: (repce.service_loop(), + syncdutils.finalize())) + t.start() + logging.info("slave listening") + if gconf.get("slave-timeout") and gconf.get("slave-timeout") > 0: + while True: + lp = self.server.last_keep_alive + time.sleep(gconf.get("slave-timeout")) + if lp == self.server.last_keep_alive: + logging.info( + lf("connection inactive, stopping", + timeout=gconf.get("slave-timeout"))) + break + else: + select((), (), ()) - try: - workdir = g2.setup_working_dir() - # Register only when change_detector is not set to - # xsync, else agent will generate changelog files - # in .processing directory of working dir - if gconf.change_detector != 'xsync': - # register with the changelog library - # 9 == log level (DEBUG) - # 5 == connection retries - changelog_agent.init() - changelog_agent.register(gconf.local_path, - workdir, gconf.changelog_log_file, - get_changelog_log_level( - gconf.changelog_log_level), - g2.CHANGELOG_CONN_RETRIES) - - register_time = int(time.time()) - g2.register(register_time, changelog_agent, status) - g3.register(register_time, changelog_agent, status) - except ChangelogException as e: - logging.error(lf("Changelog register failed", error=e)) - sys.exit(1) - - g1.register(status=status) - logging.info(lf("Register time", - time=register_time)) - # oneshot: Try to use changelog history api, if not - # available switch to FS crawl - # Note: if config.change_detector is xsync then - # it will not use changelog history api - try: - g3.crawlwrap(oneshot=True) - except PartialHistoryAvailable as e: - logging.info(lf('Partial history available, using xsync crawl' - ' after consuming history', - till=e)) - g1.crawlwrap(oneshot=True, register_time=register_time) - except ChangelogHistoryNotAvailable: - logging.info('Changelog history not available, using xsync') - g1.crawlwrap(oneshot=True, register_time=register_time) - except NoStimeAvailable: - logging.info('No stime available, using xsync crawl') - g1.crawlwrap(oneshot=True, register_time=register_time) - except ChangelogException as e: - logging.error(lf("Changelog History Crawl failed", - error=e)) - sys.exit(1) + return - try: - g2.crawlwrap() - except ChangelogException as e: - logging.error(lf("Changelog crawl failed", error=e)) - sys.exit(1) - else: - sup(self, *args) + class brickserver(Server): + local_path = rconf.args.local_path + aggregated = self.server + + @classmethod + def entries(cls, path): + e = super(brickserver, cls).entries(path) + # on the brick don't mess with /.glusterfs + if path == '.': + try: + e.remove('.glusterfs') + e.remove('.trashcan') + except ValueError: + pass + return e + + @classmethod + def lstat(cls, e): + """ path based backend stat """ + return super(brickserver, cls).lstat(e) + + @classmethod + def gfid(cls, e): + """ path based backend gfid fetch """ + return super(brickserver, cls).gfid(e) + + @classmethod + def linkto_check(cls, e): + return super(brickserver, cls).linkto_check(e) + + # define {,set_}xtime in slave, thus preempting + # the call to remote, so that it takes data from + # the local brick + slave.server.xtime = types.MethodType( + lambda _self, path, uuid: ( + brickserver.xtime(path, + uuid + '.' + rconf.args.slave_id) + ), + slave.server) + slave.server.stime = types.MethodType( + lambda _self, path, uuid: ( + brickserver.stime(path, + uuid + '.' + rconf.args.slave_id) + ), + slave.server) + slave.server.entry_stime = types.MethodType( + lambda _self, path, uuid: ( + brickserver.entry_stime( + path, + uuid + '.' + rconf.args.slave_id) + ), + slave.server) + slave.server.set_stime = types.MethodType( + lambda _self, path, uuid, mark: ( + brickserver.set_stime(path, + uuid + '.' + rconf.args.slave_id, + mark) + ), + slave.server) + slave.server.set_entry_stime = types.MethodType( + lambda _self, path, uuid, mark: ( + brickserver.set_entry_stime( + path, + uuid + '.' + rconf.args.slave_id, + mark) + ), + slave.server) + + (g1, g2, g3) = self.gmaster_instantiate_tuple(slave) + g1.master.server = brickserver + g2.master.server = brickserver + g3.master.server = brickserver + + # bad bad bad: bad way to do things like this + # need to make this elegant + # register the crawlers and start crawling + # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default) + # g3 ==> changelog History + (inf, ouf, ra, wa) = rconf.args.rpc_fd.split(',') + changelog_agent = RepceClient(int(inf), int(ouf)) + + status = GeorepStatus(gconf.get("state-file"), + rconf.args.local_node, + rconf.args.local_path, + rconf.args.local_node_id, + rconf.args.master, + rconf.args.slave) + status.reset_on_worker_start() + rv = changelog_agent.version() + if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: + raise GsyncdError( + "RePCe major version mismatch(changelog agent): " + "local %s, remote %s" % + (CHANGELOG_AGENT_CLIENT_VERSION, rv)) - def rsync(self, files, log_err=False): - return sup(self, files, self.slavedir, log_err=log_err) + try: + workdir = g2.setup_working_dir() + # Register only when change_detector is not set to + # xsync, else agent will generate changelog files + # in .processing directory of working dir + if gconf.get("change-detector") != 'xsync': + # register with the changelog library + # 9 == log level (DEBUG) + # 5 == connection retries + changelog_agent.init() + changelog_agent.register(rconf.args.local_path, + workdir, + gconf.get("changelog-log-file"), + get_changelog_log_level( + gconf.get("changelog-log-level")), + g2.CHANGELOG_CONN_RETRIES) + + register_time = int(time.time()) + g2.register(register_time, changelog_agent, status) + g3.register(register_time, changelog_agent, status) + except ChangelogException as e: + logging.error(lf("Changelog register failed", error=e)) + sys.exit(1) + + g1.register(status=status) + logging.info(lf("Register time", + time=register_time)) + # oneshot: Try to use changelog history api, if not + # available switch to FS crawl + # Note: if config.change_detector is xsync then + # it will not use changelog history api + try: + g3.crawlwrap(oneshot=True) + except PartialHistoryAvailable as e: + logging.info(lf('Partial history available, using xsync crawl' + ' after consuming history', + till=e)) + g1.crawlwrap(oneshot=True, register_time=register_time) + except ChangelogHistoryNotAvailable: + logging.info('Changelog history not available, using xsync') + g1.crawlwrap(oneshot=True, register_time=register_time) + except NoStimeAvailable: + logging.info('No stime available, using xsync crawl') + g1.crawlwrap(oneshot=True, register_time=register_time) + except ChangelogException as e: + logging.error(lf("Changelog History Crawl failed", + error=e)) + sys.exit(1) - def tarssh(self, files, log_err=False): - return sup(self, files, self.slavedir, log_err=log_err) + try: + g2.crawlwrap() + except ChangelogException as e: + logging.error(lf("Changelog crawl failed", error=e)) + sys.exit(1) -class SSH(AbstractUrl, SlaveRemote): +class SSH(object): """scheme class for ssh:// urls @@ -1584,13 +1283,9 @@ class SSH(AbstractUrl, SlaveRemote): implementing an ssh based proxy """ - def __init__(self, path): - self.remote_addr, inner_url = sup(self, path, - '^((?:%s@)?%s):(.+)' % - tuple([r.pattern - for r in (UserRX, HostRX)])) - self.inner_rsc = parse_url(inner_url) - self.volume = inner_url[1:] + def __init__(self, host, volume): + self.remote_addr = host + self.volume = volume @staticmethod def parse_ssh_address(self): @@ -1602,35 +1297,28 @@ class SSH(AbstractUrl, SlaveRemote): self.remotehost = h return {'user': u, 'host': h} - def canonical_path(self): - rap = self.parse_ssh_address(self) - remote_addr = '@'.join([rap['user'], gethostbyname(rap['host'])]) - return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)]) - - def can_connect_to(self, remote): - """determine our position in the connectibility matrix""" - return False - - def start_fd_client(self, *a, **opts): - """customizations for client startup + def start_fd_client(self, i, o): + """set up RePCe client, handshake with server - - be a no-op if we are to daemonize (client startup is deferred - to post-daemon stage) - - determine target url for rsync after consulting server + It's cut out as a separate method to let + subclasses hook into client startup """ - if opts.get('deferred'): - return a - sup(self, *a) - ityp = type(self.inner_rsc) - if ityp == FILE: - slavepath = self.inner_rsc.path - elif ityp == GLUSTER: - slavepath = "/proc/%d/cwd" % self.server.pid() - else: - raise NotImplementedError + self.server = RepceClient(i, o) + rv = self.server.__version__() + exrv = {'proto': repce.repce_version, 'object': Server.version()} + da0 = (rv, exrv) + da1 = ({}, {}) + for i in range(2): + for k, v in da0[i].iteritems(): + da1[i][k] = int(v) + if da1[0] != da1[1]: + raise GsyncdError( + "RePCe major version mismatch: local %s, remote %s" % + (exrv, rv)) + slavepath = "/proc/%d/cwd" % self.server.pid() self.slaveurl = ':'.join([self.remote_addr, slavepath]) - def connect_remote(self, go_daemon=None): + def connect_remote(self): """connect to inner slave url through outer ssh url Wrap the connecting utility in ssh. @@ -1648,49 +1336,182 @@ class SSH(AbstractUrl, SlaveRemote): [NB. ATM gluster product does not makes use of interactive authentication.] """ - if go_daemon == 'done': - return self.start_fd_client(*self.fd_pair) - syncdutils.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-'), self.remote_addr, - self.inner_rsc.url) + self.volume) - deferred = go_daemon == 'postconn' logging.info("Initializing SSH connection between master and slave...") t0 = time.time() - ret = sup(self, gconf.ssh_command.split() + - ["-p", str(gconf.ssh_port)] + - gconf.ssh_ctl_args + [self.remote_addr], - slave=self.inner_rsc.url, deferred=deferred) + + extra_opts = [] + remote_gsyncd = gconf.get("remote-gsyncd") + if remote_gsyncd == "": + remote_gsyncd = "/nonexistent/gsyncd" + + if gconf.get("use-rsync-xattrs"): + extra_opts.append('--use-rsync-xattrs') + + args_to_slave = [gconf.get("ssh-command")] + \ + gconf.get("ssh-options").split() + \ + ["-p", str(gconf.get("ssh-port"))] + \ + rconf.ssh_ctl_args + [self.remote_addr] + \ + [remote_gsyncd, "slave"] + \ + extra_opts + \ + [rconf.args.master, rconf.args.slave] + \ + [ + '--master-node', rconf.args.local_node, + '--master-node-id', rconf.args.local_node_id, + '--master-brick', rconf.args.local_path, + '--local-node', rconf.args.resource_remote, + '--local-node-id', rconf.args.resource_remote_id] + \ + [ + # Add all config arguments here, slave gsyncd will not use + # config file in slave side, so all overridding options should + # be sent as arguments + '--slave-timeout', str(gconf.get("slave-timeout")), + '--slave-log-level', gconf.get("slave-log-level"), + '--slave-gluster-log-level', + gconf.get("slave-gluster-log-level")] + + if gconf.get("slave-access-mount"): + args_to_slave.append('--slave-access-mount') + + if rconf.args.debug: + args_to_slave.append('--debug') + + po = Popen(args_to_slave, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + rconf.transport = po + self.start_fd_client(po.stdout, po.stdin) logging.info(lf("SSH connection between master and slave established.", duration="%.4f" % (time.time() - t0))) - if deferred: - # send a message to peer so that we can wait for - # the answer from which we know connection is - # established and we can proceed with daemonization - # (doing that too early robs the ssh passwd prompt...) - # However, we'd better not start the RepceClient - # before daemonization (that's not preserved properly - # in daemon), we just do a an ad-hoc linear put/get. - i, o = ret - inf = os.fdopen(i) - repce.send(o, None, '__repce_version__') - select((inf,), (), ()) - repce.recv(inf) - # hack hack hack: store a global reference to the file - # to save it from getting GC'd which implies closing it - gconf.permanent_handles.append(inf) - self.fd_pair = (i, o) - return 'should' - - def rsync(self, files, log_err=False): - return sup(self, files, '-e', - " ".join(gconf.ssh_command.split() + - ["-p", str(gconf.ssh_port)] + - gconf.ssh_ctl_args), - *(gconf.rsync_ssh_options.split() + [self.slaveurl]), - log_err=log_err) - - def tarssh(self, files, log_err=False): - return sup(self, files, self.slaveurl, log_err=log_err) + def rsync(self, files, *args, **kw): + """invoke rsync""" + if not files: + raise GsyncdError("no files to sync") + logging.debug("files: " + ", ".join(files)) + + extra_rsync_flags = [] + # Performance flag, --ignore-missing-args, if rsync version is + # greater than 3.1.0 then include this flag. + if gconf.get("rsync-opt-ignore-missing-args") and \ + get_rsync_version(gconf.get("rsync-command")) >= "3.1.0": + extra_rsync_flags = ["--ignore-missing-args"] + + rsync_ssh_opts = [gconf.get("ssh-command")] + \ + gconf.get("ssh-options").split() + \ + ["-p", str(gconf.get("ssh-port"))] + \ + rconf.ssh_ctl_args + \ + gconf.get("rsync-ssh-options").split() + + argv = [ + gconf.get("rsync-command"), + '-aR0', + '--inplace', + '--files-from=-', + '--super', + '--stats', + '--numeric-ids', + '--no-implied-dirs' + ] + + if gconf.get("rsync-opt-existing"): + argv += ["--existing"] + + if gconf.get("sync-xattrs"): + argv += ['--xattrs'] + + if gconf.get("sync-acls"): + argv += ['--acls'] + + argv = argv + \ + gconf.get("rsync-options").split() + \ + extra_rsync_flags + ['.'] + \ + ["-e", " ".join(rsync_ssh_opts)] + \ + [self.slaveurl] + + log_rsync_performance = gconf.getr("log-rsync-performance", False) + + if log_rsync_performance: + # use stdout=PIPE only when log_rsync_performance enabled + # Else rsync will write to stdout and nobody is their + # to consume. If PIPE is full rsync hangs. + po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + else: + po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE) + + for f in files: + po.stdin.write(f) + po.stdin.write('\0') + + stdout, stderr = po.communicate() + + if kw.get("log_err", False): + for errline in stderr.strip().split("\n")[:-1]: + logging.error(lf("SYNC Error", + sync_engine="Rsync", + error=errline)) + + if log_rsync_performance: + rsync_msg = [] + for line in stdout.split("\n"): + if line.startswith("Number of files:") or \ + line.startswith("Number of regular files transferred:") or \ + line.startswith("Total file size:") or \ + line.startswith("Total transferred file size:") or \ + line.startswith("Literal data:") or \ + line.startswith("Matched data:") or \ + line.startswith("Total bytes sent:") or \ + line.startswith("Total bytes received:") or \ + line.startswith("sent "): + rsync_msg.append(line) + logging.info(lf("rsync performance", + data=", ".join(rsync_msg))) + + return po + + def tarssh(self, files, slaveurl, log_err=False): + """invoke tar+ssh + -z (compress) can be use if needed, but omitting it now + as it results in weird error (tar+ssh errors out (errcode: 2) + """ + if not files: + raise GsyncdError("no files to sync") + logging.debug("files: " + ", ".join(files)) + (host, rdir) = slaveurl.split(':') + tar_cmd = ["tar"] + \ + ["--sparse", "-cf", "-", "--files-from", "-"] + ssh_cmd = gconf.get("ssh-command-tar").split() + \ + gconf.get("ssh-options-tar").split() + \ + ["-p", str(gconf.get("ssh-port"))] + \ + [host, "tar"] + \ + ["--overwrite", "-xf", "-", "-C", rdir] + p0 = Popen(tar_cmd, stdout=subprocess.PIPE, + stdin=subprocess.PIPE, stderr=subprocess.PIPE) + p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE) + for f in files: + p0.stdin.write(f) + p0.stdin.write('\n') + + p0.stdin.close() + p0.stdout.close() # Allow p0 to receive a SIGPIPE if p1 exits. + # wait for tar to terminate, collecting any errors, further + # waiting for transfer to complete + _, stderr1 = p1.communicate() + + # stdin and stdout of p0 is already closed, Reset to None and + # wait for child process to complete + p0.stdin = None + p0.stdout = None + p0.communicate() + + if log_err: + for errline in stderr1.strip().split("\n")[:-1]: + logging.error(lf("SYNC Error", + sync_engine="Tarssh", + error=errline)) + + return p1 diff --git a/geo-replication/syncdaemon/subcmds.py b/geo-replication/syncdaemon/subcmds.py new file mode 100644 index 00000000000..8ce5f219b6a --- /dev/null +++ b/geo-replication/syncdaemon/subcmds.py @@ -0,0 +1,306 @@ +import logging + +from syncdutils import lf +import gsyncdconfig as gconf + + +ERROR_CONFIG_INVALID = 2 +ERROR_CONFIG_INVALID_VALUE = 3 +ERROR_CONFIG_NOT_CONFIGURABLE = 4 + + +def subcmd_monitor_status(args): + from gsyncdstatus import set_monitor_status + from rconf import rconf + + set_monitor_status(gconf.get("state-file"), args.status) + rconf.log_exit = False + logging.info(lf("Monitor Status Change", status=args.status)) + + +def subcmd_status(args): + from gsyncdstatus import GeorepStatus + + master_name = args.master.replace(":", "") + slave_data = args.slave.replace("ssh://", "") + + brick_status = GeorepStatus(gconf.get("state-file"), + "", + args.local_path, + "", + master_name, + slave_data, + gconf.get("pid-file")) + checkpoint_time = gconf.get("checkpoint", 0) + brick_status.print_status(checkpoint_time=checkpoint_time) + + +def subcmd_monitor(args): + import monitor + from resource import GLUSTER, SSH, Popen + go_daemon = False if args.debug else True + + monitor.startup(go_daemon) + Popen.init_errhandler() + local = GLUSTER("localhost", args.master) + slavehost, slavevol = args.slave.split("::") + remote = SSH(slavehost, slavevol) + return monitor.monitor(local, remote) + + +def subcmd_verify_spawning(args): + logging.info("Able to spawn gsyncd.py") + + +def subcmd_worker(args): + import os + import fcntl + + from resource import GLUSTER, SSH, Popen + + Popen.init_errhandler() + fcntl.fcntl(args.feedback_fd, fcntl.F_SETFD, fcntl.FD_CLOEXEC) + local = GLUSTER("localhost", args.master) + slavehost, slavevol = args.slave.split("::") + remote = SSH(slavehost, slavevol) + remote.connect_remote() + local.connect() + logging.info("Closing feedback fd, waking up the monitor") + os.close(args.feedback_fd) + local.service_loop(remote) + + +def subcmd_slave(args): + from resource import GLUSTER, Popen + + Popen.init_errhandler() + slavevol = args.slave.split("::")[-1] + local = GLUSTER("localhost", slavevol) + + local.connect() + local.service_loop() + + +def subcmd_agent(args): + import os + from changelogagent import agent, Changelog + from syncdutils import lf + + os.setsid() + logging.debug(lf("RPC FD", + rpc_fd=repr(args.rpc_fd))) + return agent(Changelog(), args.rpc_fd) + + +def subcmd_voluuidget(args): + from subprocess import Popen, PIPE + import xml.etree.ElementTree as XET + + ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError + + po = Popen(['gluster', '--xml', '--remote-host=' + args.host, + 'volume', 'info', args.volname], bufsize=0, + stdin=None, stdout=PIPE, stderr=PIPE) + + vix, err = po.communicate() + if po.returncode != 0: + logging.info(lf("Volume info failed, unable to get " + "volume uuid of slavevol, " + "returning empty string", + slavevol=args.volname, + slavehost=args.host, + error=po.returncode)) + return "" + vi = XET.fromstring(vix) + if vi.find('opRet').text != '0': + logging.info(lf("Unable to get volume uuid of slavevol, " + "returning empty string", + slavevol=args.volname, + slavehost=args.host, + error=vi.find('opErrstr').text)) + return "" + + try: + voluuid = vi.find("volInfo/volumes/volume/id").text + except (ParseError, AttributeError, ValueError) as e: + logging.info(lf("Parsing failed to volume uuid of slavevol, " + "returning empty string", + slavevol=args.volname, + slavehost=args.host, + error=e)) + voluuid = "" + + print(voluuid) + + +def _unlink(path): + import os + from errno import ENOENT + from syncdutils import GsyncdError + import sys + + try: + os.unlink(path) + except (OSError, IOError): + if sys.exc_info()[1].errno == ENOENT: + pass + else: + raise GsyncdError('Unlink error: %s' % path) + + +def subcmd_delete(args): + import logging + import shutil + import glob + import sys + from errno import ENOENT, ENODATA + import struct + + from syncdutils import GsyncdError, Xattr, errno_wrap + import gsyncdconfig as gconf + + logging.info('geo-replication delete') + # remove the stime xattr from all the brick paths so that + # a re-create of a session will start sync all over again + stime_xattr_prefix = gconf.get('stime-xattr-prefix', None) + + # Delete pid file, status file, socket file + cleanup_paths = [] + cleanup_paths.append(gconf.get("pid-file")) + + # Cleanup Session dir + try: + shutil.rmtree(gconf.get("georep-session-working-dir")) + except (IOError, OSError): + if sys.exc_info()[1].errno == ENOENT: + pass + else: + raise GsyncdError( + 'Error while removing working dir: %s' % + gconf.get("georep-session-working-dir")) + + # Cleanup changelog working dirs + try: + shutil.rmtree(gconf.get("working-dir")) + except (IOError, OSError): + if sys.exc_info()[1].errno == ENOENT: + pass + else: + raise GsyncdError( + 'Error while removing working dir: %s' % + gconf.get("working-dir")) + + for path in cleanup_paths: + # To delete temp files + for f in glob.glob(path + "*"): + _unlink(f) + + if args.reset_sync_time and stime_xattr_prefix: + for p in args.paths: + if p != "": + # set stime to (0,0) to trigger full volume content resync + # to slave on session recreation + # look at master.py::Xcrawl hint: zero_zero + errno_wrap(Xattr.lsetxattr, + (p, stime_xattr_prefix + ".stime", + struct.pack("!II", 0, 0)), + [ENOENT, ENODATA]) + errno_wrap(Xattr.lremovexattr, + (p, stime_xattr_prefix + ".entry_stime"), + [ENOENT, ENODATA]) + + return + + +def print_config(name, value, only_value=False, use_underscore=False): + val = value + if isinstance(value, bool): + val = str(value).lower() + + if only_value: + print(val) + else: + if use_underscore: + name = name.replace("-", "_") + + print("%s:%s" % (name, val)) + + +def config_name_format(val): + return val.replace("_", "-") + + +def subcmd_config_get(args): + import sys + + all_config = gconf.getall(show_defaults=args.show_defaults, + show_non_configurable=True) + if args.name is not None: + val = all_config.get(config_name_format(args.name), None) + if val is None: + sys.stderr.write("Invalid config name \"%s\"\n" % args.name) + sys.exit(ERROR_CONFIG_INVALID) + + print_config(args.name, val, only_value=args.only_value, + use_underscore=args.use_underscore) + return + + for k in sorted(all_config): + print_config(k, all_config[k], use_underscore=args.use_underscore) + + +def subcmd_config_check(args): + import sys + + try: + gconf.check(config_name_format(args.name), value=args.value, + with_conffile=False) + except gconf.GconfNotConfigurable: + cnf_val = gconf.get(config_name_format(args.name), None) + if cnf_val is None: + sys.stderr.write("Invalid config name \"%s\"\n" % args.name) + sys.exit(ERROR_CONFIG_INVALID) + + # Not configurable + sys.stderr.write("Not configurable \"%s\"\n" % args.name) + sys.exit(ERROR_CONFIG_NOT_CONFIGURABLE) + except gconf.GconfInvalidValue: + sys.stderr.write("Invalid config value \"%s=%s\"\n" % (args.name, + args.value)) + sys.exit(ERROR_CONFIG_INVALID_VALUE) + + +def subcmd_config_set(args): + import sys + + try: + gconf.setconfig(config_name_format(args.name), args.value) + except gconf.GconfNotConfigurable: + cnf_val = gconf.get(config_name_format(args.name), None) + if cnf_val is None: + sys.stderr.write("Invalid config name \"%s\"\n" % args.name) + sys.exit(ERROR_CONFIG_INVALID) + + # Not configurable + sys.stderr.write("Not configurable \"%s\"\n" % args.name) + sys.exit(ERROR_CONFIG_NOT_CONFIGURABLE) + except gconf.GconfInvalidValue: + sys.stderr.write("Invalid config value \"%s=%s\"\n" % (args.name, + args.value)) + sys.exit(ERROR_CONFIG_INVALID_VALUE) + + +def subcmd_config_reset(args): + import sys + + try: + gconf.resetconfig(config_name_format(args.name)) + except gconf.GconfNotConfigurable: + cnf_val = gconf.get(config_name_format(args.name), None) + if cnf_val is None: + sys.stderr.write("Invalid config name \"%s\"\n" % args.name) + sys.exit(ERROR_CONFIG_INVALID) + + # Not configurable + sys.stderr.write("Not configurable \"%s\"\n" % args.name) + sys.exit(ERROR_CONFIG_NOT_CONFIGURABLE) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index e611b7b6ae5..bc03522fdda 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -15,19 +15,19 @@ import time import fcntl import shutil import logging -import socket import errno import threading import subprocess from subprocess import PIPE from threading import Lock, Thread as baseThread from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED -from errno import EINTR, ENOENT, EPERM, ESTALE, EBUSY, errorcode +from errno import EINTR, ENOENT, ESTALE, EBUSY, errorcode from signal import signal, SIGTERM import select as oselect from os import waitpid as owaitpid import xml.etree.ElementTree as XET from select import error as SelectError +from cPickle import PickleError from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE sys.path.insert(1, GLUSTERFS_LIBEXECDIR) @@ -46,25 +46,10 @@ except ImportError: EVENT_GEOREP_PASSIVE = None EVENT_GEOREP_CHECKPOINT_COMPLETED = None -try: - from cPickle import PickleError -except ImportError: - # py 3 - from pickle import PickleError - -from gconf import gconf +import gsyncdconfig as gconf +from rconf import rconf -try: - # py 3 - from urllib import parse as urllib -except ImportError: - import urllib - -try: - from hashlib import md5 as md5 -except ImportError: - # py 2.4 - from md5 import new as md5 +from hashlib import md5 as md5 # auxiliary gfid based access prefix _CL_AUX_GFID_PFX = ".gfid/" @@ -80,6 +65,10 @@ SPACE_ESCAPE_CHAR = "%20" NEWLINE_ESCAPE_CHAR = "%0A" PERCENTAGE_ESCAPE_CHAR = "%25" +final_lock = Lock() + +mntpt_list = [] + def sup(x, *a, **kw): """a rubyesque "super" for python ;) @@ -93,12 +82,7 @@ def sup(x, *a, **kw): def escape(s): """the chosen flavor of string escaping, used all over to turn whatever data to creatable representation""" - return urllib.quote_plus(s) - - -def unescape(s): - """inverse of .escape""" - return urllib.unquote_plus(s) + return s.replace("/", "-").strip("-") def escape_space_newline(s): @@ -170,17 +154,17 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url): """ Setup GConf ssh control path parameters """ - gconf.ssh_ctl_dir = ctld + rconf.ssh_ctl_dir = ctld content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr, resource_url) content_md5 = md5hex(content) - fname = os.path.join(gconf.ssh_ctl_dir, + fname = os.path.join(rconf.ssh_ctl_dir, "%s.mft" % content_md5) create_manifest(fname, content) - ssh_ctl_path = os.path.join(gconf.ssh_ctl_dir, + ssh_ctl_path = os.path.join(rconf.ssh_ctl_dir, "%s.sock" % content_md5) - gconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path] + rconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path] def grabfile(fname, content=None): @@ -207,32 +191,30 @@ def grabfile(fname, content=None): except: f.close() raise - gconf.permanent_handles.append(f) + rconf.permanent_handles.append(f) return f def grabpidfile(fname=None, setpid=True): """.grabfile customization for pid files""" if not fname: - fname = gconf.pid_file + fname = gconf.get("pid-file") content = None if setpid: content = str(os.getpid()) + '\n' return grabfile(fname, content=content) -final_lock = Lock() -mntpt_list = [] -def finalize(*a, **kw): +def finalize(*args, **kwargs): """all those messy final steps we go trough upon termination Do away with pidfile, ssh control dir and logging. """ final_lock.acquire() - if getattr(gconf, 'pid_file', None): - rm_pidf = gconf.pid_file_owned - if gconf.cpid: + if gconf.get('pid_file'): + rm_pidf = rconf.pid_file_owned + if rconf.cpid: # exit path from parent branch of daemonization rm_pidf = False while True: @@ -240,37 +222,31 @@ def finalize(*a, **kw): if not f: # child has already taken over pidfile break - if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid: + if os.waitpid(rconf.cpid, os.WNOHANG)[0] == rconf.cpid: # child has terminated rm_pidf = True break time.sleep(0.1) if rm_pidf: try: - os.unlink(gconf.pid_file) + os.unlink(rconf.pid_file) except: ex = sys.exc_info()[1] if ex.errno == ENOENT: pass else: raise - if gconf.ssh_ctl_dir and not gconf.cpid: + if rconf.ssh_ctl_dir and not rconf.cpid: def handle_rm_error(func, path, exc_info): if exc_info[1].errno == ENOENT: return raise exc_info[1] - shutil.rmtree(gconf.ssh_ctl_dir, onerror=handle_rm_error) - if getattr(gconf, 'state_socket', None): - try: - os.unlink(gconf.state_socket) - except: - if sys.exc_info()[0] == OSError: - pass + shutil.rmtree(rconf.ssh_ctl_dir, onerror=handle_rm_error) """ Unmount if not done """ for mnt in mntpt_list: - p0 = subprocess.Popen (["umount", "-l", mnt], stderr=subprocess.PIPE) + p0 = subprocess.Popen(["umount", "-l", mnt], stderr=subprocess.PIPE) _, errdata = p0.communicate() if p0.returncode == 0: try: @@ -280,12 +256,11 @@ def finalize(*a, **kw): else: pass - if gconf.log_exit: + if rconf.log_exit: logging.info("exiting.") sys.stdout.flush() sys.stderr.flush() - os._exit(kw.get('exval', 0)) - + os._exit(kwargs.get('exval', 0)) def log_raise_exception(excont): @@ -315,9 +290,9 @@ def log_raise_exception(excont): ((isinstance(exc, OSError) or isinstance(exc, IOError)) and exc.errno == EPIPE): logging.error('connection to peer is broken') - if hasattr(gconf, 'transport'): - gconf.transport.wait() - if gconf.transport.returncode == 127: + if hasattr(rconf, 'transport'): + rconf.transport.wait() + if rconf.transport.returncode == 127: logging.error("getting \"No such file or directory\"" "errors is most likely due to " "MISCONFIGURATION, please remove all " @@ -331,7 +306,7 @@ def log_raise_exception(excont): "<SLAVEVOL> config remote-gsyncd " "<GSYNCD_PATH> (Example GSYNCD_PATH: " "`/usr/libexec/glusterfs/gsyncd`)") - gconf.transport.terminate_geterr() + rconf.transport.terminate_geterr() elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED): logging.error(lf('glusterfs session went down', @@ -365,20 +340,20 @@ class Thread(baseThread): function coughs up an exception """ - def __init__(self, *a, **kw): - tf = kw.get('target') + def __init__(self, *args, **kwargs): + tf = kwargs.get('target') if tf: - def twrap(*aa): + def twrap(*aargs): excont = FreeObject(exval=0) try: - tf(*aa) + tf(*aargs) except: try: log_raise_exception(excont) finally: finalize(exval=excont.exval) - kw['target'] = twrap - baseThread.__init__(self, *a, **kw) + kwargs['target'] = twrap + baseThread.__init__(self, *args, **kwargs) self.setDaemon(True) @@ -443,7 +418,7 @@ def boolify(s): lstr = s.lower() if lstr in true_list: rv = True - elif not lstr in false_list: + elif lstr not in false_list: logging.warn(lf("Unknown string in \"string to boolean\" conversion, " "defaulting to False", str=s)) @@ -451,29 +426,33 @@ def boolify(s): return rv -def eintr_wrap(func, exc, *a): +def eintr_wrap(func, exc, *args): """ wrapper around syscalls resilient to interrupt caused by signals """ while True: try: - return func(*a) + return func(*args) except exc: ex = sys.exc_info()[1] if not ex.args[0] == EINTR: raise -def select(*a): - return eintr_wrap(oselect.select, oselect.error, *a) +def select(*args): + return eintr_wrap(oselect.select, oselect.error, *args) + + +def waitpid(*args): + return eintr_wrap(owaitpid, OSError, *args) -def waitpid(*a): - return eintr_wrap(owaitpid, OSError, *a) +def term_handler_default_hook(signum, frame): + finalize(signum, frame, exval=1) -def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})): +def set_term_handler(hook=term_handler_default_hook): signal(SIGTERM, hook) @@ -550,7 +529,7 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]): ex = sys.exc_info()[1] if ex.errno in errnos: return ex.errno - if not ex.errno in retry_errnos: + if ex.errno not in retry_errnos: raise nr_tries += 1 if nr_tries == GF_OP_RETRIES: |