summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/resource.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r--geo-replication/syncdaemon/resource.py54
1 files changed, 51 insertions, 3 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index ed0e7efe2b2..91ca1916f6a 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -31,11 +31,10 @@ import shutil
from gconf import gconf
import repce
from repce import RepceServer, RepceClient
-from master import gmaster_builder
import syncdutils
from syncdutils import GsyncdError, select, privileged, boolify, funcode
from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
-from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
+from syncdutils import NoStimeAvailable, PartialHistoryAvailable
from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
from gsyncdstatus import GeorepStatus
@@ -522,6 +521,29 @@ class Server(object):
raise
@classmethod
+ @_pathguard
+ def entry_stime(cls, path, uuid):
+ """
+ entry_stime xattr to reduce the number of retry of Entry changes when
+ Geo-rep worker crashes and restarts. entry_stime is updated after
+ processing every changelog file. On failure and restart, worker only
+ have to reprocess the last changelog for Entry ops.
+ Xattr Key: <PFX>.<MASTERVOL_UUID>.<SLAVEVOL_UUID>.entry_stime
+ """
+ try:
+ val = Xattr.lgetxattr(path,
+ '.'.join([cls.GX_NSPACE, uuid,
+ 'entry_stime']),
+ 8)
+ return struct.unpack('!II', val)
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno in (ENOENT, ENODATA, ENOTDIR):
+ return ex.errno
+ else:
+ raise
+
+ @classmethod
def node_uuid(cls, path='.'):
try:
uuid_l = Xattr.lgetxattr_buf(
@@ -542,6 +564,16 @@ class Server(object):
@classmethod
@_pathguard
+ def set_entry_stime(cls, path, uuid, mark):
+ """set @mark as stime for @uuid on @path"""
+ errno_wrap(Xattr.lsetxattr,
+ [path,
+ '.'.join([cls.GX_NSPACE, uuid, 'entry_stime']),
+ struct.pack('!II', *mark)],
+ [ENOENT])
+
+ @classmethod
+ @_pathguard
def set_xtime(cls, path, uuid, mark):
"""set @mark as xtime for @uuid on @path"""
errno_wrap(Xattr.lsetxattr,
@@ -1376,6 +1408,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def gmaster_instantiate_tuple(self, slave):
"""return a tuple of the 'one shot' and the 'main crawl'
class instance"""
+ from master import gmaster_builder
return (gmaster_builder('xsync')(self, slave),
gmaster_builder()(self, slave),
gmaster_builder('changeloghistory')(self, slave))
@@ -1436,6 +1469,13 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
uuid + '.' + gconf.slave_id)
),
slave.server)
+ slave.server.entry_stime = types.MethodType(
+ lambda _self, path, uuid: (
+ brickserver.entry_stime(
+ path,
+ uuid + '.' + gconf.slave_id)
+ ),
+ slave.server)
slave.server.set_stime = types.MethodType(
lambda _self, path, uuid, mark: (
brickserver.set_stime(path,
@@ -1443,6 +1483,14 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
mark)
),
slave.server)
+ slave.server.set_entry_stime = types.MethodType(
+ lambda _self, path, uuid, mark: (
+ brickserver.set_entry_stime(
+ path,
+ uuid + '.' + gconf.slave_id,
+ mark)
+ ),
+ slave.server)
(g1, g2, g3) = self.gmaster_instantiate_tuple(slave)
g1.master.server = brickserver
g2.master.server = brickserver
@@ -1506,7 +1554,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
except ChangelogHistoryNotAvailable:
logging.info('Changelog history not available, using xsync')
g1.crawlwrap(oneshot=True, register_time=register_time)
- except NoPurgeTimeAvailable:
+ except NoStimeAvailable:
logging.info('No stime available, using xsync crawl')
g1.crawlwrap(oneshot=True, register_time=register_time)
except ChangelogException as e: