diff options
Diffstat (limited to 'geo-replication/syncdaemon/syncdutils.py')
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 152 |
1 files changed, 151 insertions, 1 deletions
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 0764c0790..348eb38c1 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -5,12 +5,14 @@ import time import fcntl import shutil import logging +import socket from threading import Lock, Thread as baseThread -from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, errorcode +from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, ENOENT, EPERM, ESTALE, errorcode from signal import signal, SIGTERM, SIGKILL from time import sleep import select as oselect from os import waitpid as owaitpid + try: from cPickle import PickleError except ImportError: @@ -25,6 +27,16 @@ try: except ImportError: import urllib +try: + from hashlib import md5 as md5 +except ImportError: + # py 2.4 + from md5 import new as md5 + +# auxillary gfid based access prefix +_CL_AUX_GFID_PFX = ".gfid/" +GF_OP_RETRIES = 20 + def escape(s): """the chosen flavor of string escaping, used all over to turn whatever data to creatable representation""" @@ -68,6 +80,38 @@ def update_file(path, updater, merger = lambda f: True): if fx: fx.close() +def create_manifest(fname, content): + """ + Create manifest file for SSH Control Path + """ + fd = None + try: + fd = os.open(fname, os.O_CREAT|os.O_RDWR) + try: + os.write(fd, content) + except: + os.close(fd) + raise + finally: + if fd != None: + os.close(fd) + +def setup_ssh_ctl(ctld, remote_addr, resource_url): + """ + Setup GConf ssh control path parameters + """ + gconf.ssh_ctl_dir = ctld + content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr, + resource_url) + content_md5 = md5hex(content) + fname = os.path.join(gconf.ssh_ctl_dir, + "%s.mft" % content_md5) + + create_manifest(fname, content) + ssh_ctl_path = os.path.join(gconf.ssh_ctl_dir, + "%s.sock" % content_md5) + gconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path] + def grabfile(fname, content=None): """open @fname + contest for its fcntl lock @@ -286,3 +330,109 @@ def waitpid (*a): def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})): signal(SIGTERM, hook) + +def is_host_local(host): + locaddr = False + for ai in socket.getaddrinfo(host, None): + # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators/mgmt/glusterd/src/glusterd-utils.c#L125 + if ai[0] == socket.AF_INET: + if ai[-1][0].split(".")[0] == "127": + locaddr = True + break + elif ai[0] == socket.AF_INET6: + if ai[-1][0] == "::1": + locaddr = True + break + else: + continue + try: + # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue, + # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587 + s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP) + except socket.error: + ex = sys.exc_info()[1] + if ex.errno != EPERM: + raise + f = None + try: + f = open("/proc/sys/net/ipv4/ip_nonlocal_bind") + if int(f.read()) != 0: + raise GsyncdError( + "non-local bind is set and not allowed to create raw sockets, " + "cannot determine if %s is local" % host) + s = socket.socket(ai[0], socket.SOCK_DGRAM) + finally: + if f: + f.close() + try: + s.bind(ai[-1]) + locaddr = True + break + except: + pass + s.close() + return locaddr + +def funcode(f): + fc = getattr(f, 'func_code', None) + if not fc: + # python 3 + fc = f.__code__ + return fc + +def memoize(f): + fc = funcode(f) + fn = fc.co_name + def ff(self, *a, **kw): + rv = getattr(self, '_' + fn, None) + if rv == None: + rv = f(self, *a, **kw) + setattr(self, '_' + fn, rv) + return rv + return ff + +def umask(): + return os.umask(0) + +def entry2pb(e): + return e.rsplit('/', 1) + +def gauxpfx(): + return _CL_AUX_GFID_PFX + +def md5hex(s): + return md5(s).hexdigest() + +def selfkill(sig=SIGTERM): + os.kill(os.getpid(), sig) + +def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]): + """ wrapper around calls resilient to errnos. + retry in case of ESTALE by default. + """ + nr_tries = 0 + while True: + try: + return call(*arg) + except OSError: + ex = sys.exc_info()[1] + if ex.errno in errnos: + return ex.errno + if not ex.errno in retry_errnos: + raise + nr_tries += 1 + if nr_tries == GF_OP_RETRIES: + # probably a screwed state, cannot do much... + logging.warn('reached maximum retries (%s)...' % repr(arg)) + return + time.sleep(0.250) # retry the call + +def lstat(e): + try: + return os.lstat(e) + except (IOError, OSError): + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + return ex.errno + else: + raise |
