summaryrefslogtreecommitdiffstats
path: root/xlators/features/marker
diff options
context:
space:
mode:
authorCsaba Henk <csaba@gluster.com>2011-08-10 05:02:43 +0300
committerAnand Avati <avati@gluster.com>2011-09-08 00:06:57 -0700
commit7d4560cbcdcae0d74cf486c544d5eb58775da51f (patch)
tree52a2a9cb4e51a4786b195492de18a1fb7b6713d2 /xlators/features/marker
parentd39a7fad09a6b4abcb23d132fd7dfdf0d440e928 (diff)
gsyncd: do the homework, document _everything_
Change-Id: I559e6a0709b8064cfd54c693e289c741f9c4c4ab BUG: 1570 Reviewed-on: http://review.gluster.com/319 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Kaushik BV <kaushikbv@gluster.com>
Diffstat (limited to 'xlators/features/marker')
-rw-r--r--xlators/features/marker/utils/syncdaemon/configinterface.py41
-rw-r--r--xlators/features/marker/utils/syncdaemon/gconf.py3
-rw-r--r--xlators/features/marker/utils/syncdaemon/gsyncd.py17
-rw-r--r--xlators/features/marker/utils/syncdaemon/libcxattr.py10
-rw-r--r--xlators/features/marker/utils/syncdaemon/master.py124
-rw-r--r--xlators/features/marker/utils/syncdaemon/monitor.py22
-rw-r--r--xlators/features/marker/utils/syncdaemon/repce.py64
-rw-r--r--xlators/features/marker/utils/syncdaemon/resource.py196
-rw-r--r--xlators/features/marker/utils/syncdaemon/syncdutils.py23
9 files changed, 483 insertions, 17 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/configinterface.py b/xlators/features/marker/utils/syncdaemon/configinterface.py
index cc8f7063a..fbf96c843 100644
--- a/xlators/features/marker/utils/syncdaemon/configinterface.py
+++ b/xlators/features/marker/utils/syncdaemon/configinterface.py
@@ -16,6 +16,7 @@ re_type = type(re.compile(''))
class MultiDict(object):
+ """a virtual dict-like class which functions as the union of underlying dicts"""
def __init__(self, *dd):
self.dicts = dd
@@ -31,8 +32,16 @@ class MultiDict(object):
class GConffile(object):
+ """A high-level interface to ConfigParser which flattens the two-tiered
+ config layout by implenting automatic section dispatch based on initial
+ parameters.
+
+ Also ensure section ordering in terms of their time of addition -- a compat
+ hack for Python < 2.7.
+ """
def _normconfig(self):
+ """normalize config keys by s/-/_/g"""
for n, s in self.config._sections.items():
if n.find('__') == 0:
continue
@@ -44,6 +53,13 @@ class GConffile(object):
self.config._sections[n] = s2
def __init__(self, path, peers, *dd):
+ """
+ - .path: location of config file
+ - .config: underlying ConfigParser instance
+ - .peers: on behalf of whom we flatten .config
+ (master, or master-slave url pair)
+ - .auxdicts: template subtituents
+ """
self.peers = peers
self.path = path
self.auxdicts = dd
@@ -52,6 +68,7 @@ class GConffile(object):
self._normconfig()
def section(self, rx=False):
+ """get the section name of the section representing .peers in .config"""
peers = self.peers
if not peers:
peers = ['.', '.']
@@ -64,6 +81,9 @@ class GConffile(object):
@staticmethod
def parse_section(section):
+ """retrieve peers sequence encoded by section name
+ (as urls or regexen, depending on section type)
+ """
sl = section.split()
st = sl.pop(0)
sl = [unescape(u) for u in sl]
@@ -83,7 +103,7 @@ class GConffile(object):
also those sections which are not registered
in SECT_ORD.
- Needed for python 2.{4,5} where ConfigParser
+ Needed for python 2.{4,5,6} where ConfigParser
cannot yet order sections/options internally.
"""
so = {}
@@ -108,6 +128,13 @@ class GConffile(object):
return ss
def update_to(self, dct, allow_unresolved=False):
+ """update @dct from key/values of ours.
+
+ key/values are collected from .config by filtering the regexp sections
+ according to match, and from .section. The values are treated as templates,
+ which are substituted from .auxdicts and (in case of regexp sections)
+ match groups.
+ """
if not self.peers:
raise GsyncdError('no peers given, cannot select matching options')
def update_from_sect(sect, mud):
@@ -136,6 +163,10 @@ class GConffile(object):
update_from_sect(self.section(), MultiDict(dct, *self.auxdicts))
def get(self, opt=None):
+ """print the matching key/value pairs from .config,
+ or if @opt given, the value for @opt (according to the
+ logic described in .update_to)
+ """
d = {}
self.update_to(d, allow_unresolved = True)
if opt:
@@ -150,6 +181,10 @@ class GConffile(object):
print("%s: %s" % (k, v))
def write(self, trfn, opt, *a, **kw):
+ """update on-disk config transactionally
+
+ @trfn is the transaction function
+ """
def mergeconf(f):
self.config = ConfigParser.RawConfigParser()
self.config.readfp(f)
@@ -163,6 +198,7 @@ class GConffile(object):
update_file(self.path, updateconf, mergeconf)
def _set(self, opt, val, rx=False):
+ """set @opt to @val in .section"""
sect = self.section(rx)
if not self.config.has_section(sect):
self.config.add_section(sect)
@@ -174,12 +210,15 @@ class GConffile(object):
return True
def set(self, opt, *a, **kw):
+ """perform ._set transactionally"""
self.write(self._set, opt, *a, **kw)
def _delete(self, opt, rx=False):
+ """delete @opt from .section"""
sect = self.section(rx)
if self.config.has_section(sect):
return self.config.remove_option(sect, opt)
def delete(self, opt, *a, **kw):
+ """perform ._delete transactionally"""
self.write(self._delete, opt, *a, **kw)
diff --git a/xlators/features/marker/utils/syncdaemon/gconf.py b/xlators/features/marker/utils/syncdaemon/gconf.py
index ddbac21e4..4e3b959fe 100644
--- a/xlators/features/marker/utils/syncdaemon/gconf.py
+++ b/xlators/features/marker/utils/syncdaemon/gconf.py
@@ -1,6 +1,9 @@
import os
class GConf(object):
+ """singleton class to store globals
+ shared between gsyncd modules"""
+
ssh_ctl_dir = None
ssh_ctl_args = None
cpid = None
diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py
index 960b83c13..c0d39ffd6 100644
--- a/xlators/features/marker/utils/syncdaemon/gsyncd.py
+++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py
@@ -21,6 +21,10 @@ import resource
from monitor import monitor
class GLogger(Logger):
+ """Logger customizations for gsyncd.
+
+ It implements a log format similar to that of glusterfs.
+ """
def makeRecord(self, name, level, *a):
rv = Logger.makeRecord(self, name, level, *a)
@@ -54,6 +58,7 @@ class GLogger(Logger):
def startup(**kw):
+ """set up logging, pidfile grabbing, daemonization"""
if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn':
if not grabpidfile():
sys.stderr.write("pidfile is taken, exiting.\n")
@@ -96,6 +101,7 @@ def startup(**kw):
gconf.log_exit = True
def main():
+ """main routine, signal/exception handling boilerplates"""
signal.signal(signal.SIGTERM, lambda *a: finalize(*a, **{'exval': 1}))
GLogger.setup()
excont = FreeObject(exval = 0)
@@ -108,6 +114,17 @@ def main():
finalize(exval = excont.exval)
def main_i():
+ """internal main routine
+
+ parse command line, decide what action will be taken;
+ we can either:
+ - query/manipulate configuration
+ - format gsyncd urls using gsyncd's url parsing engine
+ - start service in following modes, in given stages:
+ - monitor: startup(), monitor()
+ - master: startup(), connect_remote(), connect(), service_loop()
+ - slave: startup(), connect(), service_loop()
+ """
rconf = {'go_daemon': 'should'}
def store_abs(opt, optstr, val, parser):
diff --git a/xlators/features/marker/utils/syncdaemon/libcxattr.py b/xlators/features/marker/utils/syncdaemon/libcxattr.py
index fdc016c47..f0a9d2292 100644
--- a/xlators/features/marker/utils/syncdaemon/libcxattr.py
+++ b/xlators/features/marker/utils/syncdaemon/libcxattr.py
@@ -3,6 +3,15 @@ from ctypes import *
from ctypes.util import find_library
class Xattr(object):
+ """singleton that wraps the extended attribues system
+ interface for python using ctypes
+
+ Just implement it to the degree we need it, in particular
+ - we need just the l*xattr variants, ie. we never want symlinks to be
+ followed
+ - don't need size discovery for getxattr, as we always know the exact
+ sizes we expect
+ """
libc = CDLL(find_library("libc"))
@@ -54,6 +63,7 @@ class Xattr(object):
@classmethod
def llistxattr_buf(cls, path):
+ """listxattr variant with size discovery"""
size = cls.llistxattr(path)
if size == -1:
cls.raise_oserr()
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py
index e7cb977e8..4273bf0c4 100644
--- a/xlators/features/marker/utils/syncdaemon/master.py
+++ b/xlators/features/marker/utils/syncdaemon/master.py
@@ -14,11 +14,16 @@ from syncdutils import FreeObject, Thread, GsyncdError
URXTIME = (-1, 0)
class GMaster(object):
+ """class impementling master role"""
KFGN = 0
KNAT = 1
def get_sys_volinfo(self):
+ """query volume marks on fs root
+
+ err out on multiple foreign masters
+ """
fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \
self.master.server.native_volume_info()
fgn_vi = None
@@ -40,9 +45,20 @@ class GMaster(object):
@property
def inter_master(self):
+ """decide if we are an intermediate master
+ in a cascading setup
+ """
return self.volinfo_state[self.KFGN] and True or False
def xtime(self, path, *a, **opts):
+ """get amended xtime
+
+ as of amending, we can create missing xtime, or
+ determine a valid value if what we get is expired
+ (as of the volume mark expiry); way of amendig
+ depends on @opts and on subject of query (master
+ or slave).
+ """
if a:
rsc = a[0]
else:
@@ -87,7 +103,7 @@ class GMaster(object):
self.lastreport = {'crawls': 0, 'turns': 0}
self.start = None
self.change_seen = None
- # the authorative (foreign, native) volinfo pair
+ # the authoritative (foreign, native) volinfo pair
# which lets us deduce what to do when we refetch
# the volinfos from system
uuid_preset = getattr(gconf, 'volume_id', None)
@@ -97,6 +113,7 @@ class GMaster(object):
self.terminate = False
def crawl_loop(self):
+ """start the keep-alive thread and iterate .crawl"""
timo = int(gconf.timeout or 0)
if timo > 0:
def keep_alive():
@@ -124,15 +141,24 @@ class GMaster(object):
self.crawl()
def add_job(self, path, label, job, *a, **kw):
+ """insert @job function to job table at @path with @label"""
if self.jobtab.get(path) == None:
self.jobtab[path] = []
self.jobtab[path].append((label, a, lambda : job(*a, **kw)))
def add_failjob(self, path, label):
+ """invoke .add_job with a job that does nothing just fails"""
logging.debug('salvaged: ' + label)
self.add_job(path, label, lambda: False)
def wait(self, path, *args):
+ """perform jobs registered for @path
+
+ Reset jobtab entry for @path,
+ determine success as the conjuction of
+ success of all the jobs. In case of
+ success, call .sendmark on @path
+ """
jobs = self.jobtab.pop(path, [])
succeed = True
for j in jobs:
@@ -144,12 +170,29 @@ class GMaster(object):
return succeed
def sendmark(self, path, mark, adct=None):
+ """update slave side xtime for @path to master side xtime
+
+ also can send a setattr payload (see Server.setattr).
+ """
if adct:
self.slave.server.setattr(path, adct)
self.slave.server.set_xtime(path, self.uuid, mark)
@staticmethod
def volinfo_state_machine(volinfo_state, volinfo_sys):
+ """compute new volinfo_state from old one and incoming
+ as of current system state, also indicating if there was a
+ change regarding which volume mark is the authoritative one
+
+ @volinfo_state, @volinfo_sys are pairs of volume mark dicts
+ (foreign, native).
+
+ Note this method is marked as static, ie. the computation is
+ pure, without reliance on any excess implicit state. State
+ transitions which are deemed as ambiguous or banned will raise
+ an exception.
+
+ """
# store the value below "boxed" to emulate proper closures
# (variables of the enclosing scope are available inner functions
# provided they are no reassigned; mutation is OK).
@@ -176,6 +219,43 @@ class GMaster(object):
return newstate, param.state_change
def crawl(self, path='.', xtl=None):
+ """crawling...
+
+ Standing around
+ All the right people
+ Crawling
+ Tennis on Tuesday
+ The ladder is long
+ It is your nature
+ You've gotta suntan
+ Football on Sunday
+ Society boy
+
+ Recursively walk the master side tree and check if updates are
+ needed due to xtime differences. One invocation of crawl checks
+ children of @path and do a recursive enter only on
+ those directory children where there is an update needed.
+
+ Way of updates depend on file type:
+ - for symlinks, sync them directy and synchronously
+ - for regular children, register jobs for @path (cf. .add_job) to start
+ and wait on their rsync
+ - for directory children, register a job for @path which waits (.wait)
+ on jobs for the given child
+ (other kind of filesystem nodes are not considered)
+
+ Those slave side children which do not exist on master are simply
+ purged (see Server.purge).
+
+ Behavior is fault tolerant, synchronization is adaptive: if some action fails,
+ just go on relentlessly, adding a fail job (see .add_failjob) which will prevent
+ the .sendmark on @path, so when the next crawl will arrive to @path it will not
+ see it as up-to-date and will try to sync it again. While this semantics can be
+ supported by funky design principles (http://c2.com/cgi/wiki?LazinessImpatienceHubris),
+ the ultimate reason which excludes other possibilities is simply transience: we cannot
+ assert that the file systems (master / slave) underneath do not change and actions
+ taken upon some condition will not lose their context by the time they are performed.
+ """
if path == '.':
if self.start:
self.crawls += 1
@@ -326,14 +406,18 @@ class BoxClosedErr(Exception):
pass
class PostBox(list):
+ """synchronized collection for storing things thought of as "requests" """
def __init__(self, *a):
list.__init__(self, *a)
+ # too bad Python stdlib does not have read/write locks...
+ # it would suffivce to grab the lock in .append as reader, in .close as writer
self.lever = Condition()
self.open = True
self.done = False
def wait(self):
+ """wait on requests to be processed"""
self.lever.acquire()
if not self.done:
self.lever.wait()
@@ -341,6 +425,7 @@ class PostBox(list):
return self.result
def wakeup(self, data):
+ """wake up requestors with the result"""
self.result = data
self.lever.acquire()
self.done = True
@@ -348,6 +433,7 @@ class PostBox(list):
self.lever.release()
def append(self, e):
+ """post a request"""
self.lever.acquire()
if not self.open:
raise BoxClosedErr
@@ -355,13 +441,43 @@ class PostBox(list):
self.lever.release()
def close(self):
+ """prohibit the posting of further requests"""
self.lever.acquire()
self.open = False
self.lever.release()
class Syncer(object):
+ """a staged queue to relay rsync requests to rsync workers
+
+ By "staged queue" its meant that when a consumer comes to the
+ queue, it takes _all_ entries, leaving the queue empty.
+ (I don't know if there is an official term for this pattern.)
+
+ The queue uses a PostBox to accumulate incoming items.
+ When a consumer (rsync worker) comes, a new PostBox is
+ set up and the old one is passed on to the consumer.
+
+ Instead of the simplistic scheme of having one big lock
+ which synchronizes both the addition of new items and
+ PostBox exchanges, use a separate lock to arbitrate consumers,
+ and rely on PostBox's synchronization mechanisms take
+ care about additions.
+
+ There is a corner case racy situation, producers vs. consumers,
+ which is not handled by this scheme: namely, when the PostBox
+ exchange occurs in between being passed to the producer for posting
+ and the post placement. But that's what Postbox.close is for:
+ such a posting will find the PostBox closed, in which case
+ the producer can re-try posting against the actual PostBox of
+ the queue.
+
+ To aid accumlation of items in the PostBoxen before grabbed
+ by an rsync worker, the worker goes to sleep a bit after
+ each completed syncjob.
+ """
def __init__(self, slave):
+ """spawn worker threads"""
self.slave = slave
self.lock = Lock()
self.pb = PostBox()
@@ -370,6 +486,7 @@ class Syncer(object):
t.start()
def syncjob(self):
+ """the life of a worker"""
while True:
pb = None
while True:
@@ -393,8 +510,9 @@ class Syncer(object):
def add(self, e):
while True:
+ pb = self.pb
try:
- self.pb.append(e)
- return self.pb
+ pb.append(e)
+ return pb
except BoxClosedErr:
pass
diff --git a/xlators/features/marker/utils/syncdaemon/monitor.py b/xlators/features/marker/utils/syncdaemon/monitor.py
index 365e91435..b8e9219dc 100644
--- a/xlators/features/marker/utils/syncdaemon/monitor.py
+++ b/xlators/features/marker/utils/syncdaemon/monitor.py
@@ -8,11 +8,14 @@ from gconf import gconf
from syncdutils import update_file
class Monitor(object):
+ """class which spawns and manages gsyncd workers"""
def __init__(self):
self.state = None
def set_state(self, state):
+ """set the state that can be used by external agents
+ like glusterd for status reporting"""
if state == self.state:
return
self.state = state
@@ -21,6 +24,24 @@ class Monitor(object):
update_file(gconf.state_file, lambda f: f.write(state + '\n'))
def monitor(self):
+ """the monitor loop
+
+ Basic logic is a blantantly simple blunt heuristics:
+ if spawned client survives 60 secs, it's considered OK.
+ This servers us pretty well as it's not vulneralbe to
+ any kind of irregular behavior of the child...
+
+ ... well, except for one: if children is hung up on
+ waiting for some event, it can survive aeons, still
+ will be defunct. So we tweak the above logic to
+ expect the worker to send us a signal within 60 secs
+ (in the form of closing its end of a pipe). The worker
+ does this when it's done with the setup stage
+ ready to enter the service loop (note it's the setup
+ stage which is vulnerable to hangs -- the full
+ blown worker blows up on EPIPE if the net goes down,
+ due to the keep-alive thread)
+ """
argv = sys.argv[:]
for o in ('-N', '--no-daemon', '--monitor'):
while o in argv:
@@ -77,4 +98,5 @@ class Monitor(object):
return ret
def monitor():
+ """oh yeah, actually Monitor is used as singleton, too"""
return Monitor().monitor()
diff --git a/xlators/features/marker/utils/syncdaemon/repce.py b/xlators/features/marker/utils/syncdaemon/repce.py
index 47691301e..947352490 100644
--- a/xlators/features/marker/utils/syncdaemon/repce.py
+++ b/xlators/features/marker/utils/syncdaemon/repce.py
@@ -36,21 +36,39 @@ def ioparse(i, o):
return (i, o)
def send(out, *args):
+ """pickle args and write out wholly in one syscall
+
+ ie. not use the ability of pickle to dump directly to
+ a stream, as that would potentially mess up messages
+ by interleaving them
+ """
os.write(out, pickle.dumps(args, pickle_proto))
def recv(inf):
+ """load an object from input stream"""
return pickle.load(inf)
class RepceServer(object):
+ """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
+
+ ... also our homebrewed RPC backend where the transport layer is
+ reduced to a pair of filehandles.
+
+ This is the server component.
+ """
def __init__(self, obj, i, o, wnum=6):
+ """register a backend object .obj to which incoming messages
+ are dispatched, also incoming/outcoming streams
+ """
self.obj = obj
self.inf, self.out = ioparse(i, o)
self.wnum = wnum
self.q = Queue()
def service_loop(self):
+ """fire up worker threads, get messages and dispatch among them"""
for i in range(self.wnum):
t = Thread(target=self.worker)
t.start()
@@ -61,6 +79,15 @@ class RepceServer(object):
logging.info("terminating on reaching EOF.")
def worker(self):
+ """life of a worker
+
+ Get message, extract its id, method name and arguments
+ (kwargs not supported), call method on .obj.
+ Send back message id + return value.
+ If method call throws an exception, rescue it, and send
+ back the exception as result (with flag marking it as
+ exception).
+ """
while True:
in_data = self.q.get(True)
rid = in_data[0]
@@ -79,8 +106,14 @@ class RepceServer(object):
class RepceJob(object):
+ """class representing message status we can use
+ for waiting on reply"""
def __init__(self, cbk):
+ """
+ - .rid: (process-wise) unique id
+ - .cbk: what we do upon receiving reply
+ """
self.rid = (os.getpid(), thread.get_ident(), time.time())
self.cbk = cbk
self.lever = Condition()
@@ -105,6 +138,13 @@ class RepceJob(object):
class RepceClient(object):
+ """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
+
+ ... also our homebrewed RPC backend where the transport layer is
+ reduced to a pair of filehandles.
+
+ This is the client component.
+ """
def __init__(self, i, o):
self.inf, self.out = ioparse(i, o)
@@ -121,6 +161,11 @@ class RepceClient(object):
rjob.cbk(rjob, [exc, res])
def push(self, meth, *args, **kw):
+ """wrap arguments in a RepceJob, send them to server
+ and return the RepceJob
+
+ @cbk to pass on RepceJob can be given as kwarg.
+ """
cbk = kw.get('cbk')
if not cbk:
def cbk(rj, res):
@@ -133,6 +178,11 @@ class RepceClient(object):
return rjob
def __call__(self, meth, *args):
+ """RePCe client is callabe, calling it implements a synchronous remote call
+
+ We do a .push with a cbk which does a wakeup upon receiving anwser, then wait
+ on the RepceJob.
+ """
rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
exc, res = rjob.wait()
if exc:
@@ -142,7 +192,11 @@ class RepceClient(object):
return res
class mprx(object):
+ """method proxy, standard trick to implement rubyesque method_missing
+ in Python
+ A class is a closure factory, you know what I mean, or go read some SICP.
+ """
def __init__(self, ins, meth):
self.ins = ins
self.meth = meth
@@ -151,9 +205,19 @@ class RepceClient(object):
return self.ins(self.meth, *a)
def __getattr__(self, meth):
+ """this implements transparent method dispatch to remote object,
+ so that you don't need to call the RepceClient instance like
+
+ rclient('how_old_are_you_if_born_in', 1979)
+
+ but you can make it into an ordinary method call like
+
+ rclient.how_old_are_you_if_born_in(1979)
+ """
return self.mprx(self, meth)
def __version__(self):
+ """used in handshake to verify compatibility"""
d = {'proto': self('__repce_version__')}
try:
d['object'] = self('version')
diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py
index 66600fdad..f92e85734 100644
--- a/xlators/features/marker/utils/syncdaemon/resource.py
+++ b/xlators/features/marker/utils/syncdaemon/resource.py
@@ -26,9 +26,19 @@ HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
def sup(x, *a, **kw):
+ """a rubyesque "super" for python ;)
+
+ invoke caller method in parent class with given args.
+ """
return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw)
def desugar(ustr):
+ """transform sugared url strings to standard <scheme>://<urlbody> form
+
+ parsing logic enforces the constraint that sugared forms should contatin
+ a ':' or a '/', which ensures that sugared urls do not conflict with
+ gluster volume names.
+ """
m = re.match('([^:]*):(.*)', ustr)
if m:
if not m.groups()[0]:
@@ -46,6 +56,7 @@ def desugar(ustr):
return "file://" + ap
def gethostbyname(hnam):
+ """gethostbyname wrapper"""
try:
return socket.gethostbyname(hnam)
except socket.gaierror:
@@ -54,6 +65,11 @@ def gethostbyname(hnam):
(hnam, ex.strerror))
def parse_url(ustr):
+ """instantiate an url object by scheme-to-class dispatch
+
+ The url classes taken into consideration are the ones in
+ this module whose names are full-caps.
+ """
m = UrlRX.match(ustr)
if not m:
ustr = desugar(ustr)
@@ -68,8 +84,17 @@ def parse_url(ustr):
class _MetaXattr(object):
+ """singleton class, a lazy wrapper around the
+ libcxattr module
+
+ libcxattr (a heavy import due to ctypes) is
+ loaded only when when the single
+ instance is tried to be used.
- # load Xattr stuff on-demand
+ This reduces runtime for those invocations
+ which do not need filesystem manipulation
+ (eg. for config, url parsing)
+ """
def __getattr__(self, meth):
from libcxattr import Xattr as LXattr
@@ -84,14 +109,17 @@ Xattr = _MetaXattr()
class Popen(subprocess.Popen):
+ """customized subclass of subprocess.Popen with a ring
+ buffer for children error ouput"""
@classmethod
def init_errhandler(cls):
+ """start the thread which hanldes children's error output"""
cls.errstore = {}
def tailer():
while True:
for po in select.select([po.stderr for po in cls.errstore], [], []):
- po.lock()
+ po.lock.acquire()
try:
la = cls.errstore.get(po)
if la == None:
@@ -103,23 +131,22 @@ class Popen(subprocess.Popen):
while tots > 1<<20 and la:
tots -= len(la.pop(0))
finally:
- po.unlock()
+ po.lock.release()
t = syncdutils.Thread(target = tailer)
t.start()
cls.errhandler = t
- def lock(self):
- self._lock.acquire()
-
- def unlock(self):
- self._lock.release()
-
def __init__(self, args, *a, **kw):
- """subprocess.Popen wrapper with error-handling"""
+ """customizations for subprocess.Popen instantiation
+
+ - 'close_fds' is taken to be the default
+ - if child's stderr is chosen to be managed,
+ register it with the error handler thread
+ """
self.args = args
if 'close_fds' not in kw:
kw['close_fds'] = True
- self._lock = threading.Lock()
+ self.lock = threading.Lock()
try:
sup(self, args, *a, **kw)
except:
@@ -133,6 +160,7 @@ class Popen(subprocess.Popen):
self.errstore[self] = []
def errfail(self):
+ """fail nicely if child did not terminate with success"""
filling = None
if self.elines:
filling = ", saying:"
@@ -144,11 +172,15 @@ class Popen(subprocess.Popen):
syncdutils.finalize(exval = 1)
def terminate_geterr(self, fail_on_err = True):
- self.lock()
+ """kill child, finalize stderr harvesting (unregister
+ from errhandler, set up .elines), fail on error if
+ asked for
+ """
+ self.lock.acquire()
try:
elines = self.errstore.pop(self)
finally:
- self.unlock()
+ self.lock.release()
if self.poll() == None:
self.terminate()
if sp.poll() == None:
@@ -167,6 +199,12 @@ class Popen(subprocess.Popen):
class Server(object):
+ """singleton implemening those filesystem access primitives
+ which are needed for geo-replication functionality
+
+ (Singleton in the sense it's a class which has only static
+ and classmethods and is used directly, without instantiation.)
+ """
GX_NSPACE = "trusted.glusterfs"
NTV_FMTSTR = "!" + "B"*19 + "II"
@@ -175,6 +213,7 @@ class Server(object):
@staticmethod
def entries(path):
+ """directory entries in an array"""
# prevent symlinks being followed
if not stat.S_ISDIR(os.lstat(path).st_mode):
raise OSError(ENOTDIR, os.strerror(ENOTDIR))
@@ -182,6 +221,20 @@ class Server(object):
@classmethod
def purge(cls, path, entries=None):
+ """force-delete subtrees
+
+ If @entries is not specified, delete
+ the whole subtree under @path (including
+ @path).
+
+ Otherwise, @entries should be a
+ a sequence of children of @path, and
+ the effect is identical with a joint
+ @entries-less purge on them, ie.
+
+ for e in entries:
+ cls.purge(os.path.join(path, e))
+ """
me_also = entries == None
if not entries:
try:
@@ -216,6 +269,7 @@ class Server(object):
@classmethod
def _create(cls, path, ctor):
+ """path creation backend routine"""
try:
ctor(path)
except OSError:
@@ -235,6 +289,13 @@ class Server(object):
@classmethod
def xtime(cls, path, uuid):
+ """query xtime extended attribute
+
+ Return xtime of @path for @uuid as a pair of integers.
+ "Normal" errors due to non-existent @path or extended attribute
+ are tolerated and errno is returned in such a case.
+ """
+
try:
return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8))
except OSError:
@@ -246,10 +307,17 @@ class Server(object):
@classmethod
def set_xtime(cls, path, uuid, mark):
+ """set @mark as xtime for @uuid on @path"""
Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))
@staticmethod
def setattr(path, adct):
+ """set file attributes
+
+ @adct is a dict, where 'own', 'mode' and 'times'
+ keys are looked for and values used to perform
+ chown, chmod or utimes on @path.
+ """
own = adct.get('own')
if own:
os.lchown(path, *own)
@@ -267,6 +335,14 @@ class Server(object):
last_keep_alive = 0
@classmethod
def keep_alive(cls, dct):
+ """process keepalive messages.
+
+ Return keep-alive counter (number of received keep-alive
+ messages).
+
+ Now the "keep-alive" message can also have a payload which is
+ used to set a foreign volume-mark on the underlying file system.
+ """
if dct:
key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']])
val = struct.pack(cls.FRGN_FMTSTR,
@@ -279,15 +355,30 @@ class Server(object):
@staticmethod
def version():
+ """version used in handshake"""
return 1.0
class SlaveLocal(object):
+ """mix-in class to implement some factes of a slave server
+
+ ("mix-in" is sort of like "abstract class", ie. it's not
+ instantiated just included in the ancesty DAG. I use "mix-in"
+ to indicate that it's not used as an abstract base class,
+ rather just taken in to implement additional functionality
+ on the basis of the assumed availability of certain interfaces.)
+ """
def can_connect_to(self, remote):
+ """determine our position in the connectibility matrix"""
return not remote
def service_loop(self):
+ """start a RePCe server serving self's server
+
+ stop servicing if a timeout is configured and got no
+ keep-alime in that inteval
+ """
repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))
t = syncdutils.Thread(target=lambda: (repce.service_loop(),
syncdutils.finalize()))
@@ -304,8 +395,15 @@ class SlaveLocal(object):
select.select((), (), ())
class SlaveRemote(object):
+ """mix-in class to implement an interface to a remote slave"""
def connect_remote(self, rargs=[], **opts):
+ """connects to a remote slave
+
+ Invoke an auxiliary utility (slave gsyncd, possibly wrapped)
+ which sets up the connection and set up a RePCe client to
+ communicate throuh its stdio.
+ """
slave = opts.get('slave', self.url)
so = getattr(gconf, 'session_owner', None)
if so:
@@ -319,6 +417,11 @@ class SlaveRemote(object):
return self.start_fd_client(po.stdout, po.stdin, **opts)
def start_fd_client(self, i, o, **opts):
+ """set up RePCe client, handshake with server
+
+ It's cut out as a separate method to let
+ subclasses hook into client startup
+ """
self.server = RepceClient(i, o)
rv = self.server.__version__()
exrv = {'proto': repce.repce_version, 'object': Server.version()}
@@ -331,6 +434,7 @@ class SlaveRemote(object):
raise GsyncdError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv))
def rsync(self, files, *args):
+ """invoke rsync"""
if not files:
raise GsyncdError("no files to sync")
logging.debug("files: " + ", ".join(files))
@@ -342,6 +446,7 @@ class SlaveRemote(object):
class AbstractUrl(object):
+ """abstract base class for url scheme classes"""
def __init__(self, path, pattern):
m = re.search(pattern, path)
@@ -358,6 +463,7 @@ class AbstractUrl(object):
return self.path
def get_url(self, canonical=False, escaped=False):
+ """format self's url in various styles"""
if canonical:
pa = self.canonical_path()
else:
@@ -376,8 +482,15 @@ class AbstractUrl(object):
class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
+ """scheme class for file:// urls
+
+ can be used to represent a file slave server
+ on slave side, or interface to a remote file
+ file server on master side
+ """
class FILEServer(Server):
+ """included server flavor"""
pass
server = FILEServer
@@ -386,6 +499,7 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
sup(self, path, '^/')
def connect(self):
+ """inhibit the resource beyond"""
os.chdir(self.path)
def rsync(self, files):
@@ -393,11 +507,21 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
+ """scheme class for gluster:// urls
+
+ can be used to represent a gluster slave server
+ on slave side, or interface to a remote gluster
+ slave on master side, or to represent master
+ (slave-ish features come from the mixins, master
+ functionality is outsourced to GMaster from master)
+ """
class GLUSTERServer(Server):
+ "server enhancements for a glusterfs backend"""
@classmethod
def _attr_unpack_dict(cls, xattr, extra_fields = ''):
+ """generic volume mark fetching/parsing backed"""
fmt_string = cls.NTV_FMTSTR + extra_fields
buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))
vm = struct.unpack(fmt_string, buf)
@@ -415,6 +539,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
@classmethod
def foreign_volume_infos(cls):
+ """return list of valid (not expired) foreign volume marks"""
dict_list = []
xattr_list = Xattr.llistxattr_buf('.')
for ele in xattr_list:
@@ -434,6 +559,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
@classmethod
def native_volume_info(cls):
+ """get the native volume mark of the underlying gluster volume"""
try:
return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark']))
except OSError:
@@ -450,9 +576,17 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
return ':'.join([gethostbyname(self.host), self.volume])
def can_connect_to(self, remote):
+ """determine our position in the connectibility matrix"""
return True
def connect(self):
+ """inhibit the resource beyond
+
+ - create temprorary mount point
+ - call glusterfs to mount the volume over there
+ - change to mounted fs root
+ - lazy umount + delete temp. mount point
+ """
def umount_l(d):
po = Popen(['umount', '-l', d], stderr=subprocess.PIPE)
po.wait()
@@ -486,6 +620,13 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
self.slavedir = "/proc/%d/cwd" % self.server.pid()
def service_loop(self, *args):
+ """enter service loop
+
+ - if slave given, instantiate GMaster and
+ pass control to that instance, which implements
+ master behavior
+ - else do that's what's inherited
+ """
if args:
GMaster(self, args[0]).crawl_loop()
else:
@@ -496,6 +637,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
class SSH(AbstractUrl, SlaveRemote):
+ """scheme class for ssh:// urls
+
+ interface to remote slave on master side
+ implementing an ssh based proxy
+ """
def __init__(self, path):
self.remote_addr, inner_url = sup(self, path,
@@ -512,9 +658,16 @@ class SSH(AbstractUrl, SlaveRemote):
return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)])
def can_connect_to(self, remote):
+ """determine our position in the connectibility matrix"""
return False
def start_fd_client(self, *a, **opts):
+ """customizations for client startup
+
+ - be a no-op if we are to daemonize (client startup is deferred
+ to post-daemon stage)
+ - determine target url for rsync after consulting server
+ """
if opts.get('deferred'):
return a
sup(self, *a)
@@ -528,6 +681,23 @@ class SSH(AbstractUrl, SlaveRemote):
self.slaveurl = ':'.join([self.remote_addr, slavepath])
def connect_remote(self, go_daemon=None):
+ """connect to inner slave url through outer ssh url
+
+ Wrap the connecting utility in ssh.
+
+ Much care is put into daemonizing: in that case
+ ssh is started before daemonization, but
+ RePCe client is to be created after that (as ssh
+ interactive password auth would be defeated by
+ a daemonized ssh, while client should be present
+ only in the final process). In that case the action
+ is taken apart to two parts, this method is ivoked
+ once pre-daemon, once post-daemon. Use @go_daemon
+ to deiced what part to perform.
+
+ [NB. ATM gluster product does not makes use of interactive
+ authentication.]
+ """
if go_daemon == 'done':
return self.start_fd_client(*self.fd_pair)
gconf.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-'))
diff --git a/xlators/features/marker/utils/syncdaemon/syncdutils.py b/xlators/features/marker/utils/syncdaemon/syncdutils.py
index 35afe64e9..244e29628 100644
--- a/xlators/features/marker/utils/syncdaemon/syncdutils.py
+++ b/xlators/features/marker/utils/syncdaemon/syncdutils.py
@@ -19,9 +19,12 @@ except ImportError:
import urllib
def escape(s):
+ """the chosen flavor of string escaping, used all over
+ to turn whatever data to creatable representation"""
return urllib.quote_plus(s)
def unescape(s):
+ """inverse of .escape"""
return urllib.unquote_plus(s)
def norm(s):
@@ -59,6 +62,10 @@ def update_file(path, updater, merger = lambda f: True):
fx.close()
def grabfile(fname, content=None):
+ """open @fname + contest for its fcntl lock
+
+ @content: if given, set the file content to it
+ """
# damn those messy open() mode codes
fd = os.open(fname, os.O_CREAT|os.O_RDWR)
f = os.fdopen(fd, 'r+b', 0)
@@ -82,6 +89,7 @@ def grabfile(fname, content=None):
return f
def grabpidfile(fname=None, setpid=True):
+ """.grabfile customization for pid files"""
if not fname:
fname = gconf.pid_file
content = None
@@ -92,6 +100,10 @@ def grabpidfile(fname=None, setpid=True):
final_lock = Lock()
def finalize(*a, **kw):
+ """all those messy final steps we go trough upon termination
+
+ Do away with pidfile, ssh control dir and logging.
+ """
final_lock.acquire()
if getattr(gconf, 'pid_file', None):
rm_pidf = gconf.pid_file_owned
@@ -126,6 +138,12 @@ def finalize(*a, **kw):
os._exit(kw.get('exval', 0))
def log_raise_exception(excont):
+ """top-level exception handler
+
+ Try to some fancy things to cover up we face with an error.
+ Translate some weird sounding but well understood exceptions
+ into human-friendly lingo
+ """
is_filelog = False
for h in logging.getLogger().handlers:
fno = getattr(getattr(h, 'stream', None), 'fileno', None)
@@ -170,7 +188,12 @@ class FreeObject(object):
setattr(self, k, v)
class Thread(baseThread):
+ """thread class flavor for gsyncd
+ - always a daemon thread
+ - force exit for whole program if thread
+ function coughs up an exception
+ """
def __init__(self, *a, **kw):
tf = kw.get('target')
if tf: