diff options
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 54 |
1 files changed, 51 insertions, 3 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index ed0e7efe2b2..91ca1916f6a 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -31,11 +31,10 @@ import shutil from gconf import gconf import repce from repce import RepceServer, RepceClient -from master import gmaster_builder import syncdutils from syncdutils import GsyncdError, select, privileged, boolify, funcode from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat -from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable +from syncdutils import NoStimeAvailable, PartialHistoryAvailable from syncdutils import ChangelogException, ChangelogHistoryNotAvailable from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION from gsyncdstatus import GeorepStatus @@ -522,6 +521,29 @@ class Server(object): raise @classmethod + @_pathguard + def entry_stime(cls, path, uuid): + """ + entry_stime xattr to reduce the number of retry of Entry changes when + Geo-rep worker crashes and restarts. entry_stime is updated after + processing every changelog file. On failure and restart, worker only + have to reprocess the last changelog for Entry ops. + Xattr Key: <PFX>.<MASTERVOL_UUID>.<SLAVEVOL_UUID>.entry_stime + """ + try: + val = Xattr.lgetxattr(path, + '.'.join([cls.GX_NSPACE, uuid, + 'entry_stime']), + 8) + return struct.unpack('!II', val) + except OSError: + ex = sys.exc_info()[1] + if ex.errno in (ENOENT, ENODATA, ENOTDIR): + return ex.errno + else: + raise + + @classmethod def node_uuid(cls, path='.'): try: uuid_l = Xattr.lgetxattr_buf( @@ -542,6 +564,16 @@ class Server(object): @classmethod @_pathguard + def set_entry_stime(cls, path, uuid, mark): + """set @mark as stime for @uuid on @path""" + errno_wrap(Xattr.lsetxattr, + [path, + '.'.join([cls.GX_NSPACE, uuid, 'entry_stime']), + struct.pack('!II', *mark)], + [ENOENT]) + + @classmethod + @_pathguard def set_xtime(cls, path, uuid, mark): """set @mark as xtime for @uuid on @path""" errno_wrap(Xattr.lsetxattr, @@ -1376,6 +1408,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def gmaster_instantiate_tuple(self, slave): """return a tuple of the 'one shot' and the 'main crawl' class instance""" + from master import gmaster_builder return (gmaster_builder('xsync')(self, slave), gmaster_builder()(self, slave), gmaster_builder('changeloghistory')(self, slave)) @@ -1436,6 +1469,13 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): uuid + '.' + gconf.slave_id) ), slave.server) + slave.server.entry_stime = types.MethodType( + lambda _self, path, uuid: ( + brickserver.entry_stime( + path, + uuid + '.' + gconf.slave_id) + ), + slave.server) slave.server.set_stime = types.MethodType( lambda _self, path, uuid, mark: ( brickserver.set_stime(path, @@ -1443,6 +1483,14 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): mark) ), slave.server) + slave.server.set_entry_stime = types.MethodType( + lambda _self, path, uuid, mark: ( + brickserver.set_entry_stime( + path, + uuid + '.' + gconf.slave_id, + mark) + ), + slave.server) (g1, g2, g3) = self.gmaster_instantiate_tuple(slave) g1.master.server = brickserver g2.master.server = brickserver @@ -1506,7 +1554,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): except ChangelogHistoryNotAvailable: logging.info('Changelog history not available, using xsync') g1.crawlwrap(oneshot=True, register_time=register_time) - except NoPurgeTimeAvailable: + except NoStimeAvailable: logging.info('No stime available, using xsync crawl') g1.crawlwrap(oneshot=True, register_time=register_time) except ChangelogException as e: |