-rw-r--r--  geo-replication/syncdaemon/Makefile.am      |   2
-rw-r--r--  geo-replication/syncdaemon/changelogsdb.py   | 111
-rw-r--r--  geo-replication/syncdaemon/gsyncd.py         |  21
-rw-r--r--  geo-replication/syncdaemon/master.py         | 307
-rw-r--r--  geo-replication/syncdaemon/resource.py       |   2
5 files changed, 123 insertions(+), 320 deletions(-)
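This revert drops the sqlite-backed changelog bookkeeping (changelogsdb.py with its data/meta tables and LIMIT/OFFSET pagination) and returns to collecting the GFIDs of a changelog batch in plain in-memory sets on the master. The sketch below only illustrates the two bookkeeping styles being swapped; it is not code from the tree, and the helper names track_with_db and track_with_sets are made up for this example.

    # Illustration only: contrasts the removed sqlite bookkeeping with the
    # restored in-memory sets. The real module used global conn/cursor.
    import sqlite3


    def track_with_db(path, updates):
        """Old style: persist (gfid, changelog_time) pairs, then page them out."""
        conn = sqlite3.connect(path)
        cur = conn.cursor()
        cur.execute("""CREATE TABLE IF NOT EXISTS data(
                           gfid VARCHAR(100) PRIMARY KEY ON CONFLICT IGNORE,
                           changelog_time VARCHAR(100))""")
        cur.executemany("INSERT INTO data(gfid, changelog_time) VALUES(?, ?)",
                        updates)
        conn.commit()
        # paging with LIMIT/OFFSET, as db_get_data() did
        cur.execute("SELECT gfid FROM data ORDER BY changelog_time "
                    "LIMIT ? OFFSET ?", (1000, 0))
        return [row[0] for row in cur]


    def track_with_sets(updates, unlinked):
        """Restored style: keep the batch's GFIDs in memory, drop purged ones."""
        datas = set(gfid for gfid, _ in updates)
        return datas - unlinked  # what would be handed to the rsync/tar queue


    if __name__ == "__main__":
        batch = [("gfid-1", "1462935916"), ("gfid-2", "1462935916")]
        print(track_with_db(":memory:", batch))
        print(track_with_sets(batch, unlinked={"gfid-2"}))

In the reverted code the sets (datas, meta_gfid, datas_in_batch, unlinked_gfids) live only for the duration of one batch, so nothing has to be committed to or cleaned up from a temporary database file on disk.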
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index 88c9e64e525..7cdaf45ddec 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -3,6 +3,6 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \ resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py \ - gsyncdstatus.py changelogsdb.py conf.py + gsyncdstatus.py conf.py CLEANFILES = diff --git a/geo-replication/syncdaemon/changelogsdb.py b/geo-replication/syncdaemon/changelogsdb.py deleted file mode 100644 index 7e64158e7af..00000000000 --- a/geo-replication/syncdaemon/changelogsdb.py +++ /dev/null @@ -1,111 +0,0 @@ -# -# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> -# This file is part of GlusterFS. - -# This file is licensed to you under your choice of the GNU Lesser -# General Public License, version 3 or any later version (LGPLv3 or -# later), or the GNU General Public License, version 2 (GPLv2), in all -# cases as published by the Free Software Foundation. -# - -import os -import sqlite3 -from errno import ENOENT - -conn = None -cursor = None - - -def db_commit(): - conn.commit() - - -def db_init(db_path): - global conn, cursor - # Remove Temp Db - try: - os.unlink(db_path) - os.unlink(db_path + "-journal") - except OSError as e: - if e.errno != ENOENT: - raise - - conn = sqlite3.connect(db_path) - cursor = conn.cursor() - cursor.execute("DROP TABLE IF EXISTS data") - cursor.execute("DROP TABLE IF EXISTS meta") - query = """CREATE TABLE IF NOT EXISTS data( - gfid VARCHAR(100) PRIMARY KEY ON CONFLICT IGNORE, - changelog_time VARCHAR(100) - )""" - cursor.execute(query) - - query = """CREATE TABLE IF NOT EXISTS meta( - gfid VARCHAR(100) PRIMARY KEY ON CONFLICT IGNORE, - changelog_time VARCHAR(100) - )""" - cursor.execute(query) - - -def db_record_data(gfid, changelog_time): - query = "INSERT INTO data(gfid, changelog_time) VALUES(?, ?)" - cursor.execute(query, (gfid, changelog_time)) - - -def db_record_meta(gfid, changelog_time): - query = "INSERT INTO meta(gfid, changelog_time) VALUES(?, ?)" - cursor.execute(query, (gfid, changelog_time)) - - -def db_remove_meta(gfid): - query = "DELETE FROM meta WHERE gfid = ?" - cursor.execute(query, (gfid, )) - - -def db_remove_data(gfid): - query = "DELETE FROM data WHERE gfid = ?" - cursor.execute(query, (gfid, )) - - -def db_get_data(start, end, limit, offset): - query = """SELECT gfid FROM data WHERE changelog_time - BETWEEN ? AND ? LIMIT ? OFFSET ?""" - cursor.execute(query, (start, end, limit, offset)) - out = [] - for row in cursor: - out.append(row[0]) - - return out - - -def db_get_meta(start, end, limit, offset): - query = """SELECT gfid FROM meta WHERE changelog_time - BETWEEN ? AND ? LIMIT ? 
OFFSET ?""" - cursor.execute(query, (start, end, limit, offset)) - out = [] - for row in cursor: - out.append(row[0]) - - return out - - -def db_delete_meta_if_exists_in_data(): - query = """ - DELETE FROM meta WHERE gfid in - (SELECT M.gfid - FROM meta M INNER JOIN data D - ON M.gfid = D.gfid) - """ - cursor.execute(query) - - -def db_get_data_count(): - query = "SELECT COUNT(gfid) FROM data" - cursor.execute(query) - return cursor.fetchone()[0] - - -def db_get_meta_count(): - query = "SELECT COUNT(gfid) FROM meta" - cursor.execute(query) - return cursor.fetchone()[0] diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 918bee0ce1c..84b26a41145 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -275,27 +275,6 @@ def main_i(): op.add_option('--log-rsync-performance', default=False, action='store_true') op.add_option('--max-rsync-retries', type=int, default=10) - - # This is for stime granularity, Bigger batch will be split into - # multiple data batches, On failure it will start from this point - # Default value is 1 day changelogs - # (4 * 60 * 24 = 5760) - # 4 changelogs per minute - # 60 min per hr - # 24 hrs per day - op.add_option('--max-data-changelogs-in-batch', type=int, default=5760) - - # While processing Historical Changelogs above BATCH SIZE is not considered - # since all Changelogs to be post processed once, Batching it makes more - # rsync retries. (4 * 60 * 24 * 15 = 86400) - # 4 changelogs per minute - # 60 min per hr - # 24 hrs per day - # 15 days - # This means 15 days changelogs can be processed at once in case of - # History scan - op.add_option('--max-history-changelogs-in-batch', type=int, default=86400) - op.add_option('--pause-on-start', default=False, action='store_true') op.add_option('-L', '--log-level', metavar='LVL') op.add_option('-r', '--remote-gsyncd', metavar='CMD', diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 3df08e41a13..7d015aee718 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -26,12 +26,6 @@ from syncdutils import Thread, GsyncdError, boolify, escape from syncdutils import unescape, gauxpfx, md5hex, selfkill from syncdutils import lstat, errno_wrap, FreeObject from syncdutils import NoStimeAvailable, PartialHistoryAvailable -from changelogsdb import db_init, db_record_data, db_record_meta -from changelogsdb import db_remove_data, db_remove_meta -from changelogsdb import db_get_data, db_get_meta, db_commit -from changelogsdb import db_get_data_count, db_get_meta_count -from changelogsdb import db_delete_meta_if_exists_in_data - URXTIME = (-1, 0) @@ -51,10 +45,6 @@ CHANGELOG_ROLLOVER_TIME = 15 # that batch since stime will get updated after each batch. 
MAX_CHANGELOG_BATCH_SIZE = 727040 -# Number of record to query once -DB_PAGINATION_SIZE_META = 100 -DB_PAGINATION_SIZE_DATA = 1000 - # Utility functions to help us to get to closer proximity # of the DRY principle (no, don't look for elevated or # perspectivistic things here) @@ -287,7 +277,7 @@ class TarSSHEngine(object): st = lstat(se) if isinstance(st, int): # file got unlinked in the interim - db_remove_data(se) + self.unlinked_gfids.add(se) return True self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) @@ -322,7 +312,7 @@ class RsyncEngine(object): st = lstat(se) if isinstance(st, int): # file got unlinked in the interim - db_remove_data(se) + self.unlinked_gfids.add(se) return True self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) @@ -427,6 +417,7 @@ class GMasterCommon(object): self.volinfo = None self.terminate = False self.sleep_interval = 1 + self.unlinked_gfids = set() def init_keep_alive(cls): """start the keep-alive thread """ @@ -666,11 +657,6 @@ class GMasterCommon(object): ret = j[-1]() if not ret: succeed = False - - # All the unlinked GFIDs removed from Data and Meta list - # Commit the Transaction - db_commit() - if succeed and not args[0] is None: self.sendmark(path, *args) return succeed @@ -783,12 +769,10 @@ class GMasterChangelogMixin(GMasterCommon): pfx = gauxpfx() clist = [] entries = [] - change_ts = change.split(".")[-1] + meta_gfid = set() + datas = set() - # self.data_batch_start is None only in beginning and during - # new batch start - if self.data_batch_start is None: - self.data_batch_start = change_ts + change_ts = change.split(".")[-1] # Ignore entry ops which are already processed in Changelog modes ignore_entry_ops = False @@ -860,10 +844,9 @@ class GMasterChangelogMixin(GMasterCommon): # not fail pt = os.path.join(pfx, ec[0]) st = lstat(pt) - if isinstance(st, int): + if pt in datas and isinstance(st, int): # file got unlinked, May be historical Changelog - db_remove_data(pt) - db_remove_meta(pt) + datas.remove(pt) if not boolify(gconf.ignore_deletes): if not ignore_entry_ops: @@ -886,10 +869,10 @@ class GMasterChangelogMixin(GMasterCommon): # CREATED if source not exists. entries.append(edct('LINK', stat=st, entry=en, gfid=gfid)) + # Here, we have the assumption that only # tier-gfid.linkto causes this mknod. Add data - db_record_data(os.path.join(pfx, ec[0]), - change_ts) + datas.add(os.path.join(pfx, ec[0])) continue # stat info. present in the changelog itself @@ -931,25 +914,48 @@ class GMasterChangelogMixin(GMasterCommon): else: logging.warn('ignoring %s [op %s]' % (gfid, ty)) elif et == self.TYPE_GFID: - db_record_data(os.path.join(pfx, ec[0]), change_ts) + # If self.unlinked_gfids is available, then that means it is + # retrying the changelog second time. Do not add the GFID's + # to rsync job if failed previously but unlinked in master + if self.unlinked_gfids and \ + os.path.join(pfx, ec[0]) in self.unlinked_gfids: + logging.debug("ignoring data, since file purged interim") + else: + datas.add(os.path.join(pfx, ec[0])) elif et == self.TYPE_META: if ec[1] == 'SETATTR': # only setattr's for now... 
- db_record_meta(os.path.join(pfx, ec[0]), change_ts) + if len(ec) == 5: + # In xsync crawl, we already have stat data + # avoid doing stat again + meta_gfid.add((os.path.join(pfx, ec[0]), + XCrawlMetadata(st_uid=ec[2], + st_gid=ec[3], + st_mode=ec[4], + st_atime=ec[5], + st_mtime=ec[6]))) + else: + meta_gfid.add((os.path.join(pfx, ec[0]), )) elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \ ec[1] == 'FXATTROP': # To sync xattr/acls use rsync/tar, --xattrs and --acls # switch to rsync and tar if not boolify(gconf.use_tarssh) and \ (boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)): - db_record_data(os.path.join(pfx, ec[0]), change_ts) + datas.add(os.path.join(pfx, ec[0])) else: logging.warn('got invalid changelog type: %s' % (et)) logging.debug('entries: %s' % repr(entries)) + # Increment counters for Status + self.status.inc_value("entry", len(entries)) + self.files_in_batch += len(datas) + self.status.inc_value("data", len(datas)) + # sync namespace if entries and not ignore_entry_ops: # Increment counters for Status self.status.inc_value("entry", len(entries)) + failures = self.slave.server.entry_ops(entries) self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') self.status.dec_value("entry", len(entries)) @@ -969,53 +975,70 @@ class GMasterChangelogMixin(GMasterCommon): self.skipped_entry_changelogs_last = change_ts - # Batch data based on number of changelogs as configured as - # gconf.max_data_changelogs_in_batch(Default is 24 hrs) - # stime will be set after completion of these batch, so on failure - # Geo-rep will progress day by day - if (self.num_changelogs > gconf.max_data_changelogs_in_batch): - # (Start Changelog TS, End Changelog TS, [Changes]) - self.data_batches.append([self.data_batch_start, change_ts, - [change]]) - self.data_batch_start = None - self.num_changelogs = 0 - else: - self.data_batches[-1][1] = change_ts - self.data_batches[-1][2].append(change) - - def datas_to_queue(self, start, end): - # Paginate db entries and add it to Rsync PostBox - offset = 0 - total_datas = 0 - while True: - # Db Pagination - datas = db_get_data(start=start, end=end, - limit=DB_PAGINATION_SIZE_DATA, - offset=offset) - if len(datas) == 0: - break - offset += DB_PAGINATION_SIZE_DATA - total_datas += len(datas) + # sync metadata + if meta_gfid: + meta_entries = [] + for go in meta_gfid: + if len(go) > 1: + st = go[1] + else: + st = lstat(go[0]) + if isinstance(st, int): + logging.debug('file %s got purged in the interim' % go[0]) + continue + meta_entries.append(edct('META', go=go[0], stat=st)) + if meta_entries: + self.status.inc_value("meta", len(entries)) + failures = self.slave.server.meta_ops(meta_entries) + self.log_failures(failures, 'go', '', 'META') + self.status.dec_value("meta", len(entries)) + + # sync data + if datas: self.a_syncdata(datas) - return total_datas + self.datas_in_batch.update(datas) - def handle_data_sync(self, start, end, changes, done, total_datas): - """ - Wait till all rsync jobs are complete, also handle the retries - Update data stime Once Rsync jobs are complete. 
- """ - retry = False + def process(self, changes, done=1): tries = 0 + retry = False + self.unlinked_gfids = set() + self.files_in_batch = 0 + self.datas_in_batch = set() # Error log disabled till the last round self.syncer.disable_errorlog() + self.skipped_entry_changelogs_first = None + self.skipped_entry_changelogs_last = None + self.num_skipped_entry_changelogs = 0 + self.batch_start_time = time.time() while True: - if retry: - self.datas_to_queue(start, end) + # first, fire all changelog transfers in parallel. entry and + # metadata are performed synchronously, therefore in serial. + # However at the end of each changelog, data is synchronized + # with syncdata_async() - which means it is serial w.r.t + # entries/metadata of that changelog but happens in parallel + # with data of other changelogs. - if retry and tries == (gconf.max_rsync_retries - 1): - # Enable Error logging if it is last retry - self.syncer.enable_errorlog() + if retry: + if tries == (int(gconf.max_rsync_retries) - 1): + # Enable Error logging if it is last retry + self.syncer.enable_errorlog() + + # Remove Unlinked GFIDs from Queue + for unlinked_gfid in self.unlinked_gfids: + if unlinked_gfid in self.datas_in_batch: + self.datas_in_batch.remove(unlinked_gfid) + + # Retry only Sync. Do not retry entry ops + if self.datas_in_batch: + self.a_syncdata(self.datas_in_batch) + else: + for change in changes: + logging.debug('processing change %s' % change) + self.process_change(change, done, retry) + if not retry: + # number of changelogs processed in the batch + self.turns += 1 # Now we wait for all the data transfers fired off in the above # step to complete. Note that this is not ideal either. Ideally @@ -1038,34 +1061,38 @@ class GMasterChangelogMixin(GMasterCommon): # @change is the last changelog (therefore max time for this batch) if self.syncdata_wait(): + self.unlinked_gfids = set() if done: - xtl = (int(changes[-1].split('.')[-1]) - 1, 0) + xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) map(self.changelog_done_func, changes) self.archive_and_purge_changelogs(changes) # Reset Data counter after sync - self.status.dec_value("data", total_datas) + self.status.dec_value("data", self.files_in_batch) + self.files_in_batch = 0 + self.datas_in_batch = set() break # We do not know which changelog transfer failed, retry everything. retry = True tries += 1 - if tries >= gconf.max_rsync_retries: + if tries == int(gconf.max_rsync_retries): logging.error('changelogs %s could not be processed ' 'completely - moving on...' % ' '.join(map(os.path.basename, changes))) # Reset data counter on failure - self.status.dec_value("data", total_datas) + self.status.dec_value("data", self.files_in_batch) + self.files_in_batch = 0 + self.datas_in_batch = set() if done: - xtl = (int(changes[-1].split('.')[-1]) - 1, 0) + xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) map(self.changelog_done_func, changes) self.archive_and_purge_changelogs(changes) break - # it's either entry_ops() or Rsync that failed to do it's # job. 
Mostly it's entry_ops() [which currently has a problem # of failing to create an entry but failing to return an errno] @@ -1076,8 +1103,21 @@ class GMasterChangelogMixin(GMasterCommon): logging.warn('incomplete sync, retrying changelogs: %s' % ' '.join(map(os.path.basename, changes))) + # Reset the Data counter before Retry + self.status.dec_value("data", self.files_in_batch) + self.files_in_batch = 0 time.sleep(0.5) + # Log the Skipped Entry ops range if any + if self.skipped_entry_changelogs_first is not None and \ + self.skipped_entry_changelogs_last is not None: + logging.info("Skipping already processed entry " + "ops from CHANGELOG.{0} to CHANGELOG.{1} " + "Num: {2}".format( + self.skipped_entry_changelogs_first, + self.skipped_entry_changelogs_last, + self.num_skipped_entry_changelogs)) + # Log Current batch details if changes: logging.info( @@ -1091,94 +1131,6 @@ class GMasterChangelogMixin(GMasterCommon): repr(self.get_data_stime()), repr(self.get_entry_stime()))) - def process(self, changes, done=1): - retry = False - first_changelog_ts = changes[0].split(".")[-1] - - db_init(os.path.join(self.tempdir, "temp_changelogs.db")) - - self.skipped_entry_changelogs_first = None - self.skipped_entry_changelogs_last = None - self.num_skipped_entry_changelogs = 0 - self.batch_start_time = time.time() - # (Start Changelog TS, End Changelog TS, [Changes]) - self.data_batches = [[first_changelog_ts, first_changelog_ts, []]] - self.data_batch_start = None - self.num_changelogs = 0 - - for change in changes: - logging.debug('processing change %s' % change) - self.process_change(change, done, retry) - # number of changelogs processed in the batch - self.turns += 1 - - # Rsync/Tar will preserve permissions, so if a GFID exists - # in data queue then it syncs meta details too. 
Remove - # all meta from meta table if exists in data table - db_delete_meta_if_exists_in_data() - - # All the Data/Meta populated, Commit the Changes in Db - db_commit() - - # Log the Skipped Entry ops range if any - if self.skipped_entry_changelogs_first is not None and \ - self.skipped_entry_changelogs_last is not None: - logging.info("Skipping already processed entry " - "ops from CHANGELOG.{0} to CHANGELOG.{1} " - "Num: {2}".format( - self.skipped_entry_changelogs_first, - self.skipped_entry_changelogs_last, - self.num_skipped_entry_changelogs)) - - # Entry Changelogs syncing finished - logging.info("Syncing Entries completed in {0:.4f} seconds " - "CHANGELOG.{1} - CHANGELOG.{2} " - "Num: {3}".format( - time.time() - self.batch_start_time, - changes[0].split(".")[-1], - changes[-1].split(".")[-1], - len(changes))) - - # Update Status Data and Meta Count - self.status.inc_value("data", db_get_data_count()) - self.status.inc_value("meta", db_get_meta_count()) - - for b in self.data_batches: - # Add to data Queue, so that Rsync will start parallelly - # while syncing Meta ops - total_datas = self.datas_to_queue(b[0], b[1]) - - # Sync Meta - offset = 0 - while True: - # Db Pagination - meta_gfids = db_get_meta(start=b[0], end=b[1], - limit=DB_PAGINATION_SIZE_META, - offset=offset) - if len(meta_gfids) == 0: - break - offset += DB_PAGINATION_SIZE_META - - # Collect required information for GFIDs which - # exists in Master - meta_entries = [] - for go in meta_gfids: - st = lstat(go) - if isinstance(st, int): - logging.debug('file %s got purged in the ' - 'interim' % go) - continue - meta_entries.append(edct('META', go=go, stat=st)) - - if meta_entries: - failures = self.slave.server.meta_ops(meta_entries) - self.log_failures(failures, 'go', '', 'META') - self.status.dec_value("meta", len(meta_entries)) - - # Sync Data, Rsync already started syncing the files - # wait for the completion and retry if required. - self.handle_data_sync(b[0], b[1], b[2], done, total_datas) - def upd_entry_stime(self, stime): self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY, self.uuid, @@ -1206,12 +1158,7 @@ class GMasterChangelogMixin(GMasterCommon): remote_node_ip = node.split(":")[0] self.status.set_slave_node(remote_node_ip) - def changelogs_batch_process(self, changes, single_batch=False): - if single_batch and changes: - logging.debug('processing changes %s' % repr(changes)) - self.process(changes) - return - + def changelogs_batch_process(self, changes): changelogs_batches = [] current_size = 0 for c in changes: @@ -1245,6 +1192,7 @@ class GMasterChangelogMixin(GMasterCommon): changes = self.changelog_agent.getchanges() if changes: if data_stime: + logging.info("slave's time: %s" % repr(data_stime)) processed = [x for x in changes if int(x.split('.')[-1]) < data_stime[0]] for pr in processed: @@ -1313,15 +1261,13 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): # to be processed. 
returns positive value as number of changelogs # to be processed, which will be fetched using # history_getchanges() - num_scanned_changelogs = self.changelog_agent.history_scan() - num_changelogs = num_scanned_changelogs - changes = [] - while num_scanned_changelogs > 0: + while self.changelog_agent.history_scan() > 0: self.crawls += 1 - changes += self.changelog_agent.history_getchanges() + changes = self.changelog_agent.history_getchanges() if changes: if data_stime: + logging.info("slave's time: %s" % repr(data_stime)) processed = [x for x in changes if int(x.split('.')[-1]) < data_stime[0]] for pr in processed: @@ -1330,18 +1276,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): self.changelog_done_func(pr) changes.remove(pr) - if num_changelogs > gconf.max_history_changelogs_in_batch: - self.changelogs_batch_process(changes, single_batch=True) - num_changelogs = 0 - changes = [] - - num_scanned_changelogs = self.changelog_agent.history_scan() - num_changelogs += num_scanned_changelogs - - # If Last batch is not processed with MAX_NUM_CHANGELOGS_IN_BATCH - # condition above - if changes: - self.changelogs_batch_process(changes, single_batch=True) + self.changelogs_batch_process(changes) history_turn_time = int(time.time()) - self.history_crawl_start_time diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 91ca1916f6a..00bf4cb9d5c 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -31,6 +31,7 @@ import shutil from gconf import gconf import repce from repce import RepceServer, RepceClient +from master import gmaster_builder import syncdutils from syncdutils import GsyncdError, select, privileged, boolify, funcode from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat @@ -1408,7 +1409,6 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def gmaster_instantiate_tuple(self, slave): """return a tuple of the 'one shot' and the 'main crawl' class instance""" - from master import gmaster_builder return (gmaster_builder('xsync')(self, slave), gmaster_builder()(self, slave), gmaster_builder('changeloghistory')(self, slave)) |