Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r--  geo-replication/syncdaemon/Makefile.am        |    4
-rw-r--r--  geo-replication/syncdaemon/README.md          |    1
-rw-r--r--  geo-replication/syncdaemon/argsupgrade.py     |   21
-rw-r--r--  geo-replication/syncdaemon/changelogagent.py  |   78
-rw-r--r--  geo-replication/syncdaemon/conf.py.in         |    1
-rw-r--r--  geo-replication/syncdaemon/gsyncd.py          |   43
-rw-r--r--  geo-replication/syncdaemon/gsyncdconfig.py    |  102
-rw-r--r--  geo-replication/syncdaemon/gsyncdstatus.py    |    7
-rw-r--r--  geo-replication/syncdaemon/libcxattr.py       |   21
-rw-r--r--  geo-replication/syncdaemon/libgfchangelog.py  |  253
-rw-r--r--  geo-replication/syncdaemon/master.py          |  278
-rw-r--r--  geo-replication/syncdaemon/monitor.py         |  121
-rw-r--r--  geo-replication/syncdaemon/py2py3.py          |  184
-rw-r--r--  geo-replication/syncdaemon/rconf.py           |    6
-rw-r--r--  geo-replication/syncdaemon/repce.py           |   17
-rw-r--r--  geo-replication/syncdaemon/resource.py        |  362
-rw-r--r--  geo-replication/syncdaemon/subcmds.py         |   29
-rw-r--r--  geo-replication/syncdaemon/syncdutils.py      |  190
18 files changed, 1072 insertions, 646 deletions
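
The bulk of this commit is a mechanical Python 2 to Python 3 port of the syncdaemon: shebangs move to /usr/bin/python3, the standalone changelog agent process (changelogagent.py) is dropped in favour of calling libgfchangelog directly from the worker, and every interpreter-specific detail is collected into the new py2py3.py module. That module gates its helper definitions on the interpreter version once, at import time, so the rest of the daemon calls a single name regardless of runtime. A condensed sketch of the pattern, limited to two helpers taken from the py2py3.py hunks below: pipe(), which restores py2's inheritable-by-default pipe descriptors that the monitor hands to forked workers, and bytearray_to_str(), which converts ctypes result buffers without a length-altering encode/decode round trip:

    import os
    import sys

    if sys.version_info >= (3,):
        def pipe():
            # PEP 446 made fds non-inheritable by default on py3; the
            # monitor passes these fds to worker processes across execv()
            r, w = os.pipe()
            os.set_inheritable(r, True)
            os.set_inheritable(w, True)
            return (r, w)

        def bytearray_to_str(byte_arr):
            # ctypes string buffers hold bytes on py3; rebuild a str
            # byte-for-byte, since encode/decode would alter the size
            return ''.join([chr(b) for b in byte_arr])
    else:
        def pipe():
            # py2 descriptors are already inheritable
            return os.pipe()

        def bytearray_to_str(byte_arr):
            # py2 str is already a byte string
            return byte_arr

Deciding the branch once at import keeps the version check out of the hot paths and lets the py2 half be deleted wholesale when py2 support ends.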
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index 19f0bfce1b7..d70e3368faf 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -2,7 +2,7 @@ syncdaemondir = $(GLUSTERFS_LIBEXECDIR)/python/syncdaemon syncdaemon_PYTHON = rconf.py gsyncd.py __init__.py master.py README.md repce.py \ resource.py syncdutils.py monitor.py libcxattr.py gsyncdconfig.py \ - libgfchangelog.py changelogagent.py gsyncdstatus.py conf.py logutils.py \ - subcmds.py argsupgrade.py + libgfchangelog.py gsyncdstatus.py conf.py logutils.py \ + subcmds.py argsupgrade.py py2py3.py CLEANFILES = diff --git a/geo-replication/syncdaemon/README.md b/geo-replication/syncdaemon/README.md index 2a202e3f99e..5ab785ae669 100644 --- a/geo-replication/syncdaemon/README.md +++ b/geo-replication/syncdaemon/README.md @@ -19,7 +19,6 @@ INSTALLATION As of now, the supported way of operation is running from the source directory or using the RPMs given. -If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/). CONFIGURATION ------------- diff --git a/geo-replication/syncdaemon/argsupgrade.py b/geo-replication/syncdaemon/argsupgrade.py index 632271daf81..7af40633ef8 100644 --- a/geo-replication/syncdaemon/argsupgrade.py +++ b/geo-replication/syncdaemon/argsupgrade.py @@ -84,6 +84,10 @@ def upgrade(): # fail when it does stat to check the existence. init_gsyncd_template_conf() + inet6 = False + if "--inet6" in sys.argv: + inet6 = True + if "--monitor" in sys.argv: # python gsyncd.py --path=/bricks/b1 # --monitor -c gsyncd.conf @@ -147,8 +151,11 @@ def upgrade(): user, hname = remote_addr.split("@") + if not inet6: + hname = gethostbyname(hname) + print(("ssh://%s@%s:gluster://127.0.0.1:%s" % ( - user, gethostbyname(hname), vol))) + user, hname, vol))) sys.exit(0) elif "--normalize-url" in sys.argv: @@ -268,7 +275,9 @@ def upgrade(): p = ArgumentParser() p.add_argument("master") p.add_argument("slave") - p.add_argument("--config-set", nargs=2) + p.add_argument("--config-set", action='store_true') + p.add_argument("name") + p.add_argument("--value") p.add_argument("-c") pargs = p.parse_known_args(sys.argv[1:])[0] @@ -280,8 +289,8 @@ def upgrade(): "config-set", pargs.master.strip(":"), slave_url(pargs.slave), - pargs.config_set[0], - pargs.config_set[1] + "--name=%s" % pargs.name, + "--value=%s" % pargs.value ] elif "--config-check" in sys.argv: # --config-check georep_session_working_dir @@ -344,3 +353,7 @@ def upgrade(): if pargs.reset_sync_time: sys.argv.append("--reset-sync-time") + + if inet6: + # Add `--inet6` as first argument + sys.argv = [sys.argv[0], "--inet6"] + sys.argv[1:] diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py deleted file mode 100644 index 5eade137d25..00000000000 --- a/geo-replication/syncdaemon/changelogagent.py +++ /dev/null @@ -1,78 +0,0 @@ -#!/usr/bin/python2 -# -# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> -# This file is part of GlusterFS. - -# This file is licensed to you under your choice of the GNU Lesser -# General Public License, version 3 or any later version (LGPLv3 or -# later), or the GNU General Public License, version 2 (GPLv2), in all -# cases as published by the Free Software Foundation. 
-# - -import logging -import syncdutils -from syncdutils import select, CHANGELOG_AGENT_SERVER_VERSION -from repce import RepceServer - - -class _MetaChangelog(object): - - def __getattr__(self, meth): - from libgfchangelog import Changes as LChanges - xmeth = [m for m in dir(LChanges) if m[0] != '_'] - if meth not in xmeth: - return - for m in xmeth: - setattr(self, m, getattr(LChanges, m)) - return getattr(self, meth) - -Changes = _MetaChangelog() - - -class Changelog(object): - def version(self): - return CHANGELOG_AGENT_SERVER_VERSION - - def init(self): - return Changes.cl_init() - - def register(self, cl_brick, cl_dir, cl_log, cl_level, retries=0): - return Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) - - def scan(self): - return Changes.cl_scan() - - def getchanges(self): - return Changes.cl_getchanges() - - def done(self, clfile): - return Changes.cl_done(clfile) - - def history(self, changelog_path, start, end, num_parallel): - return Changes.cl_history_changelog(changelog_path, start, end, - num_parallel) - - def history_scan(self): - return Changes.cl_history_scan() - - def history_getchanges(self): - return Changes.cl_history_getchanges() - - def history_done(self, clfile): - return Changes.cl_history_done(clfile) - - -class ChangelogAgent(object): - def __init__(self, obj, fd_tup): - (inf, ouf, rw, ww) = fd_tup.split(',') - repce = RepceServer(obj, int(inf), int(ouf), 1) - t = syncdutils.Thread(target=lambda: (repce.service_loop(), - syncdutils.finalize())) - t.start() - logging.info('Agent listining...') - - select((), (), ()) - - -def agent(obj, fd_tup): - return ChangelogAgent(obj, fd_tup) diff --git a/geo-replication/syncdaemon/conf.py.in b/geo-replication/syncdaemon/conf.py.in index 5846b9b5f26..2042fa9cdfb 100644 --- a/geo-replication/syncdaemon/conf.py.in +++ b/geo-replication/syncdaemon/conf.py.in @@ -1,4 +1,3 @@ -#!/usr/bin/python2 # # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> # This file is part of GlusterFS. diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 77fca4c9d1f..257ed72c6ae 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -1,4 +1,4 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. 
<http://www.redhat.com> @@ -22,8 +22,8 @@ import gsyncdconfig as gconf from rconf import rconf import subcmds from conf import GLUSTERD_WORKDIR, GLUSTERFS_CONFDIR, GCONF_VERSION -from syncdutils import set_term_handler, finalize, lf -from syncdutils import log_raise_exception, FreeObject, escape +from syncdutils import (set_term_handler, finalize, lf, + log_raise_exception, FreeObject, escape) import argsupgrade @@ -47,6 +47,7 @@ def main(): sys.exit(0) parser = ArgumentParser() + parser.add_argument("--inet6", action="store_true") sp = parser.add_subparsers(dest="subcmd") # Monitor Status File update @@ -78,8 +79,6 @@ def main(): help="feedback fd between monitor and worker") p.add_argument("--local-node", help="Local master node") p.add_argument("--local-node-id", help="Local Node ID") - p.add_argument("--rpc-fd", - help="Read and Write fds for worker-agent communication") p.add_argument("--subvol-num", type=int, help="Subvolume number") p.add_argument("--is-hottier", action="store_true", help="Is this brick part of hot tier") @@ -91,19 +90,6 @@ def main(): p.add_argument("-c", "--config-file", help="Config File") p.add_argument("--debug", action="store_true") - # Agent - p = sp.add_parser("agent") - p.add_argument("master", help="Master Volume Name") - p.add_argument("slave", help="Slave details user@host::vol format") - p.add_argument("--local-path", help="Local brick path") - p.add_argument("--local-node", help="Local master node") - p.add_argument("--local-node-id", help="Local Node ID") - p.add_argument("--slave-id", help="Slave Volume ID") - p.add_argument("--rpc-fd", - help="Read and Write fds for worker-agent communication") - p.add_argument("-c", "--config-file", help="Config File") - p.add_argument("--debug", action="store_true") - # Slave p = sp.add_parser("slave") p.add_argument("master", help="Master Volume Name") @@ -133,6 +119,8 @@ def main(): help="Directory where Gluster binaries exist on slave") p.add_argument("--slave-access-mount", action="store_true", help="Do not lazy umount the slave volume") + p.add_argument("--master-dist-count", type=int, + help="Master Distribution count") # Status p = sp.add_parser("status") @@ -165,8 +153,8 @@ def main(): p = sp.add_parser("config-set") p.add_argument("master", help="Master Volume Name") p.add_argument("slave", help="Slave") - p.add_argument("name", help="Config Name") - p.add_argument("value", help="Config Value") + p.add_argument("-n", "--name", help="Config Name") + p.add_argument("-v", "--value", help="Config Value") p.add_argument("-c", "--config-file", help="Config File") p.add_argument("--debug", action="store_true") @@ -228,7 +216,8 @@ def main(): # Set default path for config file in that case # If an subcmd accepts config file then it also accepts # master and Slave arguments. 
- if config_file is None and hasattr(args, "config_file"): + if config_file is None and hasattr(args, "config_file") \ + and args.subcmd != "slave": config_file = "%s/geo-replication/%s_%s_%s/gsyncd.conf" % ( GLUSTERD_WORKDIR, args.master, @@ -252,6 +241,12 @@ def main(): if args.subcmd == "slave": override_from_args = True + if config_file is not None and \ + args.subcmd in ["monitor", "config-get", "config-set", "config-reset"]: + ret = gconf.is_config_file_old(config_file, args.master, extra_tmpl_args["slavevol"]) + if ret is not None: + gconf.config_upgrade(config_file, ret) + # Load Config file gconf.load(GLUSTERFS_CONFDIR + "/gsyncd.conf", config_file, @@ -261,8 +256,8 @@ def main(): # Default label to print in log file label = args.subcmd - if args.subcmd in ("worker", "agent"): - # If Worker or agent, then add brick path also to label + if args.subcmd in ("worker"): + # If Worker, then add brick path also to label label = "%s %s" % (args.subcmd, args.local_path) elif args.subcmd == "slave": # If Slave add Master node and Brick details @@ -305,7 +300,7 @@ def main(): # Log message for loaded config file if config_file is not None: - logging.info(lf("Using session config file", path=config_file)) + logging.debug(lf("Using session config file", path=config_file)) set_term_handler() excont = FreeObject(exval=0) diff --git a/geo-replication/syncdaemon/gsyncdconfig.py b/geo-replication/syncdaemon/gsyncdconfig.py index b2517f031ca..8848071997a 100644 --- a/geo-replication/syncdaemon/gsyncdconfig.py +++ b/geo-replication/syncdaemon/gsyncdconfig.py @@ -10,12 +10,14 @@ # try: - from configparser import configparser as ConfigParser, NoSectionError + from ConfigParser import RawConfigParser, NoSectionError except ImportError: - from ConfigParser import ConfigParser, NoSectionError + from configparser import RawConfigParser, NoSectionError import os +import shutil from string import Template from datetime import datetime +from threading import Lock # Global object which can be used in other modules @@ -34,6 +36,7 @@ class GconfInvalidValue(Exception): class Gconf(object): def __init__(self, default_conf_file, custom_conf_file=None, args={}, extra_tmpl_args={}, override_from_args=False): + self.lock = Lock() self.default_conf_file = default_conf_file self.custom_conf_file = custom_conf_file self.tmp_conf_file = None @@ -50,7 +53,7 @@ class Gconf(object): self.args = args self.extra_tmpl_args = extra_tmpl_args self.override_from_args = override_from_args - # Store default values only if overwriten, Only for JSON/CLI output + # Store default values only if overwritten, Only for JSON/CLI output self.default_values = {} self._load() @@ -91,7 +94,7 @@ class Gconf(object): if name != "all" and not self._is_configurable(name): raise GconfNotConfigurable() - cnf = ConfigParser() + cnf = RawConfigParser() with open(self.custom_conf_file) as f: cnf.readfp(f) @@ -135,7 +138,7 @@ class Gconf(object): if curr_val == value: return True - cnf = ConfigParser() + cnf = RawConfigParser() with open(self.custom_conf_file) as f: cnf.readfp(f) @@ -162,6 +165,11 @@ class Gconf(object): if value is not None and not self._is_valid_value(name, value): raise GconfInvalidValue() + + def _load_with_lock(self): + with self.lock: + self._load() + def _load(self): self.gconf = {} self.template_conf = [] @@ -170,7 +178,7 @@ class Gconf(object): self.session_conf_items = [] self.default_values = {} - conf = ConfigParser() + conf = RawConfigParser() # Default Template config file with open(self.default_conf_file) as f: 
conf.readfp(f) @@ -229,12 +237,19 @@ class Gconf(object): self._tmpl_substitute() self._do_typecast() - def reload(self): + def reload(self, with_lock=True): if self._is_config_changed(): - self._load() + if with_lock: + self._load_with_lock() + else: + self._load() - def get(self, name, default_value=None): - return self.gconf.get(name, default_value) + def get(self, name, default_value=None, with_lock=True): + if with_lock: + with self.lock: + return self.gconf.get(name, default_value) + else: + return self.gconf.get(name, default_value) def getall(self, show_defaults=False, show_non_configurable=False): cnf = {} @@ -275,8 +290,9 @@ class Gconf(object): return cnf def getr(self, name, default_value=None): - self.reload() - return self.get(name, default_value) + with self.lock: + self.reload(with_lock=False) + return self.get(name, default_value, with_lock=False) def get_help(self, name=None): pass @@ -313,6 +329,9 @@ class Gconf(object): if item["validation"] == "unixtime": return validate_unixtime(value) + if item["validation"] == "int": + return validate_int(value) + return False def _is_config_changed(self): @@ -325,6 +344,53 @@ class Gconf(object): return False +def is_config_file_old(config_file, mastervol, slavevol): + cnf = RawConfigParser() + cnf.read(config_file) + session_section = "peers %s %s" % (mastervol, slavevol) + try: + return dict(cnf.items(session_section)) + except NoSectionError: + return None + +def config_upgrade(config_file, ret): + config_file_backup = os.path.join(os.path.dirname(config_file), "gsyncd.conf.bkp") + + #copy old config file in a backup file + shutil.copyfile(config_file, config_file_backup) + + #write a new config file + config = RawConfigParser() + config.add_section('vars') + + for key, value in ret.items(): + #handle option name changes + if key == "use_tarssh": + new_key = "sync-method" + if value == "true": + new_value = "tarssh" + else: + new_value = "rsync" + config.set('vars', new_key, new_value) + elif key == "timeout": + new_key = "slave-timeout" + config.set('vars', new_key, value) + #for changes like: ignore_deletes to ignore-deletes + else: + new_key = key.replace("_", "-") + config.set('vars', new_key, value) + + with open(config_file, 'w') as configfile: + config.write(configfile) + + +def validate_int(value): + try: + _ = int(value) + return True + except ValueError: + return False + def validate_unixtime(value): try: @@ -338,11 +404,13 @@ def validate_unixtime(value): def validate_minmax(value, minval, maxval): - value = int(value) - minval = int(minval) - maxval = int(maxval) - - return value >= minval and value <= maxval + try: + value = int(value) + minval = int(minval) + maxval = int(maxval) + return value >= minval and value <= maxval + except ValueError: + return False def validate_choice(value, allowed_values): diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py index e8a810f4b38..1a655ff8887 100644 --- a/geo-replication/syncdaemon/gsyncdstatus.py +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -1,4 +1,4 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # # Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> # This file is part of GlusterFS. 
@@ -23,8 +23,8 @@ from datetime import datetime from errno import EACCES, EAGAIN, ENOENT import logging -from syncdutils import EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event -from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED, lf +from syncdutils import (EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event, + EVENT_GEOREP_CHECKPOINT_COMPLETED, lf) DEFAULT_STATUS = "N/A" MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped") @@ -103,6 +103,7 @@ class LockedOpen(object): return f def __exit__(self, _exc_type, _exc_value, _traceback): + fcntl.flock(self.fileobj, fcntl.LOCK_UN) self.fileobj.close() diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py index f576648b7a8..e6406c36bd7 100644 --- a/geo-replication/syncdaemon/libcxattr.py +++ b/geo-replication/syncdaemon/libcxattr.py @@ -9,12 +9,14 @@ # import os -from ctypes import CDLL, create_string_buffer, get_errno +from ctypes import CDLL, get_errno +from py2py3 import (bytearray_to_str, gr_create_string_buffer, + gr_query_xattr, gr_lsetxattr, gr_lremovexattr) class Xattr(object): - """singleton that wraps the extended attribues system + """singleton that wraps the extended attributes system interface for python using ctypes Just implement it to the degree we need it, in particular @@ -38,20 +40,23 @@ class Xattr(object): @classmethod def _query_xattr(cls, path, siz, syscall, *a): if siz: - buf = create_string_buffer('\0' * siz) + buf = gr_create_string_buffer(siz) else: buf = None ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz))) if ret == -1: cls.raise_oserr() if siz: - return buf.raw[:ret] + # py2 and py3 compatibility. Convert bytes array + # to string + result = bytearray_to_str(buf.raw) + return result[:ret] else: return ret @classmethod def lgetxattr(cls, path, attr, siz=0): - return cls._query_xattr(path, siz, 'lgetxattr', attr) + return gr_query_xattr(cls, path, siz, 'lgetxattr', attr) @classmethod def lgetxattr_buf(cls, path, attr): @@ -65,7 +70,7 @@ class Xattr(object): @classmethod def llistxattr(cls, path, siz=0): - ret = cls._query_xattr(path, siz, 'llistxattr') + ret = gr_query_xattr(cls, path, siz, 'llistxattr') if isinstance(ret, str): ret = ret.strip('\0') ret = ret.split('\0') if ret else [] @@ -73,13 +78,13 @@ class Xattr(object): @classmethod def lsetxattr(cls, path, attr, val): - ret = cls.libc.lsetxattr(path, attr, val, len(val), 0) + ret = gr_lsetxattr(cls, path, attr, val) if ret == -1: cls.raise_oserr() @classmethod def lremovexattr(cls, path, attr): - ret = cls.libc.lremovexattr(path, attr) + ret = gr_lremovexattr(cls, path, attr) if ret == -1: cls.raise_oserr() diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py index da12438d069..a3bda7282c0 100644 --- a/geo-replication/syncdaemon/libgfchangelog.py +++ b/geo-replication/syncdaemon/libgfchangelog.py @@ -9,130 +9,135 @@ # import os -from ctypes import CDLL, RTLD_GLOBAL, create_string_buffer, \ - get_errno, byref, c_ulong +from ctypes import CDLL, RTLD_GLOBAL, get_errno, byref, c_ulong +from ctypes.util import find_library from syncdutils import ChangelogException, ChangelogHistoryNotAvailable +from py2py3 import (gr_cl_history_changelog, gr_cl_done, + gr_create_string_buffer, gr_cl_register, + gr_cl_history_done, bytearray_to_str) -class Changes(object): - libgfc = CDLL("libgfchangelog.so", mode=RTLD_GLOBAL, - use_errno=True) - - @classmethod - def geterrno(cls): - return get_errno() - - @classmethod - def raise_changelog_err(cls): - errn = 
cls.geterrno() - raise ChangelogException(errn, os.strerror(errn)) - - @classmethod - def _get_api(cls, call): - return getattr(cls.libgfc, call) - - @classmethod - def cl_init(cls): - ret = cls._get_api('gf_changelog_init')(None) - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_register(cls, brick, path, log_file, log_level, retries=0): - ret = cls._get_api('gf_changelog_register')(brick, path, - log_file, - log_level, retries) - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_scan(cls): - ret = cls._get_api('gf_changelog_scan')() - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_startfresh(cls): - ret = cls._get_api('gf_changelog_start_fresh')() - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_getchanges(cls): - """ remove hardcoding for path name length """ - def clsort(f): - return f.split('.')[-1] - changes = [] - buf = create_string_buffer('\0', 4096) - call = cls._get_api('gf_changelog_next_change') - - while True: - ret = call(buf, 4096) - if ret in (0, -1): - break - changes.append(buf.raw[:ret - 1]) - if ret == -1: - cls.raise_changelog_err() - # cleanup tracker - cls.cl_startfresh() - return sorted(changes, key=clsort) - - @classmethod - def cl_done(cls, clfile): - ret = cls._get_api('gf_changelog_done')(clfile) - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_history_scan(cls): - ret = cls._get_api('gf_history_changelog_scan')() - if ret == -1: - cls.raise_changelog_err() - - return ret - - @classmethod - def cl_history_changelog(cls, changelog_path, start, end, num_parallel): - actual_end = c_ulong() - ret = cls._get_api('gf_history_changelog')(changelog_path, start, end, - num_parallel, - byref(actual_end)) - if ret == -1: - cls.raise_changelog_err() - - if ret == -2: - raise ChangelogHistoryNotAvailable() - - return (ret, actual_end.value) - - @classmethod - def cl_history_startfresh(cls): - ret = cls._get_api('gf_history_changelog_start_fresh')() - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_history_getchanges(cls): - """ remove hardcoding for path name length """ - def clsort(f): - return f.split('.')[-1] - - changes = [] - buf = create_string_buffer('\0', 4096) - call = cls._get_api('gf_history_changelog_next_change') - - while True: - ret = call(buf, 4096) - if ret in (0, -1): - break - changes.append(buf.raw[:ret - 1]) - if ret == -1: - cls.raise_changelog_err() - - return sorted(changes, key=clsort) - - @classmethod - def cl_history_done(cls, clfile): - ret = cls._get_api('gf_history_changelog_done')(clfile) - if ret == -1: - cls.raise_changelog_err() +libgfc = CDLL( + find_library("gfchangelog"), + mode=RTLD_GLOBAL, + use_errno=True +) + + +def _raise_changelog_err(): + errn = get_errno() + raise ChangelogException(errn, os.strerror(errn)) + + +def _init(): + if libgfc.gf_changelog_init(None) == -1: + _raise_changelog_err() + + +def register(brick, path, log_file, log_level, retries=0): + _init() + + ret = gr_cl_register(libgfc, brick, path, log_file, log_level, retries) + + if ret == -1: + _raise_changelog_err() + + +def scan(): + ret = libgfc.gf_changelog_scan() + if ret == -1: + _raise_changelog_err() + + +def startfresh(): + ret = libgfc.gf_changelog_start_fresh() + if ret == -1: + _raise_changelog_err() + + +def getchanges(): + def clsort(cfile): + return cfile.split('.')[-1] + + changes = [] + buf = gr_create_string_buffer(4096) + call = libgfc.gf_changelog_next_change + + while True: + ret = call(buf, 4096) + if ret in (0, -1): + break 
+ + # py2 and py3 compatibility + result = bytearray_to_str(buf.raw[:ret - 1]) + changes.append(result) + + if ret == -1: + _raise_changelog_err() + + # cleanup tracker + startfresh() + + return sorted(changes, key=clsort) + + +def done(clfile): + ret = gr_cl_done(libgfc, clfile) + if ret == -1: + _raise_changelog_err() + + +def history_scan(): + ret = libgfc.gf_history_changelog_scan() + if ret == -1: + _raise_changelog_err() + + return ret + + +def history_changelog(changelog_path, start, end, num_parallel): + actual_end = c_ulong() + ret = gr_cl_history_changelog(libgfc, changelog_path, start, end, + num_parallel, byref(actual_end)) + if ret == -1: + _raise_changelog_err() + + if ret == -2: + raise ChangelogHistoryNotAvailable() + + return (ret, actual_end.value) + + +def history_startfresh(): + ret = libgfc.gf_history_changelog_start_fresh() + if ret == -1: + _raise_changelog_err() + + +def history_getchanges(): + def clsort(cfile): + return cfile.split('.')[-1] + + changes = [] + buf = gr_create_string_buffer(4096) + call = libgfc.gf_history_changelog_next_change + + while True: + ret = call(buf, 4096) + if ret in (0, -1): + break + + # py2 and py3 compatibility + result = bytearray_to_str(buf.raw[:ret - 1]) + changes.append(result) + + if ret == -1: + _raise_changelog_err() + + return sorted(changes, key=clsort) + + +def history_done(clfile): + ret = gr_cl_history_done(libgfc, clfile) + if ret == -1: + _raise_changelog_err() diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index d9f63a440fb..9501aeae6b5 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -22,11 +22,13 @@ from threading import Condition, Lock from datetime import datetime import gsyncdconfig as gconf +import libgfchangelog from rconf import rconf -from syncdutils import Thread, GsyncdError, escape_space_newline -from syncdutils import unescape_space_newline, gauxpfx, escape -from syncdutils import lstat, errno_wrap, FreeObject, lf, matching_disk_gfid -from syncdutils import NoStimeAvailable, PartialHistoryAvailable +from syncdutils import (Thread, GsyncdError, escape_space_newline, + unescape_space_newline, gauxpfx, escape, + lstat, errno_wrap, FreeObject, lf, matching_disk_gfid, + NoStimeAvailable, PartialHistoryAvailable, + host_brick_split) URXTIME = (-1, 0) @@ -65,6 +67,9 @@ def _volinfo_hook_relax_foreign(self): def edct(op, **ed): dct = {} dct['op'] = op + # This is used in automatic gfid conflict resolution. + # When marked True, it's skipped during re-processing. 
+ dct['skip_entry'] = False for k in ed: if k == 'stat': st = ed[k] @@ -192,7 +197,7 @@ class NormalMixin(object): vi = vi.copy() vi['timeout'] = int(time.time()) + timo else: - # send keep-alives more frequently to + # send keep-alive more frequently to # avoid a delay in announcing our volume info # to slave if it becomes established in the # meantime @@ -448,18 +453,10 @@ class GMasterCommon(object): if rconf.mgmt_lock_fd: try: fcntl.lockf(rconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) - if not rconf.active_earlier: - rconf.active_earlier = True - logging.info(lf("Got lock Becoming ACTIVE", - brick=rconf.args.local_path)) return True except: ex = sys.exc_info()[1] if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN): - if not rconf.passive_earlier: - rconf.passive_earlier = True - logging.info(lf("Didn't get lock Becoming PASSIVE", - brick=rconf.local_path)) return False raise @@ -494,18 +491,10 @@ class GMasterCommon(object): ex = sys.exc_info()[1] if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN): # cannot grab, it's taken - if not rconf.passive_earlier: - rconf.passive_earlier = True - logging.info(lf("Didn't get lock Becoming PASSIVE", - brick=rconf.args.local_path)) rconf.mgmt_lock_fd = fd return False raise - if not rconf.active_earlier: - rconf.active_earlier = True - logging.info(lf("Got lock Becoming ACTIVE", - brick=rconf.args.local_path)) return True def should_crawl(self): @@ -529,8 +518,8 @@ class GMasterCommon(object): # If crawlwrap is called when partial history available, # then it sets register_time which is the time when geo-rep - # worker registerd to changelog consumption. Since nsec is - # not considered in register time, their are chances of skipping + # worker registered to changelog consumption. Since nsec is + # not considered in register time, there are chances of skipping # changes detection in xsync crawl. This limit will be reset when # crawlwrap is called again. self.live_changelog_start_time = None @@ -540,7 +529,7 @@ class GMasterCommon(object): # no need to maintain volinfo state machine. # in a cascading setup, each geo-replication session is # independent (ie. 'volume-mark' and 'xtime' are not - # propogated). This is because the slave's xtime is now + # propagated). This is because the slave's xtime is now # stored on the master itself. 'volume-mark' just identifies # that we are in a cascading setup and need to enable # 'geo-replication.ignore-pid-check' option. @@ -711,7 +700,8 @@ class GMasterChangelogMixin(GMasterCommon): TYPE_GFID = "D " TYPE_ENTRY = "E " - MAX_EF_RETRIES = 15 + MAX_EF_RETRIES = 10 + MAX_OE_RETRIES = 10 # flat directory hierarchy for gfid based access FLAT_DIR_HIERARCHY = '.' 
@@ -803,21 +793,29 @@ class GMasterChangelogMixin(GMasterCommon): self.status.inc_value("failures", num_failures) - def fix_possible_entry_failures(self, failures, retry_count): + def fix_possible_entry_failures(self, failures, retry_count, entries): pfx = gauxpfx() fix_entry_ops = [] failures1 = [] + remove_gfids = set() for failure in failures: - if failure[2]['dst']: + if failure[2]['name_mismatch']: + pbname = failure[2]['slave_entry'] + elif failure[2]['dst']: pbname = failure[0]['entry1'] else: pbname = failure[0]['entry'] - if failure[2]['gfid_mismatch']: + + op = failure[0]['op'] + # name exists but gfid is different + if failure[2]['gfid_mismatch'] or failure[2]['name_mismatch']: slave_gfid = failure[2]['slave_gfid'] st = lstat(os.path.join(pfx, slave_gfid)) + # Takes care of scenarios with no hardlinks if isinstance(st, int) and st == ENOENT: - logging.info(lf('Fixing gfid mismatch in slave. Deleting' - ' the entry', retry_count=retry_count, + logging.debug(lf('Entry not present on master. Fixing gfid ' + 'mismatch in slave. Deleting the entry', + retry_count=retry_count, entry=repr(failure))) # Add deletion to fix_entry_ops list if failure[2]['slave_isdir']: @@ -830,79 +828,163 @@ class GMasterChangelogMixin(GMasterCommon): edct('UNLINK', gfid=failure[2]['slave_gfid'], entry=pbname)) + remove_gfids.add(slave_gfid) + if op in ['RENAME']: + # If renamed gfid doesn't exists on master, remove + # rename entry and unlink src on slave + st = lstat(os.path.join(pfx, failure[0]['gfid'])) + if isinstance(st, int) and st == ENOENT: + logging.debug("Unlink source %s" % repr(failure)) + remove_gfids.add(failure[0]['gfid']) + fix_entry_ops.append( + edct('UNLINK', + gfid=failure[0]['gfid'], + entry=failure[0]['entry'])) + # Takes care of scenarios of hardlinks/renames on master elif not isinstance(st, int): - # The file exists on master but with different name. - # Probabaly renamed and got missed during xsync crawl. - if failure[2]['slave_isdir']: - logging.info(lf('Fixing gfid mismatch in slave', + if matching_disk_gfid(slave_gfid, pbname): + # Safe to ignore the failure as master contains same + # file with same gfid. Remove entry from entries list + logging.debug(lf('Fixing gfid mismatch in slave. ' + ' Safe to ignore, take out entry', retry_count=retry_count, entry=repr(failure))) + remove_gfids.add(failure[0]['gfid']) + if op == 'RENAME': + fix_entry_ops.append( + edct('UNLINK', + gfid=failure[0]['gfid'], + entry=failure[0]['entry'])) + # The file exists on master but with different name. + # Probably renamed and got missed during xsync crawl. + elif failure[2]['slave_isdir']: realpath = os.readlink(os.path.join( - rconf.args.local_path, - ".glusterfs", - slave_gfid[0:2], - slave_gfid[2:4], - slave_gfid)) + rconf.args.local_path, + ".glusterfs", + slave_gfid[0:2], + slave_gfid[2:4], + slave_gfid)) dst_entry = os.path.join(pfx, realpath.split('/')[-2], realpath.split('/')[-1]) - rename_dict = edct('RENAME', gfid=slave_gfid, - entry=failure[0]['entry'], - entry1=dst_entry, stat=st, - link=None) - logging.info(lf('Fixing gfid mismatch in slave. ' - 'Renaming', retry_count=retry_count, - entry=repr(rename_dict))) - fix_entry_ops.append(rename_dict) + src_entry = pbname + logging.debug(lf('Fixing dir name/gfid mismatch in ' + 'slave', retry_count=retry_count, + entry=repr(failure))) + if src_entry == dst_entry: + # Safe to ignore the failure as master contains + # same directory as in slave with same gfid. 
+ # Remove the failure entry from entries list + logging.debug(lf('Fixing dir name/gfid mismatch' + ' in slave. Safe to ignore, ' + 'take out entry', + retry_count=retry_count, + entry=repr(failure))) + try: + entries.remove(failure[0]) + except ValueError: + pass + else: + rename_dict = edct('RENAME', gfid=slave_gfid, + entry=src_entry, + entry1=dst_entry, stat=st, + link=None) + logging.debug(lf('Fixing dir name/gfid mismatch' + ' in slave. Renaming', + retry_count=retry_count, + entry=repr(rename_dict))) + fix_entry_ops.append(rename_dict) else: - logging.info(lf('Fixing gfid mismatch in slave. ' - ' Deleting the entry', + # A hardlink file exists with different name or + # renamed file exists and we are sure from + # matching_disk_gfid check that the entry doesn't + # exist with same gfid so we can safely delete on slave + logging.debug(lf('Fixing file gfid mismatch in slave. ' + 'Hardlink/Rename Case. Deleting entry', retry_count=retry_count, entry=repr(failure))) fix_entry_ops.append( edct('UNLINK', gfid=failure[2]['slave_gfid'], entry=pbname)) - logging.error(lf('Entry cannot be fixed in slave due ' - 'to GFID mismatch, find respective ' - 'path for the GFID and trigger sync', - gfid=slave_gfid)) + elif failure[1] == ENOENT: + if op in ['RENAME']: + pbname = failure[0]['entry1'] + else: + pbname = failure[0]['entry'] + + pargfid = pbname.split('/')[1] + st = lstat(os.path.join(pfx, pargfid)) + # Safe to ignore the failure as master doesn't contain + # parent directory. + if isinstance(st, int): + logging.debug(lf('Fixing ENOENT error in slave. Parent ' + 'does not exist on master. Safe to ' + 'ignore, take out entry', + retry_count=retry_count, + entry=repr(failure))) + try: + entries.remove(failure[0]) + except ValueError: + pass + else: + logging.debug(lf('Fixing ENOENT error in slave. 
Create ' + 'parent directory on slave.', + retry_count=retry_count, + entry=repr(failure))) + realpath = os.readlink(os.path.join(rconf.args.local_path, + ".glusterfs", + pargfid[0:2], + pargfid[2:4], + pargfid)) + dir_entry = os.path.join(pfx, realpath.split('/')[-2], + realpath.split('/')[-1]) + fix_entry_ops.append( + edct('MKDIR', gfid=pargfid, entry=dir_entry, + mode=st.st_mode, uid=st.st_uid, gid=st.st_gid)) + + logging.debug("remove_gfids: %s" % repr(remove_gfids)) + if remove_gfids: + for e in entries: + if e['op'] in ['MKDIR', 'MKNOD', 'CREATE', 'RENAME'] \ + and e['gfid'] in remove_gfids: + logging.debug("Removed entry op from retrial list: entry: %s" % repr(e)) + e['skip_entry'] = True if fix_entry_ops: # Process deletions of entries whose gfids are mismatched failures1 = self.slave.server.entry_ops(fix_entry_ops) - if not failures1: - logging.info("Sucessfully fixed entry ops with gfid mismatch") - return failures1 + return (failures1, fix_entry_ops) def handle_entry_failures(self, failures, entries): retries = 0 pending_failures = False failures1 = [] failures2 = [] + entry_ops1 = [] + entry_ops2 = [] if failures: pending_failures = True failures1 = failures + entry_ops1 = entries while pending_failures and retries < self.MAX_EF_RETRIES: retries += 1 - failures2 = self.fix_possible_entry_failures(failures1, - retries) + (failures2, entry_ops2) = self.fix_possible_entry_failures( + failures1, retries, entry_ops1) if not failures2: pending_failures = False + logging.info(lf('Successfully fixed entry ops with gfid ' + 'mismatch', retry_count=retries)) else: pending_failures = True failures1 = failures2 + entry_ops1 = entry_ops2 if pending_failures: for failure in failures1: logging.error("Failed to fix entry ops %s", repr(failure)) - else: - # Retry original entry list 5 times - failures = self.slave.server.entry_ops(entries) - - self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') def process_change(self, change, done, retry): pfx = gauxpfx() @@ -1003,7 +1085,7 @@ class GMasterChangelogMixin(GMasterCommon): # Special case: record mknod as link if ty in ['MKNOD']: mode = int(ec[2]) - if mode & 01000: + if mode & 0o1000: # Avoid stat'ing the file as it # may be deleted in the interim st = FreeObject(st_mode=int(ec[2]), @@ -1044,6 +1126,11 @@ class GMasterChangelogMixin(GMasterCommon): os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st, link=rl)) + # If src doesn't exist while doing rename, destination + # is created. If data is not followed by rename, this + # remains zero byte file on slave. Hence add data entry + # for renames + datas.add(os.path.join(pfx, gfid)) else: # stat() to get mode and other information if not matching_disk_gfid(gfid, en): @@ -1067,6 +1154,12 @@ class GMasterChangelogMixin(GMasterCommon): rl = None entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) + # If src doesn't exist while doing link, destination + # is created based on file type. If data is not + # followed by link, this remains zero byte file on + # slave. 
Hence add data entry for links + if rl is None: + datas.add(os.path.join(pfx, gfid)) elif ty == 'SYMLINK': rl = errno_wrap(os.readlink, [en], [ENOENT], [ESTALE, EINTR]) @@ -1114,7 +1207,6 @@ class GMasterChangelogMixin(GMasterCommon): logging.debug('entries: %s' % repr(entries)) # Increment counters for Status - self.status.inc_value("entry", len(entries)) self.files_in_batch += len(datas) self.status.inc_value("data", len(datas)) @@ -1129,7 +1221,23 @@ class GMasterChangelogMixin(GMasterCommon): self.status.inc_value("entry", len(entries)) failures = self.slave.server.entry_ops(entries) - self.handle_entry_failures(failures, entries) + + if gconf.get("gfid-conflict-resolution"): + count = 0 + if failures: + logging.info(lf('Entry ops failed with gfid mismatch', + count=len(failures))) + while failures and count < self.MAX_OE_RETRIES: + count += 1 + self.handle_entry_failures(failures, entries) + logging.info(lf('Retry original entries', count=count)) + failures = self.slave.server.entry_ops(entries) + if not failures: + logging.info("Successfully fixed all entry ops with " + "gfid mismatch") + break + + self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') self.status.dec_value("entry", len(entries)) # Update Entry stime in Brick Root only in case of Changelog mode @@ -1164,10 +1272,10 @@ class GMasterChangelogMixin(GMasterCommon): continue meta_entries.append(edct('META', go=go[0], stat=st)) if meta_entries: - self.status.inc_value("meta", len(entries)) + self.status.inc_value("meta", len(meta_entries)) failures = self.slave.server.meta_ops(meta_entries) self.log_failures(failures, 'go', '', 'META') - self.status.dec_value("meta", len(entries)) + self.status.dec_value("meta", len(meta_entries)) self.batch_stats["META_SYNC_TIME"] += time.time() - meta_start_time @@ -1359,7 +1467,7 @@ class GMasterChangelogMixin(GMasterCommon): node = rconf.args.resource_remote node_data = node.split("@") node = node_data[-1] - remote_node_ip = node.split(":")[0] + remote_node_ip, _ = host_brick_split(node) self.status.set_slave_node(remote_node_ip) def changelogs_batch_process(self, changes): @@ -1392,9 +1500,9 @@ class GMasterChangelogMixin(GMasterCommon): # that are _historical_ to that time. 
data_stime = self.get_data_stime() - self.changelog_agent.scan() + libgfchangelog.scan() self.crawls += 1 - changes = self.changelog_agent.getchanges() + changes = libgfchangelog.getchanges() if changes: if data_stime: logging.info(lf("slave's time", @@ -1411,10 +1519,9 @@ class GMasterChangelogMixin(GMasterCommon): self.changelogs_batch_process(changes) - def register(self, register_time, changelog_agent, status): - self.changelog_agent = changelog_agent + def register(self, register_time, status): self.sleep_interval = gconf.get("change-interval") - self.changelog_done_func = self.changelog_agent.done + self.changelog_done_func = libgfchangelog.done self.tempdir = self.setup_working_dir() self.processed_changelogs_dir = os.path.join(self.tempdir, ".processed") @@ -1423,11 +1530,10 @@ class GMasterChangelogMixin(GMasterCommon): class GMasterChangeloghistoryMixin(GMasterChangelogMixin): - def register(self, register_time, changelog_agent, status): - self.changelog_agent = changelog_agent + def register(self, register_time, status): self.changelog_register_time = register_time self.history_crawl_start_time = register_time - self.changelog_done_func = self.changelog_agent.history_done + self.changelog_done_func = libgfchangelog.history_done self.history_turns = 0 self.tempdir = self.setup_working_dir() self.processed_changelogs_dir = os.path.join(self.tempdir, @@ -1441,6 +1547,12 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): data_stime = self.get_data_stime() end_time = int(time.time()) + + #as start of historical crawl marks Geo-rep worker restart + if gconf.get("ignore-deletes"): + logging.info(lf('ignore-deletes config option is set', + stime=data_stime)) + logging.info(lf('starting history crawl', turns=self.history_turns, stime=data_stime, @@ -1455,7 +1567,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): # location then consuming history will not work(Known issue as of now) changelog_path = os.path.join(rconf.args.local_path, ".glusterfs/changelogs") - ret, actual_end = self.changelog_agent.history( + ret, actual_end = libgfchangelog.history_changelog( changelog_path, data_stime[0], end_time, @@ -1467,10 +1579,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): # to be processed. returns positive value as number of changelogs # to be processed, which will be fetched using # history_getchanges() - while self.changelog_agent.history_scan() > 0: + while libgfchangelog.history_scan() > 0: self.crawls += 1 - changes = self.changelog_agent.history_getchanges() + changes = libgfchangelog.history_getchanges() if changes: if data_stime: logging.info(lf("slave's time", @@ -1504,7 +1616,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): self.history_crawl_start_time = int(time.time()) self.crawl() else: - # This exeption will be catched in resource.py and + # This exception will be caught in resource.py and # fallback to xsync for the small gap. 
raise PartialHistoryAvailable(str(actual_end)) @@ -1523,7 +1635,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): XSYNC_MAX_ENTRIES = 1 << 13 - def register(self, register_time=None, changelog_agent=None, status=None): + def register(self, register_time=None, status=None): self.status = status self.counter = 0 self.comlist = [] @@ -1641,7 +1753,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): def is_sticky(self, path, mo): """check for DHTs linkto sticky bit file""" sticky = False - if mo & 01000: + if mo & 0o1000: sticky = self.master.server.linkto_check(path) return sticky diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 40818427bfe..6aa7b9dfc99 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -20,13 +20,16 @@ import random from resource import SSH import gsyncdconfig as gconf +import libgfchangelog from rconf import rconf -from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile -from syncdutils import set_term_handler, GsyncdError -from syncdutils import Thread, finalize, Volinfo, VolinfoFromGconf -from syncdutils import gf_event, EVENT_GEOREP_FAULTY, get_up_nodes +from syncdutils import (select, waitpid, errno_wrap, lf, grabpidfile, + set_term_handler, GsyncdError, + Thread, finalize, Volinfo, VolinfoFromGconf, + gf_event, EVENT_GEOREP_FAULTY, get_up_nodes, + unshare_propagation_supported) from gsyncdstatus import GeorepStatus, set_monitor_status -from syncdutils import unshare_propagation_supported +import py2py3 +from py2py3 import pipe ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError @@ -35,6 +38,8 @@ def get_subvol_num(brick_idx, vol, hot): tier = vol.is_tier() disperse_count = vol.disperse_count(tier, hot) replica_count = vol.replica_count(tier, hot) + distribute_count = vol.distribution_count(tier, hot) + gconf.setconfig("master-distribution-count", distribute_count) if (tier and not hot): brick_idx = brick_idx - vol.get_hot_bricks_count(tier) @@ -77,7 +82,7 @@ class Monitor(object): # give a chance to graceful exit errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH]) - def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master, + def monitor(self, w, argv, cpids, slave_vol, slave_host, master, suuid, slavenodes): """the monitor loop @@ -105,10 +110,6 @@ class Monitor(object): master, "%s::%s" % (slave_host, slave_vol)) - - set_monitor_status(gconf.get("state-file"), self.ST_STARTED) - self.status[w[0]['dir']].set_worker_status(self.ST_INIT) - ret = 0 def nwait(p, o=0): @@ -126,7 +127,7 @@ class Monitor(object): raise def exit_signalled(s): - """ child teminated due to receipt of SIGUSR1 """ + """ child terminated due to receipt of SIGUSR1 """ return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1)) def exit_status(s): @@ -150,51 +151,18 @@ class Monitor(object): remote_host = "%s@%s" % (remote_user, remote_new[0]) remote_id = remote_new[1] - # Spawn the worker and agent in lock to avoid fd leak + # Spawn the worker in lock to avoid fd leak self.lock.acquire() + self.status[w[0]['dir']].set_worker_status(self.ST_INIT) logging.info(lf('starting gsyncd worker', brick=w[0]['dir'], slave_node=remote_host)) - # Couple of pipe pairs for RPC communication b/w - # worker and changelog agent. 
- - # read/write end for agent - (ra, ww) = os.pipe() - # read/write end for worker - (rw, wa) = os.pipe() - - # spawn the agent process - apid = os.fork() - if apid == 0: - os.close(rw) - os.close(ww) - args_to_agent = argv + [ - 'agent', - rconf.args.master, - rconf.args.slave, - '--local-path', w[0]['dir'], - '--local-node', w[0]['host'], - '--local-node-id', w[0]['uuid'], - '--slave-id', suuid, - '--rpc-fd', ','.join([str(ra), str(wa), str(rw), str(ww)]) - ] - - if rconf.args.config_file is not None: - args_to_agent += ['-c', rconf.args.config_file] - - if rconf.args.debug: - args_to_agent.append("--debug") - - os.execv(sys.executable, args_to_agent) - - pr, pw = os.pipe() + pr, pw = pipe() cpid = os.fork() if cpid == 0: os.close(pr) - os.close(ra) - os.close(wa) args_to_worker = argv + [ 'worker', @@ -205,8 +173,6 @@ class Monitor(object): '--local-node', w[0]['host'], '--local-node-id', w[0]['uuid'], '--slave-id', suuid, - '--rpc-fd', - ','.join([str(rw), str(ww), str(ra), str(wa)]), '--subvol-num', str(w[2]), '--resource-remote', remote_host, '--resource-remote-id', remote_id @@ -237,14 +203,8 @@ class Monitor(object): os.execv(sys.executable, args_to_worker) cpids.add(cpid) - agents.add(apid) os.close(pw) - # close all RPC pipes in monitor - os.close(ra) - os.close(wa) - os.close(rw) - os.close(ww) self.lock.release() t0 = time.time() @@ -253,42 +213,19 @@ class Monitor(object): if so: ret = nwait(cpid, os.WNOHANG) - ret_agent = nwait(apid, os.WNOHANG) - - if ret_agent is not None: - # Agent is died Kill Worker - logging.info(lf("Changelog Agent died, Aborting Worker", - brick=w[0]['dir'])) - errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) - nwait(cpid) - nwait(apid) if ret is not None: logging.info(lf("worker died before establishing " "connection", brick=w[0]['dir'])) - nwait(apid) # wait for agent else: logging.debug("worker(%s) connected" % w[0]['dir']) while time.time() < t0 + conn_timeout: ret = nwait(cpid, os.WNOHANG) - ret_agent = nwait(apid, os.WNOHANG) if ret is not None: logging.info(lf("worker died in startup phase", brick=w[0]['dir'])) - nwait(apid) # wait for agent - break - - if ret_agent is not None: - # Agent is died Kill Worker - logging.info(lf("Changelog Agent died, Aborting " - "Worker", - brick=w[0]['dir'])) - errno_wrap(os.kill, [cpid, signal.SIGKILL], - [ESRCH]) - nwait(cpid) - nwait(apid) break time.sleep(1) @@ -303,12 +240,8 @@ class Monitor(object): brick=w[0]['dir'], timeout=conn_timeout)) errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) - nwait(apid) # wait for agent ret = nwait(cpid) if ret is None: - # If worker dies, agent terminates on EOF. - # So lets wait for agent first. - nwait(apid) ret = nwait(cpid) if exit_signalled(ret): ret = 0 @@ -332,23 +265,33 @@ class Monitor(object): argv = [os.path.basename(sys.executable), sys.argv[0]] cpids = set() - agents = set() ta = [] for wx in wspx: def wmon(w): - cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol, + cpid, _ = self.monitor(w, argv, cpids, slave_vol, slave_host, master, suuid, slavenodes) time.sleep(1) self.lock.acquire() for cpid in cpids: errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) - for apid in agents: - errno_wrap(os.kill, [apid, signal.SIGKILL], [ESRCH]) self.lock.release() finalize(exval=1) t = Thread(target=wmon, args=[wx]) t.start() ta.append(t) + + # monitor status was being updated in each monitor thread. It + # should not be done as it can cause deadlock for a worker start. 
+ # set_monitor_status uses flock to synchronize multple instances + # updating the file. Since each monitor thread forks worker, + # these processes can hold the reference to fd of status + # file causing deadlock to workers which starts later as flock + # will not be release until all references to same fd is closed. + # It will also cause fd leaks. + + self.lock.acquire() + set_monitor_status(gconf.get("state-file"), self.ST_STARTED) + self.lock.release() for t in ta: t.join() @@ -357,7 +300,7 @@ def distribute(master, slave): if rconf.args.use_gconf_volinfo: mvol = VolinfoFromGconf(master.volume, master=True) else: - mvol = Volinfo(master.volume, master.host) + mvol = Volinfo(master.volume, master.host, master=True) logging.debug('master bricks: ' + repr(mvol.bricks)) prelude = [] slave_host = None @@ -373,7 +316,7 @@ def distribute(master, slave): if rconf.args.use_gconf_volinfo: svol = VolinfoFromGconf(slave.volume, master=False) else: - svol = Volinfo(slave.volume, "localhost", prelude) + svol = Volinfo(slave.volume, "localhost", prelude, master=False) sbricks = svol.bricks suuid = svol.uuid @@ -428,7 +371,7 @@ def startup(go_daemon=True): if not go_daemon: return - x, y = os.pipe() + x, y = pipe() cpid = os.fork() if cpid: os.close(x) diff --git a/geo-replication/syncdaemon/py2py3.py b/geo-replication/syncdaemon/py2py3.py new file mode 100644 index 00000000000..f9c76e1b50a --- /dev/null +++ b/geo-replication/syncdaemon/py2py3.py @@ -0,0 +1,184 @@ +# +# Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +# All python2/python3 compatibility routines + +import sys +import os +import stat +import struct +from syncdutils import umask +from ctypes import create_string_buffer + +if sys.version_info >= (3,): + def pipe(): + (r, w) = os.pipe() + os.set_inheritable(r, True) + os.set_inheritable(w, True) + return (r, w) + + # Raw conversion of bytearray to string. Used in the cases where + # buffer is created by create_string_buffer which is a 8-bit char + # array and passed to syscalls to fetch results. Using encode/decode + # doesn't work as it converts to string altering the size. + def bytearray_to_str(byte_arr): + return ''.join([chr(b) for b in byte_arr]) + + # Raw conversion of string to bytes. This is required to convert + # back the string into bytearray(c char array) to use in struc + # pack/unpacking. Again encode/decode can't be used as it + # converts it alters size. 
+ def str_to_bytearray(string): + return bytes([ord(c) for c in string]) + + def gr_create_string_buffer(size): + return create_string_buffer(b'\0', size) + + def gr_query_xattr(cls, path, size, syscall, attr=None): + if attr: + return cls._query_xattr(path.encode(), size, syscall, + attr.encode()) + else: + return cls._query_xattr(path.encode(), size, syscall) + + def gr_lsetxattr(cls, path, attr, val): + return cls.libc.lsetxattr(path.encode(), attr.encode(), val, + len(val), 0) + + def gr_lremovexattr(cls, path, attr): + return cls.libc.lremovexattr(path.encode(), attr.encode()) + + def gr_cl_register(libgfapi, brick, path, log_file, log_level, retries): + return libgfapi.gf_changelog_register(brick.encode(), + path.encode(), + log_file.encode(), + log_level, retries) + + def gr_cl_done(libgfapi, clfile): + return libgfapi.gf_changelog_done(clfile.encode()) + + def gr_cl_history_changelog(libgfapi, changelog_path, start, end, num_parallel, + actual_end): + return libgfapi.gf_history_changelog(changelog_path.encode(), + start, end, num_parallel, + actual_end) + + def gr_cl_history_done(libgfapi, clfile): + return libgfapi.gf_history_changelog_done(clfile.encode()) + + # regular file + + def entry_pack_reg(cls, gf, bn, mo, uid, gid): + bn_encoded = bn.encode() + blen = len(bn_encoded) + return struct.pack(cls._fmt_mknod(blen), + uid, gid, gf.encode(), mo, bn_encoded, + stat.S_IMODE(mo), 0, umask()) + + def entry_pack_reg_stat(cls, gf, bn, st): + bn_encoded = bn.encode() + blen = len(bn_encoded) + mo = st['mode'] + return struct.pack(cls._fmt_mknod(blen), + st['uid'], st['gid'], + gf.encode(), mo, bn_encoded, + stat.S_IMODE(mo), 0, umask()) + # mkdir + + def entry_pack_mkdir(cls, gf, bn, mo, uid, gid): + bn_encoded = bn.encode() + blen = len(bn_encoded) + return struct.pack(cls._fmt_mkdir(blen), + uid, gid, gf.encode(), mo, bn_encoded, + stat.S_IMODE(mo), umask()) + # symlink + + def entry_pack_symlink(cls, gf, bn, lnk, st): + bn_encoded = bn.encode() + blen = len(bn_encoded) + lnk_encoded = lnk.encode() + llen = len(lnk_encoded) + return struct.pack(cls._fmt_symlink(blen, llen), + st['uid'], st['gid'], + gf.encode(), st['mode'], bn_encoded, + lnk_encoded) +else: + def pipe(): + (r, w) = os.pipe() + return (r, w) + + # Raw conversion of bytearray to string + def bytearray_to_str(byte_arr): + return byte_arr + + # Raw conversion of string to bytearray + def str_to_bytearray(string): + return string + + def gr_create_string_buffer(size): + return create_string_buffer('\0', size) + + def gr_query_xattr(cls, path, size, syscall, attr=None): + if attr: + return cls._query_xattr(path, size, syscall, attr) + else: + return cls._query_xattr(path, size, syscall) + + def gr_lsetxattr(cls, path, attr, val): + return cls.libc.lsetxattr(path, attr, val, len(val), 0) + + def gr_lremovexattr(cls, path, attr): + return cls.libc.lremovexattr(path, attr) + + def gr_cl_register(libgfapi, brick, path, log_file, log_level, retries): + return libgfapi.gf_changelog_register(brick, path, log_file, + log_level, retries) + + def gr_cl_done(libgfapi, clfile): + return libgfapi.gf_changelog_done(clfile) + + def gr_cl_history_changelog(libgfapi, changelog_path, start, end, num_parallel, + actual_end): + return libgfapi.gf_history_changelog(changelog_path, start, end, + num_parallel, actual_end) + + def gr_cl_history_done(libgfapi, clfile): + return libgfapi.gf_history_changelog_done(clfile) + + # regular file + + def entry_pack_reg(cls, gf, bn, mo, uid, gid): + blen = len(bn) + return struct.pack(cls._fmt_mknod(blen), 
+ uid, gid, gf, mo, bn, + stat.S_IMODE(mo), 0, umask()) + + def entry_pack_reg_stat(cls, gf, bn, st): + blen = len(bn) + mo = st['mode'] + return struct.pack(cls._fmt_mknod(blen), + st['uid'], st['gid'], + gf, mo, bn, + stat.S_IMODE(mo), 0, umask()) + # mkdir + + def entry_pack_mkdir(cls, gf, bn, mo, uid, gid): + blen = len(bn) + return struct.pack(cls._fmt_mkdir(blen), + uid, gid, gf, mo, bn, + stat.S_IMODE(mo), umask()) + # symlink + + def entry_pack_symlink(cls, gf, bn, lnk, st): + blen = len(bn) + llen = len(lnk) + return struct.pack(cls._fmt_symlink(blen, llen), + st['uid'], st['gid'], + gf, st['mode'], bn, lnk) diff --git a/geo-replication/syncdaemon/rconf.py b/geo-replication/syncdaemon/rconf.py index ccac62c63a8..ff716ee4d6d 100644 --- a/geo-replication/syncdaemon/rconf.py +++ b/geo-replication/syncdaemon/rconf.py @@ -21,12 +21,6 @@ class RConf(object): log_exit = False permanent_handles = [] log_metadata = {} - """One variable is sufficient to track the - switching of worker to ACTIVE. Two variables - are intentionally used to track worker going - to PASSIVE as well mainly for debugging""" - active_earlier = False - passive_earlier = False mgmt_lock_fd = None args = None turns = 0 diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py index 9d5da666858..c622afa6373 100644 --- a/geo-replication/syncdaemon/repce.py +++ b/geo-replication/syncdaemon/repce.py @@ -28,13 +28,13 @@ except ImportError: from syncdutils import Thread, select, lf -pickle_proto = -1 +pickle_proto = 2 repce_version = 1.0 def ioparse(i, o): if isinstance(i, int): - i = os.fdopen(i) + i = os.fdopen(i, 'rb') # rely on duck typing for recognizing # streams as that works uniformly # in py2 and py3 @@ -54,8 +54,15 @@ def send(out, *args): def recv(inf): - """load an object from input stream""" - return pickle.load(inf) + """load an object from input stream + python2 and python3 compatibility, inf is sys.stdin + and is opened as text stream by default. Hence using the + buffer attribute in python3 + """ + if hasattr(inf, "buffer"): + return pickle.load(inf.buffer) + else: + return pickle.load(inf) class RepceServer(object): @@ -193,7 +200,7 @@ class RepceClient(object): """RePCe client is callabe, calling it implements a synchronous remote call. - We do a .push with a cbk which does a wakeup upon receiving anwser, + We do a .push with a cbk which does a wakeup upon receiving answer, then wait on the RepceJob. 
""" rjob = self.push( diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 575a6605393..f12c7ceaa36 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -19,35 +19,37 @@ import struct import logging import tempfile import subprocess -from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES -from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM +from errno import (EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES, + EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM) import errno from rconf import rconf import gsyncdconfig as gconf +import libgfchangelog import repce from repce import RepceServer, RepceClient from master import gmaster_builder import syncdutils -from syncdutils import GsyncdError, select, privileged, funcode -from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat -from syncdutils import NoStimeAvailable, PartialHistoryAvailable -from syncdutils import ChangelogException, ChangelogHistoryNotAvailable -from syncdutils import get_changelog_log_level, get_rsync_version -from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION -from syncdutils import GX_GFID_CANONICAL_LEN +from syncdutils import (GsyncdError, select, privileged, funcode, + entry2pb, gauxpfx, errno_wrap, lstat, + NoStimeAvailable, PartialHistoryAvailable, + ChangelogException, ChangelogHistoryNotAvailable, + get_changelog_log_level, get_rsync_version, + GX_GFID_CANONICAL_LEN, + gf_mount_ready, lf, Popen, sup, + Xattr, matching_disk_gfid, get_gfid_from_mnt, + unshare_propagation_supported, get_slv_dir_path) from gsyncdstatus import GeorepStatus -from syncdutils import lf, Popen, sup, Volinfo -from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt -from syncdutils import unshare_propagation_supported +from py2py3 import (pipe, str_to_bytearray, entry_pack_reg, + entry_pack_reg_stat, entry_pack_mkdir, + entry_pack_symlink) ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') slv_volume = None slv_host = None -slv_bricks = None class Server(object): @@ -146,6 +148,7 @@ class Server(object): if buf == ENOENT: return buf else: + buf = str_to_bytearray(buf) m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join( ['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)])) return '-'.join(m.groups()) @@ -236,6 +239,7 @@ class Server(object): val = Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8) + val = str_to_bytearray(val) return struct.unpack('!II', val) except OSError: ex = sys.exc_info()[1] @@ -258,6 +262,7 @@ class Server(object): val = Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8) + val = str_to_bytearray(val) return struct.unpack('!II', val) except OSError: ex = sys.exc_info()[1] @@ -280,6 +285,7 @@ class Server(object): val = Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8) + val = str_to_bytearray(val) return struct.unpack('!II', val) except OSError: ex = sys.exc_info()[1] @@ -303,6 +309,7 @@ class Server(object): '.'.join([cls.GX_NSPACE, uuid, 'entry_stime']), 8) + val = str_to_bytearray(val) return struct.unpack('!II', val) except OSError: ex = sys.exc_info()[1] @@ -371,50 +378,31 @@ class Server(object): def entry_ops(cls, entries): pfx = gauxpfx() logging.debug('entries: %s' % repr(entries)) - # regular file - - def entry_pack_reg(gf, bn, mo, uid, gid): - blen = len(bn) - return struct.pack(cls._fmt_mknod(blen), - uid, gid, gf, mo, bn, - stat.S_IMODE(mo), 0, umask()) - - def entry_pack_reg_stat(gf, bn, st): - blen = len(bn) - 
mo = st['mode'] - return struct.pack(cls._fmt_mknod(blen), - st['uid'], st['gid'], - gf, mo, bn, - stat.S_IMODE(mo), 0, umask()) - # mkdir - - def entry_pack_mkdir(gf, bn, mo, uid, gid): - blen = len(bn) - return struct.pack(cls._fmt_mkdir(blen), - uid, gid, gf, mo, bn, - stat.S_IMODE(mo), umask()) - # symlink - - def entry_pack_symlink(gf, bn, lnk, st): - blen = len(bn) - llen = len(lnk) - return struct.pack(cls._fmt_symlink(blen, llen), - st['uid'], st['gid'], - gf, st['mode'], bn, lnk) - - def entry_purge(op, entry, gfid, e): + dist_count = rconf.args.master_dist_count + + def entry_purge(op, entry, gfid, e, uid, gid): # This is an extremely racy code and needs to be fixed ASAP. # The GFID check here is to be sure that the pargfid/bname # to be purged is the GFID gotten from the changelog. # (a stat(changelog_gfid) would also be valid here) # The race here is between the GFID check and the purge. + + # If the entry or the gfid of the file to be deleted is not present + # on slave, we can ignore the unlink/rmdir + if isinstance(lstat(entry), int) or \ + isinstance(lstat(os.path.join(pfx, gfid)), int): + return + if not matching_disk_gfid(gfid, entry): - collect_failure(e, EEXIST) + collect_failure(e, EEXIST, uid, gid) return if op == 'UNLINK': er = errno_wrap(os.unlink, [entry], [ENOENT, ESTALE], [EBUSY]) - return er + # EISDIR is safe error, ignore. This can only happen when + # unlink is sent from master while fixing gfid conflicts. + if er != EISDIR: + return er elif op == 'RMDIR': er = errno_wrap(os.rmdir, [entry], [ENOENT, ESTALE, @@ -422,28 +410,40 @@ class Server(object): if er == ENOTEMPTY: return er - def collect_failure(e, cmd_ret, dst=False): + def collect_failure(e, cmd_ret, uid, gid, dst=False): slv_entry_info = {} slv_entry_info['gfid_mismatch'] = False + slv_entry_info['name_mismatch'] = False slv_entry_info['dst'] = dst + slv_entry_info['slave_isdir'] = False + slv_entry_info['slave_name'] = None + slv_entry_info['slave_gfid'] = None # We do this for failing fops on Slave # Master should be logging this if cmd_ret is None: return False - if cmd_ret == EEXIST: + if e.get("stat", {}): + # Copy actual UID/GID value back to entry stat + e['stat']['uid'] = uid + e['stat']['gid'] = gid + + if cmd_ret in [EEXIST, ESTALE]: if dst: en = e['entry1'] else: en = e['entry'] disk_gfid = get_gfid_from_mnt(en) - if isinstance(disk_gfid, basestring) and \ + if isinstance(disk_gfid, str) and \ e['gfid'] != disk_gfid: slv_entry_info['gfid_mismatch'] = True st = lstat(en) if not isinstance(st, int): if st and stat.S_ISDIR(st.st_mode): slv_entry_info['slave_isdir'] = True + dir_name = get_slv_dir_path(slv_host, slv_volume, + disk_gfid) + slv_entry_info['slave_name'] = dir_name else: slv_entry_info['slave_isdir'] = False slv_entry_info['slave_gfid'] = disk_gfid @@ -489,7 +489,7 @@ class Server(object): errno_wrap(os.rmdir, [path], [ENOENT, ESTALE], [EBUSY]) - def rename_with_disk_gfid_confirmation(gfid, entry, en): + def rename_with_disk_gfid_confirmation(gfid, entry, en, uid, gid): if not matching_disk_gfid(gfid, entry): logging.error(lf("RENAME ignored: source entry does not match " "with on-disk gfid", @@ -497,13 +497,13 @@ class Server(object): gfid=gfid, disk_gfid=get_gfid_from_mnt(entry), target=en)) - collect_failure(e, EEXIST) + collect_failure(e, EEXIST, uid, gid) return cmd_ret = errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST], [ESTALE, EBUSY]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) for e in entries: blob = None @@ -512,6 +512,12 @@ class 
Server(object): entry = e['entry'] uid = 0 gid = 0 + + # Skip entry processing if it's marked true during gfid + # conflict resolution + if e['skip_entry']: + continue + if e.get("stat", {}): # Copy UID/GID value and then reset to zero. Copied UID/GID # will be used to run chown once entry is created. @@ -524,7 +530,7 @@ class Server(object): if op in ['RMDIR', 'UNLINK']: # Try once, if rmdir failed with ENOTEMPTY # then delete recursively. - er = entry_purge(op, entry, gfid, e) + er = entry_purge(op, entry, gfid, e, uid, gid) if isinstance(er, int): if er == ENOTEMPTY and op == 'RMDIR': # Retry if ENOTEMPTY, ESTALE @@ -552,8 +558,8 @@ class Server(object): st = lstat(slink) # don't create multiple entries with same gfid if isinstance(st, int): - blob = entry_pack_reg( - gfid, bname, e['mode'], e['uid'], e['gid']) + blob = entry_pack_reg(cls, gfid, bname, + e['mode'], e['uid'], e['gid']) # Self healed hardlinks are recorded as MKNOD. # So if the gfid already exists, it should be # processed as hard link not mknod. @@ -561,94 +567,104 @@ class Server(object): cmd_ret = errno_wrap(os.link, [slink, entry], [ENOENT, EEXIST], [ESTALE]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) elif op == 'MKDIR': + en = e['entry'] slink = os.path.join(pfx, gfid) st = lstat(slink) # don't create multiple entries with same gfid if isinstance(st, int): - blob = entry_pack_mkdir( - gfid, bname, e['mode'], e['uid'], e['gid']) - else: + blob = entry_pack_mkdir(cls, gfid, bname, + e['mode'], e['uid'], e['gid']) + elif (isinstance(lstat(en), int) or + not matching_disk_gfid(gfid, en)): # If gfid of a directory exists on slave but path based # create is getting EEXIST. This means the directory is # renamed in master but recorded as MKDIR during hybrid # crawl. Get the directory path by reading the backend # symlink and trying to rename to new name as said by # master. - global slv_bricks - global slv_volume - global slv_host - if not slv_bricks: - slv_info = Volinfo(slv_volume, slv_host) - slv_bricks = slv_info.bricks - # Result of readlink would be of format as below. 
- # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename" - realpath = os.readlink(os.path.join(slv_bricks[0]['dir'], - ".glusterfs", - gfid[0:2], - gfid[2:4], - gfid)) - realpath_parts = realpath.split('/') - src_pargfid = realpath_parts[-2] - src_basename = realpath_parts[-1] - src_entry = os.path.join(pfx, src_pargfid, src_basename) logging.info(lf("Special case: rename on mkdir", gfid=gfid, entry=repr(entry))) - rename_with_disk_gfid_confirmation(gfid, src_entry, entry) + src_entry = get_slv_dir_path(slv_host, slv_volume, gfid) + if src_entry is None: + collect_failure(e, ENOENT, uid, gid) + if src_entry is not None and src_entry != entry: + slv_entry_info = {} + slv_entry_info['gfid_mismatch'] = False + slv_entry_info['name_mismatch'] = True + slv_entry_info['dst'] = False + slv_entry_info['slave_isdir'] = True + slv_entry_info['slave_gfid'] = gfid + slv_entry_info['slave_entry'] = src_entry + + failures.append((e, EEXIST, slv_entry_info)) elif op == 'LINK': slink = os.path.join(pfx, gfid) st = lstat(slink) if isinstance(st, int): (pg, bname) = entry2pb(entry) if stat.S_ISREG(e['stat']['mode']): - blob = entry_pack_reg_stat(gfid, bname, e['stat']) + blob = entry_pack_reg_stat(cls, gfid, bname, e['stat']) elif stat.S_ISLNK(e['stat']['mode']): - blob = entry_pack_symlink(gfid, bname, e['link'], + blob = entry_pack_symlink(cls, gfid, bname, e['link'], e['stat']) else: cmd_ret = errno_wrap(os.link, [slink, entry], [ENOENT, EEXIST], [ESTALE]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) elif op == 'SYMLINK': en = e['entry'] st = lstat(entry) if isinstance(st, int): - blob = entry_pack_symlink(gfid, bname, e['link'], + blob = entry_pack_symlink(cls, gfid, bname, e['link'], e['stat']) elif not matching_disk_gfid(gfid, en): - collect_failure(e, EEXIST) + collect_failure(e, EEXIST, uid, gid) elif op == 'RENAME': en = e['entry1'] - st = lstat(entry) - if isinstance(st, int): + # The matching disk gfid check validates two things + # 1. Validates the name is present, returns false otherwise + # 2. Validates the gfid is the same, returns false otherwise + # So both validations are necessary to decide src doesn't + # exist. We can't rely on only the gfid stat as a hardlink + # could be present and we can't rely only on the name as the + # name could exist with a different gfid. + if not matching_disk_gfid(gfid, entry): if e['stat'] and not stat.S_ISDIR(e['stat']['mode']): - if stat.S_ISLNK(e['stat']['mode']) and \ - e['link'] is not None: - st1 = lstat(en) - if isinstance(st1, int): - (pg, bname) = entry2pb(en) - blob = entry_pack_symlink(gfid, bname, - e['link'], e['stat']) - elif not matching_disk_gfid(gfid, en): - collect_failure(e, EEXIST, True) + if stat.S_ISLNK(e['stat']['mode']): + # src is not present, so don't sync the symlink as + # we don't know the target. It's ok to ignore. If + # it's unlinked, it's fine. If it's renamed to 
+ if e['link'] is not None: + st1 = lstat(en) + if isinstance(st1, int): + (pg, bname) = entry2pb(en) + blob = entry_pack_symlink(cls, gfid, bname, + e['link'], + e['stat']) + elif not matching_disk_gfid(gfid, en): + collect_failure(e, EEXIST, uid, gid, True) else: slink = os.path.join(pfx, gfid) st = lstat(slink) # don't create multiple entries with same gfid if isinstance(st, int): (pg, bname) = entry2pb(en) - blob = entry_pack_reg_stat(gfid, bname, + blob = entry_pack_reg_stat(cls, gfid, bname, e['stat']) else: cmd_ret = errno_wrap(os.link, [slink, en], [ENOENT, EEXIST], [ESTALE]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) else: + st = lstat(entry) st1 = lstat(en) if isinstance(st1, int): - rename_with_disk_gfid_confirmation(gfid, entry, en) + rename_with_disk_gfid_confirmation(gfid, entry, en, + uid, gid) else: if st.st_ino == st1.st_ino: # we have a hard link, we can now unlink source @@ -672,16 +688,23 @@ class Server(object): raise else: raise - elif not matching_disk_gfid(gfid, en): - collect_failure(e, EEXIST, True) + elif not matching_disk_gfid(gfid, en) and dist_count > 1: + collect_failure(e, EEXIST, uid, gid, True) else: - rename_with_disk_gfid_confirmation(gfid, entry, en) + # We are here which means matching_disk_gfid for + # both source and destination has returned false + # and distribution count for master vol is greater + # then one. Which basically says both the source and + # destination exist and not hardlinks. + # So we are safe to go ahead with rename here. + rename_with_disk_gfid_confirmation(gfid, entry, en, + uid, gid) if blob: cmd_ret = errno_wrap(Xattr.lsetxattr, [pg, 'glusterfs.gfid.newfile', blob], - [EEXIST, ENOENT], + [EEXIST, ENOENT, ESTALE], [ESTALE, EINVAL, EBUSY]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) # If UID/GID is different than zero that means we are trying # create Entry with different UID/GID. Create Entry with @@ -690,7 +713,7 @@ class Server(object): path = os.path.join(pfx, gfid) cmd_ret = errno_wrap(os.lchown, [path, uid, gid], [ENOENT], [ESTALE, EINVAL]) - collect_failure(e, cmd_ret) + collect_failure(e, cmd_ret, uid, gid) return failures @@ -717,10 +740,8 @@ class Server(object): # 'lchown' 'lchmod' 'utime with no-deference' blindly. # But since 'lchmod' and 'utime with no de-reference' is # not supported in python3, we have to rely on 'chmod' - # and 'utime with de-reference'. But 'chmod' - # de-reference the symlink and gets ENOENT, EACCES, - # EPERM errors, hence ignoring those errors if it's on - # symlink file. + # and 'utime with de-reference'. Hence avoiding 'chmod' + # and 'utime' if it's symlink file. 
is_symlink = False cmd_ret = errno_wrap(os.lchown, [go, uid, gid], [ENOENT], @@ -728,19 +749,17 @@ class Server(object): if isinstance(cmd_ret, int): continue - cmd_ret = errno_wrap(os.chmod, [go, mode], - [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) - if isinstance(cmd_ret, int): - is_symlink = os.path.islink(go) - if not is_symlink: + is_symlink = os.path.islink(go) + + if not is_symlink: + cmd_ret = errno_wrap(os.chmod, [go, mode], + [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) + if isinstance(cmd_ret, int): failures.append((e, cmd_ret, "chmod")) - cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)], - [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) - if isinstance(cmd_ret, int): - if not is_symlink: - is_symlink = os.path.islink(go) - if not is_symlink: + cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)], + [ENOENT, EACCES, EPERM], [ESTALE, EINVAL]) + if isinstance(cmd_ret, int): failures.append((e, cmd_ret, "utime")) return failures @@ -816,7 +835,8 @@ class Mounter(object): def umount_l(self, d): """perform lazy umount""" - po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE) + po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE, + universal_newlines=True) po.wait() return po @@ -840,7 +860,7 @@ class Mounter(object): change into the mount, and lazy unmount the filesystem. """ - mpi, mpo = os.pipe() + mpi, mpo = pipe() mh = Popen.fork() if mh: # Parent @@ -851,7 +871,9 @@ class Mounter(object): if self.mntpt: # mntpt is determined pre-mount d = self.mntpt - os.write(mpo, d + '\0') + mnt_msg = d + '\0' + encoded_msg = mnt_msg.encode() + os.write(mpo, encoded_msg) po = Popen(margv, **self.mountkw) self.handle_mounter(po) po.terminate_geterr() @@ -859,8 +881,11 @@ class Mounter(object): if not d: # mntpt is determined during mount d = self.mntpt - os.write(mpo, d + '\0') - os.write(mpo, 'M') + mnt_msg = d + '\0' + encoded_msg = mnt_msg.encode() + os.write(mpo, encoded_msg) + encoded_msg = 'M'.encode() + os.write(mpo, encoded_msg) t = syncdutils.Thread(target=lambda: os.chdir(d)) t.start() tlim = rconf.starttime + gconf.get("connection-timeout") @@ -889,6 +914,7 @@ class Mounter(object): mntdata = '' while True: c = os.read(mpi, 1) + c = c.decode() if not c: break mntdata += c @@ -925,6 +951,16 @@ class Mounter(object): logging.exception('mount cleanup failure:') rv = 200 os._exit(rv) + + # Polling the dht.subvol.status value. 
+ RETRIES = 10 + while not gf_mount_ready(): + if RETRIES < 0: + logging.error('Subvols are not up') + break + RETRIES -= 1 + time.sleep(0.2) + logging.debug('auxiliary glusterfs mount prepared') @@ -932,7 +968,7 @@ class DirectMounter(Mounter): """mounter backend which calls mount(8), umount(8) directly""" - mountkw = {'stderr': subprocess.PIPE} + mountkw = {'stderr': subprocess.PIPE, 'universal_newlines': True} glusterprog = 'glusterfs' @staticmethod @@ -955,7 +991,8 @@ class MountbrokerMounter(Mounter): """mounter backend using the mountbroker gluster service""" - mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE} + mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE, + 'universal_newlines': True} glusterprog = 'gluster' @classmethod @@ -996,6 +1033,7 @@ class GLUSTERServer(Server): """generic volume mark fetching/parsing backed""" fmt_string = cls.NTV_FMTSTR + extra_fields buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) + buf = str_to_bytearray(buf) vm = struct.unpack(fmt_string, buf) m = re.match( '(.{8})(.{4})(.{4})(.{4})(.{12})', @@ -1218,9 +1256,6 @@ class GLUSTER(object): # register the crawlers and start crawling # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default) # g3 ==> changelog History - (inf, ouf, ra, wa) = rconf.args.rpc_fd.split(',') - changelog_agent = RepceClient(int(inf), int(ouf)) - status = GeorepStatus(gconf.get("state-file"), rconf.args.local_node, rconf.args.local_path, @@ -1228,12 +1263,6 @@ class GLUSTER(object): rconf.args.master, rconf.args.slave) status.reset_on_worker_start() - rv = changelog_agent.version() - if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: - raise GsyncdError( - "RePCe major version mismatch(changelog agent): " - "local %s, remote %s" % - (CHANGELOG_AGENT_CLIENT_VERSION, rv)) try: workdir = g2.setup_working_dir() @@ -1244,17 +1273,16 @@ class GLUSTER(object): # register with the changelog library # 9 == log level (DEBUG) # 5 == connection retries - changelog_agent.init() - changelog_agent.register(rconf.args.local_path, - workdir, - gconf.get("changelog-log-file"), - get_changelog_log_level( - gconf.get("changelog-log-level")), - g2.CHANGELOG_CONN_RETRIES) + libgfchangelog.register(rconf.args.local_path, + workdir, + gconf.get("changelog-log-file"), + get_changelog_log_level( + gconf.get("changelog-log-level")), + g2.CHANGELOG_CONN_RETRIES) register_time = int(time.time()) - g2.register(register_time, changelog_agent, status) - g3.register(register_time, changelog_agent, status) + g2.register(register_time, status) + g3.register(register_time, status) except ChangelogException as e: logging.error(lf("Changelog register failed", error=e)) sys.exit(1) @@ -1382,14 +1410,16 @@ class SSH(object): '--local-node-id', rconf.args.resource_remote_id] + \ [ # Add all config arguments here, slave gsyncd will not use - # config file in slave side, so all overridding options should + # config file in slave side, so all overriding options should # be sent as arguments '--slave-timeout', str(gconf.get("slave-timeout")), '--slave-log-level', gconf.get("slave-log-level"), '--slave-gluster-log-level', gconf.get("slave-gluster-log-level"), '--slave-gluster-command-dir', - gconf.get("slave-gluster-command-dir")] + gconf.get("slave-gluster-command-dir"), + '--master-dist-count', + str(gconf.get("master-distribution-count"))] if gconf.get("slave-access-mount"): args_to_slave.append('--slave-access-mount') @@ -1454,12 +1484,13 @@ class SSH(object): if log_rsync_performance: # use stdout=PIPE only when 
log_rsync_performance enabled - # Else rsync will write to stdout and nobody is their + # Else rsync will write to stdout and nobody is there # to consume. If PIPE is full rsync hangs. po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + stderr=subprocess.PIPE, universal_newlines=True) else: - po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE) + po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) for f in files: po.stdin.write(f) @@ -1491,7 +1522,7 @@ class SSH(object): return po - def tarssh(self, files, slaveurl, log_err=False): + def tarssh(self, files, log_err=False): """invoke tar+ssh -z (compress) can be use if needed, but omitting it now as it results in weird error (tar+ssh errors out (errcode: 2) @@ -1499,32 +1530,49 @@ class SSH(object): if not files: raise GsyncdError("no files to sync") logging.debug("files: " + ", ".join(files)) - (host, rdir) = slaveurl.split(':') + (host, rdir) = self.slaveurl.split(':') + tar_cmd = ["tar"] + \ ["--sparse", "-cf", "-", "--files-from", "-"] - ssh_cmd = gconf.get("ssh-command-tar").split() + \ + ssh_cmd = gconf.get("ssh-command").split() + \ gconf.get("ssh-options-tar").split() + \ ["-p", str(gconf.get("ssh-port"))] + \ [host, "tar"] + \ ["--overwrite", "-xf", "-", "-C", rdir] p0 = Popen(tar_cmd, stdout=subprocess.PIPE, - stdin=subprocess.PIPE, stderr=subprocess.PIPE) - p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE) + stdin=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) + p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE, + universal_newlines=True) for f in files: p0.stdin.write(f) p0.stdin.write('\n') p0.stdin.close() p0.stdout.close() # Allow p0 to receive a SIGPIPE if p1 exits. 
- # wait for tar to terminate, collecting any errors, further - # waiting for transfer to complete - _, stderr1 = p1.communicate() # stdin and stdout of p0 is already closed, Reset to None and # wait for child process to complete p0.stdin = None p0.stdout = None - p0.communicate() + + def wait_for_tar(p0): + _, stderr = p0.communicate() + if log_err: + for errline in stderr.strip().split("\n")[:-1]: + if "No such file or directory" not in errline: + logging.error(lf("SYNC Error", + sync_engine="Tarssh", + error=errline)) + + t = syncdutils.Thread(target=wait_for_tar, args=(p0, )) + # wait for tar to terminate, collecting any errors, further + # waiting for transfer to complete + t.start() + + # wait for ssh process + _, stderr1 = p1.communicate() + t.join() if log_err: for errline in stderr1.strip().split("\n")[:-1]: diff --git a/geo-replication/syncdaemon/subcmds.py b/geo-replication/syncdaemon/subcmds.py index 11d263d7e03..b8508532e30 100644 --- a/geo-replication/syncdaemon/subcmds.py +++ b/geo-replication/syncdaemon/subcmds.py @@ -73,7 +73,11 @@ def subcmd_worker(args): Popen.init_errhandler() fcntl.fcntl(args.feedback_fd, fcntl.F_SETFD, fcntl.FD_CLOEXEC) local = GLUSTER("localhost", args.master) - slavehost, slavevol = args.slave.split("::") + slave_url, slavevol = args.slave.split("::") + if "@" not in slave_url: + slavehost = args.resource_remote + else: + slavehost = "%s@%s" % (slave_url.split("@")[0], args.resource_remote) remote = SSH(slavehost, slavevol) remote.connect_remote() local.connect() @@ -93,26 +97,21 @@ def subcmd_slave(args): local.service_loop() -def subcmd_agent(args): - import os - from changelogagent import agent, Changelog - from syncdutils import lf - - os.setsid() - logging.debug(lf("RPC FD", - rpc_fd=repr(args.rpc_fd))) - return agent(Changelog(), args.rpc_fd) - - def subcmd_voluuidget(args): from subprocess import Popen, PIPE import xml.etree.ElementTree as XET ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError - po = Popen(['gluster', '--xml', '--remote-host=' + args.host, - 'volume', 'info', args.volname], bufsize=0, - stdin=None, stdout=PIPE, stderr=PIPE) + cmd = ['gluster', '--xml', '--remote-host=' + args.host, + 'volume', 'info', args.volname] + + if args.inet6: + cmd.append("--inet6") + + po = Popen(cmd, bufsize=0, + stdin=None, stdout=PIPE, stderr=PIPE, + universal_newlines=True) vix, err = po.communicate() if po.returncode != 0: diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 6acc9f17ad7..a3df103e76c 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -21,8 +21,8 @@ import subprocess import socket from subprocess import PIPE from threading import Lock, Thread as baseThread -from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED -from errno import EINTR, ENOENT, ESTALE, EBUSY, errorcode +from errno import (EACCES, EAGAIN, EPIPE, ENOTCONN, ENOMEM, ECONNABORTED, + EINTR, ENOENT, ESTALE, EBUSY, ENODATA, errorcode, EIO) from signal import signal, SIGTERM import select as oselect from os import waitpid as owaitpid @@ -37,10 +37,10 @@ from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE sys.path.insert(1, GLUSTERFS_LIBEXECDIR) EVENTS_ENABLED = True try: - from events.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY - from events.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE - from events.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE - from events.eventtypes import GEOREP_CHECKPOINT_COMPLETED \ + 
from gfevents.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY + from gfevents.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE + from gfevents.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE + from gfevents.eventtypes import GEOREP_CHECKPOINT_COMPLETED \ as EVENT_GEOREP_CHECKPOINT_COMPLETED except ImportError: # Events APIs not installed, dummy eventtypes with None @@ -55,17 +55,19 @@ from rconf import rconf from hashlib import sha256 as sha256 +ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') + # auxiliary gfid based access prefix _CL_AUX_GFID_PFX = ".gfid/" +ROOT_GFID = "00000000-0000-0000-0000-000000000001" GF_OP_RETRIES = 10 GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' -CHANGELOG_AGENT_SERVER_VERSION = 1.0 -CHANGELOG_AGENT_CLIENT_VERSION = 1.0 NodeID = None rsync_version = None unshare_mnt_propagation = None +slv_bricks = None SPACE_ESCAPE_CHAR = "%20" NEWLINE_ESCAPE_CHAR = "%0A" PERCENTAGE_ESCAPE_CHAR = "%25" @@ -98,6 +100,19 @@ def unescape_space_newline(s): .replace(NEWLINE_ESCAPE_CHAR, "\n")\ .replace(PERCENTAGE_ESCAPE_CHAR, "%") +# gf_mount_ready() returns 1 if all subvols are up, else 0 +def gf_mount_ready(): + ret = errno_wrap(Xattr.lgetxattr, + ['.', 'dht.subvol.status', 16], + [ENOENT, ENOTSUP, ENODATA], [ENOMEM]) + + if isinstance(ret, int): + logging.error("failed to get the xattr value") + return 1 + ret = ret.rstrip('\x00') + if ret == "1": + return 1 + return 0 def norm(s): if s: @@ -159,7 +174,8 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url): rconf.ssh_ctl_dir = ctld content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr, resource_url) - content_sha256 = sha256hex(content) + encoded_content = content.encode() + content_sha256 = sha256hex(encoded_content) """ The length of ctl_path for ssh connection should not be > 108. ssh fails with ctl_path too long if it is so. But when rsync @@ -171,7 +187,7 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url): fname = os.path.join(rconf.ssh_ctl_dir, "%s.mft" % content_sha256) - create_manifest(fname, content) + create_manifest(fname, encoded_content) ssh_ctl_path = os.path.join(rconf.ssh_ctl_dir, "%s.sock" % content_sha256) rconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path] @@ -184,7 +200,7 @@ def grabfile(fname, content=None): """ # damn those messy open() mode codes fd = os.open(fname, os.O_CREAT | os.O_RDWR) - f = os.fdopen(fd, 'r+b', 0) + f = os.fdopen(fd, 'r+') try: fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB) except: @@ -198,6 +214,7 @@ def grabfile(fname, content=None): try: f.truncate() f.write(content) + f.flush() except: f.close() raise @@ -260,7 +277,8 @@ def finalize(*args, **kwargs): umount_cmd = rconf.mbr_umount_cmd + [rconf.mount_point, 'lazy'] else: umount_cmd = ['umount', '-l', rconf.mount_point] - p0 = subprocess.Popen(umount_cmd, stderr=subprocess.PIPE) + p0 = subprocess.Popen(umount_cmd, stderr=subprocess.PIPE, + universal_newlines=True) _, errdata = p0.communicate() if p0.returncode == 0: try: @@ -326,13 +344,24 @@ def log_raise_exception(excont): ECONNABORTED): logging.error(lf('Gluster Mount process exited', error=errorcode[exc.errno])) + elif isinstance(exc, OSError) and exc.errno == EIO: + logging.error("Getting \"Input/Output error\" " + "is most likely due to " + "a. Brick is down or " + "b. Split brain issue.") + logging.error("This is expected as per design to " + "keep the consistency of the file system. 
" + "Once the above issue is resolved " + "geo-replication would automatically " + "proceed further.") + logtag = "FAIL" else: logtag = "FAIL" if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG): logtag = "FULL EXCEPTION TRACE" if logtag: logging.exception(logtag + ": ") - sys.stderr.write("failed with %s.\n" % type(exc).__name__) + sys.stderr.write("failed with %s: %s.\n" % (type(exc).__name__, exc)) excont.exval = 1 sys.exit(excont.exval) @@ -559,7 +588,6 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]): def lstat(e): return errno_wrap(os.lstat, [e], [ENOENT], [ESTALE, EBUSY]) - def get_gfid_from_mnt(gfidpath): return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', @@ -595,7 +623,7 @@ class ChangelogException(OSError): def gf_event(event_type, **kwargs): if EVENTS_ENABLED: - from events.gf_event import gf_event as gfevent + from gfevents.gf_event import gf_event as gfevent gfevent(event_type, **kwargs) @@ -635,7 +663,8 @@ def unshare_propagation_supported(): unshare_mnt_propagation = False p = subprocess.Popen(["unshare", "--help"], stderr=subprocess.PIPE, - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, + universal_newlines=True) out, err = p.communicate() if p.returncode == 0: if "propagation" in out: @@ -652,7 +681,8 @@ def get_rsync_version(rsync_cmd): rsync_version = "0" p = subprocess.Popen([rsync_cmd, "--version"], stderr=subprocess.PIPE, - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, + universal_newlines=True) out, err = p.communicate() if p.returncode == 0: rsync_version = out.split(" ", 4)[3] @@ -660,17 +690,65 @@ def get_rsync_version(rsync_cmd): return rsync_version +def get_slv_dir_path(slv_host, slv_volume, gfid): + global slv_bricks + + dir_path = ENOENT + pfx = gauxpfx() + + if not slv_bricks: + slv_info = Volinfo(slv_volume, slv_host, master=False) + slv_bricks = slv_info.bricks + # Result of readlink would be of format as below. + # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename" + for brick in slv_bricks: + dir_path = errno_wrap(os.path.join, + [brick['dir'], + ".glusterfs", gfid[0:2], + gfid[2:4], + gfid], [ENOENT], [ESTALE]) + if dir_path != ENOENT: + try: + realpath = errno_wrap(os.readlink, [dir_path], + [ENOENT], [ESTALE]) + if not isinstance(realpath, int): + realpath_parts = realpath.split('/') + pargfid = realpath_parts[-2] + basename = realpath_parts[-1] + dir_entry = os.path.join(pfx, pargfid, basename) + return dir_entry + except OSError: + # .gfid/GFID + gfidpath = unescape_space_newline(os.path.join(pfx, gfid)) + realpath = errno_wrap(Xattr.lgetxattr_buf, + [gfidpath, 'glusterfs.gfid2path'], [ENOENT], [ESTALE]) + if not isinstance(realpath, int): + basename = os.path.basename(realpath).rstrip('\x00') + dirpath = os.path.dirname(realpath) + if dirpath == "/": + pargfid = ROOT_GFID + else: + dirpath = dirpath.strip("/") + pargfid = get_gfid_from_mnt(dirpath) + if isinstance(pargfid, int): + return None + dir_entry = os.path.join(pfx, pargfid, basename) + return dir_entry + + return None + + def lf(event, **kwargs): """ Log Format helper function, log messages can be easily modified to structured log format. 
lf("Config Change", sync_jobs=4, brick=/bricks/b1) will be - converted as "Config Change<TAB>brick=/bricks/b1<TAB>sync_jobs=4" + converted as "Config Change [{brick=/bricks/b1}, {sync_jobs=4}]" """ - msg = event + msgparts = [] for k, v in kwargs.items(): - msg += "\t{0}={1}".format(k, v) - return msg + msgparts.append("{%s=%s}" % (k, v)) + return "%s [%s]" % (event, ", ".join(msgparts)) class Popen(subprocess.Popen): @@ -807,7 +885,7 @@ class Popen(subprocess.Popen): break b = os.read(self.stderr.fileno(), 1024) if b: - elines.append(b) + elines.append(b.decode()) else: break self.stderr.close() @@ -816,12 +894,31 @@ class Popen(subprocess.Popen): self.errfail() +def host_brick_split(value): + """ + IPv6 compatible way to split and get the host + and brick information. Example inputs: + node1.example.com:/exports/bricks/brick1/brick + fe80::af0f:df82:844f:ef66%utun0:/exports/bricks/brick1/brick + """ + parts = value.split(":") + brick = parts[-1] + hostparts = parts[0:-1] + return (":".join(hostparts), brick) + + class Volinfo(object): - def __init__(self, vol, host='localhost', prelude=[]): - po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, + def __init__(self, vol, host='localhost', prelude=[], master=True): + if master: + gluster_cmd_dir = gconf.get("gluster-command-dir") + else: + gluster_cmd_dir = gconf.get("slave-gluster-command-dir") + + gluster_cmd = os.path.join(gluster_cmd_dir, 'gluster') + po = Popen(prelude + [gluster_cmd, '--xml', '--remote-host=' + host, 'volume', 'info', vol], - stdout=PIPE, stderr=PIPE) + stdout=PIPE, stderr=PIPE, universal_newlines=True) vix = po.stdout.read() po.wait() po.terminate_geterr() @@ -852,7 +949,7 @@ class Volinfo(object): @memoize def bricks(self): def bparse(b): - host, dirp = b.find("name").text.split(':', 2) + host, dirp = host_brick_split(b.find("name").text) return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text} return [bparse(b) for b in self.get('brick')] @@ -884,6 +981,14 @@ class Volinfo(object): else: return int(self.get('disperseCount')[0].text) + def distribution_count(self, tier, hot): + if (tier and hot): + return int(self.get('hotBricks/hotdistCount')[0].text) + elif (tier and not hot): + return int(self.get('coldBricks/colddistCount')[0].text) + else: + return int(self.get('distCount')[0].text) + @property @memoize def hot_bricks(self): @@ -900,7 +1005,7 @@ class VolinfoFromGconf(object): # Glusterd will generate following config items before Geo-rep start # So that Geo-rep need not run gluster commands from inside # Volinfo object API/interface kept as is so that caller need not - # change anything exept calling this instead of Volinfo() + # change anything except calling this instead of Volinfo() # # master-bricks= # master-bricks=NODEID:HOSTNAME:PATH,.. 
@@ -920,6 +1025,16 @@ class VolinfoFromGconf(object): def is_hot(self, brickpath): return False + def is_uuid(self, value): + try: + uuid.UUID(value) + return True + except ValueError: + return False + + def possible_path(self, value): + return "/" in value + @property @memoize def bricks(self): @@ -933,8 +1048,22 @@ class VolinfoFromGconf(object): out = [] for b in bricks_data: parts = b.split(":") - bpath = parts[2] if len(parts) == 3 else "" - out.append({"host": parts[1], "dir": bpath, "uuid": parts[0]}) + b_uuid = None + if self.is_uuid(parts[0]): + b_uuid = parts[0] + # Keep all parts except the first + parts = parts[1:] + + if self.possible_path(parts[-1]): + bpath = parts[-1] + # Keep all parts except the last + parts = parts[0:-1] + + out.append({ + "host": ":".join(parts), # remaining parts form the host (may be an IPv6 address) + "dir": bpath, + "uuid": b_uuid + }) return out @@ -952,6 +1081,9 @@ class VolinfoFromGconf(object): def disperse_count(self, tier, hot): return gconf.get("master-disperse-count") + def distribution_count(self, tier, hot): + return gconf.get("master-distribution-count") + @property @memoize def hot_bricks(self):
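
The brick-parsing changes above share one idea: an IPv6 address itself contains colons, so a "host:path" value must be split on the last colon only, with everything before it rejoined as the host. A minimal standalone sketch of the host_brick_split() helper added in syncdutils.py, exercised with the sample values from its docstring:

    def host_brick_split(value):
        # The brick path is the last ':'-separated field; everything before
        # it, rejoined with ':', is the host (possibly an IPv6 address).
        parts = value.split(":")
        return (":".join(parts[0:-1]), parts[-1])

    # Plain hostname: behaves like the old single split.
    assert host_brick_split("node1.example.com:/bricks/b1") == \
        ("node1.example.com", "/bricks/b1")
    # IPv6 host with a scope id: the embedded colons stay with the host.
    assert host_brick_split("fe80::af0f:df82:844f:ef66%utun0:/bricks/b1") == \
        ("fe80::af0f:df82:844f:ef66%utun0", "/bricks/b1")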

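The new py2py3.py module, whose tail is shown at the top of this section, follows a single pattern throughout: probe the interpreter version once at import time and publish one set of helper names, so the rest of syncdaemon never branches on the Python version itself. A minimal sketch of that pattern (assuming the probe is sys.version_info; only the str_to_bytearray pair from above is shown):

    import sys

    if sys.version_info >= (3,):
        def str_to_bytearray(string):
            # py3: str holds text, so rebuild the raw bytes from code points
            return bytes([ord(c) for c in string])
    else:
        def str_to_bytearray(string):
            # py2: str is already a byte string; return it unchanged
            return string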