Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--  geo-replication/syncdaemon/master.py | 940
1 file changed, 682 insertions, 258 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index b65abf98589..9501aeae6b5 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -12,20 +12,23 @@ import os import sys import time import stat -import json import logging import fcntl import string import errno import tarfile -from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN, ESTALE +from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN, ESTALE, EINTR from threading import Condition, Lock from datetime import datetime -from gconf import gconf -from syncdutils import Thread, GsyncdError, boolify, escape -from syncdutils import unescape, gauxpfx, md5hex, selfkill -from syncdutils import lstat, errno_wrap, FreeObject -from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable + +import gsyncdconfig as gconf +import libgfchangelog +from rconf import rconf +from syncdutils import (Thread, GsyncdError, escape_space_newline, + unescape_space_newline, gauxpfx, escape, + lstat, errno_wrap, FreeObject, lf, matching_disk_gfid, + NoStimeAvailable, PartialHistoryAvailable, + host_brick_split) URXTIME = (-1, 0) @@ -37,14 +40,6 @@ URXTIME = (-1, 0) # crawl before starting live changelog crawl. CHANGELOG_ROLLOVER_TIME = 15 -# Max size of Changelogs to process per batch, Changelogs Processing is -# not limited by the number of changelogs but instead based on -# size of the changelog file, One sample changelog file size was 145408 -# with ~1000 CREATE and ~1000 DATA. 5 such files in one batch is 727040 -# If geo-rep worker crashes while processing a batch, it has to retry only -# that batch since stime will get updated after each batch. -MAX_CHANGELOG_BATCH_SIZE = 727040 - # Utility functions to help us to get to closer proximity # of the DRY principle (no, don't look for elevated or # perspectivistic things here) @@ -62,36 +57,75 @@ def _volinfo_hook_relax_foreign(self): fgn_vi = volinfo_sys[self.KFGN] if fgn_vi: expiry = fgn_vi['timeout'] - int(time.time()) + 1 - logging.info('foreign volume info found, waiting %d sec for expiry' % - expiry) + logging.info(lf('foreign volume info found, waiting for expiry', + expiry=expiry)) time.sleep(expiry) volinfo_sys = self.get_sys_volinfo() return volinfo_sys +def edct(op, **ed): + dct = {} + dct['op'] = op + # This is used in automatic gfid conflict resolution. + # When marked True, it's skipped during re-processing. + dct['skip_entry'] = False + for k in ed: + if k == 'stat': + st = ed[k] + dst = dct['stat'] = {} + if st: + dst['uid'] = st.st_uid + dst['gid'] = st.st_gid + dst['mode'] = st.st_mode + dst['atime'] = st.st_atime + dst['mtime'] = st.st_mtime + else: + dct[k] = ed[k] + return dct + + # The API! 
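The new module-level edct() shown above replaces the old closure inside process_change(); it normalizes a FOP into a plain dict and flattens the stat result so only picklable fields cross the RPC boundary to the slave. A minimal usage sketch, assuming master.py is importable (import path, gfid and entry values are illustrative):

    import os
    from master import edct  # import path illustrative

    st = os.lstat(os.getcwd())  # any stat result works for the demo
    rec = edct('MKDIR',
               gfid='6b9f0a0e-1234-4abc-8def-0123456789ab',  # illustrative
               entry='<pargfid>/dirname',
               stat=st, mode=st.st_mode)
    # skip_entry defaults to False; gfid conflict resolution flips it to
    # True so the op is dropped when the batch is re-processed.
    print(rec['op'], rec['skip_entry'], rec['stat']['uid'])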
def gmaster_builder(excrawl=None): """produce the GMaster class variant corresponding to sync mode""" this = sys.modules[__name__] - modemixin = gconf.special_sync_mode + modemixin = gconf.get("special-sync-mode") if not modemixin: modemixin = 'normal' - changemixin = 'xsync' if gconf.change_detector == 'xsync' \ - else excrawl or gconf.change_detector - logging.info('setting up %s change detection mode' % changemixin) + + if gconf.get("change-detector") == 'xsync': + changemixin = 'xsync' + elif excrawl: + changemixin = excrawl + else: + changemixin = gconf.get("change-detector") + + logging.debug(lf('setting up change detection mode', + mode=changemixin)) modemixin = getattr(this, modemixin.capitalize() + 'Mixin') crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin') - sendmarkmixin = boolify( - gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin - purgemixin = boolify( - gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin - syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine + + if gconf.get("use-rsync-xattrs"): + sendmarkmixin = SendmarkRsyncMixin + else: + sendmarkmixin = SendmarkNormalMixin + + if gconf.get("ignore-deletes"): + purgemixin = PurgeNoopMixin + else: + purgemixin = PurgeNormalMixin + + if gconf.get("sync-method") == "tarssh": + syncengine = TarSSHEngine + else: + syncengine = RsyncEngine class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin, syncengine): pass + return _GMaster @@ -128,9 +162,9 @@ class NormalMixin(object): return xt0 >= xt1 def make_xtime_opts(self, is_master, opts): - if not 'create' in opts: + if 'create' not in opts: opts['create'] = is_master - if not 'default_xtime' in opts: + if 'default_xtime' not in opts: opts['default_xtime'] = URXTIME def xtime_low(self, rsc, path, **opts): @@ -149,7 +183,9 @@ class NormalMixin(object): xt = _xtime_now() rsc.server.aggregated.set_xtime(path, self.uuid, xt) else: - xt = opts['default_xtime'] + zero_zero = (0, 0) + if xt != zero_zero: + xt = opts['default_xtime'] return xt def keepalive_payload_hook(self, timo, gap): @@ -161,7 +197,7 @@ class NormalMixin(object): vi = vi.copy() vi['timeout'] = int(time.time()) + timo else: - # send keep-alives more frequently to + # send keep-alive more frequently to # avoid a delay in announcing our volume info # to slave if it becomes established in the # meantime @@ -199,9 +235,9 @@ class RecoverMixin(NormalMixin): @staticmethod def make_xtime_opts(is_master, opts): - if not 'create' in opts: + if 'create' not in opts: opts['create'] = False - if not 'default_xtime' in opts: + if 'default_xtime' not in opts: opts['default_xtime'] = URXTIME def keepalive_payload_hook(self, timo, gap): @@ -244,7 +280,7 @@ class TarSSHEngine(object): """ def a_syncdata(self, files): - logging.debug('files: %s' % (files)) + logging.debug(lf("Files", files=files)) for f in files: pb = self.syncer.add(f) @@ -252,7 +288,7 @@ class TarSSHEngine(object): def regjob(se, xte, pb): rv = pb.wait() if rv[0]: - logging.debug('synced ' + se) + logging.debug(lf('synced', file=se)) return True else: # stat check for file presence @@ -278,16 +314,16 @@ class RsyncEngine(object): """Sync engine that uses rsync(1) for data transfers""" def a_syncdata(self, files): - logging.debug('files: %s' % (files)) + logging.debug(lf("files", files=files)) for f in files: - logging.debug('candidate for syncing %s' % f) + logging.debug(lf('candidate for syncing', file=f)) pb = self.syncer.add(f) def regjob(se, xte, pb): rv = pb.wait() if rv[0]: - 
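gmaster_builder() above assembles the worker class at runtime from independent mixins (crawl mode, sync mode, sendmark, purge, sync engine), resolving each by name with getattr() against the module. A toy, self-contained sketch of the same pattern (class and function names invented for illustration):

    import sys

    class RsyncEngine(object):
        def sync(self):
            return "rsync"

    class TarSSHEngine(object):
        def sync(self):
            return "tar+ssh"

    def builder(sync_method):
        this = sys.modules[__name__]
        # Same trick as gmaster_builder: pick the mixin class by name.
        engine = getattr(this,
                         'TarSSHEngine' if sync_method == 'tarssh'
                         else 'RsyncEngine')

        class _Worker(engine):
            pass

        return _Worker

    print(builder('tarssh')().sync())  # -> tar+ssh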
logging.debug('synced ' + se) + logging.debug(lf('synced', file=se)) return True else: # stat to check if the file exist @@ -340,6 +376,18 @@ class GMasterCommon(object): if self.volinfo: return self.volinfo['volume_mark'] + def get_entry_stime(self): + data = self.slave.server.entry_stime(".", self.uuid) + if isinstance(data, int): + data = None + return data + + def get_data_stime(self): + data = self.slave.server.stime(".", self.uuid) + if isinstance(data, int): + data = None + return data + def xtime(self, path, *a, **opts): """get amended xtime @@ -360,11 +408,9 @@ class GMasterCommon(object): self.master = master self.slave = slave self.jobtab = {} - if boolify(gconf.use_tarssh): - logging.info("using 'tar over ssh' as the sync engine") + if gconf.get("sync-method") == "tarssh": self.syncer = Syncer(slave, self.slave.tarssh, [2]) else: - logging.info("using 'rsync' as the sync engine") # partial transfer (cf. rsync(1)), that's normal self.syncer = Syncer(slave, self.slave.rsync, [23, 24]) # crawls vs. turns: @@ -378,7 +424,7 @@ class GMasterCommon(object): # 0. self.crawls = 0 self.turns = 0 - self.total_turns = int(gconf.turns) + self.total_turns = rconf.turns self.crawl_start = datetime.now() self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0} self.start = None @@ -391,7 +437,7 @@ class GMasterCommon(object): def init_keep_alive(cls): """start the keep-alive thread """ - timo = int(gconf.timeout or 0) + timo = gconf.get("slave-timeout", 0) if timo > 0: def keep_alive(): while True: @@ -404,30 +450,22 @@ class GMasterCommon(object): def mgmt_lock(self): """Take management volume lock """ - if gconf.mgmt_lock_fd: + if rconf.mgmt_lock_fd: try: - fcntl.lockf(gconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) - if not gconf.active_earlier: - gconf.active_earlier = True - logging.info("Got lock : %s : Becoming ACTIVE" - % gconf.local_path) + fcntl.lockf(rconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) return True except: ex = sys.exc_info()[1] if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN): - if not gconf.passive_earlier: - gconf.passive_earlier = True - logging.info("Didn't get lock : %s : Becoming PASSIVE" - % gconf.local_path) return False raise fd = None - bname = str(self.uuid) + "_" + str(gconf.slave_id) + "_subvol_" \ - + str(gconf.subvol_num) + ".lock" - mgmt_lock_dir = os.path.join(gconf.meta_volume_mnt, "geo-rep") + bname = str(self.uuid) + "_" + rconf.args.slave_id + "_subvol_" \ + + str(rconf.args.subvol_num) + ".lock" + mgmt_lock_dir = os.path.join(gconf.get("meta-volume-mnt"), "geo-rep") path = os.path.join(mgmt_lock_dir, bname) - logging.debug("lock_file_path: %s" % path) + logging.debug(lf("lock file path", path=path)) try: fd = os.open(path, os.O_CREAT | os.O_RDWR) except OSError: @@ -448,29 +486,22 @@ class GMasterCommon(object): try: fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # Save latest FD for future use - gconf.mgmt_lock_fd = fd + rconf.mgmt_lock_fd = fd except: ex = sys.exc_info()[1] if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN): # cannot grab, it's taken - if not gconf.passive_earlier: - gconf.passive_earlier = True - logging.info("Didn't get lock : %s : Becoming PASSIVE" - % gconf.local_path) - gconf.mgmt_lock_fd = fd + rconf.mgmt_lock_fd = fd return False raise - if not gconf.active_earlier: - gconf.active_earlier = True - logging.info("Got lock : %s : Becoming ACTIVE" % gconf.local_path) return True def should_crawl(self): - if not boolify(gconf.use_meta_volume): - return gconf.glusterd_uuid in self.master.server.node_uuid() 
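mgmt_lock() above elects the ACTIVE worker for a replica set by taking a non-blocking exclusive lock on a per-subvolume file in the meta volume; losers of the race stay PASSIVE. A trimmed standalone sketch of the technique (lock path illustrative):

    import fcntl
    import os
    from errno import EACCES, EAGAIN

    def try_become_active(lock_path):
        fd = os.open(lock_path, os.O_CREAT | os.O_RDWR)
        try:
            # LOCK_NB fails immediately instead of blocking, so each
            # worker learns its role without waiting.
            fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            return True, fd  # keep fd open: closing it releases the lock
        except (IOError, OSError) as e:
            if e.errno in (EACCES, EAGAIN):
                return False, fd
            raise

    active, fd = try_become_active("/tmp/demo_subvol.lock")
    print("ACTIVE" if active else "PASSIVE")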
+ if not gconf.get("use-meta-volume"): + return rconf.args.local_node_id in self.master.server.node_uuid() - if not os.path.ismount(gconf.meta_volume_mnt): + if not os.path.ismount(gconf.get("meta-volume-mnt")): logging.error("Meta-volume is not mounted. Worker Exiting...") sys.exit(1) return self.mgmt_lock() @@ -487,8 +518,8 @@ class GMasterCommon(object): # If crawlwrap is called when partial history available, # then it sets register_time which is the time when geo-rep - # worker registerd to changelog consumption. Since nsec is - # not considered in register time, their are chances of skipping + # worker registered to changelog consumption. Since nsec is + # not considered in register time, there are chances of skipping # changes detection in xsync crawl. This limit will be reset when # crawlwrap is called again. self.live_changelog_start_time = None @@ -498,25 +529,24 @@ class GMasterCommon(object): # no need to maintain volinfo state machine. # in a cascading setup, each geo-replication session is # independent (ie. 'volume-mark' and 'xtime' are not - # propogated). This is because the slave's xtime is now + # propagated). This is because the slave's xtime is now # stored on the master itself. 'volume-mark' just identifies # that we are in a cascading setup and need to enable # 'geo-replication.ignore-pid-check' option. volinfo_sys = self.volinfo_hook() self.volinfo = volinfo_sys[self.KNAT] inter_master = volinfo_sys[self.KFGN] - logging.info("%s master with volume id %s ..." % - (inter_master and "intermediate" or "primary", - self.uuid)) - gconf.configinterface.set('volume_id', self.uuid) + logging.debug("%s master with volume id %s ..." % + (inter_master and "intermediate" or "primary", + self.uuid)) + rconf.volume_id = self.uuid if self.volinfo: if self.volinfo['retval']: - logging.warn("master cluster's info may not be valid %d" % - self.volinfo['retval']) + logging.warn(lf("master cluster's info may not be valid", + error=self.volinfo['retval'])) else: raise GsyncdError("master volinfo unavailable") self.lastreport['time'] = time.time() - logging.info('crawl interval: %d seconds' % self.sleep_interval) t0 = time.time() crawl = self.should_crawl() @@ -527,14 +557,14 @@ class GMasterCommon(object): self.start = time.time() should_display_info = self.start - self.lastreport['time'] >= 60 if should_display_info: - logging.info("%d crawls, %d turns", - self.crawls - self.lastreport['crawls'], - self.turns - self.lastreport['turns']) + logging.debug("%d crawls, %d turns", + self.crawls - self.lastreport['crawls'], + self.turns - self.lastreport['turns']) self.lastreport.update(crawls=self.crawls, turns=self.turns, time=self.start) t1 = time.time() - if int(t1 - t0) >= int(gconf.replica_failover_interval): + if int(t1 - t0) >= gconf.get("replica-failover-interval"): crawl = self.should_crawl() t0 = t1 self.update_worker_remote_node() @@ -544,16 +574,19 @@ class GMasterCommon(object): # which is min of cluster (but max of the replicas) brick_stime = self.xtime('.', self.slave) cluster_stime = self.master.server.aggregated.stime_mnt( - '.', '.'.join([str(self.uuid), str(gconf.slave_id)])) - logging.debug("Cluster stime: %s | Brick stime: %s" % - (repr(cluster_stime), repr(brick_stime))) + '.', '.'.join([str(self.uuid), rconf.args.slave_id])) + logging.debug(lf("Crawl info", + cluster_stime=cluster_stime, + brick_stime=brick_stime)) + if not isinstance(cluster_stime, int): if brick_stime < cluster_stime: self.slave.server.set_stime( self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime) + 
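The stimes compared here are (seconds, nanoseconds) pairs, so the brick_stime < cluster_stime check relies on Python's lexicographic tuple ordering; the smaller tuple is the older checkpoint:

    brick_stime = (1514764800, 500)
    cluster_stime = (1514764800, 900)
    # Seconds compare first, then nanoseconds break the tie, so the
    # replica-set minimum is picked correctly even within one second.
    print(brick_stime < cluster_stime)  # True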
self.upd_stime(cluster_stime) # Purge all changelogs available in processing dir # less than cluster_stime - proc_dir = os.path.join(self.setup_working_dir(), + proc_dir = os.path.join(self.tempdir, ".processing") if os.path.exists(proc_dir): @@ -667,20 +700,44 @@ class GMasterChangelogMixin(GMasterCommon): TYPE_GFID = "D " TYPE_ENTRY = "E " + MAX_EF_RETRIES = 10 + MAX_OE_RETRIES = 10 + # flat directory hierarchy for gfid based access FLAT_DIR_HIERARCHY = '.' - # maximum retries per changelog before giving up - MAX_RETRIES = 10 - CHANGELOG_CONN_RETRIES = 5 + def init_fop_batch_stats(self): + self.batch_stats = { + "CREATE": 0, + "MKNOD": 0, + "UNLINK": 0, + "MKDIR": 0, + "RMDIR": 0, + "LINK": 0, + "SYMLINK": 0, + "RENAME": 0, + "SETATTR": 0, + "SETXATTR": 0, + "XATTROP": 0, + "DATA": 0, + "ENTRY_SYNC_TIME": 0, + "META_SYNC_TIME": 0, + "DATA_START_TIME": 0 + } + + def update_fop_batch_stats(self, ty): + if ty in ['FSETXATTR']: + ty = 'SETXATTR' + self.batch_stats[ty] = self.batch_stats.get(ty, 0) + 1 + def archive_and_purge_changelogs(self, changelogs): # Creates tar file instead of tar.gz, since changelogs will # be appended to existing tar. archive name is # archive_<YEAR><MONTH>.tar archive_name = "archive_%s.tar" % datetime.today().strftime( - gconf.changelog_archive_format) + gconf.get("changelog-archive-format")) try: tar = tarfile.open(os.path.join(self.processed_changelogs_dir, @@ -716,21 +773,218 @@ class GMasterChangelogMixin(GMasterCommon): else: raise - def fallback_xsync(self): - logging.info('falling back to xsync mode') - gconf.configinterface.set('change-detector', 'xsync') - selfkill() - def setup_working_dir(self): - workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path)) + workdir = os.path.join(gconf.get("working-dir"), + escape(rconf.args.local_path)) logging.debug('changelog working dir %s' % workdir) return workdir - def get_purge_time(self): - purge_time = self.xtime('.', self.slave) - if isinstance(purge_time, int): - purge_time = None - return purge_time + def log_failures(self, failures, entry_key, gfid_prefix, log_prefix): + num_failures = 0 + for failure in failures: + st = lstat(os.path.join(gfid_prefix, failure[0][entry_key])) + if not isinstance(st, int): + num_failures += 1 + logging.error(lf('%s FAILED' % log_prefix, + data=failure)) + if failure[0]['op'] == 'MKDIR': + raise GsyncdError("The above directory failed to sync." + " Please fix it to proceed further.") + + self.status.inc_value("failures", num_failures) + + def fix_possible_entry_failures(self, failures, retry_count, entries): + pfx = gauxpfx() + fix_entry_ops = [] + failures1 = [] + remove_gfids = set() + for failure in failures: + if failure[2]['name_mismatch']: + pbname = failure[2]['slave_entry'] + elif failure[2]['dst']: + pbname = failure[0]['entry1'] + else: + pbname = failure[0]['entry'] + + op = failure[0]['op'] + # name exists but gfid is different + if failure[2]['gfid_mismatch'] or failure[2]['name_mismatch']: + slave_gfid = failure[2]['slave_gfid'] + st = lstat(os.path.join(pfx, slave_gfid)) + # Takes care of scenarios with no hardlinks + if isinstance(st, int) and st == ENOENT: + logging.debug(lf('Entry not present on master. Fixing gfid ' + 'mismatch in slave. 
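archive_and_purge_changelogs() above appends processed changelogs into a per-month plain tar (archive_<YEAR><MONTH>.tar). A condensed sketch of the approach, assuming the usual %Y%m value for changelog-archive-format:

    import os
    import tarfile
    from datetime import datetime

    def archive(processed_dir, changelogs, fmt="%Y%m"):
        name = "archive_%s.tar" % datetime.today().strftime(fmt)
        # Plain .tar, not .tar.gz: compressed archives cannot be
        # appended to. Mode "a" creates the file on first use.
        tar = tarfile.open(os.path.join(processed_dir, name), "a")
        try:
            for c in changelogs:
                tar.add(c, arcname=os.path.basename(c))
                os.remove(c)  # purge only once safely archived
        finally:
            tar.close()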
Deleting the entry', + retry_count=retry_count, + entry=repr(failure))) + # Add deletion to fix_entry_ops list + if failure[2]['slave_isdir']: + fix_entry_ops.append( + edct('RMDIR', + gfid=failure[2]['slave_gfid'], + entry=pbname)) + else: + fix_entry_ops.append( + edct('UNLINK', + gfid=failure[2]['slave_gfid'], + entry=pbname)) + remove_gfids.add(slave_gfid) + if op in ['RENAME']: + # If renamed gfid doesn't exists on master, remove + # rename entry and unlink src on slave + st = lstat(os.path.join(pfx, failure[0]['gfid'])) + if isinstance(st, int) and st == ENOENT: + logging.debug("Unlink source %s" % repr(failure)) + remove_gfids.add(failure[0]['gfid']) + fix_entry_ops.append( + edct('UNLINK', + gfid=failure[0]['gfid'], + entry=failure[0]['entry'])) + # Takes care of scenarios of hardlinks/renames on master + elif not isinstance(st, int): + if matching_disk_gfid(slave_gfid, pbname): + # Safe to ignore the failure as master contains same + # file with same gfid. Remove entry from entries list + logging.debug(lf('Fixing gfid mismatch in slave. ' + ' Safe to ignore, take out entry', + retry_count=retry_count, + entry=repr(failure))) + remove_gfids.add(failure[0]['gfid']) + if op == 'RENAME': + fix_entry_ops.append( + edct('UNLINK', + gfid=failure[0]['gfid'], + entry=failure[0]['entry'])) + # The file exists on master but with different name. + # Probably renamed and got missed during xsync crawl. + elif failure[2]['slave_isdir']: + realpath = os.readlink(os.path.join( + rconf.args.local_path, + ".glusterfs", + slave_gfid[0:2], + slave_gfid[2:4], + slave_gfid)) + dst_entry = os.path.join(pfx, realpath.split('/')[-2], + realpath.split('/')[-1]) + src_entry = pbname + logging.debug(lf('Fixing dir name/gfid mismatch in ' + 'slave', retry_count=retry_count, + entry=repr(failure))) + if src_entry == dst_entry: + # Safe to ignore the failure as master contains + # same directory as in slave with same gfid. + # Remove the failure entry from entries list + logging.debug(lf('Fixing dir name/gfid mismatch' + ' in slave. Safe to ignore, ' + 'take out entry', + retry_count=retry_count, + entry=repr(failure))) + try: + entries.remove(failure[0]) + except ValueError: + pass + else: + rename_dict = edct('RENAME', gfid=slave_gfid, + entry=src_entry, + entry1=dst_entry, stat=st, + link=None) + logging.debug(lf('Fixing dir name/gfid mismatch' + ' in slave. Renaming', + retry_count=retry_count, + entry=repr(rename_dict))) + fix_entry_ops.append(rename_dict) + else: + # A hardlink file exists with different name or + # renamed file exists and we are sure from + # matching_disk_gfid check that the entry doesn't + # exist with same gfid so we can safely delete on slave + logging.debug(lf('Fixing file gfid mismatch in slave. ' + 'Hardlink/Rename Case. Deleting entry', + retry_count=retry_count, + entry=repr(failure))) + fix_entry_ops.append( + edct('UNLINK', + gfid=failure[2]['slave_gfid'], + entry=pbname)) + elif failure[1] == ENOENT: + if op in ['RENAME']: + pbname = failure[0]['entry1'] + else: + pbname = failure[0]['entry'] + + pargfid = pbname.split('/')[1] + st = lstat(os.path.join(pfx, pargfid)) + # Safe to ignore the failure as master doesn't contain + # parent directory. + if isinstance(st, int): + logging.debug(lf('Fixing ENOENT error in slave. Parent ' + 'does not exist on master. Safe to ' + 'ignore, take out entry', + retry_count=retry_count, + entry=repr(failure))) + try: + entries.remove(failure[0]) + except ValueError: + pass + else: + logging.debug(lf('Fixing ENOENT error in slave. 
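The gfid-mismatch fixes above recover a directory's parent gfid and basename from the brick's .glusterfs backend, where each directory gfid is a symlink pointing back into the namespace. A minimal sketch of that lookup (brick path illustrative):

    import os

    def dir_parent_and_name(brick_root, gfid):
        # <brick>/.glusterfs/<g[0:2]>/<g[2:4]>/<gfid> is a symlink whose
        # target ends in .../<pargfid>/<basename> for directories.
        link = os.path.join(brick_root, ".glusterfs",
                            gfid[0:2], gfid[2:4], gfid)
        realpath = os.readlink(link)
        return realpath.split('/')[-2], realpath.split('/')[-1]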
Create ' + 'parent directory on slave.', + retry_count=retry_count, + entry=repr(failure))) + realpath = os.readlink(os.path.join(rconf.args.local_path, + ".glusterfs", + pargfid[0:2], + pargfid[2:4], + pargfid)) + dir_entry = os.path.join(pfx, realpath.split('/')[-2], + realpath.split('/')[-1]) + fix_entry_ops.append( + edct('MKDIR', gfid=pargfid, entry=dir_entry, + mode=st.st_mode, uid=st.st_uid, gid=st.st_gid)) + + logging.debug("remove_gfids: %s" % repr(remove_gfids)) + if remove_gfids: + for e in entries: + if e['op'] in ['MKDIR', 'MKNOD', 'CREATE', 'RENAME'] \ + and e['gfid'] in remove_gfids: + logging.debug("Removed entry op from retrial list: entry: %s" % repr(e)) + e['skip_entry'] = True + + if fix_entry_ops: + # Process deletions of entries whose gfids are mismatched + failures1 = self.slave.server.entry_ops(fix_entry_ops) + + return (failures1, fix_entry_ops) + + def handle_entry_failures(self, failures, entries): + retries = 0 + pending_failures = False + failures1 = [] + failures2 = [] + entry_ops1 = [] + entry_ops2 = [] + + if failures: + pending_failures = True + failures1 = failures + entry_ops1 = entries + + while pending_failures and retries < self.MAX_EF_RETRIES: + retries += 1 + (failures2, entry_ops2) = self.fix_possible_entry_failures( + failures1, retries, entry_ops1) + if not failures2: + pending_failures = False + logging.info(lf('Successfully fixed entry ops with gfid ' + 'mismatch', retry_count=retries)) + else: + pending_failures = True + failures1 = failures2 + entry_ops1 = entry_ops2 + + if pending_failures: + for failure in failures1: + logging.error("Failed to fix entry ops %s", repr(failure)) def process_change(self, change, done, retry): pfx = gauxpfx() @@ -739,8 +993,28 @@ class GMasterChangelogMixin(GMasterCommon): meta_gfid = set() datas = set() - # basic crawl stats: files and bytes - files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []} + change_ts = change.split(".")[-1] + + # Ignore entry ops which are already processed in Changelog modes + ignore_entry_ops = False + entry_stime = None + data_stime = None + if self.name in ["live_changelog", "history_changelog"]: + entry_stime = self.get_entry_stime() + data_stime = self.get_data_stime() + + if entry_stime is not None and data_stime is not None: + # if entry_stime is not None but data_stime > entry_stime + # This situation is caused by the stime update of Passive worker + # Consider data_stime in this case. 
+ if data_stime[0] > entry_stime[0]: + entry_stime = data_stime + + # Compare the entry_stime with changelog file suffix + # if changelog time is less than entry_stime then ignore + if int(change_ts) <= entry_stime[0]: + ignore_entry_ops = True + try: f = open(change, "r") clist = f.readlines() @@ -748,42 +1022,6 @@ class GMasterChangelogMixin(GMasterCommon): except IOError: raise - def edct(op, **ed): - dct = {} - dct['op'] = op - for k in ed: - if k == 'stat': - st = ed[k] - dst = dct['stat'] = {} - if st: - dst['uid'] = st.st_uid - dst['gid'] = st.st_gid - dst['mode'] = st.st_mode - dst['atime'] = st.st_atime - dst['mtime'] = st.st_mtime - else: - dct[k] = ed[k] - return dct - - # entry counts (not purges) - def entry_update(): - files_pending['count'] += 1 - - # purge count - def purge_update(): - files_pending['purge'] += 1 - - def log_failures(failures, entry_key, gfid_prefix, log_prefix): - num_failures = 0 - for failure in failures: - st = lstat(os.path.join(gfid_prefix, failure[0][entry_key])) - if not isinstance(st, int): - num_failures += 1 - logging.error('%s FAILED: %s' % (log_prefix, - repr(failure))) - - self.status.inc_value("failures", num_failures) - for e in clist: e = e.strip() et = e[self.IDX_START:self.IDX_END] # entry type @@ -791,12 +1029,20 @@ class GMasterChangelogMixin(GMasterCommon): # skip ENTRY operation if hot tier brick if self.name == 'live_changelog' or \ - self.name == 'history_changelog': - if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY: - logging.debug('skip ENTRY op: %s if hot tier brick' - % (ec[self.POS_TYPE])) + self.name == 'history_changelog': + if rconf.args.is_hottier and et == self.TYPE_ENTRY: + logging.debug(lf('skip ENTRY op if hot tier brick', + op=ec[self.POS_TYPE])) continue + # Data and Meta operations are decided while parsing + # UNLINK/RMDIR/MKNOD except that case ignore all the other + # entry ops if ignore_entry_ops is True. + # UNLINK/RMDIR/MKNOD entry_ops are ignored in the end + if ignore_entry_ops and et == self.TYPE_ENTRY and \ + ec[self.POS_TYPE] not in ["UNLINK", "RMDIR", "MKNOD"]: + continue + if et == self.TYPE_ENTRY: # extract information according to the type of # the entry operation. create(), mkdir() and mknod() @@ -804,8 +1050,11 @@ class GMasterChangelogMixin(GMasterCommon): # itself, so no need to stat()... ty = ec[self.POS_TYPE] + self.update_fop_batch_stats(ec[self.POS_TYPE]) + # PARGFID/BNAME - en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1])) + en = unescape_space_newline( + os.path.join(pfx, ec[self.POS_ENTRY1])) # GFID of the entry gfid = ec[self.POS_GFID] @@ -813,24 +1062,30 @@ class GMasterChangelogMixin(GMasterCommon): # The index of PARGFID/BNAME for UNLINK, RMDIR # is no more the last index. It varies based on # changelog.capture-del-path is enabled or not. - en = unescape(os.path.join(pfx, ec[self.UNLINK_ENTRY])) + en = unescape_space_newline( + os.path.join(pfx, ec[self.UNLINK_ENTRY])) # Remove from DATA list, so that rsync will # not fail pt = os.path.join(pfx, ec[0]) - if pt in datas: + st = lstat(pt) + if pt in datas and isinstance(st, int): + # file got unlinked, May be historical Changelog datas.remove(pt) - if not boolify(gconf.ignore_deletes): - purge_update() - entries.append(edct(ty, gfid=gfid, entry=en)) - elif ty in ['CREATE', 'MKDIR', 'MKNOD']: - entry_update() + if ty in ['RMDIR'] and not isinstance(st, int): + logging.info(lf('Ignoring rmdir. 
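Changelog files are named CHANGELOG.<epoch>, so the suffix after the last dot is the rollover timestamp; ignore_entry_ops compares it against the stored entry stime (a (sec, nsec) pair) to skip entry ops that already reached the slave:

    def already_synced(changelog_path, entry_stime):
        change_ts = int(changelog_path.split(".")[-1])  # e.g. 1521035117
        # Only whole seconds matter for this comparison.
        return entry_stime is not None and change_ts <= entry_stime[0]

    print(already_synced("/x/.processing/CHANGELOG.1521035117",
                         (1521035200, 0)))  # True -> skip entry ops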
Directory present in ' + 'master', gfid=gfid, pgfid_bname=en)) + continue + if not gconf.get("ignore-deletes"): + if not ignore_entry_ops: + entries.append(edct(ty, gfid=gfid, entry=en)) + elif ty in ['CREATE', 'MKDIR', 'MKNOD']: # Special case: record mknod as link if ty in ['MKNOD']: mode = int(ec[2]) - if mode & 01000: + if mode & 0o1000: # Avoid stat'ing the file as it # may be deleted in the interim st = FreeObject(st_mode=int(ec[2]), @@ -862,34 +1117,61 @@ class GMasterChangelogMixin(GMasterCommon): rl = None if st and stat.S_ISLNK(st.st_mode): - rl = errno_wrap(os.readlink, [en], [ENOENT], [ESTALE]) + rl = errno_wrap(os.readlink, [en], [ENOENT], + [ESTALE, EINTR]) if isinstance(rl, int): rl = None - entry_update() - e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) + e1 = unescape_space_newline( + os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st, link=rl)) + # If src doesn't exist while doing rename, destination + # is created. If data is not followed by rename, this + # remains zero byte file on slave. Hence add data entry + # for renames + datas.add(os.path.join(pfx, gfid)) else: # stat() to get mode and other information + if not matching_disk_gfid(gfid, en): + logging.debug(lf('Ignoring entry, purged in the ' + 'interim', file=en, gfid=gfid)) + continue + go = os.path.join(pfx, gfid) st = lstat(go) if isinstance(st, int): - logging.debug('file %s got purged in the interim' % go) + logging.debug(lf('Ignoring entry, purged in the ' + 'interim', file=en, gfid=gfid)) continue if ty == 'LINK': - entry_update() - entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) + rl = None + if st and stat.S_ISLNK(st.st_mode): + rl = errno_wrap(os.readlink, [en], [ENOENT], + [ESTALE, EINTR]) + if isinstance(rl, int): + rl = None + entries.append(edct(ty, stat=st, entry=en, gfid=gfid, + link=rl)) + # If src doesn't exist while doing link, destination + # is created based on file type. If data is not + # followed by link, this remains zero byte file on + # slave. Hence add data entry for links + if rl is None: + datas.add(os.path.join(pfx, gfid)) elif ty == 'SYMLINK': - rl = errno_wrap(os.readlink, [en], [ENOENT], [ESTALE]) + rl = errno_wrap(os.readlink, [en], [ENOENT], + [ESTALE, EINTR]) if isinstance(rl, int): continue - entry_update() + entries.append( edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) else: - logging.warn('ignoring %s [op %s]' % (gfid, ty)) + logging.warn(lf('ignoring op', + gfid=gfid, + type=ty)) elif et == self.TYPE_GFID: # If self.unlinked_gfids is available, then that means it is # retrying the changelog second time. Do not add the GFID's @@ -900,6 +1182,7 @@ class GMasterChangelogMixin(GMasterCommon): else: datas.add(os.path.join(pfx, ec[0])) elif et == self.TYPE_META: + self.update_fop_batch_stats(ec[self.POS_TYPE]) if ec[1] == 'SETATTR': # only setattr's for now... 
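The readlink calls above go through errno_wrap(), which takes the call, its argument list, errnos to swallow (returned as an int) and errnos to retry; this patch adds EINTR to the retry list. An illustrative stand-in for the syncdutils helper, matching how callers test isinstance(rv, int):

    import time

    def errno_wrap(call, args=[], return_errnos=[], retry_errnos=[]):
        # Illustrative only; the real helper lives in syncdutils.
        while True:
            try:
                return call(*args)
            except OSError as e:
                if e.errno in return_errnos:
                    return e.errno  # callers test isinstance(rv, int)
                if e.errno not in retry_errnos:
                    raise
                time.sleep(0.1)  # transient (ESTALE/EINTR): retry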
if len(ec) == 5: # In xsync crawl, we already have stat data @@ -912,28 +1195,69 @@ class GMasterChangelogMixin(GMasterCommon): st_mtime=ec[6]))) else: meta_gfid.add((os.path.join(pfx, ec[0]), )) - elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \ - ec[1] == 'FXATTROP': + elif ec[1] in ['SETXATTR', 'XATTROP', 'FXATTROP']: # To sync xattr/acls use rsync/tar, --xattrs and --acls # switch to rsync and tar - if not boolify(gconf.use_tarssh) and \ - (boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)): + if not gconf.get("sync-method") == "tarssh" and \ + (gconf.get("sync-xattrs") or gconf.get("sync-acls")): datas.add(os.path.join(pfx, ec[0])) else: - logging.warn('got invalid changelog type: %s' % (et)) + logging.warn(lf('got invalid fop type', + type=et)) logging.debug('entries: %s' % repr(entries)) # Increment counters for Status - self.status.inc_value("entry", len(entries)) self.files_in_batch += len(datas) self.status.inc_value("data", len(datas)) + self.batch_stats["DATA"] += self.files_in_batch - \ + self.batch_stats["SETXATTR"] - \ + self.batch_stats["XATTROP"] + + entry_start_time = time.time() # sync namespace - if entries: + if entries and not ignore_entry_ops: + # Increment counters for Status + self.status.inc_value("entry", len(entries)) + failures = self.slave.server.entry_ops(entries) - log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') + + if gconf.get("gfid-conflict-resolution"): + count = 0 + if failures: + logging.info(lf('Entry ops failed with gfid mismatch', + count=len(failures))) + while failures and count < self.MAX_OE_RETRIES: + count += 1 + self.handle_entry_failures(failures, entries) + logging.info(lf('Retry original entries', count=count)) + failures = self.slave.server.entry_ops(entries) + if not failures: + logging.info("Successfully fixed all entry ops with " + "gfid mismatch") + break + + self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') self.status.dec_value("entry", len(entries)) + # Update Entry stime in Brick Root only in case of Changelog mode + if self.name in ["live_changelog", "history_changelog"]: + entry_stime_to_update = (int(change_ts) - 1, 0) + self.upd_entry_stime(entry_stime_to_update) + self.status.set_field("last_synced_entry", + entry_stime_to_update[0]) + + self.batch_stats["ENTRY_SYNC_TIME"] += time.time() - entry_start_time + + if ignore_entry_ops: + # Book keeping, to show in logs the range of Changelogs skipped + self.num_skipped_entry_changelogs += 1 + if self.skipped_entry_changelogs_first is None: + self.skipped_entry_changelogs_first = change_ts + + self.skipped_entry_changelogs_last = change_ts + + meta_start_time = time.time() # sync metadata if meta_gfid: meta_entries = [] @@ -943,14 +1267,20 @@ class GMasterChangelogMixin(GMasterCommon): else: st = lstat(go[0]) if isinstance(st, int): - logging.debug('file %s got purged in the interim' % go[0]) + logging.debug(lf('file got purged in the interim', + file=go[0])) continue meta_entries.append(edct('META', go=go[0], stat=st)) if meta_entries: - self.status.inc_value("meta", len(entries)) + self.status.inc_value("meta", len(meta_entries)) failures = self.slave.server.meta_ops(meta_entries) - log_failures(failures, 'go', '', 'META') - self.status.dec_value("meta", len(entries)) + self.log_failures(failures, 'go', '', 'META') + self.status.dec_value("meta", len(meta_entries)) + + self.batch_stats["META_SYNC_TIME"] += time.time() - meta_start_time + + if self.batch_stats["DATA_START_TIME"] == 0: + self.batch_stats["DATA_START_TIME"] = time.time() # sync data if datas: @@ 
-963,7 +1293,13 @@ class GMasterChangelogMixin(GMasterCommon): self.unlinked_gfids = set() self.files_in_batch = 0 self.datas_in_batch = set() + # Error log disabled till the last round self.syncer.disable_errorlog() + self.skipped_entry_changelogs_first = None + self.skipped_entry_changelogs_last = None + self.num_skipped_entry_changelogs = 0 + self.batch_start_time = time.time() + self.init_fop_batch_stats() while True: # first, fire all changelog transfers in parallel. entry and @@ -974,7 +1310,7 @@ class GMasterChangelogMixin(GMasterCommon): # with data of other changelogs. if retry: - if tries == (self.MAX_RETRIES - 1): + if tries == (gconf.get("max-rsync-retries") - 1): # Enable Error logging if it is last retry self.syncer.enable_errorlog() @@ -988,7 +1324,8 @@ class GMasterChangelogMixin(GMasterCommon): self.a_syncdata(self.datas_in_batch) else: for change in changes: - logging.debug('processing change %s' % change) + logging.debug(lf('processing change', + changelog=change)) self.process_change(change, done, retry) if not retry: # number of changelogs processed in the batch @@ -1019,7 +1356,7 @@ class GMasterChangelogMixin(GMasterCommon): if done: xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) - map(self.changelog_done_func, changes) + list(map(self.changelog_done_func, changes)) self.archive_and_purge_changelogs(changes) # Reset Data counter after sync @@ -1031,10 +1368,10 @@ class GMasterChangelogMixin(GMasterCommon): # We do not know which changelog transfer failed, retry everything. retry = True tries += 1 - if tries == self.MAX_RETRIES: - logging.error('changelogs %s could not be processed ' - 'completely - moving on...' % - ' '.join(map(os.path.basename, changes))) + if tries == gconf.get("max-rsync-retries"): + logging.error(lf('changelogs could not be processed ' + 'completely - moving on...', + files=list(map(os.path.basename, changes)))) # Reset data counter on failure self.status.dec_value("data", self.files_in_batch) @@ -1044,7 +1381,7 @@ class GMasterChangelogMixin(GMasterCommon): if done: xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) - map(self.changelog_done_func, changes) + list(map(self.changelog_done_func, changes)) self.archive_and_purge_changelogs(changes) break # it's either entry_ops() or Rsync that failed to do it's @@ -1054,14 +1391,62 @@ class GMasterChangelogMixin(GMasterCommon): # entry_ops() that failed... so we retry the _whole_ changelog # again. # TODO: remove entry retries when it's gets fixed. 
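Wrapping map() in list() here is not cosmetic: under Python 3 map() is lazy, so the changelog_done_func callbacks would otherwise never fire. A quick demonstration:

    done = []
    changes = ["CHANGELOG.1521035102", "CHANGELOG.1521035117"]
    m = map(done.append, changes)
    print(done)  # [] on Python 3: nothing has consumed the iterator
    list(m)      # forcing the iterator runs the side effects
    print(done)  # ['CHANGELOG.1521035102', 'CHANGELOG.1521035117']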
- logging.warn('incomplete sync, retrying changelogs: %s' % - ' '.join(map(os.path.basename, changes))) + logging.warn(lf('incomplete sync, retrying changelogs', + files=list(map(os.path.basename, changes)))) # Reset the Data counter before Retry self.status.dec_value("data", self.files_in_batch) self.files_in_batch = 0 + self.init_fop_batch_stats() time.sleep(0.5) + # Log the Skipped Entry ops range if any + if self.skipped_entry_changelogs_first is not None and \ + self.skipped_entry_changelogs_last is not None: + logging.info(lf("Skipping already processed entry ops", + from_changelog=self.skipped_entry_changelogs_first, + to_changelog=self.skipped_entry_changelogs_last, + num_changelogs=self.num_skipped_entry_changelogs)) + + # Log Current batch details + if changes: + logging.info( + lf("Entry Time Taken", + UNL=self.batch_stats["UNLINK"], + RMD=self.batch_stats["RMDIR"], + CRE=self.batch_stats["CREATE"], + MKN=self.batch_stats["MKNOD"], + MKD=self.batch_stats["MKDIR"], + REN=self.batch_stats["RENAME"], + LIN=self.batch_stats["LINK"], + SYM=self.batch_stats["SYMLINK"], + duration="%.4f" % self.batch_stats["ENTRY_SYNC_TIME"])) + + logging.info( + lf("Data/Metadata Time Taken", + SETA=self.batch_stats["SETATTR"], + meta_duration="%.4f" % self.batch_stats["META_SYNC_TIME"], + SETX=self.batch_stats["SETXATTR"], + XATT=self.batch_stats["XATTROP"], + DATA=self.batch_stats["DATA"], + data_duration="%.4f" % ( + time.time() - self.batch_stats["DATA_START_TIME"]))) + + logging.info( + lf("Batch Completed", + mode=self.name, + duration="%.4f" % (time.time() - self.batch_start_time), + changelog_start=changes[0].split(".")[-1], + changelog_end=changes[-1].split(".")[-1], + num_changelogs=len(changes), + stime=self.get_data_stime(), + entry_stime=self.get_entry_stime())) + + def upd_entry_stime(self, stime): + self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY, + self.uuid, + stime) + def upd_stime(self, stime, path=None): if not path: path = self.FLAT_DIR_HIERARCHY @@ -1071,8 +1456,7 @@ class GMasterChangelogMixin(GMasterCommon): # Update last_synced_time in status file based on stime # only update stime if stime xattr set to Brick root if path == self.FLAT_DIR_HIERARCHY: - chkpt_time = gconf.configinterface.get_realtime( - "checkpoint") + chkpt_time = gconf.getr("checkpoint") checkpoint_time = 0 if chkpt_time is not None: checkpoint_time = int(chkpt_time) @@ -1080,10 +1464,10 @@ class GMasterChangelogMixin(GMasterCommon): self.status.set_last_synced(stime, checkpoint_time) def update_worker_remote_node(self): - node = sys.argv[-1] + node = rconf.args.resource_remote node_data = node.split("@") node = node_data[-1] - remote_node_ip = node.split(":")[0] + remote_node_ip, _ = host_brick_split(node) self.status.set_slave_node(remote_node_ip) def changelogs_batch_process(self, changes): @@ -1091,7 +1475,7 @@ class GMasterChangelogMixin(GMasterCommon): current_size = 0 for c in changes: si = os.lstat(c).st_size - if (si + current_size) > MAX_CHANGELOG_BATCH_SIZE: + if (si + current_size) > gconf.get("changelog-batch-size"): # Create new batch if single Changelog file greater than # Max Size! 
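changelogs_batch_process() groups changelogs by cumulative file size so each batch stays near changelog-batch-size; since stime advances per batch, a crash replays only the current batch. A self-contained sketch of the grouping (reset details simplified):

    def make_batches(sizes, max_size):
        batches, current = [], 0
        for name, size in sizes:
            if size + current > max_size:
                batches.append([name])  # oversized file gets own batch
                current = size
            else:
                if not batches:
                    batches.append([])
                batches[-1].append(name)
                current += size
        return batches

    print(make_batches([("a", 60), ("b", 30), ("c", 50)], 100))
    # [['a', 'b'], ['c']]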
or current batch size exceeds Max size changelogs_batches.append([c]) @@ -1105,7 +1489,8 @@ class GMasterChangelogMixin(GMasterCommon): changelogs_batches[-1].append(c) for batch in changelogs_batches: - logging.debug('processing changes %s' % repr(batch)) + logging.debug(lf('processing changes', + batch=batch)) self.process(batch) def crawl(self): @@ -1113,44 +1498,45 @@ class GMasterChangelogMixin(GMasterCommon): changes = [] # get stime (from the brick) and purge changelogs # that are _historical_ to that time. - purge_time = self.get_purge_time() + data_stime = self.get_data_stime() - self.changelog_agent.scan() + libgfchangelog.scan() self.crawls += 1 - changes = self.changelog_agent.getchanges() + changes = libgfchangelog.getchanges() if changes: - if purge_time: - logging.info("slave's time: %s" % repr(purge_time)) + if data_stime: + logging.info(lf("slave's time", + stime=data_stime)) processed = [x for x in changes - if int(x.split('.')[-1]) < purge_time[0]] + if int(x.split('.')[-1]) < data_stime[0]] for pr in processed: - logging.info( - 'skipping already processed change: %s...' % - os.path.basename(pr)) + logging.debug( + lf('skipping already processed change', + changelog=os.path.basename(pr))) self.changelog_done_func(pr) changes.remove(pr) self.archive_and_purge_changelogs(processed) self.changelogs_batch_process(changes) - def register(self, register_time, changelog_agent, status): - self.changelog_agent = changelog_agent - self.sleep_interval = int(gconf.change_interval) - self.changelog_done_func = self.changelog_agent.done - self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), + def register(self, register_time, status): + self.sleep_interval = gconf.get("change-interval") + self.changelog_done_func = libgfchangelog.done + self.tempdir = self.setup_working_dir() + self.processed_changelogs_dir = os.path.join(self.tempdir, ".processed") self.name = "live_changelog" self.status = status class GMasterChangeloghistoryMixin(GMasterChangelogMixin): - def register(self, register_time, changelog_agent, status): - self.changelog_agent = changelog_agent + def register(self, register_time, status): self.changelog_register_time = register_time self.history_crawl_start_time = register_time - self.changelog_done_func = self.changelog_agent.history_done + self.changelog_done_func = libgfchangelog.history_done self.history_turns = 0 - self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), + self.tempdir = self.setup_working_dir() + self.processed_changelogs_dir = os.path.join(self.tempdir, ".history/.processed") self.name = "history_changelog" self.status = status @@ -1158,26 +1544,34 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): def crawl(self): self.history_turns += 1 self.status.set_worker_crawl_status("History Crawl") - purge_time = self.get_purge_time() + data_stime = self.get_data_stime() end_time = int(time.time()) - logging.info('starting history crawl... 
turns: %s, stime: %s, etime: %s' - % (self.history_turns, repr(purge_time), repr(end_time))) - if not purge_time or purge_time == URXTIME: - logging.info("stime not available, abandoning history crawl") - raise NoPurgeTimeAvailable() + #as start of historical crawl marks Geo-rep worker restart + if gconf.get("ignore-deletes"): + logging.info(lf('ignore-deletes config option is set', + stime=data_stime)) + + logging.info(lf('starting history crawl', + turns=self.history_turns, + stime=data_stime, + etime=end_time, + entry_stime=self.get_entry_stime())) + + if not data_stime or data_stime == URXTIME: + raise NoStimeAvailable() # Changelogs backend path is hardcoded as # <BRICK_PATH>/.glusterfs/changelogs, if user configured to different # location then consuming history will not work(Known issue as of now) - changelog_path = os.path.join(gconf.local_path, + changelog_path = os.path.join(rconf.args.local_path, ".glusterfs/changelogs") - ret, actual_end = self.changelog_agent.history( + ret, actual_end = libgfchangelog.history_changelog( changelog_path, - purge_time[0], + data_stime[0], end_time, - int(gconf.sync_jobs)) + gconf.get("sync-jobs")) # scan followed by getchanges till scan returns zero. # history_scan() is blocking call, till it gets the number @@ -1185,18 +1579,19 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): # to be processed. returns positive value as number of changelogs # to be processed, which will be fetched using # history_getchanges() - while self.changelog_agent.history_scan() > 0: + while libgfchangelog.history_scan() > 0: self.crawls += 1 - changes = self.changelog_agent.history_getchanges() + changes = libgfchangelog.history_getchanges() if changes: - if purge_time: - logging.info("slave's time: %s" % repr(purge_time)) + if data_stime: + logging.info(lf("slave's time", + stime=data_stime)) processed = [x for x in changes - if int(x.split('.')[-1]) < purge_time[0]] + if int(x.split('.')[-1]) < data_stime[0]] for pr in processed: - logging.info('skipping already processed change: ' - '%s...' % os.path.basename(pr)) + logging.debug(lf('skipping already processed change', + changelog=os.path.basename(pr))) self.changelog_done_func(pr) changes.remove(pr) @@ -1204,8 +1599,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): history_turn_time = int(time.time()) - self.history_crawl_start_time - logging.info('finished history crawl syncing, endtime: %s, stime: %s' - % (actual_end, repr(self.get_purge_time()))) + logging.info(lf('finished history crawl', + endtime=actual_end, + stime=self.get_data_stime(), + entry_stime=self.get_entry_stime())) # If TS returned from history_changelog is < register_time # then FS crawl may be required, since history is only available @@ -1219,7 +1616,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): self.history_crawl_start_time = int(time.time()) self.crawl() else: - # This exeption will be catched in resource.py and + # This exception will be caught in resource.py and # fallback to xsync for the small gap. 
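history_changelog() returns the timestamp it actually covered; if that predates the worker's register time, the crawl either runs one more history turn or falls back to xsync for the small remaining gap (the PartialHistoryAvailable path below). The decision reduces to roughly:

    def after_history(actual_end, register_time, history_turns):
        # History is complete only up to actual_end.
        if actual_end >= register_time:
            return "history covered the gap"
        if history_turns < 2:
            return "run history crawl once more"
        return "raise PartialHistoryAvailable -> xsync fallback"

    print(after_history(1521035100, 1521035117, 2))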
raise PartialHistoryAvailable(str(actual_end)) @@ -1238,17 +1635,18 @@ class GMasterXsyncMixin(GMasterChangelogMixin): XSYNC_MAX_ENTRIES = 1 << 13 - def register(self, register_time=None, changelog_agent=None, status=None): + def register(self, register_time=None, status=None): self.status = status self.counter = 0 self.comlist = [] self.stimes = [] self.sleep_interval = 60 self.tempdir = self.setup_working_dir() + logging.info(lf('Working dir', + path=self.tempdir)) self.tempdir = os.path.join(self.tempdir, 'xsync') self.processed_changelogs_dir = self.tempdir self.name = "xsync" - logging.info('xsync temp directory: %s' % self.tempdir) try: os.makedirs(self.tempdir) except OSError: @@ -1257,6 +1655,11 @@ class GMasterXsyncMixin(GMasterChangelogMixin): pass else: raise + # Purge stale unprocessed xsync changelogs + for f in os.listdir(self.tempdir): + if f.startswith("XSYNC-CHANGELOG"): + os.remove(os.path.join(self.tempdir, f)) + def crawl(self): """ @@ -1269,25 +1672,28 @@ class GMasterXsyncMixin(GMasterChangelogMixin): self.Xcrawl() t = Thread(target=Xsyncer) t.start() - logging.info('starting hybrid crawl..., stime: %s' - % repr(self.get_purge_time())) + logging.info(lf('starting hybrid crawl', + stime=self.get_data_stime())) self.status.set_worker_crawl_status("Hybrid Crawl") while True: try: item = self.comlist.pop(0) if item[0] == 'finale': - logging.info('finished hybrid crawl syncing, stime: %s' - % repr(self.get_purge_time())) + logging.info(lf('finished hybrid crawl', + stime=self.get_data_stime())) break elif item[0] == 'xsync': - logging.info('processing xsync changelog %s' % (item[1])) + logging.info(lf('processing xsync changelog', + path=item[1])) self.process([item[1]], 0) self.archive_and_purge_changelogs([item[1]]) elif item[0] == 'stime': - logging.debug('setting slave time: %s' % repr(item[1])) + logging.debug(lf('setting slave time', + time=item[1])) self.upd_stime(item[1][1], item[1][0]) else: - logging.warn('unknown tuple in comlist (%s)' % repr(item)) + logging.warn(lf('unknown tuple in comlist', + entry=item)) except IndexError: time.sleep(1) @@ -1347,7 +1753,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): def is_sticky(self, path, mo): """check for DHTs linkto sticky bit file""" sticky = False - if mo & 01000: + if mo & 0o1000: sticky = self.master.server.linkto_check(path) return sticky @@ -1365,8 +1771,9 @@ class GMasterXsyncMixin(GMasterChangelogMixin): xtr_root = self.xtime('.', self.slave) if isinstance(xtr_root, int): if xtr_root != ENOENT: - logging.warn("slave cluster not returning the " - "correct xtime for root (%d)" % xtr_root) + logging.warn(lf("slave cluster not returning the " + "xtime for root", + error=xtr_root)) xtr_root = self.minus_infinity xtl = self.xtime(path) if isinstance(xtl, int): @@ -1374,8 +1781,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin): xtr = self.xtime(path, self.slave) if isinstance(xtr, int): if xtr != ENOENT: - logging.warn("slave cluster not returning the " - "correct xtime for %s (%d)" % (path, xtr)) + logging.warn(lf("slave cluster not returning the " + "xtime for dir", + path=path, + error=xtr)) xtr = self.minus_infinity xtr = max(xtr, xtr_root) zero_zero = (0, 0) @@ -1390,27 +1799,32 @@ class GMasterXsyncMixin(GMasterChangelogMixin): dem = self.master.server.entries(path) pargfid = self.master.server.gfid(path) if isinstance(pargfid, int): - logging.warn('skipping directory %s' % (path)) + logging.warn(lf('skipping directory', + path=path)) for e in dem: bname = e e = os.path.join(path, e) xte = self.xtime(e) 
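The xsync crawl runs Xcrawl() in a producer thread and drains self.comlist in the loop above, treating a plain list as a queue (list.pop(0) is atomic under the GIL, and IndexError doubles as the "empty" signal). A toy standalone version:

    import time
    from threading import Thread

    comlist = []

    def producer():
        for i in range(3):
            comlist.append(('xsync', 'XSYNC-CHANGELOG.%d' % i))
        comlist.append(('finale',))

    Thread(target=producer).start()
    while True:
        try:
            item = comlist.pop(0)
            if item[0] == 'finale':
                break
            print('processing', item[1])
        except IndexError:
            time.sleep(1)  # producer has not published anything yet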
if isinstance(xte, int): - logging.warn("irregular xtime for %s: %s" % - (e, errno.errorcode[xte])) + logging.warn(lf("irregular xtime", + path=e, + error=errno.errorcode[xte])) continue if not self.need_sync(e, xte, xtr): continue st = self.master.server.lstat(e) if isinstance(st, int): - logging.warn('%s got purged in the interim ...' % e) + logging.warn(lf('got purged in the interim', + path=e)) continue if self.is_sticky(e, st.st_mode): - logging.debug('ignoring sticky bit file %s' % e) + logging.debug(lf('ignoring sticky bit file', + path=e)) continue gfid = self.master.server.gfid(e) if isinstance(gfid, int): - logging.warn('skipping entry %s..' % e) + logging.warn(lf('skipping entry', + path=e)) continue mo = st.st_mode self.counter += 1 if ((stat.S_ISDIR(mo) or @@ -1420,8 +1834,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin): self.sync_done(self.stimes, False) self.stimes = [] if stat.S_ISDIR(mo): - self.write_entry_change("E", [gfid, 'MKDIR', str(mo), - str(0), str(0), escape(os.path.join(pargfid, bname))]) + self.write_entry_change("E", + [gfid, 'MKDIR', str(mo), + str(0), str(0), escape_space_newline( + os.path.join(pargfid, bname))]) self.write_entry_change("M", [gfid, "SETATTR", str(st.st_uid), str(st.st_gid), str(st.st_mode), str(st.st_atime), @@ -1440,8 +1856,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin): self.stimes.append((e, stime_to_update)) elif stat.S_ISLNK(mo): self.write_entry_change( - "E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, - bname))]) + "E", [gfid, 'SYMLINK', escape_space_newline( + os.path.join(pargfid, bname))]) elif stat.S_ISREG(mo): nlink = st.st_nlink nlink -= 1 # fixup backend stat link count @@ -1452,12 +1868,13 @@ class GMasterXsyncMixin(GMasterChangelogMixin): self.write_entry_change("E", [gfid, 'MKNOD', str(mo), str(0), str(0), - escape(os.path.join( - pargfid, bname))]) + escape_space_newline( + os.path.join( + pargfid, bname))]) else: self.write_entry_change( - "E", [gfid, 'LINK', escape(os.path.join(pargfid, - bname))]) + "E", [gfid, 'LINK', escape_space_newline( + os.path.join(pargfid, bname))]) self.write_entry_change("D", [gfid]) if path == '.': stime_to_update = xtl @@ -1554,11 +1971,11 @@ class Syncer(object): self.pb = PostBox() self.sync_engine = sync_engine self.errnos_ok = resilient_errnos - for i in range(int(gconf.sync_jobs)): - t = Thread(target=self.syncjob) + for i in range(gconf.get("sync-jobs")): + t = Thread(target=self.syncjob, args=(i + 1, )) t.start() - def syncjob(self): + def syncjob(self, job_id): """the life of a worker""" while True: pb = None @@ -1571,7 +1988,14 @@ class Syncer(object): break time.sleep(0.5) pb.close() + start = time.time() po = self.sync_engine(pb, self.log_err) + logging.info(lf("Sync Time Taken", + job=job_id, + num_files=len(pb), + return_code=po.returncode, + duration="%.4f" % (time.time() - start))) + if po.returncode == 0: ret = (True, 0) elif po.returncode in self.errnos_ok: |
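Nearly every log call in this patch moves from %-interpolation to lf(), which renders an event string plus key=value fields. The exact formatting lives in syncdutils; a hypothetical stand-in shows the shape, applied to the new per-job "Sync Time Taken" message:

    def lf(event, **fields):
        # Hypothetical stand-in for syncdutils.lf: fixed event text plus
        # grep-able key=value pairs instead of ad-hoc interpolation.
        return "%s\t[%s]" % (event,
                             "\t".join("{%s=%s}" % (k, v)
                                       for k, v in sorted(fields.items())))

    print(lf("Sync Time Taken", job=1, num_files=42,
             return_code=0, duration="0.8412"))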
