diff options
Diffstat (limited to 'geo-replication/syncdaemon/gsyncd.py')
-rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 292 |
1 files changed, 185 insertions, 107 deletions
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 6eb62c6b076..426d964de95 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -1,4 +1,13 @@ #!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# import os import os.path @@ -6,25 +15,27 @@ import glob import sys import time import logging -import signal import shutil import optparse import fcntl import fnmatch from optparse import OptionParser, SUPPRESS_HELP from logging import Logger, handlers -from errno import EEXIST, ENOENT +from errno import ENOENT from ipaddr import IPAddress, IPNetwork from gconf import gconf -from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception -from syncdutils import GsyncdError, select, set_term_handler, privileged, update_file +from syncdutils import FreeObject, norm, grabpidfile, finalize +from syncdutils import log_raise_exception, privileged, update_file +from syncdutils import GsyncdError, select, set_term_handler from configinterface import GConffile, upgrade_config_file import resource from monitor import monitor + class GLogger(Logger): + """Logger customizations for gsyncd. It implements a log format similar to that of glusterfs. @@ -51,7 +62,8 @@ class GLogger(Logger): if lbl: lbl = '(' + lbl + ')' lprm = {'datefmt': "%Y-%m-%d %H:%M:%S", - 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"} + 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + + lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"} lprm.update(kw) lvl = kw.get('level', logging.INFO) lprm['level'] = lvl @@ -64,7 +76,7 @@ class GLogger(Logger): try: logging_handler = handlers.WatchedFileHandler(lprm['filename']) formatter = logging.Formatter(fmt=lprm['format'], - datefmt=lprm['datefmt']) + datefmt=lprm['datefmt']) logging_handler.setFormatter(formatter) logging.getLogger().addHandler(logging_handler) except AttributeError: @@ -96,6 +108,7 @@ class GLogger(Logger): gconf.log_metadata = lkw gconf.log_exit = True + def startup(**kw): """set up logging, pidfile grabbing, daemonization""" if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn': @@ -144,14 +157,15 @@ def main(): gconf.starttime = time.time() set_term_handler() GLogger.setup() - excont = FreeObject(exval = 0) + excont = FreeObject(exval=0) try: try: main_i() except: log_raise_exception(excont) finally: - finalize(exval = excont.exval) + finalize(exval=excont.exval) + def main_i(): """internal main routine @@ -171,50 +185,71 @@ def main_i(): if val and val != '-': val = os.path.abspath(val) setattr(parser.values, opt.dest, val) + def store_local(opt, optstr, val, parser): rconf[opt.dest] = val + def store_local_curry(val): return lambda o, oo, vx, p: store_local(o, oo, val, p) + def store_local_obj(op, dmake): - return lambda o, oo, vx, p: store_local(o, oo, FreeObject(op=op, **dmake(vx)), p) - - op = OptionParser(usage="%prog [options...] <master> <slave>", version="%prog 0.0.1") - op.add_option('--gluster-command-dir', metavar='DIR', default='') - op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs) - op.add_option('--gluster-log-level', metavar='LVL') - op.add_option('--gluster-params', metavar='PRMS', default='') - op.add_option('--glusterd-uuid', metavar='UUID', type=str, default='', help=SUPPRESS_HELP) - op.add_option('--gluster-cli-options', metavar='OPTS', default='--log-file=-') - op.add_option('--mountbroker', metavar='LABEL') - op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs) - op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs) - op.add_option('--log-file-mbr', metavar='LOGF', type=str, action='callback', callback=store_abs) - op.add_option('--state-file', metavar='STATF', type=str, action='callback', callback=store_abs) - op.add_option('--state-detail-file', metavar='STATF', type=str, action='callback', callback=store_abs) - op.add_option('--georep-session-working-dir', metavar='STATF', type=str, action='callback', callback=store_abs) - op.add_option('--ignore-deletes', default=False, action='store_true') - op.add_option('--isolated-slave', default=False, action='store_true') - op.add_option('--use-rsync-xattrs', default=False, action='store_true') - op.add_option('-L', '--log-level', metavar='LVL') - op.add_option('-r', '--remote-gsyncd', metavar='CMD', default=os.path.abspath(sys.argv[0])) - op.add_option('--volume-id', metavar='UUID') - op.add_option('--slave-id', metavar='ID') - op.add_option('--session-owner', metavar='ID') - op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='') - op.add_option('--local-path', metavar='PATH', help=SUPPRESS_HELP, default='') - op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh') - op.add_option('--ssh-command-tar', metavar='CMD', default='ssh') - op.add_option('--rsync-command', metavar='CMD', default='rsync') - op.add_option('--rsync-options', metavar='OPTS', default='') - op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress') - op.add_option('--timeout', metavar='SEC', type=int, default=120) - op.add_option('--connection-timeout', metavar='SEC', type=int, default=60, help=SUPPRESS_HELP) - op.add_option('--sync-jobs', metavar='N', type=int, default=3) - op.add_option('--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP) - op.add_option('--allow-network', metavar='IPS', default='') - op.add_option('--socketdir', metavar='DIR') - op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs) - op.add_option('--checkpoint', metavar='LABEL', default='') + return lambda o, oo, vx, p: store_local( + o, oo, FreeObject(op=op, **dmake(vx)), p) + + op = OptionParser( + usage="%prog [options...] <master> <slave>", version="%prog 0.0.1") + op.add_option('--gluster-command-dir', metavar='DIR', default='') + op.add_option('--gluster-log-file', metavar='LOGF', + default=os.devnull, type=str, action='callback', + callback=store_abs) + op.add_option('--gluster-log-level', metavar='LVL') + op.add_option('--gluster-params', metavar='PRMS', default='') + op.add_option( + '--glusterd-uuid', metavar='UUID', type=str, default='', + help=SUPPRESS_HELP) + op.add_option( + '--gluster-cli-options', metavar='OPTS', default='--log-file=-') + op.add_option('--mountbroker', metavar='LABEL') + op.add_option('-p', '--pid-file', metavar='PIDF', type=str, + action='callback', callback=store_abs) + op.add_option('-l', '--log-file', metavar='LOGF', type=str, + action='callback', callback=store_abs) + op.add_option('--log-file-mbr', metavar='LOGF', type=str, + action='callback', callback=store_abs) + op.add_option('--state-file', metavar='STATF', type=str, + action='callback', callback=store_abs) + op.add_option('--state-detail-file', metavar='STATF', + type=str, action='callback', callback=store_abs) + op.add_option('--georep-session-working-dir', metavar='STATF', + type=str, action='callback', callback=store_abs) + op.add_option('--ignore-deletes', default=False, action='store_true') + op.add_option('--isolated-slave', default=False, action='store_true') + op.add_option('--use-rsync-xattrs', default=False, action='store_true') + op.add_option('-L', '--log-level', metavar='LVL') + op.add_option('-r', '--remote-gsyncd', metavar='CMD', + default=os.path.abspath(sys.argv[0])) + op.add_option('--volume-id', metavar='UUID') + op.add_option('--slave-id', metavar='ID') + op.add_option('--session-owner', metavar='ID') + op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='') + op.add_option( + '--local-path', metavar='PATH', help=SUPPRESS_HELP, default='') + op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh') + op.add_option('--ssh-command-tar', metavar='CMD', default='ssh') + op.add_option('--rsync-command', metavar='CMD', default='rsync') + op.add_option('--rsync-options', metavar='OPTS', default='') + op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress') + op.add_option('--timeout', metavar='SEC', type=int, default=120) + op.add_option('--connection-timeout', metavar='SEC', + type=int, default=60, help=SUPPRESS_HELP) + op.add_option('--sync-jobs', metavar='N', type=int, default=3) + op.add_option( + '--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP) + op.add_option('--allow-network', metavar='IPS', default='') + op.add_option('--socketdir', metavar='DIR') + op.add_option('--state-socket-unencoded', metavar='SOCKF', + type=str, action='callback', callback=store_abs) + op.add_option('--checkpoint', metavar='LABEL', default='') # tunables for failover/failback mechanism: # None - gsyncd behaves as normal # blind - gsyncd works with xtime pairs to identify @@ -225,56 +260,86 @@ def main_i(): op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP) # changelog or xtime? (TODO: Change the default) - op.add_option('--change-detector', metavar='MODE', type=str, default='xtime') - # sleep interval for change detection (xtime crawl uses a hardcoded 1 second sleep time) + op.add_option( + '--change-detector', metavar='MODE', type=str, default='xtime') + # sleep interval for change detection (xtime crawl uses a hardcoded 1 + # second sleep time) op.add_option('--change-interval', metavar='SEC', type=int, default=3) # working directory for changelog based mechanism - op.add_option('--working-dir', metavar='DIR', type=str, action='callback', callback=store_abs) + op.add_option('--working-dir', metavar='DIR', type=str, + action='callback', callback=store_abs) op.add_option('--use-tarssh', default=False, action='store_true') - op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local) + op.add_option('-c', '--config-file', metavar='CONF', + type=str, action='callback', callback=store_local) # duh. need to specify dest or value will be mapped to None :S - op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True)) - op.add_option('--resource-local', dest='resource_local', type=str, action='callback', callback=store_local) - op.add_option('--resource-remote', dest='resource_remote', type=str, action='callback', callback=store_local) - op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local) - op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True)) - op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont')) - op.add_option('--verify', type=str, dest="verify", action='callback', callback=store_local) - op.add_option('--create', type=str, dest="create", action='callback', callback=store_local) - op.add_option('--delete', dest='delete', action='callback', callback=store_local_curry(True)) - op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a), - setattr(a[-1].values, 'log_file', '-'), - setattr(a[-1].values, 'log_level', 'DEBUG'))), + op.add_option('--monitor', dest='monitor', action='callback', + callback=store_local_curry(True)) + op.add_option('--resource-local', dest='resource_local', + type=str, action='callback', callback=store_local) + op.add_option('--resource-remote', dest='resource_remote', + type=str, action='callback', callback=store_local) + op.add_option('--feedback-fd', dest='feedback_fd', type=int, + help=SUPPRESS_HELP, action='callback', callback=store_local) + op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, + action='callback', callback=store_local_curry(True)) + op.add_option('-N', '--no-daemon', dest="go_daemon", + action='callback', callback=store_local_curry('dont')) + op.add_option('--verify', type=str, dest="verify", + action='callback', callback=store_local) + op.add_option('--create', type=str, dest="create", + action='callback', callback=store_local) + op.add_option('--delete', dest='delete', action='callback', + callback=store_local_curry(True)) + op.add_option('--debug', dest="go_daemon", action='callback', + callback=lambda *a: (store_local_curry('dont')(*a), + setattr( + a[-1].values, 'log_file', '-'), + setattr(a[-1].values, 'log_level', + 'DEBUG'))), op.add_option('--path', type=str, action='append') for a in ('check', 'get'): - op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', action='callback', + op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', + action='callback', callback=store_local_obj(a, lambda vx: {'opt': vx})) - op.add_option('--config-get-all', dest='config', action='callback', callback=store_local_obj('get', lambda vx: {'opt': None})) + op.add_option('--config-get-all', dest='config', action='callback', + callback=store_local_obj('get', lambda vx: {'opt': None})) for m in ('', '-rx', '-glob'): # call this code 'Pythonic' eh? - # have to define a one-shot local function to be able to inject (a value depending on the) + # have to define a one-shot local function to be able + # to inject (a value depending on the) # iteration variable into the inner lambda def conf_mod_opt_regex_variant(rx): - op.add_option('--config-set' + m, metavar='OPT VAL', type=str, nargs=2, dest='config', action='callback', - callback=store_local_obj('set', lambda vx: {'opt': vx[0], 'val': vx[1], 'rx': rx})) - op.add_option('--config-del' + m, metavar='OPT', type=str, dest='config', action='callback', - callback=store_local_obj('del', lambda vx: {'opt': vx, 'rx': rx})) + op.add_option('--config-set' + m, metavar='OPT VAL', type=str, + nargs=2, dest='config', action='callback', + callback=store_local_obj('set', lambda vx: { + 'opt': vx[0], 'val': vx[1], 'rx': rx})) + op.add_option('--config-del' + m, metavar='OPT', type=str, + dest='config', action='callback', + callback=store_local_obj('del', lambda vx: { + 'opt': vx, 'rx': rx})) conf_mod_opt_regex_variant(m and m[1:] or False) - op.add_option('--normalize-url', dest='url_print', action='callback', callback=store_local_curry('normal')) - op.add_option('--canonicalize-url', dest='url_print', action='callback', callback=store_local_curry('canon')) - op.add_option('--canonicalize-escape-url', dest='url_print', action='callback', callback=store_local_curry('canon_esc')) - - tunables = [ norm(o.get_opt_string()[2:]) for o in op.option_list if o.callback in (store_abs, 'store_true', None) and o.get_opt_string() not in ('--version', '--help') ] - remote_tunables = [ 'listen', 'go_daemon', 'timeout', 'session_owner', 'config_file', 'use_rsync_xattrs' ] - rq_remote_tunables = { 'listen': True } - - # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults - # -- for this to work out we need to tell apart defaults from explicitly set - # options... so churn out the defaults here and call the parser with virgin - # values container. + op.add_option('--normalize-url', dest='url_print', + action='callback', callback=store_local_curry('normal')) + op.add_option('--canonicalize-url', dest='url_print', + action='callback', callback=store_local_curry('canon')) + op.add_option('--canonicalize-escape-url', dest='url_print', + action='callback', callback=store_local_curry('canon_esc')) + + tunables = [norm(o.get_opt_string()[2:]) + for o in op.option_list + if (o.callback in (store_abs, 'store_true', None) and + o.get_opt_string() not in ('--version', '--help'))] + remote_tunables = ['listen', 'go_daemon', 'timeout', + 'session_owner', 'config_file', 'use_rsync_xattrs'] + rq_remote_tunables = {'listen': True} + + # precedence for sources of values: 1) commandline, 2) cfg file, 3) + # defaults for this to work out we need to tell apart defaults from + # explicitly set options... so churn out the defaults here and call + # the parser with virgin values container. defaults = op.get_default_values() opts, args = op.parse_args(values=optparse.Values()) args_orig = args[:] @@ -291,9 +356,9 @@ def main_i(): args.append(None) args[1] = r confdata = rconf.get('config') - if not (len(args) == 2 or \ - (len(args) == 1 and rconf.get('listen')) or \ - (len(args) <= 2 and confdata) or \ + if not (len(args) == 2 or + (len(args) == 1 and rconf.get('listen')) or + (len(args) <= 2 and confdata) or rconf.get('url_print')): sys.stderr.write("error: incorrect number of arguments\n\n") sys.stderr.write(op.get_usage() + "\n") @@ -301,8 +366,8 @@ def main_i(): verify = rconf.get('verify') if verify: - logging.info (verify) - logging.info ("Able to spawn gsyncd.py") + logging.info(verify) + logging.info("Able to spawn gsyncd.py") return restricted = os.getenv('_GSYNCD_RESTRICTED_') @@ -313,14 +378,17 @@ def main_i(): allopts.update(rconf) bannedtuns = set(allopts.keys()) - set(remote_tunables) if bannedtuns: - raise GsyncdError('following tunables cannot be set with restricted SSH invocaton: ' + \ + raise GsyncdError('following tunables cannot be set with ' + 'restricted SSH invocaton: ' + ', '.join(bannedtuns)) for k, v in rq_remote_tunables.items(): if not k in allopts or allopts[k] != v: - raise GsyncdError('tunable %s is not set to value %s required for restricted SSH invocaton' % \ + raise GsyncdError('tunable %s is not set to value %s required ' + 'for restricted SSH invocaton' % (k, v)) confrx = getattr(confdata, 'rx', None) + def makersc(aa, check=True): if not aa: return ([], None, None) @@ -330,12 +398,13 @@ def main_i(): if len(ra) > 1: remote = ra[1] if check and not local.can_connect_to(remote): - raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path)) + raise GsyncdError("%s cannot work with %s" % + (local.path, remote and remote.path)) return (ra, local, remote) if confrx: # peers are regexen, don't try to parse them if confrx == 'glob': - args = [ '\A' + fnmatch.translate(a) for a in args ] + args = ['\A' + fnmatch.translate(a) for a in args] canon_peers = args namedict = {} else: @@ -345,21 +414,24 @@ def main_i(): for r in rscs: print(r.get_url(**{'normal': {}, 'canon': {'canonical': True}, - 'canon_esc': {'canonical': True, 'escaped': True}}[dc])) + 'canon_esc': {'canonical': True, + 'escaped': True}}[dc])) return pa = ([], [], []) - urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True}) + urlprms = ( + {}, {'canonical': True}, {'canonical': True, 'escaped': True}) for x in rscs: for i in range(len(pa)): pa[i].append(x.get_url(**urlprms[i])) _, canon_peers, canon_esc_peers = pa - # creating the namedict, a dict representing various ways of referring to / repreenting - # peers to be fillable in config templates - mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:]) + # creating the namedict, a dict representing various ways of referring + # to / repreenting peers to be fillable in config templates + mods = (lambda x: x, lambda x: x[ + 0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:]) if remote: - rmap = { local: ('local', 'master'), remote: ('remote', 'slave') } + rmap = {local: ('local', 'master'), remote: ('remote', 'slave')} else: - rmap = { local: ('local', 'slave') } + rmap = {local: ('local', 'slave')} namedict = {} for i in range(len(rscs)): x = rscs[i] @@ -370,10 +442,13 @@ def main_i(): if name == 'remote': namedict['remotehost'] = x.remotehost if not 'config_file' in rconf: - rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd_template.conf") + rconf['config_file'] = os.path.join( + os.path.dirname(sys.argv[0]), "conf/gsyncd_template.conf") upgrade_config_file(rconf['config_file']) - gcnf = GConffile(rconf['config_file'], canon_peers, defaults.__dict__, opts.__dict__, namedict) + gcnf = GConffile( + rconf['config_file'], canon_peers, + defaults.__dict__, opts.__dict__, namedict) checkpoint_change = False if confdata: @@ -407,7 +482,7 @@ def main_i(): delete = rconf.get('delete') if delete: - logging.info ('geo-replication delete') + logging.info('geo-replication delete') # Delete pid file, status file, socket file cleanup_paths = [] if getattr(gconf, 'pid_file', None): @@ -422,7 +497,7 @@ def main_i(): if getattr(gconf, 'state_socket_unencoded', None): cleanup_paths.append(gconf.state_socket_unencoded) - cleanup_paths.append(rconf['config_file'][:-11] + "*"); + cleanup_paths.append(rconf['config_file'][:-11] + "*") # Cleanup changelog working dirs if getattr(gconf, 'working_dir', None): @@ -432,7 +507,9 @@ def main_i(): if sys.exc_info()[1].errno == ENOENT: pass else: - raise GsyncdError('Error while removing working dir: %s' % gconf.working_dir) + raise GsyncdError( + 'Error while removing working dir: %s' % + gconf.working_dir) for path in cleanup_paths: # To delete temp files @@ -443,10 +520,11 @@ def main_i(): if restricted and gconf.allow_network: ssh_conn = os.getenv('SSH_CONNECTION') if not ssh_conn: - #legacy env var + # legacy env var ssh_conn = os.getenv('SSH_CLIENT') if ssh_conn: - allowed_networks = [ IPNetwork(a) for a in gconf.allow_network.split(',') ] + allowed_networks = [IPNetwork(a) + for a in gconf.allow_network.split(',')] client_ip = IPAddress(ssh_conn.split()[0]) allowed = False for nw in allowed_networks: @@ -460,7 +538,7 @@ def main_i(): if ffd: fcntl.fcntl(ffd, fcntl.F_SETFD, fcntl.FD_CLOEXEC) - #normalize loglevel + # normalize loglevel lvl0 = gconf.log_level if isinstance(lvl0, str): lvl1 = lvl0.upper() @@ -519,7 +597,7 @@ def main_i(): if be_monitor: label = 'monitor' elif remote: - #master + # master label = gconf.local_path else: label = 'slave' |