summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/master.py
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2019-10-23 10:10:12 +0530
committerAmar Tumballi <amarts@gmail.com>2019-11-07 06:24:39 +0000
commit0fc68040b72fc94dec3874345547e294b9ec1f45 (patch)
treeea053550c79c3903804a7e3c3812551a41e17b01 /geo-replication/syncdaemon/master.py
parent0ab6c178468b6cce095c54ab62cfa51162d01fcc (diff)
georep: Merge Worker and Agent as a single process
- libgfchangelog is simplified by removing unnecessary API Class - Merged Agent logic into Worker instead of running Worker and Agent as two separate processes and maintaining RPC between Worker and Agent. - Geo-rep command Pause and Resume will continue without any changes. But Agent functionality also gets paused with that. Updates: #755 Change-Id: Ie2c00fa7dddf21f180f0649e0aaf084d29023c98 Signed-off-by: Aravinda VK <avishwan@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--geo-replication/syncdaemon/master.py23
1 files changed, 11 insertions, 12 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index f75a5421bcf..96232bb0831 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -22,6 +22,7 @@ from threading import Condition, Lock
from datetime import datetime
import gsyncdconfig as gconf
+import libgfchangelog
from rconf import rconf
from syncdutils import Thread, GsyncdError, escape_space_newline
from syncdutils import unescape_space_newline, gauxpfx, escape
@@ -1498,9 +1499,9 @@ class GMasterChangelogMixin(GMasterCommon):
# that are _historical_ to that time.
data_stime = self.get_data_stime()
- self.changelog_agent.scan()
+ libgfchangelog.scan()
self.crawls += 1
- changes = self.changelog_agent.getchanges()
+ changes = libgfchangelog.getchanges()
if changes:
if data_stime:
logging.info(lf("slave's time",
@@ -1517,10 +1518,9 @@ class GMasterChangelogMixin(GMasterCommon):
self.changelogs_batch_process(changes)
- def register(self, register_time, changelog_agent, status):
- self.changelog_agent = changelog_agent
+ def register(self, register_time, status):
self.sleep_interval = gconf.get("change-interval")
- self.changelog_done_func = self.changelog_agent.done
+ self.changelog_done_func = libgfchangelog.done
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
".processed")
@@ -1529,11 +1529,10 @@ class GMasterChangelogMixin(GMasterCommon):
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
- def register(self, register_time, changelog_agent, status):
- self.changelog_agent = changelog_agent
+ def register(self, register_time, status):
self.changelog_register_time = register_time
self.history_crawl_start_time = register_time
- self.changelog_done_func = self.changelog_agent.history_done
+ self.changelog_done_func = libgfchangelog.history_done
self.history_turns = 0
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
@@ -1561,7 +1560,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# location then consuming history will not work(Known issue as of now)
changelog_path = os.path.join(rconf.args.local_path,
".glusterfs/changelogs")
- ret, actual_end = self.changelog_agent.history(
+ ret, actual_end = libgfchangelog.history_changelog(
changelog_path,
data_stime[0],
end_time,
@@ -1573,10 +1572,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# to be processed. returns positive value as number of changelogs
# to be processed, which will be fetched using
# history_getchanges()
- while self.changelog_agent.history_scan() > 0:
+ while libgfchangelog.history_scan() > 0:
self.crawls += 1
- changes = self.changelog_agent.history_getchanges()
+ changes = libgfchangelog.history_getchanges()
if changes:
if data_stime:
logging.info(lf("slave's time",
@@ -1629,7 +1628,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self, register_time=None, changelog_agent=None, status=None):
+ def register(self, register_time=None, status=None):
self.status = status
self.counter = 0
self.comlist = []