diff options
Diffstat (limited to 'geo-replication/syncdaemon/repce.py')
-rw-r--r-- | geo-replication/syncdaemon/repce.py | 225 |
1 files changed, 225 insertions, 0 deletions
diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py new file mode 100644 index 00000000000..755fb61df48 --- /dev/null +++ b/geo-replication/syncdaemon/repce.py @@ -0,0 +1,225 @@ +import os +import sys +import time +import logging +from threading import Condition +try: + import thread +except ImportError: + # py 3 + import _thread as thread +try: + from Queue import Queue +except ImportError: + # py 3 + from queue import Queue +try: + import cPickle as pickle +except ImportError: + # py 3 + import pickle + +from syncdutils import Thread, select + +pickle_proto = -1 +repce_version = 1.0 + +def ioparse(i, o): + if isinstance(i, int): + i = os.fdopen(i) + # rely on duck typing for recognizing + # streams as that works uniformly + # in py2 and py3 + if hasattr(o, 'fileno'): + o = o.fileno() + return (i, o) + +def send(out, *args): + """pickle args and write out wholly in one syscall + + ie. not use the ability of pickle to dump directly to + a stream, as that would potentially mess up messages + by interleaving them + """ + os.write(out, pickle.dumps(args, pickle_proto)) + +def recv(inf): + """load an object from input stream""" + return pickle.load(inf) + + +class RepceServer(object): + """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce + + ... also our homebrewed RPC backend where the transport layer is + reduced to a pair of filehandles. + + This is the server component. + """ + + def __init__(self, obj, i, o, wnum=6): + """register a backend object .obj to which incoming messages + are dispatched, also incoming/outcoming streams + """ + self.obj = obj + self.inf, self.out = ioparse(i, o) + self.wnum = wnum + self.q = Queue() + + def service_loop(self): + """fire up worker threads, get messages and dispatch among them""" + for i in range(self.wnum): + t = Thread(target=self.worker) + t.start() + try: + while True: + self.q.put(recv(self.inf)) + except EOFError: + logging.info("terminating on reaching EOF.") + + def worker(self): + """life of a worker + + Get message, extract its id, method name and arguments + (kwargs not supported), call method on .obj. + Send back message id + return value. + If method call throws an exception, rescue it, and send + back the exception as result (with flag marking it as + exception). + """ + while True: + in_data = self.q.get(True) + rid = in_data[0] + rmeth = in_data[1] + exc = False + if rmeth == '__repce_version__': + res = repce_version + else: + try: + res = getattr(self.obj, rmeth)(*in_data[2:]) + except: + res = sys.exc_info()[1] + exc = True + logging.exception("call failed: ") + send(self.out, rid, exc, res) + + +class RepceJob(object): + """class representing message status we can use + for waiting on reply""" + + def __init__(self, cbk): + """ + - .rid: (process-wise) unique id + - .cbk: what we do upon receiving reply + """ + self.rid = (os.getpid(), thread.get_ident(), time.time()) + self.cbk = cbk + self.lever = Condition() + self.done = False + + def __repr__(self): + return ':'.join([str(x) for x in self.rid]) + + def wait(self): + self.lever.acquire() + if not self.done: + self.lever.wait() + self.lever.release() + return self.result + + def wakeup(self, data): + self.result = data + self.lever.acquire() + self.done = True + self.lever.notify() + self.lever.release() + + +class RepceClient(object): + """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce + + ... also our homebrewed RPC backend where the transport layer is + reduced to a pair of filehandles. + + This is the client component. + """ + + def __init__(self, i, o): + self.inf, self.out = ioparse(i, o) + self.jtab = {} + t = Thread(target = self.listen) + t.start() + + def listen(self): + while True: + select((self.inf,), (), ()) + rid, exc, res = recv(self.inf) + rjob = self.jtab.pop(rid) + if rjob.cbk: + rjob.cbk(rjob, [exc, res]) + + def push(self, meth, *args, **kw): + """wrap arguments in a RepceJob, send them to server + and return the RepceJob + + @cbk to pass on RepceJob can be given as kwarg. + """ + cbk = kw.get('cbk') + if not cbk: + def cbk(rj, res): + if res[0]: + raise res[1] + rjob = RepceJob(cbk) + self.jtab[rjob.rid] = rjob + logging.debug("call %s %s%s ..." % (repr(rjob), meth, repr(args))) + send(self.out, rjob.rid, meth, *args) + return rjob + + def __call__(self, meth, *args): + """RePCe client is callabe, calling it implements a synchronous remote call + + We do a .push with a cbk which does a wakeup upon receiving anwser, then wait + on the RepceJob. + """ + rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)}) + exc, res = rjob.wait() + if exc: + logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__))) + raise res + logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res))) + return res + + class mprx(object): + """method proxy, standard trick to implement rubyesque method_missing + in Python + + A class is a closure factory, you know what I mean, or go read some SICP. + """ + def __init__(self, ins, meth): + self.ins = ins + self.meth = meth + + def __call__(self, *a): + return self.ins(self.meth, *a) + + def __getattr__(self, meth): + """this implements transparent method dispatch to remote object, + so that you don't need to call the RepceClient instance like + + rclient('how_old_are_you_if_born_in', 1979) + + but you can make it into an ordinary method call like + + rclient.how_old_are_you_if_born_in(1979) + """ + return self.mprx(self, meth) + + def __version__(self): + """used in handshake to verify compatibility""" + d = {'proto': self('__repce_version__')} + try: + d['object'] = self('version') + except AttributeError: + pass + return d |