summaryrefslogtreecommitdiffstats
path: root/geo-replication
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2019-10-23 10:10:12 +0530
committerAmar Tumballi <amarts@gmail.com>2019-11-07 06:24:39 +0000
commit0fc68040b72fc94dec3874345547e294b9ec1f45 (patch)
treeea053550c79c3903804a7e3c3812551a41e17b01 /geo-replication
parent0ab6c178468b6cce095c54ab62cfa51162d01fcc (diff)
georep: Merge Worker and Agent as a single process
- libgfchangelog is simplified by removing unnecessary API Class - Merged Agent logic into Worker instead of running Worker and Agent as two separate processes and maintaining RPC between Worker and Agent. - Geo-rep command Pause and Resume will continue without any changes. But Agent functionality also gets paused with that. Updates: #755 Change-Id: Ie2c00fa7dddf21f180f0649e0aaf084d29023c98 Signed-off-by: Aravinda VK <avishwan@redhat.com>
Diffstat (limited to 'geo-replication')
-rw-r--r--geo-replication/syncdaemon/Makefile.am2
-rw-r--r--geo-replication/syncdaemon/changelogagent.py78
-rw-r--r--geo-replication/syncdaemon/gsyncd.py19
-rw-r--r--geo-replication/syncdaemon/libgfchangelog.py247
-rw-r--r--geo-replication/syncdaemon/master.py23
-rw-r--r--geo-replication/syncdaemon/monitor.py83
-rw-r--r--geo-replication/syncdaemon/py2py3.py46
-rw-r--r--geo-replication/syncdaemon/resource.py28
-rw-r--r--geo-replication/syncdaemon/subcmds.py11
-rw-r--r--geo-replication/syncdaemon/syncdutils.py2
10 files changed, 176 insertions, 363 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am
index 62c5ce7fe30..d70e3368faf 100644
--- a/geo-replication/syncdaemon/Makefile.am
+++ b/geo-replication/syncdaemon/Makefile.am
@@ -2,7 +2,7 @@ syncdaemondir = $(GLUSTERFS_LIBEXECDIR)/python/syncdaemon
syncdaemon_PYTHON = rconf.py gsyncd.py __init__.py master.py README.md repce.py \
resource.py syncdutils.py monitor.py libcxattr.py gsyncdconfig.py \
- libgfchangelog.py changelogagent.py gsyncdstatus.py conf.py logutils.py \
+ libgfchangelog.py gsyncdstatus.py conf.py logutils.py \
subcmds.py argsupgrade.py py2py3.py
CLEANFILES =
diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py
deleted file mode 100644
index c5fdbc3a74f..00000000000
--- a/geo-replication/syncdaemon/changelogagent.py
+++ /dev/null
@@ -1,78 +0,0 @@
-#!/usr/bin/python3
-#
-# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
-# This file is part of GlusterFS.
-
-# This file is licensed to you under your choice of the GNU Lesser
-# General Public License, version 3 or any later version (LGPLv3 or
-# later), or the GNU General Public License, version 2 (GPLv2), in all
-# cases as published by the Free Software Foundation.
-#
-
-import logging
-import syncdutils
-from syncdutils import select, CHANGELOG_AGENT_SERVER_VERSION
-from repce import RepceServer
-
-
-class _MetaChangelog(object):
-
- def __getattr__(self, meth):
- from libgfchangelog import Changes as LChanges
- xmeth = [m for m in dir(LChanges) if m[0] != '_']
- if meth not in xmeth:
- return
- for m in xmeth:
- setattr(self, m, getattr(LChanges, m))
- return getattr(self, meth)
-
-Changes = _MetaChangelog()
-
-
-class Changelog(object):
- def version(self):
- return CHANGELOG_AGENT_SERVER_VERSION
-
- def init(self):
- return Changes.cl_init()
-
- def register(self, cl_brick, cl_dir, cl_log, cl_level, retries=0):
- return Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)
-
- def scan(self):
- return Changes.cl_scan()
-
- def getchanges(self):
- return Changes.cl_getchanges()
-
- def done(self, clfile):
- return Changes.cl_done(clfile)
-
- def history(self, changelog_path, start, end, num_parallel):
- return Changes.cl_history_changelog(changelog_path, start, end,
- num_parallel)
-
- def history_scan(self):
- return Changes.cl_history_scan()
-
- def history_getchanges(self):
- return Changes.cl_history_getchanges()
-
- def history_done(self, clfile):
- return Changes.cl_history_done(clfile)
-
-
-class ChangelogAgent(object):
- def __init__(self, obj, fd_tup):
- (inf, ouf, rw, ww) = fd_tup.split(',')
- repce = RepceServer(obj, int(inf), int(ouf), 1)
- t = syncdutils.Thread(target=lambda: (repce.service_loop(),
- syncdutils.finalize()))
- t.start()
- logging.info('Agent listining...')
-
- select((), (), ())
-
-
-def agent(obj, fd_tup):
- return ChangelogAgent(obj, fd_tup)
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 8940384616a..95b26c5f3fb 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -79,8 +79,6 @@ def main():
help="feedback fd between monitor and worker")
p.add_argument("--local-node", help="Local master node")
p.add_argument("--local-node-id", help="Local Node ID")
- p.add_argument("--rpc-fd",
- help="Read and Write fds for worker-agent communication")
p.add_argument("--subvol-num", type=int, help="Subvolume number")
p.add_argument("--is-hottier", action="store_true",
help="Is this brick part of hot tier")
@@ -92,19 +90,6 @@ def main():
p.add_argument("-c", "--config-file", help="Config File")
p.add_argument("--debug", action="store_true")
- # Agent
- p = sp.add_parser("agent")
- p.add_argument("master", help="Master Volume Name")
- p.add_argument("slave", help="Slave details user@host::vol format")
- p.add_argument("--local-path", help="Local brick path")
- p.add_argument("--local-node", help="Local master node")
- p.add_argument("--local-node-id", help="Local Node ID")
- p.add_argument("--slave-id", help="Slave Volume ID")
- p.add_argument("--rpc-fd",
- help="Read and Write fds for worker-agent communication")
- p.add_argument("-c", "--config-file", help="Config File")
- p.add_argument("--debug", action="store_true")
-
# Slave
p = sp.add_parser("slave")
p.add_argument("master", help="Master Volume Name")
@@ -271,8 +256,8 @@ def main():
# Default label to print in log file
label = args.subcmd
- if args.subcmd in ("worker", "agent"):
- # If Worker or agent, then add brick path also to label
+ if args.subcmd in ("worker"):
+ # If Worker, then add brick path also to label
label = "%s %s" % (args.subcmd, args.local_path)
elif args.subcmd == "slave":
# If Slave add Master node and Brick details
diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py
index 8d129567075..34beadb3552 100644
--- a/geo-replication/syncdaemon/libgfchangelog.py
+++ b/geo-replication/syncdaemon/libgfchangelog.py
@@ -16,126 +16,127 @@ from py2py3 import gr_cl_history_changelog, gr_cl_done, gr_create_string_buffer
from py2py3 import gr_cl_register, gr_cl_history_done, bytearray_to_str
-class Changes(object):
- libgfc = CDLL(find_library("gfchangelog"), mode=RTLD_GLOBAL,
- use_errno=True)
-
- @classmethod
- def geterrno(cls):
- return get_errno()
-
- @classmethod
- def raise_changelog_err(cls):
- errn = cls.geterrno()
- raise ChangelogException(errn, os.strerror(errn))
-
- @classmethod
- def _get_api(cls, call):
- return getattr(cls.libgfc, call)
-
- @classmethod
- def cl_init(cls):
- ret = cls._get_api('gf_changelog_init')(None)
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_register(cls, brick, path, log_file, log_level, retries=0):
- ret = gr_cl_register(cls, brick, path, log_file, log_level, retries)
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_scan(cls):
- ret = cls._get_api('gf_changelog_scan')()
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_startfresh(cls):
- ret = cls._get_api('gf_changelog_start_fresh')()
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_getchanges(cls):
- """ remove hardcoding for path name length """
- def clsort(f):
- return f.split('.')[-1]
- changes = []
- buf = gr_create_string_buffer(4096)
- call = cls._get_api('gf_changelog_next_change')
-
- while True:
- ret = call(buf, 4096)
- if ret in (0, -1):
- break
- # py2 and py3 compatibility
- result = bytearray_to_str(buf.raw[:ret - 1])
- changes.append(result)
- if ret == -1:
- cls.raise_changelog_err()
- # cleanup tracker
- cls.cl_startfresh()
- return sorted(changes, key=clsort)
-
- @classmethod
- def cl_done(cls, clfile):
- ret = gr_cl_done(cls, clfile)
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_history_scan(cls):
- ret = cls._get_api('gf_history_changelog_scan')()
- if ret == -1:
- cls.raise_changelog_err()
-
- return ret
-
- @classmethod
- def cl_history_changelog(cls, changelog_path, start, end, num_parallel):
- actual_end = c_ulong()
- ret = gr_cl_history_changelog(cls, changelog_path, start, end,
- num_parallel, byref(actual_end))
- if ret == -1:
- cls.raise_changelog_err()
-
- if ret == -2:
- raise ChangelogHistoryNotAvailable()
-
- return (ret, actual_end.value)
-
- @classmethod
- def cl_history_startfresh(cls):
- ret = cls._get_api('gf_history_changelog_start_fresh')()
- if ret == -1:
- cls.raise_changelog_err()
-
- @classmethod
- def cl_history_getchanges(cls):
- """ remove hardcoding for path name length """
- def clsort(f):
- return f.split('.')[-1]
-
- changes = []
- buf = gr_create_string_buffer(4096)
- call = cls._get_api('gf_history_changelog_next_change')
-
- while True:
- ret = call(buf, 4096)
- if ret in (0, -1):
- break
- # py2 and py3 compatibility
- result = bytearray_to_str(buf.raw[:ret - 1])
- changes.append(result)
- if ret == -1:
- cls.raise_changelog_err()
-
- return sorted(changes, key=clsort)
-
- @classmethod
- def cl_history_done(cls, clfile):
- ret = gr_cl_history_done(cls, clfile)
- if ret == -1:
- cls.raise_changelog_err()
+libgfc = CDLL(
+ find_library("gfchangelog"),
+ mode=RTLD_GLOBAL,
+ use_errno=True
+)
+
+
+def _raise_changelog_err():
+ errn = get_errno()
+ raise ChangelogException(errn, os.strerror(errn))
+
+
+def _init():
+ if libgfc.gf_changelog_init(None) == -1:
+ _raise_changelog_err()
+
+
+def register(brick, path, log_file, log_level, retries=0):
+ _init()
+
+ ret = gr_cl_register(libgfc, brick, path, log_file, log_level, retries)
+
+ if ret == -1:
+ _raise_changelog_err()
+
+
+def scan():
+ ret = libgfc.gf_changelog_scan()
+ if ret == -1:
+ _raise_changelog_err()
+
+
+def startfresh():
+ ret = libgfc.gf_changelog_start_fresh()
+ if ret == -1:
+ _raise_changelog_err()
+
+
+def getchanges():
+ def clsort(cfile):
+ return cfile.split('.')[-1]
+
+ changes = []
+ buf = gr_create_string_buffer(4096)
+ call = libgfc.gf_changelog_next_change
+
+ while True:
+ ret = call(buf, 4096)
+ if ret in (0, -1):
+ break
+
+ # py2 and py3 compatibility
+ result = bytearray_to_str(buf.raw[:ret - 1])
+ changes.append(result)
+
+ if ret == -1:
+ _raise_changelog_err()
+
+ # cleanup tracker
+ startfresh()
+
+ return sorted(changes, key=clsort)
+
+
+def done(clfile):
+ ret = gr_cl_done(libgfc, clfile)
+ if ret == -1:
+ _raise_changelog_err()
+
+
+def history_scan():
+ ret = libgfc.gf_history_changelog_scan()
+ if ret == -1:
+ _raise_changelog_err()
+
+ return ret
+
+
+def history_changelog(changelog_path, start, end, num_parallel):
+ actual_end = c_ulong()
+ ret = gr_cl_history_changelog(libgfc, changelog_path, start, end,
+ num_parallel, byref(actual_end))
+ if ret == -1:
+ _raise_changelog_err()
+
+ if ret == -2:
+ raise ChangelogHistoryNotAvailable()
+
+ return (ret, actual_end.value)
+
+
+def history_startfresh():
+ ret = libgfc.gf_history_changelog_start_fresh()
+ if ret == -1:
+ _raise_changelog_err()
+
+
+def history_getchanges():
+ def clsort(cfile):
+ return cfile.split('.')[-1]
+
+ changes = []
+ buf = gr_create_string_buffer(4096)
+ call = libgfc.gf_history_changelog_next_change
+
+ while True:
+ ret = call(buf, 4096)
+ if ret in (0, -1):
+ break
+
+ # py2 and py3 compatibility
+ result = bytearray_to_str(buf.raw[:ret - 1])
+ changes.append(result)
+
+ if ret == -1:
+ _raise_changelog_err()
+
+ return sorted(changes, key=clsort)
+
+
+def history_done(clfile):
+ ret = gr_cl_history_done(libgfc, clfile)
+ if ret == -1:
+ _raise_changelog_err()
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index f75a5421bcf..96232bb0831 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -22,6 +22,7 @@ from threading import Condition, Lock
from datetime import datetime
import gsyncdconfig as gconf
+import libgfchangelog
from rconf import rconf
from syncdutils import Thread, GsyncdError, escape_space_newline
from syncdutils import unescape_space_newline, gauxpfx, escape
@@ -1498,9 +1499,9 @@ class GMasterChangelogMixin(GMasterCommon):
# that are _historical_ to that time.
data_stime = self.get_data_stime()
- self.changelog_agent.scan()
+ libgfchangelog.scan()
self.crawls += 1
- changes = self.changelog_agent.getchanges()
+ changes = libgfchangelog.getchanges()
if changes:
if data_stime:
logging.info(lf("slave's time",
@@ -1517,10 +1518,9 @@ class GMasterChangelogMixin(GMasterCommon):
self.changelogs_batch_process(changes)
- def register(self, register_time, changelog_agent, status):
- self.changelog_agent = changelog_agent
+ def register(self, register_time, status):
self.sleep_interval = gconf.get("change-interval")
- self.changelog_done_func = self.changelog_agent.done
+ self.changelog_done_func = libgfchangelog.done
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
".processed")
@@ -1529,11 +1529,10 @@ class GMasterChangelogMixin(GMasterCommon):
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
- def register(self, register_time, changelog_agent, status):
- self.changelog_agent = changelog_agent
+ def register(self, register_time, status):
self.changelog_register_time = register_time
self.history_crawl_start_time = register_time
- self.changelog_done_func = self.changelog_agent.history_done
+ self.changelog_done_func = libgfchangelog.history_done
self.history_turns = 0
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
@@ -1561,7 +1560,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# location then consuming history will not work(Known issue as of now)
changelog_path = os.path.join(rconf.args.local_path,
".glusterfs/changelogs")
- ret, actual_end = self.changelog_agent.history(
+ ret, actual_end = libgfchangelog.history_changelog(
changelog_path,
data_stime[0],
end_time,
@@ -1573,10 +1572,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# to be processed. returns positive value as number of changelogs
# to be processed, which will be fetched using
# history_getchanges()
- while self.changelog_agent.history_scan() > 0:
+ while libgfchangelog.history_scan() > 0:
self.crawls += 1
- changes = self.changelog_agent.history_getchanges()
+ changes = libgfchangelog.history_getchanges()
if changes:
if data_stime:
logging.info(lf("slave's time",
@@ -1629,7 +1628,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self, register_time=None, changelog_agent=None, status=None):
+ def register(self, register_time=None, status=None):
self.status = status
self.counter = 0
self.comlist = []
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 236afe70d11..14e77aef27e 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -20,6 +20,7 @@ import random
from resource import SSH
import gsyncdconfig as gconf
+import libgfchangelog
from rconf import rconf
from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile
from syncdutils import set_term_handler, GsyncdError
@@ -81,7 +82,7 @@ class Monitor(object):
# give a chance to graceful exit
errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])
- def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master,
+ def monitor(self, w, argv, cpids, slave_vol, slave_host, master,
suuid, slavenodes):
"""the monitor loop
@@ -150,7 +151,7 @@ class Monitor(object):
remote_host = "%s@%s" % (remote_user, remote_new[0])
remote_id = remote_new[1]
- # Spawn the worker and agent in lock to avoid fd leak
+ # Spawn the worker in lock to avoid fd leak
self.lock.acquire()
self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
@@ -158,44 +159,10 @@ class Monitor(object):
brick=w[0]['dir'],
slave_node=remote_host))
- # Couple of pipe pairs for RPC communication b/w
- # worker and changelog agent.
-
- # read/write end for agent
- (ra, ww) = pipe()
- # read/write end for worker
- (rw, wa) = pipe()
-
- # spawn the agent process
- apid = os.fork()
- if apid == 0:
- os.close(rw)
- os.close(ww)
- args_to_agent = argv + [
- 'agent',
- rconf.args.master,
- rconf.args.slave,
- '--local-path', w[0]['dir'],
- '--local-node', w[0]['host'],
- '--local-node-id', w[0]['uuid'],
- '--slave-id', suuid,
- '--rpc-fd', ','.join([str(ra), str(wa), str(rw), str(ww)])
- ]
-
- if rconf.args.config_file is not None:
- args_to_agent += ['-c', rconf.args.config_file]
-
- if rconf.args.debug:
- args_to_agent.append("--debug")
-
- os.execv(sys.executable, args_to_agent)
-
pr, pw = pipe()
cpid = os.fork()
if cpid == 0:
os.close(pr)
- os.close(ra)
- os.close(wa)
args_to_worker = argv + [
'worker',
@@ -206,8 +173,6 @@ class Monitor(object):
'--local-node', w[0]['host'],
'--local-node-id', w[0]['uuid'],
'--slave-id', suuid,
- '--rpc-fd',
- ','.join([str(rw), str(ww), str(ra), str(wa)]),
'--subvol-num', str(w[2]),
'--resource-remote', remote_host,
'--resource-remote-id', remote_id
@@ -238,14 +203,8 @@ class Monitor(object):
os.execv(sys.executable, args_to_worker)
cpids.add(cpid)
- agents.add(apid)
os.close(pw)
- # close all RPC pipes in monitor
- os.close(ra)
- os.close(wa)
- os.close(rw)
- os.close(ww)
self.lock.release()
t0 = time.time()
@@ -254,42 +213,19 @@ class Monitor(object):
if so:
ret = nwait(cpid, os.WNOHANG)
- ret_agent = nwait(apid, os.WNOHANG)
-
- if ret_agent is not None:
- # Agent is died Kill Worker
- logging.info(lf("Changelog Agent died, Aborting Worker",
- brick=w[0]['dir']))
- errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
- nwait(cpid)
- nwait(apid)
if ret is not None:
logging.info(lf("worker died before establishing "
"connection",
brick=w[0]['dir']))
- nwait(apid) # wait for agent
else:
logging.debug("worker(%s) connected" % w[0]['dir'])
while time.time() < t0 + conn_timeout:
ret = nwait(cpid, os.WNOHANG)
- ret_agent = nwait(apid, os.WNOHANG)
if ret is not None:
logging.info(lf("worker died in startup phase",
brick=w[0]['dir']))
- nwait(apid) # wait for agent
- break
-
- if ret_agent is not None:
- # Agent is died Kill Worker
- logging.info(lf("Changelog Agent died, Aborting "
- "Worker",
- brick=w[0]['dir']))
- errno_wrap(os.kill, [cpid, signal.SIGKILL],
- [ESRCH])
- nwait(cpid)
- nwait(apid)
break
time.sleep(1)
@@ -304,12 +240,8 @@ class Monitor(object):
brick=w[0]['dir'],
timeout=conn_timeout))
errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
- nwait(apid) # wait for agent
ret = nwait(cpid)
if ret is None:
- # If worker dies, agent terminates on EOF.
- # So lets wait for agent first.
- nwait(apid)
ret = nwait(cpid)
if exit_signalled(ret):
ret = 0
@@ -333,18 +265,15 @@ class Monitor(object):
argv = [os.path.basename(sys.executable), sys.argv[0]]
cpids = set()
- agents = set()
ta = []
for wx in wspx:
def wmon(w):
- cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
+ cpid, _ = self.monitor(w, argv, cpids, slave_vol,
slave_host, master, suuid, slavenodes)
time.sleep(1)
self.lock.acquire()
for cpid in cpids:
errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
- for apid in agents:
- errno_wrap(os.kill, [apid, signal.SIGKILL], [ESRCH])
self.lock.release()
finalize(exval=1)
t = Thread(target=wmon, args=[wx])
@@ -354,8 +283,8 @@ class Monitor(object):
# monitor status was being updated in each monitor thread. It
# should not be done as it can cause deadlock for a worker start.
# set_monitor_status uses flock to synchronize multple instances
- # updating the file. Since each monitor thread forks worker and
- # agent, these processes can hold the reference to fd of status
+ # updating the file. Since each monitor thread forks worker,
+ # these processes can hold the reference to fd of status
# file causing deadlock to workers which starts later as flock
# will not be release until all references to same fd is closed.
# It will also cause fd leaks.
diff --git a/geo-replication/syncdaemon/py2py3.py b/geo-replication/syncdaemon/py2py3.py
index faad750059c..f9c76e1b50a 100644
--- a/geo-replication/syncdaemon/py2py3.py
+++ b/geo-replication/syncdaemon/py2py3.py
@@ -55,23 +55,23 @@ if sys.version_info >= (3,):
def gr_lremovexattr(cls, path, attr):
return cls.libc.lremovexattr(path.encode(), attr.encode())
- def gr_cl_register(cls, brick, path, log_file, log_level, retries):
- return cls._get_api('gf_changelog_register')(brick.encode(),
- path.encode(),
- log_file.encode(),
- log_level, retries)
+ def gr_cl_register(libgfapi, brick, path, log_file, log_level, retries):
+ return libgfapi.gf_changelog_register(brick.encode(),
+ path.encode(),
+ log_file.encode(),
+ log_level, retries)
- def gr_cl_done(cls, clfile):
- return cls._get_api('gf_changelog_done')(clfile.encode())
+ def gr_cl_done(libgfapi, clfile):
+ return libgfapi.gf_changelog_done(clfile.encode())
- def gr_cl_history_changelog(cls, changelog_path, start, end, num_parallel,
+ def gr_cl_history_changelog(libgfapi, changelog_path, start, end, num_parallel,
actual_end):
- return cls._get_api('gf_history_changelog')(changelog_path.encode(),
- start, end, num_parallel,
- actual_end)
+ return libgfapi.gf_history_changelog(changelog_path.encode(),
+ start, end, num_parallel,
+ actual_end)
- def gr_cl_history_done(cls, clfile):
- return cls._get_api('gf_history_changelog_done')(clfile.encode())
+ def gr_cl_history_done(libgfapi, clfile):
+ return libgfapi.gf_history_changelog_done(clfile.encode())
# regular file
@@ -137,20 +137,20 @@ else:
def gr_lremovexattr(cls, path, attr):
return cls.libc.lremovexattr(path, attr)
- def gr_cl_register(cls, brick, path, log_file, log_level, retries):
- return cls._get_api('gf_changelog_register')(brick, path, log_file,
- log_level, retries)
+ def gr_cl_register(libgfapi, brick, path, log_file, log_level, retries):
+ return libgfapi.gf_changelog_register(brick, path, log_file,
+ log_level, retries)
- def gr_cl_done(cls, clfile):
- return cls._get_api('gf_changelog_done')(clfile)
+ def gr_cl_done(libgfapi, clfile):
+ return libgfapi.gf_changelog_done(clfile)
- def gr_cl_history_changelog(cls, changelog_path, start, end, num_parallel,
+ def gr_cl_history_changelog(libgfapi, changelog_path, start, end, num_parallel,
actual_end):
- return cls._get_api('gf_history_changelog')(changelog_path, start, end,
- num_parallel, actual_end)
+ return libgfapi.gf_history_changelog(changelog_path, start, end,
+ num_parallel, actual_end)
- def gr_cl_history_done(cls, clfile):
- return cls._get_api('gf_history_changelog_done')(clfile)
+ def gr_cl_history_done(libgfapi, clfile):
+ return libgfapi.gf_history_changelog_done(clfile)
# regular file
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index cdd3ae8d7e2..ae5600d1d9a 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -25,6 +25,7 @@ import errno
from rconf import rconf
import gsyncdconfig as gconf
+import libgfchangelog
import repce
from repce import RepceServer, RepceClient
@@ -35,7 +36,6 @@ from syncdutils import entry2pb, gauxpfx, errno_wrap, lstat
from syncdutils import NoStimeAvailable, PartialHistoryAvailable
from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
from syncdutils import get_changelog_log_level, get_rsync_version
-from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
from syncdutils import GX_GFID_CANONICAL_LEN
from gsyncdstatus import GeorepStatus
from syncdutils import lf, Popen, sup
@@ -1245,9 +1245,6 @@ class GLUSTER(object):
# register the crawlers and start crawling
# g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
# g3 ==> changelog History
- (inf, ouf, ra, wa) = rconf.args.rpc_fd.split(',')
- changelog_agent = RepceClient(int(inf), int(ouf))
-
status = GeorepStatus(gconf.get("state-file"),
rconf.args.local_node,
rconf.args.local_path,
@@ -1255,12 +1252,6 @@ class GLUSTER(object):
rconf.args.master,
rconf.args.slave)
status.reset_on_worker_start()
- rv = changelog_agent.version()
- if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:
- raise GsyncdError(
- "RePCe major version mismatch(changelog agent): "
- "local %s, remote %s" %
- (CHANGELOG_AGENT_CLIENT_VERSION, rv))
try:
workdir = g2.setup_working_dir()
@@ -1271,17 +1262,16 @@ class GLUSTER(object):
# register with the changelog library
# 9 == log level (DEBUG)
# 5 == connection retries
- changelog_agent.init()
- changelog_agent.register(rconf.args.local_path,
- workdir,
- gconf.get("changelog-log-file"),
- get_changelog_log_level(
- gconf.get("changelog-log-level")),
- g2.CHANGELOG_CONN_RETRIES)
+ libgfchangelog.register(rconf.args.local_path,
+ workdir,
+ gconf.get("changelog-log-file"),
+ get_changelog_log_level(
+ gconf.get("changelog-log-level")),
+ g2.CHANGELOG_CONN_RETRIES)
register_time = int(time.time())
- g2.register(register_time, changelog_agent, status)
- g3.register(register_time, changelog_agent, status)
+ g2.register(register_time, status)
+ g3.register(register_time, status)
except ChangelogException as e:
logging.error(lf("Changelog register failed", error=e))
sys.exit(1)
diff --git a/geo-replication/syncdaemon/subcmds.py b/geo-replication/syncdaemon/subcmds.py
index f8515f2607b..b8508532e30 100644
--- a/geo-replication/syncdaemon/subcmds.py
+++ b/geo-replication/syncdaemon/subcmds.py
@@ -97,17 +97,6 @@ def subcmd_slave(args):
local.service_loop()
-def subcmd_agent(args):
- import os
- from changelogagent import agent, Changelog
- from syncdutils import lf
-
- os.setsid()
- logging.debug(lf("RPC FD",
- rpc_fd=repr(args.rpc_fd)))
- return agent(Changelog(), args.rpc_fd)
-
-
def subcmd_voluuidget(args):
from subprocess import Popen, PIPE
import xml.etree.ElementTree as XET
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index d6420355791..8e783136318 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -62,8 +62,6 @@ GF_OP_RETRIES = 10
GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0'
-CHANGELOG_AGENT_SERVER_VERSION = 1.0
-CHANGELOG_AGENT_CLIENT_VERSION = 1.0
NodeID = None
rsync_version = None
unshare_mnt_propagation = None