diff options
author | Aravinda VK <avishwan@redhat.com> | 2019-10-23 10:10:12 +0530 |
---|---|---|
committer | Amar Tumballi <amarts@gmail.com> | 2019-11-07 06:24:39 +0000 |
commit | 0fc68040b72fc94dec3874345547e294b9ec1f45 (patch) | |
tree | ea053550c79c3903804a7e3c3812551a41e17b01 /geo-replication/syncdaemon/master.py | |
parent | 0ab6c178468b6cce095c54ab62cfa51162d01fcc (diff) |
georep: Merge Worker and Agent as a single process
- libgfchangelog is simplified by removing unnecessary API Class
- Merged Agent logic into Worker instead of running Worker and Agent as
two separate processes and maintaining RPC between Worker and Agent.
- Geo-rep command Pause and Resume will continue without any changes.
But Agent functionality also gets paused with that.
Updates: #755
Change-Id: Ie2c00fa7dddf21f180f0649e0aaf084d29023c98
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r-- | geo-replication/syncdaemon/master.py | 23 |
1 files changed, 11 insertions, 12 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index f75a5421bcf..96232bb0831 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -22,6 +22,7 @@ from threading import Condition, Lock from datetime import datetime import gsyncdconfig as gconf +import libgfchangelog from rconf import rconf from syncdutils import Thread, GsyncdError, escape_space_newline from syncdutils import unescape_space_newline, gauxpfx, escape @@ -1498,9 +1499,9 @@ class GMasterChangelogMixin(GMasterCommon): # that are _historical_ to that time. data_stime = self.get_data_stime() - self.changelog_agent.scan() + libgfchangelog.scan() self.crawls += 1 - changes = self.changelog_agent.getchanges() + changes = libgfchangelog.getchanges() if changes: if data_stime: logging.info(lf("slave's time", @@ -1517,10 +1518,9 @@ class GMasterChangelogMixin(GMasterCommon): self.changelogs_batch_process(changes) - def register(self, register_time, changelog_agent, status): - self.changelog_agent = changelog_agent + def register(self, register_time, status): self.sleep_interval = gconf.get("change-interval") - self.changelog_done_func = self.changelog_agent.done + self.changelog_done_func = libgfchangelog.done self.tempdir = self.setup_working_dir() self.processed_changelogs_dir = os.path.join(self.tempdir, ".processed") @@ -1529,11 +1529,10 @@ class GMasterChangelogMixin(GMasterCommon): class GMasterChangeloghistoryMixin(GMasterChangelogMixin): - def register(self, register_time, changelog_agent, status): - self.changelog_agent = changelog_agent + def register(self, register_time, status): self.changelog_register_time = register_time self.history_crawl_start_time = register_time - self.changelog_done_func = self.changelog_agent.history_done + self.changelog_done_func = libgfchangelog.history_done self.history_turns = 0 self.tempdir = self.setup_working_dir() self.processed_changelogs_dir = os.path.join(self.tempdir, @@ -1561,7 +1560,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): # location then consuming history will not work(Known issue as of now) changelog_path = os.path.join(rconf.args.local_path, ".glusterfs/changelogs") - ret, actual_end = self.changelog_agent.history( + ret, actual_end = libgfchangelog.history_changelog( changelog_path, data_stime[0], end_time, @@ -1573,10 +1572,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): # to be processed. returns positive value as number of changelogs # to be processed, which will be fetched using # history_getchanges() - while self.changelog_agent.history_scan() > 0: + while libgfchangelog.history_scan() > 0: self.crawls += 1 - changes = self.changelog_agent.history_getchanges() + changes = libgfchangelog.history_getchanges() if changes: if data_stime: logging.info(lf("slave's time", @@ -1629,7 +1628,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): XSYNC_MAX_ENTRIES = 1 << 13 - def register(self, register_time=None, changelog_agent=None, status=None): + def register(self, register_time=None, status=None): self.status = status self.counter = 0 self.comlist = [] |