From d354ea0a05a3b6a13d227b567a81afdb8ea46abb Mon Sep 17 00:00:00 2001 From: Saravanakumar Arumugam Date: Wed, 24 Aug 2016 15:19:53 +0530 Subject: geo-rep: add geo-rep events for server side changes Event Type defined in #15351 to avoid merge conflicts Add geo-rep events applicable to changes in geo-rep session in the server side. Change-Id: Ia66574d2abccad7fce6a96667efbc7c6c8903fc6 BUG: 1370445 Signed-off-by: Saravanakumar Arumugam Reviewed-on: http://review.gluster.org/15328 Tested-by: Aravinda VK Smoke: Gluster Build System NetBSD-regression: NetBSD Build System CentOS-regression: Gluster Build System Reviewed-by: Aravinda VK --- geo-replication/syncdaemon/Makefile.am | 2 +- geo-replication/syncdaemon/conf.py.in | 14 + geo-replication/syncdaemon/configinterface.py | 415 +++++++++++++++++++++++ geo-replication/syncdaemon/configinterface.py.in | 414 ---------------------- geo-replication/syncdaemon/monitor.py | 41 ++- geo-replication/syncdaemon/syncdutils.py | 9 + 6 files changed, 464 insertions(+), 431 deletions(-) create mode 100644 geo-replication/syncdaemon/conf.py.in create mode 100644 geo-replication/syncdaemon/configinterface.py delete mode 100644 geo-replication/syncdaemon/configinterface.py.in (limited to 'geo-replication') diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index ce875bdacb6..88c9e64e525 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -3,6 +3,6 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \ resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py \ - gsyncdstatus.py changelogsdb.py + gsyncdstatus.py changelogsdb.py conf.py CLEANFILES = diff --git a/geo-replication/syncdaemon/conf.py.in b/geo-replication/syncdaemon/conf.py.in new file mode 100644 index 00000000000..88072789496 --- /dev/null +++ b/geo-replication/syncdaemon/conf.py.in @@ -0,0 +1,14 @@ +# +# Copyright (c) 2016 Red Hat, Inc. +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +GLUSTERFS_LIBEXECDIR = '@GLUSTERFS_LIBEXECDIR@' +GLUSTERD_WORKDIR = "@GLUSTERD_WORKDIR@" + +LOCALSTATEDIR = "@localstatedir@" diff --git a/geo-replication/syncdaemon/configinterface.py b/geo-replication/syncdaemon/configinterface.py new file mode 100644 index 00000000000..adcefb8ab52 --- /dev/null +++ b/geo-replication/syncdaemon/configinterface.py @@ -0,0 +1,415 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +try: + import ConfigParser +except ImportError: + # py 3 + import configparser as ConfigParser +import re +from string import Template +import os +import errno +import sys +from stat import ST_DEV, ST_INO, ST_MTIME +import tempfile +import shutil + +from syncdutils import escape, unescape, norm, update_file, GsyncdError +from conf import GLUSTERD_WORKDIR, LOCALSTATEDIR + +SECT_ORD = '__section_order__' +SECT_META = '__meta__' +config_version = 2.0 + +re_type = type(re.compile('')) + +TMPL_CONFIG_FILE = GLUSTERD_WORKDIR + "/geo-replication/gsyncd_template.conf" + +# (SECTION, OPTION, OLD VALUE, NEW VALUE) +CONFIGS = ( + ("peersrx . .", + "georep_session_working_dir", + "", + GLUSTERD_WORKDIR + "/geo-replication/${mastervol}_${remotehost}_" + "${slavevol}/"), + ("peersrx .", + "gluster_params", + "aux-gfid-mount xlator-option=\*-dht.assert-no-child-down=true", + "aux-gfid-mount"), + ("peersrx .", + "gluster_params", + "aux-gfid-mount", + "aux-gfid-mount acl"), + ("peersrx . .", + "ssh_command_tar", + "", + "ssh -oPasswordAuthentication=no -oStrictHostKeyChecking=no " + "-i " + GLUSTERD_WORKDIR + "/geo-replication/tar_ssh.pem"), + ("peersrx . .", + "changelog_log_file", + "", + "${iprefix}/log/glusterfs/geo-replication/${mastervol}" + "/${eSlave}${local_id}-changes.log"), + ("peersrx . .", + "working_dir", + LOCALSTATEDIR + "/run/gluster/${mastervol}/${eSlave}", + "${iprefix}/lib/misc/glusterfsd/${mastervol}/${eSlave}"), + ("peersrx . .", + "ignore_deletes", + "true", + "false"), + ("peersrx . .", + "pid-file", + GLUSTERD_WORKDIR + "/geo-replication/${mastervol}_${remotehost}_" + "${slavevol}/${eSlave}.pid", + GLUSTERD_WORKDIR + "/geo-replication/${mastervol}_${remotehost}_" + "${slavevol}/monitor.pid"), + ("peersrx . .", + "state-file", + GLUSTERD_WORKDIR + "/geo-replication/${mastervol}_${remotehost}_" + "${slavevol}/${eSlave}.status", + GLUSTERD_WORKDIR + "/geo-replication/${mastervol}_${remotehost}_" + "${slavevol}/monitor.status"), +) + + +def upgrade_config_file(path, confdata): + config_change = False + config = ConfigParser.RawConfigParser() + # If confdata.rx present then glusterd is adding config values, + # it will create config file if not exists. config.read is fine in + # this case since any other error will be raised during write. + if getattr(confdata, "rx", False): + config.read(path) + else: + with open(path) as fp: + config.readfp(fp) + + for sec, opt, oldval, newval in CONFIGS: + try: + val = config.get(sec, opt) + except ConfigParser.NoOptionError: + # if new config opt not exists + config_change = True + config.set(sec, opt, newval) + continue + except ConfigParser.Error: + """ + When gsyncd invoked at the time of create, config file + will not be their. Ignore any ConfigParser errors + """ + continue + + if val == newval: + # value is same as new val + continue + + if val == oldval: + # config value needs update + config_change = True + config.set(sec, opt, newval) + + # To convert from old peers section format to new peers section format. + # Old format: peers gluster://: \ + # ssh://root@:gluster://: + # New format: peers + for old_sect in config.sections(): + if old_sect.startswith("peers "): + peers_data = old_sect.split(" ") + mvol = peers_data[1].split("%3A")[-1] + svol = peers_data[2].split("%3A")[-1] + new_sect = "peers {0} {1}".format(mvol, svol) + + if old_sect == new_sect: + # Already in new format "peers mastervol slavevol" + continue + + # Create new section if not exists + try: + config.add_section(new_sect) + except ConfigParser.DuplicateSectionError: + pass + + config_change = True + # Add all the items of old_sect to new_sect + for key, val in config.items(old_sect): + config.set(new_sect, key, val) + + # Delete old section + config.remove_section(old_sect) + + if config_change: + tempConfigFile = tempfile.NamedTemporaryFile(mode="wb", delete=False) + with open(tempConfigFile.name, 'wb') as configFile: + config.write(configFile) + + # If src and dst are two different file system, then os.rename + # fails, In this case if temp file created in /tmp and if /tmp is + # separate fs then os.rename gives following error, so use shutil + # OSError: [Errno 18] Invalid cross-device link + # mail.python.org/pipermail/python-list/2005-February/342893.html + shutil.move(tempConfigFile.name, path) + + +class MultiDict(object): + + """a virtual dict-like class which functions as the union + of underlying dicts""" + + def __init__(self, *dd): + self.dicts = dd + + def __getitem__(self, key): + val = None + for d in self.dicts: + if d.get(key) is not None: + val = d[key] + if val is None: + raise KeyError(key) + return val + + +class GConffile(object): + + """A high-level interface to ConfigParser which flattens the two-tiered + config layout by implenting automatic section dispatch based on initial + parameters. + + Also ensure section ordering in terms of their time of addition -- a compat + hack for Python < 2.7. + """ + + def _normconfig(self): + """normalize config keys by s/-/_/g""" + for n, s in self.config._sections.items(): + if n.find('__') == 0: + continue + s2 = type(s)() + for k, v in s.items(): + if k.find('__') != 0: + k = norm(k) + s2[k] = v + self.config._sections[n] = s2 + + def __init__(self, path, peers, confdata, *dd): + """ + - .path: location of config file + - .config: underlying ConfigParser instance + - .peers: on behalf of whom we flatten .config + (master, or master-slave url pair) + - .auxdicts: template subtituents + """ + self.peers = peers + self.path = path + self.auxdicts = dd + self.config = ConfigParser.RawConfigParser() + if getattr(confdata, "rx", False): + self.config.read(path) + else: + with open(path) as fp: + self.config.readfp(fp) + + self.dev, self.ino, self.mtime = -1, -1, -1 + self._normconfig() + + def _load(self): + try: + sres = os.stat(self.path) + self.dev = sres[ST_DEV] + self.ino = sres[ST_INO] + self.mtime = sres[ST_MTIME] + except (OSError, IOError): + if sys.exc_info()[1].errno == errno.ENOENT: + sres = None + + self.config = ConfigParser.RawConfigParser() + with open(self.path) as fp: + self.config.readfp(fp) + self._normconfig() + + def get_realtime(self, opt): + try: + sres = os.stat(self.path) + except (OSError, IOError): + if sys.exc_info()[1].errno == errno.ENOENT: + sres = None + else: + raise + + # compare file system stat with that of our stream file handle + if not sres or sres[ST_DEV] != self.dev or \ + sres[ST_INO] != self.ino or self.mtime != sres[ST_MTIME]: + self._load() + + return self.get(opt, printValue=False) + + def section(self, rx=False): + """get the section name of the section representing .peers + in .config""" + peers = self.peers + if not peers: + peers = ['.', '.'] + rx = True + if rx: + return ' '.join(['peersrx'] + [escape(u) for u in peers]) + else: + return ' '.join(['peers'] + [u.split(':')[-1] for u in peers]) + + @staticmethod + def parse_section(section): + """retrieve peers sequence encoded by section name + (as urls or regexen, depending on section type) + """ + sl = section.split() + st = sl.pop(0) + sl = [unescape(u) for u in sl] + if st == 'peersrx': + sl = [re.compile(u) for u in sl] + return sl + + def ord_sections(self): + """Return an ordered list of sections. + + Ordering happens based on the auxiliary + SECT_ORD section storing indices for each + section added through the config API. + + To not to go corrupt in case of manually + written config files, we take care to append + also those sections which are not registered + in SECT_ORD. + + Needed for python 2.{4,5,6} where ConfigParser + cannot yet order sections/options internally. + """ + so = {} + if self.config.has_section(SECT_ORD): + so = self.config._sections[SECT_ORD] + so2 = {} + for k, v in so.items(): + if k != '__name__': + so2[k] = int(v) + tv = 0 + if so2: + tv = max(so2.values()) + 1 + ss = [s for s in self.config.sections() if s.find('__') != 0] + for s in ss: + if s in so.keys(): + continue + so2[s] = tv + tv += 1 + + def scmp(x, y): + return cmp(*(so2[s] for s in (x, y))) + ss.sort(scmp) + return ss + + def update_to(self, dct, allow_unresolved=False): + """update @dct from key/values of ours. + + key/values are collected from .config by filtering the regexp sections + according to match, and from .section. The values are treated as + templates, which are substituted from .auxdicts and (in case of regexp + sections) match groups. + """ + if not self.peers: + raise GsyncdError('no peers given, cannot select matching options') + + def update_from_sect(sect, mud): + for k, v in self.config._sections[sect].items(): + if k == '__name__': + continue + if allow_unresolved: + dct[k] = Template(v).safe_substitute(mud) + else: + dct[k] = Template(v).substitute(mud) + for sect in self.ord_sections(): + sp = self.parse_section(sect) + if isinstance(sp[0], re_type) and len(sp) == len(self.peers): + match = True + mad = {} + for i in range(len(sp)): + m = sp[i].search(self.peers[i]) + if not m: + match = False + break + for j in range(len(m.groups())): + mad['match%d_%d' % (i + 1, j + 1)] = m.groups()[j] + if match: + update_from_sect(sect, MultiDict(dct, mad, *self.auxdicts)) + if self.config.has_section(self.section()): + update_from_sect(self.section(), MultiDict(dct, *self.auxdicts)) + + def get(self, opt=None, printValue=True): + """print the matching key/value pairs from .config, + or if @opt given, the value for @opt (according to the + logic described in .update_to) + """ + d = {} + self.update_to(d, allow_unresolved=True) + if opt: + opt = norm(opt) + v = d.get(opt) + if v: + if printValue: + print(v) + else: + return v + else: + for k, v in d.iteritems(): + if k == '__name__': + continue + print("%s: %s" % (k, v)) + + def write(self, trfn, opt, *a, **kw): + """update on-disk config transactionally + + @trfn is the transaction function + """ + def mergeconf(f): + self.config = ConfigParser.RawConfigParser() + self.config.readfp(f) + self._normconfig() + if not self.config.has_section(SECT_META): + self.config.add_section(SECT_META) + self.config.set(SECT_META, 'version', config_version) + return trfn(norm(opt), *a, **kw) + + def updateconf(f): + self.config.write(f) + update_file(self.path, updateconf, mergeconf) + + def _set(self, opt, val, rx=False): + """set @opt to @val in .section""" + sect = self.section(rx) + if not self.config.has_section(sect): + self.config.add_section(sect) + # regarding SECT_ORD, cf. ord_sections + if not self.config.has_section(SECT_ORD): + self.config.add_section(SECT_ORD) + self.config.set( + SECT_ORD, sect, len(self.config._sections[SECT_ORD])) + self.config.set(sect, opt, val) + return True + + def set(self, opt, *a, **kw): + """perform ._set transactionally""" + self.write(self._set, opt, *a, **kw) + + def _delete(self, opt, rx=False): + """delete @opt from .section""" + sect = self.section(rx) + if self.config.has_section(sect): + return self.config.remove_option(sect, opt) + + def delete(self, opt, *a, **kw): + """perform ._delete transactionally""" + self.write(self._delete, opt, *a, **kw) diff --git a/geo-replication/syncdaemon/configinterface.py.in b/geo-replication/syncdaemon/configinterface.py.in deleted file mode 100644 index e1cf007a2b8..00000000000 --- a/geo-replication/syncdaemon/configinterface.py.in +++ /dev/null @@ -1,414 +0,0 @@ -# -# Copyright (c) 2011-2014 Red Hat, Inc. -# This file is part of GlusterFS. - -# This file is licensed to you under your choice of the GNU Lesser -# General Public License, version 3 or any later version (LGPLv3 or -# later), or the GNU General Public License, version 2 (GPLv2), in all -# cases as published by the Free Software Foundation. -# - -try: - import ConfigParser -except ImportError: - # py 3 - import configparser as ConfigParser -import re -from string import Template -import os -import errno -import sys -from stat import ST_DEV, ST_INO, ST_MTIME -import tempfile -import shutil - -from syncdutils import escape, unescape, norm, update_file, GsyncdError - -SECT_ORD = '__section_order__' -SECT_META = '__meta__' -config_version = 2.0 - -re_type = type(re.compile('')) - -TMPL_CONFIG_FILE = "@GLUSTERD_WORKDIR@/geo-replication/gsyncd_template.conf" - -# (SECTION, OPTION, OLD VALUE, NEW VALUE) -CONFIGS = ( - ("peersrx . .", - "georep_session_working_dir", - "", - "@GLUSTERD_WORKDIR@/geo-replication/${mastervol}_${remotehost}_" - "${slavevol}/"), - ("peersrx .", - "gluster_params", - "aux-gfid-mount xlator-option=\*-dht.assert-no-child-down=true", - "aux-gfid-mount"), - ("peersrx .", - "gluster_params", - "aux-gfid-mount", - "aux-gfid-mount acl"), - ("peersrx . .", - "ssh_command_tar", - "", - "ssh -oPasswordAuthentication=no -oStrictHostKeyChecking=no " - "-i @GLUSTERD_WORKDIR@/geo-replication/tar_ssh.pem"), - ("peersrx . .", - "changelog_log_file", - "", - "${iprefix}/log/glusterfs/geo-replication/${mastervol}" - "/${eSlave}${local_id}-changes.log"), - ("peersrx . .", - "working_dir", - "@localstatedir@/run/gluster/${mastervol}/${eSlave}", - "${iprefix}/lib/misc/glusterfsd/${mastervol}/${eSlave}"), - ("peersrx . .", - "ignore_deletes", - "true", - "false"), - ("peersrx . .", - "pid-file", - "@GLUSTERD_WORKDIR@/geo-replication/${mastervol}_${remotehost}_" - "${slavevol}/${eSlave}.pid", - "@GLUSTERD_WORKDIR@/geo-replication/${mastervol}_${remotehost}_" - "${slavevol}/monitor.pid"), - ("peersrx . .", - "state-file", - "@GLUSTERD_WORKDIR@/geo-replication/${mastervol}_${remotehost}_" - "${slavevol}/${eSlave}.status", - "@GLUSTERD_WORKDIR@/geo-replication/${mastervol}_${remotehost}_" - "${slavevol}/monitor.status"), -) - - -def upgrade_config_file(path, confdata): - config_change = False - config = ConfigParser.RawConfigParser() - # If confdata.rx present then glusterd is adding config values, - # it will create config file if not exists. config.read is fine in - # this case since any other error will be raised during write. - if getattr(confdata, "rx", False): - config.read(path) - else: - with open(path) as fp: - config.readfp(fp) - - for sec, opt, oldval, newval in CONFIGS: - try: - val = config.get(sec, opt) - except ConfigParser.NoOptionError: - # if new config opt not exists - config_change = True - config.set(sec, opt, newval) - continue - except ConfigParser.Error: - """ - When gsyncd invoked at the time of create, config file - will not be their. Ignore any ConfigParser errors - """ - continue - - if val == newval: - # value is same as new val - continue - - if val == oldval: - # config value needs update - config_change = True - config.set(sec, opt, newval) - - # To convert from old peers section format to new peers section format. - # Old format: peers gluster://: \ - # ssh://root@:gluster://: - # New format: peers - for old_sect in config.sections(): - if old_sect.startswith("peers "): - peers_data = old_sect.split(" ") - mvol = peers_data[1].split("%3A")[-1] - svol = peers_data[2].split("%3A")[-1] - new_sect = "peers {0} {1}".format(mvol, svol) - - if old_sect == new_sect: - # Already in new format "peers mastervol slavevol" - continue - - # Create new section if not exists - try: - config.add_section(new_sect) - except ConfigParser.DuplicateSectionError: - pass - - config_change = True - # Add all the items of old_sect to new_sect - for key, val in config.items(old_sect): - config.set(new_sect, key, val) - - # Delete old section - config.remove_section(old_sect) - - if config_change: - tempConfigFile = tempfile.NamedTemporaryFile(mode="wb", delete=False) - with open(tempConfigFile.name, 'wb') as configFile: - config.write(configFile) - - # If src and dst are two different file system, then os.rename - # fails, In this case if temp file created in /tmp and if /tmp is - # separate fs then os.rename gives following error, so use shutil - # OSError: [Errno 18] Invalid cross-device link - # mail.python.org/pipermail/python-list/2005-February/342893.html - shutil.move(tempConfigFile.name, path) - - -class MultiDict(object): - - """a virtual dict-like class which functions as the union - of underlying dicts""" - - def __init__(self, *dd): - self.dicts = dd - - def __getitem__(self, key): - val = None - for d in self.dicts: - if d.get(key) is not None: - val = d[key] - if val is None: - raise KeyError(key) - return val - - -class GConffile(object): - - """A high-level interface to ConfigParser which flattens the two-tiered - config layout by implenting automatic section dispatch based on initial - parameters. - - Also ensure section ordering in terms of their time of addition -- a compat - hack for Python < 2.7. - """ - - def _normconfig(self): - """normalize config keys by s/-/_/g""" - for n, s in self.config._sections.items(): - if n.find('__') == 0: - continue - s2 = type(s)() - for k, v in s.items(): - if k.find('__') != 0: - k = norm(k) - s2[k] = v - self.config._sections[n] = s2 - - def __init__(self, path, peers, confdata, *dd): - """ - - .path: location of config file - - .config: underlying ConfigParser instance - - .peers: on behalf of whom we flatten .config - (master, or master-slave url pair) - - .auxdicts: template subtituents - """ - self.peers = peers - self.path = path - self.auxdicts = dd - self.config = ConfigParser.RawConfigParser() - if getattr(confdata, "rx", False): - self.config.read(path) - else: - with open(path) as fp: - self.config.readfp(fp) - - self.dev, self.ino, self.mtime = -1, -1, -1 - self._normconfig() - - def _load(self): - try: - sres = os.stat(self.path) - self.dev = sres[ST_DEV] - self.ino = sres[ST_INO] - self.mtime = sres[ST_MTIME] - except (OSError, IOError): - if sys.exc_info()[1].errno == errno.ENOENT: - sres = None - - self.config = ConfigParser.RawConfigParser() - with open(self.path) as fp: - self.config.readfp(fp) - self._normconfig() - - def get_realtime(self, opt): - try: - sres = os.stat(self.path) - except (OSError, IOError): - if sys.exc_info()[1].errno == errno.ENOENT: - sres = None - else: - raise - - # compare file system stat with that of our stream file handle - if not sres or sres[ST_DEV] != self.dev or \ - sres[ST_INO] != self.ino or self.mtime != sres[ST_MTIME]: - self._load() - - return self.get(opt, printValue=False) - - def section(self, rx=False): - """get the section name of the section representing .peers - in .config""" - peers = self.peers - if not peers: - peers = ['.', '.'] - rx = True - if rx: - return ' '.join(['peersrx'] + [escape(u) for u in peers]) - else: - return ' '.join(['peers'] + [u.split(':')[-1] for u in peers]) - - @staticmethod - def parse_section(section): - """retrieve peers sequence encoded by section name - (as urls or regexen, depending on section type) - """ - sl = section.split() - st = sl.pop(0) - sl = [unescape(u) for u in sl] - if st == 'peersrx': - sl = [re.compile(u) for u in sl] - return sl - - def ord_sections(self): - """Return an ordered list of sections. - - Ordering happens based on the auxiliary - SECT_ORD section storing indices for each - section added through the config API. - - To not to go corrupt in case of manually - written config files, we take care to append - also those sections which are not registered - in SECT_ORD. - - Needed for python 2.{4,5,6} where ConfigParser - cannot yet order sections/options internally. - """ - so = {} - if self.config.has_section(SECT_ORD): - so = self.config._sections[SECT_ORD] - so2 = {} - for k, v in so.items(): - if k != '__name__': - so2[k] = int(v) - tv = 0 - if so2: - tv = max(so2.values()) + 1 - ss = [s for s in self.config.sections() if s.find('__') != 0] - for s in ss: - if s in so.keys(): - continue - so2[s] = tv - tv += 1 - - def scmp(x, y): - return cmp(*(so2[s] for s in (x, y))) - ss.sort(scmp) - return ss - - def update_to(self, dct, allow_unresolved=False): - """update @dct from key/values of ours. - - key/values are collected from .config by filtering the regexp sections - according to match, and from .section. The values are treated as - templates, which are substituted from .auxdicts and (in case of regexp - sections) match groups. - """ - if not self.peers: - raise GsyncdError('no peers given, cannot select matching options') - - def update_from_sect(sect, mud): - for k, v in self.config._sections[sect].items(): - if k == '__name__': - continue - if allow_unresolved: - dct[k] = Template(v).safe_substitute(mud) - else: - dct[k] = Template(v).substitute(mud) - for sect in self.ord_sections(): - sp = self.parse_section(sect) - if isinstance(sp[0], re_type) and len(sp) == len(self.peers): - match = True - mad = {} - for i in range(len(sp)): - m = sp[i].search(self.peers[i]) - if not m: - match = False - break - for j in range(len(m.groups())): - mad['match%d_%d' % (i + 1, j + 1)] = m.groups()[j] - if match: - update_from_sect(sect, MultiDict(dct, mad, *self.auxdicts)) - if self.config.has_section(self.section()): - update_from_sect(self.section(), MultiDict(dct, *self.auxdicts)) - - def get(self, opt=None, printValue=True): - """print the matching key/value pairs from .config, - or if @opt given, the value for @opt (according to the - logic described in .update_to) - """ - d = {} - self.update_to(d, allow_unresolved=True) - if opt: - opt = norm(opt) - v = d.get(opt) - if v: - if printValue: - print(v) - else: - return v - else: - for k, v in d.iteritems(): - if k == '__name__': - continue - print("%s: %s" % (k, v)) - - def write(self, trfn, opt, *a, **kw): - """update on-disk config transactionally - - @trfn is the transaction function - """ - def mergeconf(f): - self.config = ConfigParser.RawConfigParser() - self.config.readfp(f) - self._normconfig() - if not self.config.has_section(SECT_META): - self.config.add_section(SECT_META) - self.config.set(SECT_META, 'version', config_version) - return trfn(norm(opt), *a, **kw) - - def updateconf(f): - self.config.write(f) - update_file(self.path, updateconf, mergeconf) - - def _set(self, opt, val, rx=False): - """set @opt to @val in .section""" - sect = self.section(rx) - if not self.config.has_section(sect): - self.config.add_section(sect) - # regarding SECT_ORD, cf. ord_sections - if not self.config.has_section(SECT_ORD): - self.config.add_section(SECT_ORD) - self.config.set( - SECT_ORD, sect, len(self.config._sections[SECT_ORD])) - self.config.set(sect, opt, val) - return True - - def set(self, opt, *a, **kw): - """perform ._set transactionally""" - self.write(self._set, opt, *a, **kw) - - def _delete(self, opt, rx=False): - """delete @opt from .section""" - sect = self.section(rx) - if self.config.has_section(sect): - return self.config.remove_option(sect, opt) - - def delete(self, opt, *a, **kw): - """perform ._delete transactionally""" - self.write(self._delete, opt, *a, **kw) diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index a26de0c9cf5..a624fe44d8e 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -25,6 +25,7 @@ from gconf import gconf from syncdutils import select, waitpid, errno_wrap from syncdutils import set_term_handler, is_host_local, GsyncdError from syncdutils import escape, Thread, finalize, memoize +from syncdutils import gf_event, eventtypes from gsyncdstatus import GeorepStatus, set_monitor_status @@ -209,11 +210,12 @@ class Monitor(object): blown worker blows up on EPIPE if the net goes down, due to the keep-alive thread) """ - if not self.status.get(w[0], None): - self.status[w[0]] = GeorepStatus(gconf.state_file, w[0]) + if not self.status.get(w[0]['dir'], None): + self.status[w[0]['dir']] = GeorepStatus(gconf.state_file, + w[0]['dir']) set_monitor_status(gconf.state_file, self.ST_STARTED) - self.status[w[0]].set_worker_status(self.ST_INIT) + self.status[w[0]['dir']].set_worker_status(self.ST_INIT) ret = 0 @@ -280,7 +282,7 @@ class Monitor(object): if apid == 0: os.close(rw) os.close(ww) - os.execv(sys.executable, argv + ['--local-path', w[0], + os.execv(sys.executable, argv + ['--local-path', w[0]['dir'], '--agent', '--rpc-fd', ','.join([str(ra), str(wa), @@ -292,9 +294,9 @@ class Monitor(object): os.close(ra) os.close(wa) os.execv(sys.executable, argv + ['--feedback-fd', str(pw), - '--local-path', w[0], + '--local-path', w[0]['dir'], '--local-id', - '.' + escape(w[0]), + '.' + escape(w[0]['dir']), '--rpc-fd', ','.join([str(rw), str(ww), str(ra), str(wa)]), @@ -324,31 +326,31 @@ class Monitor(object): if ret_agent is not None: # Agent is died Kill Worker logging.info("Changelog Agent died, " - "Aborting Worker(%s)" % w[0]) + "Aborting Worker(%s)" % w[0]['dir']) errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) nwait(cpid) nwait(apid) if ret is not None: logging.info("worker(%s) died before establishing " - "connection" % w[0]) + "connection" % w[0]['dir']) nwait(apid) # wait for agent else: - logging.debug("worker(%s) connected" % w[0]) + logging.debug("worker(%s) connected" % w[0]['dir']) while time.time() < t0 + conn_timeout: ret = nwait(cpid, os.WNOHANG) ret_agent = nwait(apid, os.WNOHANG) if ret is not None: logging.info("worker(%s) died in startup " - "phase" % w[0]) + "phase" % w[0]['dir']) nwait(apid) # wait for agent break if ret_agent is not None: # Agent is died Kill Worker logging.info("Changelog Agent died, Aborting " - "Worker(%s)" % w[0]) + "Worker(%s)" % w[0]['dir']) errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) nwait(cpid) nwait(apid) @@ -357,12 +359,12 @@ class Monitor(object): time.sleep(1) else: logging.info("worker(%s) not confirmed in %d sec, " - "aborting it" % (w[0], conn_timeout)) + "aborting it" % (w[0]['dir'], conn_timeout)) errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) nwait(apid) # wait for agent ret = nwait(cpid) if ret is None: - self.status[w[0]].set_worker_status(self.ST_STABLE) + self.status[w[0]['dir']].set_worker_status(self.ST_STABLE) # If worker dies, agent terminates on EOF. # So lets wait for agent first. nwait(apid) @@ -372,9 +374,16 @@ class Monitor(object): else: ret = exit_status(ret) if ret in (0, 1): - self.status[w[0]].set_worker_status(self.ST_FAULTY) + self.status[w[0]['dir']].set_worker_status(self.ST_FAULTY) + gf_event(eventtypes.GEOREP_FAULTY, + master_volume=master.volume, + master_node=w[0]['host'], + slave_host=slave_host, + slave_volume=slave_vol, + current_slave_host=current_slave_host, + brick_path=w[0]['dir']) time.sleep(10) - self.status[w[0]].set_worker_status(self.ST_INCON) + self.status[w[0]['dir']].set_worker_status(self.ST_INCON) return ret def multiplex(self, wspx, suuid, slave_vol, slave_host, master): @@ -461,7 +470,7 @@ def distribute(*resources): for idx, brick in enumerate(mvol.bricks): if is_host_local(brick['host']): is_hot = mvol.is_hot(":".join([brick['host'], brick['dir']])) - workerspex.append((brick['dir'], + workerspex.append((brick, slaves[idx % len(slaves)], get_subvol_num(idx, mvol, is_hot), is_hot)) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index f7beb947efc..ea10fcb8817 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -23,6 +23,10 @@ from signal import signal, SIGTERM import select as oselect from os import waitpid as owaitpid +from conf import GLUSTERFS_LIBEXECDIR +sys.path.insert(1, GLUSTERFS_LIBEXECDIR) +from events import eventtypes + try: from cPickle import PickleError except ImportError: @@ -509,3 +513,8 @@ class ChangelogHistoryNotAvailable(Exception): class ChangelogException(OSError): pass + + +def gf_event(event_type, **kwargs): + from events.gf_event import gf_event as gfevent + gfevent(event_type, **kwargs) -- cgit