summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/master.py
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2014-03-21 12:33:10 +0530
committerVijay Bellur <vbellur@redhat.com>2014-04-07 21:56:55 -0700
commit238d101e55e067e5afcd43c728884e9ab8d36549 (patch)
tree60498b107335c0ae526bfa034bd56303406710ab /geo-replication/syncdaemon/master.py
parent0c20b17c09b2eca82f3c79013fd3fe1c72a957fd (diff)
geo-rep: code pep8/flake8 fixes
pep8 is a style guide for python. http://legacy.python.org/dev/peps/pep-0008/ pep8 can be installed using, `pip install pep8` Usage: `pep8 <python file>`, For example, `pep8 master.py` will display all the coding standard errors. flake8 is used to identify unused imports and other issues in code. pip install flake8 cd $GLUSTER_REPO/geo-replication/ flake8 syncdaemon Updated license headers to each source file. Change-Id: I01c7d0a6091d21bfa48720e9fb5624b77fa3db4a Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/7311 Reviewed-by: Kotresh HR <khiremat@redhat.com> Reviewed-by: Prashanth Pai <ppai@redhat.com> Tested-by: Gluster Build System <jenkins@build.gluster.com>
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--geo-replication/syncdaemon/master.py384
1 files changed, 234 insertions, 150 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 98a61bc1d75..4301396f9f4 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -1,25 +1,30 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
import os
import sys
import time
import stat
-import random
-import signal
import json
import logging
import socket
import string
import errno
-from shutil import copyfileobj
-from errno import ENOENT, ENODATA, EPIPE, EEXIST, errorcode
-from threading import currentThread, Condition, Lock
+from errno import ENOENT, ENODATA, EPIPE, EEXIST
+from threading import Condition, Lock
from datetime import datetime
-from libcxattr import Xattr
-
from gconf import gconf
-from tempfile import mkdtemp, NamedTemporaryFile
-from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \
- unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \
- lstat, errno_wrap, update_file
+from tempfile import NamedTemporaryFile
+from syncdutils import Thread, GsyncdError, boolify, escape
+from syncdutils import unescape, select, gauxpfx, md5hex, selfkill
+from syncdutils import lstat, errno_wrap
URXTIME = (-1, 0)
@@ -27,18 +32,20 @@ URXTIME = (-1, 0)
# of the DRY principle (no, don't look for elevated or
# perspectivistic things here)
+
def _xtime_now():
t = time.time()
sec = int(t)
nsec = int((t - sec) * 1000000)
return (sec, nsec)
+
def _volinfo_hook_relax_foreign(self):
volinfo_sys = self.get_sys_volinfo()
fgn_vi = volinfo_sys[self.KFGN]
if fgn_vi:
expiry = fgn_vi['timeout'] - int(time.time()) + 1
- logging.info('foreign volume info found, waiting %d sec for expiry' % \
+ logging.info('foreign volume info found, waiting %d sec for expiry' %
expiry)
time.sleep(expiry)
volinfo_sys = self.get_sys_volinfo()
@@ -58,10 +65,14 @@ def gmaster_builder(excrawl=None):
logging.info('setting up %s change detection mode' % changemixin)
modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
- sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
- purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
+ sendmarkmixin = boolify(
+ gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
+ purgemixin = boolify(
+ gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine
- class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin, syncengine):
+
+ class _GMaster(crawlmixin, modemixin, sendmarkmixin,
+ purgemixin, syncengine):
pass
return _GMaster
@@ -71,6 +82,7 @@ def gmaster_builder(excrawl=None):
# sync modes
class NormalMixin(object):
+
"""normal geo-rep behavior"""
minus_infinity = URXTIME
@@ -152,14 +164,18 @@ class NormalMixin(object):
self.slave.server.set_stime(path, self.uuid, mark)
# self.slave.server.set_xtime_remote(path, self.uuid, mark)
+
class PartialMixin(NormalMixin):
+
"""a variant tuned towards operation with a master
that has partial info of the slave (brick typically)"""
def xtime_reversion_hook(self, path, xtl, xtr):
pass
+
class RecoverMixin(NormalMixin):
+
"""a variant that differs from normal in terms
of ignoring non-indexed files"""
@@ -178,11 +194,13 @@ class RecoverMixin(NormalMixin):
# Further mixins for certain tunable behaviors
+
class SendmarkNormalMixin(object):
def sendmark_regular(self, *a, **kw):
return self.sendmark(*a, **kw)
+
class SendmarkRsyncMixin(object):
def sendmark_regular(self, *a, **kw):
@@ -194,19 +212,24 @@ class PurgeNormalMixin(object):
def purge_missing(self, path, names):
self.slave.server.purge(path, names)
+
class PurgeNoopMixin(object):
def purge_missing(self, path, names):
pass
+
class TarSSHEngine(object):
+
"""Sync engine that uses tar(1) piped over ssh(1)
for data transfers. Good for lots of small files.
"""
+
def a_syncdata(self, files):
logging.debug('files: %s' % (files))
for f in files:
pb = self.syncer.add(f)
+
def regjob(se, xte, pb):
rv = pb.wait()
if rv[0]:
@@ -228,13 +251,17 @@ class TarSSHEngine(object):
self.a_syncdata(files)
self.syncdata_wait()
+
class RsyncEngine(object):
+
"""Sync engine that uses rsync(1) for data transfers"""
+
def a_syncdata(self, files):
logging.debug('files: %s' % (files))
for f in files:
logging.debug('candidate for syncing %s' % f)
pb = self.syncer.add(f)
+
def regjob(se, xte, pb):
rv = pb.wait()
if rv[0]:
@@ -258,7 +285,9 @@ class RsyncEngine(object):
self.a_syncdata(files)
self.syncdata_wait()
+
class GMasterCommon(object):
+
"""abstract class impementling master role"""
KFGN = 0
@@ -269,8 +298,9 @@ class GMasterCommon(object):
err out on multiple foreign masters
"""
- fgn_vis, nat_vi = self.master.server.aggregated.foreign_volume_infos(), \
- self.master.server.aggregated.native_volume_info()
+ fgn_vis, nat_vi = (
+ self.master.server.aggregated.foreign_volume_infos(),
+ self.master.server.aggregated.native_volume_info())
fgn_vi = None
if fgn_vis:
if len(fgn_vis) > 1:
@@ -316,15 +346,14 @@ class GMasterCommon(object):
if getattr(gconf, 'state_detail_file', None):
try:
with open(gconf.state_detail_file, 'r+') as f:
- loaded_data= json.load(f)
- diff_data = set(default_data) - set (loaded_data)
+ loaded_data = json.load(f)
+ diff_data = set(default_data) - set(loaded_data)
if len(diff_data):
for i in diff_data:
loaded_data[i] = default_data[i]
return loaded_data
- except (IOError):
- ex = sys.exc_info()[1]
- logging.warn ('Creating new gconf.state_detail_file.')
+ except IOError:
+ logging.warn('Creating new gconf.state_detail_file.')
# Create file with initial data
try:
with open(gconf.state_detail_file, 'wb') as f:
@@ -364,7 +393,8 @@ class GMasterCommon(object):
# - self.turns is the number of turns since start
# - self.total_turns is a limit so that if self.turns reaches it, then
# we exit (for diagnostic purposes)
- # so, eg., if the master fs changes unceasingly, self.turns will remain 0.
+ # so, eg., if the master fs changes unceasingly, self.turns will remain
+ # 0.
self.crawls = 0
self.turns = 0
self.total_turns = int(gconf.turns)
@@ -394,7 +424,7 @@ class GMasterCommon(object):
t.start()
def should_crawl(cls):
- return (gconf.glusterd_uuid in cls.master.server.node_uuid())
+ return gconf.glusterd_uuid in cls.master.server.node_uuid()
def register(self):
self.register()
@@ -416,18 +446,18 @@ class GMasterCommon(object):
volinfo_sys = self.volinfo_hook()
self.volinfo = volinfo_sys[self.KNAT]
inter_master = volinfo_sys[self.KFGN]
- logging.info("%s master with volume id %s ..." % \
- (inter_master and "intermediate" or "primary",
- self.uuid))
+ logging.info("%s master with volume id %s ..." %
+ (inter_master and "intermediate" or "primary",
+ self.uuid))
gconf.configinterface.set('volume_id', self.uuid)
if self.volinfo:
if self.volinfo['retval']:
- logging.warn("master cluster's info may not be valid %d" % \
+ logging.warn("master cluster's info may not be valid %d" %
self.volinfo['retval'])
self.start_checkpoint_thread()
else:
raise GsyncdError("master volinfo unavailable")
- self.total_crawl_stats = self.get_initial_crawl_data()
+ self.total_crawl_stats = self.get_initial_crawl_data()
self.lastreport['time'] = time.time()
logging.info('crawl interval: %d seconds' % self.sleep_interval)
@@ -435,7 +465,7 @@ class GMasterCommon(object):
crawl = self.should_crawl()
while not self.terminate:
if self.start:
- logging.debug("... crawl #%d done, took %.6f seconds" % \
+ logging.debug("... crawl #%d done, took %.6f seconds" %
(self.crawls, time.time() - self.start))
self.start = time.time()
should_display_info = self.start - self.lastreport['time'] >= 60
@@ -443,11 +473,11 @@ class GMasterCommon(object):
logging.info("%d crawls, %d turns",
self.crawls - self.lastreport['crawls'],
self.turns - self.lastreport['turns'])
- self.lastreport.update(crawls = self.crawls,
- turns = self.turns,
- time = self.start)
+ self.lastreport.update(crawls=self.crawls,
+ turns=self.turns,
+ time=self.start)
t1 = time.time()
- if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds
+ if int(t1 - t0) >= 60: # lets hardcode this check to 60 seconds
crawl = self.should_crawl()
t0 = t1
self.update_worker_remote_node()
@@ -456,11 +486,14 @@ class GMasterCommon(object):
# bring up _this_ brick to the cluster stime
# which is min of cluster (but max of the replicas)
brick_stime = self.xtime('.', self.slave)
- cluster_stime = self.master.server.aggregated.stime_mnt('.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
- logging.debug("Cluster stime: %s | Brick stime: %s" % (repr(cluster_stime), repr(brick_stime)))
+ cluster_stime = self.master.server.aggregated.stime_mnt(
+ '.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
+ logging.debug("Cluster stime: %s | Brick stime: %s" %
+ (repr(cluster_stime), repr(brick_stime)))
if not isinstance(cluster_stime, int):
if brick_stime < cluster_stime:
- self.slave.server.set_stime(self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
+ self.slave.server.set_stime(
+ self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
time.sleep(5)
continue
self.update_worker_health("Active")
@@ -489,13 +522,14 @@ class GMasterCommon(object):
with checkpoint @chkpt"""
if xtimish:
val = cls.serialize_xtime(val)
- gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
+ gconf.configinterface.set(
+ 'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
@staticmethod
def humantime(*tpair):
"""format xtime-like (sec, nsec) pair to human readable format"""
ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\
- strftime("%Y-%m-%d %H:%M:%S")
+ strftime("%Y-%m-%d %H:%M:%S")
if len(tpair) > 1:
ts += '.' + str(tpair[1])
return ts
@@ -506,7 +540,7 @@ class GMasterCommon(object):
years = int(years)
days = int(days)
- date=""
+ date = ""
m, s = divmod(crawl_time.seconds, 60)
h, m = divmod(m, 60)
@@ -515,7 +549,8 @@ class GMasterCommon(object):
if days != 0:
date += "%s %s " % (days, "day" if days == 1 else "days")
- date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2))
+ date += "%s:%s:%s" % (string.zfill(h, 2),
+ string.zfill(m, 2), string.zfill(s, 2))
return date
def checkpt_service(self, chan, chkpt):
@@ -540,16 +575,18 @@ class GMasterCommon(object):
if not checkpt_tgt:
checkpt_tgt = self.xtime('.')
if isinstance(checkpt_tgt, int):
- raise GsyncdError("master root directory is unaccessible (%s)",
+ raise GsyncdError("master root directory is "
+ "unaccessible (%s)",
os.strerror(checkpt_tgt))
self._set_checkpt_param(chkpt, 'target', checkpt_tgt)
- logging.debug("checkpoint target %s has been determined for checkpoint %s" % \
+ logging.debug("checkpoint target %s has been determined "
+ "for checkpoint %s" %
(repr(checkpt_tgt), chkpt))
# check if the label is 'now'
chkpt_lbl = chkpt
try:
- x1,x2 = chkpt.split(':')
+ x1, x2 = chkpt.split(':')
if x1 == 'now':
chkpt_lbl = "as of " + self.humantime(x2)
except:
@@ -557,41 +594,46 @@ class GMasterCommon(object):
completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
if completed:
completed = tuple(int(x) for x in completed.split('.'))
- s,_,_ = select([chan], [], [], (not completed) and 5 or None)
+ s, _, _ = select([chan], [], [], (not completed) and 5 or None)
# either request made and we re-check to not
# give back stale data, or we still hunting for completion
- if self.native_xtime(checkpt_tgt) and self.native_xtime(checkpt_tgt) < self.volmark:
+ if (self.native_xtime(checkpt_tgt) and (
+ self.native_xtime(checkpt_tgt) < self.volmark)):
# indexing has been reset since setting the checkpoint
status = "is invalid"
else:
xtr = self.xtime('.', self.slave)
if isinstance(xtr, int):
- raise GsyncdError("slave root directory is unaccessible (%s)",
+ raise GsyncdError("slave root directory is "
+ "unaccessible (%s)",
os.strerror(xtr))
ncompleted = self.xtime_geq(xtr, checkpt_tgt)
- if completed and not ncompleted: # stale data
- logging.warn("completion time %s for checkpoint %s became stale" % \
+ if completed and not ncompleted: # stale data
+ logging.warn("completion time %s for checkpoint %s "
+ "became stale" %
(self.humantime(*completed), chkpt))
completed = None
gconf.configinterface.delete('checkpoint_completed')
- if ncompleted and not completed: # just reaching completion
+ if ncompleted and not completed: # just reaching completion
completed = "%.6f" % time.time()
- self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False)
+ self._set_checkpt_param(
+ chkpt, 'completed', completed, xtimish=False)
completed = tuple(int(x) for x in completed.split('.'))
logging.info("checkpoint %s completed" % chkpt)
status = completed and \
- "completed at " + self.humantime(completed[0]) or \
- "not reached yet"
+ "completed at " + self.humantime(completed[0]) or \
+ "not reached yet"
if s:
conn = None
try:
conn, _ = chan.accept()
try:
- conn.send("checkpoint %s is %s\0" % (chkpt_lbl, status))
+ conn.send("checkpoint %s is %s\0" %
+ (chkpt_lbl, status))
except:
exc = sys.exc_info()[1]
- if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \
- exc.errno == EPIPE:
+ if ((isinstance(exc, OSError) or isinstance(
+ exc, IOError)) and exc.errno == EPIPE):
logging.debug('checkpoint client disconnected')
else:
raise
@@ -602,11 +644,13 @@ class GMasterCommon(object):
def start_checkpoint_thread(self):
"""prepare and start checkpoint service"""
if self.checkpoint_thread or not (
- getattr(gconf, 'state_socket_unencoded', None) and getattr(gconf, 'socketdir', None)
+ getattr(gconf, 'state_socket_unencoded', None) and getattr(
+ gconf, 'socketdir', None)
):
return
chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- state_socket = os.path.join(gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")
+ state_socket = os.path.join(
+ gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")
try:
os.unlink(state_socket)
except:
@@ -621,9 +665,9 @@ class GMasterCommon(object):
def add_job(self, path, label, job, *a, **kw):
"""insert @job function to job table at @path with @label"""
- if self.jobtab.get(path) == None:
+ if self.jobtab.get(path) is None:
self.jobtab[path] = []
- self.jobtab[path].append((label, a, lambda : job(*a, **kw)))
+ self.jobtab[path].append((label, a, lambda: job(*a, **kw)))
def add_failjob(self, path, label):
"""invoke .add_job with a job that does nothing just fails"""
@@ -644,7 +688,7 @@ class GMasterCommon(object):
ret = j[-1]()
if not ret:
succeed = False
- if succeed and not args[0] == None:
+ if succeed and not args[0] is None:
self.sendmark(path, *args)
return succeed
@@ -657,19 +701,21 @@ class GMasterCommon(object):
self.slave.server.setattr(path, adct)
self.set_slave_xtime(path, mark)
+
class GMasterChangelogMixin(GMasterCommon):
+
""" changelog based change detection and syncing """
# index for change type and entry
IDX_START = 0
- IDX_END = 2
+ IDX_END = 2
- POS_GFID = 0
- POS_TYPE = 1
+ POS_GFID = 0
+ POS_TYPE = 1
POS_ENTRY1 = -1
- TYPE_META = "M "
- TYPE_GFID = "D "
+ TYPE_META = "M "
+ TYPE_GFID = "D "
TYPE_ENTRY = "E "
# flat directory heirarchy for gfid based access
@@ -686,18 +732,19 @@ class GMasterChangelogMixin(GMasterCommon):
def setup_working_dir(self):
workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path))
logfile = os.path.join(workdir, 'changes.log')
- logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile))
+ logging.debug('changelog working dir %s (log: %s)' %
+ (workdir, logfile))
return (workdir, logfile)
def process_change(self, change, done, retry):
pfx = gauxpfx()
- clist = []
+ clist = []
entries = []
meta_gfid = set()
datas = set()
# basic crawl stats: files and bytes
- files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}
+ files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}
try:
f = open(change, "r")
clist = f.readlines()
@@ -750,17 +797,19 @@ class GMasterChangelogMixin(GMasterCommon):
elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
entry_update()
# stat information present in the changelog itself
- entries.append(edct(ty, gfid=gfid, entry=en, mode=int(ec[2]),\
+ entries.append(edct(ty, gfid=gfid, entry=en,
+ mode=int(ec[2]),
uid=int(ec[3]), gid=int(ec[4])))
else:
# stat() to get mode and other information
go = os.path.join(pfx, gfid)
st = lstat(go)
if isinstance(st, int):
- if ty == 'RENAME': # special hack for renames...
+ if ty == 'RENAME': # special hack for renames...
entries.append(edct('UNLINK', gfid=gfid, entry=en))
else:
- logging.debug('file %s got purged in the interim' % go)
+ logging.debug(
+ 'file %s got purged in the interim' % go)
continue
if ty == 'LINK':
@@ -771,17 +820,20 @@ class GMasterChangelogMixin(GMasterCommon):
if isinstance(rl, int):
continue
entry_update()
- entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
+ entries.append(
+ edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
elif ty == 'RENAME':
entry_update()
- e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
- entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st))
+ e1 = unescape(
+ os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
+ entries.append(
+ edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st))
else:
logging.warn('ignoring %s [op %s]' % (gfid, ty))
elif et == self.TYPE_GFID:
datas.add(os.path.join(pfx, ec[0]))
elif et == self.TYPE_META:
- if ec[1] == 'SETATTR': # only setattr's for now...
+ if ec[1] == 'SETATTR': # only setattr's for now...
meta_gfid.add(os.path.join(pfx, ec[0]))
else:
logging.warn('got invalid changelog type: %s' % (et))
@@ -789,10 +841,10 @@ class GMasterChangelogMixin(GMasterCommon):
if not retry:
self.update_worker_cumilitive_status(files_pending)
# sync namespace
- if (entries):
+ if entries:
self.slave.server.entry_ops(entries)
# sync metadata
- if (meta_gfid):
+ if meta_gfid:
meta_entries = []
for go in meta_gfid:
st = lstat(go)
@@ -814,22 +866,25 @@ class GMasterChangelogMixin(GMasterCommon):
self.skipped_gfid_list = []
self.current_files_skipped_count = 0
- # first, fire all changelog transfers in parallel. entry and metadata
- # are performed synchronously, therefore in serial. However at the end
- # of each changelog, data is synchronized with syncdata_async() - which
- # means it is serial w.r.t entries/metadata of that changelog but
- # happens in parallel with data of other changelogs.
+ # first, fire all changelog transfers in parallel. entry and
+ # metadata are performed synchronously, therefore in serial.
+ # However at the end of each changelog, data is synchronized
+ # with syncdata_async() - which means it is serial w.r.t
+ # entries/metadata of that changelog but happens in parallel
+ # with data of other changelogs.
for change in changes:
logging.debug('processing change %s' % change)
self.process_change(change, done, retry)
if not retry:
- self.turns += 1 # number of changelogs processed in the batch
+ # number of changelogs processed in the batch
+ self.turns += 1
- # Now we wait for all the data transfers fired off in the above step
- # to complete. Note that this is not ideal either. Ideally we want to
- # trigger the entry/meta-data transfer of the next batch while waiting
- # for the data transfer of the current batch to finish.
+ # Now we wait for all the data transfers fired off in the above
+ # step to complete. Note that this is not ideal either. Ideally
+ # we want to trigger the entry/meta-data transfer of the next
+ # batch while waiting for the data transfer of the current batch
+ # to finish.
# Note that the reason to wait for the data transfer (vs doing it
# completely in the background and call the changelog_done()
@@ -837,10 +892,11 @@ class GMasterChangelogMixin(GMasterCommon):
# and prevents a spiraling increase of wait stubs from consuming
# unbounded memory and resources.
- # update the slave's time with the timestamp of the _last_ changelog
- # file time suffix. Since, the changelog prefix time is the time when
- # the changelog was rolled over, introduce a tolerence of 1 second to
- # counter the small delta b/w the marker update and gettimeofday().
+ # update the slave's time with the timestamp of the _last_
+ # changelog file time suffix. Since, the changelog prefix time
+ # is the time when the changelog was rolled over, introduce a
+ # tolerence of 1 second to counter the small delta b/w the
+ # marker update and gettimeofday().
# NOTE: this is only for changelog mode, not xsync.
# @change is the last changelog (therefore max time for this batch)
@@ -856,10 +912,13 @@ class GMasterChangelogMixin(GMasterCommon):
retry = True
tries += 1
if tries == self.MAX_RETRIES:
- logging.warn('changelogs %s could not be processed - moving on...' % \
+ logging.warn('changelogs %s could not be processed - '
+ 'moving on...' %
' '.join(map(os.path.basename, changes)))
- self.update_worker_total_files_skipped(self.current_files_skipped_count)
- logging.warn('SKIPPED GFID = %s' % ','.join(self.skipped_gfid_list))
+ self.update_worker_total_files_skipped(
+ self.current_files_skipped_count)
+ logging.warn('SKIPPED GFID = %s' %
+ ','.join(self.skipped_gfid_list))
self.update_worker_files_syncd()
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
@@ -873,7 +932,7 @@ class GMasterChangelogMixin(GMasterCommon):
# entry_ops() that failed... so we retry the _whole_ changelog
# again.
# TODO: remove entry retries when it's gets fixed.
- logging.warn('incomplete sync, retrying changelogs: %s' % \
+ logging.warn('incomplete sync, retrying changelogs: %s' %
' '.join(map(os.path.basename, changes)))
time.sleep(0.5)
@@ -884,15 +943,15 @@ class GMasterChangelogMixin(GMasterCommon):
self.sendmark(path, stime)
def get_worker_status_file(self):
- file_name = gconf.local_path+'.status'
+ file_name = gconf.local_path + '.status'
file_name = file_name.replace("/", "_")
- worker_status_file = gconf.georep_session_working_dir+file_name
+ worker_status_file = gconf.georep_session_working_dir + file_name
return worker_status_file
def update_worker_status(self, key, value):
- default_data = {"remote_node":"N/A",
- "worker status":"Not Started",
- "crawl status":"N/A",
+ default_data = {"remote_node": "N/A",
+ "worker status": "Not Started",
+ "crawl status": "N/A",
"files_syncd": 0,
"files_remaining": 0,
"bytes_remaining": 0,
@@ -909,7 +968,7 @@ class GMasterChangelogMixin(GMasterCommon):
f.flush()
os.fsync(f.fileno())
except (IOError, OSError, ValueError):
- logging.info ('Creating new %s' % worker_status_file)
+ logging.info('Creating new %s' % worker_status_file)
try:
with open(worker_status_file, 'wb') as f:
default_data[key] = value
@@ -920,9 +979,9 @@ class GMasterChangelogMixin(GMasterCommon):
raise
def update_worker_cumilitive_status(self, files_pending):
- default_data = {"remote_node":"N/A",
- "worker status":"Not Started",
- "crawl status":"N/A",
+ default_data = {"remote_node": "N/A",
+ "worker status": "Not Started",
+ "crawl status": "N/A",
"files_syncd": 0,
"files_remaining": 0,
"bytes_remaining": 0,
@@ -932,8 +991,8 @@ class GMasterChangelogMixin(GMasterCommon):
try:
with open(worker_status_file, 'r+') as f:
loaded_data = json.load(f)
- loaded_data['files_remaining'] = files_pending['count']
- loaded_data['bytes_remaining'] = files_pending['bytes']
+ loaded_data['files_remaining'] = files_pending['count']
+ loaded_data['bytes_remaining'] = files_pending['bytes']
loaded_data['purges_remaining'] = files_pending['purge']
os.ftruncate(f.fileno(), 0)
os.lseek(f.fileno(), 0, os.SEEK_SET)
@@ -941,11 +1000,11 @@ class GMasterChangelogMixin(GMasterCommon):
f.flush()
os.fsync(f.fileno())
except (IOError, OSError, ValueError):
- logging.info ('Creating new %s' % worker_status_file)
+ logging.info('Creating new %s' % worker_status_file)
try:
with open(worker_status_file, 'wb') as f:
- default_data['files_remaining'] = files_pending['count']
- default_data['bytes_remaining'] = files_pending['bytes']
+ default_data['files_remaining'] = files_pending['count']
+ default_data['bytes_remaining'] = files_pending['bytes']
default_data['purges_remaining'] = files_pending['purge']
json.dump(default_data, f)
f.flush()
@@ -953,24 +1012,24 @@ class GMasterChangelogMixin(GMasterCommon):
except:
raise
- def update_worker_remote_node (self):
+ def update_worker_remote_node(self):
node = sys.argv[-1]
node = node.split("@")[-1]
remote_node_ip = node.split(":")[0]
remote_node_vol = node.split(":")[3]
remote_node = remote_node_ip + '::' + remote_node_vol
- self.update_worker_status ('remote_node', remote_node)
+ self.update_worker_status('remote_node', remote_node)
- def update_worker_health (self, state):
- self.update_worker_status ('worker status', state)
+ def update_worker_health(self, state):
+ self.update_worker_status('worker status', state)
- def update_worker_crawl_status (self, state):
- self.update_worker_status ('crawl status', state)
+ def update_worker_crawl_status(self, state):
+ self.update_worker_status('crawl status', state)
- def update_worker_files_syncd (self):
- default_data = {"remote_node":"N/A",
- "worker status":"Not Started",
- "crawl status":"N/A",
+ def update_worker_files_syncd(self):
+ default_data = {"remote_node": "N/A",
+ "worker status": "Not Started",
+ "crawl status": "N/A",
"files_syncd": 0,
"files_remaining": 0,
"bytes_remaining": 0,
@@ -981,8 +1040,8 @@ class GMasterChangelogMixin(GMasterCommon):
with open(worker_status_file, 'r+') as f:
loaded_data = json.load(f)
loaded_data['files_syncd'] += loaded_data['files_remaining']
- loaded_data['files_remaining'] = 0
- loaded_data['bytes_remaining'] = 0
+ loaded_data['files_remaining'] = 0
+ loaded_data['bytes_remaining'] = 0
loaded_data['purges_remaining'] = 0
os.ftruncate(f.fileno(), 0)
os.lseek(f.fileno(), 0, os.SEEK_SET)
@@ -990,7 +1049,7 @@ class GMasterChangelogMixin(GMasterCommon):
f.flush()
os.fsync(f.fileno())
except (IOError, OSError, ValueError):
- logging.info ('Creating new %s' % worker_status_file)
+ logging.info('Creating new %s' % worker_status_file)
try:
with open(worker_status_file, 'wb') as f:
json.dump(default_data, f)
@@ -999,19 +1058,19 @@ class GMasterChangelogMixin(GMasterCommon):
except:
raise
- def update_worker_files_remaining (self, state):
- self.update_worker_status ('files_remaining', state)
+ def update_worker_files_remaining(self, state):
+ self.update_worker_status('files_remaining', state)
- def update_worker_bytes_remaining (self, state):
- self.update_worker_status ('bytes_remaining', state)
+ def update_worker_bytes_remaining(self, state):
+ self.update_worker_status('bytes_remaining', state)
- def update_worker_purges_remaining (self, state):
- self.update_worker_status ('purges_remaining', state)
+ def update_worker_purges_remaining(self, state):
+ self.update_worker_status('purges_remaining', state)
- def update_worker_total_files_skipped (self, value):
- default_data = {"remote_node":"N/A",
- "worker status":"Not Started",
- "crawl status":"N/A",
+ def update_worker_total_files_skipped(self, value):
+ default_data = {"remote_node": "N/A",
+ "worker status": "Not Started",
+ "crawl status": "N/A",
"files_syncd": 0,
"files_remaining": 0,
"bytes_remaining": 0,
@@ -1029,7 +1088,7 @@ class GMasterChangelogMixin(GMasterCommon):
f.flush()
os.fsync(f.fileno())
except (IOError, OSError, ValueError):
- logging.info ('Creating new %s' % worker_status_file)
+ logging.info('Creating new %s' % worker_status_file)
try:
with open(worker_status_file, 'wb') as f:
default_data['total_files_skipped'] = value
@@ -1057,9 +1116,12 @@ class GMasterChangelogMixin(GMasterCommon):
if changes:
if purge_time:
logging.info("slave's time: %s" % repr(purge_time))
- processed = [x for x in changes if int(x.split('.')[-1]) < purge_time[0]]
+ processed = [x for x in changes
+ if int(x.split('.')[-1]) < purge_time[0]]
for pr in processed:
- logging.info('skipping already processed change: %s...' % os.path.basename(pr))
+ logging.info(
+ 'skipping already processed change: %s...' %
+ os.path.basename(pr))
self.master.server.changelog_done(pr)
changes.remove(pr)
logging.debug('processing changes %s' % repr(changes))
@@ -1080,7 +1142,9 @@ class GMasterChangelogMixin(GMasterCommon):
# control should not reach here
raise
+
class GMasterXsyncMixin(GMasterChangelogMixin):
+
"""
This crawl needs to be xtime based (as of now
it's not. this is beacuse we generate CHANGELOG
@@ -1091,7 +1155,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
files, hardlinks and symlinks.
"""
- XSYNC_MAX_ENTRIES = 1<<13
+ XSYNC_MAX_ENTRIES = 1 << 13
def register(self):
self.counter = 0
@@ -1145,7 +1209,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
def open(self):
try:
- self.xsync_change = os.path.join(self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time())))
+ self.xsync_change = os.path.join(
+ self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time())))
self.fh = open(self.xsync_change, 'w')
except IOError:
raise
@@ -1165,7 +1230,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.put('xsync', self.fname())
self.counter = 0
if not last:
- time.sleep(1) # make sure changelogs are 1 second apart
+ time.sleep(1) # make sure changelogs are 1 second apart
self.open()
def sync_stime(self, stime=None, last=False):
@@ -1207,7 +1272,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
xtr_root = self.xtime('.', self.slave)
if isinstance(xtr_root, int):
if xtr_root != ENOENT:
- logging.warn("slave cluster not returning the " \
+ logging.warn("slave cluster not returning the "
"correct xtime for root (%d)" % xtr_root)
xtr_root = self.minus_infinity
xtl = self.xtime(path)
@@ -1216,7 +1281,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
xtr = self.xtime(path, self.slave)
if isinstance(xtr, int):
if xtr != ENOENT:
- logging.warn("slave cluster not returning the " \
+ logging.warn("slave cluster not returning the "
"correct xtime for %s (%d)" % (path, xtr))
xtr = self.minus_infinity
xtr = max(xtr, xtr_root)
@@ -1235,7 +1300,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
e = os.path.join(path, e)
xte = self.xtime(e)
if isinstance(xte, int):
- logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte]))
+ logging.warn("irregular xtime for %s: %s" %
+ (e, errno.errorcode[xte]))
continue
if not self.need_sync(e, xte, xtr):
continue
@@ -1256,35 +1322,51 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.sync_done(self.stimes, False)
self.stimes = []
if stat.S_ISDIR(mo):
- self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))])
+ self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(
+ st.st_uid), str(st.st_gid), escape(os.path.join(pargfid,
+ bname))])
self.Xcrawl(e, xtr_root)
self.stimes.append((e, xte))
elif stat.S_ISLNK(mo):
- self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))])
+ self.write_entry_change(
+ "E", [gfid, 'SYMLINK', escape(os.path.join(pargfid,
+ bname))])
elif stat.S_ISREG(mo):
nlink = st.st_nlink
- nlink -= 1 # fixup backend stat link count
- # if a file has a hardlink, create a Changelog entry as 'LINK' so the slave
- # side will decide if to create the new entry, or to create link.
+ nlink -= 1 # fixup backend stat link count
+ # if a file has a hardlink, create a Changelog entry as
+ # 'LINK' so the slave side will decide if to create the
+ # new entry, or to create link.
if nlink == 1:
- self.write_entry_change("E", [gfid, 'MKNOD', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))])
+ self.write_entry_change("E",
+ [gfid, 'MKNOD', str(mo),
+ str(st.st_uid),
+ str(st.st_gid),
+ escape(os.path.join(
+ pargfid, bname))])
else:
- self.write_entry_change("E", [gfid, 'LINK', escape(os.path.join(pargfid, bname))])
+ self.write_entry_change(
+ "E", [gfid, 'LINK', escape(os.path.join(pargfid,
+ bname))])
self.write_entry_change("D", [gfid])
if path == '.':
self.stimes.append((path, xtl))
self.sync_done(self.stimes, True)
+
class BoxClosedErr(Exception):
pass
+
class PostBox(list):
+
"""synchronized collection for storing things thought of as "requests" """
def __init__(self, *a):
list.__init__(self, *a)
# too bad Python stdlib does not have read/write locks...
- # it would suffivce to grab the lock in .append as reader, in .close as writer
+ # it would suffivce to grab the lock in .append as reader, in .close as
+ # writer
self.lever = Condition()
self.open = True
self.done = False
@@ -1319,7 +1401,9 @@ class PostBox(list):
self.open = False
self.lever.release()
+
class Syncer(object):
+
"""a staged queue to relay rsync requests to rsync workers
By "staged queue" its meant that when a consumer comes to the