author | Aravinda VK <avishwan@redhat.com> | 2014-03-21 12:33:10 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2014-04-07 21:56:55 -0700 |
commit | 238d101e55e067e5afcd43c728884e9ab8d36549 (patch) |
tree | 60498b107335c0ae526bfa034bd56303406710ab /geo-replication/syncdaemon/master.py | |
parent | 0c20b17c09b2eca82f3c79013fd3fe1c72a957fd (diff) |
geo-rep: code pep8/flake8 fixes
pep8 is a style checker for Python, based on the PEP 8 style guide:
http://legacy.python.org/dev/peps/pep-0008/
pep8 can be installed with `pip install pep8`.
Usage: `pep8 <python file>`. For example, `pep8 master.py`
will display all the coding-standard errors in that file.
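A minimal sketch of the kind of findings pep8 reports and that this change cleans up (missing blank lines before top-level definitions, whitespace before call parentheses, `== None` comparisons); the module and function names below are hypothetical, not taken from this patch:

```python
# Hypothetical example: the fixed form is shown, with the pep8 codes that the
# original style would have triggered noted in comments.
import logging


# E302: pep8 expects two blank lines before a top-level definition.
def update_status(state):
    # E211: no whitespace before '(' -- e.g. "logging.info ('...')" is flagged.
    logging.info('worker status set to %s' % state)


def is_unset(value):
    # E711: comparisons with None use "is"/"is not", never "=="/"!=".
    return value is None
```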
flake8 is used to identify unused imports and other issues
in code.
`pip install flake8`
`cd $GLUSTER_REPO/geo-replication/`
`flake8 syncdaemon`
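flake8 wraps pyflakes, so unused imports are reported as F401 warnings; this patch drops several such imports from master.py (for example `random`, `signal` and `copyfileobj`). A small hypothetical sketch of what it catches:

```python
# Hypothetical module: the commented-out import would be reported by flake8
# as F401 ("'random' imported but unused"); the used import passes cleanly.
import os

# import random


def changelog_path(base, suffix):
    # os is referenced here, so flake8 has nothing to complain about.
    return os.path.join(base, 'CHANGELOG.' + str(suffix))
```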
Updated license headers in each source file.
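The header added to the top of master.py (copied from the diff below) reads:

```python
#
# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.

# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#
```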
Change-Id: I01c7d0a6091d21bfa48720e9fb5624b77fa3db4a
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/7311
Reviewed-by: Kotresh HR <khiremat@redhat.com>
Reviewed-by: Prashanth Pai <ppai@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r-- | geo-replication/syncdaemon/master.py | 384 |
1 file changed, 234 insertions, 150 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 98a61bc1d75..4301396f9f4 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -1,25 +1,30 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + import os import sys import time import stat -import random -import signal import json import logging import socket import string import errno -from shutil import copyfileobj -from errno import ENOENT, ENODATA, EPIPE, EEXIST, errorcode -from threading import currentThread, Condition, Lock +from errno import ENOENT, ENODATA, EPIPE, EEXIST +from threading import Condition, Lock from datetime import datetime -from libcxattr import Xattr - from gconf import gconf -from tempfile import mkdtemp, NamedTemporaryFile -from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \ - unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \ - lstat, errno_wrap, update_file +from tempfile import NamedTemporaryFile +from syncdutils import Thread, GsyncdError, boolify, escape +from syncdutils import unescape, select, gauxpfx, md5hex, selfkill +from syncdutils import lstat, errno_wrap URXTIME = (-1, 0) @@ -27,18 +32,20 @@ URXTIME = (-1, 0) # of the DRY principle (no, don't look for elevated or # perspectivistic things here) + def _xtime_now(): t = time.time() sec = int(t) nsec = int((t - sec) * 1000000) return (sec, nsec) + def _volinfo_hook_relax_foreign(self): volinfo_sys = self.get_sys_volinfo() fgn_vi = volinfo_sys[self.KFGN] if fgn_vi: expiry = fgn_vi['timeout'] - int(time.time()) + 1 - logging.info('foreign volume info found, waiting %d sec for expiry' % \ + logging.info('foreign volume info found, waiting %d sec for expiry' % expiry) time.sleep(expiry) volinfo_sys = self.get_sys_volinfo() @@ -58,10 +65,14 @@ def gmaster_builder(excrawl=None): logging.info('setting up %s change detection mode' % changemixin) modemixin = getattr(this, modemixin.capitalize() + 'Mixin') crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin') - sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin - purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin + sendmarkmixin = boolify( + gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin + purgemixin = boolify( + gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine - class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin, syncengine): + + class _GMaster(crawlmixin, modemixin, sendmarkmixin, + purgemixin, syncengine): pass return _GMaster @@ -71,6 +82,7 @@ def gmaster_builder(excrawl=None): # sync modes class NormalMixin(object): + """normal geo-rep behavior""" minus_infinity = URXTIME @@ -152,14 +164,18 @@ class NormalMixin(object): self.slave.server.set_stime(path, self.uuid, mark) # self.slave.server.set_xtime_remote(path, self.uuid, mark) + class PartialMixin(NormalMixin): + """a variant tuned towards operation with a master that has partial info of the slave (brick typically)""" def xtime_reversion_hook(self, path, xtl, xtr): pass + class RecoverMixin(NormalMixin): + """a 
variant that differs from normal in terms of ignoring non-indexed files""" @@ -178,11 +194,13 @@ class RecoverMixin(NormalMixin): # Further mixins for certain tunable behaviors + class SendmarkNormalMixin(object): def sendmark_regular(self, *a, **kw): return self.sendmark(*a, **kw) + class SendmarkRsyncMixin(object): def sendmark_regular(self, *a, **kw): @@ -194,19 +212,24 @@ class PurgeNormalMixin(object): def purge_missing(self, path, names): self.slave.server.purge(path, names) + class PurgeNoopMixin(object): def purge_missing(self, path, names): pass + class TarSSHEngine(object): + """Sync engine that uses tar(1) piped over ssh(1) for data transfers. Good for lots of small files. """ + def a_syncdata(self, files): logging.debug('files: %s' % (files)) for f in files: pb = self.syncer.add(f) + def regjob(se, xte, pb): rv = pb.wait() if rv[0]: @@ -228,13 +251,17 @@ class TarSSHEngine(object): self.a_syncdata(files) self.syncdata_wait() + class RsyncEngine(object): + """Sync engine that uses rsync(1) for data transfers""" + def a_syncdata(self, files): logging.debug('files: %s' % (files)) for f in files: logging.debug('candidate for syncing %s' % f) pb = self.syncer.add(f) + def regjob(se, xte, pb): rv = pb.wait() if rv[0]: @@ -258,7 +285,9 @@ class RsyncEngine(object): self.a_syncdata(files) self.syncdata_wait() + class GMasterCommon(object): + """abstract class impementling master role""" KFGN = 0 @@ -269,8 +298,9 @@ class GMasterCommon(object): err out on multiple foreign masters """ - fgn_vis, nat_vi = self.master.server.aggregated.foreign_volume_infos(), \ - self.master.server.aggregated.native_volume_info() + fgn_vis, nat_vi = ( + self.master.server.aggregated.foreign_volume_infos(), + self.master.server.aggregated.native_volume_info()) fgn_vi = None if fgn_vis: if len(fgn_vis) > 1: @@ -316,15 +346,14 @@ class GMasterCommon(object): if getattr(gconf, 'state_detail_file', None): try: with open(gconf.state_detail_file, 'r+') as f: - loaded_data= json.load(f) - diff_data = set(default_data) - set (loaded_data) + loaded_data = json.load(f) + diff_data = set(default_data) - set(loaded_data) if len(diff_data): for i in diff_data: loaded_data[i] = default_data[i] return loaded_data - except (IOError): - ex = sys.exc_info()[1] - logging.warn ('Creating new gconf.state_detail_file.') + except IOError: + logging.warn('Creating new gconf.state_detail_file.') # Create file with initial data try: with open(gconf.state_detail_file, 'wb') as f: @@ -364,7 +393,8 @@ class GMasterCommon(object): # - self.turns is the number of turns since start # - self.total_turns is a limit so that if self.turns reaches it, then # we exit (for diagnostic purposes) - # so, eg., if the master fs changes unceasingly, self.turns will remain 0. + # so, eg., if the master fs changes unceasingly, self.turns will remain + # 0. self.crawls = 0 self.turns = 0 self.total_turns = int(gconf.turns) @@ -394,7 +424,7 @@ class GMasterCommon(object): t.start() def should_crawl(cls): - return (gconf.glusterd_uuid in cls.master.server.node_uuid()) + return gconf.glusterd_uuid in cls.master.server.node_uuid() def register(self): self.register() @@ -416,18 +446,18 @@ class GMasterCommon(object): volinfo_sys = self.volinfo_hook() self.volinfo = volinfo_sys[self.KNAT] inter_master = volinfo_sys[self.KFGN] - logging.info("%s master with volume id %s ..." % \ - (inter_master and "intermediate" or "primary", - self.uuid)) + logging.info("%s master with volume id %s ..." 
% + (inter_master and "intermediate" or "primary", + self.uuid)) gconf.configinterface.set('volume_id', self.uuid) if self.volinfo: if self.volinfo['retval']: - logging.warn("master cluster's info may not be valid %d" % \ + logging.warn("master cluster's info may not be valid %d" % self.volinfo['retval']) self.start_checkpoint_thread() else: raise GsyncdError("master volinfo unavailable") - self.total_crawl_stats = self.get_initial_crawl_data() + self.total_crawl_stats = self.get_initial_crawl_data() self.lastreport['time'] = time.time() logging.info('crawl interval: %d seconds' % self.sleep_interval) @@ -435,7 +465,7 @@ class GMasterCommon(object): crawl = self.should_crawl() while not self.terminate: if self.start: - logging.debug("... crawl #%d done, took %.6f seconds" % \ + logging.debug("... crawl #%d done, took %.6f seconds" % (self.crawls, time.time() - self.start)) self.start = time.time() should_display_info = self.start - self.lastreport['time'] >= 60 @@ -443,11 +473,11 @@ class GMasterCommon(object): logging.info("%d crawls, %d turns", self.crawls - self.lastreport['crawls'], self.turns - self.lastreport['turns']) - self.lastreport.update(crawls = self.crawls, - turns = self.turns, - time = self.start) + self.lastreport.update(crawls=self.crawls, + turns=self.turns, + time=self.start) t1 = time.time() - if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds + if int(t1 - t0) >= 60: # lets hardcode this check to 60 seconds crawl = self.should_crawl() t0 = t1 self.update_worker_remote_node() @@ -456,11 +486,14 @@ class GMasterCommon(object): # bring up _this_ brick to the cluster stime # which is min of cluster (but max of the replicas) brick_stime = self.xtime('.', self.slave) - cluster_stime = self.master.server.aggregated.stime_mnt('.', '.'.join([str(self.uuid), str(gconf.slave_id)])) - logging.debug("Cluster stime: %s | Brick stime: %s" % (repr(cluster_stime), repr(brick_stime))) + cluster_stime = self.master.server.aggregated.stime_mnt( + '.', '.'.join([str(self.uuid), str(gconf.slave_id)])) + logging.debug("Cluster stime: %s | Brick stime: %s" % + (repr(cluster_stime), repr(brick_stime))) if not isinstance(cluster_stime, int): if brick_stime < cluster_stime: - self.slave.server.set_stime(self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime) + self.slave.server.set_stime( + self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime) time.sleep(5) continue self.update_worker_health("Active") @@ -489,13 +522,14 @@ class GMasterCommon(object): with checkpoint @chkpt""" if xtimish: val = cls.serialize_xtime(val) - gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) + gconf.configinterface.set( + 'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) @staticmethod def humantime(*tpair): """format xtime-like (sec, nsec) pair to human readable format""" ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\ - strftime("%Y-%m-%d %H:%M:%S") + strftime("%Y-%m-%d %H:%M:%S") if len(tpair) > 1: ts += '.' 
+ str(tpair[1]) return ts @@ -506,7 +540,7 @@ class GMasterCommon(object): years = int(years) days = int(days) - date="" + date = "" m, s = divmod(crawl_time.seconds, 60) h, m = divmod(m, 60) @@ -515,7 +549,8 @@ class GMasterCommon(object): if days != 0: date += "%s %s " % (days, "day" if days == 1 else "days") - date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2)) + date += "%s:%s:%s" % (string.zfill(h, 2), + string.zfill(m, 2), string.zfill(s, 2)) return date def checkpt_service(self, chan, chkpt): @@ -540,16 +575,18 @@ class GMasterCommon(object): if not checkpt_tgt: checkpt_tgt = self.xtime('.') if isinstance(checkpt_tgt, int): - raise GsyncdError("master root directory is unaccessible (%s)", + raise GsyncdError("master root directory is " + "unaccessible (%s)", os.strerror(checkpt_tgt)) self._set_checkpt_param(chkpt, 'target', checkpt_tgt) - logging.debug("checkpoint target %s has been determined for checkpoint %s" % \ + logging.debug("checkpoint target %s has been determined " + "for checkpoint %s" % (repr(checkpt_tgt), chkpt)) # check if the label is 'now' chkpt_lbl = chkpt try: - x1,x2 = chkpt.split(':') + x1, x2 = chkpt.split(':') if x1 == 'now': chkpt_lbl = "as of " + self.humantime(x2) except: @@ -557,41 +594,46 @@ class GMasterCommon(object): completed = self._checkpt_param(chkpt, 'completed', xtimish=False) if completed: completed = tuple(int(x) for x in completed.split('.')) - s,_,_ = select([chan], [], [], (not completed) and 5 or None) + s, _, _ = select([chan], [], [], (not completed) and 5 or None) # either request made and we re-check to not # give back stale data, or we still hunting for completion - if self.native_xtime(checkpt_tgt) and self.native_xtime(checkpt_tgt) < self.volmark: + if (self.native_xtime(checkpt_tgt) and ( + self.native_xtime(checkpt_tgt) < self.volmark)): # indexing has been reset since setting the checkpoint status = "is invalid" else: xtr = self.xtime('.', self.slave) if isinstance(xtr, int): - raise GsyncdError("slave root directory is unaccessible (%s)", + raise GsyncdError("slave root directory is " + "unaccessible (%s)", os.strerror(xtr)) ncompleted = self.xtime_geq(xtr, checkpt_tgt) - if completed and not ncompleted: # stale data - logging.warn("completion time %s for checkpoint %s became stale" % \ + if completed and not ncompleted: # stale data + logging.warn("completion time %s for checkpoint %s " + "became stale" % (self.humantime(*completed), chkpt)) completed = None gconf.configinterface.delete('checkpoint_completed') - if ncompleted and not completed: # just reaching completion + if ncompleted and not completed: # just reaching completion completed = "%.6f" % time.time() - self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False) + self._set_checkpt_param( + chkpt, 'completed', completed, xtimish=False) completed = tuple(int(x) for x in completed.split('.')) logging.info("checkpoint %s completed" % chkpt) status = completed and \ - "completed at " + self.humantime(completed[0]) or \ - "not reached yet" + "completed at " + self.humantime(completed[0]) or \ + "not reached yet" if s: conn = None try: conn, _ = chan.accept() try: - conn.send("checkpoint %s is %s\0" % (chkpt_lbl, status)) + conn.send("checkpoint %s is %s\0" % + (chkpt_lbl, status)) except: exc = sys.exc_info()[1] - if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \ - exc.errno == EPIPE: + if ((isinstance(exc, OSError) or isinstance( + exc, IOError)) and exc.errno == EPIPE): logging.debug('checkpoint client disconnected') 
else: raise @@ -602,11 +644,13 @@ class GMasterCommon(object): def start_checkpoint_thread(self): """prepare and start checkpoint service""" if self.checkpoint_thread or not ( - getattr(gconf, 'state_socket_unencoded', None) and getattr(gconf, 'socketdir', None) + getattr(gconf, 'state_socket_unencoded', None) and getattr( + gconf, 'socketdir', None) ): return chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - state_socket = os.path.join(gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket") + state_socket = os.path.join( + gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket") try: os.unlink(state_socket) except: @@ -621,9 +665,9 @@ class GMasterCommon(object): def add_job(self, path, label, job, *a, **kw): """insert @job function to job table at @path with @label""" - if self.jobtab.get(path) == None: + if self.jobtab.get(path) is None: self.jobtab[path] = [] - self.jobtab[path].append((label, a, lambda : job(*a, **kw))) + self.jobtab[path].append((label, a, lambda: job(*a, **kw))) def add_failjob(self, path, label): """invoke .add_job with a job that does nothing just fails""" @@ -644,7 +688,7 @@ class GMasterCommon(object): ret = j[-1]() if not ret: succeed = False - if succeed and not args[0] == None: + if succeed and not args[0] is None: self.sendmark(path, *args) return succeed @@ -657,19 +701,21 @@ class GMasterCommon(object): self.slave.server.setattr(path, adct) self.set_slave_xtime(path, mark) + class GMasterChangelogMixin(GMasterCommon): + """ changelog based change detection and syncing """ # index for change type and entry IDX_START = 0 - IDX_END = 2 + IDX_END = 2 - POS_GFID = 0 - POS_TYPE = 1 + POS_GFID = 0 + POS_TYPE = 1 POS_ENTRY1 = -1 - TYPE_META = "M " - TYPE_GFID = "D " + TYPE_META = "M " + TYPE_GFID = "D " TYPE_ENTRY = "E " # flat directory heirarchy for gfid based access @@ -686,18 +732,19 @@ class GMasterChangelogMixin(GMasterCommon): def setup_working_dir(self): workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path)) logfile = os.path.join(workdir, 'changes.log') - logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile)) + logging.debug('changelog working dir %s (log: %s)' % + (workdir, logfile)) return (workdir, logfile) def process_change(self, change, done, retry): pfx = gauxpfx() - clist = [] + clist = [] entries = [] meta_gfid = set() datas = set() # basic crawl stats: files and bytes - files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []} + files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []} try: f = open(change, "r") clist = f.readlines() @@ -750,17 +797,19 @@ class GMasterChangelogMixin(GMasterCommon): elif ty in ['CREATE', 'MKDIR', 'MKNOD']: entry_update() # stat information present in the changelog itself - entries.append(edct(ty, gfid=gfid, entry=en, mode=int(ec[2]),\ + entries.append(edct(ty, gfid=gfid, entry=en, + mode=int(ec[2]), uid=int(ec[3]), gid=int(ec[4]))) else: # stat() to get mode and other information go = os.path.join(pfx, gfid) st = lstat(go) if isinstance(st, int): - if ty == 'RENAME': # special hack for renames... + if ty == 'RENAME': # special hack for renames... 
entries.append(edct('UNLINK', gfid=gfid, entry=en)) else: - logging.debug('file %s got purged in the interim' % go) + logging.debug( + 'file %s got purged in the interim' % go) continue if ty == 'LINK': @@ -771,17 +820,20 @@ class GMasterChangelogMixin(GMasterCommon): if isinstance(rl, int): continue entry_update() - entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) + entries.append( + edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) elif ty == 'RENAME': entry_update() - e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) - entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st)) + e1 = unescape( + os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) + entries.append( + edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st)) else: logging.warn('ignoring %s [op %s]' % (gfid, ty)) elif et == self.TYPE_GFID: datas.add(os.path.join(pfx, ec[0])) elif et == self.TYPE_META: - if ec[1] == 'SETATTR': # only setattr's for now... + if ec[1] == 'SETATTR': # only setattr's for now... meta_gfid.add(os.path.join(pfx, ec[0])) else: logging.warn('got invalid changelog type: %s' % (et)) @@ -789,10 +841,10 @@ class GMasterChangelogMixin(GMasterCommon): if not retry: self.update_worker_cumilitive_status(files_pending) # sync namespace - if (entries): + if entries: self.slave.server.entry_ops(entries) # sync metadata - if (meta_gfid): + if meta_gfid: meta_entries = [] for go in meta_gfid: st = lstat(go) @@ -814,22 +866,25 @@ class GMasterChangelogMixin(GMasterCommon): self.skipped_gfid_list = [] self.current_files_skipped_count = 0 - # first, fire all changelog transfers in parallel. entry and metadata - # are performed synchronously, therefore in serial. However at the end - # of each changelog, data is synchronized with syncdata_async() - which - # means it is serial w.r.t entries/metadata of that changelog but - # happens in parallel with data of other changelogs. + # first, fire all changelog transfers in parallel. entry and + # metadata are performed synchronously, therefore in serial. + # However at the end of each changelog, data is synchronized + # with syncdata_async() - which means it is serial w.r.t + # entries/metadata of that changelog but happens in parallel + # with data of other changelogs. for change in changes: logging.debug('processing change %s' % change) self.process_change(change, done, retry) if not retry: - self.turns += 1 # number of changelogs processed in the batch + # number of changelogs processed in the batch + self.turns += 1 - # Now we wait for all the data transfers fired off in the above step - # to complete. Note that this is not ideal either. Ideally we want to - # trigger the entry/meta-data transfer of the next batch while waiting - # for the data transfer of the current batch to finish. + # Now we wait for all the data transfers fired off in the above + # step to complete. Note that this is not ideal either. Ideally + # we want to trigger the entry/meta-data transfer of the next + # batch while waiting for the data transfer of the current batch + # to finish. # Note that the reason to wait for the data transfer (vs doing it # completely in the background and call the changelog_done() @@ -837,10 +892,11 @@ class GMasterChangelogMixin(GMasterCommon): # and prevents a spiraling increase of wait stubs from consuming # unbounded memory and resources. - # update the slave's time with the timestamp of the _last_ changelog - # file time suffix. 
Since, the changelog prefix time is the time when - # the changelog was rolled over, introduce a tolerence of 1 second to - # counter the small delta b/w the marker update and gettimeofday(). + # update the slave's time with the timestamp of the _last_ + # changelog file time suffix. Since, the changelog prefix time + # is the time when the changelog was rolled over, introduce a + # tolerence of 1 second to counter the small delta b/w the + # marker update and gettimeofday(). # NOTE: this is only for changelog mode, not xsync. # @change is the last changelog (therefore max time for this batch) @@ -856,10 +912,13 @@ class GMasterChangelogMixin(GMasterCommon): retry = True tries += 1 if tries == self.MAX_RETRIES: - logging.warn('changelogs %s could not be processed - moving on...' % \ + logging.warn('changelogs %s could not be processed - ' + 'moving on...' % ' '.join(map(os.path.basename, changes))) - self.update_worker_total_files_skipped(self.current_files_skipped_count) - logging.warn('SKIPPED GFID = %s' % ','.join(self.skipped_gfid_list)) + self.update_worker_total_files_skipped( + self.current_files_skipped_count) + logging.warn('SKIPPED GFID = %s' % + ','.join(self.skipped_gfid_list)) self.update_worker_files_syncd() if done: xtl = (int(change.split('.')[-1]) - 1, 0) @@ -873,7 +932,7 @@ class GMasterChangelogMixin(GMasterCommon): # entry_ops() that failed... so we retry the _whole_ changelog # again. # TODO: remove entry retries when it's gets fixed. - logging.warn('incomplete sync, retrying changelogs: %s' % \ + logging.warn('incomplete sync, retrying changelogs: %s' % ' '.join(map(os.path.basename, changes))) time.sleep(0.5) @@ -884,15 +943,15 @@ class GMasterChangelogMixin(GMasterCommon): self.sendmark(path, stime) def get_worker_status_file(self): - file_name = gconf.local_path+'.status' + file_name = gconf.local_path + '.status' file_name = file_name.replace("/", "_") - worker_status_file = gconf.georep_session_working_dir+file_name + worker_status_file = gconf.georep_session_working_dir + file_name return worker_status_file def update_worker_status(self, key, value): - default_data = {"remote_node":"N/A", - "worker status":"Not Started", - "crawl status":"N/A", + default_data = {"remote_node": "N/A", + "worker status": "Not Started", + "crawl status": "N/A", "files_syncd": 0, "files_remaining": 0, "bytes_remaining": 0, @@ -909,7 +968,7 @@ class GMasterChangelogMixin(GMasterCommon): f.flush() os.fsync(f.fileno()) except (IOError, OSError, ValueError): - logging.info ('Creating new %s' % worker_status_file) + logging.info('Creating new %s' % worker_status_file) try: with open(worker_status_file, 'wb') as f: default_data[key] = value @@ -920,9 +979,9 @@ class GMasterChangelogMixin(GMasterCommon): raise def update_worker_cumilitive_status(self, files_pending): - default_data = {"remote_node":"N/A", - "worker status":"Not Started", - "crawl status":"N/A", + default_data = {"remote_node": "N/A", + "worker status": "Not Started", + "crawl status": "N/A", "files_syncd": 0, "files_remaining": 0, "bytes_remaining": 0, @@ -932,8 +991,8 @@ class GMasterChangelogMixin(GMasterCommon): try: with open(worker_status_file, 'r+') as f: loaded_data = json.load(f) - loaded_data['files_remaining'] = files_pending['count'] - loaded_data['bytes_remaining'] = files_pending['bytes'] + loaded_data['files_remaining'] = files_pending['count'] + loaded_data['bytes_remaining'] = files_pending['bytes'] loaded_data['purges_remaining'] = files_pending['purge'] os.ftruncate(f.fileno(), 0) os.lseek(f.fileno(), 0, 
os.SEEK_SET) @@ -941,11 +1000,11 @@ class GMasterChangelogMixin(GMasterCommon): f.flush() os.fsync(f.fileno()) except (IOError, OSError, ValueError): - logging.info ('Creating new %s' % worker_status_file) + logging.info('Creating new %s' % worker_status_file) try: with open(worker_status_file, 'wb') as f: - default_data['files_remaining'] = files_pending['count'] - default_data['bytes_remaining'] = files_pending['bytes'] + default_data['files_remaining'] = files_pending['count'] + default_data['bytes_remaining'] = files_pending['bytes'] default_data['purges_remaining'] = files_pending['purge'] json.dump(default_data, f) f.flush() @@ -953,24 +1012,24 @@ class GMasterChangelogMixin(GMasterCommon): except: raise - def update_worker_remote_node (self): + def update_worker_remote_node(self): node = sys.argv[-1] node = node.split("@")[-1] remote_node_ip = node.split(":")[0] remote_node_vol = node.split(":")[3] remote_node = remote_node_ip + '::' + remote_node_vol - self.update_worker_status ('remote_node', remote_node) + self.update_worker_status('remote_node', remote_node) - def update_worker_health (self, state): - self.update_worker_status ('worker status', state) + def update_worker_health(self, state): + self.update_worker_status('worker status', state) - def update_worker_crawl_status (self, state): - self.update_worker_status ('crawl status', state) + def update_worker_crawl_status(self, state): + self.update_worker_status('crawl status', state) - def update_worker_files_syncd (self): - default_data = {"remote_node":"N/A", - "worker status":"Not Started", - "crawl status":"N/A", + def update_worker_files_syncd(self): + default_data = {"remote_node": "N/A", + "worker status": "Not Started", + "crawl status": "N/A", "files_syncd": 0, "files_remaining": 0, "bytes_remaining": 0, @@ -981,8 +1040,8 @@ class GMasterChangelogMixin(GMasterCommon): with open(worker_status_file, 'r+') as f: loaded_data = json.load(f) loaded_data['files_syncd'] += loaded_data['files_remaining'] - loaded_data['files_remaining'] = 0 - loaded_data['bytes_remaining'] = 0 + loaded_data['files_remaining'] = 0 + loaded_data['bytes_remaining'] = 0 loaded_data['purges_remaining'] = 0 os.ftruncate(f.fileno(), 0) os.lseek(f.fileno(), 0, os.SEEK_SET) @@ -990,7 +1049,7 @@ class GMasterChangelogMixin(GMasterCommon): f.flush() os.fsync(f.fileno()) except (IOError, OSError, ValueError): - logging.info ('Creating new %s' % worker_status_file) + logging.info('Creating new %s' % worker_status_file) try: with open(worker_status_file, 'wb') as f: json.dump(default_data, f) @@ -999,19 +1058,19 @@ class GMasterChangelogMixin(GMasterCommon): except: raise - def update_worker_files_remaining (self, state): - self.update_worker_status ('files_remaining', state) + def update_worker_files_remaining(self, state): + self.update_worker_status('files_remaining', state) - def update_worker_bytes_remaining (self, state): - self.update_worker_status ('bytes_remaining', state) + def update_worker_bytes_remaining(self, state): + self.update_worker_status('bytes_remaining', state) - def update_worker_purges_remaining (self, state): - self.update_worker_status ('purges_remaining', state) + def update_worker_purges_remaining(self, state): + self.update_worker_status('purges_remaining', state) - def update_worker_total_files_skipped (self, value): - default_data = {"remote_node":"N/A", - "worker status":"Not Started", - "crawl status":"N/A", + def update_worker_total_files_skipped(self, value): + default_data = {"remote_node": "N/A", + "worker status": 
"Not Started", + "crawl status": "N/A", "files_syncd": 0, "files_remaining": 0, "bytes_remaining": 0, @@ -1029,7 +1088,7 @@ class GMasterChangelogMixin(GMasterCommon): f.flush() os.fsync(f.fileno()) except (IOError, OSError, ValueError): - logging.info ('Creating new %s' % worker_status_file) + logging.info('Creating new %s' % worker_status_file) try: with open(worker_status_file, 'wb') as f: default_data['total_files_skipped'] = value @@ -1057,9 +1116,12 @@ class GMasterChangelogMixin(GMasterCommon): if changes: if purge_time: logging.info("slave's time: %s" % repr(purge_time)) - processed = [x for x in changes if int(x.split('.')[-1]) < purge_time[0]] + processed = [x for x in changes + if int(x.split('.')[-1]) < purge_time[0]] for pr in processed: - logging.info('skipping already processed change: %s...' % os.path.basename(pr)) + logging.info( + 'skipping already processed change: %s...' % + os.path.basename(pr)) self.master.server.changelog_done(pr) changes.remove(pr) logging.debug('processing changes %s' % repr(changes)) @@ -1080,7 +1142,9 @@ class GMasterChangelogMixin(GMasterCommon): # control should not reach here raise + class GMasterXsyncMixin(GMasterChangelogMixin): + """ This crawl needs to be xtime based (as of now it's not. this is beacuse we generate CHANGELOG @@ -1091,7 +1155,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): files, hardlinks and symlinks. """ - XSYNC_MAX_ENTRIES = 1<<13 + XSYNC_MAX_ENTRIES = 1 << 13 def register(self): self.counter = 0 @@ -1145,7 +1209,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin): def open(self): try: - self.xsync_change = os.path.join(self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time()))) + self.xsync_change = os.path.join( + self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time()))) self.fh = open(self.xsync_change, 'w') except IOError: raise @@ -1165,7 +1230,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): self.put('xsync', self.fname()) self.counter = 0 if not last: - time.sleep(1) # make sure changelogs are 1 second apart + time.sleep(1) # make sure changelogs are 1 second apart self.open() def sync_stime(self, stime=None, last=False): @@ -1207,7 +1272,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): xtr_root = self.xtime('.', self.slave) if isinstance(xtr_root, int): if xtr_root != ENOENT: - logging.warn("slave cluster not returning the " \ + logging.warn("slave cluster not returning the " "correct xtime for root (%d)" % xtr_root) xtr_root = self.minus_infinity xtl = self.xtime(path) @@ -1216,7 +1281,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): xtr = self.xtime(path, self.slave) if isinstance(xtr, int): if xtr != ENOENT: - logging.warn("slave cluster not returning the " \ + logging.warn("slave cluster not returning the " "correct xtime for %s (%d)" % (path, xtr)) xtr = self.minus_infinity xtr = max(xtr, xtr_root) @@ -1235,7 +1300,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin): e = os.path.join(path, e) xte = self.xtime(e) if isinstance(xte, int): - logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) + logging.warn("irregular xtime for %s: %s" % + (e, errno.errorcode[xte])) continue if not self.need_sync(e, xte, xtr): continue @@ -1256,35 +1322,51 @@ class GMasterXsyncMixin(GMasterChangelogMixin): self.sync_done(self.stimes, False) self.stimes = [] if stat.S_ISDIR(mo): - self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))]) + self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str( + st.st_uid), str(st.st_gid), 
escape(os.path.join(pargfid, + bname))]) self.Xcrawl(e, xtr_root) self.stimes.append((e, xte)) elif stat.S_ISLNK(mo): - self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))]) + self.write_entry_change( + "E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, + bname))]) elif stat.S_ISREG(mo): nlink = st.st_nlink - nlink -= 1 # fixup backend stat link count - # if a file has a hardlink, create a Changelog entry as 'LINK' so the slave - # side will decide if to create the new entry, or to create link. + nlink -= 1 # fixup backend stat link count + # if a file has a hardlink, create a Changelog entry as + # 'LINK' so the slave side will decide if to create the + # new entry, or to create link. if nlink == 1: - self.write_entry_change("E", [gfid, 'MKNOD', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))]) + self.write_entry_change("E", + [gfid, 'MKNOD', str(mo), + str(st.st_uid), + str(st.st_gid), + escape(os.path.join( + pargfid, bname))]) else: - self.write_entry_change("E", [gfid, 'LINK', escape(os.path.join(pargfid, bname))]) + self.write_entry_change( + "E", [gfid, 'LINK', escape(os.path.join(pargfid, + bname))]) self.write_entry_change("D", [gfid]) if path == '.': self.stimes.append((path, xtl)) self.sync_done(self.stimes, True) + class BoxClosedErr(Exception): pass + class PostBox(list): + """synchronized collection for storing things thought of as "requests" """ def __init__(self, *a): list.__init__(self, *a) # too bad Python stdlib does not have read/write locks... - # it would suffivce to grab the lock in .append as reader, in .close as writer + # it would suffivce to grab the lock in .append as reader, in .close as + # writer self.lever = Condition() self.open = True self.done = False @@ -1319,7 +1401,9 @@ class PostBox(list): self.open = False self.lever.release() + class Syncer(object): + """a staged queue to relay rsync requests to rsync workers By "staged queue" its meant that when a consumer comes to the |