diff options
Diffstat (limited to 'geo-replication/syncdaemon/gsyncd.py')
| -rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 125 | 
1 files changed, 110 insertions, 15 deletions
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 387900e6ce8..ad498c39cdc 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -2,10 +2,12 @@  import os  import os.path +import glob  import sys  import time  import logging  import signal +import shutil  import optparse  import fcntl  import fnmatch @@ -17,7 +19,7 @@ from ipaddr import IPAddress, IPNetwork  from gconf import gconf  from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception -from syncdutils import GsyncdError, select, set_term_handler, privileged +from syncdutils import GsyncdError, select, set_term_handler, privileged, update_file  from configinterface import GConffile  import resource  from monitor import monitor @@ -109,6 +111,17 @@ def startup(**kw):      GLogger._gsyncd_loginit(**kw) + +def _unlink(path): +    try: +        os.unlink(path) +    except (OSError, IOError): +        if sys.exc_info()[1].errno == ENOENT: +            pass +        else: +            raise GsyncdError('Unlink error: %s' % path) + +  def main():      """main routine, signal/exception handling boilerplates"""      gconf.starttime = time.time() @@ -153,21 +166,27 @@ def main_i():      op.add_option('--gluster-log-file',    metavar='LOGF',  default=os.devnull, type=str, action='callback', callback=store_abs)      op.add_option('--gluster-log-level',   metavar='LVL')      op.add_option('--gluster-params',      metavar='PRMS',  default='') +    op.add_option('--glusterd-uuid',       metavar='UUID',  type=str, default='', help=SUPPRESS_HELP)      op.add_option('--gluster-cli-options', metavar='OPTS',  default='--log-file=-')      op.add_option('--mountbroker',         metavar='LABEL')      op.add_option('-p', '--pid-file',      metavar='PIDF',  type=str, action='callback', callback=store_abs)      op.add_option('-l', '--log-file',      metavar='LOGF',  type=str, action='callback', callback=store_abs)      op.add_option('--log-file-mbr',        metavar='LOGF',  type=str, action='callback', callback=store_abs)      op.add_option('--state-file',          metavar='STATF', type=str, action='callback', callback=store_abs) +    op.add_option('--state-detail-file',   metavar='STATF', type=str, action='callback', callback=store_abs)      op.add_option('--ignore-deletes',      default=False, action='store_true') +    op.add_option('--isolated-slave',      default=False, action='store_true')      op.add_option('--use-rsync-xattrs',    default=False, action='store_true')      op.add_option('-L', '--log-level',     metavar='LVL')      op.add_option('-r', '--remote-gsyncd', metavar='CMD',   default=os.path.abspath(sys.argv[0]))      op.add_option('--volume-id',           metavar='UUID') +    op.add_option('--slave-id',            metavar='ID')      op.add_option('--session-owner',       metavar='ID') +    op.add_option('--local-id',            metavar='ID',    help=SUPPRESS_HELP, default='') +    op.add_option('--local-path',          metavar='PATH',  help=SUPPRESS_HELP, default='')      op.add_option('-s', '--ssh-command',   metavar='CMD',   default='ssh')      op.add_option('--rsync-command',       metavar='CMD',   default='rsync') -    op.add_option('--rsync-options',       metavar='OPTS',  default='--sparse') +    op.add_option('--rsync-options',       metavar='OPTS',  default='')      op.add_option('--rsync-ssh-options',   metavar='OPTS',  default='--compress')      op.add_option('--timeout',             metavar='SEC',   type=int, default=120)      op.add_option('--connection-timeout',  metavar='SEC',   type=int, default=60, help=SUPPRESS_HELP) @@ -186,15 +205,28 @@ def main_i():      # see crawl() for usage of the above tunables      op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP) +    # changelog or xtime? (TODO: Change the default) +    op.add_option('--change-detector', metavar='MODE', type=str, default='xtime') +    # sleep interval for change detection (xtime crawl uses a hardcoded 1 second sleep time) +    op.add_option('--change-interval', metavar='SEC', type=int, default=3) +    # working directory for changelog based mechanism +    op.add_option('--working-dir', metavar='DIR', type=str, action='callback', callback=store_abs) +      op.add_option('-c', '--config-file',   metavar='CONF',  type=str, action='callback', callback=store_local)      # duh. need to specify dest or value will be mapped to None :S      op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True)) +    op.add_option('--resource-local', dest='resource_local', type=str, action='callback', callback=store_local) +    op.add_option('--resource-remote', dest='resource_remote', type=str, action='callback', callback=store_local)      op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local)      op.add_option('--listen', dest='listen', help=SUPPRESS_HELP,      action='callback', callback=store_local_curry(True))      op.add_option('-N', '--no-daemon', dest="go_daemon",    action='callback', callback=store_local_curry('dont')) +    op.add_option('--verify', type=str, dest="verify", action='callback', callback=store_local) +    op.add_option('--create', type=str, dest="create", action='callback', callback=store_local) +    op.add_option('--delete', dest='delete', action='callback', callback=store_local_curry(True))      op.add_option('--debug', dest="go_daemon",              action='callback', callback=lambda *a: (store_local_curry('dont')(*a),                                                                                                      setattr(a[-1].values, 'log_file', '-'),                                                                                                      setattr(a[-1].values, 'log_level', 'DEBUG'))), +    op.add_option('--path', type=str, action='append')      for a in ('check', 'get'):          op.add_option('--config-' + a,      metavar='OPT',  type=str, dest='config', action='callback', @@ -225,6 +257,19 @@ def main_i():      # values container.      defaults = op.get_default_values()      opts, args = op.parse_args(values=optparse.Values()) +    args_orig = args[:] +    r = rconf.get('resource_local') +    if r: +        if len(args) == 0: +            args.append(None) +        args[0] = r +    r = rconf.get('resource_remote') +    if r: +        if len(args) == 0: +            raise GsyncdError('local resource unspecfied') +        elif len(args) == 1: +            args.append(None) +        args[1] = r      confdata = rconf.get('config')      if not (len(args) == 2 or \              (len(args) == 1 and rconf.get('listen')) or \ @@ -234,6 +279,12 @@ def main_i():          sys.stderr.write(op.get_usage() + "\n")          sys.exit(1) +    verify = rconf.get('verify') +    if verify: +        logging.info (verify) +        logging.info ("Able to spawn gsyncd.py") +        return +      restricted = os.getenv('_GSYNCD_RESTRICTED_')      if restricted: @@ -250,6 +301,17 @@ def main_i():                                    (k, v))      confrx = getattr(confdata, 'rx', None) +    def makersc(aa, check=True): +        if not aa: +            return ([], None, None) +        ra = [resource.parse_url(u) for u in aa] +        local = ra[0] +        remote = None +        if len(ra) > 1: +            remote = ra[1] +        if check and not local.can_connect_to(remote): +            raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path)) +        return (ra, local, remote)      if confrx:          # peers are regexen, don't try to parse them          if confrx == 'glob': @@ -257,27 +319,20 @@ def main_i():          canon_peers = args          namedict = {}      else: -        rscs = [resource.parse_url(u) for u in args]          dc = rconf.get('url_print') +        rscs, local, remote = makersc(args_orig, not dc)          if dc:              for r in rscs:                  print(r.get_url(**{'normal': {},                                     'canon': {'canonical': True},                                     'canon_esc': {'canonical': True, 'escaped': True}}[dc]))              return -        local = remote = None -        if rscs: -            local = rscs[0] -            if len(rscs) > 1: -                remote = rscs[1] -            if not local.can_connect_to(remote): -                raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path))          pa = ([], [], [])          urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True})          for x in rscs:              for i in range(len(pa)):                  pa[i].append(x.get_url(**urlprms[i])) -        peers, canon_peers, canon_esc_peers = pa +        _, canon_peers, canon_esc_peers = pa          # creating the namedict, a dict representing various ways of referring to / repreenting          # peers to be fillable in config templates          mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:]) @@ -327,6 +382,39 @@ def main_i():      gconf.__dict__.update(opts.__dict__)      gconf.configinterface = gcnf +    delete = rconf.get('delete') +    if delete: +        logging.info ('geo-replication delete') +        # Delete pid file, status file, socket file +        cleanup_paths = [] +        if getattr(gconf, 'pid_file', None): +            cleanup_paths.append(gconf.pid_file) + +        if getattr(gconf, 'state_file', None): +            cleanup_paths.append(gconf.state_file) + +        if getattr(gconf, 'state_detail_file', None): +            cleanup_paths.append(gconf.state_detail_file) + +        if getattr(gconf, 'state_socket_unencoded', None): +            cleanup_paths.append(gconf.state_socket_unencoded) + +        # Cleanup changelog working dirs +        if getattr(gconf, 'working_dir', None): +            try: +                shutil.rmtree(gconf.working_dir) +            except (IOError, OSError): +                if sys.exc_info()[1].errno == ENOENT: +                    pass +                else: +                    raise GsyncdError('Error while removing working dir: %s' % gconf.working_dir) + +        for path in cleanup_paths: +            # To delete temp files +            for f in glob.glob(path + "*"): +                _unlink(f) +        return +      if restricted and gconf.allow_network:          ssh_conn = os.getenv('SSH_CONNECTION')          if not ssh_conn: @@ -380,9 +468,16 @@ def main_i():                  raise          return +    create = rconf.get('create') +    if create: +        if getattr(gconf, 'state_file', None): +            update_file(gconf.state_file, lambda f: f.write(create + '\n')) +        return +      go_daemon = rconf['go_daemon']      be_monitor = rconf.get('monitor') +    rscs, local, remote = makersc(args)      if not be_monitor and isinstance(remote, resource.SSH) and \         go_daemon == 'should':          go_daemon = 'postconn' @@ -393,16 +488,16 @@ def main_i():          label = 'monitor'      elif remote:          #master -        label = '' +        label = gconf.local_path      else:          label = 'slave'      startup(go_daemon=go_daemon, log_file=log_file, label=label) +    resource.Popen.init_errhandler()      if be_monitor: -        return monitor() +        return monitor(*rscs) -    logging.info("syncing: %s" % " -> ".join(peers)) -    resource.Popen.init_errhandler() +    logging.info("syncing: %s" % " -> ".join(r.url for r in rscs))      if remote:          go_daemon = remote.connect_remote(go_daemon=go_daemon)          if go_daemon:  | 
