diff options
author | Aravinda VK <avishwan@redhat.com> | 2017-06-21 12:56:14 +0530 |
---|---|---|
committer | Amar Tumballi <amarts@redhat.com> | 2017-11-15 05:20:08 +0000 |
commit | 705ec055040268f876d04fe5743a6ce4738d6e02 (patch) | |
tree | 47841519fd8415e31777418a94575cfae06b4bb8 /geo-replication/syncdaemon/syncdutils.py | |
parent | da825da9501bcb51656e82cda3c21a2ef592c5e2 (diff) |
geo-rep: Refactoring Config and Arguments parsing
- Fixed Python pep8 issues
- Removed dead code
- Rewritten configuration management
- Rewritten Arguments/subcommands handling
- Added Args upgrade to accommodate all these changes without changing
glusterd code
- use of md5 removed, which was used to hash the brick path for workdir
Both Master and Slave nodes will have subdir for session in the
format "<mastervol>_<primary_slave_host>_<slavevol>
$GLUSTER_LOGDIR/geo-replication/<mastervol>_<primary_slave_host>_<slavevol>
$GLUSTER_LOGDIR/geo-replication-slaves/<mastervol>_<primary_slave_host>_<slavevol>
Log file paths renamed since session info is available with directory
name itself.
$LOG_DIR_MASTER/
- gsyncd.log - Gsyncd, Worker monitor logs
- mnt-<brick-path>.log - Aux mount logs, mounted by each worker
- changes-<brick-path>.log - Changelog related logs(One per brick)
$LOG_DIR_SLAVE/
- gsyncd.log - Slave Gsyncd logs
- mnt-<master-node>-<master-brick-path>.log - Aux mount logs,
mounted for each connection from master-node:master-brick
- mnt-mbr-<master-node>-<master-brick-path>.log - Same as above,
but mountbroker setup
Fixes: #73
Change-Id: I2ec2a21e4e2a92fd92899d026e8543725276f021
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon/syncdutils.py')
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 121 |
1 files changed, 50 insertions, 71 deletions
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index e611b7b6ae5..bc03522fdda 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -15,19 +15,19 @@ import time import fcntl import shutil import logging -import socket import errno import threading import subprocess from subprocess import PIPE from threading import Lock, Thread as baseThread from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED -from errno import EINTR, ENOENT, EPERM, ESTALE, EBUSY, errorcode +from errno import EINTR, ENOENT, ESTALE, EBUSY, errorcode from signal import signal, SIGTERM import select as oselect from os import waitpid as owaitpid import xml.etree.ElementTree as XET from select import error as SelectError +from cPickle import PickleError from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE sys.path.insert(1, GLUSTERFS_LIBEXECDIR) @@ -46,25 +46,10 @@ except ImportError: EVENT_GEOREP_PASSIVE = None EVENT_GEOREP_CHECKPOINT_COMPLETED = None -try: - from cPickle import PickleError -except ImportError: - # py 3 - from pickle import PickleError - -from gconf import gconf +import gsyncdconfig as gconf +from rconf import rconf -try: - # py 3 - from urllib import parse as urllib -except ImportError: - import urllib - -try: - from hashlib import md5 as md5 -except ImportError: - # py 2.4 - from md5 import new as md5 +from hashlib import md5 as md5 # auxiliary gfid based access prefix _CL_AUX_GFID_PFX = ".gfid/" @@ -80,6 +65,10 @@ SPACE_ESCAPE_CHAR = "%20" NEWLINE_ESCAPE_CHAR = "%0A" PERCENTAGE_ESCAPE_CHAR = "%25" +final_lock = Lock() + +mntpt_list = [] + def sup(x, *a, **kw): """a rubyesque "super" for python ;) @@ -93,12 +82,7 @@ def sup(x, *a, **kw): def escape(s): """the chosen flavor of string escaping, used all over to turn whatever data to creatable representation""" - return urllib.quote_plus(s) - - -def unescape(s): - """inverse of .escape""" - return urllib.unquote_plus(s) + return s.replace("/", "-").strip("-") def escape_space_newline(s): @@ -170,17 +154,17 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url): """ Setup GConf ssh control path parameters """ - gconf.ssh_ctl_dir = ctld + rconf.ssh_ctl_dir = ctld content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr, resource_url) content_md5 = md5hex(content) - fname = os.path.join(gconf.ssh_ctl_dir, + fname = os.path.join(rconf.ssh_ctl_dir, "%s.mft" % content_md5) create_manifest(fname, content) - ssh_ctl_path = os.path.join(gconf.ssh_ctl_dir, + ssh_ctl_path = os.path.join(rconf.ssh_ctl_dir, "%s.sock" % content_md5) - gconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path] + rconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path] def grabfile(fname, content=None): @@ -207,32 +191,30 @@ def grabfile(fname, content=None): except: f.close() raise - gconf.permanent_handles.append(f) + rconf.permanent_handles.append(f) return f def grabpidfile(fname=None, setpid=True): """.grabfile customization for pid files""" if not fname: - fname = gconf.pid_file + fname = gconf.get("pid-file") content = None if setpid: content = str(os.getpid()) + '\n' return grabfile(fname, content=content) -final_lock = Lock() -mntpt_list = [] -def finalize(*a, **kw): +def finalize(*args, **kwargs): """all those messy final steps we go trough upon termination Do away with pidfile, ssh control dir and logging. """ final_lock.acquire() - if getattr(gconf, 'pid_file', None): - rm_pidf = gconf.pid_file_owned - if gconf.cpid: + if gconf.get('pid_file'): + rm_pidf = rconf.pid_file_owned + if rconf.cpid: # exit path from parent branch of daemonization rm_pidf = False while True: @@ -240,37 +222,31 @@ def finalize(*a, **kw): if not f: # child has already taken over pidfile break - if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid: + if os.waitpid(rconf.cpid, os.WNOHANG)[0] == rconf.cpid: # child has terminated rm_pidf = True break time.sleep(0.1) if rm_pidf: try: - os.unlink(gconf.pid_file) + os.unlink(rconf.pid_file) except: ex = sys.exc_info()[1] if ex.errno == ENOENT: pass else: raise - if gconf.ssh_ctl_dir and not gconf.cpid: + if rconf.ssh_ctl_dir and not rconf.cpid: def handle_rm_error(func, path, exc_info): if exc_info[1].errno == ENOENT: return raise exc_info[1] - shutil.rmtree(gconf.ssh_ctl_dir, onerror=handle_rm_error) - if getattr(gconf, 'state_socket', None): - try: - os.unlink(gconf.state_socket) - except: - if sys.exc_info()[0] == OSError: - pass + shutil.rmtree(rconf.ssh_ctl_dir, onerror=handle_rm_error) """ Unmount if not done """ for mnt in mntpt_list: - p0 = subprocess.Popen (["umount", "-l", mnt], stderr=subprocess.PIPE) + p0 = subprocess.Popen(["umount", "-l", mnt], stderr=subprocess.PIPE) _, errdata = p0.communicate() if p0.returncode == 0: try: @@ -280,12 +256,11 @@ def finalize(*a, **kw): else: pass - if gconf.log_exit: + if rconf.log_exit: logging.info("exiting.") sys.stdout.flush() sys.stderr.flush() - os._exit(kw.get('exval', 0)) - + os._exit(kwargs.get('exval', 0)) def log_raise_exception(excont): @@ -315,9 +290,9 @@ def log_raise_exception(excont): ((isinstance(exc, OSError) or isinstance(exc, IOError)) and exc.errno == EPIPE): logging.error('connection to peer is broken') - if hasattr(gconf, 'transport'): - gconf.transport.wait() - if gconf.transport.returncode == 127: + if hasattr(rconf, 'transport'): + rconf.transport.wait() + if rconf.transport.returncode == 127: logging.error("getting \"No such file or directory\"" "errors is most likely due to " "MISCONFIGURATION, please remove all " @@ -331,7 +306,7 @@ def log_raise_exception(excont): "<SLAVEVOL> config remote-gsyncd " "<GSYNCD_PATH> (Example GSYNCD_PATH: " "`/usr/libexec/glusterfs/gsyncd`)") - gconf.transport.terminate_geterr() + rconf.transport.terminate_geterr() elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED): logging.error(lf('glusterfs session went down', @@ -365,20 +340,20 @@ class Thread(baseThread): function coughs up an exception """ - def __init__(self, *a, **kw): - tf = kw.get('target') + def __init__(self, *args, **kwargs): + tf = kwargs.get('target') if tf: - def twrap(*aa): + def twrap(*aargs): excont = FreeObject(exval=0) try: - tf(*aa) + tf(*aargs) except: try: log_raise_exception(excont) finally: finalize(exval=excont.exval) - kw['target'] = twrap - baseThread.__init__(self, *a, **kw) + kwargs['target'] = twrap + baseThread.__init__(self, *args, **kwargs) self.setDaemon(True) @@ -443,7 +418,7 @@ def boolify(s): lstr = s.lower() if lstr in true_list: rv = True - elif not lstr in false_list: + elif lstr not in false_list: logging.warn(lf("Unknown string in \"string to boolean\" conversion, " "defaulting to False", str=s)) @@ -451,29 +426,33 @@ def boolify(s): return rv -def eintr_wrap(func, exc, *a): +def eintr_wrap(func, exc, *args): """ wrapper around syscalls resilient to interrupt caused by signals """ while True: try: - return func(*a) + return func(*args) except exc: ex = sys.exc_info()[1] if not ex.args[0] == EINTR: raise -def select(*a): - return eintr_wrap(oselect.select, oselect.error, *a) +def select(*args): + return eintr_wrap(oselect.select, oselect.error, *args) + + +def waitpid(*args): + return eintr_wrap(owaitpid, OSError, *args) -def waitpid(*a): - return eintr_wrap(owaitpid, OSError, *a) +def term_handler_default_hook(signum, frame): + finalize(signum, frame, exval=1) -def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})): +def set_term_handler(hook=term_handler_default_hook): signal(SIGTERM, hook) @@ -550,7 +529,7 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]): ex = sys.exc_info()[1] if ex.errno in errnos: return ex.errno - if not ex.errno in retry_errnos: + if ex.errno not in retry_errnos: raise nr_tries += 1 if nr_tries == GF_OP_RETRIES: |