diff options
author | Avra Sengupta <asengupt@redhat.com> | 2013-06-01 16:17:57 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2013-07-26 13:18:57 -0700 |
commit | b13c483dca20e4015b958f8959328e665a357f60 (patch) | |
tree | 2af62fc50bae39e930fcbe09101d3e51c76eb6fc /geo-replication/syncdaemon | |
parent | 4944fc943efc41df1841e4e559180171f6541112 (diff) |
gsyncd: distribute the crawling load
* also consume changelog for change detection.
* Status fixes
* Use new libgfchangelog done API
* process (and sync) one changelog at a time
Change-Id: I24891615bb762e0741b1819ddfdef8802326cb16
BUG: 847839
Original Author: Csaba Henk <csaba@redhat.com>
Original Author: Aravinda VK <avishwan@redhat.com>
Original Author: Venky Shankar <vshankar@redhat.com>
Original Author: Amar Tumballi <amarts@redhat.com>
Original Author: Avra Sengupta <asengupt@redhat.com>
Signed-off-by: Avra Sengupta <asengupt@redhat.com>
Reviewed-on: http://review.gluster.org/5131
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Tested-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r-- | geo-replication/syncdaemon/Makefile.am | 2 | ||||
-rw-r--r-- | geo-replication/syncdaemon/README.md | 37 | ||||
-rw-r--r-- | geo-replication/syncdaemon/configinterface.py | 4 | ||||
-rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 125 | ||||
-rw-r--r-- | geo-replication/syncdaemon/libcxattr.py | 15 | ||||
-rw-r--r-- | geo-replication/syncdaemon/libgfchangelog.py | 64 | ||||
-rw-r--r-- | geo-replication/syncdaemon/master.py | 632 | ||||
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 216 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 207 | ||||
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 102 |
10 files changed, 1166 insertions, 238 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index c19f6b45919..83f969639cc 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -2,6 +2,6 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \ resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ - $(top_builddir)/contrib/ipaddr-py/ipaddr.py + $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py CLEANFILES = diff --git a/geo-replication/syncdaemon/README.md b/geo-replication/syncdaemon/README.md index d45006932d1..0eb15fa7170 100644 --- a/geo-replication/syncdaemon/README.md +++ b/geo-replication/syncdaemon/README.md @@ -11,13 +11,13 @@ Requirements are categorized according to this. * Python >= 2.5, or 2.4 with Ctypes (see below) (both) * OpenSSH >= 4.0 (master) / SSH2 compliant sshd (eg. openssh) (slave) * rsync (both) -* glusterfs with marker support (master); glusterfs (optional on slave) -* FUSE; for supported versions consult glusterfs +* glusterfs: with marker and changelog support (master & slave); +* FUSE: glusterfs fuse module with auxilary gfid based access support INSTALLATION ------------ -As of now, the supported way of operation is running from the source directory. +As of now, the supported way of operation is running from the source directory or using the RPMs given. If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/). @@ -46,34 +46,11 @@ USAGE ----- gsyncd is a utilitly for continous mirroring, ie. it mirrors master to slave incrementally. -Assume we have a gluster volume _pop_ at localhost. We try to set up the following mirrors -for it with gysncd: +Assume we have a gluster volume _pop_ at localhost. We try to set up the mirroring for volume +_pop_ using gsyncd for gluster volume _moz_ on remote machine/cluster @ example.com. The +respective gsyncd invocations are (demoing some syntax sugaring): -1. _/data/mirror_ -2. local gluster volume _yow_ -3. _/data/far_mirror_ at example.com -4. gluster volume _moz_ at example.com - -The respective gsyncd invocations are (demoing some syntax sugaring): - -1. - - gsyncd.py gluster://localhost:pop file:///data/mirror - - or short form - - gsyncd.py :pop /data/mirror - -2. `gsyncd :pop :yow` -3. - - gsyncd.py :pop ssh://example.com:/data/far_mirror - - or short form - - gsyncd.py :pop example.com:/data/far_mirror - -4. `gsyncd.py :pop example.com::moz` +`gsyncd.py :pop example.com::moz` gsyncd has to be available on both sides; it's location on the remote side has to be specified via the "--remote-gsyncd" option (or "remote-gsyncd" config file parameter). (This option can also be diff --git a/geo-replication/syncdaemon/configinterface.py b/geo-replication/syncdaemon/configinterface.py index e55bec519e9..a326e824681 100644 --- a/geo-replication/syncdaemon/configinterface.py +++ b/geo-replication/syncdaemon/configinterface.py @@ -24,9 +24,9 @@ class MultiDict(object): def __getitem__(self, key): val = None for d in self.dicts: - if d.get(key): + if d.get(key) != None: val = d[key] - if not val: + if val == None: raise KeyError(key) return val diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 387900e6ce8..ad498c39cdc 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -2,10 +2,12 @@ import os import os.path +import glob import sys import time import logging import signal +import shutil import optparse import fcntl import fnmatch @@ -17,7 +19,7 @@ from ipaddr import IPAddress, IPNetwork from gconf import gconf from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception -from syncdutils import GsyncdError, select, set_term_handler, privileged +from syncdutils import GsyncdError, select, set_term_handler, privileged, update_file from configinterface import GConffile import resource from monitor import monitor @@ -109,6 +111,17 @@ def startup(**kw): GLogger._gsyncd_loginit(**kw) + +def _unlink(path): + try: + os.unlink(path) + except (OSError, IOError): + if sys.exc_info()[1].errno == ENOENT: + pass + else: + raise GsyncdError('Unlink error: %s' % path) + + def main(): """main routine, signal/exception handling boilerplates""" gconf.starttime = time.time() @@ -153,21 +166,27 @@ def main_i(): op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs) op.add_option('--gluster-log-level', metavar='LVL') op.add_option('--gluster-params', metavar='PRMS', default='') + op.add_option('--glusterd-uuid', metavar='UUID', type=str, default='', help=SUPPRESS_HELP) op.add_option('--gluster-cli-options', metavar='OPTS', default='--log-file=-') op.add_option('--mountbroker', metavar='LABEL') op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs) op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs) op.add_option('--log-file-mbr', metavar='LOGF', type=str, action='callback', callback=store_abs) op.add_option('--state-file', metavar='STATF', type=str, action='callback', callback=store_abs) + op.add_option('--state-detail-file', metavar='STATF', type=str, action='callback', callback=store_abs) op.add_option('--ignore-deletes', default=False, action='store_true') + op.add_option('--isolated-slave', default=False, action='store_true') op.add_option('--use-rsync-xattrs', default=False, action='store_true') op.add_option('-L', '--log-level', metavar='LVL') op.add_option('-r', '--remote-gsyncd', metavar='CMD', default=os.path.abspath(sys.argv[0])) op.add_option('--volume-id', metavar='UUID') + op.add_option('--slave-id', metavar='ID') op.add_option('--session-owner', metavar='ID') + op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='') + op.add_option('--local-path', metavar='PATH', help=SUPPRESS_HELP, default='') op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh') op.add_option('--rsync-command', metavar='CMD', default='rsync') - op.add_option('--rsync-options', metavar='OPTS', default='--sparse') + op.add_option('--rsync-options', metavar='OPTS', default='') op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress') op.add_option('--timeout', metavar='SEC', type=int, default=120) op.add_option('--connection-timeout', metavar='SEC', type=int, default=60, help=SUPPRESS_HELP) @@ -186,15 +205,28 @@ def main_i(): # see crawl() for usage of the above tunables op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP) + # changelog or xtime? (TODO: Change the default) + op.add_option('--change-detector', metavar='MODE', type=str, default='xtime') + # sleep interval for change detection (xtime crawl uses a hardcoded 1 second sleep time) + op.add_option('--change-interval', metavar='SEC', type=int, default=3) + # working directory for changelog based mechanism + op.add_option('--working-dir', metavar='DIR', type=str, action='callback', callback=store_abs) + op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local) # duh. need to specify dest or value will be mapped to None :S op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True)) + op.add_option('--resource-local', dest='resource_local', type=str, action='callback', callback=store_local) + op.add_option('--resource-remote', dest='resource_remote', type=str, action='callback', callback=store_local) op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local) op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True)) op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont')) + op.add_option('--verify', type=str, dest="verify", action='callback', callback=store_local) + op.add_option('--create', type=str, dest="create", action='callback', callback=store_local) + op.add_option('--delete', dest='delete', action='callback', callback=store_local_curry(True)) op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a), setattr(a[-1].values, 'log_file', '-'), setattr(a[-1].values, 'log_level', 'DEBUG'))), + op.add_option('--path', type=str, action='append') for a in ('check', 'get'): op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', action='callback', @@ -225,6 +257,19 @@ def main_i(): # values container. defaults = op.get_default_values() opts, args = op.parse_args(values=optparse.Values()) + args_orig = args[:] + r = rconf.get('resource_local') + if r: + if len(args) == 0: + args.append(None) + args[0] = r + r = rconf.get('resource_remote') + if r: + if len(args) == 0: + raise GsyncdError('local resource unspecfied') + elif len(args) == 1: + args.append(None) + args[1] = r confdata = rconf.get('config') if not (len(args) == 2 or \ (len(args) == 1 and rconf.get('listen')) or \ @@ -234,6 +279,12 @@ def main_i(): sys.stderr.write(op.get_usage() + "\n") sys.exit(1) + verify = rconf.get('verify') + if verify: + logging.info (verify) + logging.info ("Able to spawn gsyncd.py") + return + restricted = os.getenv('_GSYNCD_RESTRICTED_') if restricted: @@ -250,6 +301,17 @@ def main_i(): (k, v)) confrx = getattr(confdata, 'rx', None) + def makersc(aa, check=True): + if not aa: + return ([], None, None) + ra = [resource.parse_url(u) for u in aa] + local = ra[0] + remote = None + if len(ra) > 1: + remote = ra[1] + if check and not local.can_connect_to(remote): + raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path)) + return (ra, local, remote) if confrx: # peers are regexen, don't try to parse them if confrx == 'glob': @@ -257,27 +319,20 @@ def main_i(): canon_peers = args namedict = {} else: - rscs = [resource.parse_url(u) for u in args] dc = rconf.get('url_print') + rscs, local, remote = makersc(args_orig, not dc) if dc: for r in rscs: print(r.get_url(**{'normal': {}, 'canon': {'canonical': True}, 'canon_esc': {'canonical': True, 'escaped': True}}[dc])) return - local = remote = None - if rscs: - local = rscs[0] - if len(rscs) > 1: - remote = rscs[1] - if not local.can_connect_to(remote): - raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path)) pa = ([], [], []) urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True}) for x in rscs: for i in range(len(pa)): pa[i].append(x.get_url(**urlprms[i])) - peers, canon_peers, canon_esc_peers = pa + _, canon_peers, canon_esc_peers = pa # creating the namedict, a dict representing various ways of referring to / repreenting # peers to be fillable in config templates mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:]) @@ -327,6 +382,39 @@ def main_i(): gconf.__dict__.update(opts.__dict__) gconf.configinterface = gcnf + delete = rconf.get('delete') + if delete: + logging.info ('geo-replication delete') + # Delete pid file, status file, socket file + cleanup_paths = [] + if getattr(gconf, 'pid_file', None): + cleanup_paths.append(gconf.pid_file) + + if getattr(gconf, 'state_file', None): + cleanup_paths.append(gconf.state_file) + + if getattr(gconf, 'state_detail_file', None): + cleanup_paths.append(gconf.state_detail_file) + + if getattr(gconf, 'state_socket_unencoded', None): + cleanup_paths.append(gconf.state_socket_unencoded) + + # Cleanup changelog working dirs + if getattr(gconf, 'working_dir', None): + try: + shutil.rmtree(gconf.working_dir) + except (IOError, OSError): + if sys.exc_info()[1].errno == ENOENT: + pass + else: + raise GsyncdError('Error while removing working dir: %s' % gconf.working_dir) + + for path in cleanup_paths: + # To delete temp files + for f in glob.glob(path + "*"): + _unlink(f) + return + if restricted and gconf.allow_network: ssh_conn = os.getenv('SSH_CONNECTION') if not ssh_conn: @@ -380,9 +468,16 @@ def main_i(): raise return + create = rconf.get('create') + if create: + if getattr(gconf, 'state_file', None): + update_file(gconf.state_file, lambda f: f.write(create + '\n')) + return + go_daemon = rconf['go_daemon'] be_monitor = rconf.get('monitor') + rscs, local, remote = makersc(args) if not be_monitor and isinstance(remote, resource.SSH) and \ go_daemon == 'should': go_daemon = 'postconn' @@ -393,16 +488,16 @@ def main_i(): label = 'monitor' elif remote: #master - label = '' + label = gconf.local_path else: label = 'slave' startup(go_daemon=go_daemon, log_file=log_file, label=label) + resource.Popen.init_errhandler() if be_monitor: - return monitor() + return monitor(*rscs) - logging.info("syncing: %s" % " -> ".join(peers)) - resource.Popen.init_errhandler() + logging.info("syncing: %s" % " -> ".join(r.url for r in rscs)) if remote: go_daemon = remote.connect_remote(go_daemon=go_daemon) if go_daemon: diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py index f0a9d22920a..b5b6956aea6 100644 --- a/geo-replication/syncdaemon/libcxattr.py +++ b/geo-replication/syncdaemon/libcxattr.py @@ -43,6 +43,16 @@ class Xattr(object): return cls._query_xattr( path, siz, 'lgetxattr', attr) @classmethod + def lgetxattr_buf(cls, path, attr): + """lgetxattr variant with size discovery""" + size = cls.lgetxattr(path, attr) + if size == -1: + cls.raise_oserr() + if size == 0: + return '' + return cls.lgetxattr(path, attr, size) + + @classmethod def llistxattr(cls, path, siz=0): ret = cls._query_xattr(path, siz, 'llistxattr') if isinstance(ret, str): @@ -56,6 +66,11 @@ class Xattr(object): cls.raise_oserr() @classmethod + def lsetxattr_l(cls, path, attr, val): + """ lazy lsetxattr(): caller handles errno """ + cls.libc.lsetxattr(path, attr, val, len(val), 0) + + @classmethod def lremovexattr(cls, path, attr): ret = cls.libc.lremovexattr(path, attr) if ret == -1: diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py new file mode 100644 index 00000000000..68ec3baf144 --- /dev/null +++ b/geo-replication/syncdaemon/libgfchangelog.py @@ -0,0 +1,64 @@ +import os +from ctypes import * +from ctypes.util import find_library + +class Changes(object): + libgfc = CDLL(find_library("gfchangelog"), use_errno=True) + + @classmethod + def geterrno(cls): + return get_errno() + + @classmethod + def raise_oserr(cls): + errn = cls.geterrno() + raise OSError(errn, os.strerror(errn)) + + @classmethod + def _get_api(cls, call): + return getattr(cls.libgfc, call) + + @classmethod + def cl_register(cls, brick, path, log_file, log_level, retries = 0): + ret = cls._get_api('gf_changelog_register')(brick, path, + log_file, log_level, retries) + if ret == -1: + cls.raise_oserr() + + @classmethod + def cl_scan(cls): + ret = cls._get_api('gf_changelog_scan')() + if ret == -1: + cls.raise_oserr() + + @classmethod + def cl_startfresh(cls): + ret = cls._get_api('gf_changelog_start_fresh')() + if ret == -1: + cls.raise_oserr() + + @classmethod + def cl_getchanges(cls): + """ remove hardcoding for path name length """ + def clsort(f): + return f.split('.')[-1] + changes = [] + buf = create_string_buffer('\0', 4096) + call = cls._get_api('gf_changelog_next_change') + + while True: + ret = call(buf, 4096) + if ret in (0, -1): + break; + changes.append(buf.raw[:ret-1]) + if ret == -1: + cls.raise_oserr() + # cleanup tracker + cls.cl_startfresh() + return sorted(changes, key=clsort) + + @classmethod + def cl_done(cls, clfile): + ret = cls._get_api('gf_changelog_done')(clfile) + if ret == -1: + cls.raise_oserr() diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index f903f30595d..58df14954bb 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -4,22 +4,20 @@ import time import stat import random import signal +import json import logging import socket +import string import errno -import re -from errno import ENOENT, ENODATA, EPIPE +from shutil import copyfileobj +from errno import ENOENT, ENODATA, EPIPE, EEXIST from threading import currentThread, Condition, Lock from datetime import datetime -try: - from hashlib import md5 as md5 -except ImportError: - # py 2.4 - from md5 import new as md5 from gconf import gconf -from syncdutils import FreeObject, Thread, GsyncdError, boolify, \ - escape, unescape, select +from tempfile import mkdtemp, NamedTemporaryFile +from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \ + unescape, select, gauxpfx, md5hex, selfkill, entry2pb URXTIME = (-1, 0) @@ -51,18 +49,20 @@ def _volinfo_hook_relax_foreign(self): # The API! -def gmaster_builder(): +def gmaster_builder(excrawl=None): """produce the GMaster class variant corresponding to sync mode""" this = sys.modules[__name__] modemixin = gconf.special_sync_mode if not modemixin: modemixin = 'normal' - logging.info('setting up master for %s sync mode' % modemixin) + changemixin = isinstance(excrawl, str) and excrawl or gconf.change_detector + logging.info('setting up %s change detection mode' % changemixin) modemixin = getattr(this, modemixin.capitalize() + 'Mixin') + crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin') sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin - class _GMaster(GMasterBase, modemixin, sendmarkmixin, purgemixin): + class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin): pass return _GMaster @@ -100,12 +100,9 @@ class NormalMixin(object): def make_xtime_opts(self, is_master, opts): if not 'create' in opts: - opts['create'] = is_master and not self.inter_master + opts['create'] = is_master if not 'default_xtime' in opts: - if is_master and self.inter_master: - opts['default_xtime'] = ENODATA - else: - opts['default_xtime'] = URXTIME + opts['default_xtime'] = URXTIME def xtime_low(self, server, path, **opts): xt = server.xtime(path, self.uuid) @@ -114,7 +111,7 @@ class NormalMixin(object): if xt == ENODATA or xt < self.volmark: if opts['create']: xt = _xtime_now() - server.set_xtime(path, self.uuid, xt) + server.aggregated.set_xtime(path, self.uuid, xt) else: xt = opts['default_xtime'] return xt @@ -151,6 +148,13 @@ class NormalMixin(object): def set_slave_xtime(self, path, mark): self.slave.server.set_xtime(path, self.uuid, mark) +class PartialMixin(NormalMixin): + """a variant tuned towards operation with a master + that has partial info of the slave (brick typically)""" + + def xtime_reversion_hook(self, path, xtl, xtr): + pass + class WrapupMixin(NormalMixin): """a variant that differs from normal in terms of ignoring non-indexed files""" @@ -163,7 +167,7 @@ class WrapupMixin(NormalMixin): opts['default_xtime'] = URXTIME @staticmethod - def keepalive_payload_hook(timo, gap): + def keepalive_payload_hook(self, timo, gap): return (None, gap) def volinfo_hook(self): @@ -236,19 +240,19 @@ class BlindMixin(object): # from interrupted gsyncd transfer logging.warn('have to fix up missing xtime on ' + path) xt0 = _xtime_now() - server.set_xtime(path, self.uuid, xt0) + server.aggregated.set_xtime(path, self.uuid, xt0) else: xt0 = opts['default_xtime'] xt = (xt0, xt[1]) return xt @staticmethod - def keepalive_payload_hook(timo, gap): + def keepalive_payload_hook(self, timo, gap): return (None, gap) def volinfo_hook(self): res = _volinfo_hook_relax_foreign(self) - volinfo_r_new = self.slave.server.native_volume_info() + volinfo_r_new = self.slave.server.aggregated.native_volume_info() if volinfo_r_new['retval']: raise GsyncdError("slave is corrupt") if getattr(self, 'volinfo_r', None): @@ -321,9 +325,7 @@ class PurgeNoopMixin(object): def purge_missing(self, path, names): pass - - -class GMasterBase(object): +class GMasterCommon(object): """abstract class impementling master role""" KFGN = 0 @@ -334,8 +336,8 @@ class GMasterBase(object): err out on multiple foreign masters """ - fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \ - self.master.server.native_volume_info() + fgn_vis, nat_vi = self.master.server.aggregated.foreign_volume_infos(), \ + self.master.server.aggregated.native_volume_info() fgn_vi = None if fgn_vis: if len(fgn_vis) > 1: @@ -376,6 +378,33 @@ class GMasterBase(object): self.make_xtime_opts(rsc == self.master, opts) return self.xtime_low(rsc.server, path, **opts) + def get_initial_crawl_data(self): + default_data = {'sync_time': 0, 'files_synced': 0, 'bytes_synced': 0} + if getattr(gconf, 'state_detail_file', None): + try: + return json.load(open(gconf.state_detail_file)) + except (IOError, OSError): + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + # Create file with initial data + with open(gconf.state_detail_file, 'wb') as f: + json.dump(default_data, f) + return default_data + else: + raise + + return default_data + + def update_crawl_data(self): + if getattr(gconf, 'state_detail_file', None): + try: + same_dir = os.path.dirname(gconf.state_detail_file) + with NamedTemporaryFile(dir=same_dir, delete=False) as tmp: + json.dump(self.total_crawl_stats, tmp) + os.rename(tmp.name, gconf.state_detail_file) + except (IOError, OSError): + raise + def __init__(self, master, slave): self.master = master self.slave = slave @@ -392,15 +421,12 @@ class GMasterBase(object): self.crawls = 0 self.turns = 0 self.total_turns = int(gconf.turns) - self.lastreport = {'crawls': 0, 'turns': 0} + self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0} + self.crawl_stats = {'sync_time': 0, 'last_synctime': 0, 'crawl_starttime': 0, + 'crawl_time': 0, 'files_synced': 0, 'bytes_synced' :0} + self.total_crawl_stats = self.get_initial_crawl_data() self.start = None self.change_seen = None - self.syncTime=0 - self.lastSyncTime=0 - self.crawlStartTime=0 - self.crawlTime=0 - self.filesSynced=0 - self.bytesSynced=0 # the authoritative (foreign, native) volinfo pair # which lets us deduce what to do when we refetch # the volinfos from system @@ -409,8 +435,94 @@ class GMasterBase(object): # the actual volinfo we make use of self.volinfo = None self.terminate = False + self.sleep_interval = 1 self.checkpoint_thread = None + def init_keep_alive(cls): + """start the keep-alive thread """ + timo = int(gconf.timeout or 0) + if timo > 0: + def keep_alive(): + while True: + vi, gap = cls.keepalive_payload_hook(timo, timo * 0.5) + cls.slave.server.keep_alive(vi) + time.sleep(gap) + t = Thread(target=keep_alive) + t.start() + + def volinfo_query(self): + """volume info state machine""" + volinfo_sys, state_change = self.volinfo_hook() + if self.inter_master: + self.volinfo = volinfo_sys[self.KFGN] + else: + self.volinfo = volinfo_sys[self.KNAT] + if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master): + logging.info('new master is %s', self.uuid) + if self.volinfo: + logging.info("%s master with volume id %s ..." % \ + (self.inter_master and "intermediate" or "primary", + self.uuid)) + if state_change == self.KFGN: + gconf.configinterface.set('volume_id', self.uuid) + if self.volinfo: + if self.volinfo['retval']: + raise GsyncdError ("master is corrupt") + self.start_checkpoint_thread() + else: + if should_display_info or self.crawls == 0: + if self.inter_master: + logging.info("waiting for being synced from %s ..." % \ + self.volinfo_state[self.KFGN]['uuid']) + else: + logging.info("waiting for volume info ...") + return True + + def should_crawl(cls): + return (gconf.glusterd_uuid in cls.master.server.node_uuid()) + + def register(self): + self.register() + + def crawlwrap(self, oneshot=False): + if oneshot: + # it's important to do this during the oneshot crawl as + # for a passive gsyncd (ie. in a replicate scenario) + # the keepalive thread would keep the connection alive. + self.init_keep_alive() + self.lastreport['time'] = time.time() + self.crawl_stats['crawl_starttime'] = datetime.now() + + logging.info('crawl interval: %d seconds' % self.sleep_interval) + t0 = time.time() + crawl = self.should_crawl() + while not self.terminate: + if self.volinfo_query(): + continue + t1 = time.time() + if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds + crawl = self.should_crawl() + t0 = t1 + if not crawl: + time.sleep(5) + continue + if self.start: + logging.debug("... crawl #%d done, took %.6f seconds" % \ + (self.crawls, time.time() - self.start)) + self.start = t1 + should_display_info = self.start - self.lastreport['time'] >= 60 + if should_display_info: + logging.info("%d crawls, %d turns", + self.crawls - self.lastreport['crawls'], + self.turns - self.lastreport['turns']) + self.lastreport.update(crawls = self.crawls, + turns = self.turns, + time = self.start) + self.crawl() + if oneshot: + return + time.sleep(self.sleep_interval) + @classmethod def _checkpt_param(cls, chkpt, prm, xtimish=True): """use config backend to lookup a parameter belonging to @@ -443,32 +555,37 @@ class GMasterBase(object): return ts def get_extra_info(self): - str_info="\nFile synced : %d" %(self.filesSynced) - str_info+="\nBytes Synced : %d KB" %(self.syncer.bytesSynced) - str_info+="\nSync Time : %f seconds" %(self.syncTime) - self.crawlTime=datetime.now()-self.crawlStartTime - years , days =divmod(self.crawlTime.days,365.25) - years=int(years) - days=int(days) + str_info = "\nFilesSynced=%d;" % (self.crawl_stats['files_synced']) + str_info += "BytesSynced=%s;" % (self.crawl_stats['bytes_synced']) + + self.crawl_stats['crawl_time'] = datetime.now() - self.crawl_stats['crawl_starttime'] + + str_info += "Uptime=%s;" % (self._crawl_time_format(self.crawl_stats['crawl_time'])) + str_info += "SyncTime=%s;" % (self.crawl_stats['sync_time']) + str_info += "TotalSyncTime=%s;" % (self.total_crawl_stats['sync_time']) + str_info += "TotalFilesSynced=%d;" % (self.total_crawl_stats['files_synced']) + str_info += "TotalBytesSynced=%s;" % (self.total_crawl_stats['bytes_synced']) + str_info += "\0" + logging.debug(str_info) + return str_info + + def _crawl_time_format(self, crawl_time): + # Ex: 5 years, 4 days, 20:23:10 + years, days = divmod(crawl_time.days, 365.25) + years = int(years) + days = int(days) date="" - m, s = divmod(self.crawlTime.seconds, 60) + m, s = divmod(crawl_time.seconds, 60) h, m = divmod(m, 60) - if years!=0 : - date+=str(years)+" year " - if days!=0 : - date+=str(days)+" day " - if h!=0 : - date+=str(h)+" H : " - if m!=0 or h!=0 : - date+=str(m)+" M : " - - date+=str(s)+" S" - self.crawlTime=date - str_info+="\nCrawl Time : %s" %(str(self.crawlTime)) - str_info+="\n\0" - return str_info + if years != 0: + date += "%s %s " % (years, "year" if years == 1 else "years") + if days != 0: + date += "%s %s " % (days, "day" if days == 1 else "days") + + date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2)) + return date def checkpt_service(self, chan, chkpt, tgt): """checkpoint service loop @@ -517,7 +634,7 @@ class GMasterBase(object): try: conn, _ = chan.accept() try: - conn.send(" | checkpoint %s %s %s" % (chkpt, status,self.get_extra_info())) + conn.send(" | checkpoint %s %s %s" % (chkpt, status, self.get_extra_info())) except: exc = sys.exc_info()[1] if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \ @@ -536,7 +653,7 @@ class GMasterBase(object): ): return chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - state_socket = os.path.join(gconf.socketdir, md5(gconf.state_socket_unencoded).hexdigest() + ".socket") + state_socket = os.path.join(gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket") try: os.unlink(state_socket) except: @@ -559,22 +676,6 @@ class GMasterBase(object): t.start() self.checkpoint_thread = t - def crawl_loop(self): - """start the keep-alive thread and iterate .crawl""" - timo = int(gconf.timeout or 0) - if timo > 0: - def keep_alive(): - while True: - vi, gap = self.keepalive_payload_hook(timo, timo * 0.5) - self.slave.server.keep_alive(vi) - time.sleep(gap) - t = Thread(target=keep_alive) - t.start() - self.lastreport['time'] = time.time() - self.crawlStartTime=datetime.now() - while not self.terminate: - self.crawl() - def add_job(self, path, label, job, *a, **kw): """insert @job function to job table at @path with @label""" if self.jobtab.get(path) == None: @@ -600,7 +701,7 @@ class GMasterBase(object): ret = j[-1]() if not ret: succeed = False - if succeed: + if succeed and not args[0] == None: self.sendmark(path, *args) return succeed @@ -653,6 +754,319 @@ class GMasterBase(object): tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate)) return newstate, param.state_change +class GMasterChangelogMixin(GMasterCommon): + """ changelog based change detection and syncing """ + + # index for change type and entry + IDX_START = 0 + IDX_END = 2 + + POS_GFID = 0 + POS_TYPE = 1 + POS_ENTRY1 = 2 + POS_ENTRY2 = 3 # renames + + _CL_TYPE_DATA_PFX = "D " + _CL_TYPE_METADATA_PFX = "M " + _CL_TYPE_ENTRY_PFX = "E " + + TYPE_GFID = [_CL_TYPE_DATA_PFX] # ignoring metadata ops + TYPE_ENTRY = [_CL_TYPE_ENTRY_PFX] + + # flat directory heirarchy for gfid based access + FLAT_DIR_HIERARCHY = '.' + + def fallback_xsync(self): + logging.info('falling back to xsync mode') + gconf.configinterface.set('change-detector', 'xsync') + selfkill() + + def setup_working_dir(self): + workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path)) + logfile = os.path.join(workdir, 'changes.log') + logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile)) + return (workdir, logfile) + + def lstat(self, e): + try: + return os.lstat(e) + except (IOError, OSError): + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + return ex.errno + else: + raise + + # sync data + def syncdata(self, datas): + logging.debug('datas: %s' % (datas)) + for data in datas: + logging.debug('candidate for syncing %s' % data) + pb = self.syncer.add(data) + timeA = datetime.now() + def regjob(se, xte, pb): + rv = pb.wait() + if rv[0]: + logging.debug('synced ' + se) + # update stats + timeB = datetime.now() + self.crawl_stats['last_synctime'] = timeB - timeA + self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) + self.crawl_stats['files_synced'] += 1 + self.crawl_stats['bytes_synced'] += self.syncer.bytes_synced + + # cumulative statistics + self.total_crawl_stats['bytes_synced'] += self.syncer.bytes_synced + self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) + self.total_crawl_stats['files_synced'] += 1 + return True + else: + if rv[1] in [23, 24]: + # stat to check if the file exist + st = self.lstat(se) + if isinstance(st, int): + # file got unlinked in the interim + return True + logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) + self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb) + if self.wait(self.FLAT_DIR_HIERARCHY, None): + self.update_crawl_data() + return True + + def process_change(self, change, done): + clist = [] + entries = [] + purges = set() + links = set() + datas = set() + pfx = gauxpfx() + try: + f = open(change, "r") + clist = f.readlines() + f.close() + except IOError: + raise + + def edct(op, **ed): + dct = {} + dct['op'] = op + for k in ed: + if k == 'stat': + st = ed[k] + dst = dct['stat'] = {} + dst['uid'] = st.st_uid + dst['gid'] = st.st_gid + dst['mode'] = st.st_mode + else: + dct[k] = ed[k] + return dct + for e in clist: + e = e.strip() + et = e[self.IDX_START:self.IDX_END] + ec = e[self.IDX_END:].split(' ') + if et in self.TYPE_ENTRY: + ty = ec[self.POS_TYPE] + en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1])) + gfid = ec[self.POS_GFID] + # definitely need a better way bucketize entry ops + if ty in ['UNLINK', 'RMDIR']: + entries.append(edct(ty, gfid=gfid, entry=en)) + purges.update([os.path.join(pfx, gfid)]) + continue + if not ty == 'RENAME': + go = os.path.join(pfx, gfid) + st = self.lstat(go) + if isinstance(st, int): + logging.debug('file %s got purged in the interim' % go) + continue + if ty in ['CREATE', 'MKDIR', 'MKNOD']: + entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) + elif ty == 'LINK': + entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) + links.update([os.path.join(pfx, gfid)]) + elif ty == 'SYMLINK': + entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=os.readlink(en))) + elif ty == 'RENAME': + e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2])) + entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2)) + else: + pass + elif et in self.TYPE_GFID: + da = os.path.join(pfx, ec[0]) + st = self.lstat(da) + if isinstance(st, int): + logging.debug('file %s got purged in the interim' % da) + continue + datas.update([da]) + logging.debug('entries: %s' % repr(entries)) + # sync namespace + if (entries): + self.slave.server.entry_ops(entries) + # sync data + if self.syncdata(datas - (purges - links)): + if done: + self.master.server.changelog_done(change) + return True + + def process(self, changes, done=1): + for change in changes: + times = 0 + while True: + times += 1 + logging.debug('processing change %s [%d time(s)]' % (change, times)) + if self.process_change(change, done): + break + # it's either entry_ops() or Rsync that failed to do it's + # job. Mostly it's entry_ops() [which currently has a problem + # of failing to create an entry but failing to return an errno] + # Therefore we do not know if it's either Rsync or the freaking + # entry_ops() that failed... so we retry the _whole_ changelog + # again. + # TODO: remove entry retries when it's gets fixed. + logging.warn('incomplete sync, retrying changelog: %s' % change) + time.sleep(0.5) + self.turns += 1 + + def upd_stime(self, stime): + if stime: + self.sendmark(self.FLAT_DIR_HIERARCHY, stime) + + def crawl(self): + changes = [] + try: + self.master.server.changelog_scan() + self.crawls += 1 + except OSError: + self.fallback_xsync() + changes = self.master.server.changelog_getchanges() + if changes: + xtl = self.xtime(self.FLAT_DIR_HIERARCHY) + if isinstance(xtl, int): + raise GsyncdError('master is corrupt') + logging.debug('processing changes %s' % repr(changes)) + self.process(changes) + self.upd_stime(xtl) + + def register(self): + (workdir, logfile) = self.setup_working_dir() + self.sleep_interval = int(gconf.change_interval) + # register with the changelog library + try: + # 9 == log level (DEBUG) + # 5 == connection retries + self.master.server.changelog_register(gconf.local_path, + workdir, logfile, 9, 5) + except OSError: + self.fallback_xsync() + # control should not reach here + raise + +class GMasterXsyncMixin(GMasterChangelogMixin): + """ + + This crawl needs to be xtime based (as of now + it's not. this is beacuse we generate CHANGELOG + file during each crawl which is then processed + by process_change()). + For now it's used as a one-shot initial sync + mechanism and only syncs directories, regular + files and symlinks. + """ + + def register(self): + self.sleep_interval = 60 + self.tempdir = self.setup_working_dir()[0] + self.tempdir = os.path.join(self.tempdir, 'xsync') + logging.info('xsync temp directory: %s' % self.tempdir) + try: + os.makedirs(self.tempdir) + except OSError: + ex = sys.exc_info()[1] + if ex.errno == EEXIST and os.path.isdir(self.tempdir): + pass + else: + raise + + def write_entry_change(self, prefix, data=[]): + self.fh.write("%s %s\n" % (prefix, ' '.join(data))) + + def open(self): + try: + self.xsync_change = os.path.join(self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time()))) + self.fh = open(self.xsync_change, 'w') + except IOError: + raise + + def close(self): + self.fh.close() + + def fname(self): + return self.xsync_change + + def crawl(self, path='.', xtr=None, done=0): + """ generate a CHANGELOG file consumable by process_change """ + if path == '.': + self.open() + self.crawls += 1 + if not xtr: + # get the root stime and use it for all comparisons + xtr = self.xtime('.', self.slave) + if isinstance(xtr, int): + if xtr != ENOENT: + raise GsyncdError('slave is corrupt') + xtr = self.minus_infinity + xtl = self.xtime(path) + if isinstance(xtl, int): + raise GsyncdError('master is corrupt') + if xtr == xtl: + if path == '.': + self.close() + return + self.xtime_reversion_hook(path, xtl, xtr) + logging.debug("entering " + path) + dem = self.master.server.entries(path) + pargfid = self.master.server.gfid(path) + if isinstance(pargfid, int): + logging.warn('skipping directory %s' % (path)) + for e in dem: + bname = e + e = os.path.join(path, e) + st = self.lstat(e) + if isinstance(st, int): + logging.warn('%s got purged in the interim..' % e) + continue + gfid = self.master.server.gfid(e) + if isinstance(gfid, int): + logging.warn('skipping entry %s..' % (e)) + continue + xte = self.xtime(e) + if isinstance(xte, int): + raise GsyncdError('master is corrupt') + if not self.need_sync(e, xte, xtr): + continue + mo = st.st_mode + if stat.S_ISDIR(mo): + self.write_entry_change("E", [gfid, 'MKDIR', escape(os.path.join(pargfid, bname))]) + self.crawl(e, xtr) + elif stat.S_ISREG(mo): + self.write_entry_change("E", [gfid, 'CREATE', escape(os.path.join(pargfid, bname))]) + self.write_entry_change("D", [gfid]) + elif stat.S_ISLNK(mo): + self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))]) + else: + logging.info('ignoring %s' % e) + if path == '.': + logging.info('processing xsync changelog %s' % self.fname()) + self.close() + self.process([self.fname()], done) + self.upd_stime(xtl) + +class GMasterXtimeMixin(GMasterCommon): + """ xtime based change detection and syncing """ + + def register(self): + pass + def crawl(self, path='.', xtl=None): """crawling... @@ -691,46 +1105,6 @@ class GMasterBase(object): assert that the file systems (master / slave) underneath do not change and actions taken upon some condition will not lose their context by the time they are performed. """ - if path == '.': - if self.start: - self.crawls += 1 - logging.debug("... crawl #%d done, took %.6f seconds" % \ - (self.crawls, time.time() - self.start)) - time.sleep(1) - self.start = time.time() - should_display_info = self.start - self.lastreport['time'] >= 60 - if should_display_info: - logging.info("completed %d crawls, %d turns", - self.crawls - self.lastreport['crawls'], - self.turns - self.lastreport['turns']) - self.lastreport.update(crawls = self.crawls, - turns = self.turns, - time = self.start) - volinfo_sys, state_change = self.volinfo_hook() - if self.inter_master: - self.volinfo = volinfo_sys[self.KFGN] - else: - self.volinfo = volinfo_sys[self.KNAT] - if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master): - logging.info('new master is %s', self.uuid) - if self.volinfo: - logging.info("%s master with volume id %s ..." % \ - (self.inter_master and "intermediate" or "primary", - self.uuid)) - if state_change == self.KFGN: - gconf.configinterface.set('volume_id', self.uuid) - if self.volinfo: - if self.volinfo['retval']: - raise GsyncdError ("master is corrupt") - self.start_checkpoint_thread() - else: - if should_display_info or self.crawls == 0: - if self.inter_master: - logging.info("waiting for being synced from %s ..." % \ - self.volinfo_state[self.KFGN]['uuid']) - else: - logging.info("waiting for volume info ...") - return logging.debug("entering " + path) if not xtl: xtl = self.xtime(path) @@ -806,6 +1180,7 @@ class GMasterBase(object): st = indulgently(e, lambda e: os.lstat(e)) if st == False: continue + mo = st.st_mode adct = {'own': (st.st_uid, st.st_gid)} if stat.S_ISLNK(mo): @@ -815,16 +1190,19 @@ class GMasterBase(object): elif stat.S_ISREG(mo): logging.debug("syncing %s ..." % e) pb = self.syncer.add(e) - timeA=datetime.now() + timeA = datetime.now() def regjob(e, xte, pb): - if pb.wait(): + if pb.wait()[0]: logging.debug("synced " + e) self.sendmark_regular(e, xte) - - timeB=datetime.now() - self.lastSyncTime=timeB-timeA - self.syncTime=(self.syncTime+self.lastSyncTime.microseconds)/(10.0**6) - self.filesSynced=self.filesSynced+1 + # update stats + timeB = datetime.now() + self.crawl_stats['last_synctime'] = timeB - timeA + self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) + self.crawl_stats['files_synced'] += 1 + self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) + self.total_crawl_stats['files_synced'] += 1 + self.update_crawl_data() return True else: logging.warn("failed to sync " + e) @@ -841,6 +1219,7 @@ class GMasterBase(object): if path == '.': self.wait(path, xtl) + class BoxClosedErr(Exception): pass @@ -920,7 +1299,7 @@ class Syncer(object): self.slave = slave self.lock = Lock() self.pb = PostBox() - self.bytesSynced=0 + self.bytes_synced = 0 for i in range(int(gconf.sync_jobs)): t = Thread(target=self.syncjob) t.start() @@ -940,13 +1319,10 @@ class Syncer(object): pb.close() po = self.slave.rsync(pb) if po.returncode == 0: - regEx=re.search('\ *total\ *transferred\ *file\ *size:\ *(\d+)\ *bytes\ *',po.stdout.read(),re.IGNORECASE) - if regEx: - self.bytesSynced+=(int(regEx.group(1)))/1024 - ret = True + ret = (True, 0) elif po.returncode in (23, 24): # partial transfer (cf. rsync(1)), that's normal - ret = False + ret = (False, po.returncode) else: po.errfail() pb.wakeup(ret) diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index b8956dcc2b9..badd0d9c5f8 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -3,26 +3,94 @@ import sys import time import signal import logging +import uuid +import xml.etree.ElementTree as XET +from subprocess import PIPE +from resource import Popen, FILE, GLUSTER, SSH +from threading import Lock from gconf import gconf -from syncdutils import update_file, select, waitpid, set_term_handler +from syncdutils import update_file, select, waitpid, set_term_handler, is_host_local, GsyncdError +from syncdutils import escape, Thread, finalize, memoize + +class Volinfo(object): + def __init__(self, vol, host='localhost', prelude=[]): + po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, 'volume', 'info', vol], + stdout=PIPE, stderr=PIPE) + vix = po.stdout.read() + po.wait() + po.terminate_geterr() + vi = XET.fromstring(vix) + if vi.find('opRet').text != '0': + if prelude: + via = '(via %s) ' % prelude.join(' ') + else: + via = ' ' + raise GsyncdError('getting volume info of %s%s failed with errorcode %s', + (vol, via, vi.find('opErrno').text)) + self.tree = vi + self.volume = vol + self.host = host + + def get(self, elem): + return self.tree.findall('.//' + elem) + + @property + @memoize + def bricks(self): + def bparse(b): + host, dirp = b.text.split(':', 2) + return {'host': host, 'dir': dirp} + return [ bparse(b) for b in self.get('brick') ] + + @property + @memoize + def uuid(self): + ids = self.get('id') + if len(ids) != 1: + raise GsyncdError("volume info of %s obtained from %s: ambiguous uuid", + self.volume, self.host) + return ids[0].text + class Monitor(object): """class which spawns and manages gsyncd workers""" + ST_INIT = 'Initializing...' + ST_STABLE = 'Stable' + ST_FAULTY = 'faulty' + ST_INCON = 'inconsistent' + _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON] + def __init__(self): - self.state = None + self.lock = Lock() + self.state = {} - def set_state(self, state): + def set_state(self, state, w=None): """set the state that can be used by external agents like glusterd for status reporting""" - if state == self.state: - return - self.state = state - logging.info('new state: %s' % state) - if getattr(gconf, 'state_file', None): - update_file(gconf.state_file, lambda f: f.write(state + '\n')) - - def monitor(self): + computestate = lambda: self.state and self._ST_ORD[max(self._ST_ORD.index(s) for s in self.state.values())] + if w: + self.lock.acquire() + old_state = computestate() + self.state[w] = state + state = computestate() + self.lock.release() + if state != old_state: + self.set_state(state) + else: + logging.info('new state: %s' % state) + if getattr(gconf, 'state_file', None): + update_file(gconf.state_file, lambda f: f.write(state + '\n')) + + @staticmethod + def terminate(): + # relax one SIGTERM by setting a handler that sets back + # standard handler + set_term_handler(lambda *a: set_term_handler()) + # give a chance to graceful exit + os.kill(-os.getpid(), signal.SIGTERM) + + def monitor(self, w, argv, cpids): """the monitor loop Basic logic is a blantantly simple blunt heuristics: @@ -41,27 +109,8 @@ class Monitor(object): blown worker blows up on EPIPE if the net goes down, due to the keep-alive thread) """ - def sigcont_handler(*a): - """ - Re-init logging and send group kill signal - """ - md = gconf.log_metadata - logging.shutdown() - lcls = logging.getLoggerClass() - lcls.setup(label=md.get('saved_label'), **md) - pid = os.getpid() - os.kill(-pid, signal.SIGUSR1) - signal.signal(signal.SIGUSR1, lambda *a: ()) - signal.signal(signal.SIGCONT, sigcont_handler) - - argv = sys.argv[:] - for o in ('-N', '--no-daemon', '--monitor'): - while o in argv: - argv.remove(o) - argv.extend(('-N', '-p', '')) - argv.insert(0, os.path.basename(sys.executable)) - self.set_state('starting...') + self.set_state(self.ST_INIT, w) ret = 0 def nwait(p, o=0): p2, r = waitpid(p, o) @@ -83,7 +132,13 @@ class Monitor(object): cpid = os.fork() if cpid == 0: os.close(pr) - os.execv(sys.executable, argv + ['--feedback-fd', str(pw)]) + os.execv(sys.executable, argv + ['--feedback-fd', str(pw), + '--local-path', w[0], + '--local-id', '.' + escape(w[0]), + '--resource-remote', w[1]]) + self.lock.acquire() + cpids.add(cpid) + self.lock.release() os.close(pw) t0 = time.time() so = select((pr,), (), (), conn_timeout)[0] @@ -103,27 +158,104 @@ class Monitor(object): else: logging.debug("worker not confirmed in %d sec, aborting it" % \ conn_timeout) - # relax one SIGTERM by setting a handler that sets back - # standard handler - set_term_handler(lambda *a: set_term_handler()) - # give a chance to graceful exit - os.kill(-os.getpid(), signal.SIGTERM) + self.terminate() time.sleep(1) os.kill(cpid, signal.SIGKILL) ret = nwait(cpid) if ret == None: - self.set_state('OK') + self.set_state(self.ST_STABLE, w) ret = nwait(cpid) if exit_signalled(ret): ret = 0 else: ret = exit_status(ret) if ret in (0,1): - self.set_state('faulty') + self.set_state(self.ST_FAULTY, w) time.sleep(10) - self.set_state('inconsistent') + self.set_state(self.ST_INCON, w) return ret -def monitor(): + def multiplex(self, wspx, suuid): + def sigcont_handler(*a): + """ + Re-init logging and send group kill signal + """ + md = gconf.log_metadata + logging.shutdown() + lcls = logging.getLoggerClass() + lcls.setup(label=md.get('saved_label'), **md) + pid = os.getpid() + os.kill(-pid, signal.SIGUSR1) + signal.signal(signal.SIGUSR1, lambda *a: ()) + signal.signal(signal.SIGCONT, sigcont_handler) + + argv = sys.argv[:] + for o in ('-N', '--no-daemon', '--monitor'): + while o in argv: + argv.remove(o) + argv.extend(('-N', '-p', '', '--slave-id', suuid)) + argv.insert(0, os.path.basename(sys.executable)) + + cpids = set() + ta = [] + for wx in wspx: + def wmon(w): + cpid, _ = self.monitor(w, argv, cpids) + terminate() + time.sleep(1) + self.lock.acquire() + for cpid in cpids: + os.kill(cpid, signal.SIGKILL) + self.lock.release() + finalize(exval=1) + t = Thread(target = wmon, args=[wx]) + t.start() + ta.append(t) + for t in ta: + t.join() + +def distribute(*resources): + master, slave = resources + mvol = Volinfo(master.volume, master.host) + logging.debug('master bricks: ' + repr(mvol.bricks)) + locmbricks = [ b['dir'] for b in mvol.bricks if is_host_local(b['host']) ] + prelude = [] + si = slave + if isinstance(slave, SSH): + prelude = gconf.ssh_command.split() + [slave.remote_addr] + si = slave.inner_rsc + logging.debug('slave SSH gateway: ' + slave.remote_addr) + if isinstance(si, FILE): + sbricks = {'host': 'localhost', 'dir': si.path} + suuid = uuid.uuid5(uuid.NAMESPACE_URL, slave.get_url(canonical=True)) + elif isinstance(si, GLUSTER): + svol = Volinfo(si.volume, si.host, prelude) + sbricks = svol.bricks + suuid = svol.uuid + else: + raise GsyncdError("unkown slave type " + slave.url) + logging.info('slave bricks: ' + repr(sbricks)) + if isinstance(si, FILE): + slaves = [ slave.url ] + else: + slavenodes = set(b['host'] for b in sbricks) + if isinstance(slave, SSH) and not gconf.isolated_slave: + rap = SSH.parse_ssh_address(slave.remote_addr) + slaves = [ 'ssh://' + rap['user'] + '@' + h + ':' + si.url for h in slavenodes ] + else: + slavevols = [ h + ':' + si.volume for h in slavenodes ] + if isinstance(slave, SSH): + slaves = [ 'ssh://' + rap.remote_addr + ':' + v for v in slavevols ] + else: + slaves = slavevols + locmbricks.sort() + slaves.sort() + workerspex = [] + for i in range(len(locmbricks)): + workerspex.append((locmbricks[i], slaves[i % len(slaves)])) + logging.info('worker specs: ' + repr(workerspex)) + return workerspex, suuid + +def monitor(*resources): """oh yeah, actually Monitor is used as singleton, too""" - return Monitor().monitor() + return Monitor().multiplex(*distribute(*resources)) diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 73102fbcb44..52989fe28cc 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -5,13 +5,14 @@ import stat import time import fcntl import errno +import types import struct import socket import logging import tempfile import threading import subprocess -from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR +from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR, ENOTEMPTY from select import error as SelectError from gconf import gconf @@ -19,7 +20,8 @@ import repce from repce import RepceServer, RepceClient from master import gmaster_builder import syncdutils -from syncdutils import GsyncdError, select, privileged, boolify +from syncdutils import GsyncdError, select, privileged, boolify, funcode +from syncdutils import umask, entry2pb, gauxpfx, errno_wrap UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) @@ -105,7 +107,18 @@ class _MetaXattr(object): setattr(self, m, getattr(LXattr, m)) return getattr(self, meth) +class _MetaChangelog(object): + def __getattr__(self, meth): + from libgfchangelog import Changes as LChanges + xmeth = [ m for m in dir(LChanges) if m[0] != '_' ] + if not meth in xmeth: + return + for m in xmeth: + setattr(self, m, getattr(LChanges, m)) + return getattr(self, meth) + Xattr = _MetaXattr() +Changes = _MetaChangelog() class Popen(subprocess.Popen): @@ -245,10 +258,24 @@ class Server(object): and classmethods and is used directly, without instantiation.) """ - GX_NSPACE = (privileged() and "trusted" or "system") + ".glusterfs" + GX_NSPACE_PFX = (privileged() and "trusted" or "system") + GX_NSPACE = GX_NSPACE_PFX + ".glusterfs" NTV_FMTSTR = "!" + "B"*19 + "II" FRGN_XTRA_FMT = "I" FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT + GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' + + local_path = '' + + @classmethod + def _fmt_mknod(cls, l): + return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l+1) + @classmethod + def _fmt_mkdir(cls, l): + return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l+1) + @classmethod + def _fmt_symlink(cls, l1, l2): + return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1+1, l2+1) def _pathguard(f): """decorator method that checks @@ -257,22 +284,21 @@ class Server(object): point out of the managed tree """ - fc = getattr(f, 'func_code', None) - if not fc: - # python 3 - fc = f.__code__ + fc = funcode(f) pi = list(fc.co_varnames).index('path') def ff(*a): path = a[pi] ps = path.split('/') if path[0] == '/' or '..' in ps: raise ValueError('unsafe path') + a = list(a) + a[pi] = os.path.join(a[0].local_path, path) return f(*a) return ff - @staticmethod + @classmethod @_pathguard - def entries(path): + def entries(cls, path): """directory entries in an array""" # prevent symlinks being followed if not stat.S_ISDIR(os.lstat(path).st_mode): @@ -371,6 +397,18 @@ class Server(object): raise @classmethod + def gfid(cls, gfidpath): + return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid', cls.GX_GFID_CANONICAL_LEN], [ENOENT]) + + @classmethod + def node_uuid(cls, path='.'): + try: + uuid_l = Xattr.lgetxattr_buf(path, '.'.join([cls.GX_NSPACE, 'node-uuid'])) + return uuid_l[:-1].split(' ') + except OSError: + raise + + @classmethod def xtime_vec(cls, path, *uuids): """vectored version of @xtime @@ -402,9 +440,96 @@ class Server(object): for u,t in mark_dct.items(): cls.set_xtime(path, u, t) - @staticmethod + @classmethod + def entry_ops(cls, entries): + pfx = gauxpfx() + logging.debug('entries: %s' % repr(entries)) + # regular file + def entry_pack_reg(gf, bn, st): + blen = len(bn) + mo = st['mode'] + return struct.pack(cls._fmt_mknod(blen), + st['uid'], st['gid'], + gf, mo, bn, + stat.S_IMODE(mo), 0, umask()) + # mkdir + def entry_pack_mkdir(gf, bn, st): + blen = len(bn) + mo = st['mode'] + return struct.pack(cls._fmt_mkdir(blen), + st['uid'], st['gid'], + gf, mo, bn, + stat.S_IMODE(mo), umask()) + #symlink + def entry_pack_symlink(gf, bn, lnk, st): + blen = len(bn) + llen = len(lnk) + return struct.pack(cls._fmt_symlink(blen, llen), + st['uid'], st['gid'], + gf, st['mode'], bn, lnk) + def entry_purge(entry, gfid): + # This is an extremely racy code and needs to be fixed ASAP. + # The GFID check here is to be sure that the pargfid/bname + # to be purged is the GFID gotten from the changelog. + # (a stat(changelog_gfid) would also be valid here) + # The race here is between the GFID check and the purge. + disk_gfid = cls.gfid(entry) + if isinstance(disk_gfid, int): + return + if not gfid == disk_gfid: + return + er = errno_wrap(os.unlink, [entry], [ENOENT, EISDIR]) + if isinstance(er, int): + if er == EISDIR: + er = errno_wrap(os.rmdir, [entry], [ENOENT, ENOTEMPTY]) + if er == ENOTEMPTY: + return er + for e in entries: + blob = None + op = e['op'] + gfid = e['gfid'] + entry = e['entry'] + (pg, bname) = entry2pb(entry) + if op in ['RMDIR', 'UNLINK']: + while True: + er = entry_purge(entry, gfid) + if isinstance(er, int): + time.sleep(1) + else: + break + elif op == 'CREATE': + blob = entry_pack_reg(gfid, bname, e['stat']) + elif op == 'MKDIR': + blob = entry_pack_mkdir(gfid, bname, e['stat']) + elif op == 'LINK': + errno_wrap(os.link, [os.path.join(pfx, gfid), entry], [ENOENT, EEXIST]) + elif op == 'SYMLINK': + blob = entry_pack_symlink(gfid, bname, e['link'], e['stat']) + elif op == 'RENAME': + en = e['entry1'] + errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST]) + if blob: + errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [ENOENT, EEXIST]) + + @classmethod + def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0): + Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) + + @classmethod + def changelog_scan(cls): + Changes.cl_scan() + + @classmethod + def changelog_getchanges(cls): + return Changes.cl_getchanges() + + @classmethod + def changelog_done(cls, clfile): + Changes.cl_done(clfile) + + @classmethod @_pathguard - def setattr(path, adct): + def setattr(cls, path, adct): """set file attributes @adct is a dict, where 'own', 'mode' and 'times' @@ -537,10 +662,10 @@ class SlaveRemote(object): raise GsyncdError("no files to sync") logging.debug("files: " + ", ".join(files)) argv = gconf.rsync_command.split() + \ - ['-aR0', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \ + ['-avR0', '--inplace', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \ gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \ ['.'] + list(args) - po = Popen(argv, stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE) + po = Popen(argv, stdin=subprocess.PIPE,stderr=subprocess.PIPE) for f in files: po.stdin.write(f) po.stdin.write('\0') @@ -685,7 +810,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def can_connect_to(self, remote): """determine our position in the connectibility matrix""" - return True + return not remote or \ + (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER)) class Mounter(object): """Abstract base class for mounter backends""" @@ -864,6 +990,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): sup(self, *a, **kw) self.slavedir = "/proc/%d/cwd" % self.server.pid() + def gmaster_instantiate_tuple(self, slave): + """return a tuple of the 'one shot' and the 'main crawl' class instance""" + return (gmaster_builder('xsync')(self, slave), gmaster_builder()(self, slave)) + def service_loop(self, *args): """enter service loop @@ -873,7 +1003,41 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): - else do that's what's inherited """ if args: - gmaster_builder()(self, args[0]).crawl_loop() + slave = args[0] + if gconf.local_path: + class brickserver(FILE.FILEServer): + local_path = gconf.local_path + aggregated = self.server + @classmethod + def entries(cls, path): + e = super(brickserver, cls).entries(path) + # on the brick don't mess with /.glusterfs + if path == '.': + try: + e.remove('.glusterfs') + except ValueError: + pass + return e + if gconf.slave_id: + # define {,set_}xtime in slave, thus preempting + # the call to remote, so that it takes data from + # the local brick + slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server) + slave.server.set_xtime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_xtime(path, uuid + '.' + gconf.slave_id, mark), slave.server) + (g1, g2) = self.gmaster_instantiate_tuple(slave) + g1.master.server = brickserver + g2.master.server = brickserver + else: + (g1, g2) = self.gmaster_instantiate_tuple(slave) + g1.master.server.aggregated = gmaster.master.server + g2.master.server.aggregated = gmaster.master.server + # bad bad bad: bad way to do things like this + # need to make this elegant + # register the crawlers and start crawling + g1.register() + g2.register() + g1.crawlwrap(oneshot=True) + g2.crawlwrap() else: sup(self, *args) @@ -893,13 +1057,18 @@ class SSH(AbstractUrl, SlaveRemote): '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ])) self.inner_rsc = parse_url(inner_url) - def canonical_path(self): - m = re.match('([^@]+)@(.+)', self.remote_addr) + @staticmethod + def parse_ssh_address(addr): + m = re.match('([^@]+)@(.+)', addr) if m: u, h = m.groups() else: - u, h = syncdutils.getusername(), self.remote_addr - remote_addr = '@'.join([u, gethostbyname(h)]) + u, h = syncdutils.getusername(), addr + return {'user': u, 'host': h} + + def canonical_path(self): + rap = self.parse_ssh_address(self.remote_addr) + remote_addr = '@'.join([rap['user'], gethostbyname(rap['host'])]) return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)]) def can_connect_to(self, remote): diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 0764c07904d..720200018e5 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -5,8 +5,9 @@ import time import fcntl import shutil import logging +import socket from threading import Lock, Thread as baseThread -from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, errorcode +from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, ENOENT, EPERM, ESTALE, errorcode from signal import signal, SIGTERM, SIGKILL from time import sleep import select as oselect @@ -25,6 +26,15 @@ try: except ImportError: import urllib +try: + from hashlib import md5 as md5 +except ImportError: + # py 2.4 + from md5 import new as md5 + +# auxillary gfid based access prefix +_CL_AUX_GFID_PFX = ".gfid/" + def escape(s): """the chosen flavor of string escaping, used all over to turn whatever data to creatable representation""" @@ -286,3 +296,93 @@ def waitpid (*a): def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})): signal(SIGTERM, hook) + +def is_host_local(host): + locaddr = False + for ai in socket.getaddrinfo(host, None): + # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators/mgmt/glusterd/src/glusterd-utils.c#L125 + if ai[0] == socket.AF_INET: + if ai[-1][0].split(".")[0] == "127": + locaddr = True + break + elif ai[0] == socket.AF_INET6: + if ai[-1][0] == "::1": + locaddr = True + break + else: + continue + try: + # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue, + # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587 + s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP) + except socket.error: + ex = sys.exc_info()[1] + if ex.errno != EPERM: + raise + f = None + try: + f = open("/proc/sys/net/ipv4/ip_nonlocal_bind") + if int(f.read()) != 0: + raise GsyncdError( + "non-local bind is set and not allowed to create raw sockets, " + "cannot determine if %s is local" % host) + s = socket.socket(ai[0], socket.SOCK_DGRAM) + finally: + if f: + f.close() + try: + s.bind(ai[-1]) + locaddr = True + break + except: + pass + s.close() + return locaddr + +def funcode(f): + fc = getattr(f, 'func_code', None) + if not fc: + # python 3 + fc = f.__code__ + return fc + +def memoize(f): + fc = funcode(f) + fn = fc.co_name + def ff(self, *a, **kw): + rv = getattr(self, '_' + fn, None) + if rv == None: + rv = f(self, *a, **kw) + setattr(self, '_' + fn, rv) + return rv + return ff + +def umask(): + return os.umask(0) + +def entry2pb(e): + return e.rsplit('/', 1) + +def gauxpfx(): + return _CL_AUX_GFID_PFX + +def md5hex(s): + return md5(s).hexdigest() + +def selfkill(sig=SIGTERM): + os.kill(os.getpid(), sig) + +def errno_wrap(call, arg=[], errnos=[]): + """ wrapper around calls resilient to errnos. + retry in case of ESTALE + """ + while True: + try: + return call(*arg) + except OSError: + ex = sys.exc_info()[1] + if ex.errno in errnos: + return ex.errno + if not ex.errno == ESTALE: + raise + time.sleep(0.5) # retry the call |