diff options
author | Aravinda VK <avishwan@redhat.com> | 2016-08-08 17:02:37 +0530 |
---|---|---|
committer | Aravinda VK <avishwan@redhat.com> | 2016-08-26 10:45:58 -0700 |
commit | 6c283f107b646405936520e2549510115bf2ef64 (patch) | |
tree | 67459f0c7a502a68413c5cfad5865ca9dcb240e0 /geo-replication/syncdaemon | |
parent | 4a3454753f6e4ddc309c8d1cb11a6e4e432c1da6 (diff) |
geo-rep: Post process Data and Meta Changelogs
With this patch, Data and Meta GFIDs are post processed. If Changelog has
UNLINK entry then remove from Data and Meta GFIDs list(If stat on GFID is
ENOENT in Master).
While processing Changelogs,
- Collect all the data and meta operations in a temporary database
- Delete all Data and Meta GFIDs which are already unlinked as per Changelogs
(unlink only if stat on GFID is ENOENT)
- Process all Entry operations as usual
- Process data and meta operations in batch(Fetch from Db in batch)
- Data sync is again batched based on number of changelogs(Default 1day
changelogs). Once the sync is complete, Update last Changelog's time as last_synced
time as usual.
Additionally maintain entry_stime on Brick root, ignore Entry ops if changelog
suffix time is less than entry_stime. If data stime is more than entry_stime,
this can happen only when passive worker updates stime by itself by getting
mount point stime. Use entry_stime = data_stime in this case.
New configurations:
max-rsync-retries - Default Value is 10
max-data-changelogs-in-batch - Max number of changelogs to be considered in a
batch for syncing. Default value is 5760(4 changelogs per min * 60 min *
24 hours)
max-history-changelogs-in-batch - Max number of history changelogs to be
processed at once. Default value 86400(4 changelogs per min * 60 min * 24
hours * 15 days)
BUG: 1364420
Change-Id: I7b665895bf4806035c2a8573d361257cbadbea17
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/15110
Smoke: Gluster Build System <jenkins@build.gluster.org>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
Reviewed-by: Kotresh HR <khiremat@redhat.com>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r-- | geo-replication/syncdaemon/Makefile.am | 2 | ||||
-rw-r--r-- | geo-replication/syncdaemon/changelogsdb.py | 111 | ||||
-rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 22 | ||||
-rw-r--r-- | geo-replication/syncdaemon/gsyncdstatus.py | 2 | ||||
-rw-r--r-- | geo-replication/syncdaemon/master.py | 504 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 54 | ||||
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 7 |
7 files changed, 516 insertions, 186 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index ed0f5e40924..ce875bdacb6 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -3,6 +3,6 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \ resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py \ - gsyncdstatus.py + gsyncdstatus.py changelogsdb.py CLEANFILES = diff --git a/geo-replication/syncdaemon/changelogsdb.py b/geo-replication/syncdaemon/changelogsdb.py new file mode 100644 index 00000000000..7e64158e7af --- /dev/null +++ b/geo-replication/syncdaemon/changelogsdb.py @@ -0,0 +1,111 @@ +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import os +import sqlite3 +from errno import ENOENT + +conn = None +cursor = None + + +def db_commit(): + conn.commit() + + +def db_init(db_path): + global conn, cursor + # Remove Temp Db + try: + os.unlink(db_path) + os.unlink(db_path + "-journal") + except OSError as e: + if e.errno != ENOENT: + raise + + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + cursor.execute("DROP TABLE IF EXISTS data") + cursor.execute("DROP TABLE IF EXISTS meta") + query = """CREATE TABLE IF NOT EXISTS data( + gfid VARCHAR(100) PRIMARY KEY ON CONFLICT IGNORE, + changelog_time VARCHAR(100) + )""" + cursor.execute(query) + + query = """CREATE TABLE IF NOT EXISTS meta( + gfid VARCHAR(100) PRIMARY KEY ON CONFLICT IGNORE, + changelog_time VARCHAR(100) + )""" + cursor.execute(query) + + +def db_record_data(gfid, changelog_time): + query = "INSERT INTO data(gfid, changelog_time) VALUES(?, ?)" + cursor.execute(query, (gfid, changelog_time)) + + +def db_record_meta(gfid, changelog_time): + query = "INSERT INTO meta(gfid, changelog_time) VALUES(?, ?)" + cursor.execute(query, (gfid, changelog_time)) + + +def db_remove_meta(gfid): + query = "DELETE FROM meta WHERE gfid = ?" + cursor.execute(query, (gfid, )) + + +def db_remove_data(gfid): + query = "DELETE FROM data WHERE gfid = ?" + cursor.execute(query, (gfid, )) + + +def db_get_data(start, end, limit, offset): + query = """SELECT gfid FROM data WHERE changelog_time + BETWEEN ? AND ? LIMIT ? OFFSET ?""" + cursor.execute(query, (start, end, limit, offset)) + out = [] + for row in cursor: + out.append(row[0]) + + return out + + +def db_get_meta(start, end, limit, offset): + query = """SELECT gfid FROM meta WHERE changelog_time + BETWEEN ? AND ? LIMIT ? OFFSET ?""" + cursor.execute(query, (start, end, limit, offset)) + out = [] + for row in cursor: + out.append(row[0]) + + return out + + +def db_delete_meta_if_exists_in_data(): + query = """ + DELETE FROM meta WHERE gfid in + (SELECT M.gfid + FROM meta M INNER JOIN data D + ON M.gfid = D.gfid) + """ + cursor.execute(query) + + +def db_get_data_count(): + query = "SELECT COUNT(gfid) FROM data" + cursor.execute(query) + return cursor.fetchone()[0] + + +def db_get_meta_count(): + query = "SELECT COUNT(gfid) FROM meta" + cursor.execute(query) + return cursor.fetchone()[0] diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index c2699a183ae..918bee0ce1c 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -274,6 +274,28 @@ def main_i(): op.add_option('--sync-acls', default=True, action='store_true') op.add_option('--log-rsync-performance', default=False, action='store_true') + op.add_option('--max-rsync-retries', type=int, default=10) + + # This is for stime granularity, Bigger batch will be split into + # multiple data batches, On failure it will start from this point + # Default value is 1 day changelogs + # (4 * 60 * 24 = 5760) + # 4 changelogs per minute + # 60 min per hr + # 24 hrs per day + op.add_option('--max-data-changelogs-in-batch', type=int, default=5760) + + # While processing Historical Changelogs above BATCH SIZE is not considered + # since all Changelogs to be post processed once, Batching it makes more + # rsync retries. (4 * 60 * 24 * 15 = 86400) + # 4 changelogs per minute + # 60 min per hr + # 24 hrs per day + # 15 days + # This means 15 days changelogs can be processed at once in case of + # History scan + op.add_option('--max-history-changelogs-in-batch', type=int, default=86400) + op.add_option('--pause-on-start', default=False, action='store_true') op.add_option('-L', '--log-level', metavar='LVL') op.add_option('-r', '--remote-gsyncd', metavar='CMD', diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py index 88398e2ce8a..beacd7473b2 100644 --- a/geo-replication/syncdaemon/gsyncdstatus.py +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -52,6 +52,7 @@ def get_default_values(): "slave_node": DEFAULT_STATUS, "worker_status": DEFAULT_STATUS, "last_synced": 0, + "last_synced_entry": 0, "crawl_status": DEFAULT_STATUS, "entry": 0, "data": 0, @@ -239,6 +240,7 @@ class GeorepStatus(object): slave_node N/A VALUE VALUE N/A status Created VALUE Paused Stopped last_synced N/A VALUE VALUE VALUE + last_synced_entry N/A VALUE VALUE VALUE crawl_status N/A VALUE N/A N/A entry N/A VALUE N/A N/A data N/A VALUE N/A N/A diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 80c4d9d8b95..3df08e41a13 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -25,7 +25,13 @@ from gconf import gconf from syncdutils import Thread, GsyncdError, boolify, escape from syncdutils import unescape, gauxpfx, md5hex, selfkill from syncdutils import lstat, errno_wrap, FreeObject -from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable +from syncdutils import NoStimeAvailable, PartialHistoryAvailable +from changelogsdb import db_init, db_record_data, db_record_meta +from changelogsdb import db_remove_data, db_remove_meta +from changelogsdb import db_get_data, db_get_meta, db_commit +from changelogsdb import db_get_data_count, db_get_meta_count +from changelogsdb import db_delete_meta_if_exists_in_data + URXTIME = (-1, 0) @@ -45,6 +51,10 @@ CHANGELOG_ROLLOVER_TIME = 15 # that batch since stime will get updated after each batch. MAX_CHANGELOG_BATCH_SIZE = 727040 +# Number of record to query once +DB_PAGINATION_SIZE_META = 100 +DB_PAGINATION_SIZE_DATA = 1000 + # Utility functions to help us to get to closer proximity # of the DRY principle (no, don't look for elevated or # perspectivistic things here) @@ -69,6 +79,24 @@ def _volinfo_hook_relax_foreign(self): return volinfo_sys +def edct(op, **ed): + dct = {} + dct['op'] = op + for k in ed: + if k == 'stat': + st = ed[k] + dst = dct['stat'] = {} + if st: + dst['uid'] = st.st_uid + dst['gid'] = st.st_gid + dst['mode'] = st.st_mode + dst['atime'] = st.st_atime + dst['mtime'] = st.st_mtime + else: + dct[k] = ed[k] + return dct + + # The API! def gmaster_builder(excrawl=None): @@ -259,7 +287,7 @@ class TarSSHEngine(object): st = lstat(se) if isinstance(st, int): # file got unlinked in the interim - self.unlinked_gfids.add(se) + db_remove_data(se) return True self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) @@ -294,7 +322,7 @@ class RsyncEngine(object): st = lstat(se) if isinstance(st, int): # file got unlinked in the interim - self.unlinked_gfids.add(se) + db_remove_data(se) return True self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) @@ -340,6 +368,18 @@ class GMasterCommon(object): if self.volinfo: return self.volinfo['volume_mark'] + def get_entry_stime(self): + data = self.slave.server.entry_stime(".", self.uuid) + if isinstance(data, int): + data = None + return data + + def get_data_stime(self): + data = self.slave.server.stime(".", self.uuid) + if isinstance(data, int): + data = None + return data + def xtime(self, path, *a, **opts): """get amended xtime @@ -387,7 +427,6 @@ class GMasterCommon(object): self.volinfo = None self.terminate = False self.sleep_interval = 1 - self.unlinked_gfids = set() def init_keep_alive(cls): """start the keep-alive thread """ @@ -553,7 +592,7 @@ class GMasterCommon(object): self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime) # Purge all changelogs available in processing dir # less than cluster_stime - proc_dir = os.path.join(self.setup_working_dir(), + proc_dir = os.path.join(self.tempdir, ".processing") if os.path.exists(proc_dir): @@ -627,6 +666,11 @@ class GMasterCommon(object): ret = j[-1]() if not ret: succeed = False + + # All the unlinked GFIDs removed from Data and Meta list + # Commit the Transaction + db_commit() + if succeed and not args[0] is None: self.sendmark(path, *args) return succeed @@ -670,9 +714,6 @@ class GMasterChangelogMixin(GMasterCommon): # flat directory hierarchy for gfid based access FLAT_DIR_HIERARCHY = '.' - # maximum retries per changelog before giving up - MAX_RETRIES = 10 - CHANGELOG_LOG_LEVEL = 9 CHANGELOG_CONN_RETRIES = 5 @@ -727,21 +768,48 @@ class GMasterChangelogMixin(GMasterCommon): logging.debug('changelog working dir %s' % workdir) return workdir - def get_purge_time(self): - purge_time = self.xtime('.', self.slave) - if isinstance(purge_time, int): - purge_time = None - return purge_time + def log_failures(self, failures, entry_key, gfid_prefix, log_prefix): + num_failures = 0 + for failure in failures: + st = lstat(os.path.join(gfid_prefix, failure[0][entry_key])) + if not isinstance(st, int): + num_failures += 1 + logging.error('%s FAILED: %s' % (log_prefix, + repr(failure))) + + self.status.inc_value("failures", num_failures) def process_change(self, change, done, retry): pfx = gauxpfx() clist = [] entries = [] - meta_gfid = set() - datas = set() + change_ts = change.split(".")[-1] + + # self.data_batch_start is None only in beginning and during + # new batch start + if self.data_batch_start is None: + self.data_batch_start = change_ts + + # Ignore entry ops which are already processed in Changelog modes + ignore_entry_ops = False + entry_stime = None + data_stime = None + if self.name in ["live_changelog", "history_changelog"]: + entry_stime = self.get_entry_stime() + data_stime = self.get_data_stime() + + if entry_stime is not None and data_stime is not None: + # if entry_stime is not None but data_stime > entry_stime + # This situation is caused by the stime update of Passive worker + # Consider data_stime in this case. + if data_stime[0] > entry_stime[0]: + entry_stime = data_stime + + # Compare the entry_stime with changelog file suffix + # if changelog time is less than entry_stime then ignore + if int(change_ts) <= entry_stime[0]: + ignore_entry_ops = True - # basic crawl stats: files and bytes - files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []} try: f = open(change, "r") clist = f.readlines() @@ -749,42 +817,6 @@ class GMasterChangelogMixin(GMasterCommon): except IOError: raise - def edct(op, **ed): - dct = {} - dct['op'] = op - for k in ed: - if k == 'stat': - st = ed[k] - dst = dct['stat'] = {} - if st: - dst['uid'] = st.st_uid - dst['gid'] = st.st_gid - dst['mode'] = st.st_mode - dst['atime'] = st.st_atime - dst['mtime'] = st.st_mtime - else: - dct[k] = ed[k] - return dct - - # entry counts (not purges) - def entry_update(): - files_pending['count'] += 1 - - # purge count - def purge_update(): - files_pending['purge'] += 1 - - def log_failures(failures, entry_key, gfid_prefix, log_prefix): - num_failures = 0 - for failure in failures: - st = lstat(os.path.join(gfid_prefix, failure[0][entry_key])) - if not isinstance(st, int): - num_failures += 1 - logging.error('%s FAILED: %s' % (log_prefix, - repr(failure))) - - self.status.inc_value("failures", num_failures) - for e in clist: e = e.strip() et = e[self.IDX_START:self.IDX_END] # entry type @@ -792,12 +824,20 @@ class GMasterChangelogMixin(GMasterCommon): # skip ENTRY operation if hot tier brick if self.name == 'live_changelog' or \ - self.name == 'history_changelog': + self.name == 'history_changelog': if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY: logging.debug('skip ENTRY op: %s if hot tier brick' % (ec[self.POS_TYPE])) continue + # Data and Meta operations are decided while parsing + # UNLINK/RMDIR/MKNOD except that case ignore all the other + # entry ops if ignore_entry_ops is True. + # UNLINK/RMDIR/MKNOD entry_ops are ignored in the end + if ignore_entry_ops and et == self.TYPE_ENTRY and \ + ec[self.POS_TYPE] not in ["UNLINK", "RMDIR", "MKNOD"]: + continue + if et == self.TYPE_ENTRY: # extract information according to the type of # the entry operation. create(), mkdir() and mknod() @@ -819,15 +859,16 @@ class GMasterChangelogMixin(GMasterCommon): # Remove from DATA list, so that rsync will # not fail pt = os.path.join(pfx, ec[0]) - if pt in datas: - datas.remove(pt) + st = lstat(pt) + if isinstance(st, int): + # file got unlinked, May be historical Changelog + db_remove_data(pt) + db_remove_meta(pt) if not boolify(gconf.ignore_deletes): - purge_update() - entries.append(edct(ty, gfid=gfid, entry=en)) + if not ignore_entry_ops: + entries.append(edct(ty, gfid=gfid, entry=en)) elif ty in ['CREATE', 'MKDIR', 'MKNOD']: - entry_update() - # Special case: record mknod as link if ty in ['MKNOD']: mode = int(ec[2]) @@ -845,10 +886,10 @@ class GMasterChangelogMixin(GMasterCommon): # CREATED if source not exists. entries.append(edct('LINK', stat=st, entry=en, gfid=gfid)) - # Here, we have the assumption that only # tier-gfid.linkto causes this mknod. Add data - datas.add(os.path.join(pfx, ec[0])) + db_record_data(os.path.join(pfx, ec[0]), + change_ts) continue # stat info. present in the changelog itself @@ -867,7 +908,6 @@ class GMasterChangelogMixin(GMasterCommon): if isinstance(rl, int): rl = None - entry_update() e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st, link=rl)) @@ -880,120 +920,102 @@ class GMasterChangelogMixin(GMasterCommon): continue if ty == 'LINK': - entry_update() entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) elif ty == 'SYMLINK': rl = errno_wrap(os.readlink, [en], [ENOENT], [ESTALE]) if isinstance(rl, int): continue - entry_update() + entries.append( edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) else: logging.warn('ignoring %s [op %s]' % (gfid, ty)) elif et == self.TYPE_GFID: - # If self.unlinked_gfids is available, then that means it is - # retrying the changelog second time. Do not add the GFID's - # to rsync job if failed previously but unlinked in master - if self.unlinked_gfids and \ - os.path.join(pfx, ec[0]) in self.unlinked_gfids: - logging.debug("ignoring data, since file purged interim") - else: - datas.add(os.path.join(pfx, ec[0])) + db_record_data(os.path.join(pfx, ec[0]), change_ts) elif et == self.TYPE_META: if ec[1] == 'SETATTR': # only setattr's for now... - if len(ec) == 5: - # In xsync crawl, we already have stat data - # avoid doing stat again - meta_gfid.add((os.path.join(pfx, ec[0]), - XCrawlMetadata(st_uid=ec[2], - st_gid=ec[3], - st_mode=ec[4], - st_atime=ec[5], - st_mtime=ec[6]))) - else: - meta_gfid.add((os.path.join(pfx, ec[0]), )) + db_record_meta(os.path.join(pfx, ec[0]), change_ts) elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \ ec[1] == 'FXATTROP': # To sync xattr/acls use rsync/tar, --xattrs and --acls # switch to rsync and tar if not boolify(gconf.use_tarssh) and \ (boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)): - datas.add(os.path.join(pfx, ec[0])) + db_record_data(os.path.join(pfx, ec[0]), change_ts) else: logging.warn('got invalid changelog type: %s' % (et)) logging.debug('entries: %s' % repr(entries)) - # Increment counters for Status - self.status.inc_value("entry", len(entries)) - self.files_in_batch += len(datas) - self.status.inc_value("data", len(datas)) - # sync namespace - if entries: + if entries and not ignore_entry_ops: + # Increment counters for Status + self.status.inc_value("entry", len(entries)) failures = self.slave.server.entry_ops(entries) - log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') + self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') self.status.dec_value("entry", len(entries)) - # sync metadata - if meta_gfid: - meta_entries = [] - for go in meta_gfid: - if len(go) > 1: - st = go[1] - else: - st = lstat(go[0]) - if isinstance(st, int): - logging.debug('file %s got purged in the interim' % go[0]) - continue - meta_entries.append(edct('META', go=go[0], stat=st)) - if meta_entries: - self.status.inc_value("meta", len(entries)) - failures = self.slave.server.meta_ops(meta_entries) - log_failures(failures, 'go', '', 'META') - self.status.dec_value("meta", len(entries)) - - # sync data - if datas: + # Update Entry stime in Brick Root only in case of Changelog mode + if self.name in ["live_changelog", "history_changelog"]: + entry_stime_to_update = (int(change_ts) - 1, 0) + self.upd_entry_stime(entry_stime_to_update) + self.status.set_field("last_synced_entry", + entry_stime_to_update[0]) + + if ignore_entry_ops: + # Book keeping, to show in logs the range of Changelogs skipped + self.num_skipped_entry_changelogs += 1 + if self.skipped_entry_changelogs_first is None: + self.skipped_entry_changelogs_first = change_ts + + self.skipped_entry_changelogs_last = change_ts + + # Batch data based on number of changelogs as configured as + # gconf.max_data_changelogs_in_batch(Default is 24 hrs) + # stime will be set after completion of these batch, so on failure + # Geo-rep will progress day by day + if (self.num_changelogs > gconf.max_data_changelogs_in_batch): + # (Start Changelog TS, End Changelog TS, [Changes]) + self.data_batches.append([self.data_batch_start, change_ts, + [change]]) + self.data_batch_start = None + self.num_changelogs = 0 + else: + self.data_batches[-1][1] = change_ts + self.data_batches[-1][2].append(change) + + def datas_to_queue(self, start, end): + # Paginate db entries and add it to Rsync PostBox + offset = 0 + total_datas = 0 + while True: + # Db Pagination + datas = db_get_data(start=start, end=end, + limit=DB_PAGINATION_SIZE_DATA, + offset=offset) + if len(datas) == 0: + break + offset += DB_PAGINATION_SIZE_DATA + total_datas += len(datas) self.a_syncdata(datas) - self.datas_in_batch.update(datas) + return total_datas - def process(self, changes, done=1): - tries = 0 + def handle_data_sync(self, start, end, changes, done, total_datas): + """ + Wait till all rsync jobs are complete, also handle the retries + Update data stime Once Rsync jobs are complete. + """ retry = False - self.unlinked_gfids = set() - self.files_in_batch = 0 - self.datas_in_batch = set() + tries = 0 + # Error log disabled till the last round self.syncer.disable_errorlog() while True: - # first, fire all changelog transfers in parallel. entry and - # metadata are performed synchronously, therefore in serial. - # However at the end of each changelog, data is synchronized - # with syncdata_async() - which means it is serial w.r.t - # entries/metadata of that changelog but happens in parallel - # with data of other changelogs. - if retry: - if tries == (self.MAX_RETRIES - 1): - # Enable Error logging if it is last retry - self.syncer.enable_errorlog() - - # Remove Unlinked GFIDs from Queue - for unlinked_gfid in self.unlinked_gfids: - if unlinked_gfid in self.datas_in_batch: - self.datas_in_batch.remove(unlinked_gfid) - - # Retry only Sync. Do not retry entry ops - if self.datas_in_batch: - self.a_syncdata(self.datas_in_batch) - else: - for change in changes: - logging.debug('processing change %s' % change) - self.process_change(change, done, retry) - if not retry: - # number of changelogs processed in the batch - self.turns += 1 + self.datas_to_queue(start, end) + + if retry and tries == (gconf.max_rsync_retries - 1): + # Enable Error logging if it is last retry + self.syncer.enable_errorlog() # Now we wait for all the data transfers fired off in the above # step to complete. Note that this is not ideal either. Ideally @@ -1016,38 +1038,34 @@ class GMasterChangelogMixin(GMasterCommon): # @change is the last changelog (therefore max time for this batch) if self.syncdata_wait(): - self.unlinked_gfids = set() if done: - xtl = (int(change.split('.')[-1]) - 1, 0) + xtl = (int(changes[-1].split('.')[-1]) - 1, 0) self.upd_stime(xtl) map(self.changelog_done_func, changes) self.archive_and_purge_changelogs(changes) # Reset Data counter after sync - self.status.dec_value("data", self.files_in_batch) - self.files_in_batch = 0 - self.datas_in_batch = set() + self.status.dec_value("data", total_datas) break # We do not know which changelog transfer failed, retry everything. retry = True tries += 1 - if tries == self.MAX_RETRIES: + if tries >= gconf.max_rsync_retries: logging.error('changelogs %s could not be processed ' 'completely - moving on...' % ' '.join(map(os.path.basename, changes))) # Reset data counter on failure - self.status.dec_value("data", self.files_in_batch) - self.files_in_batch = 0 - self.datas_in_batch = set() + self.status.dec_value("data", total_datas) if done: - xtl = (int(change.split('.')[-1]) - 1, 0) + xtl = (int(changes[-1].split('.')[-1]) - 1, 0) self.upd_stime(xtl) map(self.changelog_done_func, changes) self.archive_and_purge_changelogs(changes) break + # it's either entry_ops() or Rsync that failed to do it's # job. Mostly it's entry_ops() [which currently has a problem # of failing to create an entry but failing to return an errno] @@ -1058,11 +1076,114 @@ class GMasterChangelogMixin(GMasterCommon): logging.warn('incomplete sync, retrying changelogs: %s' % ' '.join(map(os.path.basename, changes))) - # Reset the Data counter before Retry - self.status.dec_value("data", self.files_in_batch) - self.files_in_batch = 0 time.sleep(0.5) + # Log Current batch details + if changes: + logging.info( + "{0} mode completed in {1:.4f} seconds " + "({2} - {3} Num: {4}) stime: {5}, entry_stime: {6}".format( + self.name, + time.time() - self.batch_start_time, + changes[0].split("/")[-1], + changes[-1].split("/")[-1], + len(changes), + repr(self.get_data_stime()), + repr(self.get_entry_stime()))) + + def process(self, changes, done=1): + retry = False + first_changelog_ts = changes[0].split(".")[-1] + + db_init(os.path.join(self.tempdir, "temp_changelogs.db")) + + self.skipped_entry_changelogs_first = None + self.skipped_entry_changelogs_last = None + self.num_skipped_entry_changelogs = 0 + self.batch_start_time = time.time() + # (Start Changelog TS, End Changelog TS, [Changes]) + self.data_batches = [[first_changelog_ts, first_changelog_ts, []]] + self.data_batch_start = None + self.num_changelogs = 0 + + for change in changes: + logging.debug('processing change %s' % change) + self.process_change(change, done, retry) + # number of changelogs processed in the batch + self.turns += 1 + + # Rsync/Tar will preserve permissions, so if a GFID exists + # in data queue then it syncs meta details too. Remove + # all meta from meta table if exists in data table + db_delete_meta_if_exists_in_data() + + # All the Data/Meta populated, Commit the Changes in Db + db_commit() + + # Log the Skipped Entry ops range if any + if self.skipped_entry_changelogs_first is not None and \ + self.skipped_entry_changelogs_last is not None: + logging.info("Skipping already processed entry " + "ops from CHANGELOG.{0} to CHANGELOG.{1} " + "Num: {2}".format( + self.skipped_entry_changelogs_first, + self.skipped_entry_changelogs_last, + self.num_skipped_entry_changelogs)) + + # Entry Changelogs syncing finished + logging.info("Syncing Entries completed in {0:.4f} seconds " + "CHANGELOG.{1} - CHANGELOG.{2} " + "Num: {3}".format( + time.time() - self.batch_start_time, + changes[0].split(".")[-1], + changes[-1].split(".")[-1], + len(changes))) + + # Update Status Data and Meta Count + self.status.inc_value("data", db_get_data_count()) + self.status.inc_value("meta", db_get_meta_count()) + + for b in self.data_batches: + # Add to data Queue, so that Rsync will start parallelly + # while syncing Meta ops + total_datas = self.datas_to_queue(b[0], b[1]) + + # Sync Meta + offset = 0 + while True: + # Db Pagination + meta_gfids = db_get_meta(start=b[0], end=b[1], + limit=DB_PAGINATION_SIZE_META, + offset=offset) + if len(meta_gfids) == 0: + break + offset += DB_PAGINATION_SIZE_META + + # Collect required information for GFIDs which + # exists in Master + meta_entries = [] + for go in meta_gfids: + st = lstat(go) + if isinstance(st, int): + logging.debug('file %s got purged in the ' + 'interim' % go) + continue + meta_entries.append(edct('META', go=go, stat=st)) + + if meta_entries: + failures = self.slave.server.meta_ops(meta_entries) + self.log_failures(failures, 'go', '', 'META') + self.status.dec_value("meta", len(meta_entries)) + + # Sync Data, Rsync already started syncing the files + # wait for the completion and retry if required. + self.handle_data_sync(b[0], b[1], b[2], done, total_datas) + + def upd_entry_stime(self, stime): + self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY, + self.uuid, + stime) + def upd_stime(self, stime, path=None): if not path: path = self.FLAT_DIR_HIERARCHY @@ -1085,7 +1206,12 @@ class GMasterChangelogMixin(GMasterCommon): remote_node_ip = node.split(":")[0] self.status.set_slave_node(remote_node_ip) - def changelogs_batch_process(self, changes): + def changelogs_batch_process(self, changes, single_batch=False): + if single_batch and changes: + logging.debug('processing changes %s' % repr(changes)) + self.process(changes) + return + changelogs_batches = [] current_size = 0 for c in changes: @@ -1112,16 +1238,15 @@ class GMasterChangelogMixin(GMasterCommon): changes = [] # get stime (from the brick) and purge changelogs # that are _historical_ to that time. - purge_time = self.get_purge_time() + data_stime = self.get_data_stime() self.changelog_agent.scan() self.crawls += 1 changes = self.changelog_agent.getchanges() if changes: - if purge_time: - logging.info("slave's time: %s" % repr(purge_time)) + if data_stime: processed = [x for x in changes - if int(x.split('.')[-1]) < purge_time[0]] + if int(x.split('.')[-1]) < data_stime[0]] for pr in processed: logging.info( 'skipping already processed change: %s...' % @@ -1136,7 +1261,8 @@ class GMasterChangelogMixin(GMasterCommon): self.changelog_agent = changelog_agent self.sleep_interval = int(gconf.change_interval) self.changelog_done_func = self.changelog_agent.done - self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), + self.tempdir = self.setup_working_dir() + self.processed_changelogs_dir = os.path.join(self.tempdir, ".processed") self.name = "live_changelog" self.status = status @@ -1149,7 +1275,8 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): self.history_crawl_start_time = register_time self.changelog_done_func = self.changelog_agent.history_done self.history_turns = 0 - self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), + self.tempdir = self.setup_working_dir() + self.processed_changelogs_dir = os.path.join(self.tempdir, ".history/.processed") self.name = "history_changelog" self.status = status @@ -1157,15 +1284,17 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): def crawl(self): self.history_turns += 1 self.status.set_worker_crawl_status("History Crawl") - purge_time = self.get_purge_time() + data_stime = self.get_data_stime() end_time = int(time.time()) - logging.info('starting history crawl... turns: %s, stime: %s, etime: %s' - % (self.history_turns, repr(purge_time), repr(end_time))) + logging.info('starting history crawl... turns: %s, stime: %s, ' + 'etime: %s, entry_stime: %s' + % (self.history_turns, repr(data_stime), + repr(end_time), self.get_entry_stime())) - if not purge_time or purge_time == URXTIME: + if not data_stime or data_stime == URXTIME: logging.info("stime not available, abandoning history crawl") - raise NoPurgeTimeAvailable() + raise NoStimeAvailable() # Changelogs backend path is hardcoded as # <BRICK_PATH>/.glusterfs/changelogs, if user configured to different @@ -1174,7 +1303,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): ".glusterfs/changelogs") ret, actual_end = self.changelog_agent.history( changelog_path, - purge_time[0], + data_stime[0], end_time, int(gconf.sync_jobs)) @@ -1184,27 +1313,42 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): # to be processed. returns positive value as number of changelogs # to be processed, which will be fetched using # history_getchanges() - while self.changelog_agent.history_scan() > 0: + num_scanned_changelogs = self.changelog_agent.history_scan() + num_changelogs = num_scanned_changelogs + changes = [] + while num_scanned_changelogs > 0: self.crawls += 1 - changes = self.changelog_agent.history_getchanges() + changes += self.changelog_agent.history_getchanges() if changes: - if purge_time: - logging.info("slave's time: %s" % repr(purge_time)) + if data_stime: processed = [x for x in changes - if int(x.split('.')[-1]) < purge_time[0]] + if int(x.split('.')[-1]) < data_stime[0]] for pr in processed: logging.info('skipping already processed change: ' '%s...' % os.path.basename(pr)) self.changelog_done_func(pr) changes.remove(pr) - self.changelogs_batch_process(changes) + if num_changelogs > gconf.max_history_changelogs_in_batch: + self.changelogs_batch_process(changes, single_batch=True) + num_changelogs = 0 + changes = [] + + num_scanned_changelogs = self.changelog_agent.history_scan() + num_changelogs += num_scanned_changelogs + + # If Last batch is not processed with MAX_NUM_CHANGELOGS_IN_BATCH + # condition above + if changes: + self.changelogs_batch_process(changes, single_batch=True) history_turn_time = int(time.time()) - self.history_crawl_start_time - logging.info('finished history crawl syncing, endtime: %s, stime: %s' - % (actual_end, repr(self.get_purge_time()))) + logging.info('finished history crawl syncing, endtime: %s, ' + 'stime: %s, entry_stime: %s' + % (actual_end, repr(self.get_data_stime()), + self.get_entry_stime())) # If TS returned from history_changelog is < register_time # then FS crawl may be required, since history is only available @@ -1269,14 +1413,14 @@ class GMasterXsyncMixin(GMasterChangelogMixin): t = Thread(target=Xsyncer) t.start() logging.info('starting hybrid crawl..., stime: %s' - % repr(self.get_purge_time())) + % repr(self.get_data_stime())) self.status.set_worker_crawl_status("Hybrid Crawl") while True: try: item = self.comlist.pop(0) if item[0] == 'finale': logging.info('finished hybrid crawl syncing, stime: %s' - % repr(self.get_purge_time())) + % repr(self.get_data_stime())) break elif item[0] == 'xsync': logging.info('processing xsync changelog %s' % (item[1])) diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index ed0e7efe2b2..91ca1916f6a 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -31,11 +31,10 @@ import shutil from gconf import gconf import repce from repce import RepceServer, RepceClient -from master import gmaster_builder import syncdutils from syncdutils import GsyncdError, select, privileged, boolify, funcode from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat -from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable +from syncdutils import NoStimeAvailable, PartialHistoryAvailable from syncdutils import ChangelogException, ChangelogHistoryNotAvailable from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION from gsyncdstatus import GeorepStatus @@ -522,6 +521,29 @@ class Server(object): raise @classmethod + @_pathguard + def entry_stime(cls, path, uuid): + """ + entry_stime xattr to reduce the number of retry of Entry changes when + Geo-rep worker crashes and restarts. entry_stime is updated after + processing every changelog file. On failure and restart, worker only + have to reprocess the last changelog for Entry ops. + Xattr Key: <PFX>.<MASTERVOL_UUID>.<SLAVEVOL_UUID>.entry_stime + """ + try: + val = Xattr.lgetxattr(path, + '.'.join([cls.GX_NSPACE, uuid, + 'entry_stime']), + 8) + return struct.unpack('!II', val) + except OSError: + ex = sys.exc_info()[1] + if ex.errno in (ENOENT, ENODATA, ENOTDIR): + return ex.errno + else: + raise + + @classmethod def node_uuid(cls, path='.'): try: uuid_l = Xattr.lgetxattr_buf( @@ -542,6 +564,16 @@ class Server(object): @classmethod @_pathguard + def set_entry_stime(cls, path, uuid, mark): + """set @mark as stime for @uuid on @path""" + errno_wrap(Xattr.lsetxattr, + [path, + '.'.join([cls.GX_NSPACE, uuid, 'entry_stime']), + struct.pack('!II', *mark)], + [ENOENT]) + + @classmethod + @_pathguard def set_xtime(cls, path, uuid, mark): """set @mark as xtime for @uuid on @path""" errno_wrap(Xattr.lsetxattr, @@ -1376,6 +1408,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def gmaster_instantiate_tuple(self, slave): """return a tuple of the 'one shot' and the 'main crawl' class instance""" + from master import gmaster_builder return (gmaster_builder('xsync')(self, slave), gmaster_builder()(self, slave), gmaster_builder('changeloghistory')(self, slave)) @@ -1436,6 +1469,13 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): uuid + '.' + gconf.slave_id) ), slave.server) + slave.server.entry_stime = types.MethodType( + lambda _self, path, uuid: ( + brickserver.entry_stime( + path, + uuid + '.' + gconf.slave_id) + ), + slave.server) slave.server.set_stime = types.MethodType( lambda _self, path, uuid, mark: ( brickserver.set_stime(path, @@ -1443,6 +1483,14 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): mark) ), slave.server) + slave.server.set_entry_stime = types.MethodType( + lambda _self, path, uuid, mark: ( + brickserver.set_entry_stime( + path, + uuid + '.' + gconf.slave_id, + mark) + ), + slave.server) (g1, g2, g3) = self.gmaster_instantiate_tuple(slave) g1.master.server = brickserver g2.master.server = brickserver @@ -1506,7 +1554,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): except ChangelogHistoryNotAvailable: logging.info('Changelog history not available, using xsync') g1.crawlwrap(oneshot=True, register_time=register_time) - except NoPurgeTimeAvailable: + except NoStimeAvailable: logging.info('No stime available, using xsync crawl') g1.crawlwrap(oneshot=True, register_time=register_time) except ChangelogException as e: diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 40eff050a9e..f7beb947efc 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -45,7 +45,7 @@ except ImportError: # auxiliary gfid based access prefix _CL_AUX_GFID_PFX = ".gfid/" -GF_OP_RETRIES = 20 +GF_OP_RETRIES = 10 CHANGELOG_AGENT_SERVER_VERSION = 1.0 CHANGELOG_AGENT_CLIENT_VERSION = 1.0 @@ -494,15 +494,18 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]): def lstat(e): return errno_wrap(os.lstat, [e], [ENOENT], [ESTALE]) -class NoPurgeTimeAvailable(Exception): + +class NoStimeAvailable(Exception): pass class PartialHistoryAvailable(Exception): pass + class ChangelogHistoryNotAvailable(Exception): pass + class ChangelogException(OSError): pass |