summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/master.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--geo-replication/syncdaemon/master.py133
1 files changed, 33 insertions, 100 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index a19fe2644..1ef760619 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -41,11 +41,7 @@ def _volinfo_hook_relax_foreign(self):
expiry)
time.sleep(expiry)
volinfo_sys = self.get_sys_volinfo()
- self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
- volinfo_sys)
- if self.inter_master:
- raise GsyncdError("cannot be intermediate master in special mode")
- return (volinfo_sys, state_change)
+ return volinfo_sys
# The API!
@@ -134,10 +130,7 @@ class NormalMixin(object):
return (vi, gap)
def volinfo_hook(self):
- volinfo_sys = self.get_sys_volinfo()
- self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
- volinfo_sys)
- return (volinfo_sys, state_change)
+ return self.get_sys_volinfo()
def xtime_reversion_hook(self, path, xtl, xtr):
if xtr > xtl:
@@ -227,13 +220,6 @@ class GMasterCommon(object):
if self.volinfo:
return self.volinfo['volume_mark']
- @property
- def inter_master(self):
- """decide if we are an intermediate master
- in a cascading setup
- """
- return self.volinfo_state[self.KFGN] and True or False
-
def xtime(self, path, *a, **opts):
"""get amended xtime
@@ -303,11 +289,6 @@ class GMasterCommon(object):
self.total_crawl_stats = None
self.start = None
self.change_seen = None
- # the authoritative (foreign, native) volinfo pair
- # which lets us deduce what to do when we refetch
- # the volinfos from system
- uuid_preset = getattr(gconf, 'volume_id', None)
- self.volinfo_state = (uuid_preset and {'uuid': uuid_preset}, None)
# the actual volinfo we make use of
self.volinfo = None
self.terminate = False
@@ -326,34 +307,6 @@ class GMasterCommon(object):
t = Thread(target=keep_alive)
t.start()
- def volinfo_query(self):
- """volume info state machine"""
- volinfo_sys, state_change = self.volinfo_hook()
- if self.inter_master:
- self.volinfo = volinfo_sys[self.KFGN]
- else:
- self.volinfo = volinfo_sys[self.KNAT]
- if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master):
- logging.info('new master is %s', self.uuid)
- if self.volinfo:
- logging.info("%s master with volume id %s ..." % \
- (self.inter_master and "intermediate" or "primary",
- self.uuid))
- if state_change == self.KFGN:
- gconf.configinterface.set('volume_id', self.uuid)
- if self.volinfo:
- if self.volinfo['retval']:
- raise GsyncdError ("master is corrupt")
- self.start_checkpoint_thread()
- else:
- if should_display_info or self.crawls == 0:
- if self.inter_master:
- logging.info("waiting for being synced from %s ..." % \
- self.volinfo_state[self.KFGN]['uuid'])
- else:
- logging.info("waiting for volume info ...")
- return True
-
def should_crawl(cls):
return (gconf.glusterd_uuid in cls.master.server.node_uuid())
@@ -366,25 +319,38 @@ class GMasterCommon(object):
# for a passive gsyncd (ie. in a replicate scenario)
# the keepalive thread would keep the connection alive.
self.init_keep_alive()
+
+ # no need to maintain volinfo state machine.
+ # in a cascading setup, each geo-replication session is
+ # independent (ie. 'volume-mark' and 'xtime' are not
+ # propogated). This is beacuse the slave's xtime is now
+ # stored on the master itself. 'volume-mark' just identifies
+ # that we are in a cascading setup and need to enable
+ # 'geo-replication.ignore-pid-check' option.
+ volinfo_sys = self.volinfo_hook()
+ self.volinfo = volinfo_sys[self.KNAT]
+ inter_master = volinfo_sys[self.KFGN]
+ logging.info("%s master with volume id %s ..." % \
+ (inter_master and "intermediate" or "primary",
+ self.uuid))
+ gconf.configinterface.set('volume_id', self.uuid)
+ if self.volinfo:
+ if self.volinfo['retval']:
+ raise GsyncdError("master is corrupt")
+ self.start_checkpoint_thread()
+ else:
+ raise GsyncdError("master volinfo unavailable")
self.total_crawl_stats = self.get_initial_crawl_data()
self.lastreport['time'] = time.time()
logging.info('crawl interval: %d seconds' % self.sleep_interval)
+
t0 = time.time()
crawl = self.should_crawl()
while not self.terminate:
- if self.volinfo_query():
- continue
- t1 = time.time()
- if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds
- crawl = self.should_crawl()
- t0 = t1
- if not crawl:
- time.sleep(5)
- continue
if self.start:
logging.debug("... crawl #%d done, took %.6f seconds" % \
(self.crawls, time.time() - self.start))
- self.start = t1
+ self.start = time.time()
should_display_info = self.start - self.lastreport['time'] >= 60
if should_display_info:
logging.info("%d crawls, %d turns",
@@ -393,6 +359,13 @@ class GMasterCommon(object):
self.lastreport.update(crawls = self.crawls,
turns = self.turns,
time = self.start)
+ t1 = time.time()
+ if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds
+ crawl = self.should_crawl()
+ t0 = t1
+ if not crawl:
+ time.sleep(5)
+ continue
self.crawl()
if oneshot:
return
@@ -585,46 +558,6 @@ class GMasterCommon(object):
self.slave.server.setattr(path, adct)
self.set_slave_xtime(path, mark)
- @staticmethod
- def volinfo_state_machine(volinfo_state, volinfo_sys):
- """compute new volinfo_state from old one and incoming
- as of current system state, also indicating if there was a
- change regarding which volume mark is the authoritative one
-
- @volinfo_state, @volinfo_sys are pairs of volume mark dicts
- (foreign, native).
-
- Note this method is marked as static, ie. the computation is
- pure, without reliance on any excess implicit state. State
- transitions which are deemed as ambiguous or banned will raise
- an exception.
-
- """
- # store the value below "boxed" to emulate proper closures
- # (variables of the enclosing scope are available inner functions
- # provided they are no reassigned; mutation is OK).
- param = FreeObject(relax_mismatch = False, state_change = None, index=-1)
- def select_vi(vi0, vi):
- param.index += 1
- if vi and (not vi0 or vi0['uuid'] == vi['uuid']):
- if not vi0 and not param.relax_mismatch:
- param.state_change = param.index
- # valid new value found; for the rest, we are graceful about
- # uuid mismatch
- param.relax_mismatch = True
- return vi
- if vi0 and vi and vi0['uuid'] != vi['uuid'] and not param.relax_mismatch:
- # uuid mismatch for master candidate, bail out
- raise GsyncdError("aborting on uuid change from %s to %s" % \
- (vi0['uuid'], vi['uuid']))
- # fall back to old
- return vi0
- newstate = tuple(select_vi(*vip) for vip in zip(volinfo_state, volinfo_sys))
- srep = lambda vi: vi and vi['uuid'][0:8]
- logging.debug('(%s, %s) << (%s, %s) -> (%s, %s)' % \
- tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate))
- return newstate, param.state_change
-
class GMasterChangelogMixin(GMasterCommon):
""" changelog based change detection and syncing """
@@ -765,7 +698,7 @@ class GMasterChangelogMixin(GMasterCommon):
e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2]))
entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2, stat=st))
else:
- pass
+ logging.warn('ignoring %s [op %s]' % (gfid, ty))
elif et in self.TYPE_GFID:
go = os.path.join(pfx, ec[0])
st = lstat(go)