summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/master.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--geo-replication/syncdaemon/master.py799
1 files changed, 573 insertions, 226 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 95810a61e..721fe18bd 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -10,15 +10,16 @@ import socket
import string
import errno
from shutil import copyfileobj
-from errno import ENOENT, ENODATA, EPIPE, EEXIST
+from errno import ENOENT, ENODATA, EPIPE, EEXIST, errorcode
from threading import currentThread, Condition, Lock
from datetime import datetime
+from libcxattr import Xattr
from gconf import gconf
from tempfile import mkdtemp, NamedTemporaryFile
from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \
unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \
- lstat, errno_wrap
+ lstat, errno_wrap, update_file
URXTIME = (-1, 0)
@@ -59,7 +60,8 @@ def gmaster_builder(excrawl=None):
crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
- class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin):
+ syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine
+ class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin, syncengine):
pass
return _GMaster
@@ -101,14 +103,17 @@ class NormalMixin(object):
if not 'default_xtime' in opts:
opts['default_xtime'] = URXTIME
- def xtime_low(self, server, path, **opts):
- xt = server.xtime(path, self.uuid)
+ def xtime_low(self, rsc, path, **opts):
+ if rsc == self.master:
+ xt = rsc.server.xtime(path, self.uuid)
+ else:
+ xt = rsc.server.stime(path, self.uuid)
if isinstance(xt, int) and xt != ENODATA:
return xt
if xt == ENODATA or xt < self.volmark:
if opts['create']:
xt = _xtime_now()
- server.aggregated.set_xtime(path, self.uuid, xt)
+ rsc.server.aggregated.set_xtime(path, self.uuid, xt)
else:
xt = opts['default_xtime']
return xt
@@ -140,7 +145,7 @@ class NormalMixin(object):
return xte > xtrd
def set_slave_xtime(self, path, mark):
- self.slave.server.set_xtime(path, self.uuid, mark)
+ self.slave.server.set_stime(path, self.uuid, mark)
self.slave.server.set_xtime_remote(path, self.uuid, mark)
class PartialMixin(NormalMixin):
@@ -190,6 +195,65 @@ class PurgeNoopMixin(object):
def purge_missing(self, path, names):
pass
+class TarSSHEngine(object):
+ """Sync engine that uses tar(1) piped over ssh(1)
+ for data transfers. Good for lots of small files.
+ """
+ def a_syncdata(self, files):
+ logging.debug('files: %s' % (files))
+ for f in files:
+ pb = self.syncer.add(f)
+ def regjob(se, xte, pb):
+ rv = pb.wait()
+ if rv[0]:
+ logging.debug('synced ' + se)
+ return True
+ else:
+ # stat check for file presence
+ st = lstat(se)
+ if isinstance(st, int):
+ return True
+ logging.warn('tar+ssh: %s [errcode: %d]' % (se, rv[1]))
+ self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
+
+ def syncdata_wait(self):
+ if self.wait(self.FLAT_DIR_HIERARCHY, None):
+ return True
+
+ def syncdata(self, files):
+ self.a_syncdata(files)
+ self.syncdata_wait()
+
+class RsyncEngine(object):
+ """Sync engine that uses rsync(1) for data transfers"""
+ def a_syncdata(self, files):
+ logging.debug('files: %s' % (files))
+ for f in files:
+ logging.debug('candidate for syncing %s' % f)
+ pb = self.syncer.add(f)
+ def regjob(se, xte, pb):
+ rv = pb.wait()
+ if rv[0]:
+ logging.debug('synced ' + se)
+ return True
+ else:
+ if rv[1] in [23, 24]:
+ # stat to check if the file exist
+ st = lstat(se)
+ if isinstance(st, int):
+ # file got unlinked in the interim
+ return True
+ logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))
+ self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
+
+ def syncdata_wait(self):
+ if self.wait(self.FLAT_DIR_HIERARCHY, None):
+ return True
+
+ def syncdata(self, files):
+ self.a_syncdata(files)
+ self.syncdata_wait()
+
class GMasterCommon(object):
"""abstract class impementling master role"""
@@ -234,7 +298,7 @@ class GMasterCommon(object):
else:
rsc = self.master
self.make_xtime_opts(rsc == self.master, opts)
- return self.xtime_low(rsc.server, path, **opts)
+ return self.xtime_low(rsc, path, **opts)
def get_initial_crawl_data(self):
# while persisting only 'files_syncd' is non-zero, rest of
@@ -243,18 +307,26 @@ class GMasterCommon(object):
default_data = {'files_syncd': 0,
'files_remaining': 0,
'bytes_remaining': 0,
- 'purges_remaining': 0}
+ 'purges_remaining': 0,
+ 'total_files_skipped': 0}
if getattr(gconf, 'state_detail_file', None):
try:
- return json.load(open(gconf.state_detail_file))
- except (IOError, OSError):
+ with open(gconf.state_detail_file, 'r+') as f:
+ loaded_data= json.load(f)
+ diff_data = set(default_data) - set (loaded_data)
+ if len(diff_data):
+ for i in diff_data:
+ loaded_data[i] = default_data[i]
+ return loaded_data
+ except (IOError):
ex = sys.exc_info()[1]
- if ex.errno == ENOENT:
- # Create file with initial data
+ logging.warn ('Creating new gconf.state_detail_file.')
+ # Create file with initial data
+ try:
with open(gconf.state_detail_file, 'wb') as f:
json.dump(default_data, f)
return default_data
- else:
+ except:
raise
return default_data
@@ -264,6 +336,8 @@ class GMasterCommon(object):
same_dir = os.path.dirname(gconf.state_detail_file)
with NamedTemporaryFile(dir=same_dir, delete=False) as tmp:
json.dump(self.total_crawl_stats, tmp)
+ tmp.flush()
+ os.fsync(tmp.fileno())
os.rename(tmp.name, gconf.state_detail_file)
except (IOError, OSError):
raise
@@ -272,7 +346,13 @@ class GMasterCommon(object):
self.master = master
self.slave = slave
self.jobtab = {}
- self.syncer = Syncer(slave)
+ if boolify(gconf.use_tarssh):
+ logging.info("using 'tar over ssh' as the sync engine")
+ self.syncer = Syncer(slave, self.slave.tarssh)
+ else:
+ logging.info("using 'rsync' as the sync engine")
+ # partial transfer (cf. rsync(1)), that's normal
+ self.syncer = Syncer(slave, self.slave.rsync, [23, 24])
# crawls vs. turns:
# - self.crawls is simply the number of crawl() invocations on root
# - one turn is a maximal consecutive sequence of crawls so that each
@@ -294,6 +374,8 @@ class GMasterCommon(object):
self.terminate = False
self.sleep_interval = 1
self.checkpoint_thread = None
+ self.current_files_skipped_count = 0
+ self.skipped_gfid_list = []
def init_keep_alive(cls):
"""start the keep-alive thread """
@@ -336,7 +418,8 @@ class GMasterCommon(object):
gconf.configinterface.set('volume_id', self.uuid)
if self.volinfo:
if self.volinfo['retval']:
- raise GsyncdError("master is corrupt")
+ logging.warn("master cluster's info may not be valid %d" % \
+ self.volinfo['retval'])
self.start_checkpoint_thread()
else:
raise GsyncdError("master volinfo unavailable")
@@ -349,7 +432,7 @@ class GMasterCommon(object):
while not self.terminate:
if self.start:
logging.debug("... crawl #%d done, took %.6f seconds" % \
- (self.crawls, time.time() - self.start))
+ (self.crawls, time.time() - self.start))
self.start = time.time()
should_display_info = self.start - self.lastreport['time'] >= 60
if should_display_info:
@@ -363,9 +446,20 @@ class GMasterCommon(object):
if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds
crawl = self.should_crawl()
t0 = t1
+ self.update_worker_remote_node()
if not crawl:
+ self.update_worker_health("Passive")
+ # bring up _this_ brick to the cluster stime
+ # which is min of cluster (but max of the replicas)
+ brick_stime = self.xtime('.', self.slave)
+ cluster_stime = self.master.server.aggregated.stime_mnt('.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
+ logging.debug("Cluster stime: %s | Brick stime: %s" % (repr(cluster_stime), repr(brick_stime)))
+ if not isinstance(cluster_stime, int):
+ if brick_stime < cluster_stime:
+ self.slave.server.set_stime(self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
time.sleep(5)
continue
+ self.update_worker_health("Active")
self.crawl()
if oneshot:
return
@@ -375,7 +469,7 @@ class GMasterCommon(object):
def _checkpt_param(cls, chkpt, prm, xtimish=True):
"""use config backend to lookup a parameter belonging to
checkpoint @chkpt"""
- cprm = getattr(gconf, 'checkpoint_' + prm, None)
+ cprm = gconf.configinterface.get_realtime('checkpoint_' + prm)
if not cprm:
return
chkpt_mapped, val = cprm.split(':', 1)
@@ -402,17 +496,6 @@ class GMasterCommon(object):
ts += '.' + str(tpair[1])
return ts
- def get_extra_info(self):
- str_info = '\nUptime=%s;FilesSyncd=%d;FilesPending=%d;BytesPending=%d;DeletesPending=%d;' % \
- (self._crawl_time_format(datetime.now() - self.crawl_start), \
- self.total_crawl_stats['files_syncd'], \
- self.total_crawl_stats['files_remaining'], \
- self.total_crawl_stats['bytes_remaining'], \
- self.total_crawl_stats['purges_remaining'])
- str_info += '\0'
- logging.debug(str_info)
- return str_info
-
def _crawl_time_format(self, crawl_time):
# Ex: 5 years, 4 days, 20:23:10
years, days = divmod(crawl_time.days, 365.25)
@@ -431,27 +514,49 @@ class GMasterCommon(object):
date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2))
return date
- def checkpt_service(self, chan, chkpt, tgt):
+ def checkpt_service(self, chan, chkpt):
"""checkpoint service loop
monitor and verify checkpoint status for @chkpt, and listen
for incoming requests for whom we serve a pretty-formatted
status report"""
- if not chkpt:
- # dummy loop for the case when there is no checkpt set
- while True:
+ while True:
+ chkpt = gconf.configinterface.get_realtime("checkpoint")
+ if not chkpt:
+ gconf.configinterface.delete("checkpoint_completed")
+ gconf.configinterface.delete("checkpoint_target")
+ # dummy loop for the case when there is no checkpt set
select([chan], [], [])
conn, _ = chan.accept()
- conn.send(self.get_extra_info())
+ conn.send('\0')
conn.close()
- completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
- if completed:
- completed = tuple(int(x) for x in completed.split('.'))
- while True:
+ continue
+
+ checkpt_tgt = self._checkpt_param(chkpt, 'target')
+ if not checkpt_tgt:
+ checkpt_tgt = self.xtime('.')
+ if isinstance(checkpt_tgt, int):
+ raise GsyncdError("master root directory is unaccessible (%s)",
+ os.strerror(checkpt_tgt))
+ self._set_checkpt_param(chkpt, 'target', checkpt_tgt)
+ logging.debug("checkpoint target %s has been determined for checkpoint %s" % \
+ (repr(checkpt_tgt), chkpt))
+
+ # check if the label is 'now'
+ chkpt_lbl = chkpt
+ try:
+ x1,x2 = chkpt.split(':')
+ if x1 == 'now':
+ chkpt_lbl = "as of " + self.humantime(x2)
+ except:
+ pass
+ completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
+ if completed:
+ completed = tuple(int(x) for x in completed.split('.'))
s,_,_ = select([chan], [], [], (not completed) and 5 or None)
# either request made and we re-check to not
# give back stale data, or we still hunting for completion
- if self.native_xtime(tgt) and self.native_xtime(tgt) < self.volmark:
+ if self.native_xtime(checkpt_tgt) and self.native_xtime(checkpt_tgt) < self.volmark:
# indexing has been reset since setting the checkpoint
status = "is invalid"
else:
@@ -459,12 +564,12 @@ class GMasterCommon(object):
if isinstance(xtr, int):
raise GsyncdError("slave root directory is unaccessible (%s)",
os.strerror(xtr))
- ncompleted = self.xtime_geq(xtr, tgt)
+ ncompleted = self.xtime_geq(xtr, checkpt_tgt)
if completed and not ncompleted: # stale data
logging.warn("completion time %s for checkpoint %s became stale" % \
(self.humantime(*completed), chkpt))
completed = None
- gconf.confdata.delete('checkpoint-completed')
+ gconf.configinterface.delete('checkpoint_completed')
if ncompleted and not completed: # just reaching completion
completed = "%.6f" % time.time()
self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False)
@@ -478,7 +583,7 @@ class GMasterCommon(object):
try:
conn, _ = chan.accept()
try:
- conn.send(" | checkpoint %s %s %s" % (chkpt, status, self.get_extra_info()))
+ conn.send("checkpoint %s is %s\0" % (chkpt_lbl, status))
except:
exc = sys.exc_info()[1]
if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \
@@ -505,18 +610,8 @@ class GMasterCommon(object):
pass
chan.bind(state_socket)
chan.listen(1)
- checkpt_tgt = None
- if gconf.checkpoint:
- checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target')
- if not checkpt_tgt:
- checkpt_tgt = self.xtime('.')
- if isinstance(checkpt_tgt, int):
- raise GsyncdError("master root directory is unaccessible (%s)",
- os.strerror(checkpt_tgt))
- self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt)
- logging.debug("checkpoint target %s has been determined for checkpoint %s" % \
- (repr(checkpt_tgt), gconf.checkpoint))
- t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt))
+ chkpt = gconf.configinterface.get_realtime("checkpoint")
+ t = Thread(target=self.checkpt_service, args=(chan, chkpt))
t.start()
self.checkpoint_thread = t
@@ -567,15 +662,11 @@ class GMasterChangelogMixin(GMasterCommon):
POS_GFID = 0
POS_TYPE = 1
- POS_ENTRY1 = 2
- POS_ENTRY2 = 3 # renames
-
- _CL_TYPE_DATA_PFX = "D "
- _CL_TYPE_METADATA_PFX = "M "
- _CL_TYPE_ENTRY_PFX = "E "
+ POS_ENTRY1 = -1
- TYPE_GFID = [_CL_TYPE_DATA_PFX] # ignoring metadata ops
- TYPE_ENTRY = [_CL_TYPE_ENTRY_PFX]
+ TYPE_META = "M "
+ TYPE_GFID = "D "
+ TYPE_ENTRY = "E "
# flat directory heirarchy for gfid based access
FLAT_DIR_HIERARCHY = '.'
@@ -594,39 +685,11 @@ class GMasterChangelogMixin(GMasterCommon):
logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile))
return (workdir, logfile)
- # update stats from *this* crawl
- def update_cumulative_stats(self, files_pending):
- self.total_crawl_stats['files_remaining'] = files_pending['count']
- self.total_crawl_stats['bytes_remaining'] = files_pending['bytes']
- self.total_crawl_stats['purges_remaining'] = files_pending['purge']
-
- # sync data
- def syncdata(self, datas):
- logging.debug('datas: %s' % (datas))
- for data in datas:
- logging.debug('candidate for syncing %s' % data)
- pb = self.syncer.add(data)
- def regjob(se, xte, pb):
- rv = pb.wait()
- if rv[0]:
- logging.debug('synced ' + se)
- return True
- else:
- if rv[1] in [23, 24]:
- # stat to check if the file exist
- st = lstat(se)
- if isinstance(st, int):
- # file got unlinked in the interim
- return True
- logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))
- self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb)
- if self.wait(self.FLAT_DIR_HIERARCHY, None):
- return True
-
def process_change(self, change, done, retry):
pfx = gauxpfx()
clist = []
entries = []
+ meta_gfid = set()
datas = set()
# basic crawl stats: files and bytes
@@ -652,136 +715,351 @@ class GMasterChangelogMixin(GMasterCommon):
dct[k] = ed[k]
return dct
- # regular file update: bytes & count
- def _update_reg(entry, size):
- if not entry in files_pending['files']:
- files_pending['count'] += 1
- files_pending['bytes'] += size
- files_pending['files'].append(entry)
- # updates for directories, symlinks etc..
- def _update_rest():
+ # entry counts (not purges)
+ def entry_update():
files_pending['count'] += 1
- # entry count
- def entry_update(entry, size, mode):
- if stat.S_ISREG(mode):
- _update_reg(entry, size)
- else:
- _update_rest()
# purge count
def purge_update():
files_pending['purge'] += 1
for e in clist:
e = e.strip()
- et = e[self.IDX_START:self.IDX_END]
- ec = e[self.IDX_END:].split(' ')
- if et in self.TYPE_ENTRY:
+ et = e[self.IDX_START:self.IDX_END] # entry type
+ ec = e[self.IDX_END:].split(' ') # rest of the bits
+
+ if et == self.TYPE_ENTRY:
+ # extract information according to the type of
+ # the entry operation. create(), mkdir() and mknod()
+ # have mode, uid, gid information in the changelog
+ # itself, so no need to stat()...
ty = ec[self.POS_TYPE]
+
+ # PARGFID/BNAME
en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1]))
+ # GFID of the entry
gfid = ec[self.POS_GFID]
- # definitely need a better way bucketize entry ops
+
if ty in ['UNLINK', 'RMDIR']:
purge_update()
entries.append(edct(ty, gfid=gfid, entry=en))
- continue
- go = os.path.join(pfx, gfid)
- st = lstat(go)
- if isinstance(st, int):
- if ty == 'RENAME':
- entries.append(edct('UNLINK', gfid=gfid, entry=en))
- else:
- logging.debug('file %s got purged in the interim' % go)
- continue
- entry_update(go, st.st_size, st.st_mode)
- if ty in ['CREATE', 'MKDIR', 'MKNOD']:
- entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
- elif ty == 'LINK':
- entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
- elif ty == 'SYMLINK':
- rl = errno_wrap(os.readlink, [en], [ENOENT])
- if isinstance(rl, int):
- continue
- entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
- elif ty == 'RENAME':
- e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2]))
- entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2, stat=st))
+ elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
+ entry_update()
+ # stat information present in the changelog itself
+ entries.append(edct(ty, gfid=gfid, entry=en, mode=int(ec[2]),\
+ uid=int(ec[3]), gid=int(ec[4])))
else:
- logging.warn('ignoring %s [op %s]' % (gfid, ty))
- elif et in self.TYPE_GFID:
- go = os.path.join(pfx, ec[0])
- st = lstat(go)
- if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % go)
- continue
- entry_update(go, st.st_size, st.st_mode)
- datas.update([go])
+ # stat() to get mode and other information
+ go = os.path.join(pfx, gfid)
+ st = lstat(go)
+ if isinstance(st, int):
+ if ty == 'RENAME': # special hack for renames...
+ entries.append(edct('UNLINK', gfid=gfid, entry=en))
+ else:
+ logging.debug('file %s got purged in the interim' % go)
+ continue
+
+ if ty == 'LINK':
+ entry_update()
+ entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
+ elif ty == 'SYMLINK':
+ rl = errno_wrap(os.readlink, [en], [ENOENT])
+ if isinstance(rl, int):
+ continue
+ entry_update()
+ entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
+ elif ty == 'RENAME':
+ entry_update()
+ e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
+ entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st))
+ else:
+ logging.warn('ignoring %s [op %s]' % (gfid, ty))
+ elif et == self.TYPE_GFID:
+ datas.add(os.path.join(pfx, ec[0]))
+ elif et == self.TYPE_META:
+ if ec[1] == 'SETATTR': # only setattr's for now...
+ meta_gfid.add(os.path.join(pfx, ec[0]))
+ else:
+ logging.warn('got invalid changelog type: %s' % (et))
logging.debug('entries: %s' % repr(entries))
if not retry:
- self.update_cumulative_stats(files_pending)
+ self.update_worker_cumilitive_status(files_pending)
# sync namespace
if (entries):
self.slave.server.entry_ops(entries)
+ # sync metadata
+ if (meta_gfid):
+ meta_entries = []
+ for go in meta_gfid:
+ st = lstat(go)
+ if isinstance(st, int):
+ logging.debug('file %s got purged in the interim' % go)
+ continue
+ meta_entries.append(edct('META', go=go, stat=st))
+ if meta_entries:
+ self.slave.server.meta_ops(meta_entries)
# sync data
- if self.syncdata(datas):
- if done:
- self.master.server.changelog_done(change)
- return True
-
- def sync_done(self):
- self.total_crawl_stats['files_syncd'] += self.total_crawl_stats['files_remaining']
- self.total_crawl_stats['files_remaining'] = 0
- self.total_crawl_stats['bytes_remaining'] = 0
- self.total_crawl_stats['purges_remaining'] = 0
- self.update_crawl_data()
+ if datas:
+ self.a_syncdata(datas)
def process(self, changes, done=1):
- for change in changes:
- tries = 0
- retry = False
- while True:
- logging.debug('processing change %s' % change)
- if self.process_change(change, done, retry):
- self.sync_done()
- break
- retry = True
- tries += 1
- if tries == self.MAX_RETRIES:
- logging.warn('changelog %s could not be processed - moving on...' % os.path.basename(change))
- self.sync_done()
- if done:
- self.master.server.changelog_done(change)
- break
- # it's either entry_ops() or Rsync that failed to do it's
- # job. Mostly it's entry_ops() [which currently has a problem
- # of failing to create an entry but failing to return an errno]
- # Therefore we do not know if it's either Rsync or the freaking
- # entry_ops() that failed... so we retry the _whole_ changelog
- # again.
- # TODO: remove entry retries when it's gets fixed.
- logging.warn('incomplete sync, retrying changelog: %s' % change)
- time.sleep(0.5)
- self.turns += 1
+ tries = 0
+ retry = False
- def upd_stime(self, stime):
+ while True:
+ self.skipped_gfid_list = []
+ self.current_files_skipped_count = 0
+
+ # first, fire all changelog transfers in parallel. entry and metadata
+ # are performed synchronously, therefore in serial. However at the end
+ # of each changelog, data is synchronized with syncdata_async() - which
+ # means it is serial w.r.t entries/metadata of that changelog but
+ # happens in parallel with data of other changelogs.
+
+ for change in changes:
+ logging.debug('processing change %s' % change)
+ self.process_change(change, done, retry)
+ if not retry:
+ self.turns += 1 # number of changelogs processed in the batch
+
+ # Now we wait for all the data transfers fired off in the above step
+ # to complete. Note that this is not ideal either. Ideally we want to
+ # trigger the entry/meta-data transfer of the next batch while waiting
+ # for the data transfer of the current batch to finish.
+
+ # Note that the reason to wait for the data transfer (vs doing it
+ # completely in the background and call the changelog_done()
+ # asynchronously) is because this waiting acts as a "backpressure"
+ # and prevents a spiraling increase of wait stubs from consuming
+ # unbounded memory and resources.
+
+ # update the slave's time with the timestamp of the _last_ changelog
+ # file time suffix. Since, the changelog prefix time is the time when
+ # the changelog was rolled over, introduce a tolerence of 1 second to
+ # counter the small delta b/w the marker update and gettimeofday().
+ # NOTE: this is only for changelog mode, not xsync.
+
+ # @change is the last changelog (therefore max time for this batch)
+ if self.syncdata_wait():
+ if done:
+ xtl = (int(change.split('.')[-1]) - 1, 0)
+ self.upd_stime(xtl)
+ map(self.master.server.changelog_done, changes)
+ self.update_worker_files_syncd()
+ break
+
+ # We do not know which changelog transfer failed, retry everything.
+ retry = True
+ tries += 1
+ if tries == self.MAX_RETRIES:
+ logging.warn('changelogs %s could not be processed - moving on...' % \
+ ' '.join(map(os.path.basename, changes)))
+ self.update_worker_total_files_skipped(self.current_files_skipped_count)
+ logging.warn('SKIPPED GFID = %s' % ','.join(self.skipped_gfid_list))
+ self.update_worker_files_syncd()
+ if done:
+ xtl = (int(change.split('.')[-1]) - 1, 0)
+ self.upd_stime(xtl)
+ map(self.master.server.changelog_done, changes)
+ break
+ # it's either entry_ops() or Rsync that failed to do it's
+ # job. Mostly it's entry_ops() [which currently has a problem
+ # of failing to create an entry but failing to return an errno]
+ # Therefore we do not know if it's either Rsync or the freaking
+ # entry_ops() that failed... so we retry the _whole_ changelog
+ # again.
+ # TODO: remove entry retries when it's gets fixed.
+ logging.warn('incomplete sync, retrying changelogs: %s' % \
+ ' '.join(map(os.path.basename, changes)))
+ time.sleep(0.5)
+
+ def upd_stime(self, stime, path=None):
+ if not path:
+ path = self.FLAT_DIR_HIERARCHY
if not stime == URXTIME:
- self.sendmark(self.FLAT_DIR_HIERARCHY, stime)
+ self.sendmark(path, stime)
+
+ def get_worker_status_file(self):
+ file_name = gconf.local_path+'.status'
+ file_name = file_name.replace("/", "_")
+ worker_status_file = gconf.georep_session_working_dir+file_name
+ return worker_status_file
+
+ def update_worker_status(self, key, value):
+ default_data = {"remote_node":"N/A",
+ "worker status":"Not Started",
+ "crawl status":"N/A",
+ "files_syncd": 0,
+ "files_remaining": 0,
+ "bytes_remaining": 0,
+ "purges_remaining": 0,
+ "total_files_skipped": 0}
+ worker_status_file = self.get_worker_status_file()
+ try:
+ with open(worker_status_file, 'r+') as f:
+ loaded_data = json.load(f)
+ loaded_data[key] = value
+ os.ftruncate(f.fileno(), 0)
+ os.lseek(f.fileno(), 0, os.SEEK_SET)
+ json.dump(loaded_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except (IOError, OSError, ValueError):
+ logging.info ('Creating new %s' % worker_status_file)
+ try:
+ with open(worker_status_file, 'wb') as f:
+ default_data[key] = value
+ json.dump(default_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except:
+ raise
+
+ def update_worker_cumilitive_status(self, files_pending):
+ default_data = {"remote_node":"N/A",
+ "worker status":"Not Started",
+ "crawl status":"N/A",
+ "files_syncd": 0,
+ "files_remaining": 0,
+ "bytes_remaining": 0,
+ "purges_remaining": 0,
+ "total_files_skipped": 0}
+ worker_status_file = self.get_worker_status_file()
+ try:
+ with open(worker_status_file, 'r+') as f:
+ loaded_data = json.load(f)
+ loaded_data['files_remaining'] = files_pending['count']
+ loaded_data['bytes_remaining'] = files_pending['bytes']
+ loaded_data['purges_remaining'] = files_pending['purge']
+ os.ftruncate(f.fileno(), 0)
+ os.lseek(f.fileno(), 0, os.SEEK_SET)
+ json.dump(loaded_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except (IOError, OSError, ValueError):
+ logging.info ('Creating new %s' % worker_status_file)
+ try:
+ with open(worker_status_file, 'wb') as f:
+ default_data['files_remaining'] = files_pending['count']
+ default_data['bytes_remaining'] = files_pending['bytes']
+ default_data['purges_remaining'] = files_pending['purge']
+ json.dump(default_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except:
+ raise
+
+ def update_worker_remote_node (self):
+ node = sys.argv[-1]
+ node = node.split("@")[-1]
+ remote_node_ip = node.split(":")[0]
+ remote_node_vol = node.split(":")[3]
+ remote_node = remote_node_ip + '::' + remote_node_vol
+ self.update_worker_status ('remote_node', remote_node)
+
+ def update_worker_health (self, state):
+ self.update_worker_status ('worker status', state)
+
+ def update_worker_crawl_status (self, state):
+ self.update_worker_status ('crawl status', state)
+
+ def update_worker_files_syncd (self):
+ default_data = {"remote_node":"N/A",
+ "worker status":"Not Started",
+ "crawl status":"N/A",
+ "files_syncd": 0,
+ "files_remaining": 0,
+ "bytes_remaining": 0,
+ "purges_remaining": 0,
+ "total_files_skipped": 0}
+ worker_status_file = self.get_worker_status_file()
+ try:
+ with open(worker_status_file, 'r+') as f:
+ loaded_data = json.load(f)
+ loaded_data['files_syncd'] += loaded_data['files_remaining']
+ loaded_data['files_remaining'] = 0
+ loaded_data['bytes_remaining'] = 0
+ loaded_data['purges_remaining'] = 0
+ os.ftruncate(f.fileno(), 0)
+ os.lseek(f.fileno(), 0, os.SEEK_SET)
+ json.dump(loaded_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except (IOError, OSError, ValueError):
+ logging.info ('Creating new %s' % worker_status_file)
+ try:
+ with open(worker_status_file, 'wb') as f:
+ json.dump(default_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except:
+ raise
+
+ def update_worker_files_remaining (self, state):
+ self.update_worker_status ('files_remaining', state)
+
+ def update_worker_bytes_remaining (self, state):
+ self.update_worker_status ('bytes_remaining', state)
+
+ def update_worker_purges_remaining (self, state):
+ self.update_worker_status ('purges_remaining', state)
+
+ def update_worker_total_files_skipped (self, value):
+ default_data = {"remote_node":"N/A",
+ "worker status":"Not Started",
+ "crawl status":"N/A",
+ "files_syncd": 0,
+ "files_remaining": 0,
+ "bytes_remaining": 0,
+ "purges_remaining": 0,
+ "total_files_skipped": 0}
+ worker_status_file = self.get_worker_status_file()
+ try:
+ with open(worker_status_file, 'r+') as f:
+ loaded_data = json.load(f)
+ loaded_data['total_files_skipped'] = value
+ loaded_data['files_remaining'] -= value
+ os.ftruncate(f.fileno(), 0)
+ os.lseek(f.fileno(), 0, os.SEEK_SET)
+ json.dump(loaded_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except (IOError, OSError, ValueError):
+ logging.info ('Creating new %s' % worker_status_file)
+ try:
+ with open(worker_status_file, 'wb') as f:
+ default_data['total_files_skipped'] = value
+ json.dump(default_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except:
+ raise
def crawl(self):
+ self.update_worker_crawl_status("Changelog Crawl")
changes = []
+ # get stime (from the brick) and purge changelogs
+ # that are _historical_ to that time.
+ purge_time = self.xtime('.', self.slave)
+ if isinstance(purge_time, int):
+ purge_time = None
try:
self.master.server.changelog_scan()
self.crawls += 1
except OSError:
self.fallback_xsync()
+ self.update_worker_crawl_status("Hybrid Crawl")
changes = self.master.server.changelog_getchanges()
if changes:
- xtl = self.xtime(self.FLAT_DIR_HIERARCHY)
- if isinstance(xtl, int):
- raise GsyncdError('master is corrupt')
+ if purge_time:
+ logging.info("slave's time: %s" % repr(purge_time))
+ processed = [x for x in changes if int(x.split('.')[-1]) < purge_time[0]]
+ for pr in processed:
+ logging.info('skipping already processed change: %s...' % os.path.basename(pr))
+ self.master.server.changelog_done(pr)
+ changes.remove(pr)
logging.debug('processing changes %s' % repr(changes))
self.process(changes)
- self.upd_stime(xtl)
def register(self):
(workdir, logfile) = self.setup_working_dir()
@@ -799,17 +1077,20 @@ class GMasterChangelogMixin(GMasterCommon):
class GMasterXsyncMixin(GMasterChangelogMixin):
"""
-
This crawl needs to be xtime based (as of now
it's not. this is beacuse we generate CHANGELOG
file during each crawl which is then processed
by process_change()).
For now it's used as a one-shot initial sync
mechanism and only syncs directories, regular
- files and symlinks.
+ files, hardlinks and symlinks.
"""
+ XSYNC_MAX_ENTRIES = 1<<13
+
def register(self):
+ self.counter = 0
+ self.comlist = []
self.sleep_interval = 60
self.tempdir = self.setup_working_dir()[0]
self.tempdir = os.path.join(self.tempdir, 'xsync')
@@ -823,6 +1104,36 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
else:
raise
+ def crawl(self):
+ """
+ event dispatcher thread
+
+ this thread dispatches either changelog or synchronizes stime.
+ additionally terminates itself on recieving a 'finale' event
+ """
+ def Xsyncer():
+ self.Xcrawl()
+ t = Thread(target=Xsyncer)
+ t.start()
+ logging.info('starting hybrid crawl...')
+ self.update_worker_crawl_status("Hybrid Crawl")
+ while True:
+ try:
+ item = self.comlist.pop(0)
+ if item[0] == 'finale':
+ logging.info('finished hybrid crawl syncing')
+ break
+ elif item[0] == 'xsync':
+ logging.info('processing xsync changelog %s' % (item[1]))
+ self.process([item[1]], 0)
+ elif item[0] == 'stime':
+ logging.debug('setting slave time: %s' % repr(item[1]))
+ self.upd_stime(item[1][1], item[1][0])
+ else:
+ logging.warn('unknown tuple in comlist (%s)' % repr(item))
+ except IndexError:
+ time.sleep(1)
+
def write_entry_change(self, prefix, data=[]):
self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
@@ -839,24 +1150,61 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
def fname(self):
return self.xsync_change
- def crawl(self, path='.', xtr=None, done=0):
- """ generate a CHANGELOG file consumable by process_change """
+ def put(self, mark, item):
+ self.comlist.append((mark, item))
+
+ def sync_xsync(self, last):
+ """schedule a processing of changelog"""
+ self.close()
+ self.put('xsync', self.fname())
+ self.counter = 0
+ if not last:
+ time.sleep(1) # make sure changelogs are 1 second apart
+ self.open()
+
+ def sync_stime(self, stime=None, last=False):
+ """schedule a stime synchronization"""
+ if stime:
+ self.put('stime', stime)
+ if last:
+ self.put('finale', None)
+
+ def sync_done(self, stime=None, last=False):
+ self.sync_xsync(last)
+ if stime:
+ self.sync_stime(stime, last)
+
+ def Xcrawl(self, path='.', xtr_root=None):
+ """
+ generate a CHANGELOG file consumable by process_change.
+
+ slave's xtime (stime) is _cached_ for comparisons across
+ the filesystem tree, but set after directory synchronization.
+ """
if path == '.':
self.open()
self.crawls += 1
- if not xtr:
+ if not xtr_root:
# get the root stime and use it for all comparisons
- xtr = self.xtime('.', self.slave)
- if isinstance(xtr, int):
- if xtr != ENOENT:
- raise GsyncdError('slave is corrupt')
- xtr = self.minus_infinity
+ xtr_root = self.xtime('.', self.slave)
+ if isinstance(xtr_root, int):
+ if xtr_root != ENOENT:
+ logging.warn("slave cluster not returning the " \
+ "correct xtime for root (%d)" % xtr_root)
+ xtr_root = self.minus_infinity
xtl = self.xtime(path)
if isinstance(xtl, int):
- raise GsyncdError('master is corrupt')
- if xtr == xtl:
+ logging.warn("master cluster's xtime not found")
+ xtr = self.xtime(path, self.slave)
+ if isinstance(xtr, int):
+ if xtr != ENOENT:
+ logging.warn("slave cluster not returning the " \
+ "correct xtime for %s (%d)" % (path, xtr))
+ xtr = self.minus_infinity
+ xtr = max(xtr, xtr_root)
+ if not self.need_sync(path, xtl, xtr):
if path == '.':
- self.close()
+ self.sync_done((path, xtl), True)
return
self.xtime_reversion_hook(path, xtl, xtr)
logging.debug("entering " + path)
@@ -867,43 +1215,42 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
for e in dem:
bname = e
e = os.path.join(path, e)
- st = lstat(e)
+ xte = self.xtime(e)
+ if isinstance(xte, int):
+ logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte]))
+ continue
+ if not self.need_sync(e, xte, xtr):
+ continue
+ st = self.master.server.lstat(e)
if isinstance(st, int):
- logging.warn('%s got purged in the interim..' % e)
+ logging.warn('%s got purged in the interim ...' % e)
continue
gfid = self.master.server.gfid(e)
if isinstance(gfid, int):
- logging.warn('skipping entry %s..' % (e))
- continue
- xte = self.xtime(e)
- if isinstance(xte, int):
- raise GsyncdError('master is corrupt')
- if not self.need_sync(e, xte, xtr):
+ logging.warn('skipping entry %s..' % e)
continue
mo = st.st_mode
+ self.counter += 1
+ if self.counter == self.XSYNC_MAX_ENTRIES:
+ self.sync_done()
if stat.S_ISDIR(mo):
- self.write_entry_change("E", [gfid, 'MKDIR', escape(os.path.join(pargfid, bname))])
- self.crawl(e, xtr)
+ self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))])
+ self.Xcrawl(e, xtr_root)
+ self.sync_done((e, xte), False)
elif stat.S_ISLNK(mo):
- rl = errno_wrap(os.readlink, [en], [ENOENT])
- if isinstance(rl, int):
- continue
- self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname)), rl])
- else:
+ self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))])
+ elif stat.S_ISREG(mo):
+ nlink = st.st_nlink
+ nlink -= 1 # fixup backend stat link count
# if a file has a hardlink, create a Changelog entry as 'LINK' so the slave
# side will decide if to create the new entry, or to create link.
- if st.st_nlink == 1:
- self.write_entry_change("E", [gfid, 'MKNOD', escape(os.path.join(pargfid, bname))])
+ if nlink == 1:
+ self.write_entry_change("E", [gfid, 'MKNOD', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))])
else:
self.write_entry_change("E", [gfid, 'LINK', escape(os.path.join(pargfid, bname))])
- if stat.S_ISREG(mo):
- self.write_entry_change("D", [gfid])
-
+ self.write_entry_change("D", [gfid])
if path == '.':
- logging.info('processing xsync changelog %s' % self.fname())
- self.close()
- self.process([self.fname()], done)
- self.upd_stime(xtl)
+ self.sync_done((path, xtl), True)
class BoxClosedErr(Exception):
pass
@@ -979,12 +1326,13 @@ class Syncer(object):
each completed syncjob.
"""
- def __init__(self, slave):
+ def __init__(self, slave, sync_engine, resilient_errnos=[]):
"""spawn worker threads"""
self.slave = slave
self.lock = Lock()
self.pb = PostBox()
- self.bytes_synced = 0
+ self.sync_engine = sync_engine
+ self.errnos_ok = resilient_errnos
for i in range(int(gconf.sync_jobs)):
t = Thread(target=self.syncjob)
t.start()
@@ -1002,11 +1350,10 @@ class Syncer(object):
break
time.sleep(0.5)
pb.close()
- po = self.slave.rsync(pb)
+ po = self.sync_engine(pb)
if po.returncode == 0:
ret = (True, 0)
- elif po.returncode in (23, 24):
- # partial transfer (cf. rsync(1)), that's normal
+ elif po.returncode in self.errnos_ok:
ret = (False, po.returncode)
else:
po.errfail()