diff options
-rw-r--r-- | geo-replication/syncdaemon/Makefile.am | 2 | ||||
-rw-r--r-- | geo-replication/syncdaemon/changelogagent.py | 78 | ||||
-rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 13 | ||||
-rw-r--r-- | geo-replication/syncdaemon/master.py | 34 | ||||
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 34 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 69 | ||||
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 3 |
7 files changed, 162 insertions, 71 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index 83f969639cc..885963eae2b 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -2,6 +2,6 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \ resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ - $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py + $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py CLEANFILES = diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py new file mode 100644 index 00000000000..54d82cefcd2 --- /dev/null +++ b/geo-replication/syncdaemon/changelogagent.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import os +import logging +import syncdutils +from syncdutils import select, CHANGELOG_AGENT_SERVER_VERSION +from repce import RepceServer + + +class _MetaChangelog(object): + + def __getattr__(self, meth): + from libgfchangelog import Changes as LChanges + xmeth = [m for m in dir(LChanges) if m[0] != '_'] + if meth not in xmeth: + return + for m in xmeth: + setattr(self, m, getattr(LChanges, m)) + return getattr(self, meth) + +Changes = _MetaChangelog() + + +class Changelog(object): + def version(self): + return CHANGELOG_AGENT_SERVER_VERSION + + def register(self, cl_brick, cl_dir, cl_log, cl_level, retries=0): + return Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) + + def scan(self): + return Changes.cl_scan() + + def getchanges(self): + return Changes.cl_getchanges() + + def done(self, clfile): + return Changes.cl_done(clfile) + + def history(self, changelog_path, start, end, num_parallel): + return Changes.cl_history_changelog(changelog_path, start, end, + num_parallel) + + def history_scan(self): + return Changes.cl_history_scan() + + def history_getchanges(self): + return Changes.cl_history_getchanges() + + def history_done(self, clfile): + return Changes.cl_history_done(clfile) + + +class ChangelogAgent(object): + def __init__(self, obj, fd_tup): + (inf, ouf, rw, ww) = fd_tup.split(',') + os.close(int(rw)) + os.close(int(ww)) + repce = RepceServer(obj, int(inf), int(ouf), 1) + t = syncdutils.Thread(target=lambda: (repce.service_loop(), + syncdutils.finalize())) + t.start() + logging.info('Agent listining...') + + select((), (), ()) + + +def agent(obj, fd_tup): + return ChangelogAgent(obj, fd_tup) diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 426d964de95..7d463ad23f3 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -32,6 +32,7 @@ from syncdutils import GsyncdError, select, set_term_handler from configinterface import GConffile, upgrade_config_file import resource from monitor import monitor +from changelogagent import agent, Changelog class GLogger(Logger): @@ -175,6 +176,7 @@ def main_i(): - query/manipulate configuration - format gsyncd urls using gsyncd's url parsing engine - start service in following modes, in given stages: + - agent: startup(), ChangelogAgent() - monitor: startup(), monitor() - master: startup(), connect_remote(), connect(), service_loop() - slave: startup(), connect(), service_loop() @@ -275,12 +277,15 @@ def main_i(): # duh. need to specify dest or value will be mapped to None :S op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True)) + op.add_option('--agent', dest='agent', action='callback', + callback=store_local_curry(True)) op.add_option('--resource-local', dest='resource_local', type=str, action='callback', callback=store_local) op.add_option('--resource-remote', dest='resource_remote', type=str, action='callback', callback=store_local) op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local) + op.add_option('--rpc-fd', dest='rpc_fd', type=str, help=SUPPRESS_HELP) op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True)) op.add_option('-N', '--no-daemon', dest="go_daemon", @@ -586,6 +591,7 @@ def main_i(): go_daemon = rconf['go_daemon'] be_monitor = rconf.get('monitor') + be_agent = rconf.get('agent') rscs, local, remote = makersc(args) if not be_monitor and isinstance(remote, resource.SSH) and \ @@ -596,6 +602,8 @@ def main_i(): log_file = gconf.log_file if be_monitor: label = 'monitor' + elif be_agent: + label = 'agent' elif remote: # master label = gconf.local_path @@ -604,6 +612,11 @@ def main_i(): startup(go_daemon=go_daemon, log_file=log_file, label=label) resource.Popen.init_errhandler() + if be_agent: + os.setsid() + logging.debug('rpc_fd: %s' % repr(gconf.rpc_fd)) + return agent(Changelog(), gconf.rpc_fd) + if be_monitor: return monitor(*rscs) diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index b6a7c894814..1f1fa1122cb 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -1108,9 +1108,9 @@ class GMasterChangelogMixin(GMasterCommon): if isinstance(purge_time, int): purge_time = None - self.master.server.changelog_scan() + self.changelog_agent.scan() self.crawls += 1 - changes = self.master.server.changelog_getchanges() + changes = self.changelog_agent.getchanges() if changes: if purge_time: logging.info("slave's time: %s" % repr(purge_time)) @@ -1120,22 +1120,24 @@ class GMasterChangelogMixin(GMasterCommon): logging.info( 'skipping already processed change: %s...' % os.path.basename(pr)) - self.master.server.changelog_done(pr) + self.changelog_done_func(pr) changes.remove(pr) if changes: logging.debug('processing changes %s' % repr(changes)) self.process(changes) - def register(self): + def register(self, changelog_agent): + self.changelog_agent = changelog_agent self.sleep_interval = int(gconf.change_interval) - self.changelog_done_func = self.master.server.changelog_done + self.changelog_done_func = self.changelog_agent.done class GMasterChangeloghistoryMixin(GMasterChangelogMixin): - def register(self): + def register(self, changelog_agent): + self.changelog_agent = changelog_agent self.changelog_register_time = int(time.time()) - self.changelog_done_func = self.master.server.history_changelog_done + self.changelog_done_func = self.changelog_agent.history_done def crawl(self): self.update_worker_crawl_status("History Crawl") @@ -1157,21 +1159,21 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): # location then consuming history will not work(Known issue as of now) changelog_path = os.path.join(gconf.local_path, ".glusterfs/changelogs") - ts = self.master.server.history_changelog(changelog_path, - purge_time[0], - self.changelog_register_time, - int(gconf.sync_jobs)) + ts = self.changelog_agent.history(changelog_path, + purge_time[0], + self.changelog_register_time, + int(gconf.sync_jobs)) # scan followed by getchanges till scan returns zero. - # history_changelog_scan() is blocking call, till it gets the number + # history_scan() is blocking call, till it gets the number # of changelogs to process. Returns zero when no changelogs # to be processed. returns positive value as number of changelogs # to be processed, which will be fetched using - # history_changelog_getchanges() - while self.master.server.history_changelog_scan() > 0: + # history_getchanges() + while self.changelog_agent.history_scan() > 0: self.crawls += 1 - changes = self.master.server.history_changelog_getchanges() + changes = self.changelog_agent.history_getchanges() if changes: if purge_time: logging.info("slave's time: %s" % repr(purge_time)) @@ -1208,7 +1210,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): XSYNC_MAX_ENTRIES = 1 << 13 - def register(self): + def register(self, changelog_agent=None): self.counter = 0 self.comlist = [] self.stimes = [] diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 8ed6f832618..e49a24ee5f5 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -108,7 +108,7 @@ class Monitor(object): # give a chance to graceful exit os.kill(-os.getpid(), signal.SIGTERM) - def monitor(self, w, argv, cpids): + def monitor(self, w, argv, cpids, agents): """the monitor loop Basic logic is a blantantly simple blunt heuristics: @@ -149,6 +149,23 @@ class Monitor(object): while ret in (0, 1): logging.info('-' * conn_timeout) logging.info('starting gsyncd worker') + + # Couple of pipe pairs for RPC communication b/w + # worker and changelog agent. + + # read/write end for agent + (ra, ww) = os.pipe() + # read/write end for worker + (rw, wa) = os.pipe() + + # spawn the agent process + apid = os.fork() + if apid == 0: + os.execv(sys.executable, argv + ['--local-path', w[0], + '--agent', + '--rpc-fd', + ','.join([str(ra), str(wa), + str(rw), str(ww)])]) pr, pw = os.pipe() cpid = os.fork() if cpid == 0: @@ -157,14 +174,26 @@ class Monitor(object): '--local-path', w[0], '--local-id', '.' + escape(w[0]), + '--rpc-fd', + ','.join([str(rw), str(ww), + str(ra), str(wa)]), '--resource-remote', w[1]]) self.lock.acquire() cpids.add(cpid) + agents.add(apid) self.lock.release() os.close(pw) + t0 = time.time() so = select((pr,), (), (), conn_timeout)[0] os.close(pr) + + # close all RPC pipes in monitor + os.close(ra) + os.close(wa) + os.close(rw) + os.close(ww) + if so: ret = nwait(cpid, os.WNOHANG) if ret is not None: @@ -206,10 +235,11 @@ class Monitor(object): argv.insert(0, os.path.basename(sys.executable)) cpids = set() + agents = set() ta = [] for wx in wspx: def wmon(w): - cpid, _ = self.monitor(w, argv, cpids) + cpid, _ = self.monitor(w, argv, cpids, agents) time.sleep(1) self.lock.acquire() for cpid in cpids: diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 185722f5df0..79dc9e79e9d 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -35,6 +35,8 @@ from syncdutils import GsyncdError, select, privileged, boolify, funcode from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable from syncdutils import ChangelogException +from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION + UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) @@ -127,19 +129,7 @@ class _MetaXattr(object): return getattr(self, meth) -class _MetaChangelog(object): - - def __getattr__(self, meth): - from libgfchangelog import Changes as LChanges - xmeth = [m for m in dir(LChanges) if m[0] != '_'] - if not meth in xmeth: - return - for m in xmeth: - setattr(self, m, getattr(LChanges, m)) - return getattr(self, meth) - Xattr = _MetaXattr() -Changes = _MetaChangelog() class Popen(subprocess.Popen): @@ -669,39 +659,6 @@ class Server(object): errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL]) @classmethod - def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries=0): - Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) - - @classmethod - def changelog_scan(cls): - Changes.cl_scan() - - @classmethod - def changelog_getchanges(cls): - return Changes.cl_getchanges() - - @classmethod - def changelog_done(cls, clfile): - Changes.cl_done(clfile) - - @classmethod - def history_changelog(cls, changelog_path, start, end, num_parallel): - return Changes.cl_history_changelog(changelog_path, start, end, - num_parallel) - - @classmethod - def history_changelog_scan(cls): - return Changes.cl_history_scan() - - @classmethod - def history_changelog_getchanges(cls): - return Changes.cl_history_getchanges() - - @classmethod - def history_changelog_done(cls, clfile): - Changes.cl_history_done(clfile) - - @classmethod @_pathguard def setattr(cls, path, adct): """set file attributes @@ -932,9 +889,6 @@ class AbstractUrl(object): return self.get_url() - ### Concrete resource classes ### - - class FILE(AbstractUrl, SlaveLocal, SlaveRemote): """scheme class for file:// urls @@ -1311,16 +1265,27 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): # register the crawlers and start crawling # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default) # g3 ==> changelog History + (inf, ouf, ra, wa) = gconf.rpc_fd.split(',') + os.close(int(ra)) + os.close(int(wa)) + changelog_agent = RepceClient(int(inf), int(ouf)) + rv = changelog_agent.version() + if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: + raise GsyncdError( + "RePCe major version mismatch(changelog agent): " + "local %s, remote %s" % + (CHANGELOG_AGENT_CLIENT_VERSION, rv)) + g1.register() try: (workdir, logfile) = g2.setup_working_dir() # register with the changelog library # 9 == log level (DEBUG) # 5 == connection retries - brickserver.changelog_register(gconf.local_path, - workdir, logfile, 9, 5) - g2.register() - g3.register() + changelog_agent.register(gconf.local_path, + workdir, logfile, 9, 5) + g2.register(changelog_agent) + g3.register(changelog_agent) except ChangelogException as e: logging.debug("Changelog register failed: %s - %s" % (e.errno, e.strerror)) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 65daeb0fe7c..9eda6044472 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -47,6 +47,9 @@ except ImportError: _CL_AUX_GFID_PFX = ".gfid/" GF_OP_RETRIES = 20 +CHANGELOG_AGENT_SERVER_VERSION = 1.0 +CHANGELOG_AGENT_CLIENT_VERSION = 1.0 + def escape(s): """the chosen flavor of string escaping, used all over |