summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--xlators/features/marker/utils/syncdaemon/gsyncd.py8
-rw-r--r--xlators/features/marker/utils/syncdaemon/master.py368
-rw-r--r--xlators/features/marker/utils/syncdaemon/resource.py31
3 files changed, 341 insertions, 66 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py
index d68cea6725e..165aebda1f9 100644
--- a/xlators/features/marker/utils/syncdaemon/gsyncd.py
+++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py
@@ -172,6 +172,14 @@ def main_i():
op.add_option('--allow-network', metavar='IPS', default='')
op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs)
op.add_option('--checkpoint', metavar='LABEL', default='')
+ # tunables for failover/failback mechanism:
+ # None - gsyncd behaves as normal
+ # blind - gsyncd works with xtime pairs to identify
+ # candidates for synchronization
+ # wrapup - same as normal mode but does not assign
+ # xtimes to orphaned files
+ # see crawl() for usage of the above tunables
+ op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP)
op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local)
# duh. need to specify dest or value will be mapped to None :S
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py
index 4826037f134..945ebb75dd2 100644
--- a/xlators/features/marker/utils/syncdaemon/master.py
+++ b/xlators/features/marker/utils/syncdaemon/master.py
@@ -22,8 +22,281 @@ from syncdutils import FreeObject, Thread, GsyncdError, boolify, \
URXTIME = (-1, 0)
-class GMaster(object):
- """class impementling master role"""
+# Utility functions to help us to get to closer proximity
+# of the DRY principle (no, don't look for elevated or
+# perspectivistic things here)
+
+def _xtime_now():
+ t = time.time()
+ sec = int(t)
+ nsec = int((t - sec) * 1000000)
+ return (sec, nsec)
+
+def _volinfo_hook_relax_foreign(self):
+ volinfo_sys = self.get_sys_volinfo()
+ fgn_vi = volinfo_sys[self.KFGN]
+ if fgn_vi:
+ expiry = fgn_vi['timeout'] - int(time.time()) + 1
+ logging.info('foreign volume info found, waiting %d sec for expiry' % \
+ expiry)
+ time.sleep(expiry)
+ volinfo_sys = self.get_sys_volinfo()
+ self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
+ volinfo_sys)
+ if self.inter_master:
+ raise GsyncdError("cannot be intermediate master in special mode")
+ return (volinfo_sys, state_change)
+
+
+# The API!
+
+def gmaster_builder():
+ """produce the GMaster class variant corresponding
+ to sync mode"""
+ this = sys.modules[__name__]
+ mixin = gconf.special_sync_mode
+ if not mixin:
+ mixin = 'normal'
+ logging.info('setting up master for %s sync mode' % mixin)
+ mixin = getattr(this, mixin.capitalize() + 'Mixin')
+ class _GMaster(GMasterBase, mixin):
+ pass
+ return _GMaster
+
+
+# Mixin classes that implement the data format
+# and logic particularities of the certain
+# sync modes
+
+class NormalMixin(object):
+ """normal geo-rep behavior"""
+
+ minus_infinity = URXTIME
+
+ # following staticmethods ideally would be
+ # methods of an xtime object (in particular,
+ # implementing the hooks needed for comparison
+ # operators), but at this point we don't yet
+ # have a dedicated xtime class
+
+ @staticmethod
+ def serialize_xtime(xt):
+ return "%d.%d" % tuple(xt)
+
+ @staticmethod
+ def deserialize_xtime(xt):
+ return tuple(int(x) for x in xt.split("."))
+
+ @staticmethod
+ def native_xtime(xt):
+ return xt
+
+ @staticmethod
+ def xtime_geq(xt0, xt1):
+ return xt0 >= xt1
+
+ def make_xtime_opts(self, is_master, opts):
+ if not 'create' in opts:
+ opts['create'] = is_master and not self.inter_master
+ if not 'default_xtime' in opts:
+ if is_master and self.inter_master:
+ opts['default_xtime'] = ENODATA
+ else:
+ opts['default_xtime'] = URXTIME
+
+ def xtime_low(self, server, path, **opts):
+ xt = server.xtime(path, self.uuid)
+ if isinstance(xt, int) and xt != ENODATA:
+ return xt
+ if xt == ENODATA or xt < self.volmark:
+ if opts['create']:
+ xt = _xtime_now()
+ server.set_xtime(path, self.uuid, xt)
+ else:
+ xt = opts['default_xtime']
+ return xt
+
+ def keepalive_payload_hook(self, timo, gap):
+ # first grab a reference as self.volinfo
+ # can be changed in main thread
+ vi = self.volinfo
+ if vi:
+ # then have a private copy which we can mod
+ vi = vi.copy()
+ vi['timeout'] = int(time.time()) + timo
+ else:
+ # send keep-alives more frequently to
+ # avoid a delay in announcing our volume info
+ # to slave if it becomes established in the
+ # meantime
+ gap = min(10, gap)
+ return (vi, gap)
+
+ def volinfo_hook(self):
+ volinfo_sys = self.get_sys_volinfo()
+ self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
+ volinfo_sys)
+ return (volinfo_sys, state_change)
+
+ def xtime_reversion_hook(self, path, xtl, xtr):
+ if xtr > xtl:
+ raise GsyncdError("timestamp corruption for " + path)
+
+ def need_sync(self, e, xte, xtrd):
+ return xte > xtrd
+
+ def set_slave_xtime(self, path, mark):
+ self.slave.server.set_xtime(path, self.uuid, mark)
+
+class WrapupMixin(NormalMixin):
+ """a variant that differs from normal in terms
+ of ignoring non-indexed files"""
+
+ @staticmethod
+ def make_xtime_opts(is_master, opts):
+ if not 'create' in opts:
+ opts['create'] = False
+ if not 'default_xtime' in opts:
+ opts['default_xtime'] = URXTIME
+
+ @staticmethod
+ def keepalive_payload_hook(timo, gap):
+ return (None, gap)
+
+ def volinfo_hook(self):
+ return _volinfo_hook_relax_foreign(self)
+
+class BlindMixin(object):
+ """Geo-rep flavor using vectored xtime.
+
+ Coordinates are the master, slave uuid pair;
+ in master coordinate behavior is normal,
+ in slave coordinate we force synchronization
+ on any value difference (these are in disjunctive
+ relation, ie. if either orders the entry to be
+ synced, it shall be synced.
+ """
+
+ minus_infinity = (URXTIME, None)
+
+ @staticmethod
+ def serialize_xtime(xt):
+ a = []
+ for x in xt:
+ if not x:
+ x = ('None', '')
+ a.extend(x)
+ return '.'.join(str(n) for n in a)
+
+ @staticmethod
+ def deserialize_xtime(xt):
+ a = xt.split(".")
+ a = (tuple(a[0:2]), tuple(a[3:4]))
+ b = []
+ for p in a:
+ if p[0] == 'None':
+ p = None
+ else:
+ p = tuple(int(x) for x in p)
+ b.append(p)
+ return tuple(b)
+
+ @staticmethod
+ def native_xtime(xt):
+ return xt[0]
+
+ @staticmethod
+ def xtime_geq(xt0, xt1):
+ return (not xt1[0] or xt0[0] >= xt1[0]) and \
+ (not xt1[1] or xt0[1] >= xt1[1])
+
+ @property
+ def ruuid(self):
+ if self.volinfo_r:
+ return self.volinfo_r['uuid']
+
+ @staticmethod
+ def make_xtime_opts(is_master, opts):
+ if not 'create' in opts:
+ opts['create'] = is_master
+ if not 'default_xtime' in opts:
+ opts['default_xtime'] = URXTIME
+
+ def xtime_low(self, server, path, **opts):
+ xtd = server.xtime_vec(path, self.uuid, self.ruuid)
+ if isinstance(xtd, int):
+ return xtd
+ xt = (xtd[self.uuid], xtd[self.ruuid])
+ if not xt[1] and (not xt[0] or xt[0] < self.volmark):
+ if opts['create']:
+ # not expected, but can happen if file originates
+ # from interrupted gsyncd transfer
+ logging.warn('have to fix up missing xtime on ' + path)
+ xt0 = _xtime_now()
+ server.set_xtime(path, self.uuid, xt0)
+ else:
+ xt0 = opts['default_xtime']
+ xt = (xt0, xt[1])
+ return xt
+
+ @staticmethod
+ def keepalive_payload_hook(timo, gap):
+ return (None, gap)
+
+ def volinfo_hook(self):
+ res = _volinfo_hook_relax_foreign(self)
+ volinfo_r_new = self.slave.server.native_volume_info()
+ if volinfo_r_new['retval']:
+ raise GsyncdError("slave is corrupt")
+ if getattr(self, 'volinfo_r', None):
+ if self.volinfo_r['uuid'] != volinfo_r_new['uuid']:
+ raise GsyncdError("uuid mismatch on slave")
+ self.volinfo_r = volinfo_r_new
+ return res
+
+ def xtime_reversion_hook(self, path, xtl, xtr):
+ if not isinstance(xtr[0], int) and \
+ (isinstance(xtl[0], int) or xtr[0] > xtl[0]):
+ raise GsyncdError("timestamp corruption for " + path)
+
+ def need_sync(self, e, xte, xtrd):
+ if xte[0]:
+ if not xtrd[0] or xte[0] > xtrd[0]:
+ # there is outstanding diff at 0th pos,
+ # we can short-cut to true
+ return True
+ # we arrived to this point by either of these
+ # two possiblilites:
+ # - no outstanding difference at 0th pos,
+ # wanna see 1st pos if he raises veto
+ # against "no need to sync" proposal
+ # - no data at 0th pos, 1st pos will have
+ # to decide (due to xtime assignment,
+ # in this case 1st pos does carry data
+ # -- iow, if 1st pos did not have data,
+ # and 0th neither, 0th would have been
+ # force-feeded)
+ if not xte[1]:
+ # no data, no veto
+ return False
+ # the hard work: for 1st pos,
+ # the conduct is fetch corresponding
+ # slave data and do a "blind" comparison
+ # (ie. do not care who is newer, we trigger
+ # sync on non-identical xitmes)
+ xtr = self.xtime(e, self.slave)
+ return isinstance(xtr, int) or xte[1] != xtr[1]
+
+ def set_slave_xtime(self, path, mark):
+ xtd = {}
+ for (u, t) in zip((self.uuid, self.ruuid), mark):
+ if t:
+ xtd[u] = t
+ self.slave.server.set_xtime_vec(path, xtd)
+
+
+class GMasterBase(object):
+ """abstract class impementling master role"""
KFGN = 0
KNAT = 1
@@ -72,27 +345,8 @@ class GMaster(object):
rsc = a[0]
else:
rsc = self.master
- if not 'create' in opts:
- opts['create'] = (rsc == self.master and not self.inter_master)
- if not 'default_xtime' in opts:
- if rsc == self.master and self.inter_master:
- opts['default_xtime'] = ENODATA
- else:
- opts['default_xtime'] = URXTIME
- xt = rsc.server.xtime(path, self.uuid)
- if isinstance(xt, int) and xt != ENODATA:
- return xt
- invalid_xtime = (xt == ENODATA or xt < self.volmark)
- if invalid_xtime:
- if opts['create']:
- t = time.time()
- sec = int(t)
- nsec = int((t - sec) * 1000000)
- xt = (sec, nsec)
- rsc.server.set_xtime(path, self.uuid, xt)
- else:
- xt = opts['default_xtime']
- return xt
+ self.make_xtime_opts(rsc == self.master, opts)
+ return self.xtime_low(rsc.server, path, **opts)
def __init__(self, master, slave):
self.master = master
@@ -123,8 +377,8 @@ class GMaster(object):
self.terminate = False
self.checkpoint_thread = None
- @staticmethod
- def _checkpt_param(chkpt, prm, timish=True):
+ @classmethod
+ def _checkpt_param(cls, chkpt, prm, xtimish=True):
"""use config backend to lookup a parameter belonging to
checkpoint @chkpt"""
cprm = getattr(gconf, 'checkpoint_' + prm, None)
@@ -133,16 +387,16 @@ class GMaster(object):
chkpt_mapped, val = cprm.split(':', 1)
if unescape(chkpt_mapped) != chkpt:
return
- if timish:
- val = tuple(int(x) for x in val.split("."))
+ if xtimish:
+ val = cls.deserialize_xtime(val)
return val
- @staticmethod
- def _set_checkpt_param(chkpt, prm, val, timish=True):
+ @classmethod
+ def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True):
"""use config backend to store a parameter associated
with checkpoint @chkpt"""
- if timish:
- val = "%d.%d" % tuple(val)
+ if xtimish:
+ val = cls.serialize_xtime(val)
gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
@staticmethod
@@ -167,12 +421,14 @@ class GMaster(object):
conn, _ = chan.accept()
conn.send('\0')
conn.close()
- completed = self._checkpt_param(chkpt, 'completed')
+ completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
+ if completed:
+ completed = tuple(int(x) for x in completed.split('.'))
while True:
s,_,_ = select([chan], [], [], (not completed) and 5 or None)
# either request made and we re-check to not
# give back stale data, or we still hunting for completion
- if tgt < self.volmark:
+ if self.native_xtime(tgt) and self.native_xtime(tgt) < self.volmark:
# indexing has been reset since setting the checkpoint
status = "is invalid"
else:
@@ -180,15 +436,16 @@ class GMaster(object):
if isinstance(xtr, int):
raise GsyncdError("slave root directory is unaccessible (%s)",
os.strerror(xtr))
- ncompleted = (xtr >= tgt)
+ ncompleted = self.xtime_geq(xtr, tgt)
if completed and not ncompleted: # stale data
logging.warn("completion time %s for checkpoint %s became stale" % \
(self.humantime(*completed), chkpt))
completed = None
gconf.confdata.delete('checkpoint-completed')
if ncompleted and not completed: # just reaching completion
- completed = [ int(x) for x in ("%.6f" % time.time()).split('.') ]
- self._set_checkpt_param(chkpt, 'completed', completed)
+ completed = "%.6f" % time.time()
+ self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False)
+ completed = tuple(int(x) for x in completed.split('.'))
logging.info("checkpoint %s completed" % chkpt)
status = completed and \
"completed at " + self.humantime(completed[0]) or \
@@ -232,8 +489,8 @@ class GMaster(object):
raise GsyncdError("master root directory is unaccessible (%s)",
os.strerror(checkpt_tgt))
self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt)
- logging.debug("checkpoint target %d.%d has been determined for checkpoint %s" % \
- (checkpt_tgt[0], checkpt_tgt[1], gconf.checkpoint))
+ logging.debug("checkpoint target %s has been determined for checkpoint %s" % \
+ (repr(checkpt_tgt), gconf.checkpoint))
t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt))
t.start()
self.checkpoint_thread = t
@@ -244,20 +501,7 @@ class GMaster(object):
if timo > 0:
def keep_alive():
while True:
- gap = timo * 0.5
- # first grab a reference as self.volinfo
- # can be changed in main thread
- vi = self.volinfo
- if vi:
- # then have a private copy which we can mod
- vi = vi.copy()
- vi['timeout'] = int(time.time()) + timo
- else:
- # send keep-alives more frequently to
- # avoid a delay in announcing our volume info
- # to slave if it becomes established in the
- # meantime
- gap = min(10, gap)
+ vi, gap = self.keepalive_payload_hook(timo, timo * 0.5)
self.slave.server.keep_alive(vi)
time.sleep(gap)
t = Thread(target=keep_alive)
@@ -302,7 +546,7 @@ class GMaster(object):
"""
if adct:
self.slave.server.setattr(path, adct)
- self.slave.server.set_xtime(path, self.uuid, mark)
+ self.set_slave_xtime(path, mark)
@staticmethod
def volinfo_state_machine(volinfo_state, volinfo_sys):
@@ -397,9 +641,7 @@ class GMaster(object):
self.lastreport.update(crawls = self.crawls,
turns = self.turns,
time = self.start)
- volinfo_sys = self.get_sys_volinfo()
- self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
- volinfo_sys)
+ volinfo_sys, state_change = self.volinfo_hook()
if self.inter_master:
self.volinfo = volinfo_sys[self.KFGN]
else:
@@ -430,20 +672,18 @@ class GMaster(object):
if isinstance(xtl, int):
self.add_failjob(path, 'no-local-node')
return
- xtr0 = self.xtime(path, self.slave)
- if isinstance(xtr0, int):
- if xtr0 != ENOENT:
+ xtr = self.xtime(path, self.slave)
+ if isinstance(xtr, int):
+ if xtr != ENOENT:
self.slave.server.purge(path)
try:
self.slave.server.mkdir(path)
except OSError:
self.add_failjob(path, 'no-remote-node')
return
- xtr = URXTIME
+ xtr = self.minus_infinity
else:
- xtr = xtr0
- if xtr > xtl:
- raise GsyncdError("timestamp corruption for " + path)
+ self.xtime_reversion_hook(path, xtl, xtr)
if xtl == xtr:
if path == '.' and self.change_seen:
self.turns += 1
@@ -482,7 +722,7 @@ class GMaster(object):
xte = self.xtime(e)
if isinstance(xte, int):
logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte]))
- elif xte > xtr:
+ elif self.need_sync(e, xte, xtr):
chld.append((e, xte))
def indulgently(e, fnc, blame=None):
if not blame:
diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py
index c4cd19c9fb7..7e62fd48ca9 100644
--- a/xlators/features/marker/utils/syncdaemon/resource.py
+++ b/xlators/features/marker/utils/syncdaemon/resource.py
@@ -17,7 +17,7 @@ from select import error as selecterror
from gconf import gconf
import repce
from repce import RepceServer, RepceClient
-from master import GMaster
+from master import gmaster_builder
import syncdutils
from syncdutils import GsyncdError, select, privileged
@@ -359,11 +359,37 @@ class Server(object):
raise
@classmethod
+ def xtime_vec(cls, path, *uuids):
+ """vectored version of @xtime
+
+ accepts a list of uuids and returns a dictionary
+ with uuid as key(s) and xtime as value(s)
+ """
+ xt = {}
+ for uuid in uuids:
+ xtu = cls.xtime(path, uuid)
+ if xtu == ENODATA:
+ xtu = None
+ if isinstance(xtu, int):
+ return xtu
+ xt[uuid] = xtu
+ return xt
+
+ @classmethod
@_pathguard
def set_xtime(cls, path, uuid, mark):
"""set @mark as xtime for @uuid on @path"""
Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))
+ @classmethod
+ def set_xtime_vec(cls, path, mark_dct):
+ """vectored (or dictered) version of set_xtime
+
+ ignore values that match @ignore
+ """
+ for u,t in mark_dct.items():
+ cls.set_xtime(path, u, t)
+
@staticmethod
@_pathguard
def setattr(path, adct):
@@ -604,6 +630,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
if x[0] > now:
logging.debug("volinfo[%s] expires: %d (%d sec later)" % \
(d['uuid'], x[0], x[0] - now))
+ d['timeout'] = x[0]
dict_list.append(d)
else:
try:
@@ -820,7 +847,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
- else do that's what's inherited
"""
if args:
- GMaster(self, args[0]).crawl_loop()
+ gmaster_builder()(self, args[0]).crawl_loop()
else:
sup(self, *args)