diff options
Diffstat (limited to 'geo-replication/syncdaemon/syncdutils.py')
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 121 | 
1 files changed, 50 insertions, 71 deletions
| diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index e611b7b6ae5..bc03522fdda 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -15,19 +15,19 @@ import time  import fcntl  import shutil  import logging -import socket  import errno  import threading  import subprocess  from subprocess import PIPE  from threading import Lock, Thread as baseThread  from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED -from errno import EINTR, ENOENT, EPERM, ESTALE, EBUSY, errorcode +from errno import EINTR, ENOENT, ESTALE, EBUSY, errorcode  from signal import signal, SIGTERM  import select as oselect  from os import waitpid as owaitpid  import xml.etree.ElementTree as XET  from select import error as SelectError +from cPickle import PickleError  from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE  sys.path.insert(1, GLUSTERFS_LIBEXECDIR) @@ -46,25 +46,10 @@ except ImportError:      EVENT_GEOREP_PASSIVE = None      EVENT_GEOREP_CHECKPOINT_COMPLETED = None -try: -    from cPickle import PickleError -except ImportError: -    # py 3 -    from pickle import PickleError - -from gconf import gconf +import gsyncdconfig as gconf +from rconf import rconf -try: -    # py 3 -    from urllib import parse as urllib -except ImportError: -    import urllib - -try: -    from hashlib import md5 as md5 -except ImportError: -    # py 2.4 -    from md5 import new as md5 +from hashlib import md5 as md5  # auxiliary gfid based access prefix  _CL_AUX_GFID_PFX = ".gfid/" @@ -80,6 +65,10 @@ SPACE_ESCAPE_CHAR = "%20"  NEWLINE_ESCAPE_CHAR = "%0A"  PERCENTAGE_ESCAPE_CHAR = "%25" +final_lock = Lock() + +mntpt_list = [] +  def sup(x, *a, **kw):      """a rubyesque "super" for python ;) @@ -93,12 +82,7 @@ def sup(x, *a, **kw):  def escape(s):      """the chosen flavor of string escaping, used all over         to turn whatever data to creatable representation""" -    return urllib.quote_plus(s) - - -def unescape(s): -    """inverse of .escape""" -    return urllib.unquote_plus(s) +    return s.replace("/", "-").strip("-")  def escape_space_newline(s): @@ -170,17 +154,17 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url):      """      Setup GConf ssh control path parameters      """ -    gconf.ssh_ctl_dir = ctld +    rconf.ssh_ctl_dir = ctld      content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr,                                                          resource_url)      content_md5 = md5hex(content) -    fname = os.path.join(gconf.ssh_ctl_dir, +    fname = os.path.join(rconf.ssh_ctl_dir,                           "%s.mft" % content_md5)      create_manifest(fname, content) -    ssh_ctl_path = os.path.join(gconf.ssh_ctl_dir, +    ssh_ctl_path = os.path.join(rconf.ssh_ctl_dir,                                  "%s.sock" % content_md5) -    gconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path] +    rconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path]  def grabfile(fname, content=None): @@ -207,32 +191,30 @@ def grabfile(fname, content=None):          except:              f.close()              raise -    gconf.permanent_handles.append(f) +    rconf.permanent_handles.append(f)      return f  def grabpidfile(fname=None, setpid=True):      """.grabfile customization for pid files"""      if not fname: -        fname = gconf.pid_file +        fname = gconf.get("pid-file")      content = None      if setpid:          content = str(os.getpid()) + '\n'      return grabfile(fname, content=content) -final_lock = Lock() -mntpt_list = [] -def finalize(*a, **kw): +def finalize(*args, **kwargs):      """all those messy final steps we go trough upon termination      Do away with pidfile, ssh control dir and logging.      """      final_lock.acquire() -    if getattr(gconf, 'pid_file', None): -        rm_pidf = gconf.pid_file_owned -        if gconf.cpid: +    if gconf.get('pid_file'): +        rm_pidf = rconf.pid_file_owned +        if rconf.cpid:              # exit path from parent branch of daemonization              rm_pidf = False              while True: @@ -240,37 +222,31 @@ def finalize(*a, **kw):                  if not f:                      # child has already taken over pidfile                      break -                if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid: +                if os.waitpid(rconf.cpid, os.WNOHANG)[0] == rconf.cpid:                      # child has terminated                      rm_pidf = True                      break                  time.sleep(0.1)          if rm_pidf:              try: -                os.unlink(gconf.pid_file) +                os.unlink(rconf.pid_file)              except:                  ex = sys.exc_info()[1]                  if ex.errno == ENOENT:                      pass                  else:                      raise -    if gconf.ssh_ctl_dir and not gconf.cpid: +    if rconf.ssh_ctl_dir and not rconf.cpid:          def handle_rm_error(func, path, exc_info):              if exc_info[1].errno == ENOENT:                  return              raise exc_info[1] -        shutil.rmtree(gconf.ssh_ctl_dir, onerror=handle_rm_error) -    if getattr(gconf, 'state_socket', None): -        try: -            os.unlink(gconf.state_socket) -        except: -            if sys.exc_info()[0] == OSError: -                pass +        shutil.rmtree(rconf.ssh_ctl_dir, onerror=handle_rm_error)      """ Unmount if not done """      for mnt in mntpt_list: -        p0 = subprocess.Popen (["umount", "-l", mnt], stderr=subprocess.PIPE) +        p0 = subprocess.Popen(["umount", "-l", mnt], stderr=subprocess.PIPE)          _, errdata = p0.communicate()          if p0.returncode == 0:              try: @@ -280,12 +256,11 @@ def finalize(*a, **kw):          else:              pass -    if gconf.log_exit: +    if rconf.log_exit:          logging.info("exiting.")      sys.stdout.flush()      sys.stderr.flush() -    os._exit(kw.get('exval', 0)) - +    os._exit(kwargs.get('exval', 0))  def log_raise_exception(excont): @@ -315,9 +290,9 @@ def log_raise_exception(excont):              ((isinstance(exc, OSError) or isinstance(exc, IOError)) and               exc.errno == EPIPE):              logging.error('connection to peer is broken') -            if hasattr(gconf, 'transport'): -                gconf.transport.wait() -                if gconf.transport.returncode == 127: +            if hasattr(rconf, 'transport'): +                rconf.transport.wait() +                if rconf.transport.returncode == 127:                      logging.error("getting \"No such file or directory\""                                    "errors is most likely due to "                                    "MISCONFIGURATION, please remove all " @@ -331,7 +306,7 @@ def log_raise_exception(excont):                                    "<SLAVEVOL> config remote-gsyncd "                                    "<GSYNCD_PATH> (Example GSYNCD_PATH: "                                    "`/usr/libexec/glusterfs/gsyncd`)") -                gconf.transport.terminate_geterr() +                rconf.transport.terminate_geterr()          elif isinstance(exc, OSError) and exc.errno in (ENOTCONN,                                                          ECONNABORTED):              logging.error(lf('glusterfs session went down', @@ -365,20 +340,20 @@ class Thread(baseThread):        function coughs up an exception      """ -    def __init__(self, *a, **kw): -        tf = kw.get('target') +    def __init__(self, *args, **kwargs): +        tf = kwargs.get('target')          if tf: -            def twrap(*aa): +            def twrap(*aargs):                  excont = FreeObject(exval=0)                  try: -                    tf(*aa) +                    tf(*aargs)                  except:                      try:                          log_raise_exception(excont)                      finally:                          finalize(exval=excont.exval) -            kw['target'] = twrap -        baseThread.__init__(self, *a, **kw) +            kwargs['target'] = twrap +        baseThread.__init__(self, *args, **kwargs)          self.setDaemon(True) @@ -443,7 +418,7 @@ def boolify(s):      lstr = s.lower()      if lstr in true_list:          rv = True -    elif not lstr in false_list: +    elif lstr not in false_list:          logging.warn(lf("Unknown string in \"string to boolean\" conversion, "                          "defaulting to False",                          str=s)) @@ -451,29 +426,33 @@ def boolify(s):      return rv -def eintr_wrap(func, exc, *a): +def eintr_wrap(func, exc, *args):      """      wrapper around syscalls resilient to interrupt caused      by signals      """      while True:          try: -            return func(*a) +            return func(*args)          except exc:              ex = sys.exc_info()[1]              if not ex.args[0] == EINTR:                  raise -def select(*a): -    return eintr_wrap(oselect.select, oselect.error, *a) +def select(*args): +    return eintr_wrap(oselect.select, oselect.error, *args) + + +def waitpid(*args): +    return eintr_wrap(owaitpid, OSError, *args) -def waitpid(*a): -    return eintr_wrap(owaitpid, OSError, *a) +def term_handler_default_hook(signum, frame): +    finalize(signum, frame, exval=1) -def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})): +def set_term_handler(hook=term_handler_default_hook):      signal(SIGTERM, hook) @@ -550,7 +529,7 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):              ex = sys.exc_info()[1]              if ex.errno in errnos:                  return ex.errno -            if not ex.errno in retry_errnos: +            if ex.errno not in retry_errnos:                  raise              nr_tries += 1              if nr_tries == GF_OP_RETRIES: | 
