diff options
author | Aravinda VK <avishwan@redhat.com> | 2019-10-23 10:10:12 +0530 |
---|---|---|
committer | Amar Tumballi <amarts@gmail.com> | 2019-11-07 06:24:39 +0000 |
commit | 0fc68040b72fc94dec3874345547e294b9ec1f45 (patch) | |
tree | ea053550c79c3903804a7e3c3812551a41e17b01 /geo-replication | |
parent | 0ab6c178468b6cce095c54ab62cfa51162d01fcc (diff) |
georep: Merge Worker and Agent as a single process
- libgfchangelog is simplified by removing unnecessary API Class
- Merged Agent logic into Worker instead of running Worker and Agent as
two separate processes and maintaining RPC between Worker and Agent.
- Geo-rep command Pause and Resume will continue without any changes.
But Agent functionality also gets paused with that.
Updates: #755
Change-Id: Ie2c00fa7dddf21f180f0649e0aaf084d29023c98
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Diffstat (limited to 'geo-replication')
-rw-r--r-- | geo-replication/syncdaemon/Makefile.am | 2 | ||||
-rw-r--r-- | geo-replication/syncdaemon/changelogagent.py | 78 | ||||
-rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 19 | ||||
-rw-r--r-- | geo-replication/syncdaemon/libgfchangelog.py | 247 | ||||
-rw-r--r-- | geo-replication/syncdaemon/master.py | 23 | ||||
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 83 | ||||
-rw-r--r-- | geo-replication/syncdaemon/py2py3.py | 46 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 28 | ||||
-rw-r--r-- | geo-replication/syncdaemon/subcmds.py | 11 | ||||
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 2 |
10 files changed, 176 insertions, 363 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index 62c5ce7fe30..d70e3368faf 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -2,7 +2,7 @@ syncdaemondir = $(GLUSTERFS_LIBEXECDIR)/python/syncdaemon syncdaemon_PYTHON = rconf.py gsyncd.py __init__.py master.py README.md repce.py \ resource.py syncdutils.py monitor.py libcxattr.py gsyncdconfig.py \ - libgfchangelog.py changelogagent.py gsyncdstatus.py conf.py logutils.py \ + libgfchangelog.py gsyncdstatus.py conf.py logutils.py \ subcmds.py argsupgrade.py py2py3.py CLEANFILES = diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py deleted file mode 100644 index c5fdbc3a74f..00000000000 --- a/geo-replication/syncdaemon/changelogagent.py +++ /dev/null @@ -1,78 +0,0 @@ -#!/usr/bin/python3 -# -# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> -# This file is part of GlusterFS. - -# This file is licensed to you under your choice of the GNU Lesser -# General Public License, version 3 or any later version (LGPLv3 or -# later), or the GNU General Public License, version 2 (GPLv2), in all -# cases as published by the Free Software Foundation. -# - -import logging -import syncdutils -from syncdutils import select, CHANGELOG_AGENT_SERVER_VERSION -from repce import RepceServer - - -class _MetaChangelog(object): - - def __getattr__(self, meth): - from libgfchangelog import Changes as LChanges - xmeth = [m for m in dir(LChanges) if m[0] != '_'] - if meth not in xmeth: - return - for m in xmeth: - setattr(self, m, getattr(LChanges, m)) - return getattr(self, meth) - -Changes = _MetaChangelog() - - -class Changelog(object): - def version(self): - return CHANGELOG_AGENT_SERVER_VERSION - - def init(self): - return Changes.cl_init() - - def register(self, cl_brick, cl_dir, cl_log, cl_level, retries=0): - return Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) - - def scan(self): - return Changes.cl_scan() - - def getchanges(self): - return Changes.cl_getchanges() - - def done(self, clfile): - return Changes.cl_done(clfile) - - def history(self, changelog_path, start, end, num_parallel): - return Changes.cl_history_changelog(changelog_path, start, end, - num_parallel) - - def history_scan(self): - return Changes.cl_history_scan() - - def history_getchanges(self): - return Changes.cl_history_getchanges() - - def history_done(self, clfile): - return Changes.cl_history_done(clfile) - - -class ChangelogAgent(object): - def __init__(self, obj, fd_tup): - (inf, ouf, rw, ww) = fd_tup.split(',') - repce = RepceServer(obj, int(inf), int(ouf), 1) - t = syncdutils.Thread(target=lambda: (repce.service_loop(), - syncdutils.finalize())) - t.start() - logging.info('Agent listining...') - - select((), (), ()) - - -def agent(obj, fd_tup): - return ChangelogAgent(obj, fd_tup) diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 8940384616a..95b26c5f3fb 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -79,8 +79,6 @@ def main(): help="feedback fd between monitor and worker") p.add_argument("--local-node", help="Local master node") p.add_argument("--local-node-id", help="Local Node ID") - p.add_argument("--rpc-fd", - help="Read and Write fds for worker-agent communication") p.add_argument("--subvol-num", type=int, help="Subvolume number") p.add_argument("--is-hottier", action="store_true", help="Is this brick part of hot tier") @@ -92,19 +90,6 @@ def main(): p.add_argument("-c", "--config-file", help="Config File") p.add_argument("--debug", action="store_true") - # Agent - p = sp.add_parser("agent") - p.add_argument("master", help="Master Volume Name") - p.add_argument("slave", help="Slave details user@host::vol format") - p.add_argument("--local-path", help="Local brick path") - p.add_argument("--local-node", help="Local master node") - p.add_argument("--local-node-id", help="Local Node ID") - p.add_argument("--slave-id", help="Slave Volume ID") - p.add_argument("--rpc-fd", - help="Read and Write fds for worker-agent communication") - p.add_argument("-c", "--config-file", help="Config File") - p.add_argument("--debug", action="store_true") - # Slave p = sp.add_parser("slave") p.add_argument("master", help="Master Volume Name") @@ -271,8 +256,8 @@ def main(): # Default label to print in log file label = args.subcmd - if args.subcmd in ("worker", "agent"): - # If Worker or agent, then add brick path also to label + if args.subcmd in ("worker"): + # If Worker, then add brick path also to label label = "%s %s" % (args.subcmd, args.local_path) elif args.subcmd == "slave": # If Slave add Master node and Brick details diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py index 8d129567075..34beadb3552 100644 --- a/geo-replication/syncdaemon/libgfchangelog.py +++ b/geo-replication/syncdaemon/libgfchangelog.py @@ -16,126 +16,127 @@ from py2py3 import gr_cl_history_changelog, gr_cl_done, gr_create_string_buffer from py2py3 import gr_cl_register, gr_cl_history_done, bytearray_to_str -class Changes(object): - libgfc = CDLL(find_library("gfchangelog"), mode=RTLD_GLOBAL, - use_errno=True) - - @classmethod - def geterrno(cls): - return get_errno() - - @classmethod - def raise_changelog_err(cls): - errn = cls.geterrno() - raise ChangelogException(errn, os.strerror(errn)) - - @classmethod - def _get_api(cls, call): - return getattr(cls.libgfc, call) - - @classmethod - def cl_init(cls): - ret = cls._get_api('gf_changelog_init')(None) - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_register(cls, brick, path, log_file, log_level, retries=0): - ret = gr_cl_register(cls, brick, path, log_file, log_level, retries) - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_scan(cls): - ret = cls._get_api('gf_changelog_scan')() - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_startfresh(cls): - ret = cls._get_api('gf_changelog_start_fresh')() - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_getchanges(cls): - """ remove hardcoding for path name length """ - def clsort(f): - return f.split('.')[-1] - changes = [] - buf = gr_create_string_buffer(4096) - call = cls._get_api('gf_changelog_next_change') - - while True: - ret = call(buf, 4096) - if ret in (0, -1): - break - # py2 and py3 compatibility - result = bytearray_to_str(buf.raw[:ret - 1]) - changes.append(result) - if ret == -1: - cls.raise_changelog_err() - # cleanup tracker - cls.cl_startfresh() - return sorted(changes, key=clsort) - - @classmethod - def cl_done(cls, clfile): - ret = gr_cl_done(cls, clfile) - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_history_scan(cls): - ret = cls._get_api('gf_history_changelog_scan')() - if ret == -1: - cls.raise_changelog_err() - - return ret - - @classmethod - def cl_history_changelog(cls, changelog_path, start, end, num_parallel): - actual_end = c_ulong() - ret = gr_cl_history_changelog(cls, changelog_path, start, end, - num_parallel, byref(actual_end)) - if ret == -1: - cls.raise_changelog_err() - - if ret == -2: - raise ChangelogHistoryNotAvailable() - - return (ret, actual_end.value) - - @classmethod - def cl_history_startfresh(cls): - ret = cls._get_api('gf_history_changelog_start_fresh')() - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_history_getchanges(cls): - """ remove hardcoding for path name length """ - def clsort(f): - return f.split('.')[-1] - - changes = [] - buf = gr_create_string_buffer(4096) - call = cls._get_api('gf_history_changelog_next_change') - - while True: - ret = call(buf, 4096) - if ret in (0, -1): - break - # py2 and py3 compatibility - result = bytearray_to_str(buf.raw[:ret - 1]) - changes.append(result) - if ret == -1: - cls.raise_changelog_err() - - return sorted(changes, key=clsort) - - @classmethod - def cl_history_done(cls, clfile): - ret = gr_cl_history_done(cls, clfile) - if ret == -1: - cls.raise_changelog_err() +libgfc = CDLL( + find_library("gfchangelog"), + mode=RTLD_GLOBAL, + use_errno=True +) + + +def _raise_changelog_err(): + errn = get_errno() + raise ChangelogException(errn, os.strerror(errn)) + + +def _init(): + if libgfc.gf_changelog_init(None) == -1: + _raise_changelog_err() + + +def register(brick, path, log_file, log_level, retries=0): + _init() + + ret = gr_cl_register(libgfc, brick, path, log_file, log_level, retries) + + if ret == -1: + _raise_changelog_err() + + +def scan(): + ret = libgfc.gf_changelog_scan() + if ret == -1: + _raise_changelog_err() + + +def startfresh(): + ret = libgfc.gf_changelog_start_fresh() + if ret == -1: + _raise_changelog_err() + + +def getchanges(): + def clsort(cfile): + return cfile.split('.')[-1] + + changes = [] + buf = gr_create_string_buffer(4096) + call = libgfc.gf_changelog_next_change + + while True: + ret = call(buf, 4096) + if ret in (0, -1): + break + + # py2 and py3 compatibility + result = bytearray_to_str(buf.raw[:ret - 1]) + changes.append(result) + + if ret == -1: + _raise_changelog_err() + + # cleanup tracker + startfresh() + + return sorted(changes, key=clsort) + + +def done(clfile): + ret = gr_cl_done(libgfc, clfile) + if ret == -1: + _raise_changelog_err() + + +def history_scan(): + ret = libgfc.gf_history_changelog_scan() + if ret == -1: + _raise_changelog_err() + + return ret + + +def history_changelog(changelog_path, start, end, num_parallel): + actual_end = c_ulong() + ret = gr_cl_history_changelog(libgfc, changelog_path, start, end, + num_parallel, byref(actual_end)) + if ret == -1: + _raise_changelog_err() + + if ret == -2: + raise ChangelogHistoryNotAvailable() + + return (ret, actual_end.value) + + +def history_startfresh(): + ret = libgfc.gf_history_changelog_start_fresh() + if ret == -1: + _raise_changelog_err() + + +def history_getchanges(): + def clsort(cfile): + return cfile.split('.')[-1] + + changes = [] + buf = gr_create_string_buffer(4096) + call = libgfc.gf_history_changelog_next_change + + while True: + ret = call(buf, 4096) + if ret in (0, -1): + break + + # py2 and py3 compatibility + result = bytearray_to_str(buf.raw[:ret - 1]) + changes.append(result) + + if ret == -1: + _raise_changelog_err() + + return sorted(changes, key=clsort) + + +def history_done(clfile): + ret = gr_cl_history_done(libgfc, clfile) + if ret == -1: + _raise_changelog_err() diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index f75a5421bcf..96232bb0831 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -22,6 +22,7 @@ from threading import Condition, Lock from datetime import datetime import gsyncdconfig as gconf +import libgfchangelog from rconf import rconf from syncdutils import Thread, GsyncdError, escape_space_newline from syncdutils import unescape_space_newline, gauxpfx, escape @@ -1498,9 +1499,9 @@ class GMasterChangelogMixin(GMasterCommon): # that are _historical_ to that time. data_stime = self.get_data_stime() - self.changelog_agent.scan() + libgfchangelog.scan() self.crawls += 1 - changes = self.changelog_agent.getchanges() + changes = libgfchangelog.getchanges() if changes: if data_stime: logging.info(lf("slave's time", @@ -1517,10 +1518,9 @@ class GMasterChangelogMixin(GMasterCommon): self.changelogs_batch_process(changes) - def register(self, register_time, changelog_agent, status): - self.changelog_agent = changelog_agent + def register(self, register_time, status): self.sleep_interval = gconf.get("change-interval") - self.changelog_done_func = self.changelog_agent.done + self.changelog_done_func = libgfchangelog.done self.tempdir = self.setup_working_dir() self.processed_changelogs_dir = os.path.join(self.tempdir, ".processed") @@ -1529,11 +1529,10 @@ class GMasterChangelogMixin(GMasterCommon): class GMasterChangeloghistoryMixin(GMasterChangelogMixin): - def register(self, register_time, changelog_agent, status): - self.changelog_agent = changelog_agent + def register(self, register_time, status): self.changelog_register_time = register_time self.history_crawl_start_time = register_time - self.changelog_done_func = self.changelog_agent.history_done + self.changelog_done_func = libgfchangelog.history_done self.history_turns = 0 self.tempdir = self.setup_working_dir() self.processed_changelogs_dir = os.path.join(self.tempdir, @@ -1561,7 +1560,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): # location then consuming history will not work(Known issue as of now) changelog_path = os.path.join(rconf.args.local_path, ".glusterfs/changelogs") - ret, actual_end = self.changelog_agent.history( + ret, actual_end = libgfchangelog.history_changelog( changelog_path, data_stime[0], end_time, @@ -1573,10 +1572,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): # to be processed. returns positive value as number of changelogs # to be processed, which will be fetched using # history_getchanges() - while self.changelog_agent.history_scan() > 0: + while libgfchangelog.history_scan() > 0: self.crawls += 1 - changes = self.changelog_agent.history_getchanges() + changes = libgfchangelog.history_getchanges() if changes: if data_stime: logging.info(lf("slave's time", @@ -1629,7 +1628,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): XSYNC_MAX_ENTRIES = 1 << 13 - def register(self, register_time=None, changelog_agent=None, status=None): + def register(self, register_time=None, status=None): self.status = status self.counter = 0 self.comlist = [] diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 236afe70d11..14e77aef27e 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -20,6 +20,7 @@ import random from resource import SSH import gsyncdconfig as gconf +import libgfchangelog from rconf import rconf from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile from syncdutils import set_term_handler, GsyncdError @@ -81,7 +82,7 @@ class Monitor(object): # give a chance to graceful exit errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH]) - def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master, + def monitor(self, w, argv, cpids, slave_vol, slave_host, master, suuid, slavenodes): """the monitor loop @@ -150,7 +151,7 @@ class Monitor(object): remote_host = "%s@%s" % (remote_user, remote_new[0]) remote_id = remote_new[1] - # Spawn the worker and agent in lock to avoid fd leak + # Spawn the worker in lock to avoid fd leak self.lock.acquire() self.status[w[0]['dir']].set_worker_status(self.ST_INIT) @@ -158,44 +159,10 @@ class Monitor(object): brick=w[0]['dir'], slave_node=remote_host)) - # Couple of pipe pairs for RPC communication b/w - # worker and changelog agent. - - # read/write end for agent - (ra, ww) = pipe() - # read/write end for worker - (rw, wa) = pipe() - - # spawn the agent process - apid = os.fork() - if apid == 0: - os.close(rw) - os.close(ww) - args_to_agent = argv + [ - 'agent', - rconf.args.master, - rconf.args.slave, - '--local-path', w[0]['dir'], - '--local-node', w[0]['host'], - '--local-node-id', w[0]['uuid'], - '--slave-id', suuid, - '--rpc-fd', ','.join([str(ra), str(wa), str(rw), str(ww)]) - ] - - if rconf.args.config_file is not None: - args_to_agent += ['-c', rconf.args.config_file] - - if rconf.args.debug: - args_to_agent.append("--debug") - - os.execv(sys.executable, args_to_agent) - pr, pw = pipe() cpid = os.fork() if cpid == 0: os.close(pr) - os.close(ra) - os.close(wa) args_to_worker = argv + [ 'worker', @@ -206,8 +173,6 @@ class Monitor(object): '--local-node', w[0]['host'], '--local-node-id', w[0]['uuid'], '--slave-id', suuid, - '--rpc-fd', - ','.join([str(rw), str(ww), str(ra), str(wa)]), '--subvol-num', str(w[2]), '--resource-remote', remote_host, '--resource-remote-id', remote_id @@ -238,14 +203,8 @@ class Monitor(object): os.execv(sys.executable, args_to_worker) cpids.add(cpid) - agents.add(apid) os.close(pw) - # close all RPC pipes in monitor - os.close(ra) - os.close(wa) - os.close(rw) - os.close(ww) self.lock.release() t0 = time.time() @@ -254,42 +213,19 @@ class Monitor(object): if so: ret = nwait(cpid, os.WNOHANG) - ret_agent = nwait(apid, os.WNOHANG) - - if ret_agent is not None: - # Agent is died Kill Worker - logging.info(lf("Changelog Agent died, Aborting Worker", - brick=w[0]['dir'])) - errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) - nwait(cpid) - nwait(apid) if ret is not None: logging.info(lf("worker died before establishing " "connection", brick=w[0]['dir'])) - nwait(apid) # wait for agent else: logging.debug("worker(%s) connected" % w[0]['dir']) while time.time() < t0 + conn_timeout: ret = nwait(cpid, os.WNOHANG) - ret_agent = nwait(apid, os.WNOHANG) if ret is not None: logging.info(lf("worker died in startup phase", brick=w[0]['dir'])) - nwait(apid) # wait for agent - break - - if ret_agent is not None: - # Agent is died Kill Worker - logging.info(lf("Changelog Agent died, Aborting " - "Worker", - brick=w[0]['dir'])) - errno_wrap(os.kill, [cpid, signal.SIGKILL], - [ESRCH]) - nwait(cpid) - nwait(apid) break time.sleep(1) @@ -304,12 +240,8 @@ class Monitor(object): brick=w[0]['dir'], timeout=conn_timeout)) errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) - nwait(apid) # wait for agent ret = nwait(cpid) if ret is None: - # If worker dies, agent terminates on EOF. - # So lets wait for agent first. - nwait(apid) ret = nwait(cpid) if exit_signalled(ret): ret = 0 @@ -333,18 +265,15 @@ class Monitor(object): argv = [os.path.basename(sys.executable), sys.argv[0]] cpids = set() - agents = set() ta = [] for wx in wspx: def wmon(w): - cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol, + cpid, _ = self.monitor(w, argv, cpids, slave_vol, slave_host, master, suuid, slavenodes) time.sleep(1) self.lock.acquire() for cpid in cpids: errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) - for apid in agents: - errno_wrap(os.kill, [apid, signal.SIGKILL], [ESRCH]) self.lock.release() finalize(exval=1) t = Thread(target=wmon, args=[wx]) @@ -354,8 +283,8 @@ class Monitor(object): # monitor status was being updated in each monitor thread. It # should not be done as it can cause deadlock for a worker start. # set_monitor_status uses flock to synchronize multple instances - # updating the file. Since each monitor thread forks worker and - # agent, these processes can hold the reference to fd of status + # updating the file. Since each monitor thread forks worker, + # these processes can hold the reference to fd of status # file causing deadlock to workers which starts later as flock # will not be release until all references to same fd is closed. # It will also cause fd leaks. diff --git a/geo-replication/syncdaemon/py2py3.py b/geo-replication/syncdaemon/py2py3.py index faad750059c..f9c76e1b50a 100644 --- a/geo-replication/syncdaemon/py2py3.py +++ b/geo-replication/syncdaemon/py2py3.py @@ -55,23 +55,23 @@ if sys.version_info >= (3,): def gr_lremovexattr(cls, path, attr): return cls.libc.lremovexattr(path.encode(), attr.encode()) - def gr_cl_register(cls, brick, path, log_file, log_level, retries): - return cls._get_api('gf_changelog_register')(brick.encode(), - path.encode(), - log_file.encode(), - log_level, retries) + def gr_cl_register(libgfapi, brick, path, log_file, log_level, retries): + return libgfapi.gf_changelog_register(brick.encode(), + path.encode(), + log_file.encode(), + log_level, retries) - def gr_cl_done(cls, clfile): - return cls._get_api('gf_changelog_done')(clfile.encode()) + def gr_cl_done(libgfapi, clfile): + return libgfapi.gf_changelog_done(clfile.encode()) - def gr_cl_history_changelog(cls, changelog_path, start, end, num_parallel, + def gr_cl_history_changelog(libgfapi, changelog_path, start, end, num_parallel, actual_end): - return cls._get_api('gf_history_changelog')(changelog_path.encode(), - start, end, num_parallel, - actual_end) + return libgfapi.gf_history_changelog(changelog_path.encode(), + start, end, num_parallel, + actual_end) - def gr_cl_history_done(cls, clfile): - return cls._get_api('gf_history_changelog_done')(clfile.encode()) + def gr_cl_history_done(libgfapi, clfile): + return libgfapi.gf_history_changelog_done(clfile.encode()) # regular file @@ -137,20 +137,20 @@ else: def gr_lremovexattr(cls, path, attr): return cls.libc.lremovexattr(path, attr) - def gr_cl_register(cls, brick, path, log_file, log_level, retries): - return cls._get_api('gf_changelog_register')(brick, path, log_file, - log_level, retries) + def gr_cl_register(libgfapi, brick, path, log_file, log_level, retries): + return libgfapi.gf_changelog_register(brick, path, log_file, + log_level, retries) - def gr_cl_done(cls, clfile): - return cls._get_api('gf_changelog_done')(clfile) + def gr_cl_done(libgfapi, clfile): + return libgfapi.gf_changelog_done(clfile) - def gr_cl_history_changelog(cls, changelog_path, start, end, num_parallel, + def gr_cl_history_changelog(libgfapi, changelog_path, start, end, num_parallel, actual_end): - return cls._get_api('gf_history_changelog')(changelog_path, start, end, - num_parallel, actual_end) + return libgfapi.gf_history_changelog(changelog_path, start, end, + num_parallel, actual_end) - def gr_cl_history_done(cls, clfile): - return cls._get_api('gf_history_changelog_done')(clfile) + def gr_cl_history_done(libgfapi, clfile): + return libgfapi.gf_history_changelog_done(clfile) # regular file diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index cdd3ae8d7e2..ae5600d1d9a 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -25,6 +25,7 @@ import errno from rconf import rconf import gsyncdconfig as gconf +import libgfchangelog import repce from repce import RepceServer, RepceClient @@ -35,7 +36,6 @@ from syncdutils import entry2pb, gauxpfx, errno_wrap, lstat from syncdutils import NoStimeAvailable, PartialHistoryAvailable from syncdutils import ChangelogException, ChangelogHistoryNotAvailable from syncdutils import get_changelog_log_level, get_rsync_version -from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION from syncdutils import GX_GFID_CANONICAL_LEN from gsyncdstatus import GeorepStatus from syncdutils import lf, Popen, sup @@ -1245,9 +1245,6 @@ class GLUSTER(object): # register the crawlers and start crawling # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default) # g3 ==> changelog History - (inf, ouf, ra, wa) = rconf.args.rpc_fd.split(',') - changelog_agent = RepceClient(int(inf), int(ouf)) - status = GeorepStatus(gconf.get("state-file"), rconf.args.local_node, rconf.args.local_path, @@ -1255,12 +1252,6 @@ class GLUSTER(object): rconf.args.master, rconf.args.slave) status.reset_on_worker_start() - rv = changelog_agent.version() - if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: - raise GsyncdError( - "RePCe major version mismatch(changelog agent): " - "local %s, remote %s" % - (CHANGELOG_AGENT_CLIENT_VERSION, rv)) try: workdir = g2.setup_working_dir() @@ -1271,17 +1262,16 @@ class GLUSTER(object): # register with the changelog library # 9 == log level (DEBUG) # 5 == connection retries - changelog_agent.init() - changelog_agent.register(rconf.args.local_path, - workdir, - gconf.get("changelog-log-file"), - get_changelog_log_level( - gconf.get("changelog-log-level")), - g2.CHANGELOG_CONN_RETRIES) + libgfchangelog.register(rconf.args.local_path, + workdir, + gconf.get("changelog-log-file"), + get_changelog_log_level( + gconf.get("changelog-log-level")), + g2.CHANGELOG_CONN_RETRIES) register_time = int(time.time()) - g2.register(register_time, changelog_agent, status) - g3.register(register_time, changelog_agent, status) + g2.register(register_time, status) + g3.register(register_time, status) except ChangelogException as e: logging.error(lf("Changelog register failed", error=e)) sys.exit(1) diff --git a/geo-replication/syncdaemon/subcmds.py b/geo-replication/syncdaemon/subcmds.py index f8515f2607b..b8508532e30 100644 --- a/geo-replication/syncdaemon/subcmds.py +++ b/geo-replication/syncdaemon/subcmds.py @@ -97,17 +97,6 @@ def subcmd_slave(args): local.service_loop() -def subcmd_agent(args): - import os - from changelogagent import agent, Changelog - from syncdutils import lf - - os.setsid() - logging.debug(lf("RPC FD", - rpc_fd=repr(args.rpc_fd))) - return agent(Changelog(), args.rpc_fd) - - def subcmd_voluuidget(args): from subprocess import Popen, PIPE import xml.etree.ElementTree as XET diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index d6420355791..8e783136318 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -62,8 +62,6 @@ GF_OP_RETRIES = 10 GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' -CHANGELOG_AGENT_SERVER_VERSION = 1.0 -CHANGELOG_AGENT_CLIENT_VERSION = 1.0 NodeID = None rsync_version = None unshare_mnt_propagation = None |