diff options
Diffstat (limited to 'geo-replication/syncdaemon/gsyncd.py')
| -rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 292 | 
1 files changed, 185 insertions, 107 deletions
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 6eb62c6b0..426d964de 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -1,4 +1,13 @@  #!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +#  import os  import os.path @@ -6,25 +15,27 @@ import glob  import sys  import time  import logging -import signal  import shutil  import optparse  import fcntl  import fnmatch  from optparse import OptionParser, SUPPRESS_HELP  from logging import Logger, handlers -from errno import EEXIST, ENOENT +from errno import ENOENT  from ipaddr import IPAddress, IPNetwork  from gconf import gconf -from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception -from syncdutils import GsyncdError, select, set_term_handler, privileged, update_file +from syncdutils import FreeObject, norm, grabpidfile, finalize +from syncdutils import log_raise_exception, privileged, update_file +from syncdutils import GsyncdError, select, set_term_handler  from configinterface import GConffile, upgrade_config_file  import resource  from monitor import monitor +  class GLogger(Logger): +      """Logger customizations for gsyncd.      It implements a log format similar to that of glusterfs. @@ -51,7 +62,8 @@ class GLogger(Logger):          if lbl:              lbl = '(' + lbl + ')'          lprm = {'datefmt': "%Y-%m-%d %H:%M:%S", -                'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"} +                'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + +                lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"}          lprm.update(kw)          lvl = kw.get('level', logging.INFO)          lprm['level'] = lvl @@ -64,7 +76,7 @@ class GLogger(Logger):              try:                  logging_handler = handlers.WatchedFileHandler(lprm['filename'])                  formatter = logging.Formatter(fmt=lprm['format'], -                datefmt=lprm['datefmt']) +                                              datefmt=lprm['datefmt'])                  logging_handler.setFormatter(formatter)                  logging.getLogger().addHandler(logging_handler)              except AttributeError: @@ -96,6 +108,7 @@ class GLogger(Logger):          gconf.log_metadata = lkw          gconf.log_exit = True +  def startup(**kw):      """set up logging, pidfile grabbing, daemonization"""      if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn': @@ -144,14 +157,15 @@ def main():      gconf.starttime = time.time()      set_term_handler()      GLogger.setup() -    excont = FreeObject(exval = 0) +    excont = FreeObject(exval=0)      try:          try:              main_i()          except:              log_raise_exception(excont)      finally: -        finalize(exval = excont.exval) +        finalize(exval=excont.exval) +  def main_i():      """internal main routine @@ -171,50 +185,71 @@ def main_i():          if val and val != '-':              val = os.path.abspath(val)          setattr(parser.values, opt.dest, val) +      def store_local(opt, optstr, val, parser):          rconf[opt.dest] = val +      def store_local_curry(val):          return lambda o, oo, vx, p: store_local(o, oo, val, p) +      def store_local_obj(op, dmake): -        return lambda o, oo, vx, p: store_local(o, oo, FreeObject(op=op, **dmake(vx)), p) - -    op = OptionParser(usage="%prog [options...] <master> <slave>", version="%prog 0.0.1") -    op.add_option('--gluster-command-dir', metavar='DIR',   default='') -    op.add_option('--gluster-log-file',    metavar='LOGF',  default=os.devnull, type=str, action='callback', callback=store_abs) -    op.add_option('--gluster-log-level',   metavar='LVL') -    op.add_option('--gluster-params',      metavar='PRMS',  default='') -    op.add_option('--glusterd-uuid',       metavar='UUID',  type=str, default='', help=SUPPRESS_HELP) -    op.add_option('--gluster-cli-options', metavar='OPTS',  default='--log-file=-') -    op.add_option('--mountbroker',         metavar='LABEL') -    op.add_option('-p', '--pid-file',      metavar='PIDF',  type=str, action='callback', callback=store_abs) -    op.add_option('-l', '--log-file',      metavar='LOGF',  type=str, action='callback', callback=store_abs) -    op.add_option('--log-file-mbr',        metavar='LOGF',  type=str, action='callback', callback=store_abs) -    op.add_option('--state-file',          metavar='STATF', type=str, action='callback', callback=store_abs) -    op.add_option('--state-detail-file',   metavar='STATF', type=str, action='callback', callback=store_abs) -    op.add_option('--georep-session-working-dir', metavar='STATF', type=str, action='callback', callback=store_abs) -    op.add_option('--ignore-deletes',      default=False, action='store_true') -    op.add_option('--isolated-slave',      default=False, action='store_true') -    op.add_option('--use-rsync-xattrs',    default=False, action='store_true') -    op.add_option('-L', '--log-level',     metavar='LVL') -    op.add_option('-r', '--remote-gsyncd', metavar='CMD',   default=os.path.abspath(sys.argv[0])) -    op.add_option('--volume-id',           metavar='UUID') -    op.add_option('--slave-id',            metavar='ID') -    op.add_option('--session-owner',       metavar='ID') -    op.add_option('--local-id',            metavar='ID',    help=SUPPRESS_HELP, default='') -    op.add_option('--local-path',          metavar='PATH',  help=SUPPRESS_HELP, default='') -    op.add_option('-s', '--ssh-command',   metavar='CMD',   default='ssh') -    op.add_option('--ssh-command-tar',     metavar='CMD',   default='ssh') -    op.add_option('--rsync-command',       metavar='CMD',   default='rsync') -    op.add_option('--rsync-options',       metavar='OPTS',  default='') -    op.add_option('--rsync-ssh-options',   metavar='OPTS',  default='--compress') -    op.add_option('--timeout',             metavar='SEC',   type=int, default=120) -    op.add_option('--connection-timeout',  metavar='SEC',   type=int, default=60, help=SUPPRESS_HELP) -    op.add_option('--sync-jobs',           metavar='N',     type=int, default=3) -    op.add_option('--turns',               metavar='N',     type=int, default=0, help=SUPPRESS_HELP) -    op.add_option('--allow-network',       metavar='IPS',   default='') -    op.add_option('--socketdir',           metavar='DIR') -    op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs) -    op.add_option('--checkpoint',          metavar='LABEL', default='') +        return lambda o, oo, vx, p: store_local( +            o, oo, FreeObject(op=op, **dmake(vx)), p) + +    op = OptionParser( +        usage="%prog [options...] <master> <slave>", version="%prog 0.0.1") +    op.add_option('--gluster-command-dir', metavar='DIR', default='') +    op.add_option('--gluster-log-file', metavar='LOGF', +                  default=os.devnull, type=str, action='callback', +                  callback=store_abs) +    op.add_option('--gluster-log-level', metavar='LVL') +    op.add_option('--gluster-params', metavar='PRMS', default='') +    op.add_option( +        '--glusterd-uuid', metavar='UUID', type=str, default='', +        help=SUPPRESS_HELP) +    op.add_option( +        '--gluster-cli-options', metavar='OPTS', default='--log-file=-') +    op.add_option('--mountbroker', metavar='LABEL') +    op.add_option('-p', '--pid-file', metavar='PIDF', type=str, +                  action='callback', callback=store_abs) +    op.add_option('-l', '--log-file', metavar='LOGF', type=str, +                  action='callback', callback=store_abs) +    op.add_option('--log-file-mbr', metavar='LOGF', type=str, +                  action='callback', callback=store_abs) +    op.add_option('--state-file', metavar='STATF', type=str, +                  action='callback', callback=store_abs) +    op.add_option('--state-detail-file', metavar='STATF', +                  type=str, action='callback', callback=store_abs) +    op.add_option('--georep-session-working-dir', metavar='STATF', +                  type=str, action='callback', callback=store_abs) +    op.add_option('--ignore-deletes', default=False, action='store_true') +    op.add_option('--isolated-slave', default=False, action='store_true') +    op.add_option('--use-rsync-xattrs', default=False, action='store_true') +    op.add_option('-L', '--log-level', metavar='LVL') +    op.add_option('-r', '--remote-gsyncd', metavar='CMD', +                  default=os.path.abspath(sys.argv[0])) +    op.add_option('--volume-id', metavar='UUID') +    op.add_option('--slave-id', metavar='ID') +    op.add_option('--session-owner', metavar='ID') +    op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='') +    op.add_option( +        '--local-path', metavar='PATH', help=SUPPRESS_HELP, default='') +    op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh') +    op.add_option('--ssh-command-tar', metavar='CMD', default='ssh') +    op.add_option('--rsync-command', metavar='CMD', default='rsync') +    op.add_option('--rsync-options', metavar='OPTS', default='') +    op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress') +    op.add_option('--timeout', metavar='SEC', type=int, default=120) +    op.add_option('--connection-timeout', metavar='SEC', +                  type=int, default=60, help=SUPPRESS_HELP) +    op.add_option('--sync-jobs', metavar='N', type=int, default=3) +    op.add_option( +        '--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP) +    op.add_option('--allow-network', metavar='IPS', default='') +    op.add_option('--socketdir', metavar='DIR') +    op.add_option('--state-socket-unencoded', metavar='SOCKF', +                  type=str, action='callback', callback=store_abs) +    op.add_option('--checkpoint', metavar='LABEL', default='')      # tunables for failover/failback mechanism:      # None   - gsyncd behaves as normal      # blind  - gsyncd works with xtime pairs to identify @@ -225,56 +260,86 @@ def main_i():      op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP)      # changelog or xtime? (TODO: Change the default) -    op.add_option('--change-detector', metavar='MODE', type=str, default='xtime') -    # sleep interval for change detection (xtime crawl uses a hardcoded 1 second sleep time) +    op.add_option( +        '--change-detector', metavar='MODE', type=str, default='xtime') +    # sleep interval for change detection (xtime crawl uses a hardcoded 1 +    # second sleep time)      op.add_option('--change-interval', metavar='SEC', type=int, default=3)      # working directory for changelog based mechanism -    op.add_option('--working-dir', metavar='DIR', type=str, action='callback', callback=store_abs) +    op.add_option('--working-dir', metavar='DIR', type=str, +                  action='callback', callback=store_abs)      op.add_option('--use-tarssh', default=False, action='store_true') -    op.add_option('-c', '--config-file',   metavar='CONF',  type=str, action='callback', callback=store_local) +    op.add_option('-c', '--config-file', metavar='CONF', +                  type=str, action='callback', callback=store_local)      # duh. need to specify dest or value will be mapped to None :S -    op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True)) -    op.add_option('--resource-local', dest='resource_local', type=str, action='callback', callback=store_local) -    op.add_option('--resource-remote', dest='resource_remote', type=str, action='callback', callback=store_local) -    op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local) -    op.add_option('--listen', dest='listen', help=SUPPRESS_HELP,      action='callback', callback=store_local_curry(True)) -    op.add_option('-N', '--no-daemon', dest="go_daemon",    action='callback', callback=store_local_curry('dont')) -    op.add_option('--verify', type=str, dest="verify", action='callback', callback=store_local) -    op.add_option('--create', type=str, dest="create", action='callback', callback=store_local) -    op.add_option('--delete', dest='delete', action='callback', callback=store_local_curry(True)) -    op.add_option('--debug', dest="go_daemon",              action='callback', callback=lambda *a: (store_local_curry('dont')(*a), -                                                                                                    setattr(a[-1].values, 'log_file', '-'), -                                                                                                    setattr(a[-1].values, 'log_level', 'DEBUG'))), +    op.add_option('--monitor', dest='monitor', action='callback', +                  callback=store_local_curry(True)) +    op.add_option('--resource-local', dest='resource_local', +                  type=str, action='callback', callback=store_local) +    op.add_option('--resource-remote', dest='resource_remote', +                  type=str, action='callback', callback=store_local) +    op.add_option('--feedback-fd', dest='feedback_fd', type=int, +                  help=SUPPRESS_HELP, action='callback', callback=store_local) +    op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, +                  action='callback', callback=store_local_curry(True)) +    op.add_option('-N', '--no-daemon', dest="go_daemon", +                  action='callback', callback=store_local_curry('dont')) +    op.add_option('--verify', type=str, dest="verify", +                  action='callback', callback=store_local) +    op.add_option('--create', type=str, dest="create", +                  action='callback', callback=store_local) +    op.add_option('--delete', dest='delete', action='callback', +                  callback=store_local_curry(True)) +    op.add_option('--debug', dest="go_daemon", action='callback', +                  callback=lambda *a: (store_local_curry('dont')(*a), +                                       setattr( +                                           a[-1].values, 'log_file', '-'), +                                       setattr(a[-1].values, 'log_level', +                                               'DEBUG'))),      op.add_option('--path', type=str, action='append')      for a in ('check', 'get'): -        op.add_option('--config-' + a,      metavar='OPT',  type=str, dest='config', action='callback', +        op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', +                      action='callback',                        callback=store_local_obj(a, lambda vx: {'opt': vx})) -    op.add_option('--config-get-all', dest='config', action='callback', callback=store_local_obj('get', lambda vx: {'opt': None})) +    op.add_option('--config-get-all', dest='config', action='callback', +                  callback=store_local_obj('get', lambda vx: {'opt': None}))      for m in ('', '-rx', '-glob'):          # call this code 'Pythonic' eh? -        # have to define a one-shot local function to be able to inject (a value depending on the) +        # have to define a one-shot local function to be able +        # to inject (a value depending on the)          # iteration variable into the inner lambda          def conf_mod_opt_regex_variant(rx): -            op.add_option('--config-set' + m,   metavar='OPT VAL', type=str, nargs=2, dest='config', action='callback', -                          callback=store_local_obj('set', lambda vx: {'opt': vx[0], 'val': vx[1], 'rx': rx})) -            op.add_option('--config-del' + m,   metavar='OPT',  type=str, dest='config', action='callback', -                          callback=store_local_obj('del', lambda vx: {'opt': vx, 'rx': rx})) +            op.add_option('--config-set' + m, metavar='OPT VAL', type=str, +                          nargs=2, dest='config', action='callback', +                          callback=store_local_obj('set', lambda vx: { +                              'opt': vx[0], 'val': vx[1], 'rx': rx})) +            op.add_option('--config-del' + m, metavar='OPT', type=str, +                          dest='config', action='callback', +                          callback=store_local_obj('del', lambda vx: { +                              'opt': vx, 'rx': rx}))          conf_mod_opt_regex_variant(m and m[1:] or False) -    op.add_option('--normalize-url',           dest='url_print', action='callback', callback=store_local_curry('normal')) -    op.add_option('--canonicalize-url',        dest='url_print', action='callback', callback=store_local_curry('canon')) -    op.add_option('--canonicalize-escape-url', dest='url_print', action='callback', callback=store_local_curry('canon_esc')) - -    tunables = [ norm(o.get_opt_string()[2:]) for o in op.option_list if o.callback in (store_abs, 'store_true', None) and o.get_opt_string() not in ('--version', '--help') ] -    remote_tunables = [ 'listen', 'go_daemon', 'timeout', 'session_owner', 'config_file', 'use_rsync_xattrs' ] -    rq_remote_tunables = { 'listen': True } - -    # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults -    # -- for this to work out we need to tell apart defaults from explicitly set -    # options... so churn out the defaults here and call the parser with virgin -    # values container. +    op.add_option('--normalize-url', dest='url_print', +                  action='callback', callback=store_local_curry('normal')) +    op.add_option('--canonicalize-url', dest='url_print', +                  action='callback', callback=store_local_curry('canon')) +    op.add_option('--canonicalize-escape-url', dest='url_print', +                  action='callback', callback=store_local_curry('canon_esc')) + +    tunables = [norm(o.get_opt_string()[2:]) +                for o in op.option_list +                if (o.callback in (store_abs, 'store_true', None) and +                    o.get_opt_string() not in ('--version', '--help'))] +    remote_tunables = ['listen', 'go_daemon', 'timeout', +                       'session_owner', 'config_file', 'use_rsync_xattrs'] +    rq_remote_tunables = {'listen': True} + +    # precedence for sources of values: 1) commandline, 2) cfg file, 3) +    # defaults for this to work out we need to tell apart defaults from +    # explicitly set options... so churn out the defaults here and call +    # the parser with virgin values container.      defaults = op.get_default_values()      opts, args = op.parse_args(values=optparse.Values())      args_orig = args[:] @@ -291,9 +356,9 @@ def main_i():              args.append(None)          args[1] = r      confdata = rconf.get('config') -    if not (len(args) == 2 or \ -            (len(args) == 1 and rconf.get('listen')) or \ -            (len(args) <= 2 and confdata) or \ +    if not (len(args) == 2 or +            (len(args) == 1 and rconf.get('listen')) or +            (len(args) <= 2 and confdata) or              rconf.get('url_print')):          sys.stderr.write("error: incorrect number of arguments\n\n")          sys.stderr.write(op.get_usage() + "\n") @@ -301,8 +366,8 @@ def main_i():      verify = rconf.get('verify')      if verify: -        logging.info (verify) -        logging.info ("Able to spawn gsyncd.py") +        logging.info(verify) +        logging.info("Able to spawn gsyncd.py")          return      restricted = os.getenv('_GSYNCD_RESTRICTED_') @@ -313,14 +378,17 @@ def main_i():          allopts.update(rconf)          bannedtuns = set(allopts.keys()) - set(remote_tunables)          if bannedtuns: -            raise GsyncdError('following tunables cannot be set with restricted SSH invocaton: ' + \ +            raise GsyncdError('following tunables cannot be set with ' +                              'restricted SSH invocaton: ' +                                ', '.join(bannedtuns))          for k, v in rq_remote_tunables.items():              if not k in allopts or allopts[k] != v: -                raise GsyncdError('tunable %s is not set to value %s required for restricted SSH invocaton' % \ +                raise GsyncdError('tunable %s is not set to value %s required ' +                                  'for restricted SSH invocaton' %                                    (k, v))      confrx = getattr(confdata, 'rx', None) +      def makersc(aa, check=True):          if not aa:              return ([], None, None) @@ -330,12 +398,13 @@ def main_i():          if len(ra) > 1:              remote = ra[1]          if check and not local.can_connect_to(remote): -            raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path)) +            raise GsyncdError("%s cannot work with %s" % +                              (local.path, remote and remote.path))          return (ra, local, remote)      if confrx:          # peers are regexen, don't try to parse them          if confrx == 'glob': -            args = [ '\A' + fnmatch.translate(a) for a in args ] +            args = ['\A' + fnmatch.translate(a) for a in args]          canon_peers = args          namedict = {}      else: @@ -345,21 +414,24 @@ def main_i():              for r in rscs:                  print(r.get_url(**{'normal': {},                                     'canon': {'canonical': True}, -                                   'canon_esc': {'canonical': True, 'escaped': True}}[dc])) +                                   'canon_esc': {'canonical': True, +                                                 'escaped': True}}[dc]))              return          pa = ([], [], []) -        urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True}) +        urlprms = ( +            {}, {'canonical': True}, {'canonical': True, 'escaped': True})          for x in rscs:              for i in range(len(pa)):                  pa[i].append(x.get_url(**urlprms[i]))          _, canon_peers, canon_esc_peers = pa -        # creating the namedict, a dict representing various ways of referring to / repreenting -        # peers to be fillable in config templates -        mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:]) +        # creating the namedict, a dict representing various ways of referring +        # to / repreenting peers to be fillable in config templates +        mods = (lambda x: x, lambda x: x[ +                0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:])          if remote: -            rmap = { local: ('local', 'master'), remote: ('remote', 'slave') } +            rmap = {local: ('local', 'master'), remote: ('remote', 'slave')}          else: -            rmap = { local: ('local', 'slave') } +            rmap = {local: ('local', 'slave')}          namedict = {}          for i in range(len(rscs)):              x = rscs[i] @@ -370,10 +442,13 @@ def main_i():                  if name == 'remote':                      namedict['remotehost'] = x.remotehost      if not 'config_file' in rconf: -        rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd_template.conf") +        rconf['config_file'] = os.path.join( +            os.path.dirname(sys.argv[0]), "conf/gsyncd_template.conf")      upgrade_config_file(rconf['config_file']) -    gcnf = GConffile(rconf['config_file'], canon_peers, defaults.__dict__, opts.__dict__, namedict) +    gcnf = GConffile( +        rconf['config_file'], canon_peers, +        defaults.__dict__, opts.__dict__, namedict)      checkpoint_change = False      if confdata: @@ -407,7 +482,7 @@ def main_i():      delete = rconf.get('delete')      if delete: -        logging.info ('geo-replication delete') +        logging.info('geo-replication delete')          # Delete pid file, status file, socket file          cleanup_paths = []          if getattr(gconf, 'pid_file', None): @@ -422,7 +497,7 @@ def main_i():          if getattr(gconf, 'state_socket_unencoded', None):              cleanup_paths.append(gconf.state_socket_unencoded) -        cleanup_paths.append(rconf['config_file'][:-11] + "*"); +        cleanup_paths.append(rconf['config_file'][:-11] + "*")          # Cleanup changelog working dirs          if getattr(gconf, 'working_dir', None): @@ -432,7 +507,9 @@ def main_i():                  if sys.exc_info()[1].errno == ENOENT:                      pass                  else: -                    raise GsyncdError('Error while removing working dir: %s' % gconf.working_dir) +                    raise GsyncdError( +                        'Error while removing working dir: %s' % +                        gconf.working_dir)          for path in cleanup_paths:              # To delete temp files @@ -443,10 +520,11 @@ def main_i():      if restricted and gconf.allow_network:          ssh_conn = os.getenv('SSH_CONNECTION')          if not ssh_conn: -            #legacy env var +            # legacy env var              ssh_conn = os.getenv('SSH_CLIENT')          if ssh_conn: -            allowed_networks = [ IPNetwork(a) for a in gconf.allow_network.split(',') ] +            allowed_networks = [IPNetwork(a) +                                for a in gconf.allow_network.split(',')]              client_ip = IPAddress(ssh_conn.split()[0])              allowed = False              for nw in allowed_networks: @@ -460,7 +538,7 @@ def main_i():      if ffd:          fcntl.fcntl(ffd, fcntl.F_SETFD, fcntl.FD_CLOEXEC) -    #normalize loglevel +    # normalize loglevel      lvl0 = gconf.log_level      if isinstance(lvl0, str):          lvl1 = lvl0.upper() @@ -519,7 +597,7 @@ def main_i():      if be_monitor:          label = 'monitor'      elif remote: -        #master +        # master          label = gconf.local_path      else:          label = 'slave'  | 
