diff options
Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r-- | geo-replication/syncdaemon/master.py | 38 |
1 files changed, 32 insertions, 6 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index dabf5536c64..03d5b572cef 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -39,6 +39,14 @@ URXTIME = (-1, 0) # crawl before starting live changelog crawl. CHANGELOG_ROLLOVER_TIME = 15 +# Max size of Changelogs to process per batch, Changelogs Processing is +# not limited by the number of changelogs but instead based on +# size of the changelog file, One sample changelog file size was 145408 +# with ~1000 CREATE and ~1000 DATA. 5 such files in one batch is 727040 +# If geo-rep worker crashes while processing a batch, it has to retry only +# that batch since stime will get updated after each batch. +MAX_CHANGELOG_BATCH_SIZE = 727040 + # Utility functions to help us to get to closer proximity # of the DRY principle (no, don't look for elevated or # perspectivistic things here) @@ -1299,6 +1307,28 @@ class GMasterChangelogMixin(GMasterCommon): except: raise + def changelogs_batch_process(self, changes): + changelogs_batches = [] + current_size = 0 + for c in changes: + si = os.lstat(c).st_size + if (si + current_size) > MAX_CHANGELOG_BATCH_SIZE: + # Create new batch if single Changelog file greater than + # Max Size! or current batch size exceeds Max size + changelogs_batches.append([c]) + current_size = si + else: + # Append to last batch, if No batches available Create one + current_size += si + if not changelogs_batches: + changelogs_batches.append([c]) + else: + changelogs_batches[-1].append(c) + + for batch in changelogs_batches: + logging.debug('processing changes %s' % repr(batch)) + self.process(batch) + def crawl(self): self.update_worker_crawl_status("Changelog Crawl") changes = [] @@ -1322,9 +1352,7 @@ class GMasterChangelogMixin(GMasterCommon): changes.remove(pr) self.archive_and_purge_changelogs(processed) - if changes: - logging.debug('processing changes %s' % repr(changes)) - self.process(changes) + self.changelogs_batch_process(changes) def register(self, register_time, changelog_agent): self.changelog_agent = changelog_agent @@ -1388,9 +1416,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): self.changelog_done_func(pr) changes.remove(pr) - if changes: - logging.debug('processing changes %s' % repr(changes)) - self.process(changes) + self.changelogs_batch_process(changes) history_turn_time = int(time.time()) - self.history_crawl_start_time |