Diffstat (limited to 'geo-replication/syncdaemon/gsyncd.py')
-rw-r--r--  geo-replication/syncdaemon/gsyncd.py  | 1077
1 file changed, 291 insertions(+), 786 deletions(-)
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index adca0374c6c..fbab5cbf386 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -1,805 +1,310 @@
-#!/usr/bin/env python
-#
-# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
-# This file is part of GlusterFS.
-
-# This file is licensed to you under your choice of the GNU Lesser
-# General Public License, version 3 or any later version (LGPLv3 or
-# later), or the GNU General Public License, version 2 (GPLv2), in all
-# cases as published by the Free Software Foundation.
-#
-
+#!/usr/bin/python
+from argparse import ArgumentParser
+import time
import os
-import os.path
-import glob
+from errno import EEXIST
import sys
-import time
import logging
-import shutil
-import optparse
-import fcntl
-import fnmatch
-from optparse import OptionParser, SUPPRESS_HELP
-from logging import Logger, handlers
-from errno import ENOENT
-
-from ipaddr import IPAddress, IPNetwork
-
-from gconf import gconf
-from syncdutils import FreeObject, norm, grabpidfile, finalize
-from syncdutils import log_raise_exception, privileged, boolify
-from syncdutils import GsyncdError, select, set_term_handler
-from configinterface import GConffile, upgrade_config_file, TMPL_CONFIG_FILE
-import resource
-from monitor import monitor
-import xml.etree.ElementTree as XET
-from subprocess import PIPE
-import subprocess
-from changelogagent import agent, Changelog
-from gsyncdstatus import set_monitor_status, GeorepStatus, human_time_utc
-from libcxattr import Xattr
-import struct
-from syncdutils import get_master_and_slave_data_from_args, lf, Popen
-
-ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
-
-
-class GLogger(Logger):
-
- """Logger customizations for gsyncd.
-
- It implements a log format similar to that of glusterfs.
- """
-
- def makeRecord(self, name, level, *a):
- rv = Logger.makeRecord(self, name, level, *a)
- rv.nsecs = (rv.created - int(rv.created)) * 1000000
- fr = sys._getframe(4)
- callee = fr.f_locals.get('self')
- if callee:
- ctx = str(type(callee)).split("'")[1].split('.')[-1]
- else:
- ctx = '<top>'
- if not hasattr(rv, 'funcName'):
- rv.funcName = fr.f_code.co_name
- rv.lvlnam = logging.getLevelName(level)[0]
- rv.ctx = ctx
- return rv
-
- @classmethod
- def setup(cls, **kw):
- lbl = kw.get('label', "")
- if lbl:
- lbl = '(' + lbl + ')'
- lprm = {'datefmt': "%Y-%m-%d %H:%M:%S",
- 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" +
- lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"}
- lprm.update(kw)
- lvl = kw.get('level', logging.INFO)
- lprm['level'] = lvl
- logging.root = cls("root", lvl)
- logging.setLoggerClass(cls)
- logging.getLogger().handlers = []
- logging.getLogger().setLevel(lprm['level'])
- logging.Formatter.converter = time.gmtime # Log in GMT/UTC time
-
- if 'filename' in lprm:
- try:
- logging_handler = handlers.WatchedFileHandler(lprm['filename'])
- formatter = logging.Formatter(fmt=lprm['format'],
- datefmt=lprm['datefmt'])
- logging_handler.setFormatter(formatter)
- logging.getLogger().addHandler(logging_handler)
- except AttributeError:
- # Python version < 2.6 will not have WatchedFileHandler
- # so fallback to logging without any handler.
- # Note: logrotate will not work if Python version is < 2.6
- logging.basicConfig(**lprm)
- else:
- # If filename not passed(not available in lprm) then it may be
- # streaming.(Ex: {"stream": "/dev/stdout"})
- logging.basicConfig(**lprm)
-
- @classmethod
- def _gsyncd_loginit(cls, **kw):
- lkw = {}
- if gconf.log_level:
- lkw['level'] = gconf.log_level
- if kw.get('log_file'):
- if kw['log_file'] in ('-', '/dev/stderr'):
- lkw['stream'] = sys.stderr
- elif kw['log_file'] == '/dev/stdout':
- lkw['stream'] = sys.stdout
- else:
- lkw['filename'] = kw['log_file']
-
- cls.setup(label=kw.get('label'), **lkw)
-
- lkw.update({'saved_label': kw.get('label')})
- gconf.log_metadata = lkw
- gconf.log_exit = True
-
-
-# Given slave host and its volume name, get corresponding volume uuid
-def slave_vol_uuid_get(host, vol):
- po = subprocess.Popen(['gluster', '--xml', '--remote-host=' + host,
- 'volume', 'info', vol], bufsize=0,
- stdin=None, stdout=PIPE, stderr=PIPE)
- vix, err = po.communicate()
- if po.returncode != 0:
- logging.info(lf("Volume info failed, unable to get "
- "volume uuid of slavevol, "
- "returning empty string",
- slavevol=vol,
- slavehost=host,
- error=po.returncode))
- return ""
- vi = XET.fromstring(vix)
- if vi.find('opRet').text != '0':
- logging.info(lf("Unable to get volume uuid of slavevol, "
- "returning empty string",
- slavevol=vol,
- slavehost=host,
- error=vi.find('opErrstr').text))
- return ""
- try:
- voluuid = vi.find("volInfo/volumes/volume/id").text
- except (ParseError, AttributeError, ValueError) as e:
- logging.info(lf("Parsing failed to volume uuid of slavevol, "
- "returning empty string",
- slavevol=vol,
- slavehost=host,
- error=e))
- voluuid = ""
-
- return voluuid
-
-
-def startup(**kw):
- """set up logging, pidfile grabbing, daemonization"""
- if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn':
- if not grabpidfile():
- sys.stderr.write("pidfile is taken, exiting.\n")
- sys.exit(2)
- gconf.pid_file_owned = True
-
- if kw.get('go_daemon') == 'should':
- x, y = os.pipe()
- gconf.cpid = os.fork()
- if gconf.cpid:
- os.close(x)
- sys.exit()
- os.close(y)
- os.setsid()
- dn = os.open(os.devnull, os.O_RDWR)
- for f in (sys.stdin, sys.stdout, sys.stderr):
- os.dup2(dn, f.fileno())
- if getattr(gconf, 'pid_file', None):
- if not grabpidfile(gconf.pid_file + '.tmp'):
- raise GsyncdError("cannot grab temporary pidfile")
- os.rename(gconf.pid_file + '.tmp', gconf.pid_file)
- # wait for parent to terminate
- # so we can start up with
- # no messing from the dirty
- # ol' bustard
- select((x,), (), ())
- os.close(x)
-
- GLogger._gsyncd_loginit(**kw)
-
-
-def _unlink(path):
- try:
- os.unlink(path)
- except (OSError, IOError):
- if sys.exc_info()[1].errno == ENOENT:
- pass
- else:
- raise GsyncdError('Unlink error: %s' % path)
+from logutils import setup_logging
+import gsyncdconfig as gconf
+from rconf import rconf
+import subcmds
+from conf import GLUSTERD_WORKDIR, GLUSTERFS_CONFDIR, GCONF_VERSION
+from syncdutils import set_term_handler, finalize, lf
+from syncdutils import log_raise_exception, FreeObject, escape
+import argsupgrade
+
+
+GSYNCD_VERSION = "gsyncd.py %s.0" % GCONF_VERSION
def main():
- """main routine, signal/exception handling boilerplates"""
- gconf.starttime = time.time()
+ rconf.starttime = time.time()
+
+    # If an old Glusterd sends commands in the old format, the function
+    # below converts sys.argv to the new format. This conversion is added
+    # temporarily for backward compatibility and can be removed once
+    # integrated with Glusterd2.
+    # It modifies sys.argv globally, so the rest of the code works as usual
+ argsupgrade.upgrade()
+
+    # The default argparse version handler prints to stderr; this is fixed
+    # in the Python 3.x series but not in 2.x, so handle --version manually
+    # to work around it
+ if "--version" in sys.argv:
+ print(GSYNCD_VERSION)
+ sys.exit(0)
+
+ parser = ArgumentParser()
+ sp = parser.add_subparsers(dest="subcmd")
+
+ # Monitor Status File update
+ p = sp.add_parser("monitor-status")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave details user@host::vol format")
+ p.add_argument("status", help="Update Monitor Status")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--debug", action="store_true")
+
+ # Monitor
+ p = sp.add_parser("monitor")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave details user@host::vol format")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--pause-on-start",
+ action="store_true",
+ help="Start with Paused state")
+ p.add_argument("--local-node-id", help="Local Node ID")
+ p.add_argument("--debug", action="store_true")
+
+ # Worker
+ p = sp.add_parser("worker")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave details user@host::vol format")
+ p.add_argument("--local-path", help="Local Brick Path")
+ p.add_argument("--feedback-fd", type=int,
+ help="feedback fd between monitor and worker")
+ p.add_argument("--local-node", help="Local master node")
+ p.add_argument("--local-node-id", help="Local Node ID")
+ p.add_argument("--rpc-fd",
+ help="Read and Write fds for worker-agent communication")
+ p.add_argument("--subvol-num", type=int, help="Subvolume number")
+ p.add_argument("--is-hottier", action="store_true",
+ help="Is this brick part of hot tier")
+ p.add_argument("--resource-remote",
+ help="Remote node to connect to Slave Volume")
+ p.add_argument("--resource-remote-id",
+ help="Remote node ID to connect to Slave Volume")
+ p.add_argument("--slave-id", help="Slave Volume ID")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--access-mount", action="store_true",
+ help="Do not umount the mount")
+ p.add_argument("--debug", action="store_true")
+
+ # Agent
+ p = sp.add_parser("agent")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave details user@host::vol format")
+ p.add_argument("--local-path", help="Local brick path")
+ p.add_argument("--local-node", help="Local master node")
+ p.add_argument("--local-node-id", help="Local Node ID")
+ p.add_argument("--slave-id", help="Slave Volume ID")
+ p.add_argument("--rpc-fd",
+ help="Read and Write fds for worker-agent communication")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--debug", action="store_true")
+
+ # Slave
+ p = sp.add_parser("slave")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave details user@host::vol format")
+ p.add_argument("--session-owner")
+ p.add_argument("--master-brick",
+ help="Master brick which is connected to the Slave")
+ p.add_argument("--master-node",
+ help="Master node which is connected to the Slave")
+ p.add_argument("--master-node-id",
+ help="Master node ID which is connected to the Slave")
+ p.add_argument("--local-node", help="Local Slave node")
+ p.add_argument("--local-node-id", help="Local Slave ID")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--debug", action="store_true")
+
+    # All configurations below are configured via "slave-" options.
+    # DO NOT add default values for these configurations; the defaults
+    # will be picked up from the template config file
+ p.add_argument("--slave-timeout", type=int,
+ help="Timeout to end gsyncd at Slave side")
+ p.add_argument("--use-rsync-xattrs", action="store_true")
+ p.add_argument("--slave-log-level", help="Slave Gsyncd Log level")
+ p.add_argument("--slave-gluster-log-level",
+ help="Slave Gluster mount Log level")
+ p.add_argument("--slave-access-mount", action="store_true",
+ help="Do not umount the mount")
+
+ # Status
+ p = sp.add_parser("status")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--local-path", help="Local Brick Path")
+ p.add_argument("--debug", action="store_true")
+
+ # Config-check
+ p = sp.add_parser("config-check")
+ p.add_argument("name", help="Config Name")
+ p.add_argument("--value", help="Config Value")
+ p.add_argument("--debug", action="store_true")
+
+ # Config-get
+ p = sp.add_parser("config-get")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave")
+ p.add_argument("--name", help="Config Name")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--debug", action="store_true")
+ p.add_argument("--show-defaults", action="store_true")
+ p.add_argument("--only-value", action="store_true")
+ p.add_argument("--use-underscore", action="store_true")
+
+ # Config-set
+ p = sp.add_parser("config-set")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave")
+ p.add_argument("name", help="Config Name")
+ p.add_argument("value", help="Config Value")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--debug", action="store_true")
+
+ # Config-reset
+ p = sp.add_parser("config-reset")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave")
+ p.add_argument("name", help="Config Name")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--debug", action="store_true")
+
+ # voluuidget
+ p = sp.add_parser("voluuidget")
+ p.add_argument("host", help="Hostname")
+ p.add_argument("volname", help="Volume Name")
+ p.add_argument("--debug", action="store_true")
+
+ # Delete
+ p = sp.add_parser("delete")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument('--path', dest='paths', action="append")
+ p.add_argument("--reset-sync-time", action="store_true",
+ help="Reset Sync Time")
+ p.add_argument("--debug", action="store_true")
+
+ # Parse arguments
+ args = parser.parse_args()
+
+    # Extra template values. All arguments are already available as template
+    # variables; use this for adding extra variables
+ extra_tmpl_args = {}
+
+ # Add First/Primary Slave host, user and volume
+ if getattr(args, "slave", None) is not None:
+ hostdata, slavevol = args.slave.split("::")
+ hostdata = hostdata.split("@")
+ slavehost = hostdata[-1]
+ slaveuser = "root"
+ if len(hostdata) == 2:
+ slaveuser = hostdata[0]
+ extra_tmpl_args["primary_slave_host"] = slavehost
+ extra_tmpl_args["slaveuser"] = slaveuser
+ extra_tmpl_args["slavevol"] = slavevol
+
+    # Add the brick's encoded path
+ if getattr(args, "local_path", None) is not None:
+ extra_tmpl_args["local_id"] = escape(args.local_path)
+
+    # Add the Master brick's encoded path (for the Slave)
+ if getattr(args, "master_brick", None) is not None:
+ extra_tmpl_args["master_brick_id"] = escape(args.master_brick)
+
+ # Load configurations
+ config_file = getattr(args, "config_file", None)
+
+    # The subcmd accepts a config file argument but none was passed;
+    # set the default config file path in that case.
+    # If a subcmd accepts a config file, it also accepts
+    # master and slave arguments.
+ if config_file is None and hasattr(args, "config_file"):
+ config_file = "%s/geo-replication/%s_%s_%s/gsyncd.conf" % (
+ GLUSTERD_WORKDIR,
+ args.master,
+ extra_tmpl_args["primary_slave_host"],
+ extra_tmpl_args["slavevol"])
+
+    # If the config file path does not exist, log an error and continue using the default conf
+ config_file_error_msg = None
+ if config_file is not None and not os.path.exists(config_file):
+        # Logging is not yet initialized; build the error message to
+        # log later and reset config_file to None
+ config_file_error_msg = lf(
+ "Session config file not exists, using the default config",
+ path=config_file)
+ config_file = None
+
+ rconf.config_file = config_file
+
+ # Load Config file
+ gconf.load(GLUSTERFS_CONFDIR + "/gsyncd.conf",
+ config_file,
+ vars(args),
+ extra_tmpl_args)
+
+ # Default label to print in log file
+ label = args.subcmd
+ if args.subcmd in ("worker", "agent"):
+        # If worker or agent, also add the brick path to the label
+ label = "%s %s" % (args.subcmd, args.local_path)
+ elif args.subcmd == "slave":
+        # If slave, add the Master node and brick details
+ label = "%s %s%s" % (args.subcmd, args.master_node, args.master_brick)
+
+ # Setup Logger
+ # Default log file
+ log_file = gconf.get("cli-log-file")
+ log_level = gconf.get("cli-log-level")
+ if getattr(args, "master", None) is not None and \
+ getattr(args, "slave", None) is not None:
+ log_file = gconf.get("log-file")
+ log_level = gconf.get("log-level")
+
+    # Use a different log file location for the slave
+ if args.subcmd == "slave":
+ log_file = gconf.get("slave-log-file")
+ log_level = gconf.get("slave-log-level")
+
+ if args.debug:
+ log_file = "-"
+ log_level = "DEBUG"
+
+    # Create the log dir if it does not exist
+ try:
+ if log_file != "-":
+ os.mkdir(os.path.dirname(log_file))
+ except OSError as e:
+ if e.errno != EEXIST:
+ raise
+
+ setup_logging(
+ log_file=log_file,
+ level=log_level,
+ label=label
+ )
+
+ if config_file_error_msg is not None:
+ logging.warn(config_file_error_msg)
+
+ # Log message for loaded config file
+ if config_file is not None:
+ logging.info(lf("Using session config file", path=config_file))
+
set_term_handler()
- GLogger.setup()
excont = FreeObject(exval=0)
+
+    # Get the handler function based on the subcommand. For example,
+    # if the subcommand passed as argument is "monitor", look for a
+    # function named "subcmd_monitor" in the subcmds module
+ func = getattr(subcmds, "subcmd_" + args.subcmd.replace("-", "_"), None)
+
try:
try:
- main_i()
+ if func is not None:
+ rconf.args = args
+ func(args)
except:
log_raise_exception(excont)
finally:
finalize(exval=excont.exval)
-def main_i():
- """internal main routine
-
- parse command line, decide what action will be taken;
- we can either:
- - query/manipulate configuration
- - format gsyncd urls using gsyncd's url parsing engine
- - start service in following modes, in given stages:
- - agent: startup(), ChangelogAgent()
- - monitor: startup(), monitor()
- - master: startup(), connect_remote(), connect(), service_loop()
- - slave: startup(), connect(), service_loop()
- """
- rconf = {'go_daemon': 'should'}
-
- def store_abs(opt, optstr, val, parser):
- if val and val != '-':
- val = os.path.abspath(val)
- setattr(parser.values, opt.dest, val)
-
- def store_local(opt, optstr, val, parser):
- rconf[opt.dest] = val
-
- def store_local_curry(val):
- return lambda o, oo, vx, p: store_local(o, oo, val, p)
-
- def store_local_obj(op, dmake):
- return lambda o, oo, vx, p: store_local(
- o, oo, FreeObject(op=op, **dmake(vx)), p)
-
- op = OptionParser(
- usage="%prog [options...] <master> <slave>", version="%prog 0.0.1")
- op.add_option('--gluster-command-dir', metavar='DIR', default='')
- op.add_option('--gluster-log-file', metavar='LOGF',
- default=os.devnull, type=str, action='callback',
- callback=store_abs)
- op.add_option('--gluster-log-level', metavar='LVL')
- op.add_option('--changelog-log-level', metavar='LVL', default="INFO")
- op.add_option('--gluster-params', metavar='PRMS', default='')
- op.add_option(
- '--glusterd-uuid', metavar='UUID', type=str, default='',
- help=SUPPRESS_HELP)
- op.add_option(
- '--gluster-cli-options', metavar='OPTS', default='--log-file=-')
- op.add_option('--mountbroker', metavar='LABEL')
- op.add_option('-p', '--pid-file', metavar='PIDF', type=str,
- action='callback', callback=store_abs)
- op.add_option('-l', '--log-file', metavar='LOGF', type=str,
- action='callback', callback=store_abs)
- op.add_option('--iprefix', metavar='LOGD', type=str,
- action='callback', callback=store_abs)
- op.add_option('--changelog-log-file', metavar='LOGF', type=str,
- action='callback', callback=store_abs)
- op.add_option('--log-file-mbr', metavar='LOGF', type=str,
- action='callback', callback=store_abs)
- op.add_option('--state-file', metavar='STATF', type=str,
- action='callback', callback=store_abs)
- op.add_option('--state-detail-file', metavar='STATF',
- type=str, action='callback', callback=store_abs)
- op.add_option('--georep-session-working-dir', metavar='STATF',
- type=str, action='callback', callback=store_abs)
- op.add_option('--access-mount', default=False, action='store_true')
- op.add_option('--ignore-deletes', default=False, action='store_true')
- op.add_option('--isolated-slave', default=False, action='store_true')
- op.add_option('--use-rsync-xattrs', default=False, action='store_true')
- op.add_option('--sync-xattrs', default=True, action='store_true')
- op.add_option('--sync-acls', default=True, action='store_true')
- op.add_option('--log-rsync-performance', default=False,
- action='store_true')
- op.add_option('--max-rsync-retries', type=int, default=10)
- # Max size of Changelogs to process per batch, Changelogs Processing is
- # not limited by the number of changelogs but instead based on
- # size of the changelog file, One sample changelog file size was 145408
- # with ~1000 CREATE and ~1000 DATA. 5 such files in one batch is 727040
- # If geo-rep worker crashes while processing a batch, it has to retry only
- # that batch since stime will get updated after each batch.
- op.add_option('--changelog-batch-size', type=int, default=727040)
- op.add_option('--pause-on-start', default=False, action='store_true')
- op.add_option('-L', '--log-level', metavar='LVL')
- op.add_option('-r', '--remote-gsyncd', metavar='CMD',
- default=os.path.abspath(sys.argv[0]))
- op.add_option('--volume-id', metavar='UUID')
- op.add_option('--slave-id', metavar='ID')
- op.add_option('--session-owner', metavar='ID')
- op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='')
- op.add_option(
- '--local-node', metavar='NODE', help=SUPPRESS_HELP, default='')
- op.add_option(
- '--local-node-id', metavar='NODEID', help=SUPPRESS_HELP, default='')
- op.add_option(
- '--local-path', metavar='PATH', help=SUPPRESS_HELP, default='')
- op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh')
- op.add_option('--ssh-port', metavar='PORT', type=int, default=22)
- op.add_option('--ssh-command-tar', metavar='CMD', default='ssh')
- op.add_option('--rsync-command', metavar='CMD', default='rsync')
- op.add_option('--rsync-options', metavar='OPTS', default='')
- op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress')
- op.add_option('--rsync-opt-ignore-missing-args', default="true")
- op.add_option('--rsync-opt-existing', default="true")
- op.add_option('--timeout', metavar='SEC', type=int, default=120)
- op.add_option('--connection-timeout', metavar='SEC',
- type=int, default=60, help=SUPPRESS_HELP)
- op.add_option('--sync-jobs', metavar='N', type=int, default=3)
- op.add_option('--replica-failover-interval', metavar='N',
- type=int, default=1)
- op.add_option('--changelog-archive-format', metavar='N',
- type=str, default="%Y%m")
- op.add_option('--use-meta-volume', default=False, action='store_true')
- op.add_option('--meta-volume-mnt', metavar='N',
- type=str, default="/var/run/gluster/shared_storage")
- op.add_option(
- '--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP)
- op.add_option('--allow-network', metavar='IPS', default='')
- op.add_option('--socketdir', metavar='DIR')
- op.add_option('--state-socket-unencoded', metavar='SOCKF',
- type=str, action='callback', callback=store_abs)
- op.add_option('--checkpoint', metavar='LABEL', default='0')
-
- # tunables for failover/failback mechanism:
- # None - gsyncd behaves as normal
- # blind - gsyncd works with xtime pairs to identify
- # candidates for synchronization
- # wrapup - same as normal mode but does not assign
- # xtimes to orphaned files
- # see crawl() for usage of the above tunables
- op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP)
-
- # changelog or xtime? (TODO: Change the default)
- op.add_option(
- '--change-detector', metavar='MODE', type=str, default='xtime')
- # sleep interval for change detection (xtime crawl uses a hardcoded 1
- # second sleep time)
- op.add_option('--change-interval', metavar='SEC', type=int, default=3)
- # working directory for changelog based mechanism
- op.add_option('--working-dir', metavar='DIR', type=str,
- action='callback', callback=store_abs)
- op.add_option('--use-tarssh', default=False, action='store_true')
-
- op.add_option('-c', '--config-file', metavar='CONF',
- type=str, action='callback', callback=store_local)
- # duh. need to specify dest or value will be mapped to None :S
- op.add_option('--monitor', dest='monitor', action='callback',
- callback=store_local_curry(True))
- op.add_option('--agent', dest='agent', action='callback',
- callback=store_local_curry(True))
- op.add_option('--resource-local', dest='resource_local',
- type=str, action='callback', callback=store_local)
- op.add_option('--resource-remote', dest='resource_remote',
- type=str, action='callback', callback=store_local)
- op.add_option('--feedback-fd', dest='feedback_fd', type=int,
- help=SUPPRESS_HELP, action='callback', callback=store_local)
- op.add_option('--rpc-fd', dest='rpc_fd', type=str, help=SUPPRESS_HELP)
- op.add_option('--subvol-num', dest='subvol_num', type=str,
- help=SUPPRESS_HELP)
- op.add_option('--listen', dest='listen', help=SUPPRESS_HELP,
- action='callback', callback=store_local_curry(True))
- op.add_option('-N', '--no-daemon', dest="go_daemon",
- action='callback', callback=store_local_curry('dont'))
- op.add_option('--verify', type=str, dest="verify",
- action='callback', callback=store_local)
- op.add_option('--slavevoluuid-get', type=str, dest="slavevoluuid_get",
- action='callback', callback=store_local)
- op.add_option('--create', type=str, dest="create",
- action='callback', callback=store_local)
- op.add_option('--delete', dest='delete', action='callback',
- callback=store_local_curry(True))
- op.add_option('--path-list', dest='path_list', action='callback',
- type=str, callback=store_local)
- op.add_option('--reset-sync-time', default=False, action='store_true')
- op.add_option('--status-get', dest='status_get', action='callback',
- callback=store_local_curry(True))
- op.add_option('--debug', dest="go_daemon", action='callback',
- callback=lambda *a: (store_local_curry('dont')(*a),
- setattr(
- a[-1].values, 'log_file', '-'),
- setattr(a[-1].values, 'log_level',
- 'DEBUG'),
- setattr(a[-1].values,
- 'changelog_log_file', '-')))
- op.add_option('--path', type=str, action='append')
-
- for a in ('check', 'get'):
- op.add_option('--config-' + a, metavar='OPT', type=str, dest='config',
- action='callback',
- callback=store_local_obj(a, lambda vx: {'opt': vx}))
- op.add_option('--config-get-all', dest='config', action='callback',
- callback=store_local_obj('get', lambda vx: {'opt': None}))
- for m in ('', '-rx', '-glob'):
- # call this code 'Pythonic' eh?
- # have to define a one-shot local function to be able
- # to inject (a value depending on the)
- # iteration variable into the inner lambda
- def conf_mod_opt_regex_variant(rx):
- op.add_option('--config-set' + m, metavar='OPT VAL', type=str,
- nargs=2, dest='config', action='callback',
- callback=store_local_obj('set', lambda vx: {
- 'opt': vx[0], 'val': vx[1], 'rx': rx}))
- op.add_option('--config-del' + m, metavar='OPT', type=str,
- dest='config', action='callback',
- callback=store_local_obj('del', lambda vx: {
- 'opt': vx, 'rx': rx}))
- conf_mod_opt_regex_variant(m and m[1:] or False)
-
- op.add_option('--normalize-url', dest='url_print',
- action='callback', callback=store_local_curry('normal'))
- op.add_option('--canonicalize-url', dest='url_print',
- action='callback', callback=store_local_curry('canon'))
- op.add_option('--canonicalize-escape-url', dest='url_print',
- action='callback', callback=store_local_curry('canon_esc'))
- op.add_option('--is-hottier', default=False, action='store_true')
-
- tunables = [norm(o.get_opt_string()[2:])
- for o in op.option_list
- if (o.callback in (store_abs, 'store_true', None) and
- o.get_opt_string() not in ('--version', '--help'))]
- remote_tunables = ['listen', 'go_daemon', 'timeout',
- 'session_owner', 'config_file', 'use_rsync_xattrs',
- 'local_id', 'local_node', 'access_mount']
- rq_remote_tunables = {'listen': True}
-
- # precedence for sources of values: 1) commandline, 2) cfg file, 3)
- # defaults for this to work out we need to tell apart defaults from
- # explicitly set options... so churn out the defaults here and call
- # the parser with virgin values container.
- defaults = op.get_default_values()
- opts, args = op.parse_args(values=optparse.Values())
- # slave url cleanup, if input comes with vol uuid as follows
- # 'ssh://fvm1::gv2:07dfddca-94bb-4841-a051-a7e582811467'
- temp_args = []
- for arg in args:
- # Split based on ::
- data = arg.split("::")
- if len(data)>1:
- slavevol_name = data[1].split(":")[0]
- temp_args.append("%s::%s" % (data[0], slavevol_name))
- else:
- temp_args.append(data[0])
- args = temp_args
- args_orig = args[:]
-
- voluuid_get = rconf.get('slavevoluuid_get')
- if voluuid_get:
- slave_host, slave_vol = voluuid_get.split("::")
- svol_uuid = slave_vol_uuid_get(slave_host, slave_vol)
- print svol_uuid
- return
-
- r = rconf.get('resource_local')
- if r:
- if len(args) == 0:
- args.append(None)
- args[0] = r
- r = rconf.get('resource_remote')
- if r:
- if len(args) == 0:
- raise GsyncdError('local resource unspecfied')
- elif len(args) == 1:
- args.append(None)
- args[1] = r
- confdata = rconf.get('config')
- if not (len(args) == 2 or
- (len(args) == 1 and rconf.get('listen')) or
- (len(args) <= 2 and confdata) or
- rconf.get('url_print')):
- sys.stderr.write("error: incorrect number of arguments\n\n")
- sys.stderr.write(op.get_usage() + "\n")
- sys.exit(1)
-
- verify = rconf.get('verify')
- if verify:
- logging.info(verify)
- logging.info("Able to spawn gsyncd.py")
- return
-
- restricted = os.getenv('_GSYNCD_RESTRICTED_')
-
- if restricted:
- allopts = {}
- allopts.update(opts.__dict__)
- allopts.update(rconf)
- bannedtuns = set(allopts.keys()) - set(remote_tunables)
- if bannedtuns:
- raise GsyncdError('following tunables cannot be set with '
- 'restricted SSH invocaton: ' +
- ', '.join(bannedtuns))
- for k, v in rq_remote_tunables.items():
- if not k in allopts or allopts[k] != v:
- raise GsyncdError('tunable %s is not set to value %s required '
- 'for restricted SSH invocaton' %
- (k, v))
-
- confrx = getattr(confdata, 'rx', None)
-
- def makersc(aa, check=True):
- if not aa:
- return ([], None, None)
- ra = [resource.parse_url(u) for u in aa]
- local = ra[0]
- remote = None
- if len(ra) > 1:
- remote = ra[1]
- if check and not local.can_connect_to(remote):
- raise GsyncdError("%s cannot work with %s" %
- (local.path, remote and remote.path))
- return (ra, local, remote)
- if confrx:
- # peers are regexen, don't try to parse them
- if confrx == 'glob':
- args = ['\A' + fnmatch.translate(a) for a in args]
- canon_peers = args
- namedict = {}
- else:
- dc = rconf.get('url_print')
- rscs, local, remote = makersc(args_orig, not dc)
- if dc:
- for r in rscs:
- print(r.get_url(**{'normal': {},
- 'canon': {'canonical': True},
- 'canon_esc': {'canonical': True,
- 'escaped': True}}[dc]))
- return
- pa = ([], [], [])
- urlprms = (
- {}, {'canonical': True}, {'canonical': True, 'escaped': True})
- for x in rscs:
- for i in range(len(pa)):
- pa[i].append(x.get_url(**urlprms[i]))
- _, canon_peers, canon_esc_peers = pa
- # creating the namedict, a dict representing various ways of referring
- # to / repreenting peers to be fillable in config templates
- mods = (lambda x: x, lambda x: x[
- 0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:])
- if remote:
- rmap = {local: ('local', 'master'), remote: ('remote', 'slave')}
- else:
- rmap = {local: ('local', 'slave')}
- namedict = {}
- for i in range(len(rscs)):
- x = rscs[i]
- for name in rmap[x]:
- for j in range(3):
- namedict[mods[j](name)] = pa[j][i]
- namedict[name + 'vol'] = x.volume
- if name == 'remote':
- namedict['remotehost'] = x.remotehost
-
- if not 'config_file' in rconf:
- rconf['config_file'] = TMPL_CONFIG_FILE
-
- # Upgrade Config File only if it is session conf file
- if rconf['config_file'] != TMPL_CONFIG_FILE:
- upgrade_config_file(rconf['config_file'], confdata)
-
- gcnf = GConffile(
- rconf['config_file'], canon_peers, confdata,
- defaults.__dict__, opts.__dict__, namedict)
-
- conf_change = False
- if confdata:
- opt_ok = norm(confdata.opt) in tunables + [None]
- if confdata.op == 'check':
- if opt_ok:
- sys.exit(0)
- else:
- sys.exit(1)
- elif not opt_ok:
- raise GsyncdError("not a valid option: " + confdata.opt)
- if confdata.op == 'get':
- gcnf.get(confdata.opt)
- elif confdata.op == 'set':
- gcnf.set(confdata.opt, confdata.val, confdata.rx)
- elif confdata.op == 'del':
- gcnf.delete(confdata.opt, confdata.rx)
- # when modifying checkpoint, it's important to make a log
- # of that, so in that case we go on to set up logging even
- # if its just config invocation
- if confdata.op in ('set', 'del') and not confdata.rx:
- conf_change = True
-
- if not conf_change:
- return
-
- gconf.__dict__.update(defaults.__dict__)
- gcnf.update_to(gconf.__dict__)
- gconf.__dict__.update(opts.__dict__)
- gconf.configinterface = gcnf
-
- delete = rconf.get('delete')
- if delete:
- logging.info('geo-replication delete')
- # remove the stime xattr from all the brick paths so that
- # a re-create of a session will start sync all over again
- stime_xattr_name = getattr(gconf, 'master.stime_xattr_name', None)
-
- # Delete pid file, status file, socket file
- cleanup_paths = []
- if getattr(gconf, 'pid_file', None):
- cleanup_paths.append(gconf.pid_file)
-
- if getattr(gconf, 'state_file', None):
- cleanup_paths.append(gconf.state_file)
-
- if getattr(gconf, 'state_detail_file', None):
- cleanup_paths.append(gconf.state_detail_file)
-
- if getattr(gconf, 'state_socket_unencoded', None):
- cleanup_paths.append(gconf.state_socket_unencoded)
-
- cleanup_paths.append(rconf['config_file'][:-11] + "*")
-
- # Cleanup changelog working dirs
- if getattr(gconf, 'working_dir', None):
- try:
- shutil.rmtree(gconf.working_dir)
- except (IOError, OSError):
- if sys.exc_info()[1].errno == ENOENT:
- pass
- else:
- raise GsyncdError(
- 'Error while removing working dir: %s' %
- gconf.working_dir)
-
- for path in cleanup_paths:
- # To delete temp files
- for f in glob.glob(path + "*"):
- _unlink(f)
-
- reset_sync_time = boolify(gconf.reset_sync_time)
- if reset_sync_time and stime_xattr_name:
- path_list = rconf.get('path_list')
- paths = []
- for p in path_list.split('--path='):
- stripped_path = p.strip()
- if stripped_path != "":
- # set stime to (0,0) to trigger full volume content resync
- # to slave on session recreation
- # look at master.py::Xcrawl hint: zero_zero
- Xattr.lsetxattr(stripped_path, stime_xattr_name,
- struct.pack("!II", 0, 0))
-
- return
-
- if restricted and gconf.allow_network:
- ssh_conn = os.getenv('SSH_CONNECTION')
- if not ssh_conn:
- # legacy env var
- ssh_conn = os.getenv('SSH_CLIENT')
- if ssh_conn:
- allowed_networks = [IPNetwork(a)
- for a in gconf.allow_network.split(',')]
- client_ip = IPAddress(ssh_conn.split()[0])
- allowed = False
- for nw in allowed_networks:
- if client_ip in nw:
- allowed = True
- break
- if not allowed:
- raise GsyncdError("client IP address is not allowed")
-
- ffd = rconf.get('feedback_fd')
- if ffd:
- fcntl.fcntl(ffd, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
-
- # normalize loglevel
- lvl0 = gconf.log_level
- if isinstance(lvl0, str):
- lvl1 = lvl0.upper()
- lvl2 = logging.getLevelName(lvl1)
- # I have _never_ _ever_ seen such an utterly braindead
- # error condition
- if lvl2 == "Level " + lvl1:
- raise GsyncdError('cannot recognize log level "%s"' % lvl0)
- gconf.log_level = lvl2
-
- if not privileged() and gconf.log_file_mbr:
- gconf.log_file = gconf.log_file_mbr
-
- if conf_change:
- try:
- GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf')
- gconf.log_exit = False
-
- if confdata.op == 'set':
- if confdata.opt == 'checkpoint':
- logging.info(lf("Checkpoint Set",
- time=human_time_utc(confdata.val)))
- else:
- logging.info(lf("Config Set",
- config=confdata.opt,
- value=confdata.val))
- elif confdata.op == 'del':
- if confdata.opt == 'checkpoint':
- logging.info("Checkpoint Reset")
- else:
- logging.info(lf("Config Reset",
- config=confdata.opt))
- except IOError:
- if sys.exc_info()[1].errno == ENOENT:
- # directory of log path is not present,
- # which happens if we get here from
- # a peer-multiplexed "config-set checkpoint"
- # (as that directory is created only on the
- # original node)
- pass
- else:
- raise
- return
-
- create = rconf.get('create')
- if create:
- if getattr(gconf, 'state_file', None):
- set_monitor_status(gconf.state_file, create)
-
- try:
- GLogger._gsyncd_loginit(log_file=gconf.log_file, label='monitor')
- gconf.log_exit = False
- logging.info(lf("Monitor Status Change",
- status=create))
- except IOError:
- if sys.exc_info()[1].errno == ENOENT:
- # If log dir not present
- pass
- else:
- raise
- return
-
- go_daemon = rconf['go_daemon']
- be_monitor = rconf.get('monitor')
- be_agent = rconf.get('agent')
-
- rscs, local, remote = makersc(args)
-
- status_get = rconf.get('status_get')
- if status_get:
- master_name, slave_data = get_master_and_slave_data_from_args(args)
- for brick in gconf.path:
- brick_status = GeorepStatus(gconf.state_file,
- gconf.local_node,
- brick,
- gconf.local_node_id,
- master_name,
- slave_data,
- getattr(gconf, "pid_file", None))
- checkpoint_time = int(getattr(gconf, "checkpoint", "0"))
- brick_status.print_status(checkpoint_time=checkpoint_time)
- return
-
- if not be_monitor and isinstance(remote, resource.SSH) and \
- go_daemon == 'should':
- go_daemon = 'postconn'
- log_file = None
- else:
- log_file = gconf.log_file
- if be_monitor:
- label = 'monitor'
- elif be_agent:
- label = gconf.local_path
- elif remote:
- # master
- label = gconf.local_path
- else:
- label = 'slave'
- startup(go_daemon=go_daemon, log_file=log_file, label=label)
- Popen.init_errhandler()
-
- if be_agent:
- os.setsid()
- logging.debug(lf("RPC FD",
- rpc_fd=repr(gconf.rpc_fd)))
- return agent(Changelog(), gconf.rpc_fd)
-
- if be_monitor:
- return monitor(*rscs)
-
- if remote:
- go_daemon = remote.connect_remote(go_daemon=go_daemon)
- if go_daemon:
- startup(go_daemon=go_daemon, log_file=gconf.log_file)
- # complete remote connection in child
- remote.connect_remote(go_daemon='done')
- local.connect()
- if ffd:
- logging.info("Closing feedback fd, waking up the monitor")
- os.close(ffd)
- local.service_loop(*[r for r in [remote] if r])
-
-
if __name__ == "__main__":
main()
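
As a side note for readers of the new main(): the slave argument accepted by most subcommands is of the form user@host::vol, and main() splits it into the primary_slave_host, slaveuser and slavevol template values. The snippet below is a minimal standalone sketch of that parsing, not part of the patch; parse_slave_spec is a made-up helper name and the example specs are invented.

def parse_slave_spec(slave_spec):
    # "user@host::vol" or "host::vol"; the user defaults to root
    hostdata, slavevol = slave_spec.split("::")
    hostdata = hostdata.split("@")
    slaveuser = hostdata[0] if len(hostdata) == 2 else "root"
    return {
        "primary_slave_host": hostdata[-1],
        "slaveuser": slaveuser,
        "slavevol": slavevol,
    }

print(parse_slave_spec("geoaccount@slavehost::gv2"))
# {'primary_slave_host': 'slavehost', 'slaveuser': 'geoaccount', 'slavevol': 'gv2'}
print(parse_slave_spec("slavehost::gv2"))
# {'primary_slave_host': 'slavehost', 'slaveuser': 'root', 'slavevol': 'gv2'}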
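
The subcommand dispatch at the end of main() resolves a handler named subcmd_<name> (hyphens mapped to underscores) from the subcmds module. Below is a minimal sketch of that lookup pattern with stand-in handlers in place of the real subcmds module; all names and arguments here are illustrative only.

def subcmd_monitor(args):
    print("would start the monitor with", args)

def subcmd_config_get(args):
    print("would print the requested config for", args)

def dispatch(subcmd, args):
    # "config-get" -> subcmd_config_get, "monitor" -> subcmd_monitor
    func = globals().get("subcmd_" + subcmd.replace("-", "_"))
    if func is None:
        raise SystemExit("unknown subcommand: %s" % subcmd)
    return func(args)

dispatch("config-get", {"master": "gv1", "slave": "geoaccount@remotehost::gv2"})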