Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--  geo-replication/syncdaemon/master.py  632
1 file changed, 504 insertions(+), 128 deletions(-)
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index f903f30595d..58df14954bb 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -4,22 +4,20 @@ import time
import stat
import random
import signal
+import json
import logging
import socket
+import string
import errno
-import re
-from errno import ENOENT, ENODATA, EPIPE
+from shutil import copyfileobj
+from errno import ENOENT, ENODATA, EPIPE, EEXIST
from threading import currentThread, Condition, Lock
from datetime import datetime
-try:
- from hashlib import md5 as md5
-except ImportError:
- # py 2.4
- from md5 import new as md5
from gconf import gconf
-from syncdutils import FreeObject, Thread, GsyncdError, boolify, \
- escape, unescape, select
+from tempfile import mkdtemp, NamedTemporaryFile
+from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \
+ unescape, select, gauxpfx, md5hex, selfkill, entry2pb
URXTIME = (-1, 0)
@@ -51,18 +49,20 @@ def _volinfo_hook_relax_foreign(self):
# The API!
-def gmaster_builder():
+def gmaster_builder(excrawl=None):
"""produce the GMaster class variant corresponding
to sync mode"""
this = sys.modules[__name__]
modemixin = gconf.special_sync_mode
if not modemixin:
modemixin = 'normal'
- logging.info('setting up master for %s sync mode' % modemixin)
+ changemixin = isinstance(excrawl, str) and excrawl or gconf.change_detector
+ logging.info('setting up %s change detection mode' % changemixin)
modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
+ crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
- class _GMaster(GMasterBase, modemixin, sendmarkmixin, purgemixin):
+ class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin):
pass
return _GMaster
@@ -100,12 +100,9 @@ class NormalMixin(object):
def make_xtime_opts(self, is_master, opts):
if not 'create' in opts:
- opts['create'] = is_master and not self.inter_master
+ opts['create'] = is_master
if not 'default_xtime' in opts:
- if is_master and self.inter_master:
- opts['default_xtime'] = ENODATA
- else:
- opts['default_xtime'] = URXTIME
+ opts['default_xtime'] = URXTIME
def xtime_low(self, server, path, **opts):
xt = server.xtime(path, self.uuid)
@@ -114,7 +111,7 @@ class NormalMixin(object):
if xt == ENODATA or xt < self.volmark:
if opts['create']:
xt = _xtime_now()
- server.set_xtime(path, self.uuid, xt)
+ server.aggregated.set_xtime(path, self.uuid, xt)
else:
xt = opts['default_xtime']
return xt
@@ -151,6 +148,13 @@ class NormalMixin(object):
def set_slave_xtime(self, path, mark):
self.slave.server.set_xtime(path, self.uuid, mark)
+class PartialMixin(NormalMixin):
+ """a variant tuned towards operation with a master
+ that has partial info of the slave (brick typically)"""
+
+ def xtime_reversion_hook(self, path, xtl, xtr):
+ pass
+
class WrapupMixin(NormalMixin):
"""a variant that differs from normal in terms
of ignoring non-indexed files"""
@@ -163,7 +167,7 @@ class WrapupMixin(NormalMixin):
opts['default_xtime'] = URXTIME
- @staticmethod
- def keepalive_payload_hook(timo, gap):
+ def keepalive_payload_hook(self, timo, gap):
return (None, gap)
def volinfo_hook(self):
@@ -236,19 +240,19 @@ class BlindMixin(object):
# from interrupted gsyncd transfer
logging.warn('have to fix up missing xtime on ' + path)
xt0 = _xtime_now()
- server.set_xtime(path, self.uuid, xt0)
+ server.aggregated.set_xtime(path, self.uuid, xt0)
else:
xt0 = opts['default_xtime']
xt = (xt0, xt[1])
return xt
- @staticmethod
- def keepalive_payload_hook(timo, gap):
+ def keepalive_payload_hook(self, timo, gap):
return (None, gap)
def volinfo_hook(self):
res = _volinfo_hook_relax_foreign(self)
- volinfo_r_new = self.slave.server.native_volume_info()
+ volinfo_r_new = self.slave.server.aggregated.native_volume_info()
if volinfo_r_new['retval']:
raise GsyncdError("slave is corrupt")
if getattr(self, 'volinfo_r', None):
@@ -321,9 +325,7 @@ class PurgeNoopMixin(object):
def purge_missing(self, path, names):
pass
-
-
-class GMasterBase(object):
+class GMasterCommon(object):
"""abstract class impementling master role"""
KFGN = 0
@@ -334,8 +336,8 @@ class GMasterBase(object):
err out on multiple foreign masters
"""
- fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \
- self.master.server.native_volume_info()
+ fgn_vis, nat_vi = self.master.server.aggregated.foreign_volume_infos(), \
+ self.master.server.aggregated.native_volume_info()
fgn_vi = None
if fgn_vis:
if len(fgn_vis) > 1:
@@ -376,6 +378,33 @@ class GMasterBase(object):
self.make_xtime_opts(rsc == self.master, opts)
return self.xtime_low(rsc.server, path, **opts)
+ def get_initial_crawl_data(self):
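+ """load the persisted crawl totals from the state-detail file,
+ seeding the file with zeroed counters on first run"""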
+ default_data = {'sync_time': 0, 'files_synced': 0, 'bytes_synced': 0}
+ if getattr(gconf, 'state_detail_file', None):
+ try:
+ return json.load(open(gconf.state_detail_file))
+ except (IOError, OSError):
+ ex = sys.exc_info()[1]
+ if ex.errno == ENOENT:
+ # Create file with initial data
+ with open(gconf.state_detail_file, 'wb') as f:
+ json.dump(default_data, f)
+ return default_data
+ else:
+ raise
+
+ return default_data
+
+ def update_crawl_data(self):
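+ """persist total_crawl_stats atomically: write to a temp file
+ in the same directory, then rename over the state-detail file"""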
+ if getattr(gconf, 'state_detail_file', None):
+ try:
+ same_dir = os.path.dirname(gconf.state_detail_file)
+ with NamedTemporaryFile(dir=same_dir, delete=False) as tmp:
+ json.dump(self.total_crawl_stats, tmp)
+ tmp.flush()
+ os.fsync(tmp.fileno())
+ os.rename(tmp.name, gconf.state_detail_file)
+ except (IOError, OSError):
+ raise
+
def __init__(self, master, slave):
self.master = master
self.slave = slave
@@ -392,15 +421,12 @@ class GMasterBase(object):
self.crawls = 0
self.turns = 0
self.total_turns = int(gconf.turns)
- self.lastreport = {'crawls': 0, 'turns': 0}
+ self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
+ self.crawl_stats = {'sync_time': 0, 'last_synctime': 0, 'crawl_starttime': 0,
+ 'crawl_time': 0, 'files_synced': 0, 'bytes_synced': 0}
+ self.total_crawl_stats = self.get_initial_crawl_data()
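+ # crawl_stats tracks the current session; total_crawl_stats carries
+ # the lifetime totals persisted in the state-detail file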
self.start = None
self.change_seen = None
- self.syncTime=0
- self.lastSyncTime=0
- self.crawlStartTime=0
- self.crawlTime=0
- self.filesSynced=0
- self.bytesSynced=0
# the authoritative (foreign, native) volinfo pair
# which lets us deduce what to do when we refetch
# the volinfos from system
@@ -409,8 +435,94 @@ class GMasterBase(object):
# the actual volinfo we make use of
self.volinfo = None
self.terminate = False
+ self.sleep_interval = 1
self.checkpoint_thread = None
+ def init_keep_alive(cls):
+ """start the keep-alive thread """
+ timo = int(gconf.timeout or 0)
+ if timo > 0:
+ def keep_alive():
+ while True:
+ vi, gap = cls.keepalive_payload_hook(timo, timo * 0.5)
+ cls.slave.server.keep_alive(vi)
+ time.sleep(gap)
+ t = Thread(target=keep_alive)
+ t.start()
+
+ def volinfo_query(self):
+ """volume info state machine"""
+ volinfo_sys, state_change = self.volinfo_hook()
+ if self.inter_master:
+ self.volinfo = volinfo_sys[self.KFGN]
+ else:
+ self.volinfo = volinfo_sys[self.KNAT]
+ if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master):
+ logging.info('new master is %s', self.uuid)
+ if self.volinfo:
+ logging.info("%s master with volume id %s ..." % \
+ (self.inter_master and "intermediate" or "primary",
+ self.uuid))
+ if state_change == self.KFGN:
+ gconf.configinterface.set('volume_id', self.uuid)
+ if self.volinfo:
+ if self.volinfo['retval']:
+ raise GsyncdError ("master is corrupt")
+ self.start_checkpoint_thread()
+ else:
+ if self.crawls == 0:
+ if self.inter_master:
+ logging.info("waiting for being synced from %s ..." % \
+ self.volinfo_state[self.KFGN]['uuid'])
+ else:
+ logging.info("waiting for volume info ...")
+ return True
+
+ def should_crawl(cls):
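+ # crawl only if this node's glusterd uuid appears in the master
+ # volume's node uuid list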
+ return (gconf.glusterd_uuid in cls.master.server.node_uuid())
+
+ def register(self):
+ # no-op here; overridden by the change detection mixins
+ pass
+
+ def crawlwrap(self, oneshot=False):
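+ """outer crawl loop: keep volume info fresh, re-evaluate
+ should_crawl() every 60 seconds, and run crawl() once per
+ sleep_interval"""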
+ if oneshot:
+ # it's important to do this during the oneshot crawl as
+ # for a passive gsyncd (ie. in a replicate scenario)
+ # the keepalive thread would keep the connection alive.
+ self.init_keep_alive()
+ self.lastreport['time'] = time.time()
+ self.crawl_stats['crawl_starttime'] = datetime.now()
+
+ logging.info('crawl interval: %d seconds' % self.sleep_interval)
+ t0 = time.time()
+ crawl = self.should_crawl()
+ while not self.terminate:
+ if self.volinfo_query():
+ continue
+ t1 = time.time()
+ if int(t1 - t0) >= 60: # re-evaluate should_crawl() every 60 seconds (hardcoded)
+ crawl = self.should_crawl()
+ t0 = t1
+ if not crawl:
+ time.sleep(5)
+ continue
+ if self.start:
+ logging.debug("... crawl #%d done, took %.6f seconds" % \
+ (self.crawls, time.time() - self.start))
+ self.start = t1
+ should_display_info = self.start - self.lastreport['time'] >= 60
+ if should_display_info:
+ logging.info("%d crawls, %d turns",
+ self.crawls - self.lastreport['crawls'],
+ self.turns - self.lastreport['turns'])
+ self.lastreport.update(crawls = self.crawls,
+ turns = self.turns,
+ time = self.start)
+ self.crawl()
+ if oneshot:
+ return
+ time.sleep(self.sleep_interval)
+
@classmethod
def _checkpt_param(cls, chkpt, prm, xtimish=True):
"""use config backend to lookup a parameter belonging to
@@ -443,32 +555,37 @@ class GMasterBase(object):
return ts
def get_extra_info(self):
- str_info="\nFile synced : %d" %(self.filesSynced)
- str_info+="\nBytes Synced : %d KB" %(self.syncer.bytesSynced)
- str_info+="\nSync Time : %f seconds" %(self.syncTime)
- self.crawlTime=datetime.now()-self.crawlStartTime
- years , days =divmod(self.crawlTime.days,365.25)
- years=int(years)
- days=int(days)
+ str_info = "\nFilesSynced=%d;" % (self.crawl_stats['files_synced'])
+ str_info += "BytesSynced=%s;" % (self.crawl_stats['bytes_synced'])
+
+ self.crawl_stats['crawl_time'] = datetime.now() - self.crawl_stats['crawl_starttime']
+
+ str_info += "Uptime=%s;" % (self._crawl_time_format(self.crawl_stats['crawl_time']))
+ str_info += "SyncTime=%s;" % (self.crawl_stats['sync_time'])
+ str_info += "TotalSyncTime=%s;" % (self.total_crawl_stats['sync_time'])
+ str_info += "TotalFilesSynced=%d;" % (self.total_crawl_stats['files_synced'])
+ str_info += "TotalBytesSynced=%s;" % (self.total_crawl_stats['bytes_synced'])
+ str_info += "\0"
+ logging.debug(str_info)
+ return str_info
+
+ def _crawl_time_format(self, crawl_time):
+ # Ex: 5 years, 4 days, 20:23:10
+ years, days = divmod(crawl_time.days, 365.25)
+ years = int(years)
+ days = int(days)
date=""
- m, s = divmod(self.crawlTime.seconds, 60)
+ m, s = divmod(crawl_time.seconds, 60)
h, m = divmod(m, 60)
- if years!=0 :
- date+=str(years)+" year "
- if days!=0 :
- date+=str(days)+" day "
- if h!=0 :
- date+=str(h)+" H : "
- if m!=0 or h!=0 :
- date+=str(m)+" M : "
-
- date+=str(s)+" S"
- self.crawlTime=date
- str_info+="\nCrawl Time : %s" %(str(self.crawlTime))
- str_info+="\n\0"
- return str_info
+ if years != 0:
+ date += "%s %s " % (years, "year" if years == 1 else "years")
+ if days != 0:
+ date += "%s %s " % (days, "day" if days == 1 else "days")
+
+ date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2))
+ return date
def checkpt_service(self, chan, chkpt, tgt):
"""checkpoint service loop
@@ -517,7 +634,7 @@ class GMasterBase(object):
try:
conn, _ = chan.accept()
try:
- conn.send(" | checkpoint %s %s %s" % (chkpt, status,self.get_extra_info()))
+ conn.send(" | checkpoint %s %s %s" % (chkpt, status, self.get_extra_info()))
except:
exc = sys.exc_info()[1]
if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \
@@ -536,7 +653,7 @@ class GMasterBase(object):
):
return
chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- state_socket = os.path.join(gconf.socketdir, md5(gconf.state_socket_unencoded).hexdigest() + ".socket")
+ state_socket = os.path.join(gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")
try:
os.unlink(state_socket)
except:
@@ -559,22 +676,6 @@ class GMasterBase(object):
t.start()
self.checkpoint_thread = t
- def crawl_loop(self):
- """start the keep-alive thread and iterate .crawl"""
- timo = int(gconf.timeout or 0)
- if timo > 0:
- def keep_alive():
- while True:
- vi, gap = self.keepalive_payload_hook(timo, timo * 0.5)
- self.slave.server.keep_alive(vi)
- time.sleep(gap)
- t = Thread(target=keep_alive)
- t.start()
- self.lastreport['time'] = time.time()
- self.crawlStartTime=datetime.now()
- while not self.terminate:
- self.crawl()
-
def add_job(self, path, label, job, *a, **kw):
"""insert @job function to job table at @path with @label"""
if self.jobtab.get(path) == None:
@@ -600,7 +701,7 @@ class GMasterBase(object):
ret = j[-1]()
if not ret:
succeed = False
- if succeed:
+ if succeed and args[0] is not None:
self.sendmark(path, *args)
return succeed
@@ -653,6 +754,319 @@ class GMasterBase(object):
tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate))
return newstate, param.state_change
+class GMasterChangelogMixin(GMasterCommon):
+ """ changelog based change detection and syncing """
+
+ # index for change type and entry
+ IDX_START = 0
+ IDX_END = 2
+
+ POS_GFID = 0
+ POS_TYPE = 1
+ POS_ENTRY1 = 2
+ POS_ENTRY2 = 3 # renames
+
+ _CL_TYPE_DATA_PFX = "D "
+ _CL_TYPE_METADATA_PFX = "M "
+ _CL_TYPE_ENTRY_PFX = "E "
+
+ TYPE_GFID = [_CL_TYPE_DATA_PFX] # ignoring metadata ops
+ TYPE_ENTRY = [_CL_TYPE_ENTRY_PFX]
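+
+ # a changelog record is "<TYPE-PFX><payload>"; as parsed in
+ # process_change() below, an entry payload is
+ # "<gfid> <OP> <pargfid/bname> [<pargfid/bname>]" (second entry
+ # only for RENAME), a data payload is just "<gfid>", e.g.:
+ #   E <gfid> CREATE <pargfid>/<bname>
+ #   D <gfid>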
+
+ # flat directory hierarchy for gfid based access
+ FLAT_DIR_HIERARCHY = '.'
+
+ def fallback_xsync(self):
+ logging.info('falling back to xsync mode')
+ gconf.configinterface.set('change-detector', 'xsync')
+ selfkill()
+
+ def setup_working_dir(self):
+ workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path))
+ logfile = os.path.join(workdir, 'changes.log')
+ logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile))
+ return (workdir, logfile)
+
+ def lstat(self, e):
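+ # lstat variant: returns the errno (ENOENT) for a vanished file
+ # instead of raising, so callers can check isinstance(st, int)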
+ try:
+ return os.lstat(e)
+ except (IOError, OSError):
+ ex = sys.exc_info()[1]
+ if ex.errno == ENOENT:
+ return ex.errno
+ else:
+ raise
+
+ # sync data
+ def syncdata(self, datas):
+ logging.debug('datas: %s' % (datas))
+ for data in datas:
+ logging.debug('candidate for syncing %s' % data)
+ pb = self.syncer.add(data)
+ timeA = datetime.now()
+ def regjob(se, xte, pb):
+ rv = pb.wait()
+ if rv[0]:
+ logging.debug('synced ' + se)
+ # update stats
+ timeB = datetime.now()
+ self.crawl_stats['last_synctime'] = timeB - timeA
+ # count seconds as well, not just the microseconds component
+ elapsed = (self.crawl_stats['last_synctime'].seconds +
+ self.crawl_stats['last_synctime'].microseconds / (10.0 ** 6))
+ self.crawl_stats['sync_time'] += elapsed
+ self.crawl_stats['files_synced'] += 1
+ self.crawl_stats['bytes_synced'] += self.syncer.bytes_synced
+
+ # cumulative statistics
+ self.total_crawl_stats['bytes_synced'] += self.syncer.bytes_synced
+ self.total_crawl_stats['sync_time'] += elapsed
+ self.total_crawl_stats['files_synced'] += 1
+ return True
+ else:
+ if rv[1] in [23, 24]:
+ # stat to check if the file exists
+ st = self.lstat(se)
+ if isinstance(st, int):
+ # file got unlinked in the interim
+ return True
+ logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))
+ self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb)
+ if self.wait(self.FLAT_DIR_HIERARCHY, None):
+ self.update_crawl_data()
+ return True
+
+ def process_change(self, change, done):
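+ """parse a single changelog file and replay it on the slave:
+ entry (namespace) ops first, then data handed over to rsync"""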
+ clist = []
+ entries = []
+ purges = set()
+ links = set()
+ datas = set()
+ pfx = gauxpfx()
+ try:
+ f = open(change, "r")
+ clist = f.readlines()
+ f.close()
+ except IOError:
+ raise
+
+ def edct(op, **ed):
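+ # build an entry-op dict for entry_ops(); a 'stat' kwarg is
+ # flattened to uid/gid/mode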
+ dct = {}
+ dct['op'] = op
+ for k in ed:
+ if k == 'stat':
+ st = ed[k]
+ dst = dct['stat'] = {}
+ dst['uid'] = st.st_uid
+ dst['gid'] = st.st_gid
+ dst['mode'] = st.st_mode
+ else:
+ dct[k] = ed[k]
+ return dct
+ for e in clist:
+ e = e.strip()
+ et = e[self.IDX_START:self.IDX_END]
+ ec = e[self.IDX_END:].split(' ')
+ if et in self.TYPE_ENTRY:
+ ty = ec[self.POS_TYPE]
+ en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1]))
+ gfid = ec[self.POS_GFID]
+ # definitely need a better way to bucketize entry ops
+ if ty in ['UNLINK', 'RMDIR']:
+ entries.append(edct(ty, gfid=gfid, entry=en))
+ purges.update([os.path.join(pfx, gfid)])
+ continue
+ if not ty == 'RENAME':
+ go = os.path.join(pfx, gfid)
+ st = self.lstat(go)
+ if isinstance(st, int):
+ logging.debug('file %s got purged in the interim' % go)
+ continue
+ if ty in ['CREATE', 'MKDIR', 'MKNOD']:
+ entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
+ elif ty == 'LINK':
+ entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
+ links.update([os.path.join(pfx, gfid)])
+ elif ty == 'SYMLINK':
+ entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=os.readlink(en)))
+ elif ty == 'RENAME':
+ e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2]))
+ entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2))
+ else:
+ pass
+ elif et in self.TYPE_GFID:
+ da = os.path.join(pfx, ec[0])
+ st = self.lstat(da)
+ if isinstance(st, int):
+ logging.debug('file %s got purged in the interim' % da)
+ continue
+ datas.update([da])
+ logging.debug('entries: %s' % repr(entries))
+ # sync namespace
+ if entries:
+ self.slave.server.entry_ops(entries)
+ # sync data
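+ # skip gfids purged later in this changelog, unless a (hard)link
+ # still references them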
+ if self.syncdata(datas - (purges - links)):
+ if done:
+ self.master.server.changelog_done(change)
+ return True
+
+ def process(self, changes, done=1):
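+ """replay each changelog; a changelog is retried until it syncs
+ completely and only then acknowledged via changelog_done()"""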
+ for change in changes:
+ times = 0
+ while True:
+ times += 1
+ logging.debug('processing change %s [%d time(s)]' % (change, times))
+ if self.process_change(change, done):
+ break
+ # either entry_ops() or rsync failed to do its job. Mostly
+ # it's entry_ops() (which currently may fail to create an
+ # entry yet not return an errno). Since we cannot tell
+ # which of the two failed, we retry the _whole_ changelog
+ # again.
+ # TODO: remove entry retries once that is fixed.
+ logging.warn('incomplete sync, retrying changelog: %s' % change)
+ time.sleep(0.5)
+ self.turns += 1
+
+ def upd_stime(self, stime):
+ if stime:
+ self.sendmark(self.FLAT_DIR_HIERARCHY, stime)
+
+ def crawl(self):
+ changes = []
+ try:
+ self.master.server.changelog_scan()
+ self.crawls += 1
+ except OSError:
+ self.fallback_xsync()
+ changes = self.master.server.changelog_getchanges()
+ if changes:
+ xtl = self.xtime(self.FLAT_DIR_HIERARCHY)
+ if isinstance(xtl, int):
+ raise GsyncdError('master is corrupt')
+ logging.debug('processing changes %s' % repr(changes))
+ self.process(changes)
+ self.upd_stime(xtl)
+
+ def register(self):
+ (workdir, logfile) = self.setup_working_dir()
+ self.sleep_interval = int(gconf.change_interval)
+ # register with the changelog library
+ try:
+ # 9 == log level (DEBUG)
+ # 5 == connection retries
+ self.master.server.changelog_register(gconf.local_path,
+ workdir, logfile, 9, 5)
+ except OSError:
+ self.fallback_xsync()
+ # control should not reach here
+ raise
+
+class GMasterXsyncMixin(GMasterChangelogMixin):
+ """
+
+ This crawl needs to be xtime based (as of now
+ it's not. this is beacuse we generate CHANGELOG
+ file during each crawl which is then processed
+ by process_change()).
+ For now it's used as a one-shot initial sync
+ mechanism and only syncs directories, regular
+ files and symlinks.
+ """
+
+ def register(self):
+ self.sleep_interval = 60
+ self.tempdir = self.setup_working_dir()[0]
+ self.tempdir = os.path.join(self.tempdir, 'xsync')
+ logging.info('xsync temp directory: %s' % self.tempdir)
+ try:
+ os.makedirs(self.tempdir)
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno == EEXIST and os.path.isdir(self.tempdir):
+ pass
+ else:
+ raise
+
+ def write_entry_change(self, prefix, data=[]):
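+ # emit one synthetic changelog record, in the same format
+ # process_change() parses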
+ self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
+
+ def open(self):
+ try:
+ self.xsync_change = os.path.join(self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time())))
+ self.fh = open(self.xsync_change, 'w')
+ except IOError:
+ raise
+
+ def close(self):
+ self.fh.close()
+
+ def fname(self):
+ return self.xsync_change
+
+ def crawl(self, path='.', xtr=None, done=0):
+ """ generate a CHANGELOG file consumable by process_change """
+ if path == '.':
+ self.open()
+ self.crawls += 1
+ if not xtr:
+ # get the root stime and use it for all comparisons
+ xtr = self.xtime('.', self.slave)
+ if isinstance(xtr, int):
+ if xtr != ENOENT:
+ raise GsyncdError('slave is corrupt')
+ xtr = self.minus_infinity
+ xtl = self.xtime(path)
+ if isinstance(xtl, int):
+ raise GsyncdError('master is corrupt')
+ if xtr == xtl:
+ if path == '.':
+ self.close()
+ return
+ self.xtime_reversion_hook(path, xtl, xtr)
+ logging.debug("entering " + path)
+ dem = self.master.server.entries(path)
+ pargfid = self.master.server.gfid(path)
+ if isinstance(pargfid, int):
+ logging.warn('skipping directory %s' % (path))
+ return
+ for e in dem:
+ bname = e
+ e = os.path.join(path, e)
+ st = self.lstat(e)
+ if isinstance(st, int):
+ logging.warn('%s got purged in the interim..' % e)
+ continue
+ gfid = self.master.server.gfid(e)
+ if isinstance(gfid, int):
+ logging.warn('skipping entry %s..' % (e))
+ continue
+ xte = self.xtime(e)
+ if isinstance(xte, int):
+ raise GsyncdError('master is corrupt')
+ if not self.need_sync(e, xte, xtr):
+ continue
+ mo = st.st_mode
+ if stat.S_ISDIR(mo):
+ self.write_entry_change("E", [gfid, 'MKDIR', escape(os.path.join(pargfid, bname))])
+ self.crawl(e, xtr)
+ elif stat.S_ISREG(mo):
+ self.write_entry_change("E", [gfid, 'CREATE', escape(os.path.join(pargfid, bname))])
+ self.write_entry_change("D", [gfid])
+ elif stat.S_ISLNK(mo):
+ self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))])
+ else:
+ logging.info('ignoring %s' % e)
+ if path == '.':
+ logging.info('processing xsync changelog %s' % self.fname())
+ self.close()
+ self.process([self.fname()], done)
+ self.upd_stime(xtl)
+
+class GMasterXtimeMixin(GMasterCommon):
+ """ xtime based change detection and syncing """
+
+ def register(self):
+ pass
+
def crawl(self, path='.', xtl=None):
"""crawling...
@@ -691,46 +1105,6 @@ class GMasterBase(object):
assert that the file systems (master / slave) underneath do not change and actions
taken upon some condition will not lose their context by the time they are performed.
"""
- if path == '.':
- if self.start:
- self.crawls += 1
- logging.debug("... crawl #%d done, took %.6f seconds" % \
- (self.crawls, time.time() - self.start))
- time.sleep(1)
- self.start = time.time()
- should_display_info = self.start - self.lastreport['time'] >= 60
- if should_display_info:
- logging.info("completed %d crawls, %d turns",
- self.crawls - self.lastreport['crawls'],
- self.turns - self.lastreport['turns'])
- self.lastreport.update(crawls = self.crawls,
- turns = self.turns,
- time = self.start)
- volinfo_sys, state_change = self.volinfo_hook()
- if self.inter_master:
- self.volinfo = volinfo_sys[self.KFGN]
- else:
- self.volinfo = volinfo_sys[self.KNAT]
- if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master):
- logging.info('new master is %s', self.uuid)
- if self.volinfo:
- logging.info("%s master with volume id %s ..." % \
- (self.inter_master and "intermediate" or "primary",
- self.uuid))
- if state_change == self.KFGN:
- gconf.configinterface.set('volume_id', self.uuid)
- if self.volinfo:
- if self.volinfo['retval']:
- raise GsyncdError ("master is corrupt")
- self.start_checkpoint_thread()
- else:
- if should_display_info or self.crawls == 0:
- if self.inter_master:
- logging.info("waiting for being synced from %s ..." % \
- self.volinfo_state[self.KFGN]['uuid'])
- else:
- logging.info("waiting for volume info ...")
- return
logging.debug("entering " + path)
if not xtl:
xtl = self.xtime(path)
@@ -806,6 +1180,7 @@ class GMasterBase(object):
st = indulgently(e, lambda e: os.lstat(e))
if st == False:
continue
+
mo = st.st_mode
adct = {'own': (st.st_uid, st.st_gid)}
if stat.S_ISLNK(mo):
@@ -815,16 +1190,19 @@ class GMasterBase(object):
elif stat.S_ISREG(mo):
logging.debug("syncing %s ..." % e)
pb = self.syncer.add(e)
- timeA=datetime.now()
+ timeA = datetime.now()
def regjob(e, xte, pb):
- if pb.wait():
+ if pb.wait()[0]:
logging.debug("synced " + e)
self.sendmark_regular(e, xte)
-
- timeB=datetime.now()
- self.lastSyncTime=timeB-timeA
- self.syncTime=(self.syncTime+self.lastSyncTime.microseconds)/(10.0**6)
- self.filesSynced=self.filesSynced+1
+ # update stats
+ timeB = datetime.now()
+ self.crawl_stats['last_synctime'] = timeB - timeA
+ # count seconds as well, not just the microseconds component
+ elapsed = (self.crawl_stats['last_synctime'].seconds +
+ self.crawl_stats['last_synctime'].microseconds / (10.0 ** 6))
+ self.crawl_stats['sync_time'] += elapsed
+ self.crawl_stats['files_synced'] += 1
+ self.total_crawl_stats['sync_time'] += elapsed
+ self.total_crawl_stats['files_synced'] += 1
+ self.update_crawl_data()
return True
else:
logging.warn("failed to sync " + e)
@@ -841,6 +1219,7 @@ class GMasterBase(object):
if path == '.':
self.wait(path, xtl)
+
class BoxClosedErr(Exception):
pass
@@ -920,7 +1299,7 @@ class Syncer(object):
self.slave = slave
self.lock = Lock()
self.pb = PostBox()
- self.bytesSynced=0
+ self.bytes_synced = 0
for i in range(int(gconf.sync_jobs)):
t = Thread(target=self.syncjob)
t.start()
@@ -940,13 +1319,10 @@ class Syncer(object):
pb.close()
po = self.slave.rsync(pb)
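+ # wake up waiters with a (success, rsync-exit-code) pair so that
+ # callers can special-case partial transfers (23, 24)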
if po.returncode == 0:
- regEx=re.search('\ *total\ *transferred\ *file\ *size:\ *(\d+)\ *bytes\ *',po.stdout.read(),re.IGNORECASE)
- if regEx:
- self.bytesSynced+=(int(regEx.group(1)))/1024
- ret = True
+ ret = (True, 0)
elif po.returncode in (23, 24):
# partial transfer (cf. rsync(1)), that's normal
- ret = False
+ ret = (False, po.returncode)
else:
po.errfail()
pb.wakeup(ret)