diff options
Diffstat (limited to 'geo-replication/syncdaemon/syncdutils.py')
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 438 |
1 files changed, 438 insertions, 0 deletions
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py new file mode 100644 index 000000000..348eb38c1 --- /dev/null +++ b/geo-replication/syncdaemon/syncdutils.py @@ -0,0 +1,438 @@ +import os +import sys +import pwd +import time +import fcntl +import shutil +import logging +import socket +from threading import Lock, Thread as baseThread +from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, ENOENT, EPERM, ESTALE, errorcode +from signal import signal, SIGTERM, SIGKILL +from time import sleep +import select as oselect +from os import waitpid as owaitpid + +try: + from cPickle import PickleError +except ImportError: + # py 3 + from pickle import PickleError + +from gconf import gconf + +try: + # py 3 + from urllib import parse as urllib +except ImportError: + import urllib + +try: + from hashlib import md5 as md5 +except ImportError: + # py 2.4 + from md5 import new as md5 + +# auxillary gfid based access prefix +_CL_AUX_GFID_PFX = ".gfid/" +GF_OP_RETRIES = 20 + +def escape(s): + """the chosen flavor of string escaping, used all over + to turn whatever data to creatable representation""" + return urllib.quote_plus(s) + +def unescape(s): + """inverse of .escape""" + return urllib.unquote_plus(s) + +def norm(s): + if s: + return s.replace('-', '_') + +def update_file(path, updater, merger = lambda f: True): + """update a file in a transaction-like manner""" + + fr = fw = None + try: + fd = os.open(path, os.O_CREAT|os.O_RDWR) + try: + fr = os.fdopen(fd, 'r+b') + except: + os.close(fd) + raise + fcntl.lockf(fr, fcntl.LOCK_EX) + if not merger(fr): + return + + tmpp = path + '.tmp.' + str(os.getpid()) + fd = os.open(tmpp, os.O_CREAT|os.O_EXCL|os.O_WRONLY) + try: + fw = os.fdopen(fd, 'wb', 0) + except: + os.close(fd) + raise + updater(fw) + os.fsync(fd) + os.rename(tmpp, path) + finally: + for fx in (fr, fw): + if fx: + fx.close() + +def create_manifest(fname, content): + """ + Create manifest file for SSH Control Path + """ + fd = None + try: + fd = os.open(fname, os.O_CREAT|os.O_RDWR) + try: + os.write(fd, content) + except: + os.close(fd) + raise + finally: + if fd != None: + os.close(fd) + +def setup_ssh_ctl(ctld, remote_addr, resource_url): + """ + Setup GConf ssh control path parameters + """ + gconf.ssh_ctl_dir = ctld + content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr, + resource_url) + content_md5 = md5hex(content) + fname = os.path.join(gconf.ssh_ctl_dir, + "%s.mft" % content_md5) + + create_manifest(fname, content) + ssh_ctl_path = os.path.join(gconf.ssh_ctl_dir, + "%s.sock" % content_md5) + gconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path] + +def grabfile(fname, content=None): + """open @fname + contest for its fcntl lock + + @content: if given, set the file content to it + """ + # damn those messy open() mode codes + fd = os.open(fname, os.O_CREAT|os.O_RDWR) + f = os.fdopen(fd, 'r+b', 0) + try: + fcntl.lockf(f, fcntl.LOCK_EX|fcntl.LOCK_NB) + except: + ex = sys.exc_info()[1] + f.close() + if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN): + # cannot grab, it's taken + return + raise + if content: + try: + f.truncate() + f.write(content) + except: + f.close() + raise + gconf.permanent_handles.append(f) + return f + +def grabpidfile(fname=None, setpid=True): + """.grabfile customization for pid files""" + if not fname: + fname = gconf.pid_file + content = None + if setpid: + content = str(os.getpid()) + '\n' + return grabfile(fname, content=content) + +final_lock = Lock() + +def finalize(*a, **kw): + """all those messy final steps we go trough upon termination + + Do away with pidfile, ssh control dir and logging. + """ + final_lock.acquire() + if getattr(gconf, 'pid_file', None): + rm_pidf = gconf.pid_file_owned + if gconf.cpid: + # exit path from parent branch of daemonization + rm_pidf = False + while True: + f = grabpidfile(setpid=False) + if not f: + # child has already taken over pidfile + break + if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid: + # child has terminated + rm_pidf = True + break; + time.sleep(0.1) + if rm_pidf: + try: + os.unlink(gconf.pid_file) + except: + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + pass + else: + raise + if gconf.ssh_ctl_dir and not gconf.cpid: + shutil.rmtree(gconf.ssh_ctl_dir) + if getattr(gconf, 'state_socket', None): + try: + os.unlink(gconf.state_socket) + except: + if sys.exc_info()[0] == OSError: + pass + if gconf.log_exit: + logging.info("exiting.") + sys.stdout.flush() + sys.stderr.flush() + os._exit(kw.get('exval', 0)) + +def log_raise_exception(excont): + """top-level exception handler + + Try to some fancy things to cover up we face with an error. + Translate some weird sounding but well understood exceptions + into human-friendly lingo + """ + is_filelog = False + for h in logging.getLogger().handlers: + fno = getattr(getattr(h, 'stream', None), 'fileno', None) + if fno and not os.isatty(fno()): + is_filelog = True + + exc = sys.exc_info()[1] + if isinstance(exc, SystemExit): + excont.exval = exc.code or 0 + raise + else: + logtag = None + if isinstance(exc, GsyncdError): + if is_filelog: + logging.error(exc.args[0]) + sys.stderr.write('failure: ' + exc.args[0] + '\n') + elif isinstance(exc, PickleError) or isinstance(exc, EOFError) or \ + ((isinstance(exc, OSError) or isinstance(exc, IOError)) and \ + exc.errno == EPIPE): + logging.error('connection to peer is broken') + if hasattr(gconf, 'transport'): + gconf.transport.wait() + if gconf.transport.returncode == 127: + logging.warn("!!!!!!!!!!!!!") + logging.warn('!!! getting "No such file or directory" errors ' + "is most likely due to MISCONFIGURATION, please consult " + "http://access.redhat.com/knowledge/docs/en-US/Red_Hat_Storage/2.0/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html") + logging.warn("!!!!!!!!!!!!!") + gconf.transport.terminate_geterr() + elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED): + logging.error('glusterfs session went down [%s]', errorcode[exc.errno]) + else: + logtag = "FAIL" + if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG): + logtag = "FULL EXCEPTION TRACE" + if logtag: + logging.exception(logtag + ": ") + sys.stderr.write("failed with %s.\n" % type(exc).__name__) + excont.exval = 1 + sys.exit(excont.exval) + + +class FreeObject(object): + """wildcard class for which any attribute can be set""" + + def __init__(self, **kw): + for k,v in kw.items(): + setattr(self, k, v) + +class Thread(baseThread): + """thread class flavor for gsyncd + + - always a daemon thread + - force exit for whole program if thread + function coughs up an exception + """ + def __init__(self, *a, **kw): + tf = kw.get('target') + if tf: + def twrap(*aa): + excont = FreeObject(exval = 0) + try: + tf(*aa) + except: + try: + log_raise_exception(excont) + finally: + finalize(exval = excont.exval) + kw['target'] = twrap + baseThread.__init__(self, *a, **kw) + self.setDaemon(True) + +class GsyncdError(Exception): + pass + +def getusername(uid = None): + if uid == None: + uid = os.geteuid() + return pwd.getpwuid(uid).pw_name + +def privileged(): + return os.geteuid() == 0 + +def boolify(s): + """ + Generic string to boolean converter + + return + - Quick return if string 's' is of type bool + - True if it's in true_list + - False if it's in false_list + - Warn if it's not present in either and return False + """ + true_list = ['true', 'yes', '1', 'on'] + false_list = ['false', 'no', '0', 'off'] + + if isinstance(s, bool): + return s + + rv = False + lstr = s.lower() + if lstr in true_list: + rv = True + elif not lstr in false_list: + logging.warn("Unknown string (%s) in string to boolean conversion defaulting to False\n" % (s)) + + return rv + +def eintr_wrap(func, exc, *a): + """ + wrapper around syscalls resilient to interrupt caused + by signals + """ + while True: + try: + return func(*a) + except exc: + ex = sys.exc_info()[1] + if not ex.args[0] == EINTR: + raise + +def select(*a): + return eintr_wrap(oselect.select, oselect.error, *a) + +def waitpid (*a): + return eintr_wrap(owaitpid, OSError, *a) + +def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})): + signal(SIGTERM, hook) + +def is_host_local(host): + locaddr = False + for ai in socket.getaddrinfo(host, None): + # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators/mgmt/glusterd/src/glusterd-utils.c#L125 + if ai[0] == socket.AF_INET: + if ai[-1][0].split(".")[0] == "127": + locaddr = True + break + elif ai[0] == socket.AF_INET6: + if ai[-1][0] == "::1": + locaddr = True + break + else: + continue + try: + # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue, + # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587 + s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP) + except socket.error: + ex = sys.exc_info()[1] + if ex.errno != EPERM: + raise + f = None + try: + f = open("/proc/sys/net/ipv4/ip_nonlocal_bind") + if int(f.read()) != 0: + raise GsyncdError( + "non-local bind is set and not allowed to create raw sockets, " + "cannot determine if %s is local" % host) + s = socket.socket(ai[0], socket.SOCK_DGRAM) + finally: + if f: + f.close() + try: + s.bind(ai[-1]) + locaddr = True + break + except: + pass + s.close() + return locaddr + +def funcode(f): + fc = getattr(f, 'func_code', None) + if not fc: + # python 3 + fc = f.__code__ + return fc + +def memoize(f): + fc = funcode(f) + fn = fc.co_name + def ff(self, *a, **kw): + rv = getattr(self, '_' + fn, None) + if rv == None: + rv = f(self, *a, **kw) + setattr(self, '_' + fn, rv) + return rv + return ff + +def umask(): + return os.umask(0) + +def entry2pb(e): + return e.rsplit('/', 1) + +def gauxpfx(): + return _CL_AUX_GFID_PFX + +def md5hex(s): + return md5(s).hexdigest() + +def selfkill(sig=SIGTERM): + os.kill(os.getpid(), sig) + +def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]): + """ wrapper around calls resilient to errnos. + retry in case of ESTALE by default. + """ + nr_tries = 0 + while True: + try: + return call(*arg) + except OSError: + ex = sys.exc_info()[1] + if ex.errno in errnos: + return ex.errno + if not ex.errno in retry_errnos: + raise + nr_tries += 1 + if nr_tries == GF_OP_RETRIES: + # probably a screwed state, cannot do much... + logging.warn('reached maximum retries (%s)...' % repr(arg)) + return + time.sleep(0.250) # retry the call + +def lstat(e): + try: + return os.lstat(e) + except (IOError, OSError): + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + return ex.errno + else: + raise |
