diff options
author | Aravinda VK <avishwan@redhat.com> | 2015-04-11 20:03:47 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2015-04-28 10:39:41 -0700 |
commit | 428933dce2c87ea62b4f58af7d260064fade6a8b (patch) | |
tree | b781d8375b0a03affb8f866bf7913df0869da73f /geo-replication/syncdaemon | |
parent | 8986a47c54db4769feb4e6664532386f1cd0275d (diff) |
geo-rep: Limit number of changelogs to process in batch
Changelog processing is done in batch, for example if 10 changelogs
available for processing then process all at once. Collect Entry, Meta
and Data operations separately, All the entry operations like CREATE,
MKDIR, MKNOD, LINK, UNLINK will be executed first then rsync will be
triggered for whole batch. Stime will get updated once the complete
batch is complete.
In case of large number of Changelogs in a batch, If geo-rep fails after
Entry operations, but before rsync then on restart, it again starts from the
beginning since stime is not updated. It has to process all the changelogs
again. While processing same changelogs again, all CREATE will get EEXIST
since all the files created in previous run. Big hit for performance.
With this patch, Geo-rep limits number of changelogs per batch based on
Changelog file size. So that when geo-rep fails it has to retry only last batch
changelogs since stime gets updated after each batch.
BUG: 1210965
Change-Id: I844448c4cdcce38a3a2e2cca7c9a50db8f5a9062
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/10202
Reviewed-by: Kotresh HR <khiremat@redhat.com>
Tested-by: NetBSD Build System
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r-- | geo-replication/syncdaemon/master.py | 38 |
1 files changed, 32 insertions, 6 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index dabf5536c64..03d5b572cef 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -39,6 +39,14 @@ URXTIME = (-1, 0) # crawl before starting live changelog crawl. CHANGELOG_ROLLOVER_TIME = 15 +# Max size of Changelogs to process per batch, Changelogs Processing is +# not limited by the number of changelogs but instead based on +# size of the changelog file, One sample changelog file size was 145408 +# with ~1000 CREATE and ~1000 DATA. 5 such files in one batch is 727040 +# If geo-rep worker crashes while processing a batch, it has to retry only +# that batch since stime will get updated after each batch. +MAX_CHANGELOG_BATCH_SIZE = 727040 + # Utility functions to help us to get to closer proximity # of the DRY principle (no, don't look for elevated or # perspectivistic things here) @@ -1299,6 +1307,28 @@ class GMasterChangelogMixin(GMasterCommon): except: raise + def changelogs_batch_process(self, changes): + changelogs_batches = [] + current_size = 0 + for c in changes: + si = os.lstat(c).st_size + if (si + current_size) > MAX_CHANGELOG_BATCH_SIZE: + # Create new batch if single Changelog file greater than + # Max Size! or current batch size exceeds Max size + changelogs_batches.append([c]) + current_size = si + else: + # Append to last batch, if No batches available Create one + current_size += si + if not changelogs_batches: + changelogs_batches.append([c]) + else: + changelogs_batches[-1].append(c) + + for batch in changelogs_batches: + logging.debug('processing changes %s' % repr(batch)) + self.process(batch) + def crawl(self): self.update_worker_crawl_status("Changelog Crawl") changes = [] @@ -1322,9 +1352,7 @@ class GMasterChangelogMixin(GMasterCommon): changes.remove(pr) self.archive_and_purge_changelogs(processed) - if changes: - logging.debug('processing changes %s' % repr(changes)) - self.process(changes) + self.changelogs_batch_process(changes) def register(self, register_time, changelog_agent): self.changelog_agent = changelog_agent @@ -1388,9 +1416,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): self.changelog_done_func(pr) changes.remove(pr) - if changes: - logging.debug('processing changes %s' % repr(changes)) - self.process(changes) + self.changelogs_batch_process(changes) history_turn_time = int(time.time()) - self.history_crawl_start_time |