Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--   geo-replication/syncdaemon/master.py | 504
1 file changed, 324 insertions, 180 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 80c4d9d8b95..3df08e41a13 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -25,7 +25,13 @@ from gconf import gconf from syncdutils import Thread, GsyncdError, boolify, escape from syncdutils import unescape, gauxpfx, md5hex, selfkill from syncdutils import lstat, errno_wrap, FreeObject -from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable +from syncdutils import NoStimeAvailable, PartialHistoryAvailable +from changelogsdb import db_init, db_record_data, db_record_meta +from changelogsdb import db_remove_data, db_remove_meta +from changelogsdb import db_get_data, db_get_meta, db_commit +from changelogsdb import db_get_data_count, db_get_meta_count +from changelogsdb import db_delete_meta_if_exists_in_data + URXTIME = (-1, 0) @@ -45,6 +51,10 @@ CHANGELOG_ROLLOVER_TIME = 15 # that batch since stime will get updated after each batch. MAX_CHANGELOG_BATCH_SIZE = 727040 +# Number of record to query once +DB_PAGINATION_SIZE_META = 100 +DB_PAGINATION_SIZE_DATA = 1000 + # Utility functions to help us to get to closer proximity # of the DRY principle (no, don't look for elevated or # perspectivistic things here) @@ -69,6 +79,24 @@ def _volinfo_hook_relax_foreign(self): return volinfo_sys +def edct(op, **ed): + dct = {} + dct['op'] = op + for k in ed: + if k == 'stat': + st = ed[k] + dst = dct['stat'] = {} + if st: + dst['uid'] = st.st_uid + dst['gid'] = st.st_gid + dst['mode'] = st.st_mode + dst['atime'] = st.st_atime + dst['mtime'] = st.st_mtime + else: + dct[k] = ed[k] + return dct + + # The API! def gmaster_builder(excrawl=None): @@ -259,7 +287,7 @@ class TarSSHEngine(object): st = lstat(se) if isinstance(st, int): # file got unlinked in the interim - self.unlinked_gfids.add(se) + db_remove_data(se) return True self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) @@ -294,7 +322,7 @@ class RsyncEngine(object): st = lstat(se) if isinstance(st, int): # file got unlinked in the interim - self.unlinked_gfids.add(se) + db_remove_data(se) return True self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) @@ -340,6 +368,18 @@ class GMasterCommon(object): if self.volinfo: return self.volinfo['volume_mark'] + def get_entry_stime(self): + data = self.slave.server.entry_stime(".", self.uuid) + if isinstance(data, int): + data = None + return data + + def get_data_stime(self): + data = self.slave.server.stime(".", self.uuid) + if isinstance(data, int): + data = None + return data + def xtime(self, path, *a, **opts): """get amended xtime @@ -387,7 +427,6 @@ class GMasterCommon(object): self.volinfo = None self.terminate = False self.sleep_interval = 1 - self.unlinked_gfids = set() def init_keep_alive(cls): """start the keep-alive thread """ @@ -553,7 +592,7 @@ class GMasterCommon(object): self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime) # Purge all changelogs available in processing dir # less than cluster_stime - proc_dir = os.path.join(self.setup_working_dir(), + proc_dir = os.path.join(self.tempdir, ".processing") if os.path.exists(proc_dir): @@ -627,6 +666,11 @@ class GMasterCommon(object): ret = j[-1]() if not ret: succeed = False + + # All the unlinked GFIDs removed from Data and Meta list + # Commit the Transaction + db_commit() + if succeed and not args[0] is None: self.sendmark(path, *args) return succeed @@ -670,9 +714,6 @@ class GMasterChangelogMixin(GMasterCommon): # flat directory hierarchy for gfid based access 
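The hunk above hoists edct() out of process_change() to module scope so later hunks (the per-batch META sync) can reuse it. As a usage reference, a self-contained sketch; the GFID and aux-gfid entry path below are invented placeholders:

import os

# Copy of the hoisted helper, as it appears in the patch
def edct(op, **ed):
    dct = {}
    dct['op'] = op
    for k in ed:
        if k == 'stat':
            st = ed[k]
            dst = dct['stat'] = {}
            if st:
                dst['uid'] = st.st_uid
                dst['gid'] = st.st_gid
                dst['mode'] = st.st_mode
                dst['atime'] = st.st_atime
                dst['mtime'] = st.st_mtime
        else:
            dct[k] = ed[k]
    return dct

# Building one MKDIR record; gfid and entry path are placeholders
st = os.lstat(".")                     # stand-in for lstat(en) on the brick
rec = edct('MKDIR',
           gfid='fa9a9d91-0000-4f29-0000-000000000000',
           entry='.gfid/00000000-0000-0000-0000-000000000001/newdir',
           stat=st)
print(rec)

Lists of dictionaries built this way are what process_change() hands to self.slave.server.entry_ops().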
FLAT_DIR_HIERARCHY = '.' - # maximum retries per changelog before giving up - MAX_RETRIES = 10 - CHANGELOG_LOG_LEVEL = 9 CHANGELOG_CONN_RETRIES = 5 @@ -727,21 +768,48 @@ class GMasterChangelogMixin(GMasterCommon): logging.debug('changelog working dir %s' % workdir) return workdir - def get_purge_time(self): - purge_time = self.xtime('.', self.slave) - if isinstance(purge_time, int): - purge_time = None - return purge_time + def log_failures(self, failures, entry_key, gfid_prefix, log_prefix): + num_failures = 0 + for failure in failures: + st = lstat(os.path.join(gfid_prefix, failure[0][entry_key])) + if not isinstance(st, int): + num_failures += 1 + logging.error('%s FAILED: %s' % (log_prefix, + repr(failure))) + + self.status.inc_value("failures", num_failures) def process_change(self, change, done, retry): pfx = gauxpfx() clist = [] entries = [] - meta_gfid = set() - datas = set() + change_ts = change.split(".")[-1] + + # self.data_batch_start is None only in beginning and during + # new batch start + if self.data_batch_start is None: + self.data_batch_start = change_ts + + # Ignore entry ops which are already processed in Changelog modes + ignore_entry_ops = False + entry_stime = None + data_stime = None + if self.name in ["live_changelog", "history_changelog"]: + entry_stime = self.get_entry_stime() + data_stime = self.get_data_stime() + + if entry_stime is not None and data_stime is not None: + # if entry_stime is not None but data_stime > entry_stime + # This situation is caused by the stime update of Passive worker + # Consider data_stime in this case. + if data_stime[0] > entry_stime[0]: + entry_stime = data_stime + + # Compare the entry_stime with changelog file suffix + # if changelog time is less than entry_stime then ignore + if int(change_ts) <= entry_stime[0]: + ignore_entry_ops = True - # basic crawl stats: files and bytes - files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []} try: f = open(change, "r") clist = f.readlines() @@ -749,42 +817,6 @@ class GMasterChangelogMixin(GMasterCommon): except IOError: raise - def edct(op, **ed): - dct = {} - dct['op'] = op - for k in ed: - if k == 'stat': - st = ed[k] - dst = dct['stat'] = {} - if st: - dst['uid'] = st.st_uid - dst['gid'] = st.st_gid - dst['mode'] = st.st_mode - dst['atime'] = st.st_atime - dst['mtime'] = st.st_mtime - else: - dct[k] = ed[k] - return dct - - # entry counts (not purges) - def entry_update(): - files_pending['count'] += 1 - - # purge count - def purge_update(): - files_pending['purge'] += 1 - - def log_failures(failures, entry_key, gfid_prefix, log_prefix): - num_failures = 0 - for failure in failures: - st = lstat(os.path.join(gfid_prefix, failure[0][entry_key])) - if not isinstance(st, int): - num_failures += 1 - logging.error('%s FAILED: %s' % (log_prefix, - repr(failure))) - - self.status.inc_value("failures", num_failures) - for e in clist: e = e.strip() et = e[self.IDX_START:self.IDX_END] # entry type @@ -792,12 +824,20 @@ class GMasterChangelogMixin(GMasterCommon): # skip ENTRY operation if hot tier brick if self.name == 'live_changelog' or \ - self.name == 'history_changelog': + self.name == 'history_changelog': if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY: logging.debug('skip ENTRY op: %s if hot tier brick' % (ec[self.POS_TYPE])) continue + # Data and Meta operations are decided while parsing + # UNLINK/RMDIR/MKNOD except that case ignore all the other + # entry ops if ignore_entry_ops is True. 
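The entry_stime/data_stime comparison introduced above (evaluated only in the live_changelog and history_changelog modes) decides whether a changelog's entry operations were already replayed on the slave. A condensed, standalone restatement of that rule; the timestamps in the example are made up:

def should_skip_entry_ops(change_ts, entry_stime, data_stime):
    """Rule used to set ignore_entry_ops in process_change().

    change_ts   -- integer suffix of the changelog file name
    entry_stime -- (sec, nsec) from slave.server.entry_stime(), or None
    data_stime  -- (sec, nsec) from slave.server.stime(), or None
    """
    if entry_stime is None or data_stime is None:
        return False
    # A passive worker may have pushed data stime past entry stime;
    # in that case trust data stime.
    if data_stime[0] > entry_stime[0]:
        entry_stime = data_stime
    # Changelogs at or before entry stime already had their entry ops
    # replayed on the slave, so replaying them again can be skipped.
    return change_ts <= entry_stime[0]

# CHANGELOG.1449320400 vs entry_stime (1449320399, 0), data_stime (1449320500, 0)
assert should_skip_entry_ops(1449320400, (1449320399, 0), (1449320500, 0))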
+ # UNLINK/RMDIR/MKNOD entry_ops are ignored in the end + if ignore_entry_ops and et == self.TYPE_ENTRY and \ + ec[self.POS_TYPE] not in ["UNLINK", "RMDIR", "MKNOD"]: + continue + if et == self.TYPE_ENTRY: # extract information according to the type of # the entry operation. create(), mkdir() and mknod() @@ -819,15 +859,16 @@ class GMasterChangelogMixin(GMasterCommon): # Remove from DATA list, so that rsync will # not fail pt = os.path.join(pfx, ec[0]) - if pt in datas: - datas.remove(pt) + st = lstat(pt) + if isinstance(st, int): + # file got unlinked, May be historical Changelog + db_remove_data(pt) + db_remove_meta(pt) if not boolify(gconf.ignore_deletes): - purge_update() - entries.append(edct(ty, gfid=gfid, entry=en)) + if not ignore_entry_ops: + entries.append(edct(ty, gfid=gfid, entry=en)) elif ty in ['CREATE', 'MKDIR', 'MKNOD']: - entry_update() - # Special case: record mknod as link if ty in ['MKNOD']: mode = int(ec[2]) @@ -845,10 +886,10 @@ class GMasterChangelogMixin(GMasterCommon): # CREATED if source not exists. entries.append(edct('LINK', stat=st, entry=en, gfid=gfid)) - # Here, we have the assumption that only # tier-gfid.linkto causes this mknod. Add data - datas.add(os.path.join(pfx, ec[0])) + db_record_data(os.path.join(pfx, ec[0]), + change_ts) continue # stat info. present in the changelog itself @@ -867,7 +908,6 @@ class GMasterChangelogMixin(GMasterCommon): if isinstance(rl, int): rl = None - entry_update() e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st, link=rl)) @@ -880,120 +920,102 @@ class GMasterChangelogMixin(GMasterCommon): continue if ty == 'LINK': - entry_update() entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) elif ty == 'SYMLINK': rl = errno_wrap(os.readlink, [en], [ENOENT], [ESTALE]) if isinstance(rl, int): continue - entry_update() + entries.append( edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) else: logging.warn('ignoring %s [op %s]' % (gfid, ty)) elif et == self.TYPE_GFID: - # If self.unlinked_gfids is available, then that means it is - # retrying the changelog second time. Do not add the GFID's - # to rsync job if failed previously but unlinked in master - if self.unlinked_gfids and \ - os.path.join(pfx, ec[0]) in self.unlinked_gfids: - logging.debug("ignoring data, since file purged interim") - else: - datas.add(os.path.join(pfx, ec[0])) + db_record_data(os.path.join(pfx, ec[0]), change_ts) elif et == self.TYPE_META: if ec[1] == 'SETATTR': # only setattr's for now... 
- if len(ec) == 5: - # In xsync crawl, we already have stat data - # avoid doing stat again - meta_gfid.add((os.path.join(pfx, ec[0]), - XCrawlMetadata(st_uid=ec[2], - st_gid=ec[3], - st_mode=ec[4], - st_atime=ec[5], - st_mtime=ec[6]))) - else: - meta_gfid.add((os.path.join(pfx, ec[0]), )) + db_record_meta(os.path.join(pfx, ec[0]), change_ts) elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \ ec[1] == 'FXATTROP': # To sync xattr/acls use rsync/tar, --xattrs and --acls # switch to rsync and tar if not boolify(gconf.use_tarssh) and \ (boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)): - datas.add(os.path.join(pfx, ec[0])) + db_record_data(os.path.join(pfx, ec[0]), change_ts) else: logging.warn('got invalid changelog type: %s' % (et)) logging.debug('entries: %s' % repr(entries)) - # Increment counters for Status - self.status.inc_value("entry", len(entries)) - self.files_in_batch += len(datas) - self.status.inc_value("data", len(datas)) - # sync namespace - if entries: + if entries and not ignore_entry_ops: + # Increment counters for Status + self.status.inc_value("entry", len(entries)) failures = self.slave.server.entry_ops(entries) - log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') + self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') self.status.dec_value("entry", len(entries)) - # sync metadata - if meta_gfid: - meta_entries = [] - for go in meta_gfid: - if len(go) > 1: - st = go[1] - else: - st = lstat(go[0]) - if isinstance(st, int): - logging.debug('file %s got purged in the interim' % go[0]) - continue - meta_entries.append(edct('META', go=go[0], stat=st)) - if meta_entries: - self.status.inc_value("meta", len(entries)) - failures = self.slave.server.meta_ops(meta_entries) - log_failures(failures, 'go', '', 'META') - self.status.dec_value("meta", len(entries)) - - # sync data - if datas: + # Update Entry stime in Brick Root only in case of Changelog mode + if self.name in ["live_changelog", "history_changelog"]: + entry_stime_to_update = (int(change_ts) - 1, 0) + self.upd_entry_stime(entry_stime_to_update) + self.status.set_field("last_synced_entry", + entry_stime_to_update[0]) + + if ignore_entry_ops: + # Book keeping, to show in logs the range of Changelogs skipped + self.num_skipped_entry_changelogs += 1 + if self.skipped_entry_changelogs_first is None: + self.skipped_entry_changelogs_first = change_ts + + self.skipped_entry_changelogs_last = change_ts + + # Batch data based on number of changelogs as configured as + # gconf.max_data_changelogs_in_batch(Default is 24 hrs) + # stime will be set after completion of these batch, so on failure + # Geo-rep will progress day by day + if (self.num_changelogs > gconf.max_data_changelogs_in_batch): + # (Start Changelog TS, End Changelog TS, [Changes]) + self.data_batches.append([self.data_batch_start, change_ts, + [change]]) + self.data_batch_start = None + self.num_changelogs = 0 + else: + self.data_batches[-1][1] = change_ts + self.data_batches[-1][2].append(change) + + def datas_to_queue(self, start, end): + # Paginate db entries and add it to Rsync PostBox + offset = 0 + total_datas = 0 + while True: + # Db Pagination + datas = db_get_data(start=start, end=end, + limit=DB_PAGINATION_SIZE_DATA, + offset=offset) + if len(datas) == 0: + break + offset += DB_PAGINATION_SIZE_DATA + total_datas += len(datas) self.a_syncdata(datas) - self.datas_in_batch.update(datas) + return total_datas - def process(self, changes, done=1): - tries = 0 + def handle_data_sync(self, start, end, changes, done, total_datas): + """ + Wait till all 
rsync jobs are complete, also handle the retries + Update data stime Once Rsync jobs are complete. + """ retry = False - self.unlinked_gfids = set() - self.files_in_batch = 0 - self.datas_in_batch = set() + tries = 0 + # Error log disabled till the last round self.syncer.disable_errorlog() while True: - # first, fire all changelog transfers in parallel. entry and - # metadata are performed synchronously, therefore in serial. - # However at the end of each changelog, data is synchronized - # with syncdata_async() - which means it is serial w.r.t - # entries/metadata of that changelog but happens in parallel - # with data of other changelogs. - if retry: - if tries == (self.MAX_RETRIES - 1): - # Enable Error logging if it is last retry - self.syncer.enable_errorlog() - - # Remove Unlinked GFIDs from Queue - for unlinked_gfid in self.unlinked_gfids: - if unlinked_gfid in self.datas_in_batch: - self.datas_in_batch.remove(unlinked_gfid) - - # Retry only Sync. Do not retry entry ops - if self.datas_in_batch: - self.a_syncdata(self.datas_in_batch) - else: - for change in changes: - logging.debug('processing change %s' % change) - self.process_change(change, done, retry) - if not retry: - # number of changelogs processed in the batch - self.turns += 1 + self.datas_to_queue(start, end) + + if retry and tries == (gconf.max_rsync_retries - 1): + # Enable Error logging if it is last retry + self.syncer.enable_errorlog() # Now we wait for all the data transfers fired off in the above # step to complete. Note that this is not ideal either. Ideally @@ -1016,38 +1038,34 @@ class GMasterChangelogMixin(GMasterCommon): # @change is the last changelog (therefore max time for this batch) if self.syncdata_wait(): - self.unlinked_gfids = set() if done: - xtl = (int(change.split('.')[-1]) - 1, 0) + xtl = (int(changes[-1].split('.')[-1]) - 1, 0) self.upd_stime(xtl) map(self.changelog_done_func, changes) self.archive_and_purge_changelogs(changes) # Reset Data counter after sync - self.status.dec_value("data", self.files_in_batch) - self.files_in_batch = 0 - self.datas_in_batch = set() + self.status.dec_value("data", total_datas) break # We do not know which changelog transfer failed, retry everything. retry = True tries += 1 - if tries == self.MAX_RETRIES: + if tries >= gconf.max_rsync_retries: logging.error('changelogs %s could not be processed ' 'completely - moving on...' % ' '.join(map(os.path.basename, changes))) # Reset data counter on failure - self.status.dec_value("data", self.files_in_batch) - self.files_in_batch = 0 - self.datas_in_batch = set() + self.status.dec_value("data", total_datas) if done: - xtl = (int(change.split('.')[-1]) - 1, 0) + xtl = (int(changes[-1].split('.')[-1]) - 1, 0) self.upd_stime(xtl) map(self.changelog_done_func, changes) self.archive_and_purge_changelogs(changes) break + # it's either entry_ops() or Rsync that failed to do it's # job. 
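handle_data_sync() above owns the bounded retry of rsync/tar jobs that process() used to drive directly. A minimal sketch of that loop's shape, with stand-in callables for datas_to_queue(), syncdata_wait() and gconf.max_rsync_retries:

import time

def sync_with_retries(fire_rsync, wait_for_rsync, max_retries,
                      enable_errorlog, disable_errorlog):
    """Simplified shape of the retry loop in handle_data_sync().

    fire_rsync     -- stand-in for datas_to_queue(start, end)
    wait_for_rsync -- stand-in for syncdata_wait()
    max_retries    -- stand-in for gconf.max_rsync_retries
    """
    tries = 0
    disable_errorlog()              # rsync errors stay hidden until the last try
    while True:
        fire_rsync()                # (re)queue every pending path for this range
        if tries == max_retries - 1:
            enable_errorlog()       # final attempt: surface the rsync errors
        if wait_for_rsync():
            return True             # all data synced; stime can move forward
        tries += 1
        if tries >= max_retries:
            return False            # give up on the batch and move on
        time.sleep(0.5)

# Demo with no-op hooks: succeeds on the first "rsync" pass
print(sync_with_retries(lambda: None, lambda: True, 3,
                        lambda: None, lambda: None))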
Mostly it's entry_ops() [which currently has a problem # of failing to create an entry but failing to return an errno] @@ -1058,11 +1076,114 @@ class GMasterChangelogMixin(GMasterCommon): logging.warn('incomplete sync, retrying changelogs: %s' % ' '.join(map(os.path.basename, changes))) - # Reset the Data counter before Retry - self.status.dec_value("data", self.files_in_batch) - self.files_in_batch = 0 time.sleep(0.5) + # Log Current batch details + if changes: + logging.info( + "{0} mode completed in {1:.4f} seconds " + "({2} - {3} Num: {4}) stime: {5}, entry_stime: {6}".format( + self.name, + time.time() - self.batch_start_time, + changes[0].split("/")[-1], + changes[-1].split("/")[-1], + len(changes), + repr(self.get_data_stime()), + repr(self.get_entry_stime()))) + + def process(self, changes, done=1): + retry = False + first_changelog_ts = changes[0].split(".")[-1] + + db_init(os.path.join(self.tempdir, "temp_changelogs.db")) + + self.skipped_entry_changelogs_first = None + self.skipped_entry_changelogs_last = None + self.num_skipped_entry_changelogs = 0 + self.batch_start_time = time.time() + # (Start Changelog TS, End Changelog TS, [Changes]) + self.data_batches = [[first_changelog_ts, first_changelog_ts, []]] + self.data_batch_start = None + self.num_changelogs = 0 + + for change in changes: + logging.debug('processing change %s' % change) + self.process_change(change, done, retry) + # number of changelogs processed in the batch + self.turns += 1 + + # Rsync/Tar will preserve permissions, so if a GFID exists + # in data queue then it syncs meta details too. Remove + # all meta from meta table if exists in data table + db_delete_meta_if_exists_in_data() + + # All the Data/Meta populated, Commit the Changes in Db + db_commit() + + # Log the Skipped Entry ops range if any + if self.skipped_entry_changelogs_first is not None and \ + self.skipped_entry_changelogs_last is not None: + logging.info("Skipping already processed entry " + "ops from CHANGELOG.{0} to CHANGELOG.{1} " + "Num: {2}".format( + self.skipped_entry_changelogs_first, + self.skipped_entry_changelogs_last, + self.num_skipped_entry_changelogs)) + + # Entry Changelogs syncing finished + logging.info("Syncing Entries completed in {0:.4f} seconds " + "CHANGELOG.{1} - CHANGELOG.{2} " + "Num: {3}".format( + time.time() - self.batch_start_time, + changes[0].split(".")[-1], + changes[-1].split(".")[-1], + len(changes))) + + # Update Status Data and Meta Count + self.status.inc_value("data", db_get_data_count()) + self.status.inc_value("meta", db_get_meta_count()) + + for b in self.data_batches: + # Add to data Queue, so that Rsync will start parallelly + # while syncing Meta ops + total_datas = self.datas_to_queue(b[0], b[1]) + + # Sync Meta + offset = 0 + while True: + # Db Pagination + meta_gfids = db_get_meta(start=b[0], end=b[1], + limit=DB_PAGINATION_SIZE_META, + offset=offset) + if len(meta_gfids) == 0: + break + offset += DB_PAGINATION_SIZE_META + + # Collect required information for GFIDs which + # exists in Master + meta_entries = [] + for go in meta_gfids: + st = lstat(go) + if isinstance(st, int): + logging.debug('file %s got purged in the ' + 'interim' % go) + continue + meta_entries.append(edct('META', go=go, stat=st)) + + if meta_entries: + failures = self.slave.server.meta_ops(meta_entries) + self.log_failures(failures, 'go', '', 'META') + self.status.dec_value("meta", len(meta_entries)) + + # Sync Data, Rsync already started syncing the files + # wait for the completion and retry if required. 
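process() now stages data and metadata candidates in the new changelogsdb module (db_init() against temp_changelogs.db, paginated db_get_data()/db_get_meta() reads, db_delete_meta_if_exists_in_data() before syncing). That module is not part of this file; the sketch below reconstructs its interface only from the calls visible above, so the sqlite3 backing, table names and schema are assumptions rather than anything taken from the patch:

# Hypothetical stand-in for changelogsdb, reconstructed from how master.py
# calls it above; sqlite3, table names and schema are guesses.
import sqlite3

conn = None

def db_init(path):
    global conn
    conn = sqlite3.connect(path)
    for tbl in ("data", "meta"):
        conn.execute("CREATE TABLE IF NOT EXISTS %s "
                     "(path TEXT PRIMARY KEY, ts TEXT)" % tbl)

def db_record_data(path, ts):
    conn.execute("INSERT OR REPLACE INTO data VALUES (?, ?)", (path, ts))

def db_record_meta(path, ts):
    conn.execute("INSERT OR REPLACE INTO meta VALUES (?, ?)", (path, ts))

def db_remove_data(path):
    conn.execute("DELETE FROM data WHERE path = ?", (path,))

def db_remove_meta(path):
    conn.execute("DELETE FROM meta WHERE path = ?", (path,))

def db_get_data(start, end, limit, offset):
    cur = conn.execute("SELECT path FROM data WHERE ts BETWEEN ? AND ? "
                       "LIMIT ? OFFSET ?", (start, end, limit, offset))
    return [row[0] for row in cur.fetchall()]

def db_get_meta(start, end, limit, offset):
    cur = conn.execute("SELECT path FROM meta WHERE ts BETWEEN ? AND ? "
                       "LIMIT ? OFFSET ?", (start, end, limit, offset))
    return [row[0] for row in cur.fetchall()]

def db_get_data_count():
    return conn.execute("SELECT COUNT(1) FROM data").fetchone()[0]

def db_get_meta_count():
    return conn.execute("SELECT COUNT(1) FROM meta").fetchone()[0]

def db_delete_meta_if_exists_in_data():
    # rsync/tar preserve ownership and mode, so a pending data sync makes a
    # separate metadata sync for the same GFID redundant
    conn.execute("DELETE FROM meta WHERE path IN (SELECT path FROM data)")

def db_commit():
    conn.commit()

# Pagination pattern used by datas_to_queue() and the META loop above
db_init(":memory:")                                 # real code: <workdir>/temp_changelogs.db
db_record_data(".gfid/example-gfid", "1449320400")  # made-up row
db_commit()
offset, page = 0, 1000                              # DB_PAGINATION_SIZE_DATA
while True:
    batch = db_get_data(start="0", end="9999999999", limit=page, offset=offset)
    if not batch:
        break
    offset += page
    print(batch)     # real code hands these paths to the rsync postbox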
+ self.handle_data_sync(b[0], b[1], b[2], done, total_datas) + + def upd_entry_stime(self, stime): + self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY, + self.uuid, + stime) + def upd_stime(self, stime, path=None): if not path: path = self.FLAT_DIR_HIERARCHY @@ -1085,7 +1206,12 @@ class GMasterChangelogMixin(GMasterCommon): remote_node_ip = node.split(":")[0] self.status.set_slave_node(remote_node_ip) - def changelogs_batch_process(self, changes): + def changelogs_batch_process(self, changes, single_batch=False): + if single_batch and changes: + logging.debug('processing changes %s' % repr(changes)) + self.process(changes) + return + changelogs_batches = [] current_size = 0 for c in changes: @@ -1112,16 +1238,15 @@ class GMasterChangelogMixin(GMasterCommon): changes = [] # get stime (from the brick) and purge changelogs # that are _historical_ to that time. - purge_time = self.get_purge_time() + data_stime = self.get_data_stime() self.changelog_agent.scan() self.crawls += 1 changes = self.changelog_agent.getchanges() if changes: - if purge_time: - logging.info("slave's time: %s" % repr(purge_time)) + if data_stime: processed = [x for x in changes - if int(x.split('.')[-1]) < purge_time[0]] + if int(x.split('.')[-1]) < data_stime[0]] for pr in processed: logging.info( 'skipping already processed change: %s...' % @@ -1136,7 +1261,8 @@ class GMasterChangelogMixin(GMasterCommon): self.changelog_agent = changelog_agent self.sleep_interval = int(gconf.change_interval) self.changelog_done_func = self.changelog_agent.done - self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), + self.tempdir = self.setup_working_dir() + self.processed_changelogs_dir = os.path.join(self.tempdir, ".processed") self.name = "live_changelog" self.status = status @@ -1149,7 +1275,8 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): self.history_crawl_start_time = register_time self.changelog_done_func = self.changelog_agent.history_done self.history_turns = 0 - self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), + self.tempdir = self.setup_working_dir() + self.processed_changelogs_dir = os.path.join(self.tempdir, ".history/.processed") self.name = "history_changelog" self.status = status @@ -1157,15 +1284,17 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): def crawl(self): self.history_turns += 1 self.status.set_worker_crawl_status("History Crawl") - purge_time = self.get_purge_time() + data_stime = self.get_data_stime() end_time = int(time.time()) - logging.info('starting history crawl... turns: %s, stime: %s, etime: %s' - % (self.history_turns, repr(purge_time), repr(end_time))) + logging.info('starting history crawl... turns: %s, stime: %s, ' + 'etime: %s, entry_stime: %s' + % (self.history_turns, repr(data_stime), + repr(end_time), self.get_entry_stime())) - if not purge_time or purge_time == URXTIME: + if not data_stime or data_stime == URXTIME: logging.info("stime not available, abandoning history crawl") - raise NoPurgeTimeAvailable() + raise NoStimeAvailable() # Changelogs backend path is hardcoded as # <BRICK_PATH>/.glusterfs/changelogs, if user configured to different @@ -1174,7 +1303,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): ".glusterfs/changelogs") ret, actual_end = self.changelog_agent.history( changelog_path, - purge_time[0], + data_stime[0], end_time, int(gconf.sync_jobs)) @@ -1184,27 +1313,42 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): # to be processed. 
returns positive value as number of changelogs # to be processed, which will be fetched using # history_getchanges() - while self.changelog_agent.history_scan() > 0: + num_scanned_changelogs = self.changelog_agent.history_scan() + num_changelogs = num_scanned_changelogs + changes = [] + while num_scanned_changelogs > 0: self.crawls += 1 - changes = self.changelog_agent.history_getchanges() + changes += self.changelog_agent.history_getchanges() if changes: - if purge_time: - logging.info("slave's time: %s" % repr(purge_time)) + if data_stime: processed = [x for x in changes - if int(x.split('.')[-1]) < purge_time[0]] + if int(x.split('.')[-1]) < data_stime[0]] for pr in processed: logging.info('skipping already processed change: ' '%s...' % os.path.basename(pr)) self.changelog_done_func(pr) changes.remove(pr) - self.changelogs_batch_process(changes) + if num_changelogs > gconf.max_history_changelogs_in_batch: + self.changelogs_batch_process(changes, single_batch=True) + num_changelogs = 0 + changes = [] + + num_scanned_changelogs = self.changelog_agent.history_scan() + num_changelogs += num_scanned_changelogs + + # If Last batch is not processed with MAX_NUM_CHANGELOGS_IN_BATCH + # condition above + if changes: + self.changelogs_batch_process(changes, single_batch=True) history_turn_time = int(time.time()) - self.history_crawl_start_time - logging.info('finished history crawl syncing, endtime: %s, stime: %s' - % (actual_end, repr(self.get_purge_time()))) + logging.info('finished history crawl syncing, endtime: %s, ' + 'stime: %s, entry_stime: %s' + % (actual_end, repr(self.get_data_stime()), + self.get_entry_stime())) # If TS returned from history_changelog is < register_time # then FS crawl may be required, since history is only available @@ -1269,14 +1413,14 @@ class GMasterXsyncMixin(GMasterChangelogMixin): t = Thread(target=Xsyncer) t.start() logging.info('starting hybrid crawl..., stime: %s' - % repr(self.get_purge_time())) + % repr(self.get_data_stime())) self.status.set_worker_crawl_status("Hybrid Crawl") while True: try: item = self.comlist.pop(0) if item[0] == 'finale': logging.info('finished hybrid crawl syncing, stime: %s' - % repr(self.get_purge_time())) + % repr(self.get_data_stime())) break elif item[0] == 'xsync': logging.info('processing xsync changelog %s' % (item[1])) |
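Earlier in this hunk, GMasterChangeloghistoryMixin.crawl() stops feeding the entire history scan into one batch and instead flushes whenever the changelog count crosses gconf.max_history_changelogs_in_batch, so stime can advance between batches. A condensed sketch of that accumulation loop (the skipping of already-processed changelogs is omitted):

def batch_history_changelogs(scan, getchanges, process_batch, max_per_batch):
    """Condensed form of the batching loop in GMasterChangeloghistoryMixin.crawl().

    scan          -- stand-in for changelog_agent.history_scan()
    getchanges    -- stand-in for changelog_agent.history_getchanges()
    process_batch -- stand-in for changelogs_batch_process(..., single_batch=True)
    max_per_batch -- stand-in for gconf.max_history_changelogs_in_batch
    """
    changes = []
    num_changelogs = scanned = scan()
    while scanned > 0:
        changes += getchanges()
        if num_changelogs > max_per_batch:
            process_batch(changes)       # stime advances after every batch
            changes = []
            num_changelogs = 0
        scanned = scan()
        num_changelogs += scanned
    if changes:
        process_batch(changes)           # tail that never crossed the threshold

# Demo: five scans of 10 changelogs each, flushed once the count exceeds 20
flushed = []
pages = iter([10, 10, 10, 10, 10, 0])
batch_history_changelogs(lambda: next(pages),
                         lambda: ["CHANGELOG.x"] * 10,
                         lambda chunk: flushed.append(len(chunk)),
                         20)
print(flushed)   # [30, 20]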