diff options
author | Aravinda VK <avishwan@redhat.com> | 2014-04-29 12:14:24 +0530 |
---|---|---|
committer | Venky Shankar <vshankar@redhat.com> | 2014-05-09 00:27:40 -0700 |
commit | c7b0396f680863528248e6f5a162de47184b6c88 (patch) | |
tree | ead2f295d041df1e258db4bf09fe10944b15f4d7 /geo-replication/syncdaemon/resource.py | |
parent | 65757e0f57f93103d87fdf9534c5ca25b66d14b7 (diff) |
geo-rep: Pause and Resume feature for geo-replication
Changelog consumption/processing now happens in seperate process
group than monitor. When monitor process group gets SIGSTOP all
worker process, ssh, rsync will be paused except the changelog
processing. When it gets SIGCONT it resumes its operation.
Changelog agent runs as RepceServer, geo-rep worker communicates
with changelog agent using RepceClient.
Change-Id: I35c333e4d8b13d03a7808aed601960eef23cfa04
BUG: 1093602
Signed-off-by: Venky Shankar <vshankar@redhat.com>
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/7322
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 69 |
1 files changed, 17 insertions, 52 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 185722f5df0..79dc9e79e9d 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -35,6 +35,8 @@ from syncdutils import GsyncdError, select, privileged, boolify, funcode from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable from syncdutils import ChangelogException +from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION + UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) @@ -127,19 +129,7 @@ class _MetaXattr(object): return getattr(self, meth) -class _MetaChangelog(object): - - def __getattr__(self, meth): - from libgfchangelog import Changes as LChanges - xmeth = [m for m in dir(LChanges) if m[0] != '_'] - if not meth in xmeth: - return - for m in xmeth: - setattr(self, m, getattr(LChanges, m)) - return getattr(self, meth) - Xattr = _MetaXattr() -Changes = _MetaChangelog() class Popen(subprocess.Popen): @@ -669,39 +659,6 @@ class Server(object): errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL]) @classmethod - def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries=0): - Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) - - @classmethod - def changelog_scan(cls): - Changes.cl_scan() - - @classmethod - def changelog_getchanges(cls): - return Changes.cl_getchanges() - - @classmethod - def changelog_done(cls, clfile): - Changes.cl_done(clfile) - - @classmethod - def history_changelog(cls, changelog_path, start, end, num_parallel): - return Changes.cl_history_changelog(changelog_path, start, end, - num_parallel) - - @classmethod - def history_changelog_scan(cls): - return Changes.cl_history_scan() - - @classmethod - def history_changelog_getchanges(cls): - return Changes.cl_history_getchanges() - - @classmethod - def history_changelog_done(cls, clfile): - Changes.cl_history_done(clfile) - - @classmethod @_pathguard def setattr(cls, path, adct): """set file attributes @@ -932,9 +889,6 @@ class AbstractUrl(object): return self.get_url() - ### Concrete resource classes ### - - class FILE(AbstractUrl, SlaveLocal, SlaveRemote): """scheme class for file:// urls @@ -1311,16 +1265,27 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): # register the crawlers and start crawling # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default) # g3 ==> changelog History + (inf, ouf, ra, wa) = gconf.rpc_fd.split(',') + os.close(int(ra)) + os.close(int(wa)) + changelog_agent = RepceClient(int(inf), int(ouf)) + rv = changelog_agent.version() + if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: + raise GsyncdError( + "RePCe major version mismatch(changelog agent): " + "local %s, remote %s" % + (CHANGELOG_AGENT_CLIENT_VERSION, rv)) + g1.register() try: (workdir, logfile) = g2.setup_working_dir() # register with the changelog library # 9 == log level (DEBUG) # 5 == connection retries - brickserver.changelog_register(gconf.local_path, - workdir, logfile, 9, 5) - g2.register() - g3.register() + changelog_agent.register(gconf.local_path, + workdir, logfile, 9, 5) + g2.register(changelog_agent) + g3.register(changelog_agent) except ChangelogException as e: logging.debug("Changelog register failed: %s - %s" % (e.errno, e.strerror)) |