Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r--  geo-replication/syncdaemon/Makefile.am         |   4
-rw-r--r--  geo-replication/syncdaemon/README.md           |   1
-rw-r--r--  geo-replication/syncdaemon/argsupgrade.py      |  21
-rw-r--r--  geo-replication/syncdaemon/changelogagent.py   |  78
-rw-r--r--  geo-replication/syncdaemon/conf.py.in          |   1
-rw-r--r--  geo-replication/syncdaemon/gsyncd.py           |  43
-rw-r--r--  geo-replication/syncdaemon/gsyncdconfig.py     | 102
-rw-r--r--  geo-replication/syncdaemon/gsyncdstatus.py     |   7
-rw-r--r--  geo-replication/syncdaemon/libcxattr.py        |  21
-rw-r--r--  geo-replication/syncdaemon/libgfchangelog.py   | 253
-rw-r--r--  geo-replication/syncdaemon/master.py           | 278
-rw-r--r--  geo-replication/syncdaemon/monitor.py          | 121
-rw-r--r--  geo-replication/syncdaemon/py2py3.py           | 184
-rw-r--r--  geo-replication/syncdaemon/rconf.py            |   6
-rw-r--r--  geo-replication/syncdaemon/repce.py            |  17
-rw-r--r--  geo-replication/syncdaemon/resource.py         | 362
-rw-r--r--  geo-replication/syncdaemon/subcmds.py          |  29
-rw-r--r--  geo-replication/syncdaemon/syncdutils.py       | 190
18 files changed, 1072 insertions, 646 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am
index 19f0bfce1b7..d70e3368faf 100644
--- a/geo-replication/syncdaemon/Makefile.am
+++ b/geo-replication/syncdaemon/Makefile.am
@@ -2,7 +2,7 @@ syncdaemondir = $(GLUSTERFS_LIBEXECDIR)/python/syncdaemon
syncdaemon_PYTHON = rconf.py gsyncd.py __init__.py master.py README.md repce.py \
resource.py syncdutils.py monitor.py libcxattr.py gsyncdconfig.py \
- libgfchangelog.py changelogagent.py gsyncdstatus.py conf.py logutils.py \
- subcmds.py argsupgrade.py
+ libgfchangelog.py gsyncdstatus.py conf.py logutils.py \
+ subcmds.py argsupgrade.py py2py3.py
CLEANFILES =
diff --git a/geo-replication/syncdaemon/README.md b/geo-replication/syncdaemon/README.md
index 2a202e3f99e..5ab785ae669 100644
--- a/geo-replication/syncdaemon/README.md
+++ b/geo-replication/syncdaemon/README.md
@@ -19,7 +19,6 @@ INSTALLATION
As of now, the supported way of operation is running from the source directory or using the RPMs given.
-If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/).
CONFIGURATION
-------------
diff --git a/geo-replication/syncdaemon/argsupgrade.py b/geo-replication/syncdaemon/argsupgrade.py
index 632271daf81..7af40633ef8 100644
--- a/geo-replication/syncdaemon/argsupgrade.py
+++ b/geo-replication/syncdaemon/argsupgrade.py
@@ -84,6 +84,10 @@ def upgrade():
# fail when it does stat to check the existence.
init_gsyncd_template_conf()
+ inet6 = False
+ if "--inet6" in sys.argv:
+ inet6 = True
+
if "--monitor" in sys.argv:
# python gsyncd.py --path=/bricks/b1
# --monitor -c gsyncd.conf
@@ -147,8 +151,11 @@ def upgrade():
user, hname = remote_addr.split("@")
+ if not inet6:
+ hname = gethostbyname(hname)
+
print(("ssh://%s@%s:gluster://127.0.0.1:%s" % (
- user, gethostbyname(hname), vol)))
+ user, hname, vol)))
sys.exit(0)
elif "--normalize-url" in sys.argv:
@@ -268,7 +275,9 @@ def upgrade():
p = ArgumentParser()
p.add_argument("master")
p.add_argument("slave")
- p.add_argument("--config-set", nargs=2)
+ p.add_argument("--config-set", action='store_true')
+ p.add_argument("name")
+ p.add_argument("--value")
p.add_argument("-c")
pargs = p.parse_known_args(sys.argv[1:])[0]
@@ -280,8 +289,8 @@ def upgrade():
"config-set",
pargs.master.strip(":"),
slave_url(pargs.slave),
- pargs.config_set[0],
- pargs.config_set[1]
+ "--name=%s" % pargs.name,
+ "--value=%s" % pargs.value
]
elif "--config-check" in sys.argv:
# --config-check georep_session_working_dir
@@ -344,3 +353,7 @@ def upgrade():
if pargs.reset_sync_time:
sys.argv.append("--reset-sync-time")
+
+ if inet6:
+ # Add `--inet6` as first argument
+ sys.argv = [sys.argv[0], "--inet6"] + sys.argv[1:]
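
The hunk above hoists a global flag ahead of the subcommand so the new argparse-based parser sees it. A minimal standalone sketch of that idea (the helper function and sample argv are illustrative, not part of gsyncd):

```python
def hoist_flag(argv, flag="--inet6"):
    # Move a global flag to the front of argv so a parser that
    # defines it at the top level can consume it before subcommands.
    if flag in argv:
        rest = [a for a in argv[1:] if a != flag]
        return [argv[0], flag] + rest
    return argv

print(hoist_flag(["gsyncd.py", "status", "--inet6"]))
# -> ['gsyncd.py', '--inet6', 'status']
```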
diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py
deleted file mode 100644
index 5eade137d25..00000000000
--- a/geo-replication/syncdaemon/changelogagent.py
+++ /dev/null
@@ -1,78 +0,0 @@
-#!/usr/bin/python2
-#
-# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
-# This file is part of GlusterFS.
-
-# This file is licensed to you under your choice of the GNU Lesser
-# General Public License, version 3 or any later version (LGPLv3 or
-# later), or the GNU General Public License, version 2 (GPLv2), in all
-# cases as published by the Free Software Foundation.
-#
-
-import logging
-import syncdutils
-from syncdutils import select, CHANGELOG_AGENT_SERVER_VERSION
-from repce import RepceServer
-
-
-class _MetaChangelog(object):
-
- def __getattr__(self, meth):
- from libgfchangelog import Changes as LChanges
- xmeth = [m for m in dir(LChanges) if m[0] != '_']
- if meth not in xmeth:
- return
- for m in xmeth:
- setattr(self, m, getattr(LChanges, m))
- return getattr(self, meth)
-
-Changes = _MetaChangelog()
-
-
-class Changelog(object):
- def version(self):
- return CHANGELOG_AGENT_SERVER_VERSION
-
- def init(self):
- return Changes.cl_init()
-
- def register(self, cl_brick, cl_dir, cl_log, cl_level, retries=0):
- return Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)
-
- def scan(self):
- return Changes.cl_scan()
-
- def getchanges(self):
- return Changes.cl_getchanges()
-
- def done(self, clfile):
- return Changes.cl_done(clfile)
-
- def history(self, changelog_path, start, end, num_parallel):
- return Changes.cl_history_changelog(changelog_path, start, end,
- num_parallel)
-
- def history_scan(self):
- return Changes.cl_history_scan()
-
- def history_getchanges(self):
- return Changes.cl_history_getchanges()
-
- def history_done(self, clfile):
- return Changes.cl_history_done(clfile)
-
-
-class ChangelogAgent(object):
- def __init__(self, obj, fd_tup):
- (inf, ouf, rw, ww) = fd_tup.split(',')
- repce = RepceServer(obj, int(inf), int(ouf), 1)
- t = syncdutils.Thread(target=lambda: (repce.service_loop(),
- syncdutils.finalize()))
- t.start()
- logging.info('Agent listining...')
-
- select((), (), ())
-
-
-def agent(obj, fd_tup):
- return ChangelogAgent(obj, fd_tup)
diff --git a/geo-replication/syncdaemon/conf.py.in b/geo-replication/syncdaemon/conf.py.in
index 5846b9b5f26..2042fa9cdfb 100644
--- a/geo-replication/syncdaemon/conf.py.in
+++ b/geo-replication/syncdaemon/conf.py.in
@@ -1,4 +1,3 @@
-#!/usr/bin/python2
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 77fca4c9d1f..257ed72c6ae 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -22,8 +22,8 @@ import gsyncdconfig as gconf
from rconf import rconf
import subcmds
from conf import GLUSTERD_WORKDIR, GLUSTERFS_CONFDIR, GCONF_VERSION
-from syncdutils import set_term_handler, finalize, lf
-from syncdutils import log_raise_exception, FreeObject, escape
+from syncdutils import (set_term_handler, finalize, lf,
+ log_raise_exception, FreeObject, escape)
import argsupgrade
@@ -47,6 +47,7 @@ def main():
sys.exit(0)
parser = ArgumentParser()
+ parser.add_argument("--inet6", action="store_true")
sp = parser.add_subparsers(dest="subcmd")
# Monitor Status File update
@@ -78,8 +79,6 @@ def main():
help="feedback fd between monitor and worker")
p.add_argument("--local-node", help="Local master node")
p.add_argument("--local-node-id", help="Local Node ID")
- p.add_argument("--rpc-fd",
- help="Read and Write fds for worker-agent communication")
p.add_argument("--subvol-num", type=int, help="Subvolume number")
p.add_argument("--is-hottier", action="store_true",
help="Is this brick part of hot tier")
@@ -91,19 +90,6 @@ def main():
p.add_argument("-c", "--config-file", help="Config File")
p.add_argument("--debug", action="store_true")
- # Agent
- p = sp.add_parser("agent")
- p.add_argument("master", help="Master Volume Name")
- p.add_argument("slave", help="Slave details user@host::vol format")
- p.add_argument("--local-path", help="Local brick path")
- p.add_argument("--local-node", help="Local master node")
- p.add_argument("--local-node-id", help="Local Node ID")
- p.add_argument("--slave-id", help="Slave Volume ID")
- p.add_argument("--rpc-fd",
- help="Read and Write fds for worker-agent communication")
- p.add_argument("-c", "--config-file", help="Config File")
- p.add_argument("--debug", action="store_true")
-
# Slave
p = sp.add_parser("slave")
p.add_argument("master", help="Master Volume Name")
@@ -133,6 +119,8 @@ def main():
help="Directory where Gluster binaries exist on slave")
p.add_argument("--slave-access-mount", action="store_true",
help="Do not lazy umount the slave volume")
+ p.add_argument("--master-dist-count", type=int,
+ help="Master Distribution count")
# Status
p = sp.add_parser("status")
@@ -165,8 +153,8 @@ def main():
p = sp.add_parser("config-set")
p.add_argument("master", help="Master Volume Name")
p.add_argument("slave", help="Slave")
- p.add_argument("name", help="Config Name")
- p.add_argument("value", help="Config Value")
+ p.add_argument("-n", "--name", help="Config Name")
+ p.add_argument("-v", "--value", help="Config Value")
p.add_argument("-c", "--config-file", help="Config File")
p.add_argument("--debug", action="store_true")
@@ -228,7 +216,8 @@ def main():
# Set default path for config file in that case
# If a subcmd accepts a config file then it also accepts
# master and Slave arguments.
- if config_file is None and hasattr(args, "config_file"):
+ if config_file is None and hasattr(args, "config_file") \
+ and args.subcmd != "slave":
config_file = "%s/geo-replication/%s_%s_%s/gsyncd.conf" % (
GLUSTERD_WORKDIR,
args.master,
@@ -252,6 +241,12 @@ def main():
if args.subcmd == "slave":
override_from_args = True
+ if config_file is not None and \
+ args.subcmd in ["monitor", "config-get", "config-set", "config-reset"]:
+ ret = gconf.is_config_file_old(config_file, args.master, extra_tmpl_args["slavevol"])
+ if ret is not None:
+ gconf.config_upgrade(config_file, ret)
+
# Load Config file
gconf.load(GLUSTERFS_CONFDIR + "/gsyncd.conf",
config_file,
@@ -261,8 +256,8 @@ def main():
# Default label to print in log file
label = args.subcmd
- if args.subcmd in ("worker", "agent"):
- # If Worker or agent, then add brick path also to label
+ if args.subcmd in ("worker",):
+ # If Worker, then add brick path also to label
label = "%s %s" % (args.subcmd, args.local_path)
elif args.subcmd == "slave":
# If Slave add Master node and Brick details
@@ -305,7 +300,7 @@ def main():
# Log message for loaded config file
if config_file is not None:
- logging.info(lf("Using session config file", path=config_file))
+ logging.debug(lf("Using session config file", path=config_file))
set_term_handler()
excont = FreeObject(exval=0)
diff --git a/geo-replication/syncdaemon/gsyncdconfig.py b/geo-replication/syncdaemon/gsyncdconfig.py
index b2517f031ca..8848071997a 100644
--- a/geo-replication/syncdaemon/gsyncdconfig.py
+++ b/geo-replication/syncdaemon/gsyncdconfig.py
@@ -10,12 +10,14 @@
#
try:
- from configparser import configparser as ConfigParser, NoSectionError
+ from ConfigParser import RawConfigParser, NoSectionError
except ImportError:
- from ConfigParser import ConfigParser, NoSectionError
+ from configparser import RawConfigParser, NoSectionError
import os
+import shutil
from string import Template
from datetime import datetime
+from threading import Lock
# Global object which can be used in other modules
@@ -34,6 +36,7 @@ class GconfInvalidValue(Exception):
class Gconf(object):
def __init__(self, default_conf_file, custom_conf_file=None,
args={}, extra_tmpl_args={}, override_from_args=False):
+ self.lock = Lock()
self.default_conf_file = default_conf_file
self.custom_conf_file = custom_conf_file
self.tmp_conf_file = None
@@ -50,7 +53,7 @@ class Gconf(object):
self.args = args
self.extra_tmpl_args = extra_tmpl_args
self.override_from_args = override_from_args
- # Store default values only if overwriten, Only for JSON/CLI output
+ # Store default values only if overwritten, Only for JSON/CLI output
self.default_values = {}
self._load()
@@ -91,7 +94,7 @@ class Gconf(object):
if name != "all" and not self._is_configurable(name):
raise GconfNotConfigurable()
- cnf = ConfigParser()
+ cnf = RawConfigParser()
with open(self.custom_conf_file) as f:
cnf.readfp(f)
@@ -135,7 +138,7 @@ class Gconf(object):
if curr_val == value:
return True
- cnf = ConfigParser()
+ cnf = RawConfigParser()
with open(self.custom_conf_file) as f:
cnf.readfp(f)
@@ -162,6 +165,11 @@ class Gconf(object):
if value is not None and not self._is_valid_value(name, value):
raise GconfInvalidValue()
+
+ def _load_with_lock(self):
+ with self.lock:
+ self._load()
+
def _load(self):
self.gconf = {}
self.template_conf = []
@@ -170,7 +178,7 @@ class Gconf(object):
self.session_conf_items = []
self.default_values = {}
- conf = ConfigParser()
+ conf = RawConfigParser()
# Default Template config file
with open(self.default_conf_file) as f:
conf.readfp(f)
@@ -229,12 +237,19 @@ class Gconf(object):
self._tmpl_substitute()
self._do_typecast()
- def reload(self):
+ def reload(self, with_lock=True):
if self._is_config_changed():
- self._load()
+ if with_lock:
+ self._load_with_lock()
+ else:
+ self._load()
- def get(self, name, default_value=None):
- return self.gconf.get(name, default_value)
+ def get(self, name, default_value=None, with_lock=True):
+ if with_lock:
+ with self.lock:
+ return self.gconf.get(name, default_value)
+ else:
+ return self.gconf.get(name, default_value)
def getall(self, show_defaults=False, show_non_configurable=False):
cnf = {}
@@ -275,8 +290,9 @@ class Gconf(object):
return cnf
def getr(self, name, default_value=None):
- self.reload()
- return self.get(name, default_value)
+ with self.lock:
+ self.reload(with_lock=False)
+ return self.get(name, default_value, with_lock=False)
def get_help(self, name=None):
pass
@@ -313,6 +329,9 @@ class Gconf(object):
if item["validation"] == "unixtime":
return validate_unixtime(value)
+ if item["validation"] == "int":
+ return validate_int(value)
+
return False
def _is_config_changed(self):
@@ -325,6 +344,53 @@ class Gconf(object):
return False
+def is_config_file_old(config_file, mastervol, slavevol):
+ cnf = RawConfigParser()
+ cnf.read(config_file)
+ session_section = "peers %s %s" % (mastervol, slavevol)
+ try:
+ return dict(cnf.items(session_section))
+ except NoSectionError:
+ return None
+
+def config_upgrade(config_file, ret):
+ config_file_backup = os.path.join(os.path.dirname(config_file), "gsyncd.conf.bkp")
+
+ # copy the old config file to a backup file
+ shutil.copyfile(config_file, config_file_backup)
+
+ # write a new config file
+ config = RawConfigParser()
+ config.add_section('vars')
+
+ for key, value in ret.items():
+ # handle option name changes
+ if key == "use_tarssh":
+ new_key = "sync-method"
+ if value == "true":
+ new_value = "tarssh"
+ else:
+ new_value = "rsync"
+ config.set('vars', new_key, new_value)
+ elif key == "timeout":
+ new_key = "slave-timeout"
+ config.set('vars', new_key, value)
+ # for option name changes like ignore_deletes to ignore-deletes
+ else:
+ new_key = key.replace("_", "-")
+ config.set('vars', new_key, value)
+
+ with open(config_file, 'w') as configfile:
+ config.write(configfile)
+
+
+def validate_int(value):
+ try:
+ _ = int(value)
+ return True
+ except ValueError:
+ return False
+
def validate_unixtime(value):
try:
@@ -338,11 +404,13 @@ def validate_unixtime(value):
def validate_minmax(value, minval, maxval):
- value = int(value)
- minval = int(minval)
- maxval = int(maxval)
-
- return value >= minval and value <= maxval
+ try:
+ value = int(value)
+ minval = int(minval)
+ maxval = int(maxval)
+ return value >= minval and value <= maxval
+ except ValueError:
+ return False
def validate_choice(value, allowed_values):
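
For context, a hedged sketch of how the hardened validators behave; the function bodies mirror the patch, while the assertions are assumptions about intended use:

```python
def validate_int(value):
    try:
        int(value)
        return True
    except ValueError:
        return False

def validate_minmax(value, minval, maxval):
    try:
        return int(minval) <= int(value) <= int(maxval)
    except ValueError:
        return False

assert validate_int("300") and not validate_int("5m")
assert validate_minmax("5", "1", "10")
assert not validate_minmax("abc", "1", "10")  # previously raised ValueError
```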
diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py
index e8a810f4b38..1a655ff8887 100644
--- a/geo-replication/syncdaemon/gsyncdstatus.py
+++ b/geo-replication/syncdaemon/gsyncdstatus.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
#
# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.
@@ -23,8 +23,8 @@ from datetime import datetime
from errno import EACCES, EAGAIN, ENOENT
import logging
-from syncdutils import EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event
-from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED, lf
+from syncdutils import (EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event,
+ EVENT_GEOREP_CHECKPOINT_COMPLETED, lf)
DEFAULT_STATUS = "N/A"
MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped")
@@ -103,6 +103,7 @@ class LockedOpen(object):
return f
def __exit__(self, _exc_type, _exc_value, _traceback):
+ fcntl.flock(self.fileobj, fcntl.LOCK_UN)
self.fileobj.close()
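
The explicit LOCK_UN matters because the locked fd can be inherited across fork. A simplified sketch of the LockedOpen pattern with the unlock added by this patch (the real class also re-opens the file to guard against rename races):

```python
import fcntl

class LockedOpen(object):
    def __init__(self, filename, *args, **kwargs):
        self.filename = filename
        self.args = args
        self.kwargs = kwargs

    def __enter__(self):
        self.fileobj = open(self.filename, *self.args, **self.kwargs)
        fcntl.flock(self.fileobj, fcntl.LOCK_EX)   # advisory exclusive lock
        return self.fileobj

    def __exit__(self, exc_type, exc_value, traceback):
        fcntl.flock(self.fileobj, fcntl.LOCK_UN)   # release before close
        self.fileobj.close()
```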
diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py
index f576648b7a8..e6406c36bd7 100644
--- a/geo-replication/syncdaemon/libcxattr.py
+++ b/geo-replication/syncdaemon/libcxattr.py
@@ -9,12 +9,14 @@
#
import os
-from ctypes import CDLL, create_string_buffer, get_errno
+from ctypes import CDLL, get_errno
+from py2py3 import (bytearray_to_str, gr_create_string_buffer,
+ gr_query_xattr, gr_lsetxattr, gr_lremovexattr)
class Xattr(object):
- """singleton that wraps the extended attribues system
+ """singleton that wraps the extended attributes system
interface for python using ctypes
Just implement it to the degree we need it, in particular
@@ -38,20 +40,23 @@ class Xattr(object):
@classmethod
def _query_xattr(cls, path, siz, syscall, *a):
if siz:
- buf = create_string_buffer('\0' * siz)
+ buf = gr_create_string_buffer(siz)
else:
buf = None
ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz)))
if ret == -1:
cls.raise_oserr()
if siz:
- return buf.raw[:ret]
+ # py2 and py3 compatibility: convert the byte
+ # array to a string
+ result = bytearray_to_str(buf.raw)
+ return result[:ret]
else:
return ret
@classmethod
def lgetxattr(cls, path, attr, siz=0):
- return cls._query_xattr(path, siz, 'lgetxattr', attr)
+ return gr_query_xattr(cls, path, siz, 'lgetxattr', attr)
@classmethod
def lgetxattr_buf(cls, path, attr):
@@ -65,7 +70,7 @@ class Xattr(object):
@classmethod
def llistxattr(cls, path, siz=0):
- ret = cls._query_xattr(path, siz, 'llistxattr')
+ ret = gr_query_xattr(cls, path, siz, 'llistxattr')
if isinstance(ret, str):
ret = ret.strip('\0')
ret = ret.split('\0') if ret else []
@@ -73,13 +78,13 @@ class Xattr(object):
@classmethod
def lsetxattr(cls, path, attr, val):
- ret = cls.libc.lsetxattr(path, attr, val, len(val), 0)
+ ret = gr_lsetxattr(cls, path, attr, val)
if ret == -1:
cls.raise_oserr()
@classmethod
def lremovexattr(cls, path, attr):
- ret = cls.libc.lremovexattr(path, attr)
+ ret = gr_lremovexattr(cls, path, attr)
if ret == -1:
cls.raise_oserr()
diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py
index da12438d069..a3bda7282c0 100644
--- a/geo-replication/syncdaemon/libgfchangelog.py
+++ b/geo-replication/syncdaemon/libgfchangelog.py
@@ -9,130 +9,135 @@
#
import os
-from ctypes import CDLL, RTLD_GLOBAL, create_string_buffer, \
- get_errno, byref, c_ulong
+from ctypes import CDLL, RTLD_GLOBAL, get_errno, byref, c_ulong
+from ctypes.util import find_library
from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
+from py2py3 import (gr_cl_history_changelog, gr_cl_done,
+ gr_create_string_buffer, gr_cl_register,
+ gr_cl_history_done, bytearray_to_str)
-class Changes(object):
- libgfc = CDLL("libgfchangelog.so", mode=RTLD_GLOBAL,
- use_errno=True)
-
- @classmethod
- def geterrno(cls):
- return get_errno()
-
- @classmethod
- def raise_changelog_err(cls):
- errn = cls.geterrno()
- raise ChangelogException(errn, os.strerror(errn))
-
- @classmethod
- def _get_api(cls, call):
- return getattr(cls.libgfc, call)
-
- @classmethod
- def cl_init(cls):
- ret = cls._get_api('gf_changelog_init')(None)
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_register(cls, brick, path, log_file, log_level, retries=0):
- ret = cls._get_api('gf_changelog_register')(brick, path,
- log_file,
- log_level, retries)
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_scan(cls):
- ret = cls._get_api('gf_changelog_scan')()
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_startfresh(cls):
- ret = cls._get_api('gf_changelog_start_fresh')()
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_getchanges(cls):
- """ remove hardcoding for path name length """
- def clsort(f):
- return f.split('.')[-1]
- changes = []
- buf = create_string_buffer('\0', 4096)
- call = cls._get_api('gf_changelog_next_change')
-
- while True:
- ret = call(buf, 4096)
- if ret in (0, -1):
- break
- changes.append(buf.raw[:ret - 1])
- if ret == -1:
- cls.raise_changelog_err()
- # cleanup tracker
- cls.cl_startfresh()
- return sorted(changes, key=clsort)
-
- @classmethod
- def cl_done(cls, clfile):
- ret = cls._get_api('gf_changelog_done')(clfile)
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_history_scan(cls):
- ret = cls._get_api('gf_history_changelog_scan')()
- if ret == -1:
- cls.raise_changelog_err()
-
- return ret
-
- @classmethod
- def cl_history_changelog(cls, changelog_path, start, end, num_parallel):
- actual_end = c_ulong()
- ret = cls._get_api('gf_history_changelog')(changelog_path, start, end,
- num_parallel,
- byref(actual_end))
- if ret == -1:
- cls.raise_changelog_err()
-
- if ret == -2:
- raise ChangelogHistoryNotAvailable()
-
- return (ret, actual_end.value)
-
- @classmethod
- def cl_history_startfresh(cls):
- ret = cls._get_api('gf_history_changelog_start_fresh')()
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_history_getchanges(cls):
- """ remove hardcoding for path name length """
- def clsort(f):
- return f.split('.')[-1]
-
- changes = []
- buf = create_string_buffer('\0', 4096)
- call = cls._get_api('gf_history_changelog_next_change')
-
- while True:
- ret = call(buf, 4096)
- if ret in (0, -1):
- break
- changes.append(buf.raw[:ret - 1])
- if ret == -1:
- cls.raise_changelog_err()
-
- return sorted(changes, key=clsort)
-
- @classmethod
- def cl_history_done(cls, clfile):
- ret = cls._get_api('gf_history_changelog_done')(clfile)
- if ret == -1:
- cls.raise_changelog_err()
+libgfc = CDLL(
+ find_library("gfchangelog"),
+ mode=RTLD_GLOBAL,
+ use_errno=True
+)
+
+
+def _raise_changelog_err():
+ errn = get_errno()
+ raise ChangelogException(errn, os.strerror(errn))
+
+
+def _init():
+ if libgfc.gf_changelog_init(None) == -1:
+ _raise_changelog_err()
+
+
+def register(brick, path, log_file, log_level, retries=0):
+ _init()
+
+ ret = gr_cl_register(libgfc, brick, path, log_file, log_level, retries)
+
+ if ret == -1:
+ _raise_changelog_err()
+
+
+def scan():
+ ret = libgfc.gf_changelog_scan()
+ if ret == -1:
+ _raise_changelog_err()
+
+
+def startfresh():
+ ret = libgfc.gf_changelog_start_fresh()
+ if ret == -1:
+ _raise_changelog_err()
+
+
+def getchanges():
+ def clsort(cfile):
+ return cfile.split('.')[-1]
+
+ changes = []
+ buf = gr_create_string_buffer(4096)
+ call = libgfc.gf_changelog_next_change
+
+ while True:
+ ret = call(buf, 4096)
+ if ret in (0, -1):
+ break
+
+ # py2 and py3 compatibility
+ result = bytearray_to_str(buf.raw[:ret - 1])
+ changes.append(result)
+
+ if ret == -1:
+ _raise_changelog_err()
+
+ # cleanup tracker
+ startfresh()
+
+ return sorted(changes, key=clsort)
+
+
+def done(clfile):
+ ret = gr_cl_done(libgfc, clfile)
+ if ret == -1:
+ _raise_changelog_err()
+
+
+def history_scan():
+ ret = libgfc.gf_history_changelog_scan()
+ if ret == -1:
+ _raise_changelog_err()
+
+ return ret
+
+
+def history_changelog(changelog_path, start, end, num_parallel):
+ actual_end = c_ulong()
+ ret = gr_cl_history_changelog(libgfc, changelog_path, start, end,
+ num_parallel, byref(actual_end))
+ if ret == -1:
+ _raise_changelog_err()
+
+ if ret == -2:
+ raise ChangelogHistoryNotAvailable()
+
+ return (ret, actual_end.value)
+
+
+def history_startfresh():
+ ret = libgfc.gf_history_changelog_start_fresh()
+ if ret == -1:
+ _raise_changelog_err()
+
+
+def history_getchanges():
+ def clsort(cfile):
+ return cfile.split('.')[-1]
+
+ changes = []
+ buf = gr_create_string_buffer(4096)
+ call = libgfc.gf_history_changelog_next_change
+
+ while True:
+ ret = call(buf, 4096)
+ if ret in (0, -1):
+ break
+
+ # py2 and py3 compatibility
+ result = bytearray_to_str(buf.raw[:ret - 1])
+ changes.append(result)
+
+ if ret == -1:
+ _raise_changelog_err()
+
+ return sorted(changes, key=clsort)
+
+
+def history_done(clfile):
+ ret = gr_cl_history_done(libgfc, clfile)
+ if ret == -1:
+ _raise_changelog_err()
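
With the class wrapper gone, callers use plain module-level functions. A hedged usage sketch (the brick path, working directory, log file, and log level are made-up values; running it requires libgfchangelog.so):

```python
import libgfchangelog

# Illustrative values only; register() maps to gf_changelog_register().
libgfchangelog.register("/bricks/b1", "/var/lib/misc/gsyncd",
                        "/var/log/gsyncd-changelog.log", 7)
libgfchangelog.scan()
for clfile in libgfchangelog.getchanges():
    print("processing", clfile)   # consume one changelog file
    libgfchangelog.done(clfile)   # acknowledge so it is not replayed
```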
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index d9f63a440fb..9501aeae6b5 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -22,11 +22,13 @@ from threading import Condition, Lock
from datetime import datetime
import gsyncdconfig as gconf
+import libgfchangelog
from rconf import rconf
-from syncdutils import Thread, GsyncdError, escape_space_newline
-from syncdutils import unescape_space_newline, gauxpfx, escape
-from syncdutils import lstat, errno_wrap, FreeObject, lf, matching_disk_gfid
-from syncdutils import NoStimeAvailable, PartialHistoryAvailable
+from syncdutils import (Thread, GsyncdError, escape_space_newline,
+ unescape_space_newline, gauxpfx, escape,
+ lstat, errno_wrap, FreeObject, lf, matching_disk_gfid,
+ NoStimeAvailable, PartialHistoryAvailable,
+ host_brick_split)
URXTIME = (-1, 0)
@@ -65,6 +67,9 @@ def _volinfo_hook_relax_foreign(self):
def edct(op, **ed):
dct = {}
dct['op'] = op
+ # This is used in automatic gfid conflict resolution.
+ # When marked True, it's skipped during re-processing.
+ dct['skip_entry'] = False
for k in ed:
if k == 'stat':
st = ed[k]
@@ -192,7 +197,7 @@ class NormalMixin(object):
vi = vi.copy()
vi['timeout'] = int(time.time()) + timo
else:
- # send keep-alives more frequently to
+ # send keep-alive more frequently to
# avoid a delay in announcing our volume info
# to slave if it becomes established in the
# meantime
@@ -448,18 +453,10 @@ class GMasterCommon(object):
if rconf.mgmt_lock_fd:
try:
fcntl.lockf(rconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
- if not rconf.active_earlier:
- rconf.active_earlier = True
- logging.info(lf("Got lock Becoming ACTIVE",
- brick=rconf.args.local_path))
return True
except:
ex = sys.exc_info()[1]
if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
- if not rconf.passive_earlier:
- rconf.passive_earlier = True
- logging.info(lf("Didn't get lock Becoming PASSIVE",
- brick=rconf.local_path))
return False
raise
@@ -494,18 +491,10 @@ class GMasterCommon(object):
ex = sys.exc_info()[1]
if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
# cannot grab, it's taken
- if not rconf.passive_earlier:
- rconf.passive_earlier = True
- logging.info(lf("Didn't get lock Becoming PASSIVE",
- brick=rconf.args.local_path))
rconf.mgmt_lock_fd = fd
return False
raise
- if not rconf.active_earlier:
- rconf.active_earlier = True
- logging.info(lf("Got lock Becoming ACTIVE",
- brick=rconf.args.local_path))
return True
def should_crawl(self):
@@ -529,8 +518,8 @@ class GMasterCommon(object):
# If crawlwrap is called when partial history available,
# then it sets register_time which is the time when geo-rep
- # worker registerd to changelog consumption. Since nsec is
- # not considered in register time, their are chances of skipping
+ # worker registered to changelog consumption. Since nsec is
+ # not considered in register time, there are chances of skipping
# changes detection in xsync crawl. This limit will be reset when
# crawlwrap is called again.
self.live_changelog_start_time = None
@@ -540,7 +529,7 @@ class GMasterCommon(object):
# no need to maintain volinfo state machine.
# in a cascading setup, each geo-replication session is
# independent (ie. 'volume-mark' and 'xtime' are not
- # propogated). This is because the slave's xtime is now
+ # propagated). This is because the slave's xtime is now
# stored on the master itself. 'volume-mark' just identifies
# that we are in a cascading setup and need to enable
# 'geo-replication.ignore-pid-check' option.
@@ -711,7 +700,8 @@ class GMasterChangelogMixin(GMasterCommon):
TYPE_GFID = "D "
TYPE_ENTRY = "E "
- MAX_EF_RETRIES = 15
+ MAX_EF_RETRIES = 10
+ MAX_OE_RETRIES = 10
# flat directory hierarchy for gfid based access
FLAT_DIR_HIERARCHY = '.'
@@ -803,21 +793,29 @@ class GMasterChangelogMixin(GMasterCommon):
self.status.inc_value("failures", num_failures)
- def fix_possible_entry_failures(self, failures, retry_count):
+ def fix_possible_entry_failures(self, failures, retry_count, entries):
pfx = gauxpfx()
fix_entry_ops = []
failures1 = []
+ remove_gfids = set()
for failure in failures:
- if failure[2]['dst']:
+ if failure[2]['name_mismatch']:
+ pbname = failure[2]['slave_entry']
+ elif failure[2]['dst']:
pbname = failure[0]['entry1']
else:
pbname = failure[0]['entry']
- if failure[2]['gfid_mismatch']:
+
+ op = failure[0]['op']
+ # name exists but gfid is different
+ if failure[2]['gfid_mismatch'] or failure[2]['name_mismatch']:
slave_gfid = failure[2]['slave_gfid']
st = lstat(os.path.join(pfx, slave_gfid))
+ # Takes care of scenarios with no hardlinks
if isinstance(st, int) and st == ENOENT:
- logging.info(lf('Fixing gfid mismatch in slave. Deleting'
- ' the entry', retry_count=retry_count,
+ logging.debug(lf('Entry not present on master. Fixing gfid '
+ 'mismatch in slave. Deleting the entry',
+ retry_count=retry_count,
entry=repr(failure)))
# Add deletion to fix_entry_ops list
if failure[2]['slave_isdir']:
@@ -830,79 +828,163 @@ class GMasterChangelogMixin(GMasterCommon):
edct('UNLINK',
gfid=failure[2]['slave_gfid'],
entry=pbname))
+ remove_gfids.add(slave_gfid)
+ if op in ['RENAME']:
+ # If renamed gfid doesn't exist on master, remove
+ # rename entry and unlink src on slave
+ st = lstat(os.path.join(pfx, failure[0]['gfid']))
+ if isinstance(st, int) and st == ENOENT:
+ logging.debug("Unlink source %s" % repr(failure))
+ remove_gfids.add(failure[0]['gfid'])
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[0]['gfid'],
+ entry=failure[0]['entry']))
+ # Takes care of scenarios of hardlinks/renames on master
elif not isinstance(st, int):
- # The file exists on master but with different name.
- # Probabaly renamed and got missed during xsync crawl.
- if failure[2]['slave_isdir']:
- logging.info(lf('Fixing gfid mismatch in slave',
+ if matching_disk_gfid(slave_gfid, pbname):
+ # Safe to ignore the failure as master contains same
+ # file with same gfid. Remove entry from entries list
+ logging.debug(lf('Fixing gfid mismatch in slave. '
+ ' Safe to ignore, take out entry',
retry_count=retry_count,
entry=repr(failure)))
+ remove_gfids.add(failure[0]['gfid'])
+ if op == 'RENAME':
+ fix_entry_ops.append(
+ edct('UNLINK',
+ gfid=failure[0]['gfid'],
+ entry=failure[0]['entry']))
+ # The file exists on master but with different name.
+ # Probably renamed and got missed during xsync crawl.
+ elif failure[2]['slave_isdir']:
realpath = os.readlink(os.path.join(
- rconf.args.local_path,
- ".glusterfs",
- slave_gfid[0:2],
- slave_gfid[2:4],
- slave_gfid))
+ rconf.args.local_path,
+ ".glusterfs",
+ slave_gfid[0:2],
+ slave_gfid[2:4],
+ slave_gfid))
dst_entry = os.path.join(pfx, realpath.split('/')[-2],
realpath.split('/')[-1])
- rename_dict = edct('RENAME', gfid=slave_gfid,
- entry=failure[0]['entry'],
- entry1=dst_entry, stat=st,
- link=None)
- logging.info(lf('Fixing gfid mismatch in slave. '
- 'Renaming', retry_count=retry_count,
- entry=repr(rename_dict)))
- fix_entry_ops.append(rename_dict)
+ src_entry = pbname
+ logging.debug(lf('Fixing dir name/gfid mismatch in '
+ 'slave', retry_count=retry_count,
+ entry=repr(failure)))
+ if src_entry == dst_entry:
+ # Safe to ignore the failure as master contains
+ # same directory as in slave with same gfid.
+ # Remove the failure entry from entries list
+ logging.debug(lf('Fixing dir name/gfid mismatch'
+ ' in slave. Safe to ignore, '
+ 'take out entry',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ try:
+ entries.remove(failure[0])
+ except ValueError:
+ pass
+ else:
+ rename_dict = edct('RENAME', gfid=slave_gfid,
+ entry=src_entry,
+ entry1=dst_entry, stat=st,
+ link=None)
+ logging.debug(lf('Fixing dir name/gfid mismatch'
+ ' in slave. Renaming',
+ retry_count=retry_count,
+ entry=repr(rename_dict)))
+ fix_entry_ops.append(rename_dict)
else:
- logging.info(lf('Fixing gfid mismatch in slave. '
- ' Deleting the entry',
+ # A hardlink exists with a different name, or a renamed
+ # file exists, and the matching_disk_gfid check assures
+ # us that no entry with the same gfid exists, so it is
+ # safe to delete on the slave
+ logging.debug(lf('Fixing file gfid mismatch in slave. '
+ 'Hardlink/Rename Case. Deleting entry',
retry_count=retry_count,
entry=repr(failure)))
fix_entry_ops.append(
edct('UNLINK',
gfid=failure[2]['slave_gfid'],
entry=pbname))
- logging.error(lf('Entry cannot be fixed in slave due '
- 'to GFID mismatch, find respective '
- 'path for the GFID and trigger sync',
- gfid=slave_gfid))
+ elif failure[1] == ENOENT:
+ if op in ['RENAME']:
+ pbname = failure[0]['entry1']
+ else:
+ pbname = failure[0]['entry']
+
+ pargfid = pbname.split('/')[1]
+ st = lstat(os.path.join(pfx, pargfid))
+ # Safe to ignore the failure as master doesn't contain
+ # parent directory.
+ if isinstance(st, int):
+ logging.debug(lf('Fixing ENOENT error in slave. Parent '
+ 'does not exist on master. Safe to '
+ 'ignore, take out entry',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ try:
+ entries.remove(failure[0])
+ except ValueError:
+ pass
+ else:
+ logging.debug(lf('Fixing ENOENT error in slave. Create '
+ 'parent directory on slave.',
+ retry_count=retry_count,
+ entry=repr(failure)))
+ realpath = os.readlink(os.path.join(rconf.args.local_path,
+ ".glusterfs",
+ pargfid[0:2],
+ pargfid[2:4],
+ pargfid))
+ dir_entry = os.path.join(pfx, realpath.split('/')[-2],
+ realpath.split('/')[-1])
+ fix_entry_ops.append(
+ edct('MKDIR', gfid=pargfid, entry=dir_entry,
+ mode=st.st_mode, uid=st.st_uid, gid=st.st_gid))
+
+ logging.debug("remove_gfids: %s" % repr(remove_gfids))
+ if remove_gfids:
+ for e in entries:
+ if e['op'] in ['MKDIR', 'MKNOD', 'CREATE', 'RENAME'] \
+ and e['gfid'] in remove_gfids:
+ logging.debug("Removed entry op from retrial list: entry: %s" % repr(e))
+ e['skip_entry'] = True
if fix_entry_ops:
# Process deletions of entries whose gfids are mismatched
failures1 = self.slave.server.entry_ops(fix_entry_ops)
- if not failures1:
- logging.info("Sucessfully fixed entry ops with gfid mismatch")
- return failures1
+ return (failures1, fix_entry_ops)
def handle_entry_failures(self, failures, entries):
retries = 0
pending_failures = False
failures1 = []
failures2 = []
+ entry_ops1 = []
+ entry_ops2 = []
if failures:
pending_failures = True
failures1 = failures
+ entry_ops1 = entries
while pending_failures and retries < self.MAX_EF_RETRIES:
retries += 1
- failures2 = self.fix_possible_entry_failures(failures1,
- retries)
+ (failures2, entry_ops2) = self.fix_possible_entry_failures(
+ failures1, retries, entry_ops1)
if not failures2:
pending_failures = False
+ logging.info(lf('Successfully fixed entry ops with gfid '
+ 'mismatch', retry_count=retries))
else:
pending_failures = True
failures1 = failures2
+ entry_ops1 = entry_ops2
if pending_failures:
for failure in failures1:
logging.error("Failed to fix entry ops %s", repr(failure))
- else:
- # Retry original entry list 5 times
- failures = self.slave.server.entry_ops(entries)
-
- self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
def process_change(self, change, done, retry):
pfx = gauxpfx()
@@ -1003,7 +1085,7 @@ class GMasterChangelogMixin(GMasterCommon):
# Special case: record mknod as link
if ty in ['MKNOD']:
mode = int(ec[2])
- if mode & 01000:
+ if mode & 0o1000:
# Avoid stat'ing the file as it
# may be deleted in the interim
st = FreeObject(st_mode=int(ec[2]),
@@ -1044,6 +1126,11 @@ class GMasterChangelogMixin(GMasterCommon):
os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,
stat=st, link=rl))
+ # If src doesn't exist while doing rename, destination
+ # is created. If the rename is not followed by a data
+ # operation, this remains a zero-byte file on the slave.
+ # Hence add a data entry for renames
+ datas.add(os.path.join(pfx, gfid))
else:
# stat() to get mode and other information
if not matching_disk_gfid(gfid, en):
@@ -1067,6 +1154,12 @@ class GMasterChangelogMixin(GMasterCommon):
rl = None
entries.append(edct(ty, stat=st, entry=en, gfid=gfid,
link=rl))
+ # If src doesn't exist while doing link, destination
+ # is created based on file type. If the link is not
+ # followed by a data operation, this remains a
+ # zero-byte file on the slave. Hence add a data entry for links
+ if rl is None:
+ datas.add(os.path.join(pfx, gfid))
elif ty == 'SYMLINK':
rl = errno_wrap(os.readlink, [en], [ENOENT],
[ESTALE, EINTR])
@@ -1114,7 +1207,6 @@ class GMasterChangelogMixin(GMasterCommon):
logging.debug('entries: %s' % repr(entries))
# Increment counters for Status
- self.status.inc_value("entry", len(entries))
self.files_in_batch += len(datas)
self.status.inc_value("data", len(datas))
@@ -1129,7 +1221,23 @@ class GMasterChangelogMixin(GMasterCommon):
self.status.inc_value("entry", len(entries))
failures = self.slave.server.entry_ops(entries)
- self.handle_entry_failures(failures, entries)
+
+ if gconf.get("gfid-conflict-resolution"):
+ count = 0
+ if failures:
+ logging.info(lf('Entry ops failed with gfid mismatch',
+ count=len(failures)))
+ while failures and count < self.MAX_OE_RETRIES:
+ count += 1
+ self.handle_entry_failures(failures, entries)
+ logging.info(lf('Retry original entries', count=count))
+ failures = self.slave.server.entry_ops(entries)
+ if not failures:
+ logging.info("Successfully fixed all entry ops with "
+ "gfid mismatch")
+ break
+
+ self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
self.status.dec_value("entry", len(entries))
# Update Entry stime in Brick Root only in case of Changelog mode
@@ -1164,10 +1272,10 @@ class GMasterChangelogMixin(GMasterCommon):
continue
meta_entries.append(edct('META', go=go[0], stat=st))
if meta_entries:
- self.status.inc_value("meta", len(entries))
+ self.status.inc_value("meta", len(meta_entries))
failures = self.slave.server.meta_ops(meta_entries)
self.log_failures(failures, 'go', '', 'META')
- self.status.dec_value("meta", len(entries))
+ self.status.dec_value("meta", len(meta_entries))
self.batch_stats["META_SYNC_TIME"] += time.time() - meta_start_time
@@ -1359,7 +1467,7 @@ class GMasterChangelogMixin(GMasterCommon):
node = rconf.args.resource_remote
node_data = node.split("@")
node = node_data[-1]
- remote_node_ip = node.split(":")[0]
+ remote_node_ip, _ = host_brick_split(node)
self.status.set_slave_node(remote_node_ip)
def changelogs_batch_process(self, changes):
@@ -1392,9 +1500,9 @@ class GMasterChangelogMixin(GMasterCommon):
# that are _historical_ to that time.
data_stime = self.get_data_stime()
- self.changelog_agent.scan()
+ libgfchangelog.scan()
self.crawls += 1
- changes = self.changelog_agent.getchanges()
+ changes = libgfchangelog.getchanges()
if changes:
if data_stime:
logging.info(lf("slave's time",
@@ -1411,10 +1519,9 @@ class GMasterChangelogMixin(GMasterCommon):
self.changelogs_batch_process(changes)
- def register(self, register_time, changelog_agent, status):
- self.changelog_agent = changelog_agent
+ def register(self, register_time, status):
self.sleep_interval = gconf.get("change-interval")
- self.changelog_done_func = self.changelog_agent.done
+ self.changelog_done_func = libgfchangelog.done
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
".processed")
@@ -1423,11 +1530,10 @@ class GMasterChangelogMixin(GMasterCommon):
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
- def register(self, register_time, changelog_agent, status):
- self.changelog_agent = changelog_agent
+ def register(self, register_time, status):
self.changelog_register_time = register_time
self.history_crawl_start_time = register_time
- self.changelog_done_func = self.changelog_agent.history_done
+ self.changelog_done_func = libgfchangelog.history_done
self.history_turns = 0
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
@@ -1441,6 +1547,12 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
data_stime = self.get_data_stime()
end_time = int(time.time())
+
+ # logged here because the start of a historical crawl marks a Geo-rep worker restart
+ if gconf.get("ignore-deletes"):
+ logging.info(lf('ignore-deletes config option is set',
+ stime=data_stime))
+
logging.info(lf('starting history crawl',
turns=self.history_turns,
stime=data_stime,
@@ -1455,7 +1567,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# location then consuming history will not work(Known issue as of now)
changelog_path = os.path.join(rconf.args.local_path,
".glusterfs/changelogs")
- ret, actual_end = self.changelog_agent.history(
+ ret, actual_end = libgfchangelog.history_changelog(
changelog_path,
data_stime[0],
end_time,
@@ -1467,10 +1579,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# to be processed. returns positive value as number of changelogs
# to be processed, which will be fetched using
# history_getchanges()
- while self.changelog_agent.history_scan() > 0:
+ while libgfchangelog.history_scan() > 0:
self.crawls += 1
- changes = self.changelog_agent.history_getchanges()
+ changes = libgfchangelog.history_getchanges()
if changes:
if data_stime:
logging.info(lf("slave's time",
@@ -1504,7 +1616,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
self.history_crawl_start_time = int(time.time())
self.crawl()
else:
- # This exeption will be catched in resource.py and
+ # This exception will be caught in resource.py and
# fallback to xsync for the small gap.
raise PartialHistoryAvailable(str(actual_end))
@@ -1523,7 +1635,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self, register_time=None, changelog_agent=None, status=None):
+ def register(self, register_time=None, status=None):
self.status = status
self.counter = 0
self.comlist = []
@@ -1641,7 +1753,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
def is_sticky(self, path, mo):
"""check for DHTs linkto sticky bit file"""
sticky = False
- if mo & 01000:
+ if mo & 0o1000:
sticky = self.master.server.linkto_check(path)
return sticky
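
The retry logic added above is easier to see in isolation. A toy model of the bounded gfid-conflict retry loop (entry_ops here is a stand-in, not the real slave RPC):

```python
MAX_OE_RETRIES = 10

def entry_ops(entries):
    # stand-in: every entry not yet marked skip_entry "fails"
    return [e for e in entries if not e["skip_entry"]]

entries = [{"op": "CREATE", "skip_entry": False}]
failures = entry_ops(entries)
count = 0
while failures and count < MAX_OE_RETRIES:
    count += 1
    for e in failures:
        e["skip_entry"] = True   # conflict resolved; skip on retry
    failures = entry_ops(entries)
print(count, failures)  # 1 []
```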
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 40818427bfe..6aa7b9dfc99 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -20,13 +20,16 @@ import random
from resource import SSH
import gsyncdconfig as gconf
+import libgfchangelog
from rconf import rconf
-from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile
-from syncdutils import set_term_handler, GsyncdError
-from syncdutils import Thread, finalize, Volinfo, VolinfoFromGconf
-from syncdutils import gf_event, EVENT_GEOREP_FAULTY, get_up_nodes
+from syncdutils import (select, waitpid, errno_wrap, lf, grabpidfile,
+ set_term_handler, GsyncdError,
+ Thread, finalize, Volinfo, VolinfoFromGconf,
+ gf_event, EVENT_GEOREP_FAULTY, get_up_nodes,
+ unshare_propagation_supported)
from gsyncdstatus import GeorepStatus, set_monitor_status
-from syncdutils import unshare_propagation_supported
+import py2py3
+from py2py3 import pipe
ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
@@ -35,6 +38,8 @@ def get_subvol_num(brick_idx, vol, hot):
tier = vol.is_tier()
disperse_count = vol.disperse_count(tier, hot)
replica_count = vol.replica_count(tier, hot)
+ distribute_count = vol.distribution_count(tier, hot)
+ gconf.setconfig("master-distribution-count", distribute_count)
if (tier and not hot):
brick_idx = brick_idx - vol.get_hot_bricks_count(tier)
@@ -77,7 +82,7 @@ class Monitor(object):
# give a chance to graceful exit
errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])
- def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master,
+ def monitor(self, w, argv, cpids, slave_vol, slave_host, master,
suuid, slavenodes):
"""the monitor loop
@@ -105,10 +110,6 @@ class Monitor(object):
master,
"%s::%s" % (slave_host,
slave_vol))
-
- set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
- self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
-
ret = 0
def nwait(p, o=0):
@@ -126,7 +127,7 @@ class Monitor(object):
raise
def exit_signalled(s):
- """ child teminated due to receipt of SIGUSR1 """
+ """ child terminated due to receipt of SIGUSR1 """
return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1))
def exit_status(s):
@@ -150,51 +151,18 @@ class Monitor(object):
remote_host = "%s@%s" % (remote_user, remote_new[0])
remote_id = remote_new[1]
- # Spawn the worker and agent in lock to avoid fd leak
+ # Spawn the worker in lock to avoid fd leak
self.lock.acquire()
+ self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
logging.info(lf('starting gsyncd worker',
brick=w[0]['dir'],
slave_node=remote_host))
- # Couple of pipe pairs for RPC communication b/w
- # worker and changelog agent.
-
- # read/write end for agent
- (ra, ww) = os.pipe()
- # read/write end for worker
- (rw, wa) = os.pipe()
-
- # spawn the agent process
- apid = os.fork()
- if apid == 0:
- os.close(rw)
- os.close(ww)
- args_to_agent = argv + [
- 'agent',
- rconf.args.master,
- rconf.args.slave,
- '--local-path', w[0]['dir'],
- '--local-node', w[0]['host'],
- '--local-node-id', w[0]['uuid'],
- '--slave-id', suuid,
- '--rpc-fd', ','.join([str(ra), str(wa), str(rw), str(ww)])
- ]
-
- if rconf.args.config_file is not None:
- args_to_agent += ['-c', rconf.args.config_file]
-
- if rconf.args.debug:
- args_to_agent.append("--debug")
-
- os.execv(sys.executable, args_to_agent)
-
- pr, pw = os.pipe()
+ pr, pw = pipe()
cpid = os.fork()
if cpid == 0:
os.close(pr)
- os.close(ra)
- os.close(wa)
args_to_worker = argv + [
'worker',
@@ -205,8 +173,6 @@ class Monitor(object):
'--local-node', w[0]['host'],
'--local-node-id', w[0]['uuid'],
'--slave-id', suuid,
- '--rpc-fd',
- ','.join([str(rw), str(ww), str(ra), str(wa)]),
'--subvol-num', str(w[2]),
'--resource-remote', remote_host,
'--resource-remote-id', remote_id
@@ -237,14 +203,8 @@ class Monitor(object):
os.execv(sys.executable, args_to_worker)
cpids.add(cpid)
- agents.add(apid)
os.close(pw)
- # close all RPC pipes in monitor
- os.close(ra)
- os.close(wa)
- os.close(rw)
- os.close(ww)
self.lock.release()
t0 = time.time()
@@ -253,42 +213,19 @@ class Monitor(object):
if so:
ret = nwait(cpid, os.WNOHANG)
- ret_agent = nwait(apid, os.WNOHANG)
-
- if ret_agent is not None:
- # Agent is died Kill Worker
- logging.info(lf("Changelog Agent died, Aborting Worker",
- brick=w[0]['dir']))
- errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
- nwait(cpid)
- nwait(apid)
if ret is not None:
logging.info(lf("worker died before establishing "
"connection",
brick=w[0]['dir']))
- nwait(apid) # wait for agent
else:
logging.debug("worker(%s) connected" % w[0]['dir'])
while time.time() < t0 + conn_timeout:
ret = nwait(cpid, os.WNOHANG)
- ret_agent = nwait(apid, os.WNOHANG)
if ret is not None:
logging.info(lf("worker died in startup phase",
brick=w[0]['dir']))
- nwait(apid) # wait for agent
- break
-
- if ret_agent is not None:
- # Agent is died Kill Worker
- logging.info(lf("Changelog Agent died, Aborting "
- "Worker",
- brick=w[0]['dir']))
- errno_wrap(os.kill, [cpid, signal.SIGKILL],
- [ESRCH])
- nwait(cpid)
- nwait(apid)
break
time.sleep(1)
@@ -303,12 +240,8 @@ class Monitor(object):
brick=w[0]['dir'],
timeout=conn_timeout))
errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
- nwait(apid) # wait for agent
ret = nwait(cpid)
if ret is None:
- # If worker dies, agent terminates on EOF.
- # So lets wait for agent first.
- nwait(apid)
ret = nwait(cpid)
if exit_signalled(ret):
ret = 0
@@ -332,23 +265,33 @@ class Monitor(object):
argv = [os.path.basename(sys.executable), sys.argv[0]]
cpids = set()
- agents = set()
ta = []
for wx in wspx:
def wmon(w):
- cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
+ cpid, _ = self.monitor(w, argv, cpids, slave_vol,
slave_host, master, suuid, slavenodes)
time.sleep(1)
self.lock.acquire()
for cpid in cpids:
errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
- for apid in agents:
- errno_wrap(os.kill, [apid, signal.SIGKILL], [ESRCH])
self.lock.release()
finalize(exval=1)
t = Thread(target=wmon, args=[wx])
t.start()
ta.append(t)
+
+ # The monitor status was being updated in each monitor thread. It
+ # should not be done there, as it can cause a deadlock for a worker
+ # start. set_monitor_status uses flock to synchronize multiple
+ # instances updating the file. Since each monitor thread forks a
+ # worker, those processes can hold a reference to the fd of the
+ # status file, deadlocking workers that start later, because the
+ # flock is not released until all references to the same fd are
+ # closed. It would also cause fd leaks.
+
+ self.lock.acquire()
+ set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
+ self.lock.release()
for t in ta:
t.join()
@@ -357,7 +300,7 @@ def distribute(master, slave):
if rconf.args.use_gconf_volinfo:
mvol = VolinfoFromGconf(master.volume, master=True)
else:
- mvol = Volinfo(master.volume, master.host)
+ mvol = Volinfo(master.volume, master.host, master=True)
logging.debug('master bricks: ' + repr(mvol.bricks))
prelude = []
slave_host = None
@@ -373,7 +316,7 @@ def distribute(master, slave):
if rconf.args.use_gconf_volinfo:
svol = VolinfoFromGconf(slave.volume, master=False)
else:
- svol = Volinfo(slave.volume, "localhost", prelude)
+ svol = Volinfo(slave.volume, "localhost", prelude, master=False)
sbricks = svol.bricks
suuid = svol.uuid
@@ -428,7 +371,7 @@ def startup(go_daemon=True):
if not go_daemon:
return
- x, y = os.pipe()
+ x, y = pipe()
cpid = os.fork()
if cpid:
os.close(x)
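
The switch from os.pipe() to the py2py3 pipe() wrapper is needed because Python 3 makes new file descriptors non-inheritable by default (PEP 446), and the monitor hands these fds to forked/exec'd workers. A quick check of the default behaviour:

```python
import os

r, w = os.pipe()
# On Python 3 both print False (PEP 446); the wrapper flips them to True.
print(os.get_inheritable(r), os.get_inheritable(w))
```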
diff --git a/geo-replication/syncdaemon/py2py3.py b/geo-replication/syncdaemon/py2py3.py
new file mode 100644
index 00000000000..f9c76e1b50a
--- /dev/null
+++ b/geo-replication/syncdaemon/py2py3.py
@@ -0,0 +1,184 @@
+#
+# Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+# All python2/python3 compatibility routines
+
+import sys
+import os
+import stat
+import struct
+from syncdutils import umask
+from ctypes import create_string_buffer
+
+if sys.version_info >= (3,):
+ def pipe():
+ (r, w) = os.pipe()
+ os.set_inheritable(r, True)
+ os.set_inheritable(w, True)
+ return (r, w)
+
+ # Raw conversion of bytearray to string. Used in the cases where
+ # the buffer is created by create_string_buffer, which is an 8-bit
+ # char array, and passed to syscalls to fetch results. Using
+ # encode/decode doesn't work as the conversion can alter the size.
+ def bytearray_to_str(byte_arr):
+ return ''.join([chr(b) for b in byte_arr])
+
+ # Raw conversion of string to bytes. This is required to convert
+ # the string back into a bytearray (C char array) for use in
+ # struct pack/unpack. Again, encode/decode can't be used as it
+ # can alter the size.
+ def str_to_bytearray(string):
+ return bytes([ord(c) for c in string])
+
+ def gr_create_string_buffer(size):
+ return create_string_buffer(b'\0', size)
+
+ def gr_query_xattr(cls, path, size, syscall, attr=None):
+ if attr:
+ return cls._query_xattr(path.encode(), size, syscall,
+ attr.encode())
+ else:
+ return cls._query_xattr(path.encode(), size, syscall)
+
+ def gr_lsetxattr(cls, path, attr, val):
+ return cls.libc.lsetxattr(path.encode(), attr.encode(), val,
+ len(val), 0)
+
+ def gr_lremovexattr(cls, path, attr):
+ return cls.libc.lremovexattr(path.encode(), attr.encode())
+
+ def gr_cl_register(libgfapi, brick, path, log_file, log_level, retries):
+ return libgfapi.gf_changelog_register(brick.encode(),
+ path.encode(),
+ log_file.encode(),
+ log_level, retries)
+
+ def gr_cl_done(libgfapi, clfile):
+ return libgfapi.gf_changelog_done(clfile.encode())
+
+ def gr_cl_history_changelog(libgfapi, changelog_path, start, end, num_parallel,
+ actual_end):
+ return libgfapi.gf_history_changelog(changelog_path.encode(),
+ start, end, num_parallel,
+ actual_end)
+
+ def gr_cl_history_done(libgfapi, clfile):
+ return libgfapi.gf_history_changelog_done(clfile.encode())
+
+ # regular file
+
+ def entry_pack_reg(cls, gf, bn, mo, uid, gid):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ return struct.pack(cls._fmt_mknod(blen),
+ uid, gid, gf.encode(), mo, bn_encoded,
+ stat.S_IMODE(mo), 0, umask())
+
+ def entry_pack_reg_stat(cls, gf, bn, st):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ mo = st['mode']
+ return struct.pack(cls._fmt_mknod(blen),
+ st['uid'], st['gid'],
+ gf.encode(), mo, bn_encoded,
+ stat.S_IMODE(mo), 0, umask())
+ # mkdir
+
+ def entry_pack_mkdir(cls, gf, bn, mo, uid, gid):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ return struct.pack(cls._fmt_mkdir(blen),
+ uid, gid, gf.encode(), mo, bn_encoded,
+ stat.S_IMODE(mo), umask())
+ # symlink
+
+ def entry_pack_symlink(cls, gf, bn, lnk, st):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ lnk_encoded = lnk.encode()
+ llen = len(lnk_encoded)
+ return struct.pack(cls._fmt_symlink(blen, llen),
+ st['uid'], st['gid'],
+ gf.encode(), st['mode'], bn_encoded,
+ lnk_encoded)
+else:
+ def pipe():
+ (r, w) = os.pipe()
+ return (r, w)
+
+ # Raw conversion of bytearray to string
+ def bytearray_to_str(byte_arr):
+ return byte_arr
+
+ # Raw conversion of string to bytearray
+ def str_to_bytearray(string):
+ return string
+
+ def gr_create_string_buffer(size):
+ return create_string_buffer('\0', size)
+
+ def gr_query_xattr(cls, path, size, syscall, attr=None):
+ if attr:
+ return cls._query_xattr(path, size, syscall, attr)
+ else:
+ return cls._query_xattr(path, size, syscall)
+
+ def gr_lsetxattr(cls, path, attr, val):
+ return cls.libc.lsetxattr(path, attr, val, len(val), 0)
+
+ def gr_lremovexattr(cls, path, attr):
+ return cls.libc.lremovexattr(path, attr)
+
+ def gr_cl_register(libgfapi, brick, path, log_file, log_level, retries):
+ return libgfapi.gf_changelog_register(brick, path, log_file,
+ log_level, retries)
+
+ def gr_cl_done(libgfapi, clfile):
+ return libgfapi.gf_changelog_done(clfile)
+
+ def gr_cl_history_changelog(libgfapi, changelog_path, start, end, num_parallel,
+ actual_end):
+ return libgfapi.gf_history_changelog(changelog_path, start, end,
+ num_parallel, actual_end)
+
+ def gr_cl_history_done(libgfapi, clfile):
+ return libgfapi.gf_history_changelog_done(clfile)
+
+ # regular file
+
+ def entry_pack_reg(cls, gf, bn, mo, uid, gid):
+ blen = len(bn)
+ return struct.pack(cls._fmt_mknod(blen),
+ uid, gid, gf, mo, bn,
+ stat.S_IMODE(mo), 0, umask())
+
+ def entry_pack_reg_stat(cls, gf, bn, st):
+ blen = len(bn)
+ mo = st['mode']
+ return struct.pack(cls._fmt_mknod(blen),
+ st['uid'], st['gid'],
+ gf, mo, bn,
+ stat.S_IMODE(mo), 0, umask())
+ # mkdir
+
+ def entry_pack_mkdir(cls, gf, bn, mo, uid, gid):
+ blen = len(bn)
+ return struct.pack(cls._fmt_mkdir(blen),
+ uid, gid, gf, mo, bn,
+ stat.S_IMODE(mo), umask())
+ # symlink
+
+ def entry_pack_symlink(cls, gf, bn, lnk, st):
+ blen = len(bn)
+ llen = len(lnk)
+ return struct.pack(cls._fmt_symlink(blen, llen),
+ st['uid'], st['gid'],
+ gf, st['mode'], bn, lnk)
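
Why chr()-mapping instead of decode(): a raw xattr/changelog buffer may hold bytes that are not valid UTF-8, so decoding could raise or change the length. A hedged Python 3 illustration:

```python
raw = bytes([0x74, 0x72, 0x75, 0x73, 0x74, 0xff])  # not valid UTF-8
as_str = ''.join(chr(b) for b in raw)
assert len(as_str) == len(raw)                 # length preserved 1:1
assert bytes(ord(c) for c in as_str) == raw    # lossless round trip
```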
diff --git a/geo-replication/syncdaemon/rconf.py b/geo-replication/syncdaemon/rconf.py
index ccac62c63a8..ff716ee4d6d 100644
--- a/geo-replication/syncdaemon/rconf.py
+++ b/geo-replication/syncdaemon/rconf.py
@@ -21,12 +21,6 @@ class RConf(object):
log_exit = False
permanent_handles = []
log_metadata = {}
- """One variable is sufficient to track the
- switching of worker to ACTIVE. Two variables
- are intentionally used to track worker going
- to PASSIVE as well mainly for debugging"""
- active_earlier = False
- passive_earlier = False
mgmt_lock_fd = None
args = None
turns = 0
diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py
index 9d5da666858..c622afa6373 100644
--- a/geo-replication/syncdaemon/repce.py
+++ b/geo-replication/syncdaemon/repce.py
@@ -28,13 +28,13 @@ except ImportError:
from syncdutils import Thread, select, lf
-pickle_proto = -1
+pickle_proto = 2
repce_version = 1.0
def ioparse(i, o):
if isinstance(i, int):
- i = os.fdopen(i)
+ i = os.fdopen(i, 'rb')
# rely on duck typing for recognizing
# streams as that works uniformly
# in py2 and py3
@@ -54,8 +54,15 @@ def send(out, *args):
def recv(inf):
- """load an object from input stream"""
- return pickle.load(inf)
+ """load an object from input stream
+ python2 and python3 compatibility, inf is sys.stdin
+ and is opened as text stream by default. Hence using the
+ buffer attribute in python3
+ """
+ if hasattr(inf, "buffer"):
+ return pickle.load(inf.buffer)
+ else:
+ return pickle.load(inf)
class RepceServer(object):
@@ -193,7 +200,7 @@ class RepceClient(object):
"""RePCe client is callabe, calling it implements a synchronous
remote call.
- We do a .push with a cbk which does a wakeup upon receiving anwser,
+    We do a .push with a cbk which does a wakeup upon receiving the answer,
then wait on the RepceJob.
"""
rjob = self.push(
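Pinning pickle_proto to 2 matters because Python 3's default pickle protocol cannot be read by a Python 2 peer, and a geo-rep master and slave may run different Python versions during an upgrade; protocol 2 is the highest both sides understand. A minimal sketch of the compatibility pattern (the stream setup here is hypothetical):

import os
import pickle

pickle_proto = 2  # highest protocol a Python 2 peer can still load

def send_obj(out, obj):
    pickle.dump(obj, out, pickle_proto)
    out.flush()

def recv_obj(inf):
    # py3 text streams (e.g. sys.stdin) expose raw bytes via .buffer
    if hasattr(inf, "buffer"):
        return pickle.load(inf.buffer)
    return pickle.load(inf)

r, w = os.pipe()
out, inf = os.fdopen(w, 'wb'), os.fdopen(r, 'rb')
send_obj(out, {"version": 1.0})
print(recv_obj(inf))  # -> {'version': 1.0}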
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 575a6605393..f12c7ceaa36 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -19,35 +19,37 @@ import struct
import logging
import tempfile
import subprocess
-from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES
-from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM
+from errno import (EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES,
+ EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM)
import errno
from rconf import rconf
import gsyncdconfig as gconf
+import libgfchangelog
import repce
from repce import RepceServer, RepceClient
from master import gmaster_builder
import syncdutils
-from syncdutils import GsyncdError, select, privileged, funcode
-from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
-from syncdutils import NoStimeAvailable, PartialHistoryAvailable
-from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
-from syncdutils import get_changelog_log_level, get_rsync_version
-from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
-from syncdutils import GX_GFID_CANONICAL_LEN
+from syncdutils import (GsyncdError, select, privileged, funcode,
+ entry2pb, gauxpfx, errno_wrap, lstat,
+ NoStimeAvailable, PartialHistoryAvailable,
+ ChangelogException, ChangelogHistoryNotAvailable,
+ get_changelog_log_level, get_rsync_version,
+ GX_GFID_CANONICAL_LEN,
+ gf_mount_ready, lf, Popen, sup,
+ Xattr, matching_disk_gfid, get_gfid_from_mnt,
+ unshare_propagation_supported, get_slv_dir_path)
from gsyncdstatus import GeorepStatus
-from syncdutils import lf, Popen, sup, Volinfo
-from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
-from syncdutils import unshare_propagation_supported
+from py2py3 import (pipe, str_to_bytearray, entry_pack_reg,
+ entry_pack_reg_stat, entry_pack_mkdir,
+ entry_pack_symlink)
ENOTSUP = getattr(errno, 'ENOTSUP', errno.EOPNOTSUPP)
slv_volume = None
slv_host = None
-slv_bricks = None
class Server(object):
@@ -146,6 +148,7 @@ class Server(object):
if buf == ENOENT:
return buf
else:
+ buf = str_to_bytearray(buf)
m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(
['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)]))
return '-'.join(m.groups())
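str_to_bytearray() is an identity on Python 2 but keeps the lgetxattr result unpackable on Python 3; the unpacked bytes are then hex-formatted into the canonical GFID string. A self-contained sketch of that conversion, assuming GFID_FMTSTR describes 16 raw bytes:

import re
import struct

GFID_FMTSTR = '!' + 'B' * 16  # assumption: 16 raw gfid bytes

def canonical_gfid(buf):
    hexstr = ''.join('%02x' % x for x in struct.unpack(GFID_FMTSTR, buf))
    return '-'.join(re.match('(.{8})(.{4})(.{4})(.{4})(.{12})',
                             hexstr).groups())

print(canonical_gfid(bytes(range(16))))
# 00010203-0405-0607-0809-0a0b0c0d0e0f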
@@ -236,6 +239,7 @@ class Server(object):
val = Xattr.lgetxattr(path,
'.'.join([cls.GX_NSPACE, uuid, 'xtime']),
8)
+ val = str_to_bytearray(val)
return struct.unpack('!II', val)
except OSError:
ex = sys.exc_info()[1]
@@ -258,6 +262,7 @@ class Server(object):
val = Xattr.lgetxattr(path,
'.'.join([cls.GX_NSPACE, uuid, 'stime']),
8)
+ val = str_to_bytearray(val)
return struct.unpack('!II', val)
except OSError:
ex = sys.exc_info()[1]
@@ -280,6 +285,7 @@ class Server(object):
val = Xattr.lgetxattr(path,
'.'.join([cls.GX_NSPACE, uuid, 'stime']),
8)
+ val = str_to_bytearray(val)
return struct.unpack('!II', val)
except OSError:
ex = sys.exc_info()[1]
@@ -303,6 +309,7 @@ class Server(object):
'.'.join([cls.GX_NSPACE, uuid,
'entry_stime']),
8)
+ val = str_to_bytearray(val)
return struct.unpack('!II', val)
except OSError:
ex = sys.exc_info()[1]
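The xtime/stime xattrs hold a (seconds, nanoseconds) pair packed as two big-endian 32-bit integers, which is why every reader above unpacks 8 bytes with '!II'; str_to_bytearray() keeps that working on Python 3. A round-trip sketch:

import struct

def pack_xtime(sec, nsec):
    return struct.pack('!II', sec, nsec)

def unpack_xtime(val):
    return struct.unpack('!II', val)

assert unpack_xtime(pack_xtime(1700000000, 123456789)) == \
    (1700000000, 123456789)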
@@ -371,50 +378,31 @@ class Server(object):
def entry_ops(cls, entries):
pfx = gauxpfx()
logging.debug('entries: %s' % repr(entries))
- # regular file
-
- def entry_pack_reg(gf, bn, mo, uid, gid):
- blen = len(bn)
- return struct.pack(cls._fmt_mknod(blen),
- uid, gid, gf, mo, bn,
- stat.S_IMODE(mo), 0, umask())
-
- def entry_pack_reg_stat(gf, bn, st):
- blen = len(bn)
- mo = st['mode']
- return struct.pack(cls._fmt_mknod(blen),
- st['uid'], st['gid'],
- gf, mo, bn,
- stat.S_IMODE(mo), 0, umask())
- # mkdir
-
- def entry_pack_mkdir(gf, bn, mo, uid, gid):
- blen = len(bn)
- return struct.pack(cls._fmt_mkdir(blen),
- uid, gid, gf, mo, bn,
- stat.S_IMODE(mo), umask())
- # symlink
-
- def entry_pack_symlink(gf, bn, lnk, st):
- blen = len(bn)
- llen = len(lnk)
- return struct.pack(cls._fmt_symlink(blen, llen),
- st['uid'], st['gid'],
- gf, st['mode'], bn, lnk)
-
- def entry_purge(op, entry, gfid, e):
+ dist_count = rconf.args.master_dist_count
+
+ def entry_purge(op, entry, gfid, e, uid, gid):
            # This is extremely racy code and needs to be fixed ASAP.
# The GFID check here is to be sure that the pargfid/bname
# to be purged is the GFID gotten from the changelog.
# (a stat(changelog_gfid) would also be valid here)
# The race here is between the GFID check and the purge.
+
+            # If the entry or the gfid of the file to be deleted is not
+            # present on the slave, we can ignore the unlink/rmdir
+ if isinstance(lstat(entry), int) or \
+ isinstance(lstat(os.path.join(pfx, gfid)), int):
+ return
+
if not matching_disk_gfid(gfid, entry):
- collect_failure(e, EEXIST)
+ collect_failure(e, EEXIST, uid, gid)
return
if op == 'UNLINK':
er = errno_wrap(os.unlink, [entry], [ENOENT, ESTALE], [EBUSY])
- return er
+                # EISDIR is a safe error; ignore it. This can only happen
+                # when unlink is sent from the master while fixing gfid
+                # conflicts.
+ if er != EISDIR:
+ return er
elif op == 'RMDIR':
er = errno_wrap(os.rmdir, [entry], [ENOENT, ESTALE,
@@ -422,28 +410,40 @@ class Server(object):
if er == ENOTEMPTY:
return er
- def collect_failure(e, cmd_ret, dst=False):
+ def collect_failure(e, cmd_ret, uid, gid, dst=False):
slv_entry_info = {}
slv_entry_info['gfid_mismatch'] = False
+ slv_entry_info['name_mismatch'] = False
slv_entry_info['dst'] = dst
+ slv_entry_info['slave_isdir'] = False
+ slv_entry_info['slave_name'] = None
+ slv_entry_info['slave_gfid'] = None
# We do this for failing fops on Slave
# Master should be logging this
if cmd_ret is None:
return False
- if cmd_ret == EEXIST:
+ if e.get("stat", {}):
+ # Copy actual UID/GID value back to entry stat
+ e['stat']['uid'] = uid
+ e['stat']['gid'] = gid
+
+ if cmd_ret in [EEXIST, ESTALE]:
if dst:
en = e['entry1']
else:
en = e['entry']
disk_gfid = get_gfid_from_mnt(en)
- if isinstance(disk_gfid, basestring) and \
+ if isinstance(disk_gfid, str) and \
e['gfid'] != disk_gfid:
slv_entry_info['gfid_mismatch'] = True
st = lstat(en)
if not isinstance(st, int):
if st and stat.S_ISDIR(st.st_mode):
slv_entry_info['slave_isdir'] = True
+ dir_name = get_slv_dir_path(slv_host, slv_volume,
+ disk_gfid)
+ slv_entry_info['slave_name'] = dir_name
else:
slv_entry_info['slave_isdir'] = False
slv_entry_info['slave_gfid'] = disk_gfid
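collect_failure() now ships richer slave-side state back to the master so gfid and name conflicts can be resolved there rather than only logged. An illustrative failures record for a gfid mismatch on a directory (all values hypothetical):

failure = (
    {'op': 'MKDIR', 'gfid': '<changelog gfid>',
     'entry': '.gfid/<pargfid>/dir1'},    # the failed entry, echoed back
    17,                                   # EEXIST, the failing errno
    {'gfid_mismatch': True,               # on-disk gfid != changelog gfid
     'name_mismatch': False,
     'dst': False,
     'slave_isdir': True,
     'slave_name': '.gfid/<pargfid>/<on-disk name>',
     'slave_gfid': '<on-disk gfid>'})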
@@ -489,7 +489,7 @@ class Server(object):
errno_wrap(os.rmdir, [path], [ENOENT, ESTALE], [EBUSY])
- def rename_with_disk_gfid_confirmation(gfid, entry, en):
+ def rename_with_disk_gfid_confirmation(gfid, entry, en, uid, gid):
if not matching_disk_gfid(gfid, entry):
logging.error(lf("RENAME ignored: source entry does not match "
"with on-disk gfid",
@@ -497,13 +497,13 @@ class Server(object):
gfid=gfid,
disk_gfid=get_gfid_from_mnt(entry),
target=en))
- collect_failure(e, EEXIST)
+ collect_failure(e, EEXIST, uid, gid)
return
cmd_ret = errno_wrap(os.rename,
[entry, en],
[ENOENT, EEXIST], [ESTALE, EBUSY])
- collect_failure(e, cmd_ret)
+ collect_failure(e, cmd_ret, uid, gid)
for e in entries:
blob = None
@@ -512,6 +512,12 @@ class Server(object):
entry = e['entry']
uid = 0
gid = 0
+
+            # Skip entry processing if skip_entry was marked True during
+            # gfid conflict resolution
+ if e['skip_entry']:
+ continue
+
if e.get("stat", {}):
# Copy UID/GID value and then reset to zero. Copied UID/GID
# will be used to run chown once entry is created.
@@ -524,7 +530,7 @@ class Server(object):
if op in ['RMDIR', 'UNLINK']:
# Try once, if rmdir failed with ENOTEMPTY
# then delete recursively.
- er = entry_purge(op, entry, gfid, e)
+ er = entry_purge(op, entry, gfid, e, uid, gid)
if isinstance(er, int):
if er == ENOTEMPTY and op == 'RMDIR':
# Retry if ENOTEMPTY, ESTALE
@@ -552,8 +558,8 @@ class Server(object):
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
- blob = entry_pack_reg(
- gfid, bname, e['mode'], e['uid'], e['gid'])
+ blob = entry_pack_reg(cls, gfid, bname,
+ e['mode'], e['uid'], e['gid'])
# Self healed hardlinks are recorded as MKNOD.
# So if the gfid already exists, it should be
# processed as hard link not mknod.
@@ -561,94 +567,104 @@ class Server(object):
cmd_ret = errno_wrap(os.link,
[slink, entry],
[ENOENT, EEXIST], [ESTALE])
- collect_failure(e, cmd_ret)
+ collect_failure(e, cmd_ret, uid, gid)
elif op == 'MKDIR':
+ en = e['entry']
slink = os.path.join(pfx, gfid)
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
- blob = entry_pack_mkdir(
- gfid, bname, e['mode'], e['uid'], e['gid'])
- else:
+ blob = entry_pack_mkdir(cls, gfid, bname,
+ e['mode'], e['uid'], e['gid'])
+ elif (isinstance(lstat(en), int) or
+ not matching_disk_gfid(gfid, en)):
                    # If the gfid of a directory exists on the slave but the
                    # path-based create gets EEXIST, the directory was renamed
                    # on the master but recorded as MKDIR during the hybrid
                    # crawl. Get the directory path by reading the backend
                    # symlink and try to rename it to the new name given by
                    # the master.
- global slv_bricks
- global slv_volume
- global slv_host
- if not slv_bricks:
- slv_info = Volinfo(slv_volume, slv_host)
- slv_bricks = slv_info.bricks
- # Result of readlink would be of format as below.
- # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
- realpath = os.readlink(os.path.join(slv_bricks[0]['dir'],
- ".glusterfs",
- gfid[0:2],
- gfid[2:4],
- gfid))
- realpath_parts = realpath.split('/')
- src_pargfid = realpath_parts[-2]
- src_basename = realpath_parts[-1]
- src_entry = os.path.join(pfx, src_pargfid, src_basename)
logging.info(lf("Special case: rename on mkdir",
gfid=gfid, entry=repr(entry)))
- rename_with_disk_gfid_confirmation(gfid, src_entry, entry)
+ src_entry = get_slv_dir_path(slv_host, slv_volume, gfid)
+ if src_entry is None:
+ collect_failure(e, ENOENT, uid, gid)
+ if src_entry is not None and src_entry != entry:
+ slv_entry_info = {}
+ slv_entry_info['gfid_mismatch'] = False
+ slv_entry_info['name_mismatch'] = True
+ slv_entry_info['dst'] = False
+ slv_entry_info['slave_isdir'] = True
+ slv_entry_info['slave_gfid'] = gfid
+ slv_entry_info['slave_entry'] = src_entry
+
+ failures.append((e, EEXIST, slv_entry_info))
elif op == 'LINK':
slink = os.path.join(pfx, gfid)
st = lstat(slink)
if isinstance(st, int):
(pg, bname) = entry2pb(entry)
if stat.S_ISREG(e['stat']['mode']):
- blob = entry_pack_reg_stat(gfid, bname, e['stat'])
+ blob = entry_pack_reg_stat(cls, gfid, bname, e['stat'])
elif stat.S_ISLNK(e['stat']['mode']):
- blob = entry_pack_symlink(gfid, bname, e['link'],
+ blob = entry_pack_symlink(cls, gfid, bname, e['link'],
e['stat'])
else:
cmd_ret = errno_wrap(os.link,
[slink, entry],
[ENOENT, EEXIST], [ESTALE])
- collect_failure(e, cmd_ret)
+ collect_failure(e, cmd_ret, uid, gid)
elif op == 'SYMLINK':
en = e['entry']
st = lstat(entry)
if isinstance(st, int):
- blob = entry_pack_symlink(gfid, bname, e['link'],
+ blob = entry_pack_symlink(cls, gfid, bname, e['link'],
e['stat'])
elif not matching_disk_gfid(gfid, en):
- collect_failure(e, EEXIST)
+ collect_failure(e, EEXIST, uid, gid)
elif op == 'RENAME':
en = e['entry1']
- st = lstat(entry)
- if isinstance(st, int):
+            # The matching disk gfid check validates two things:
+            # 1. the name is present (returns false otherwise)
+            # 2. the gfid is the same (returns false otherwise)
+            # Both validations are necessary to decide that src doesn't
+            # exist. We can't rely on the gfid stat alone as a hardlink
+            # could be present, and we can't rely on the name alone as
+            # the name could exist with a different gfid.
+ if not matching_disk_gfid(gfid, entry):
if e['stat'] and not stat.S_ISDIR(e['stat']['mode']):
- if stat.S_ISLNK(e['stat']['mode']) and \
- e['link'] is not None:
- st1 = lstat(en)
- if isinstance(st1, int):
- (pg, bname) = entry2pb(en)
- blob = entry_pack_symlink(gfid, bname,
- e['link'], e['stat'])
- elif not matching_disk_gfid(gfid, en):
- collect_failure(e, EEXIST, True)
+ if stat.S_ISLNK(e['stat']['mode']):
+                        # src is not present, so don't sync the symlink
+                        # as we don't know the target. It's ok to ignore:
+                        # if it's unlinked, it's fine; if it's renamed to
+                        # something else, it will be synced then.
+ if e['link'] is not None:
+ st1 = lstat(en)
+ if isinstance(st1, int):
+ (pg, bname) = entry2pb(en)
+ blob = entry_pack_symlink(cls, gfid, bname,
+ e['link'],
+ e['stat'])
+ elif not matching_disk_gfid(gfid, en):
+ collect_failure(e, EEXIST, uid, gid, True)
else:
slink = os.path.join(pfx, gfid)
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
(pg, bname) = entry2pb(en)
- blob = entry_pack_reg_stat(gfid, bname,
+ blob = entry_pack_reg_stat(cls, gfid, bname,
e['stat'])
else:
cmd_ret = errno_wrap(os.link, [slink, en],
[ENOENT, EEXIST], [ESTALE])
- collect_failure(e, cmd_ret)
+ collect_failure(e, cmd_ret, uid, gid)
else:
+ st = lstat(entry)
st1 = lstat(en)
if isinstance(st1, int):
- rename_with_disk_gfid_confirmation(gfid, entry, en)
+ rename_with_disk_gfid_confirmation(gfid, entry, en,
+ uid, gid)
else:
if st.st_ino == st1.st_ino:
# we have a hard link, we can now unlink source
@@ -672,16 +688,23 @@ class Server(object):
raise
else:
raise
- elif not matching_disk_gfid(gfid, en):
- collect_failure(e, EEXIST, True)
+ elif not matching_disk_gfid(gfid, en) and dist_count > 1:
+ collect_failure(e, EEXIST, uid, gid, True)
else:
- rename_with_disk_gfid_confirmation(gfid, entry, en)
+                        # Reaching here means matching_disk_gfid has
+                        # returned false for both source and destination,
+                        # and the distribution count for the master volume
+                        # is greater than one. That means both the source
+                        # and destination exist and are not hardlinks, so
+                        # we are safe to go ahead with the rename here.
+ rename_with_disk_gfid_confirmation(gfid, entry, en,
+ uid, gid)
if blob:
cmd_ret = errno_wrap(Xattr.lsetxattr,
[pg, 'glusterfs.gfid.newfile', blob],
- [EEXIST, ENOENT],
+ [EEXIST, ENOENT, ESTALE],
[ESTALE, EINVAL, EBUSY])
- collect_failure(e, cmd_ret)
+ collect_failure(e, cmd_ret, uid, gid)
# If UID/GID is different than zero that means we are trying
# create Entry with different UID/GID. Create Entry with
@@ -690,7 +713,7 @@ class Server(object):
path = os.path.join(pfx, gfid)
cmd_ret = errno_wrap(os.lchown, [path, uid, gid], [ENOENT],
[ESTALE, EINVAL])
- collect_failure(e, cmd_ret)
+ collect_failure(e, cmd_ret, uid, gid)
return failures
@@ -717,10 +740,8 @@ class Server(object):
# 'lchown' 'lchmod' 'utime with no-deference' blindly.
# But since 'lchmod' and 'utime with no de-reference' is
# not supported in python3, we have to rely on 'chmod'
- # and 'utime with de-reference'. But 'chmod'
- # de-reference the symlink and gets ENOENT, EACCES,
- # EPERM errors, hence ignoring those errors if it's on
- # symlink file.
+        # and 'utime with de-reference'. Hence 'chmod' and 'utime'
+        # are skipped if it's a symlink.
is_symlink = False
cmd_ret = errno_wrap(os.lchown, [go, uid, gid], [ENOENT],
@@ -728,19 +749,17 @@ class Server(object):
if isinstance(cmd_ret, int):
continue
- cmd_ret = errno_wrap(os.chmod, [go, mode],
- [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
- if isinstance(cmd_ret, int):
- is_symlink = os.path.islink(go)
- if not is_symlink:
+ is_symlink = os.path.islink(go)
+
+ if not is_symlink:
+ cmd_ret = errno_wrap(os.chmod, [go, mode],
+ [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
+ if isinstance(cmd_ret, int):
failures.append((e, cmd_ret, "chmod"))
- cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)],
- [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
- if isinstance(cmd_ret, int):
- if not is_symlink:
- is_symlink = os.path.islink(go)
- if not is_symlink:
+ cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)],
+ [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
+ if isinstance(cmd_ret, int):
failures.append((e, cmd_ret, "utime"))
return failures
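The reordered metadata loop reflects a Python 3 limitation: lchmod and no-dereference utime are unavailable, and plain chmod/utime would follow a symlink to its target. So ownership is always set with lchown (symlink-safe), and mode/times are applied only to non-symlinks, as in this condensed sketch:

import os

def apply_meta(path, uid, gid, mode, atime, mtime):
    os.lchown(path, uid, gid)          # safe: never follows symlinks
    if not os.path.islink(path):
        # chmod/utime would dereference a symlink, so skip them there
        os.chmod(path, mode)
        os.utime(path, (atime, mtime))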
@@ -816,7 +835,8 @@ class Mounter(object):
def umount_l(self, d):
"""perform lazy umount"""
- po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE)
+ po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE,
+ universal_newlines=True)
po.wait()
return po
@@ -840,7 +860,7 @@ class Mounter(object):
change into the mount, and lazy unmount the
filesystem.
"""
- mpi, mpo = os.pipe()
+ mpi, mpo = pipe()
mh = Popen.fork()
if mh:
# Parent
@@ -851,7 +871,9 @@ class Mounter(object):
if self.mntpt:
# mntpt is determined pre-mount
d = self.mntpt
- os.write(mpo, d + '\0')
+ mnt_msg = d + '\0'
+ encoded_msg = mnt_msg.encode()
+ os.write(mpo, encoded_msg)
po = Popen(margv, **self.mountkw)
self.handle_mounter(po)
po.terminate_geterr()
@@ -859,8 +881,11 @@ class Mounter(object):
if not d:
# mntpt is determined during mount
d = self.mntpt
- os.write(mpo, d + '\0')
- os.write(mpo, 'M')
+ mnt_msg = d + '\0'
+ encoded_msg = mnt_msg.encode()
+ os.write(mpo, encoded_msg)
+ encoded_msg = 'M'.encode()
+ os.write(mpo, encoded_msg)
t = syncdutils.Thread(target=lambda: os.chdir(d))
t.start()
tlim = rconf.starttime + gconf.get("connection-timeout")
@@ -889,6 +914,7 @@ class Mounter(object):
mntdata = ''
while True:
c = os.read(mpi, 1)
+ c = c.decode()
if not c:
break
mntdata += c
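On Python 3 os.write()/os.read() operate on bytes, so the child encodes the mount point (and the trailing 'M' success marker) before writing, and the parent decodes byte-by-byte while reassembling mntdata. A runnable sketch of the handshake:

import os

r, w = os.pipe()
os.write(w, ('/tmp/mntpt' + '\0').encode())  # child: announce mountpoint
os.write(w, 'M'.encode())                    # child: signal mount success
os.close(w)

mntdata = ''
while True:
    c = os.read(r, 1)
    if not c:
        break
    mntdata += c.decode()
assert mntdata == '/tmp/mntpt\0M'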
@@ -925,6 +951,16 @@ class Mounter(object):
logging.exception('mount cleanup failure:')
rv = 200
os._exit(rv)
+
+    # Poll the dht.subvol.status value until all subvols are up.
+ RETRIES = 10
+ while not gf_mount_ready():
+ if RETRIES < 0:
+ logging.error('Subvols are not up')
+ break
+ RETRIES -= 1
+ time.sleep(0.2)
+
logging.debug('auxiliary glusterfs mount prepared')
@@ -932,7 +968,7 @@ class DirectMounter(Mounter):
"""mounter backend which calls mount(8), umount(8) directly"""
- mountkw = {'stderr': subprocess.PIPE}
+ mountkw = {'stderr': subprocess.PIPE, 'universal_newlines': True}
glusterprog = 'glusterfs'
@staticmethod
@@ -955,7 +991,8 @@ class MountbrokerMounter(Mounter):
"""mounter backend using the mountbroker gluster service"""
- mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE}
+ mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE,
+ 'universal_newlines': True}
glusterprog = 'gluster'
@classmethod
@@ -996,6 +1033,7 @@ class GLUSTERServer(Server):
"""generic volume mark fetching/parsing backed"""
fmt_string = cls.NTV_FMTSTR + extra_fields
buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))
+ buf = str_to_bytearray(buf)
vm = struct.unpack(fmt_string, buf)
m = re.match(
'(.{8})(.{4})(.{4})(.{4})(.{12})',
@@ -1218,9 +1256,6 @@ class GLUSTER(object):
# register the crawlers and start crawling
# g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
# g3 ==> changelog History
- (inf, ouf, ra, wa) = rconf.args.rpc_fd.split(',')
- changelog_agent = RepceClient(int(inf), int(ouf))
-
status = GeorepStatus(gconf.get("state-file"),
rconf.args.local_node,
rconf.args.local_path,
@@ -1228,12 +1263,6 @@ class GLUSTER(object):
rconf.args.master,
rconf.args.slave)
status.reset_on_worker_start()
- rv = changelog_agent.version()
- if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:
- raise GsyncdError(
- "RePCe major version mismatch(changelog agent): "
- "local %s, remote %s" %
- (CHANGELOG_AGENT_CLIENT_VERSION, rv))
try:
workdir = g2.setup_working_dir()
@@ -1244,17 +1273,16 @@ class GLUSTER(object):
# register with the changelog library
# 9 == log level (DEBUG)
# 5 == connection retries
- changelog_agent.init()
- changelog_agent.register(rconf.args.local_path,
- workdir,
- gconf.get("changelog-log-file"),
- get_changelog_log_level(
- gconf.get("changelog-log-level")),
- g2.CHANGELOG_CONN_RETRIES)
+ libgfchangelog.register(rconf.args.local_path,
+ workdir,
+ gconf.get("changelog-log-file"),
+ get_changelog_log_level(
+ gconf.get("changelog-log-level")),
+ g2.CHANGELOG_CONN_RETRIES)
register_time = int(time.time())
- g2.register(register_time, changelog_agent, status)
- g3.register(register_time, changelog_agent, status)
+ g2.register(register_time, status)
+ g3.register(register_time, status)
except ChangelogException as e:
logging.error(lf("Changelog register failed", error=e))
sys.exit(1)
@@ -1382,14 +1410,16 @@ class SSH(object):
'--local-node-id', rconf.args.resource_remote_id] + \
[
# Add all config arguments here, slave gsyncd will not use
- # config file in slave side, so all overridding options should
+            # config file on the slave side, so all overriding options should
# be sent as arguments
'--slave-timeout', str(gconf.get("slave-timeout")),
'--slave-log-level', gconf.get("slave-log-level"),
'--slave-gluster-log-level',
gconf.get("slave-gluster-log-level"),
'--slave-gluster-command-dir',
- gconf.get("slave-gluster-command-dir")]
+ gconf.get("slave-gluster-command-dir"),
+ '--master-dist-count',
+ str(gconf.get("master-distribution-count"))]
if gconf.get("slave-access-mount"):
args_to_slave.append('--slave-access-mount')
@@ -1454,12 +1484,13 @@ class SSH(object):
if log_rsync_performance:
# use stdout=PIPE only when log_rsync_performance enabled
- # Else rsync will write to stdout and nobody is their
+ # Else rsync will write to stdout and nobody is there
# to consume. If PIPE is full rsync hangs.
po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ stderr=subprocess.PIPE, universal_newlines=True)
else:
- po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+ po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True)
for f in files:
po.stdin.write(f)
@@ -1491,7 +1522,7 @@ class SSH(object):
return po
- def tarssh(self, files, slaveurl, log_err=False):
+ def tarssh(self, files, log_err=False):
"""invoke tar+ssh
        -z (compress) can be used if needed, but omitting it for now
        as it results in a weird error (tar+ssh errors out with errcode: 2)
@@ -1499,32 +1530,49 @@ class SSH(object):
if not files:
raise GsyncdError("no files to sync")
logging.debug("files: " + ", ".join(files))
- (host, rdir) = slaveurl.split(':')
+ (host, rdir) = self.slaveurl.split(':')
+
tar_cmd = ["tar"] + \
["--sparse", "-cf", "-", "--files-from", "-"]
- ssh_cmd = gconf.get("ssh-command-tar").split() + \
+ ssh_cmd = gconf.get("ssh-command").split() + \
gconf.get("ssh-options-tar").split() + \
["-p", str(gconf.get("ssh-port"))] + \
[host, "tar"] + \
["--overwrite", "-xf", "-", "-C", rdir]
p0 = Popen(tar_cmd, stdout=subprocess.PIPE,
- stdin=subprocess.PIPE, stderr=subprocess.PIPE)
- p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE)
+ stdin=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True)
+ p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE,
+ universal_newlines=True)
for f in files:
p0.stdin.write(f)
p0.stdin.write('\n')
p0.stdin.close()
p0.stdout.close() # Allow p0 to receive a SIGPIPE if p1 exits.
- # wait for tar to terminate, collecting any errors, further
- # waiting for transfer to complete
- _, stderr1 = p1.communicate()
# stdin and stdout of p0 is already closed, Reset to None and
# wait for child process to complete
p0.stdin = None
p0.stdout = None
- p0.communicate()
+
+ def wait_for_tar(p0):
+ _, stderr = p0.communicate()
+ if log_err:
+ for errline in stderr.strip().split("\n")[:-1]:
+ if "No such file or directory" not in errline:
+ logging.error(lf("SYNC Error",
+ sync_engine="Tarssh",
+ error=errline))
+
+ t = syncdutils.Thread(target=wait_for_tar, args=(p0, ))
+ # wait for tar to terminate, collecting any errors, further
+ # waiting for transfer to complete
+ t.start()
+
+ # wait for ssh process
+ _, stderr1 = p1.communicate()
+ t.join()
if log_err:
for errline in stderr1.strip().split("\n")[:-1]:
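Moving the tar-side communicate() onto a helper thread avoids a classic pipeline deadlock: if both children fill their stderr pipes while the parent waits on only one of them, everything blocks. A minimal sketch of the pattern with a generic producer/consumer pair (commands illustrative):

import subprocess
from threading import Thread

p0 = subprocess.Popen(['tar', '-cf', '-', '.'],
                      stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                      universal_newlines=True)
p1 = subprocess.Popen(['cat'], stdin=p0.stdout,
                      stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
                      universal_newlines=True)
p0.stdout.close()   # let p0 get SIGPIPE if p1 exits early
p0.stdout = None    # communicate() must not touch the closed pipe

def drain_producer():
    _, err = p0.communicate()   # reaps p0, draining its stderr

t = Thread(target=drain_producer)
t.start()
_, stderr1 = p1.communicate()   # wait for the consumer in parallel
t.join()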
diff --git a/geo-replication/syncdaemon/subcmds.py b/geo-replication/syncdaemon/subcmds.py
index 11d263d7e03..b8508532e30 100644
--- a/geo-replication/syncdaemon/subcmds.py
+++ b/geo-replication/syncdaemon/subcmds.py
@@ -73,7 +73,11 @@ def subcmd_worker(args):
Popen.init_errhandler()
fcntl.fcntl(args.feedback_fd, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
local = GLUSTER("localhost", args.master)
- slavehost, slavevol = args.slave.split("::")
+ slave_url, slavevol = args.slave.split("::")
+ if "@" not in slave_url:
+ slavehost = args.resource_remote
+ else:
+ slavehost = "%s@%s" % (slave_url.split("@")[0], args.resource_remote)
remote = SSH(slavehost, slavevol)
remote.connect_remote()
local.connect()
@@ -93,26 +97,21 @@ def subcmd_slave(args):
local.service_loop()
-def subcmd_agent(args):
- import os
- from changelogagent import agent, Changelog
- from syncdutils import lf
-
- os.setsid()
- logging.debug(lf("RPC FD",
- rpc_fd=repr(args.rpc_fd)))
- return agent(Changelog(), args.rpc_fd)
-
-
def subcmd_voluuidget(args):
from subprocess import Popen, PIPE
import xml.etree.ElementTree as XET
ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
- po = Popen(['gluster', '--xml', '--remote-host=' + args.host,
- 'volume', 'info', args.volname], bufsize=0,
- stdin=None, stdout=PIPE, stderr=PIPE)
+ cmd = ['gluster', '--xml', '--remote-host=' + args.host,
+ 'volume', 'info', args.volname]
+
+ if args.inet6:
+ cmd.append("--inet6")
+
+ po = Popen(cmd, bufsize=0,
+ stdin=None, stdout=PIPE, stderr=PIPE,
+ universal_newlines=True)
vix, err = po.communicate()
if po.returncode != 0:
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 6acc9f17ad7..a3df103e76c 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -21,8 +21,8 @@ import subprocess
import socket
from subprocess import PIPE
from threading import Lock, Thread as baseThread
-from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
-from errno import EINTR, ENOENT, ESTALE, EBUSY, errorcode
+from errno import (EACCES, EAGAIN, EPIPE, ENOTCONN, ENOMEM, ECONNABORTED,
+ EINTR, ENOENT, ESTALE, EBUSY, ENODATA, errorcode, EIO)
from signal import signal, SIGTERM
import select as oselect
from os import waitpid as owaitpid
@@ -37,10 +37,10 @@ from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE
sys.path.insert(1, GLUSTERFS_LIBEXECDIR)
EVENTS_ENABLED = True
try:
- from events.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY
- from events.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE
- from events.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE
- from events.eventtypes import GEOREP_CHECKPOINT_COMPLETED \
+ from gfevents.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY
+ from gfevents.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE
+ from gfevents.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE
+ from gfevents.eventtypes import GEOREP_CHECKPOINT_COMPLETED \
as EVENT_GEOREP_CHECKPOINT_COMPLETED
except ImportError:
# Events APIs not installed, dummy eventtypes with None
@@ -55,17 +55,19 @@ from rconf import rconf
from hashlib import sha256 as sha256
+ENOTSUP = getattr(errno, 'ENOTSUP', errno.EOPNOTSUPP)
+
# auxiliary gfid based access prefix
_CL_AUX_GFID_PFX = ".gfid/"
+ROOT_GFID = "00000000-0000-0000-0000-000000000001"
GF_OP_RETRIES = 10
GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0'
-CHANGELOG_AGENT_SERVER_VERSION = 1.0
-CHANGELOG_AGENT_CLIENT_VERSION = 1.0
NodeID = None
rsync_version = None
unshare_mnt_propagation = None
+slv_bricks = None
SPACE_ESCAPE_CHAR = "%20"
NEWLINE_ESCAPE_CHAR = "%0A"
PERCENTAGE_ESCAPE_CHAR = "%25"
@@ -98,6 +100,19 @@ def unescape_space_newline(s):
.replace(NEWLINE_ESCAPE_CHAR, "\n")\
.replace(PERCENTAGE_ESCAPE_CHAR, "%")
+# gf_mount_ready() returns 1 if all subvols are up, else 0
+def gf_mount_ready():
+ ret = errno_wrap(Xattr.lgetxattr,
+ ['.', 'dht.subvol.status', 16],
+ [ENOENT, ENOTSUP, ENODATA], [ENOMEM])
+
+ if isinstance(ret, int):
+ logging.error("failed to get the xattr value")
+ return 1
+ ret = ret.rstrip('\x00')
+ if ret == "1":
+ return 1
+ return 0
def norm(s):
if s:
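The mounter polls gf_mount_ready() up to ten times, 0.2s apart, before declaring the mount usable; the helper reads the dht.subvol.status virtual xattr on the mount root and treats "1" as all-subvolumes-up. A sketch of the polling loop with a fake probe standing in for the xattr read:

import time

def wait_for_subvols(mount_ready, retries=10, interval=0.2):
    # mount_ready is expected to behave like gf_mount_ready():
    # return 1 once dht.subvol.status reports "1", else 0
    while not mount_ready():
        if retries < 0:
            return False   # caller logs 'Subvols are not up' and moves on
        retries -= 1
        time.sleep(interval)
    return True

state = {'polls': 0}

def fake_probe():  # becomes ready on the third poll
    state['polls'] += 1
    return 1 if state['polls'] >= 3 else 0

assert wait_for_subvols(fake_probe, interval=0)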
@@ -159,7 +174,8 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url):
rconf.ssh_ctl_dir = ctld
content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr,
resource_url)
- content_sha256 = sha256hex(content)
+ encoded_content = content.encode()
+ content_sha256 = sha256hex(encoded_content)
"""
The length of ctl_path for ssh connection should not be > 108.
ssh fails with ctl_path too long if it is so. But when rsync
@@ -171,7 +187,7 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url):
fname = os.path.join(rconf.ssh_ctl_dir,
"%s.mft" % content_sha256)
- create_manifest(fname, content)
+ create_manifest(fname, encoded_content)
ssh_ctl_path = os.path.join(rconf.ssh_ctl_dir,
"%s.sock" % content_sha256)
rconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path]
@@ -184,7 +200,7 @@ def grabfile(fname, content=None):
"""
# damn those messy open() mode codes
fd = os.open(fname, os.O_CREAT | os.O_RDWR)
- f = os.fdopen(fd, 'r+b', 0)
+ f = os.fdopen(fd, 'r+')
try:
fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except:
@@ -198,6 +214,7 @@ def grabfile(fname, content=None):
try:
f.truncate()
f.write(content)
+ f.flush()
except:
f.close()
raise
@@ -260,7 +277,8 @@ def finalize(*args, **kwargs):
umount_cmd = rconf.mbr_umount_cmd + [rconf.mount_point, 'lazy']
else:
umount_cmd = ['umount', '-l', rconf.mount_point]
- p0 = subprocess.Popen(umount_cmd, stderr=subprocess.PIPE)
+ p0 = subprocess.Popen(umount_cmd, stderr=subprocess.PIPE,
+ universal_newlines=True)
_, errdata = p0.communicate()
if p0.returncode == 0:
try:
@@ -326,13 +344,24 @@ def log_raise_exception(excont):
ECONNABORTED):
logging.error(lf('Gluster Mount process exited',
error=errorcode[exc.errno]))
+ elif isinstance(exc, OSError) and exc.errno == EIO:
+ logging.error("Getting \"Input/Output error\" "
+ "is most likely due to "
+ "a. Brick is down or "
+ "b. Split brain issue.")
+ logging.error("This is expected as per design to "
+ "keep the consistency of the file system. "
+ "Once the above issue is resolved "
+ "geo-replication would automatically "
+ "proceed further.")
+ logtag = "FAIL"
else:
logtag = "FAIL"
if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG):
logtag = "FULL EXCEPTION TRACE"
if logtag:
logging.exception(logtag + ": ")
- sys.stderr.write("failed with %s.\n" % type(exc).__name__)
+ sys.stderr.write("failed with %s: %s.\n" % (type(exc).__name__, exc))
excont.exval = 1
sys.exit(excont.exval)
@@ -559,7 +588,6 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):
def lstat(e):
return errno_wrap(os.lstat, [e], [ENOENT], [ESTALE, EBUSY])
-
def get_gfid_from_mnt(gfidpath):
return errno_wrap(Xattr.lgetxattr,
[gfidpath, 'glusterfs.gfid.string',
@@ -595,7 +623,7 @@ class ChangelogException(OSError):
def gf_event(event_type, **kwargs):
if EVENTS_ENABLED:
- from events.gf_event import gf_event as gfevent
+ from gfevents.gf_event import gf_event as gfevent
gfevent(event_type, **kwargs)
@@ -635,7 +663,8 @@ def unshare_propagation_supported():
unshare_mnt_propagation = False
p = subprocess.Popen(["unshare", "--help"],
stderr=subprocess.PIPE,
- stdout=subprocess.PIPE)
+ stdout=subprocess.PIPE,
+ universal_newlines=True)
out, err = p.communicate()
if p.returncode == 0:
if "propagation" in out:
@@ -652,7 +681,8 @@ def get_rsync_version(rsync_cmd):
rsync_version = "0"
p = subprocess.Popen([rsync_cmd, "--version"],
stderr=subprocess.PIPE,
- stdout=subprocess.PIPE)
+ stdout=subprocess.PIPE,
+ universal_newlines=True)
out, err = p.communicate()
if p.returncode == 0:
rsync_version = out.split(" ", 4)[3]
@@ -660,17 +690,65 @@ def get_rsync_version(rsync_cmd):
return rsync_version
+def get_slv_dir_path(slv_host, slv_volume, gfid):
+ global slv_bricks
+
+ dir_path = ENOENT
+ pfx = gauxpfx()
+
+ if not slv_bricks:
+ slv_info = Volinfo(slv_volume, slv_host, master=False)
+ slv_bricks = slv_info.bricks
+ # Result of readlink would be of format as below.
+ # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
+ for brick in slv_bricks:
+ dir_path = errno_wrap(os.path.join,
+ [brick['dir'],
+ ".glusterfs", gfid[0:2],
+ gfid[2:4],
+ gfid], [ENOENT], [ESTALE])
+ if dir_path != ENOENT:
+ try:
+ realpath = errno_wrap(os.readlink, [dir_path],
+ [ENOENT], [ESTALE])
+ if not isinstance(realpath, int):
+ realpath_parts = realpath.split('/')
+ pargfid = realpath_parts[-2]
+ basename = realpath_parts[-1]
+ dir_entry = os.path.join(pfx, pargfid, basename)
+ return dir_entry
+ except OSError:
+ # .gfid/GFID
+ gfidpath = unescape_space_newline(os.path.join(pfx, gfid))
+ realpath = errno_wrap(Xattr.lgetxattr_buf,
+ [gfidpath, 'glusterfs.gfid2path'], [ENOENT], [ESTALE])
+ if not isinstance(realpath, int):
+ basename = os.path.basename(realpath).rstrip('\x00')
+ dirpath = os.path.dirname(realpath)
+ if dirpath == "/":
+ pargfid = ROOT_GFID
+ else:
+ dirpath = dirpath.strip("/")
+ pargfid = get_gfid_from_mnt(dirpath)
+ if isinstance(pargfid, int):
+ return None
+ dir_entry = os.path.join(pfx, pargfid, basename)
+ return dir_entry
+
+ return None
+
+
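get_slv_dir_path() exploits the brick backend layout: every directory gfid under .glusterfs is a symlink of the form ../../<pargfid[0:2]>/<pargfid[2:4]>/<pargfid>/<basename>, so reading it recovers the parent gfid and basename, which map straight to an aux-gfid path on the mount; regular files fall back to the glusterfs.gfid2path xattr. A worked decoding of the symlink case (gfids hypothetical):

import os

readlink = ("../../a3/bc/"
            "a3bc91c4-0d2e-4f5a-8b6c-7d8e9f0a1b2c/dir1")
parts = readlink.split('/')
pargfid, basename = parts[-2], parts[-1]
print(os.path.join(".gfid/", pargfid, basename))
# .gfid/a3bc91c4-0d2e-4f5a-8b6c-7d8e9f0a1b2c/dir1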
def lf(event, **kwargs):
"""
Log Format helper function, log messages can be
easily modified to structured log format.
lf("Config Change", sync_jobs=4, brick=/bricks/b1) will be
- converted as "Config Change<TAB>brick=/bricks/b1<TAB>sync_jobs=4"
+ converted as "Config Change [{brick=/bricks/b1}, {sync_jobs=4}]"
"""
- msg = event
+ msgparts = []
for k, v in kwargs.items():
- msg += "\t{0}={1}".format(k, v)
- return msg
+ msgparts.append("{%s=%s}" % (k, v))
+ return "%s [%s]" % (event, ", ".join(msgparts))
class Popen(subprocess.Popen):
@@ -807,7 +885,7 @@ class Popen(subprocess.Popen):
break
b = os.read(self.stderr.fileno(), 1024)
if b:
- elines.append(b)
+ elines.append(b.decode())
else:
break
self.stderr.close()
@@ -816,12 +894,31 @@ class Popen(subprocess.Popen):
self.errfail()
+def host_brick_split(value):
+ """
+    IPv6-compatible way to split out the host
+ and brick information. Example inputs:
+ node1.example.com:/exports/bricks/brick1/brick
+ fe80::af0f:df82:844f:ef66%utun0:/exports/bricks/brick1/brick
+ """
+ parts = value.split(":")
+ brick = parts[-1]
+ hostparts = parts[0:-1]
+ return (":".join(hostparts), brick)
+
+
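Splitting on the last colon only is what makes this IPv6-safe: the host part may itself contain colons, while the brick path is assumed to contain none, so everything after the final colon is the path. Usage:

def host_brick_split(value):  # as defined above
    parts = value.split(":")
    return (":".join(parts[0:-1]), parts[-1])

print(host_brick_split("node1.example.com:/bricks/b1"))
# ('node1.example.com', '/bricks/b1')
print(host_brick_split("fe80::af0f:df82:844f:ef66%utun0:/bricks/b1"))
# ('fe80::af0f:df82:844f:ef66%utun0', '/bricks/b1')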
class Volinfo(object):
- def __init__(self, vol, host='localhost', prelude=[]):
- po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
+ def __init__(self, vol, host='localhost', prelude=[], master=True):
+ if master:
+ gluster_cmd_dir = gconf.get("gluster-command-dir")
+ else:
+ gluster_cmd_dir = gconf.get("slave-gluster-command-dir")
+
+ gluster_cmd = os.path.join(gluster_cmd_dir, 'gluster')
+ po = Popen(prelude + [gluster_cmd, '--xml', '--remote-host=' + host,
'volume', 'info', vol],
- stdout=PIPE, stderr=PIPE)
+ stdout=PIPE, stderr=PIPE, universal_newlines=True)
vix = po.stdout.read()
po.wait()
po.terminate_geterr()
@@ -852,7 +949,7 @@ class Volinfo(object):
@memoize
def bricks(self):
def bparse(b):
- host, dirp = b.find("name").text.split(':', 2)
+ host, dirp = host_brick_split(b.find("name").text)
return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
return [bparse(b) for b in self.get('brick')]
@@ -884,6 +981,14 @@ class Volinfo(object):
else:
return int(self.get('disperseCount')[0].text)
+ def distribution_count(self, tier, hot):
+ if (tier and hot):
+ return int(self.get('hotBricks/hotdistCount')[0].text)
+ elif (tier and not hot):
+ return int(self.get('coldBricks/colddistCount')[0].text)
+ else:
+ return int(self.get('distCount')[0].text)
+
@property
@memoize
def hot_bricks(self):
@@ -900,7 +1005,7 @@ class VolinfoFromGconf(object):
# Glusterd will generate following config items before Geo-rep start
# So that Geo-rep need not run gluster commands from inside
# Volinfo object API/interface kept as is so that caller need not
- # change anything exept calling this instead of Volinfo()
+ # change anything except calling this instead of Volinfo()
#
# master-bricks=
# master-bricks=NODEID:HOSTNAME:PATH,..
@@ -920,6 +1025,16 @@ class VolinfoFromGconf(object):
def is_hot(self, brickpath):
return False
+ def is_uuid(self, value):
+ try:
+ uuid.UUID(value)
+ return True
+ except ValueError:
+ return False
+
+ def possible_path(self, value):
+ return "/" in value
+
@property
@memoize
def bricks(self):
@@ -933,8 +1048,22 @@ class VolinfoFromGconf(object):
out = []
for b in bricks_data:
parts = b.split(":")
- bpath = parts[2] if len(parts) == 3 else ""
- out.append({"host": parts[1], "dir": bpath, "uuid": parts[0]})
+            b_uuid = None
+            bpath = ""
+ if self.is_uuid(parts[0]):
+ b_uuid = parts[0]
+ # Set all parts except first
+ parts = parts[1:]
+
+ if self.possible_path(parts[-1]):
+ bpath = parts[-1]
+ # Set all parts except last
+ parts = parts[0:-1]
+
+ out.append({
+ "host": ":".join(parts), # if remaining parts are IPv6 name
+ "dir": bpath,
+ "uuid": b_uuid
+ })
return out
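The brick lines glusterd writes into gconf may or may not be prefixed with a node UUID, and the host may be an IPv6 literal, so the parser peels a UUID off the front and a path off the back and treats whatever colons remain as part of the host. A self-contained rendering of that logic with a worked example (values hypothetical):

import uuid

def parse_brick_line(b):
    parts = b.split(":")
    b_uuid = None
    try:
        uuid.UUID(parts[0])
        b_uuid, parts = parts[0], parts[1:]  # leading UUID present
    except ValueError:
        pass
    bpath = ""
    if "/" in parts[-1]:
        bpath, parts = parts[-1], parts[:-1]  # trailing brick path
    return {"host": ":".join(parts), "dir": bpath, "uuid": b_uuid}

line = ("d9fae8f1-3d2c-4f1a-9f6e-2a0b5c7d8e9f:"
        "fe80::af0f:df82:844f:ef66:/bricks/b1")
print(parse_brick_line(line))
# {'host': 'fe80::af0f:df82:844f:ef66', 'dir': '/bricks/b1',
#  'uuid': 'd9fae8f1-3d2c-4f1a-9f6e-2a0b5c7d8e9f'}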
@@ -952,6 +1081,9 @@ class VolinfoFromGconf(object):
def disperse_count(self, tier, hot):
return gconf.get("master-disperse-count")
+ def distribution_count(self, tier, hot):
+ return gconf.get("master-distribution-count")
+
@property
@memoize
def hot_bricks(self):