diff options
Diffstat (limited to 'geo-replication')
-rw-r--r-- | geo-replication/syncdaemon/master.py | 133 |
1 files changed, 33 insertions, 100 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index a19fe264419..1ef76061976 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -41,11 +41,7 @@ def _volinfo_hook_relax_foreign(self): expiry) time.sleep(expiry) volinfo_sys = self.get_sys_volinfo() - self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state, - volinfo_sys) - if self.inter_master: - raise GsyncdError("cannot be intermediate master in special mode") - return (volinfo_sys, state_change) + return volinfo_sys # The API! @@ -134,10 +130,7 @@ class NormalMixin(object): return (vi, gap) def volinfo_hook(self): - volinfo_sys = self.get_sys_volinfo() - self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state, - volinfo_sys) - return (volinfo_sys, state_change) + return self.get_sys_volinfo() def xtime_reversion_hook(self, path, xtl, xtr): if xtr > xtl: @@ -227,13 +220,6 @@ class GMasterCommon(object): if self.volinfo: return self.volinfo['volume_mark'] - @property - def inter_master(self): - """decide if we are an intermediate master - in a cascading setup - """ - return self.volinfo_state[self.KFGN] and True or False - def xtime(self, path, *a, **opts): """get amended xtime @@ -303,11 +289,6 @@ class GMasterCommon(object): self.total_crawl_stats = None self.start = None self.change_seen = None - # the authoritative (foreign, native) volinfo pair - # which lets us deduce what to do when we refetch - # the volinfos from system - uuid_preset = getattr(gconf, 'volume_id', None) - self.volinfo_state = (uuid_preset and {'uuid': uuid_preset}, None) # the actual volinfo we make use of self.volinfo = None self.terminate = False @@ -326,34 +307,6 @@ class GMasterCommon(object): t = Thread(target=keep_alive) t.start() - def volinfo_query(self): - """volume info state machine""" - volinfo_sys, state_change = self.volinfo_hook() - if self.inter_master: - self.volinfo = volinfo_sys[self.KFGN] - else: - self.volinfo = volinfo_sys[self.KNAT] - if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master): - logging.info('new master is %s', self.uuid) - if self.volinfo: - logging.info("%s master with volume id %s ..." % \ - (self.inter_master and "intermediate" or "primary", - self.uuid)) - if state_change == self.KFGN: - gconf.configinterface.set('volume_id', self.uuid) - if self.volinfo: - if self.volinfo['retval']: - raise GsyncdError ("master is corrupt") - self.start_checkpoint_thread() - else: - if should_display_info or self.crawls == 0: - if self.inter_master: - logging.info("waiting for being synced from %s ..." % \ - self.volinfo_state[self.KFGN]['uuid']) - else: - logging.info("waiting for volume info ...") - return True - def should_crawl(cls): return (gconf.glusterd_uuid in cls.master.server.node_uuid()) @@ -366,25 +319,38 @@ class GMasterCommon(object): # for a passive gsyncd (ie. in a replicate scenario) # the keepalive thread would keep the connection alive. self.init_keep_alive() + + # no need to maintain volinfo state machine. + # in a cascading setup, each geo-replication session is + # independent (ie. 'volume-mark' and 'xtime' are not + # propogated). This is beacuse the slave's xtime is now + # stored on the master itself. 'volume-mark' just identifies + # that we are in a cascading setup and need to enable + # 'geo-replication.ignore-pid-check' option. + volinfo_sys = self.volinfo_hook() + self.volinfo = volinfo_sys[self.KNAT] + inter_master = volinfo_sys[self.KFGN] + logging.info("%s master with volume id %s ..." % \ + (inter_master and "intermediate" or "primary", + self.uuid)) + gconf.configinterface.set('volume_id', self.uuid) + if self.volinfo: + if self.volinfo['retval']: + raise GsyncdError("master is corrupt") + self.start_checkpoint_thread() + else: + raise GsyncdError("master volinfo unavailable") self.total_crawl_stats = self.get_initial_crawl_data() self.lastreport['time'] = time.time() logging.info('crawl interval: %d seconds' % self.sleep_interval) + t0 = time.time() crawl = self.should_crawl() while not self.terminate: - if self.volinfo_query(): - continue - t1 = time.time() - if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds - crawl = self.should_crawl() - t0 = t1 - if not crawl: - time.sleep(5) - continue if self.start: logging.debug("... crawl #%d done, took %.6f seconds" % \ (self.crawls, time.time() - self.start)) - self.start = t1 + self.start = time.time() should_display_info = self.start - self.lastreport['time'] >= 60 if should_display_info: logging.info("%d crawls, %d turns", @@ -393,6 +359,13 @@ class GMasterCommon(object): self.lastreport.update(crawls = self.crawls, turns = self.turns, time = self.start) + t1 = time.time() + if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds + crawl = self.should_crawl() + t0 = t1 + if not crawl: + time.sleep(5) + continue self.crawl() if oneshot: return @@ -585,46 +558,6 @@ class GMasterCommon(object): self.slave.server.setattr(path, adct) self.set_slave_xtime(path, mark) - @staticmethod - def volinfo_state_machine(volinfo_state, volinfo_sys): - """compute new volinfo_state from old one and incoming - as of current system state, also indicating if there was a - change regarding which volume mark is the authoritative one - - @volinfo_state, @volinfo_sys are pairs of volume mark dicts - (foreign, native). - - Note this method is marked as static, ie. the computation is - pure, without reliance on any excess implicit state. State - transitions which are deemed as ambiguous or banned will raise - an exception. - - """ - # store the value below "boxed" to emulate proper closures - # (variables of the enclosing scope are available inner functions - # provided they are no reassigned; mutation is OK). - param = FreeObject(relax_mismatch = False, state_change = None, index=-1) - def select_vi(vi0, vi): - param.index += 1 - if vi and (not vi0 or vi0['uuid'] == vi['uuid']): - if not vi0 and not param.relax_mismatch: - param.state_change = param.index - # valid new value found; for the rest, we are graceful about - # uuid mismatch - param.relax_mismatch = True - return vi - if vi0 and vi and vi0['uuid'] != vi['uuid'] and not param.relax_mismatch: - # uuid mismatch for master candidate, bail out - raise GsyncdError("aborting on uuid change from %s to %s" % \ - (vi0['uuid'], vi['uuid'])) - # fall back to old - return vi0 - newstate = tuple(select_vi(*vip) for vip in zip(volinfo_state, volinfo_sys)) - srep = lambda vi: vi and vi['uuid'][0:8] - logging.debug('(%s, %s) << (%s, %s) -> (%s, %s)' % \ - tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate)) - return newstate, param.state_change - class GMasterChangelogMixin(GMasterCommon): """ changelog based change detection and syncing """ @@ -765,7 +698,7 @@ class GMasterChangelogMixin(GMasterCommon): e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2])) entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2, stat=st)) else: - pass + logging.warn('ignoring %s [op %s]' % (gfid, ty)) elif et in self.TYPE_GFID: go = os.path.join(pfx, ec[0]) st = lstat(go) |