Diffstat (limited to 'geo-replication/syncdaemon/gsyncd.py')
-rw-r--r--  geo-replication/syncdaemon/gsyncd.py  419
 1 file changed, 419 insertions(+), 0 deletions(-)
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
new file mode 100644
index 00000000000..387900e6ce8
--- /dev/null
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -0,0 +1,419 @@
+#!/usr/bin/env python
+
+import os
+import os.path
+import sys
+import time
+import logging
+import signal
+import optparse
+import fcntl
+import fnmatch
+from optparse import OptionParser, SUPPRESS_HELP
+from logging import Logger
+from errno import EEXIST, ENOENT
+
+from ipaddr import IPAddress, IPNetwork
+
+from gconf import gconf
+from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception
+from syncdutils import GsyncdError, select, set_term_handler, privileged
+from configinterface import GConffile
+import resource
+from monitor import monitor
+
+class GLogger(Logger):
+ """Logger customizations for gsyncd.
+
+ It implements a log format similar to that of glusterfs.
+ """
+
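+    # makeRecord() enriches every record with glusterfs-style fields:
+    #   nsecs  - the sub-second part of the timestamp
+    #   lvlnam - the first letter of the level name (I, W, E, ...)
+    #   ctx    - the class name of the calling object, recovered by walking a
+    #            fixed number of frames up the stack; the depth of 4 assumes the
+    #            record was created through the usual logging.Logger entry points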
+ def makeRecord(self, name, level, *a):
+ rv = Logger.makeRecord(self, name, level, *a)
+ rv.nsecs = (rv.created - int(rv.created)) * 1000000
+ fr = sys._getframe(4)
+ callee = fr.f_locals.get('self')
+ if callee:
+ ctx = str(type(callee)).split("'")[1].split('.')[-1]
+ else:
+ ctx = '<top>'
+ if not hasattr(rv, 'funcName'):
+ rv.funcName = fr.f_code.co_name
+ rv.lvlnam = logging.getLevelName(level)[0]
+ rv.ctx = ctx
+ return rv
+
+ @classmethod
+ def setup(cls, **kw):
+ lbl = kw.get('label', "")
+ if lbl:
+ lbl = '(' + lbl + ')'
+ lprm = {'datefmt': "%Y-%m-%d %H:%M:%S",
+ 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"}
+ lprm.update(kw)
+ lvl = kw.get('level', logging.INFO)
+ lprm['level'] = lvl
+ logging.root = cls("root", lvl)
+ logging.setLoggerClass(cls)
+ logging.getLogger().handlers = []
+ logging.basicConfig(**lprm)
+
+ @classmethod
+ def _gsyncd_loginit(cls, **kw):
+ lkw = {}
+ if gconf.log_level:
+ lkw['level'] = gconf.log_level
+ if kw.get('log_file'):
+ if kw['log_file'] in ('-', '/dev/stderr'):
+ lkw['stream'] = sys.stderr
+ elif kw['log_file'] == '/dev/stdout':
+ lkw['stream'] = sys.stdout
+ else:
+ lkw['filename'] = kw['log_file']
+
+ cls.setup(label=kw.get('label'), **lkw)
+
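+        # stash the effective settings (plus the label) on gconf, presumably so
+        # that an identical logging setup can be re-created later by other parts
+        # of the syncdaemon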
+ lkw.update({'saved_label': kw.get('label')})
+ gconf.log_metadata = lkw
+ gconf.log_exit = True
+
+def startup(**kw):
+ """set up logging, pidfile grabbing, daemonization"""
+ if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn':
+ if not grabpidfile():
+ sys.stderr.write("pidfile is taken, exiting.\n")
+ sys.exit(2)
+ gconf.pid_file_owned = True
+
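+    # classic daemonization: fork, let the parent exit, setsid() and point stdio
+    # at /dev/null in the child.  The child re-grabs the pidfile under a .tmp name
+    # and rename()s it into place, then blocks in select() on the read end (x) of
+    # the pipe: having closed its own write end (y), it sees EOF exactly when the
+    # exiting parent's copy of the write end goes away.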
+ if kw.get('go_daemon') == 'should':
+ x, y = os.pipe()
+ gconf.cpid = os.fork()
+ if gconf.cpid:
+ os.close(x)
+ sys.exit()
+ os.close(y)
+ os.setsid()
+ dn = os.open(os.devnull, os.O_RDWR)
+ for f in (sys.stdin, sys.stdout, sys.stderr):
+ os.dup2(dn, f.fileno())
+ if getattr(gconf, 'pid_file', None):
+ if not grabpidfile(gconf.pid_file + '.tmp'):
+ raise GsyncdError("cannot grab temporary pidfile")
+ os.rename(gconf.pid_file + '.tmp', gconf.pid_file)
+ # wait for parent to terminate
+ # so we can start up with
+ # no messing from the dirty
+ # ol' bustard
+ select((x,), (), ())
+ os.close(x)
+
+ GLogger._gsyncd_loginit(**kw)
+
+def main():
+ """main routine, signal/exception handling boilerplates"""
+ gconf.starttime = time.time()
+ set_term_handler()
+ GLogger.setup()
+ excont = FreeObject(exval = 0)
+ try:
+ try:
+ main_i()
+ except:
+ log_raise_exception(excont)
+ finally:
+ finalize(exval = excont.exval)
+
+def main_i():
+ """internal main routine
+
+ parse command line, decide what action will be taken;
+ we can either:
+ - query/manipulate configuration
+ - format gsyncd urls using gsyncd's url parsing engine
+ - start service in following modes, in given stages:
+ - monitor: startup(), monitor()
+ - master: startup(), connect_remote(), connect(), service_loop()
+ - slave: startup(), connect(), service_loop()
+ """
+ rconf = {'go_daemon': 'should'}
+
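+    # optparse callback helpers: store_abs() stores path arguments as absolute
+    # paths on the parser values; the store_local*() variants route their value
+    # into the local rconf dict instead, keeping one-shot settings apart from the
+    # tunables that end up in gconf / the config file; store_local_obj() wraps the
+    # parameters of a --config-* operation in a FreeObject tagged with the op name.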
+ def store_abs(opt, optstr, val, parser):
+ if val and val != '-':
+ val = os.path.abspath(val)
+ setattr(parser.values, opt.dest, val)
+ def store_local(opt, optstr, val, parser):
+ rconf[opt.dest] = val
+ def store_local_curry(val):
+ return lambda o, oo, vx, p: store_local(o, oo, val, p)
+ def store_local_obj(op, dmake):
+ return lambda o, oo, vx, p: store_local(o, oo, FreeObject(op=op, **dmake(vx)), p)
+
+ op = OptionParser(usage="%prog [options...] <master> <slave>", version="%prog 0.0.1")
+ op.add_option('--gluster-command-dir', metavar='DIR', default='')
+ op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs)
+ op.add_option('--gluster-log-level', metavar='LVL')
+ op.add_option('--gluster-params', metavar='PRMS', default='')
+ op.add_option('--gluster-cli-options', metavar='OPTS', default='--log-file=-')
+ op.add_option('--mountbroker', metavar='LABEL')
+ op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs)
+ op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs)
+ op.add_option('--log-file-mbr', metavar='LOGF', type=str, action='callback', callback=store_abs)
+ op.add_option('--state-file', metavar='STATF', type=str, action='callback', callback=store_abs)
+ op.add_option('--ignore-deletes', default=False, action='store_true')
+ op.add_option('--use-rsync-xattrs', default=False, action='store_true')
+ op.add_option('-L', '--log-level', metavar='LVL')
+ op.add_option('-r', '--remote-gsyncd', metavar='CMD', default=os.path.abspath(sys.argv[0]))
+ op.add_option('--volume-id', metavar='UUID')
+ op.add_option('--session-owner', metavar='ID')
+ op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh')
+ op.add_option('--rsync-command', metavar='CMD', default='rsync')
+ op.add_option('--rsync-options', metavar='OPTS', default='--sparse')
+ op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress')
+ op.add_option('--timeout', metavar='SEC', type=int, default=120)
+ op.add_option('--connection-timeout', metavar='SEC', type=int, default=60, help=SUPPRESS_HELP)
+ op.add_option('--sync-jobs', metavar='N', type=int, default=3)
+ op.add_option('--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP)
+ op.add_option('--allow-network', metavar='IPS', default='')
+ op.add_option('--socketdir', metavar='DIR')
+ op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs)
+ op.add_option('--checkpoint', metavar='LABEL', default='')
+ # tunables for failover/failback mechanism:
+ # None - gsyncd behaves as normal
+ # blind - gsyncd works with xtime pairs to identify
+ # candidates for synchronization
+ # wrapup - same as normal mode but does not assign
+ # xtimes to orphaned files
+ # see crawl() for usage of the above tunables
+ op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP)
+
+ op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local)
+ # duh. need to specify dest or value will be mapped to None :S
+ op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True))
+ op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local)
+ op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True))
+ op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont'))
+ op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a),
+ setattr(a[-1].values, 'log_file', '-'),
+ setattr(a[-1].values, 'log_level', 'DEBUG'))),
+
+ for a in ('check', 'get'):
+ op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', action='callback',
+ callback=store_local_obj(a, lambda vx: {'opt': vx}))
+ op.add_option('--config-get-all', dest='config', action='callback', callback=store_local_obj('get', lambda vx: {'opt': None}))
+ for m in ('', '-rx', '-glob'):
+ # call this code 'Pythonic' eh?
+ # have to define a one-shot local function to be able to inject (a value depending on the)
+ # iteration variable into the inner lambda
+ def conf_mod_opt_regex_variant(rx):
+ op.add_option('--config-set' + m, metavar='OPT VAL', type=str, nargs=2, dest='config', action='callback',
+ callback=store_local_obj('set', lambda vx: {'opt': vx[0], 'val': vx[1], 'rx': rx}))
+ op.add_option('--config-del' + m, metavar='OPT', type=str, dest='config', action='callback',
+ callback=store_local_obj('del', lambda vx: {'opt': vx, 'rx': rx}))
+ conf_mod_opt_regex_variant(m and m[1:] or False)
+
+ op.add_option('--normalize-url', dest='url_print', action='callback', callback=store_local_curry('normal'))
+ op.add_option('--canonicalize-url', dest='url_print', action='callback', callback=store_local_curry('canon'))
+ op.add_option('--canonicalize-escape-url', dest='url_print', action='callback', callback=store_local_curry('canon_esc'))
+
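+    # 'tunables' are the option names accepted by the --config-* operations
+    # (everything handled by store_abs or by optparse's default store actions,
+    # except --version/--help).  Under _GSYNCD_RESTRICTED_ only the options in
+    # 'remote_tunables' may be passed at all, and those in 'rq_remote_tunables'
+    # must be present with exactly the given values.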
+ tunables = [ norm(o.get_opt_string()[2:]) for o in op.option_list if o.callback in (store_abs, 'store_true', None) and o.get_opt_string() not in ('--version', '--help') ]
+ remote_tunables = [ 'listen', 'go_daemon', 'timeout', 'session_owner', 'config_file', 'use_rsync_xattrs' ]
+ rq_remote_tunables = { 'listen': True }
+
+ # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults
+ # -- for this to work out we need to tell apart defaults from explicitly set
+ # options... so churn out the defaults here and call the parser with virgin
+ # values container.
+ defaults = op.get_default_values()
+ opts, args = op.parse_args(values=optparse.Values())
+ confdata = rconf.get('config')
+ if not (len(args) == 2 or \
+ (len(args) == 1 and rconf.get('listen')) or \
+ (len(args) <= 2 and confdata) or \
+ rconf.get('url_print')):
+ sys.stderr.write("error: incorrect number of arguments\n\n")
+ sys.stderr.write(op.get_usage() + "\n")
+ sys.exit(1)
+
+ restricted = os.getenv('_GSYNCD_RESTRICTED_')
+
+ if restricted:
+ allopts = {}
+ allopts.update(opts.__dict__)
+ allopts.update(rconf)
+ bannedtuns = set(allopts.keys()) - set(remote_tunables)
+ if bannedtuns:
+            raise GsyncdError('following tunables cannot be set with restricted SSH invocation: ' + \
+ ', '.join(bannedtuns))
+ for k, v in rq_remote_tunables.items():
+ if not k in allopts or allopts[k] != v:
+                raise GsyncdError('tunable %s is not set to value %s required for restricted SSH invocation' % \
+ (k, v))
+
+ confrx = getattr(confdata, 'rx', None)
+ if confrx:
+ # peers are regexen, don't try to parse them
+ if confrx == 'glob':
+ args = [ '\A' + fnmatch.translate(a) for a in args ]
+ canon_peers = args
+ namedict = {}
+ else:
+ rscs = [resource.parse_url(u) for u in args]
+ dc = rconf.get('url_print')
+ if dc:
+ for r in rscs:
+ print(r.get_url(**{'normal': {},
+ 'canon': {'canonical': True},
+ 'canon_esc': {'canonical': True, 'escaped': True}}[dc]))
+ return
+ local = remote = None
+ if rscs:
+ local = rscs[0]
+ if len(rscs) > 1:
+ remote = rscs[1]
+ if not local.can_connect_to(remote):
+ raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path))
+ pa = ([], [], [])
+ urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True})
+ for x in rscs:
+ for i in range(len(pa)):
+ pa[i].append(x.get_url(**urlprms[i]))
+ peers, canon_peers, canon_esc_peers = pa
+        # create the namedict, a dict of the various ways of referring to /
+        # representing the peers, used to fill in config templates
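+        # the three 'mods' produce the plain, Capitalized and 'e'-prefixed
+        # Capitalized variants of each role name ('local', 'master', ...), mapped
+        # to the plain, canonical and canonical-escaped URL of that peer;
+        # gluster:// peers also get a '<role>vol' entry holding the volume name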
+ mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:])
+ if remote:
+ rmap = { local: ('local', 'master'), remote: ('remote', 'slave') }
+ else:
+ rmap = { local: ('local', 'slave') }
+ namedict = {}
+ for i in range(len(rscs)):
+ x = rscs[i]
+ for name in rmap[x]:
+ for j in range(3):
+ namedict[mods[j](name)] = pa[j][i]
+ if x.scheme == 'gluster':
+ namedict[name + 'vol'] = x.volume
+ if not 'config_file' in rconf:
+ rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf")
+ gcnf = GConffile(rconf['config_file'], canon_peers, defaults.__dict__, opts.__dict__, namedict)
+
+ checkpoint_change = False
+ if confdata:
+ opt_ok = norm(confdata.opt) in tunables + [None]
+ if confdata.op == 'check':
+ if opt_ok:
+ sys.exit(0)
+ else:
+ sys.exit(1)
+ elif not opt_ok:
+ raise GsyncdError("not a valid option: " + confdata.opt)
+ if confdata.op == 'get':
+ gcnf.get(confdata.opt)
+ elif confdata.op == 'set':
+ gcnf.set(confdata.opt, confdata.val, confdata.rx)
+ elif confdata.op == 'del':
+ gcnf.delete(confdata.opt, confdata.rx)
+        # when the checkpoint is modified, it's important to log that,
+        # so in that case we go on to set up logging even though this
+        # is just a config invocation
+ if confdata.opt == 'checkpoint' and confdata.op in ('set', 'del') and \
+ not confdata.rx:
+ checkpoint_change = True
+ if not checkpoint_change:
+ return
+
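+    # assemble the effective configuration in gconf following the precedence noted
+    # above: start from the optparse defaults, let the config file override them,
+    # then apply the options given explicitly on the command line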
+ gconf.__dict__.update(defaults.__dict__)
+ gcnf.update_to(gconf.__dict__)
+ gconf.__dict__.update(opts.__dict__)
+ gconf.configinterface = gcnf
+
+ if restricted and gconf.allow_network:
+ ssh_conn = os.getenv('SSH_CONNECTION')
+ if not ssh_conn:
+ #legacy env var
+ ssh_conn = os.getenv('SSH_CLIENT')
+ if ssh_conn:
+ allowed_networks = [ IPNetwork(a) for a in gconf.allow_network.split(',') ]
+ client_ip = IPAddress(ssh_conn.split()[0])
+ allowed = False
+ for nw in allowed_networks:
+ if client_ip in nw:
+ allowed = True
+ break
+ if not allowed:
+ raise GsyncdError("client IP address is not allowed")
+
+ ffd = rconf.get('feedback_fd')
+ if ffd:
+ fcntl.fcntl(ffd, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
+
+ #normalize loglevel
+ lvl0 = gconf.log_level
+ if isinstance(lvl0, str):
+ lvl1 = lvl0.upper()
+ lvl2 = logging.getLevelName(lvl1)
+ # I have _never_ _ever_ seen such an utterly braindead
+ # error condition
+ if lvl2 == "Level " + lvl1:
+ raise GsyncdError('cannot recognize log level "%s"' % lvl0)
+ gconf.log_level = lvl2
+
+ if not privileged() and gconf.log_file_mbr:
+ gconf.log_file = gconf.log_file_mbr
+
+ if checkpoint_change:
+ try:
+ GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf')
+ if confdata.op == 'set':
+ logging.info('checkpoint %s set' % confdata.val)
+ elif confdata.op == 'del':
+ logging.info('checkpoint info was reset')
+ except IOError:
+ if sys.exc_info()[1].errno == ENOENT:
+ # directory of log path is not present,
+ # which happens if we get here from
+ # a peer-multiplexed "config-set checkpoint"
+ # (as that directory is created only on the
+ # original node)
+ pass
+ else:
+ raise
+ return
+
+ go_daemon = rconf['go_daemon']
+ be_monitor = rconf.get('monitor')
+
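+    # as master towards an SSH slave, daemonization is deferred to the 'postconn'
+    # stage: the first startup() call skips forking and leaves the log file unset
+    # (so messages go to stderr), and the real daemonization plus file logging
+    # happen only once connect_remote() has established the remote end
+    # (presumably so that connection failures still reach the invoker)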
+ if not be_monitor and isinstance(remote, resource.SSH) and \
+ go_daemon == 'should':
+ go_daemon = 'postconn'
+ log_file = None
+ else:
+ log_file = gconf.log_file
+ if be_monitor:
+ label = 'monitor'
+ elif remote:
+ #master
+ label = ''
+ else:
+ label = 'slave'
+ startup(go_daemon=go_daemon, log_file=log_file, label=label)
+
+ if be_monitor:
+ return monitor()
+
+ logging.info("syncing: %s" % " -> ".join(peers))
+ resource.Popen.init_errhandler()
+ if remote:
+ go_daemon = remote.connect_remote(go_daemon=go_daemon)
+ if go_daemon:
+ startup(go_daemon=go_daemon, log_file=gconf.log_file)
+ # complete remote connection in child
+ remote.connect_remote(go_daemon='done')
+ local.connect()
+ if ffd:
+ os.close(ffd)
+ local.service_loop(*[r for r in [remote] if r])
+
+
+if __name__ == "__main__":
+ main()