Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r-- | geo-replication/syncdaemon/master.py | 235 |
1 files changed, 126 insertions, 109 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 4c1a529a3ed..552c4deec44 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -12,7 +12,6 @@ import os
 import sys
 import time
 import stat
-import json
 import logging
 import fcntl
 import string
@@ -21,9 +20,11 @@ import tarfile
 from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN, ESTALE, EINTR
 from threading import Condition, Lock
 from datetime import datetime
-from gconf import gconf
-from syncdutils import Thread, GsyncdError, boolify, escape_space_newline
-from syncdutils import unescape_space_newline, gauxpfx, md5hex, selfkill
+
+import gsyncdconfig as gconf
+from rconf import rconf
+from syncdutils import Thread, GsyncdError, escape_space_newline
+from syncdutils import unescape_space_newline, gauxpfx, escape
 from syncdutils import lstat, errno_wrap, FreeObject, lf, matching_disk_gfid
 from syncdutils import NoStimeAvailable, PartialHistoryAvailable
 
@@ -85,24 +86,41 @@ def gmaster_builder(excrawl=None):
     """produce the GMaster class variant corresponding to sync mode"""
     this = sys.modules[__name__]
-    modemixin = gconf.special_sync_mode
+    modemixin = gconf.get("special-sync-mode")
     if not modemixin:
         modemixin = 'normal'
-    changemixin = 'xsync' if gconf.change_detector == 'xsync' \
-        else excrawl or gconf.change_detector
+
+    if gconf.get("change-detector") == 'xsync':
+        changemixin = 'xsync'
+    elif excrawl:
+        changemixin = excrawl
+    else:
+        changemixin = gconf.get("change-detector")
+
     logging.debug(lf('setting up change detection mode',
                      mode=changemixin))
     modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
     crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
-    sendmarkmixin = boolify(
-        gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
-    purgemixin = boolify(
-        gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
-    syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine
+
+    if gconf.get("use-rsync-xattrs"):
+        sendmarkmixin = SendmarkRsyncMixin
+    else:
+        sendmarkmixin = SendmarkNormalMixin
+
+    if gconf.get("ignore-deletes"):
+        purgemixin = PurgeNoopMixin
+    else:
+        purgemixin = PurgeNormalMixin
+
+    if gconf.get("sync-method") == "tarssh":
+        syncengine = TarSSHEngine
+    else:
+        syncengine = RsyncEngine
 
     class _GMaster(crawlmixin, modemixin, sendmarkmixin,
                    purgemixin, syncengine):
         pass
+
     return _GMaster
 
@@ -139,9 +157,9 @@ class NormalMixin(object):
         return xt0 >= xt1
 
     def make_xtime_opts(self, is_master, opts):
-        if not 'create' in opts:
+        if 'create' not in opts:
             opts['create'] = is_master
-        if not 'default_xtime' in opts:
+        if 'default_xtime' not in opts:
             opts['default_xtime'] = URXTIME
 
     def xtime_low(self, rsc, path, **opts):
@@ -212,9 +230,9 @@ class RecoverMixin(NormalMixin):
 
     @staticmethod
     def make_xtime_opts(is_master, opts):
-        if not 'create' in opts:
+        if 'create' not in opts:
             opts['create'] = False
-        if not 'default_xtime' in opts:
+        if 'default_xtime' not in opts:
             opts['default_xtime'] = URXTIME
 
     def keepalive_payload_hook(self, timo, gap):
@@ -385,7 +403,7 @@ class GMasterCommon(object):
         self.master = master
         self.slave = slave
         self.jobtab = {}
-        if boolify(gconf.use_tarssh):
+        if gconf.get("sync-method") == "tarssh":
             self.syncer = Syncer(slave, self.slave.tarssh, [2])
         else:
             # partial transfer (cf. rsync(1)), that's normal
@@ -401,7 +419,7 @@ class GMasterCommon(object):
         # 0.
         self.crawls = 0
         self.turns = 0
-        self.total_turns = int(gconf.turns)
+        self.total_turns = rconf.turns
         self.crawl_start = datetime.now()
         self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
         self.start = None
@@ -414,7 +432,7 @@ def init_keep_alive(cls):
         """start the keep-alive thread """
-        timo = int(gconf.timeout or 0)
+        timo = gconf.get("slave-timeout", 0)
         if timo > 0:
             def keep_alive():
                 while True:
@@ -427,28 +445,28 @@ def mgmt_lock(self):
         """Take management volume lock """
-        if gconf.mgmt_lock_fd:
+        if rconf.mgmt_lock_fd:
             try:
-                fcntl.lockf(gconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
-                if not gconf.active_earlier:
-                    gconf.active_earlier = True
+                fcntl.lockf(rconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+                if not rconf.active_earlier:
+                    rconf.active_earlier = True
                     logging.info(lf("Got lock Becoming ACTIVE",
-                                    brick=gconf.local_path))
+                                    brick=rconf.args.local_path))
                 return True
             except:
                 ex = sys.exc_info()[1]
                 if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
-                    if not gconf.passive_earlier:
-                        gconf.passive_earlier = True
+                    if not rconf.passive_earlier:
+                        rconf.passive_earlier = True
                         logging.info(lf("Didn't get lock Becoming PASSIVE",
-                                        brick=gconf.local_path))
+                                        brick=rconf.local_path))
                     return False
                 raise
 
         fd = None
-        bname = str(self.uuid) + "_" + str(gconf.slave_id) + "_subvol_" \
-            + str(gconf.subvol_num) + ".lock"
-        mgmt_lock_dir = os.path.join(gconf.meta_volume_mnt, "geo-rep")
+        bname = str(self.uuid) + "_" + rconf.args.slave_id + "_subvol_" \
+            + str(rconf.args.subvol_num) + ".lock"
+        mgmt_lock_dir = os.path.join(gconf.get("meta-volume-mnt"), "geo-rep")
         path = os.path.join(mgmt_lock_dir, bname)
         logging.debug(lf("lock file path", path=path))
         try:
@@ -471,30 +489,30 @@ class GMasterCommon(object):
         try:
             fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
             # Save latest FD for future use
-            gconf.mgmt_lock_fd = fd
+            rconf.mgmt_lock_fd = fd
         except:
             ex = sys.exc_info()[1]
             if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
                 # cannot grab, it's taken
-                if not gconf.passive_earlier:
-                    gconf.passive_earlier = True
+                if not rconf.passive_earlier:
+                    rconf.passive_earlier = True
                     logging.info(lf("Didn't get lock Becoming PASSIVE",
-                                    brick=gconf.local_path))
-                gconf.mgmt_lock_fd = fd
+                                    brick=rconf.args.local_path))
+                rconf.mgmt_lock_fd = fd
                 return False
             raise
 
-        if not gconf.active_earlier:
-            gconf.active_earlier = True
+        if not rconf.active_earlier:
+            rconf.active_earlier = True
             logging.info(lf("Got lock Becoming ACTIVE",
-                            brick=gconf.local_path))
+                            brick=rconf.args.local_path))
         return True
 
     def should_crawl(self):
-        if not boolify(gconf.use_meta_volume):
-            return gconf.glusterd_uuid in self.master.server.node_uuid()
+        if not gconf.get("use-meta-volume"):
+            return rconf.args.local_node_id in self.master.server.node_uuid()
 
-        if not os.path.ismount(gconf.meta_volume_mnt):
+        if not os.path.ismount(gconf.get("meta-volume-mnt")):
             logging.error("Meta-volume is not mounted. Worker Exiting...")
             sys.exit(1)
         return self.mgmt_lock()
@@ -532,7 +550,7 @@ class GMasterCommon(object):
         logging.debug("%s master with volume id %s ..."
                       % (inter_master and "intermediate" or "primary",
                          self.uuid))
-        gconf.configinterface.set('volume_id', self.uuid)
+        rconf.volume_id = self.uuid
         if self.volinfo:
             if self.volinfo['retval']:
                 logging.warn(lf("master cluster's info may not be valid",
@@ -557,7 +575,7 @@ class GMasterCommon(object):
                                  turns=self.turns,
                                  time=self.start)
                 t1 = time.time()
-                if int(t1 - t0) >= int(gconf.replica_failover_interval):
+                if int(t1 - t0) >= gconf.get("replica-failover-interval"):
                     crawl = self.should_crawl()
                     t0 = t1
                 self.update_worker_remote_node()
@@ -567,7 +585,7 @@ class GMasterCommon(object):
                 # which is min of cluster (but max of the replicas)
                 brick_stime = self.xtime('.', self.slave)
                 cluster_stime = self.master.server.aggregated.stime_mnt(
-                    '.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
+                    '.', '.'.join([str(self.uuid), rconf.args.slave_id]))
                 logging.debug(lf("Crawl info",
                                  cluster_stime=cluster_stime,
                                  brick_stime=brick_stime))
@@ -675,6 +693,7 @@ class XCrawlMetadata(object):
         self.st_atime = float(st_atime)
         self.st_mtime = float(st_mtime)
 
+
 class GMasterChangelogMixin(GMasterCommon):
 
     """ changelog based change detection and syncing """
@@ -701,34 +720,34 @@ class GMasterChangelogMixin(GMasterCommon):
     def init_fop_batch_stats(self):
         self.batch_stats = {
-            "CREATE":0,
-            "MKNOD":0,
-            "UNLINK":0,
-            "MKDIR":0,
-            "RMDIR":0,
-            "LINK":0,
-            "SYMLINK":0,
-            "RENAME":0,
-            "SETATTR":0,
-            "SETXATTR":0,
-            "XATTROP":0,
-            "DATA":0,
-            "ENTRY_SYNC_TIME":0,
-            "META_SYNC_TIME":0,
-            "DATA_START_TIME":0
+            "CREATE": 0,
+            "MKNOD": 0,
+            "UNLINK": 0,
+            "MKDIR": 0,
+            "RMDIR": 0,
+            "LINK": 0,
+            "SYMLINK": 0,
+            "RENAME": 0,
+            "SETATTR": 0,
+            "SETXATTR": 0,
+            "XATTROP": 0,
+            "DATA": 0,
+            "ENTRY_SYNC_TIME": 0,
+            "META_SYNC_TIME": 0,
+            "DATA_START_TIME": 0
         }
 
     def update_fop_batch_stats(self, ty):
         if ty in ['FSETXATTR']:
-           ty = 'SETXATTR'
-        self.batch_stats[ty] = self.batch_stats.get(ty,0) + 1
+            ty = 'SETXATTR'
+        self.batch_stats[ty] = self.batch_stats.get(ty, 0) + 1
 
     def archive_and_purge_changelogs(self, changelogs):
         # Creates tar file instead of tar.gz, since changelogs will
         # be appended to existing tar. archive name is
         # archive_<YEAR><MONTH>.tar
         archive_name = "archive_%s.tar" % datetime.today().strftime(
-            gconf.changelog_archive_format)
+            gconf.get("changelog-archive-format"))
 
         try:
             tar = tarfile.open(os.path.join(self.processed_changelogs_dir,
@@ -764,13 +783,9 @@ class GMasterChangelogMixin(GMasterCommon):
             else:
                 raise
 
-    def fallback_xsync(self):
-        logging.info('falling back to xsync mode')
-        gconf.configinterface.set('change-detector', 'xsync')
-        selfkill()
-
     def setup_working_dir(self):
-        workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path))
+        workdir = os.path.join(gconf.get("working-dir"),
+                               escape(rconf.args.local_path))
         logging.debug('changelog working dir %s' % workdir)
         return workdir
 
@@ -804,27 +819,30 @@ class GMasterChangelogMixin(GMasterCommon):
                     logging.info(lf('Fixing gfid mismatch in slave. Deleting'
                                     ' the entry',
                                     retry_count=retry_count,
                                     entry=repr(failure)))
-                    #Add deletion to fix_entry_ops list
+                    # Add deletion to fix_entry_ops list
                     if failure[2]['slave_isdir']:
-                        fix_entry_ops.append(edct('RMDIR',
-                                                  gfid=failure[2]['slave_gfid'],
-                                                  entry=pbname))
+                        fix_entry_ops.append(
+                            edct('RMDIR',
+                                 gfid=failure[2]['slave_gfid'],
+                                 entry=pbname))
                     else:
-                        fix_entry_ops.append(edct('UNLINK',
-                                                  gfid=failure[2]['slave_gfid'],
-                                                  entry=pbname))
+                        fix_entry_ops.append(
+                            edct('UNLINK',
+                                 gfid=failure[2]['slave_gfid'],
+                                 entry=pbname))
                 elif not isinstance(st, int):
-                    #The file exists on master but with different name.
-                    #Probabaly renamed and got missed during xsync crawl.
+                    # The file exists on master but with different name.
+                    # Probabaly renamed and got missed during xsync crawl.
                     if failure[2]['slave_isdir']:
                         logging.info(lf('Fixing gfid mismatch in slave',
                                         retry_count=retry_count,
                                         entry=repr(failure)))
-                        realpath = os.readlink(os.path.join(gconf.local_path,
-                                                            ".glusterfs",
-                                                            slave_gfid[0:2],
-                                                            slave_gfid[2:4],
-                                                            slave_gfid))
+                        realpath = os.readlink(os.path.join(
+                            rconf.args.local_path,
+                            ".glusterfs",
+                            slave_gfid[0:2],
+                            slave_gfid[2:4],
+                            slave_gfid))
                         dst_entry = os.path.join(pfx, realpath.split('/')[-2],
                                                  realpath.split('/')[-1])
                         rename_dict = edct('RENAME', gfid=slave_gfid,
@@ -840,19 +858,20 @@ class GMasterChangelogMixin(GMasterCommon):
                                         ' Deleting the entry',
                                         retry_count=retry_count,
                                         entry=repr(failure)))
-                        fix_entry_ops.append(edct('UNLINK',
-                                                  gfid=failure[2]['slave_gfid'],
-                                                  entry=pbname))
+                        fix_entry_ops.append(
+                            edct('UNLINK',
+                                 gfid=failure[2]['slave_gfid'],
+                                 entry=pbname))
                     logging.error(lf('Entry cannot be fixed in slave due '
                                      'to GFID mismatch, find respective '
                                      'path for the GFID and trigger sync',
                                      gfid=slave_gfid))
 
         if fix_entry_ops:
-            #Process deletions of entries whose gfids are mismatched
+            # Process deletions of entries whose gfids are mismatched
             failures1 = self.slave.server.entry_ops(fix_entry_ops)
             if not failures1:
-                logging.info ("Sucessfully fixed entry ops with gfid mismatch")
+                logging.info("Sucessfully fixed entry ops with gfid mismatch")
 
         return failures1
 
@@ -880,12 +899,11 @@ class GMasterChangelogMixin(GMasterCommon):
                 for failure in failures1:
                     logging.error("Failed to fix entry ops %s", repr(failure))
         else:
-            #Retry original entry list 5 times
+            # Retry original entry list 5 times
             failures = self.slave.server.entry_ops(entries)
             self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
 
-
     def process_change(self, change, done, retry):
         pfx = gauxpfx()
         clist = []
@@ -930,7 +948,7 @@ class GMasterChangelogMixin(GMasterCommon):
             # skip ENTRY operation if hot tier brick
             if self.name == 'live_changelog' or \
                self.name == 'history_changelog':
-                if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY:
+                if rconf.args.is_hottier and et == self.TYPE_ENTRY:
                     logging.debug(lf('skip ENTRY op if hot tier brick',
                                      op=ec[self.POS_TYPE]))
                     continue
@@ -978,7 +996,7 @@ class GMasterChangelogMixin(GMasterCommon):
                                            'master', gfid=gfid,
                                            pgfid_bname=en))
                        continue
-                    if not boolify(gconf.ignore_deletes):
+                    if not gconf.get("ignore-deletes"):
                         if not ignore_entry_ops:
                             entries.append(edct(ty, gfid=gfid, entry=en))
                 elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
@@ -1084,12 +1102,11 @@ class GMasterChangelogMixin(GMasterCommon):
                                              st_mtime=ec[6])))
                 else:
                     meta_gfid.add((os.path.join(pfx, ec[0]), ))
-            elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \
-                    ec[1] == 'FXATTROP':
+            elif ec[1] in ['SETXATTR', 'XATTROP', 'FXATTROP']:
                 # To sync xattr/acls use rsync/tar, --xattrs and --acls
                 # switch to rsync and tar
-                if not boolify(gconf.use_tarssh) and \
-                   (boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)):
+                if not gconf.get("sync-method") == "tarssh" and \
+                   (gconf.get("sync-xattrs") or gconf.get("sync-acls")):
                     datas.add(os.path.join(pfx, ec[0]))
             else:
                 logging.warn(lf('got invalid fop type',
@@ -1102,8 +1119,8 @@ class GMasterChangelogMixin(GMasterCommon):
         self.status.inc_value("data", len(datas))
 
         self.batch_stats["DATA"] += self.files_in_batch - \
-                                    self.batch_stats["SETXATTR"] - \
-                                    self.batch_stats["XATTROP"]
+            self.batch_stats["SETXATTR"] - \
+            self.batch_stats["XATTROP"]
 
         entry_start_time = time.time()
         # sync namespace
@@ -1185,7 +1202,7 @@ class GMasterChangelogMixin(GMasterCommon):
         # with data of other changelogs.
 
         if retry:
-            if tries == (int(gconf.max_rsync_retries) - 1):
+            if tries == (gconf.get("max-rsync-retries") - 1):
                 # Enable Error logging if it is last retry
                 self.syncer.enable_errorlog()
@@ -1243,7 +1260,7 @@ class GMasterChangelogMixin(GMasterCommon):
            # We do not know which changelog transfer failed, retry everything.
            retry = True
            tries += 1
-           if tries == int(gconf.max_rsync_retries):
+           if tries == gconf.get("max-rsync-retries"):
                logging.error(lf('changelogs could not be processed '
                                 'completely - moving on...',
                                 files=map(os.path.basename, changes)))
@@ -1331,8 +1348,7 @@ class GMasterChangelogMixin(GMasterCommon):
         # Update last_synced_time in status file based on stime
         # only update stime if stime xattr set to Brick root
         if path == self.FLAT_DIR_HIERARCHY:
-            chkpt_time = gconf.configinterface.get_realtime(
-                "checkpoint")
+            chkpt_time = gconf.getr("checkpoint")
             checkpoint_time = 0
             if chkpt_time is not None:
                 checkpoint_time = int(chkpt_time)
@@ -1340,7 +1356,7 @@ class GMasterChangelogMixin(GMasterCommon):
             self.status.set_last_synced(stime, checkpoint_time)
 
     def update_worker_remote_node(self):
-        node = sys.argv[-1]
+        node = rconf.args.resource_remote
         node_data = node.split("@")
         node = node_data[-1]
        remote_node_ip = node.split(":")[0]
@@ -1351,7 +1367,7 @@ class GMasterChangelogMixin(GMasterCommon):
         current_size = 0
         for c in changes:
             si = os.lstat(c).st_size
-            if (si + current_size) > int(gconf.changelog_batch_size):
+            if (si + current_size) > gconf.get("changelog-batch-size"):
                 # Create new batch if single Changelog file greater than
                 # Max Size! or current batch size exceeds Max size
                 changelogs_batches.append([c])
@@ -1397,7 +1413,7 @@ class GMasterChangelogMixin(GMasterCommon):
 
     def register(self, register_time, changelog_agent, status):
         self.changelog_agent = changelog_agent
-        self.sleep_interval = int(gconf.change_interval)
+        self.sleep_interval = gconf.get("change-interval")
         self.changelog_done_func = self.changelog_agent.done
         self.tempdir = self.setup_working_dir()
         self.processed_changelogs_dir = os.path.join(self.tempdir,
@@ -1437,13 +1453,13 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
         # Changelogs backend path is hardcoded as
         # <BRICK_PATH>/.glusterfs/changelogs, if user configured to different
         # location then consuming history will not work(Known issue as of now)
-        changelog_path = os.path.join(gconf.local_path,
+        changelog_path = os.path.join(rconf.args.local_path,
                                       ".glusterfs/changelogs")
         ret, actual_end = self.changelog_agent.history(
             changelog_path,
             data_stime[0],
             end_time,
-            int(gconf.sync_jobs))
+            gconf.get("sync-jobs"))
 
         # scan followed by getchanges till scan returns zero.
         # history_scan() is blocking call, till it gets the number
@@ -1736,7 +1752,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
                     [gfid, 'MKNOD', str(mo), str(0), str(0),
                      escape_space_newline(
-                         os.path.join(pargfid, bname))])
+                         os.path.join(
+                             pargfid, bname))])
             else:
                 self.write_entry_change(
                     "E", [gfid, 'LINK', escape_space_newline(
@@ -1837,8 +1854,8 @@ class Syncer(object):
         self.pb = PostBox()
         self.sync_engine = sync_engine
         self.errnos_ok = resilient_errnos
-        for i in range(int(gconf.sync_jobs)):
-            t = Thread(target=self.syncjob, args=(i+1, ))
+        for i in range(gconf.get("sync-jobs")):
+            t = Thread(target=self.syncjob, args=(i + 1, ))
             t.start()
 
     def syncjob(self, job_id):
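
The bulk of this change is mechanical: session options move from attribute access on the old gconf object (with string coercion via boolify()) to typed lookups such as gconf.get("sync-method") and gconf.getr("checkpoint"), while per-worker runtime facts move to rconf. The sketch below models that split with a toy in-memory store; ConfStore and RuntimeConf are illustrative names and are not the real gsyncdconfig or rconf implementations.

# Minimal sketch of the config/runtime split (assumption: illustrative only,
# not the gsyncdconfig module). Typed defaults remove the need for
# boolify()-style string coercion; runtime-only facts live on a separate object.

class ConfStore(object):
    """Session configuration: typed defaults, values addressed by key."""

    def __init__(self, defaults, overrides=None):
        self._values = dict(defaults)
        self._values.update(overrides or {})

    def get(self, name, default=None):
        # Returns the stored (already typed) value, like gconf.get("sync-jobs")
        return self._values.get(name, default)


class RuntimeConf(object):
    """Per-worker runtime state, like rconf.turns or a brick path."""

    def __init__(self, **kwargs):
        for k, v in kwargs.items():
            setattr(self, k, v)


if __name__ == "__main__":
    gconf = ConfStore({"sync-method": "rsync", "use-meta-volume": False,
                       "sync-jobs": 3})
    rconf = RuntimeConf(turns=0, local_path="/bricks/b1")

    # Booleans and ints come back with their real types, so call sites can
    # write `if gconf.get("use-meta-volume"):` instead of boolify(...).
    assert gconf.get("sync-jobs") == 3
    assert gconf.get("use-meta-volume") is False
    print(gconf.get("sync-method"), rconf.local_path)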
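
gmaster_builder() composes the worker class by picking one class per concern (change detection, sync mode, sendmark, purge, sync engine) and letting multiple inheritance combine them. A stripped-down sketch of that composition pattern follows; the mixins here are stand-ins, not the real GMaster*/Sendmark*/Purge* classes.

# Sketch of the builder idiom used by gmaster_builder(): pick one class per
# axis, then compose them into a single worker class via multiple inheritance.

class RsyncEngine(object):
    def sync(self):
        return "sync with rsync"

class TarSSHEngine(object):
    def sync(self):
        return "sync with tar over ssh"

class ChangelogCrawlMixin(object):
    def crawl(self):
        return "crawl changelogs"

class XsyncCrawlMixin(object):
    def crawl(self):
        return "filesystem crawl (xsync)"

def build_master(change_detector, sync_method):
    crawlmixin = XsyncCrawlMixin if change_detector == "xsync" \
        else ChangelogCrawlMixin
    syncengine = TarSSHEngine if sync_method == "tarssh" else RsyncEngine

    class _GMaster(crawlmixin, syncengine):
        pass

    return _GMaster

if __name__ == "__main__":
    master = build_master("changelog", "tarssh")()
    print(master.crawl(), "/", master.sync())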
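
mgmt_lock() decides whether a replica worker becomes ACTIVE or PASSIVE by taking a non-blocking fcntl lock on a per-subvolume file under the meta-volume mount; EACCES/EAGAIN means another replica already holds it. A self-contained sketch of that locking idiom, with a hypothetical lock path instead of the real <meta-volume-mnt>/geo-rep/ file:

# Sketch of the non-blocking lock idiom behind mgmt_lock() (POSIX only).
# The lock path below is hypothetical.

import errno
import fcntl
import os

def try_become_active(lock_path):
    """Return (is_active, fd). Keep fd open for as long as the role is held."""
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR)
    try:
        # LOCK_NB makes lockf fail immediately instead of blocking.
        fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError as e:
        if e.errno in (errno.EACCES, errno.EAGAIN):
            # Another worker already holds the lock: stay PASSIVE, but keep
            # the fd so a later retry can reuse the same descriptor.
            return False, fd
        raise
    return True, fd

if __name__ == "__main__":
    active, fd = try_become_active("/tmp/demo_subvol_1.lock")
    print("ACTIVE" if active else "PASSIVE")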
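
The changelog batching touched around line 1351 groups changelog files so that the cumulative size of each batch stays under changelog-batch-size. A simplified sketch of that size-bounded grouping, taking (name, size) pairs instead of calling os.lstat(); it illustrates the idea rather than reproducing the original loop exactly:

# Sketch of size-bounded batching as done for changelog files.

def batch_by_size(files_with_sizes, max_batch_size):
    batches = []
    current = []
    current_size = 0
    for name, size in files_with_sizes:
        if current and (current_size + size) > max_batch_size:
            # Next file would exceed the limit: close the batch, start fresh.
            batches.append(current)
            current = []
            current_size = 0
        current.append(name)
        current_size += size
    if current:
        batches.append(current)
    return batches

if __name__ == "__main__":
    changelogs = [("CHANGELOG.1", 400), ("CHANGELOG.2", 700),
                  ("CHANGELOG.3", 300), ("CHANGELOG.4", 900)]
    print(batch_by_size(changelogs, 1000))
    # [['CHANGELOG.1'], ['CHANGELOG.2', 'CHANGELOG.3'], ['CHANGELOG.4']]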
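
The max-rsync-retries handling retries a whole batch of changelogs and only switches on detailed error logging for the final attempt. A small sketch of that retry idiom with a stub syncer object; process_with_retries and StubSyncer are made-up names for illustration:

# Sketch of the retry idiom: verbose error logging only on the last attempt.

import logging

def process_with_retries(process, syncer, max_retries):
    tries = 0
    while True:
        if tries == max_retries - 1:
            # Last chance: enable detailed error logging so the failure that
            # survives all retries is fully recorded.
            syncer.enable_errorlog()
        if process():
            return True
        tries += 1
        if tries == max_retries:
            logging.error("giving up after %d retries", max_retries)
            return False

class StubSyncer(object):
    def enable_errorlog(self):
        logging.getLogger().setLevel(logging.DEBUG)

if __name__ == "__main__":
    attempts = {"n": 0}

    def flaky():
        attempts["n"] += 1
        return attempts["n"] >= 3   # succeeds on the third try

    print(process_with_retries(flaky, StubSyncer(), max_retries=5))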