diff options
author | Csaba Henk <csaba@gluster.com> | 2011-08-10 05:02:43 +0300 |
---|---|---|
committer | Anand Avati <avati@gluster.com> | 2011-09-08 00:06:57 -0700 |
commit | 7d4560cbcdcae0d74cf486c544d5eb58775da51f (patch) | |
tree | 52a2a9cb4e51a4786b195492de18a1fb7b6713d2 /xlators/features/marker/utils/syncdaemon/master.py | |
parent | d39a7fad09a6b4abcb23d132fd7dfdf0d440e928 (diff) |
gsyncd: do the homework, document _everything_
Change-Id: I559e6a0709b8064cfd54c693e289c741f9c4c4ab
BUG: 1570
Reviewed-on: http://review.gluster.com/319
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Kaushik BV <kaushikbv@gluster.com>
Diffstat (limited to 'xlators/features/marker/utils/syncdaemon/master.py')
-rw-r--r-- | xlators/features/marker/utils/syncdaemon/master.py | 124 |
1 files changed, 121 insertions, 3 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py index e7cb977e8ad..4273bf0c419 100644 --- a/xlators/features/marker/utils/syncdaemon/master.py +++ b/xlators/features/marker/utils/syncdaemon/master.py @@ -14,11 +14,16 @@ from syncdutils import FreeObject, Thread, GsyncdError URXTIME = (-1, 0) class GMaster(object): + """class implementing master role""" KFGN = 0 KNAT = 1 def get_sys_volinfo(self): + """query volume marks on fs root + + err out on multiple foreign masters + """ fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \ self.master.server.native_volume_info() fgn_vi = None @@ -40,9 +45,20 @@ class GMaster(object): @property def inter_master(self): + """decide if we are an intermediate master + in a cascading setup + """ return self.volinfo_state[self.KFGN] and True or False def xtime(self, path, *a, **opts): + """get amended xtime + + as of amending, we can create missing xtime, or + determine a valid value if what we get is expired + (as of the volume mark expiry); way of amending + depends on @opts and on subject of query (master + or slave). 
+ """ if a: rsc = a[0] else: @@ -87,7 +103,7 @@ class GMaster(object): self.lastreport = {'crawls': 0, 'turns': 0} self.start = None self.change_seen = None - # the authorative (foreign, native) volinfo pair + # the authoritative (foreign, native) volinfo pair # which lets us deduce what to do when we refetch # the volinfos from system uuid_preset = getattr(gconf, 'volume_id', None) @@ -97,6 +113,7 @@ class GMaster(object): self.terminate = False def crawl_loop(self): + """start the keep-alive thread and iterate .crawl""" timo = int(gconf.timeout or 0) if timo > 0: def keep_alive(): @@ -124,15 +141,24 @@ class GMaster(object): self.crawl() def add_job(self, path, label, job, *a, **kw): + """insert @job function to job table at @path with @label""" if self.jobtab.get(path) == None: self.jobtab[path] = [] self.jobtab[path].append((label, a, lambda : job(*a, **kw))) def add_failjob(self, path, label): + """invoke .add_job with a job that does nothing just fails""" logging.debug('salvaged: ' + label) self.add_job(path, label, lambda: False) def wait(self, path, *args): + """perform jobs registered for @path + + Reset jobtab entry for @path, + determine success as the conjunction of + success of all the jobs. In case of + success, call .sendmark on @path + """ jobs = self.jobtab.pop(path, []) succeed = True for j in jobs: @@ -144,12 +170,29 @@ class GMaster(object): return succeed def sendmark(self, path, mark, adct=None): + """update slave side xtime for @path to master side xtime + + also can send a setattr payload (see Server.setattr). + """ if adct: self.slave.server.setattr(path, adct) self.slave.server.set_xtime(path, self.uuid, mark) @staticmethod def volinfo_state_machine(volinfo_state, volinfo_sys): + """compute new volinfo_state from old one and incoming + as of current system state, also indicating if there was a + change regarding which volume mark is the authoritative one + + @volinfo_state, @volinfo_sys are pairs of volume mark dicts + (foreign, native). 
+ + Note this method is marked as static, ie. the computation is + pure, without reliance on any excess implicit state. State + transitions which are deemed as ambiguous or banned will raise + an exception. + + """ # store the value below "boxed" to emulate proper closures # (variables of the enclosing scope are available inner functions # provided they are no reassigned; mutation is OK). @@ -176,6 +219,43 @@ class GMaster(object): return newstate, param.state_change def crawl(self, path='.', xtl=None): + """crawling... + + Standing around + All the right people + Crawling + Tennis on Tuesday + The ladder is long + It is your nature + You've gotta suntan + Football on Sunday + Society boy + + Recursively walk the master side tree and check if updates are + needed due to xtime differences. One invocation of crawl checks + children of @path and do a recursive enter only on + those directory children where there is an update needed. + + Way of updates depend on file type: + - for symlinks, sync them directly and synchronously + - for regular children, register jobs for @path (cf. .add_job) to start + and wait on their rsync + - for directory children, register a job for @path which waits (.wait) + on jobs for the given child + (other kind of filesystem nodes are not considered) + + Those slave side children which do not exist on master are simply + purged (see Server.purge). + + Behavior is fault tolerant, synchronization is adaptive: if some action fails, + just go on relentlessly, adding a fail job (see .add_failjob) which will prevent + the .sendmark on @path, so when the next crawl will arrive to @path it will not + see it as up-to-date and will try to sync it again. 
While this semantics can be + supported by funky design principles (http://c2.com/cgi/wiki?LazinessImpatienceHubris), + the ultimate reason which excludes other possibilities is simply transience: we cannot + assert that the file systems (master / slave) underneath do not change and actions + taken upon some condition will not lose their context by the time they are performed. + """ if path == '.': if self.start: self.crawls += 1 @@ -326,14 +406,18 @@ class BoxClosedErr(Exception): pass class PostBox(list): + """synchronized collection for storing things thought of as "requests" """ def __init__(self, *a): list.__init__(self, *a) + # too bad Python stdlib does not have read/write locks... + # it would suffice to grab the lock in .append as reader, in .close as writer self.lever = Condition() self.open = True self.done = False def wait(self): + """wait on requests to be processed""" self.lever.acquire() if not self.done: self.lever.wait() @@ -341,6 +425,7 @@ class PostBox(list): return self.result def wakeup(self, data): + """wake up requestors with the result""" self.result = data self.lever.acquire() self.done = True @@ -348,6 +433,7 @@ class PostBox(list): self.lever.release() def append(self, e): + """post a request""" self.lever.acquire() if not self.open: raise BoxClosedErr @@ -355,13 +441,43 @@ class PostBox(list): self.lever.release() def close(self): + """prohibit the posting of further requests""" self.lever.acquire() self.open = False self.lever.release() class Syncer(object): + """a staged queue to relay rsync requests to rsync workers + + By "staged queue" it's meant that when a consumer comes to the + queue, it takes _all_ entries, leaving the queue empty. + (I don't know if there is an official term for this pattern.) + + The queue uses a PostBox to accumulate incoming items. + When a consumer (rsync worker) comes, a new PostBox is + set up and the old one is passed on to the consumer. 
+ + Instead of the simplistic scheme of having one big lock + which synchronizes both the addition of new items and + PostBox exchanges, use a separate lock to arbitrate consumers, + and rely on PostBox's synchronization mechanisms to take + care of additions. + + There is a corner case racy situation, producers vs. consumers, + which is not handled by this scheme: namely, when the PostBox + exchange occurs in between being passed to the producer for posting + and the post placement. But that's what PostBox.close is for: + such a posting will find the PostBox closed, in which case + the producer can re-try posting against the actual PostBox of + the queue. + + To aid accumulation of items in the PostBoxen before grabbed + by an rsync worker, the worker goes to sleep a bit after + each completed syncjob. + """ def __init__(self, slave): + """spawn worker threads""" self.slave = slave self.lock = Lock() self.pb = PostBox() @@ -370,6 +486,7 @@ class Syncer(object): t.start() def syncjob(self): + """the life of a worker""" while True: pb = None while True: @@ -393,8 +510,9 @@ class Syncer(object): def add(self, e): while True: + pb = self.pb try: - self.pb.append(e) - return self.pb + pb.append(e) + return pb except BoxClosedErr: pass |