From b13c483dca20e4015b958f8959328e665a357f60 Mon Sep 17 00:00:00 2001 From: Avra Sengupta Date: Sat, 1 Jun 2013 16:17:57 +0530 Subject: gsyncd: distribute the crawling load * also consume changelog for change detection. * Status fixes * Use new libgfchangelog done API * process (and sync) one changelog at a time Change-Id: I24891615bb762e0741b1819ddfdef8802326cb16 BUG: 847839 Original Author: Csaba Henk Original Author: Aravinda VK Original Author: Venky Shankar Original Author: Amar Tumballi Original Author: Avra Sengupta Signed-off-by: Avra Sengupta Reviewed-on: http://review.gluster.org/5131 Reviewed-by: Vijay Bellur Tested-by: Vijay Bellur --- geo-replication/syncdaemon/gsyncd.py | 125 ++++++++++++++++++++++++++++++----- 1 file changed, 110 insertions(+), 15 deletions(-) (limited to 'geo-replication/syncdaemon/gsyncd.py') diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 387900e6c..ad498c39c 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -2,10 +2,12 @@ import os import os.path +import glob import sys import time import logging import signal +import shutil import optparse import fcntl import fnmatch @@ -17,7 +19,7 @@ from ipaddr import IPAddress, IPNetwork from gconf import gconf from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception -from syncdutils import GsyncdError, select, set_term_handler, privileged +from syncdutils import GsyncdError, select, set_term_handler, privileged, update_file from configinterface import GConffile import resource from monitor import monitor @@ -109,6 +111,17 @@ def startup(**kw): GLogger._gsyncd_loginit(**kw) + +def _unlink(path): + try: + os.unlink(path) + except (OSError, IOError): + if sys.exc_info()[1].errno == ENOENT: + pass + else: + raise GsyncdError('Unlink error: %s' % path) + + def main(): """main routine, signal/exception handling boilerplates""" gconf.starttime = time.time() @@ -153,21 +166,27 @@ def main_i(): op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs) op.add_option('--gluster-log-level', metavar='LVL') op.add_option('--gluster-params', metavar='PRMS', default='') + op.add_option('--glusterd-uuid', metavar='UUID', type=str, default='', help=SUPPRESS_HELP) op.add_option('--gluster-cli-options', metavar='OPTS', default='--log-file=-') op.add_option('--mountbroker', metavar='LABEL') op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs) op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs) op.add_option('--log-file-mbr', metavar='LOGF', type=str, action='callback', callback=store_abs) op.add_option('--state-file', metavar='STATF', type=str, action='callback', callback=store_abs) + op.add_option('--state-detail-file', metavar='STATF', type=str, action='callback', callback=store_abs) op.add_option('--ignore-deletes', default=False, action='store_true') + op.add_option('--isolated-slave', default=False, action='store_true') op.add_option('--use-rsync-xattrs', default=False, action='store_true') op.add_option('-L', '--log-level', metavar='LVL') op.add_option('-r', '--remote-gsyncd', metavar='CMD', default=os.path.abspath(sys.argv[0])) op.add_option('--volume-id', metavar='UUID') + op.add_option('--slave-id', metavar='ID') op.add_option('--session-owner', metavar='ID') + op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='') + op.add_option('--local-path', metavar='PATH', help=SUPPRESS_HELP, default='') op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh') op.add_option('--rsync-command', metavar='CMD', default='rsync') - op.add_option('--rsync-options', metavar='OPTS', default='--sparse') + op.add_option('--rsync-options', metavar='OPTS', default='') op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress') op.add_option('--timeout', metavar='SEC', type=int, default=120) op.add_option('--connection-timeout', metavar='SEC', type=int, default=60, help=SUPPRESS_HELP) @@ -186,15 +205,28 @@ def main_i(): # see crawl() for usage of the above tunables op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP) + # changelog or xtime? (TODO: Change the default) + op.add_option('--change-detector', metavar='MODE', type=str, default='xtime') + # sleep interval for change detection (xtime crawl uses a hardcoded 1 second sleep time) + op.add_option('--change-interval', metavar='SEC', type=int, default=3) + # working directory for changelog based mechanism + op.add_option('--working-dir', metavar='DIR', type=str, action='callback', callback=store_abs) + op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local) # duh. need to specify dest or value will be mapped to None :S op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True)) + op.add_option('--resource-local', dest='resource_local', type=str, action='callback', callback=store_local) + op.add_option('--resource-remote', dest='resource_remote', type=str, action='callback', callback=store_local) op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local) op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True)) op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont')) + op.add_option('--verify', type=str, dest="verify", action='callback', callback=store_local) + op.add_option('--create', type=str, dest="create", action='callback', callback=store_local) + op.add_option('--delete', dest='delete', action='callback', callback=store_local_curry(True)) op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a), setattr(a[-1].values, 'log_file', '-'), setattr(a[-1].values, 'log_level', 'DEBUG'))), + op.add_option('--path', type=str, action='append') for a in ('check', 'get'): op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', action='callback', @@ -225,6 +257,19 @@ def main_i(): # values container. defaults = op.get_default_values() opts, args = op.parse_args(values=optparse.Values()) + args_orig = args[:] + r = rconf.get('resource_local') + if r: + if len(args) == 0: + args.append(None) + args[0] = r + r = rconf.get('resource_remote') + if r: + if len(args) == 0: + raise GsyncdError('local resource unspecfied') + elif len(args) == 1: + args.append(None) + args[1] = r confdata = rconf.get('config') if not (len(args) == 2 or \ (len(args) == 1 and rconf.get('listen')) or \ @@ -234,6 +279,12 @@ def main_i(): sys.stderr.write(op.get_usage() + "\n") sys.exit(1) + verify = rconf.get('verify') + if verify: + logging.info (verify) + logging.info ("Able to spawn gsyncd.py") + return + restricted = os.getenv('_GSYNCD_RESTRICTED_') if restricted: @@ -250,6 +301,17 @@ def main_i(): (k, v)) confrx = getattr(confdata, 'rx', None) + def makersc(aa, check=True): + if not aa: + return ([], None, None) + ra = [resource.parse_url(u) for u in aa] + local = ra[0] + remote = None + if len(ra) > 1: + remote = ra[1] + if check and not local.can_connect_to(remote): + raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path)) + return (ra, local, remote) if confrx: # peers are regexen, don't try to parse them if confrx == 'glob': @@ -257,27 +319,20 @@ def main_i(): canon_peers = args namedict = {} else: - rscs = [resource.parse_url(u) for u in args] dc = rconf.get('url_print') + rscs, local, remote = makersc(args_orig, not dc) if dc: for r in rscs: print(r.get_url(**{'normal': {}, 'canon': {'canonical': True}, 'canon_esc': {'canonical': True, 'escaped': True}}[dc])) return - local = remote = None - if rscs: - local = rscs[0] - if len(rscs) > 1: - remote = rscs[1] - if not local.can_connect_to(remote): - raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path)) pa = ([], [], []) urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True}) for x in rscs: for i in range(len(pa)): pa[i].append(x.get_url(**urlprms[i])) - peers, canon_peers, canon_esc_peers = pa + _, canon_peers, canon_esc_peers = pa # creating the namedict, a dict representing various ways of referring to / repreenting # peers to be fillable in config templates mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:]) @@ -327,6 +382,39 @@ def main_i(): gconf.__dict__.update(opts.__dict__) gconf.configinterface = gcnf + delete = rconf.get('delete') + if delete: + logging.info ('geo-replication delete') + # Delete pid file, status file, socket file + cleanup_paths = [] + if getattr(gconf, 'pid_file', None): + cleanup_paths.append(gconf.pid_file) + + if getattr(gconf, 'state_file', None): + cleanup_paths.append(gconf.state_file) + + if getattr(gconf, 'state_detail_file', None): + cleanup_paths.append(gconf.state_detail_file) + + if getattr(gconf, 'state_socket_unencoded', None): + cleanup_paths.append(gconf.state_socket_unencoded) + + # Cleanup changelog working dirs + if getattr(gconf, 'working_dir', None): + try: + shutil.rmtree(gconf.working_dir) + except (IOError, OSError): + if sys.exc_info()[1].errno == ENOENT: + pass + else: + raise GsyncdError('Error while removing working dir: %s' % gconf.working_dir) + + for path in cleanup_paths: + # To delete temp files + for f in glob.glob(path + "*"): + _unlink(f) + return + if restricted and gconf.allow_network: ssh_conn = os.getenv('SSH_CONNECTION') if not ssh_conn: @@ -380,9 +468,16 @@ def main_i(): raise return + create = rconf.get('create') + if create: + if getattr(gconf, 'state_file', None): + update_file(gconf.state_file, lambda f: f.write(create + '\n')) + return + go_daemon = rconf['go_daemon'] be_monitor = rconf.get('monitor') + rscs, local, remote = makersc(args) if not be_monitor and isinstance(remote, resource.SSH) and \ go_daemon == 'should': go_daemon = 'postconn' @@ -393,16 +488,16 @@ def main_i(): label = 'monitor' elif remote: #master - label = '' + label = gconf.local_path else: label = 'slave' startup(go_daemon=go_daemon, log_file=log_file, label=label) + resource.Popen.init_errhandler() if be_monitor: - return monitor() + return monitor(*rscs) - logging.info("syncing: %s" % " -> ".join(peers)) - resource.Popen.init_errhandler() + logging.info("syncing: %s" % " -> ".join(r.url for r in rscs)) if remote: go_daemon = remote.connect_remote(go_daemon=go_daemon) if go_daemon: -- cgit