summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2014-04-29 12:14:24 +0530
committerVenky Shankar <vshankar@redhat.com>2014-05-09 00:27:40 -0700
commitc7b0396f680863528248e6f5a162de47184b6c88 (patch)
treeead2f295d041df1e258db4bf09fe10944b15f4d7
parent65757e0f57f93103d87fdf9534c5ca25b66d14b7 (diff)
geo-rep: Pause and Resume feature for geo-replication
Changelog consumption/processing now happens in a separate process group from the monitor. When the monitor process group gets SIGSTOP, all worker processes, ssh and rsync are paused, but changelog processing continues. When it gets SIGCONT, it resumes its operation. The changelog agent runs as a RepceServer; the geo-rep worker communicates with the changelog agent using a RepceClient. Change-Id: I35c333e4d8b13d03a7808aed601960eef23cfa04 BUG: 1093602 Signed-off-by: Venky Shankar <vshankar@redhat.com> Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/7322
-rw-r--r--geo-replication/syncdaemon/Makefile.am2
-rw-r--r--geo-replication/syncdaemon/changelogagent.py78
-rw-r--r--geo-replication/syncdaemon/gsyncd.py13
-rw-r--r--geo-replication/syncdaemon/master.py34
-rw-r--r--geo-replication/syncdaemon/monitor.py34
-rw-r--r--geo-replication/syncdaemon/resource.py69
-rw-r--r--geo-replication/syncdaemon/syncdutils.py3
7 files changed, 162 insertions, 71 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am
index 83f969639cc..885963eae2b 100644
--- a/geo-replication/syncdaemon/Makefile.am
+++ b/geo-replication/syncdaemon/Makefile.am
@@ -2,6 +2,6 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon
syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \
resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \
- $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py
+ $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py
CLEANFILES =
diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py
new file mode 100644
index 00000000000..54d82cefcd2
--- /dev/null
+++ b/geo-replication/syncdaemon/changelogagent.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import os
+import logging
+import syncdutils
+from syncdutils import select, CHANGELOG_AGENT_SERVER_VERSION
+from repce import RepceServer
+
+
+class _MetaChangelog(object):
+
+ def __getattr__(self, meth):
+ from libgfchangelog import Changes as LChanges
+ xmeth = [m for m in dir(LChanges) if m[0] != '_']
+ if meth not in xmeth:
+ return
+ for m in xmeth:
+ setattr(self, m, getattr(LChanges, m))
+ return getattr(self, meth)
+
+Changes = _MetaChangelog()
+
+
+class Changelog(object):
+ def version(self):
+ return CHANGELOG_AGENT_SERVER_VERSION
+
+ def register(self, cl_brick, cl_dir, cl_log, cl_level, retries=0):
+ return Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)
+
+ def scan(self):
+ return Changes.cl_scan()
+
+ def getchanges(self):
+ return Changes.cl_getchanges()
+
+ def done(self, clfile):
+ return Changes.cl_done(clfile)
+
+ def history(self, changelog_path, start, end, num_parallel):
+ return Changes.cl_history_changelog(changelog_path, start, end,
+ num_parallel)
+
+ def history_scan(self):
+ return Changes.cl_history_scan()
+
+ def history_getchanges(self):
+ return Changes.cl_history_getchanges()
+
+ def history_done(self, clfile):
+ return Changes.cl_history_done(clfile)
+
+
+class ChangelogAgent(object):
+ def __init__(self, obj, fd_tup):
+ (inf, ouf, rw, ww) = fd_tup.split(',')
+ os.close(int(rw))
+ os.close(int(ww))
+ repce = RepceServer(obj, int(inf), int(ouf), 1)
+ t = syncdutils.Thread(target=lambda: (repce.service_loop(),
+ syncdutils.finalize()))
+ t.start()
+ logging.info('Agent listining...')
+
+ select((), (), ())
+
+
+def agent(obj, fd_tup):
+ return ChangelogAgent(obj, fd_tup)
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 426d964de95..7d463ad23f3 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -32,6 +32,7 @@ from syncdutils import GsyncdError, select, set_term_handler
from configinterface import GConffile, upgrade_config_file
import resource
from monitor import monitor
+from changelogagent import agent, Changelog
class GLogger(Logger):
@@ -175,6 +176,7 @@ def main_i():
- query/manipulate configuration
- format gsyncd urls using gsyncd's url parsing engine
- start service in following modes, in given stages:
+ - agent: startup(), ChangelogAgent()
- monitor: startup(), monitor()
- master: startup(), connect_remote(), connect(), service_loop()
- slave: startup(), connect(), service_loop()
@@ -275,12 +277,15 @@ def main_i():
# duh. need to specify dest or value will be mapped to None :S
op.add_option('--monitor', dest='monitor', action='callback',
callback=store_local_curry(True))
+ op.add_option('--agent', dest='agent', action='callback',
+ callback=store_local_curry(True))
op.add_option('--resource-local', dest='resource_local',
type=str, action='callback', callback=store_local)
op.add_option('--resource-remote', dest='resource_remote',
type=str, action='callback', callback=store_local)
op.add_option('--feedback-fd', dest='feedback_fd', type=int,
help=SUPPRESS_HELP, action='callback', callback=store_local)
+ op.add_option('--rpc-fd', dest='rpc_fd', type=str, help=SUPPRESS_HELP)
op.add_option('--listen', dest='listen', help=SUPPRESS_HELP,
action='callback', callback=store_local_curry(True))
op.add_option('-N', '--no-daemon', dest="go_daemon",
@@ -586,6 +591,7 @@ def main_i():
go_daemon = rconf['go_daemon']
be_monitor = rconf.get('monitor')
+ be_agent = rconf.get('agent')
rscs, local, remote = makersc(args)
if not be_monitor and isinstance(remote, resource.SSH) and \
@@ -596,6 +602,8 @@ def main_i():
log_file = gconf.log_file
if be_monitor:
label = 'monitor'
+ elif be_agent:
+ label = 'agent'
elif remote:
# master
label = gconf.local_path
@@ -604,6 +612,11 @@ def main_i():
startup(go_daemon=go_daemon, log_file=log_file, label=label)
resource.Popen.init_errhandler()
+ if be_agent:
+ os.setsid()
+ logging.debug('rpc_fd: %s' % repr(gconf.rpc_fd))
+ return agent(Changelog(), gconf.rpc_fd)
+
if be_monitor:
return monitor(*rscs)
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index b6a7c894814..1f1fa1122cb 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -1108,9 +1108,9 @@ class GMasterChangelogMixin(GMasterCommon):
if isinstance(purge_time, int):
purge_time = None
- self.master.server.changelog_scan()
+ self.changelog_agent.scan()
self.crawls += 1
- changes = self.master.server.changelog_getchanges()
+ changes = self.changelog_agent.getchanges()
if changes:
if purge_time:
logging.info("slave's time: %s" % repr(purge_time))
@@ -1120,22 +1120,24 @@ class GMasterChangelogMixin(GMasterCommon):
logging.info(
'skipping already processed change: %s...' %
os.path.basename(pr))
- self.master.server.changelog_done(pr)
+ self.changelog_done_func(pr)
changes.remove(pr)
if changes:
logging.debug('processing changes %s' % repr(changes))
self.process(changes)
- def register(self):
+ def register(self, changelog_agent):
+ self.changelog_agent = changelog_agent
self.sleep_interval = int(gconf.change_interval)
- self.changelog_done_func = self.master.server.changelog_done
+ self.changelog_done_func = self.changelog_agent.done
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
- def register(self):
+ def register(self, changelog_agent):
+ self.changelog_agent = changelog_agent
self.changelog_register_time = int(time.time())
- self.changelog_done_func = self.master.server.history_changelog_done
+ self.changelog_done_func = self.changelog_agent.history_done
def crawl(self):
self.update_worker_crawl_status("History Crawl")
@@ -1157,21 +1159,21 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# location then consuming history will not work(Known issue as of now)
changelog_path = os.path.join(gconf.local_path,
".glusterfs/changelogs")
- ts = self.master.server.history_changelog(changelog_path,
- purge_time[0],
- self.changelog_register_time,
- int(gconf.sync_jobs))
+ ts = self.changelog_agent.history(changelog_path,
+ purge_time[0],
+ self.changelog_register_time,
+ int(gconf.sync_jobs))
# scan followed by getchanges till scan returns zero.
- # history_changelog_scan() is blocking call, till it gets the number
+ # history_scan() is blocking call, till it gets the number
# of changelogs to process. Returns zero when no changelogs
# to be processed. returns positive value as number of changelogs
# to be processed, which will be fetched using
- # history_changelog_getchanges()
- while self.master.server.history_changelog_scan() > 0:
+ # history_getchanges()
+ while self.changelog_agent.history_scan() > 0:
self.crawls += 1
- changes = self.master.server.history_changelog_getchanges()
+ changes = self.changelog_agent.history_getchanges()
if changes:
if purge_time:
logging.info("slave's time: %s" % repr(purge_time))
@@ -1208,7 +1210,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self):
+ def register(self, changelog_agent=None):
self.counter = 0
self.comlist = []
self.stimes = []
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 8ed6f832618..e49a24ee5f5 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -108,7 +108,7 @@ class Monitor(object):
# give a chance to graceful exit
os.kill(-os.getpid(), signal.SIGTERM)
- def monitor(self, w, argv, cpids):
+ def monitor(self, w, argv, cpids, agents):
"""the monitor loop
Basic logic is a blantantly simple blunt heuristics:
@@ -149,6 +149,23 @@ class Monitor(object):
while ret in (0, 1):
logging.info('-' * conn_timeout)
logging.info('starting gsyncd worker')
+
+ # Couple of pipe pairs for RPC communication b/w
+ # worker and changelog agent.
+
+ # read/write end for agent
+ (ra, ww) = os.pipe()
+ # read/write end for worker
+ (rw, wa) = os.pipe()
+
+ # spawn the agent process
+ apid = os.fork()
+ if apid == 0:
+ os.execv(sys.executable, argv + ['--local-path', w[0],
+ '--agent',
+ '--rpc-fd',
+ ','.join([str(ra), str(wa),
+ str(rw), str(ww)])])
pr, pw = os.pipe()
cpid = os.fork()
if cpid == 0:
@@ -157,14 +174,26 @@ class Monitor(object):
'--local-path', w[0],
'--local-id',
'.' + escape(w[0]),
+ '--rpc-fd',
+ ','.join([str(rw), str(ww),
+ str(ra), str(wa)]),
'--resource-remote', w[1]])
self.lock.acquire()
cpids.add(cpid)
+ agents.add(apid)
self.lock.release()
os.close(pw)
+
t0 = time.time()
so = select((pr,), (), (), conn_timeout)[0]
os.close(pr)
+
+ # close all RPC pipes in monitor
+ os.close(ra)
+ os.close(wa)
+ os.close(rw)
+ os.close(ww)
+
if so:
ret = nwait(cpid, os.WNOHANG)
if ret is not None:
@@ -206,10 +235,11 @@ class Monitor(object):
argv.insert(0, os.path.basename(sys.executable))
cpids = set()
+ agents = set()
ta = []
for wx in wspx:
def wmon(w):
- cpid, _ = self.monitor(w, argv, cpids)
+ cpid, _ = self.monitor(w, argv, cpids, agents)
time.sleep(1)
self.lock.acquire()
for cpid in cpids:
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 185722f5df0..79dc9e79e9d 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -35,6 +35,8 @@ from syncdutils import GsyncdError, select, privileged, boolify, funcode
from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
from syncdutils import ChangelogException
+from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
+
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
@@ -127,19 +129,7 @@ class _MetaXattr(object):
return getattr(self, meth)
-class _MetaChangelog(object):
-
- def __getattr__(self, meth):
- from libgfchangelog import Changes as LChanges
- xmeth = [m for m in dir(LChanges) if m[0] != '_']
- if not meth in xmeth:
- return
- for m in xmeth:
- setattr(self, m, getattr(LChanges, m))
- return getattr(self, meth)
-
Xattr = _MetaXattr()
-Changes = _MetaChangelog()
class Popen(subprocess.Popen):
@@ -669,39 +659,6 @@ class Server(object):
errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL])
@classmethod
- def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries=0):
- Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)
-
- @classmethod
- def changelog_scan(cls):
- Changes.cl_scan()
-
- @classmethod
- def changelog_getchanges(cls):
- return Changes.cl_getchanges()
-
- @classmethod
- def changelog_done(cls, clfile):
- Changes.cl_done(clfile)
-
- @classmethod
- def history_changelog(cls, changelog_path, start, end, num_parallel):
- return Changes.cl_history_changelog(changelog_path, start, end,
- num_parallel)
-
- @classmethod
- def history_changelog_scan(cls):
- return Changes.cl_history_scan()
-
- @classmethod
- def history_changelog_getchanges(cls):
- return Changes.cl_history_getchanges()
-
- @classmethod
- def history_changelog_done(cls, clfile):
- Changes.cl_history_done(clfile)
-
- @classmethod
@_pathguard
def setattr(cls, path, adct):
"""set file attributes
@@ -932,9 +889,6 @@ class AbstractUrl(object):
return self.get_url()
- ### Concrete resource classes ###
-
-
class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
"""scheme class for file:// urls
@@ -1311,16 +1265,27 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
# register the crawlers and start crawling
# g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
# g3 ==> changelog History
+ (inf, ouf, ra, wa) = gconf.rpc_fd.split(',')
+ os.close(int(ra))
+ os.close(int(wa))
+ changelog_agent = RepceClient(int(inf), int(ouf))
+ rv = changelog_agent.version()
+ if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:
+ raise GsyncdError(
+ "RePCe major version mismatch(changelog agent): "
+ "local %s, remote %s" %
+ (CHANGELOG_AGENT_CLIENT_VERSION, rv))
+
g1.register()
try:
(workdir, logfile) = g2.setup_working_dir()
# register with the changelog library
# 9 == log level (DEBUG)
# 5 == connection retries
- brickserver.changelog_register(gconf.local_path,
- workdir, logfile, 9, 5)
- g2.register()
- g3.register()
+ changelog_agent.register(gconf.local_path,
+ workdir, logfile, 9, 5)
+ g2.register(changelog_agent)
+ g3.register(changelog_agent)
except ChangelogException as e:
logging.debug("Changelog register failed: %s - %s" %
(e.errno, e.strerror))
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 65daeb0fe7c..9eda6044472 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -47,6 +47,9 @@ except ImportError:
_CL_AUX_GFID_PFX = ".gfid/"
GF_OP_RETRIES = 20
+CHANGELOG_AGENT_SERVER_VERSION = 1.0
+CHANGELOG_AGENT_CLIENT_VERSION = 1.0
+
def escape(s):
"""the chosen flavor of string escaping, used all over