diff options
Diffstat (limited to 'geo-replication/syncdaemon/syncdutils.py')
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 114 | 
1 files changed, 83 insertions, 31 deletions
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 1b5684c6d0c..822d919ecb1 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -1,3 +1,13 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# +  import os  import sys  import pwd @@ -7,9 +17,9 @@ import shutil  import logging  import socket  from threading import Lock, Thread as baseThread -from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, ENOENT, EPERM, ESTALE, errorcode -from signal import signal, SIGTERM, SIGKILL -from time import sleep +from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED +from errno import EINTR, ENOENT, EPERM, ESTALE, errorcode +from signal import signal, SIGTERM  import select as oselect  from os import waitpid as owaitpid @@ -37,25 +47,29 @@ except ImportError:  _CL_AUX_GFID_PFX = ".gfid/"  GF_OP_RETRIES = 20 +  def escape(s):      """the chosen flavor of string escaping, used all over         to turn whatever data to creatable representation"""      return urllib.quote_plus(s) +  def unescape(s):      """inverse of .escape"""      return urllib.unquote_plus(s) +  def norm(s):      if s:          return s.replace('-', '_') -def update_file(path, updater, merger = lambda f: True): + +def update_file(path, updater, merger=lambda f: True):      """update a file in a transaction-like manner"""      fr = fw = None      try: -        fd = os.open(path, os.O_CREAT|os.O_RDWR) +        fd = os.open(path, os.O_CREAT | os.O_RDWR)          try:              fr = os.fdopen(fd, 'r+b')          except: @@ -66,7 +80,7 @@ def update_file(path, updater, merger = lambda f: True):              return          tmpp = path + '.tmp.' + str(os.getpid()) -        fd = os.open(tmpp, os.O_CREAT|os.O_EXCL|os.O_WRONLY) +        fd = os.open(tmpp, os.O_CREAT | os.O_EXCL | os.O_WRONLY)          try:              fw = os.fdopen(fd, 'wb', 0)          except: @@ -80,29 +94,31 @@ def update_file(path, updater, merger = lambda f: True):              if fx:                  fx.close() +  def create_manifest(fname, content):      """      Create manifest file for SSH Control Path      """      fd = None      try: -        fd = os.open(fname, os.O_CREAT|os.O_RDWR) +        fd = os.open(fname, os.O_CREAT | os.O_RDWR)          try:              os.write(fd, content)          except:              os.close(fd)              raise      finally: -        if fd != None: +        if fd is not None:              os.close(fd) +  def setup_ssh_ctl(ctld, remote_addr, resource_url):      """      Setup GConf ssh control path parameters      """      gconf.ssh_ctl_dir = ctld      content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr, -                                                            resource_url) +                                                        resource_url)      content_md5 = md5hex(content)      fname = os.path.join(gconf.ssh_ctl_dir,                           "%s.mft" % content_md5) @@ -112,16 +128,17 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url):                                  "%s.sock" % content_md5)      gconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path] +  def grabfile(fname, content=None):      """open @fname + contest for its fcntl lock      @content: if given, set the file content to it      """      # damn those messy open() mode codes -    fd = os.open(fname, os.O_CREAT|os.O_RDWR) +    fd = os.open(fname, os.O_CREAT | os.O_RDWR)      f = os.fdopen(fd, 'r+b', 0)      try: -        fcntl.lockf(f, fcntl.LOCK_EX|fcntl.LOCK_NB) +        fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)      except:          ex = sys.exc_info()[1]          f.close() @@ -139,6 +156,7 @@ def grabfile(fname, content=None):      gconf.permanent_handles.append(f)      return f +  def grabpidfile(fname=None, setpid=True):      """.grabfile customization for pid files"""      if not fname: @@ -150,6 +168,7 @@ def grabpidfile(fname=None, setpid=True):  final_lock = Lock() +  def finalize(*a, **kw):      """all those messy final steps we go trough upon termination @@ -169,7 +188,7 @@ def finalize(*a, **kw):                  if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid:                      # child has terminated                      rm_pidf = True -                    break; +                    break                  time.sleep(0.1)          if rm_pidf:              try: @@ -194,6 +213,7 @@ def finalize(*a, **kw):      sys.stderr.flush()      os._exit(kw.get('exval', 0)) +  def log_raise_exception(excont):      """top-level exception handler @@ -218,20 +238,27 @@ def log_raise_exception(excont):                  logging.error(exc.args[0])              sys.stderr.write('failure: ' + exc.args[0] + '\n')          elif isinstance(exc, PickleError) or isinstance(exc, EOFError) or \ -             ((isinstance(exc, OSError) or isinstance(exc, IOError)) and \ -              exc.errno == EPIPE): +            ((isinstance(exc, OSError) or isinstance(exc, IOError)) and +             exc.errno == EPIPE):              logging.error('connection to peer is broken')              if hasattr(gconf, 'transport'):                  gconf.transport.wait()                  if gconf.transport.returncode == 127:                      logging.warn("!!!!!!!!!!!!!") -                    logging.warn('!!! getting "No such file or directory" errors ' -                                 "is most likely due to MISCONFIGURATION, please consult " -                                 "https://access.redhat.com/site/documentation/en-US/Red_Hat_Storage/2.1/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html") +                    logging.warn('!!! getting "No such file or directory" ' +                                 "errors is most likely due to " +                                 "MISCONFIGURATION" +                                 ", please consult https://access.redhat.com" +                                 "/site/documentation/en-US/Red_Hat_Storage" +                                 "/2.1/html/Administration_Guide" +                                 "/chap-User_Guide-Geo_Rep-Preparation-" +                                 "Settingup_Environment.html")                      logging.warn("!!!!!!!!!!!!!")                  gconf.transport.terminate_geterr() -        elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED): -            logging.error('glusterfs session went down [%s]', errorcode[exc.errno]) +        elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, +                                                        ECONNABORTED): +            logging.error('glusterfs session went down [%s]', +                          errorcode[exc.errno])          else:              logtag = "FAIL"          if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG): @@ -244,46 +271,54 @@ def log_raise_exception(excont):  class FreeObject(object): +      """wildcard class for which any attribute can be set"""      def __init__(self, **kw): -        for k,v in kw.items(): +        for k, v in kw.items():              setattr(self, k, v) +  class Thread(baseThread): +      """thread class flavor for gsyncd      - always a daemon thread      - force exit for whole program if thread        function coughs up an exception      """ +      def __init__(self, *a, **kw):          tf = kw.get('target')          if tf:              def twrap(*aa): -                excont = FreeObject(exval = 0) +                excont = FreeObject(exval=0)                  try:                      tf(*aa)                  except:                      try:                          log_raise_exception(excont)                      finally: -                        finalize(exval = excont.exval) +                        finalize(exval=excont.exval)              kw['target'] = twrap          baseThread.__init__(self, *a, **kw)          self.setDaemon(True) +  class GsyncdError(Exception):      pass -def getusername(uid = None): -    if uid == None: + +def getusername(uid=None): +    if uid is None:          uid = os.geteuid()      return pwd.getpwuid(uid).pw_name +  def privileged():      return os.geteuid() == 0 +  def boolify(s):      """      Generic string to boolean converter @@ -294,7 +329,7 @@ def boolify(s):      - False if it's in false_list      - Warn if it's not present in either and return False      """ -    true_list  = ['true', 'yes', '1', 'on'] +    true_list = ['true', 'yes', '1', 'on']      false_list = ['false', 'no', '0', 'off']      if isinstance(s, bool): @@ -305,10 +340,12 @@ def boolify(s):      if lstr in true_list:          rv = True      elif not lstr in false_list: -        logging.warn("Unknown string (%s) in string to boolean conversion defaulting to False\n" % (s)) +        logging.warn("Unknown string (%s) in string to boolean conversion " +                     "defaulting to False\n" % (s))      return rv +  def eintr_wrap(func, exc, *a):      """      wrapper around syscalls resilient to interrupt caused @@ -322,19 +359,24 @@ def eintr_wrap(func, exc, *a):              if not ex.args[0] == EINTR:                  raise +  def select(*a):      return eintr_wrap(oselect.select, oselect.error, *a) -def waitpid (*a): + +def waitpid(*a):      return eintr_wrap(owaitpid, OSError, *a) +  def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})):      signal(SIGTERM, hook) +  def is_host_local(host):      locaddr = False      for ai in socket.getaddrinfo(host, None): -        # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators/mgmt/glusterd/src/glusterd-utils.c#L125 +        # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators +        # /mgmt/glusterd/src/glusterd-utils.c#L125          if ai[0] == socket.AF_INET:              if ai[-1][0].split(".")[0] == "127":                  locaddr = True @@ -358,8 +400,8 @@ def is_host_local(host):                  f = open("/proc/sys/net/ipv4/ip_nonlocal_bind")                  if int(f.read()) != 0:                      raise GsyncdError( -                            "non-local bind is set and not allowed to create raw sockets, " -                            "cannot determine if %s is local" % host) +                        "non-local bind is set and not allowed to create " +                        "raw sockets, cannot determine if %s is local" % host)                  s = socket.socket(ai[0], socket.SOCK_DGRAM)              finally:                  if f: @@ -373,6 +415,7 @@ def is_host_local(host):          s.close()      return locaddr +  def funcode(f):      fc = getattr(f, 'func_code', None)      if not fc: @@ -380,32 +423,40 @@ def funcode(f):          fc = f.__code__      return fc +  def memoize(f):      fc = funcode(f)      fn = fc.co_name +      def ff(self, *a, **kw):          rv = getattr(self, '_' + fn, None) -        if rv == None: +        if rv is None:              rv = f(self, *a, **kw)              setattr(self, '_' + fn, rv)          return rv      return ff +  def umask():      return os.umask(0) +  def entry2pb(e):      return e.rsplit('/', 1) +  def gauxpfx():      return _CL_AUX_GFID_PFX +  def md5hex(s):      return md5(s).hexdigest() +  def selfkill(sig=SIGTERM):      os.kill(os.getpid(), sig) +  def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):      """ wrapper around calls resilient to errnos.      retry in case of ESTALE by default. @@ -427,6 +478,7 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):                  return              time.sleep(0.250)  # retry the call +  def lstat(e):      try:          return os.lstat(e)  | 
