diff options
Diffstat (limited to 'geo-replication')
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 26 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 97 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 46 | 
3 files changed, 105 insertions, 64 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 2aae860f5d1..2987bca0601 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -24,7 +24,7 @@ from datetime import datetime  from gconf import gconf  from syncdutils import Thread, GsyncdError, boolify, escape_space_newline  from syncdutils import unescape_space_newline, gauxpfx, md5hex, selfkill -from syncdutils import lstat, errno_wrap, FreeObject, lf +from syncdutils import lstat, errno_wrap, FreeObject, lf, matching_disk_gfid  from syncdutils import NoStimeAvailable, PartialHistoryAvailable  URXTIME = (-1, 0) @@ -792,6 +792,10 @@ class GMasterChangelogMixin(GMasterCommon):          fix_entry_ops = []          failures1 = []          for failure in failures: +            if failure[2]['dst']: +                pbname = failure[0]['entry1'] +            else: +                pbname = failure[0]['entry']              if failure[2]['gfid_mismatch']:                  slave_gfid = failure[2]['slave_gfid']                  st = lstat(os.path.join(pfx, slave_gfid)) @@ -800,7 +804,6 @@ class GMasterChangelogMixin(GMasterCommon):                                      ' the entry', retry_count=retry_count,                                      entry=repr(failure)))                      #Add deletion to fix_entry_ops list -                    pbname = failure[0]['entry']                      if failure[2]['slave_isdir']:                          fix_entry_ops.append(edct('RMDIR',                                                    gfid=failure[2]['slave_gfid'], @@ -836,7 +839,6 @@ class GMasterChangelogMixin(GMasterCommon):                                          ' Deleting the entry',                                          retry_count=retry_count,                                          entry=repr(failure))) -                        pbname = failure[0]['entry']                          fix_entry_ops.append(edct('UNLINK',                                                    gfid=failure[2]['slave_gfid'],                                                    entry=pbname)) @@ -1024,15 +1026,27 @@ class GMasterChangelogMixin(GMasterCommon):                                          stat=st, link=rl))                  else:                      # stat() to get mode and other information +                    if not matching_disk_gfid(gfid, en): +                        logging.debug(lf('Ignoring entry, purged in the ' +                                      'interim', file=en, gfid=gfid)) +                        continue +                      go = os.path.join(pfx, gfid)                      st = lstat(go)                      if isinstance(st, int): -                        logging.debug(lf('file got purged in the interim', -                                         file=go)) +                        logging.debug(lf('Ignoring entry, purged in the ' +                                      'interim', file=en, gfid=gfid))                          continue                      if ty == 'LINK': -                        entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) +                        rl = None +                        if st and stat.S_ISLNK(st.st_mode): +                            rl = errno_wrap(os.readlink, [en], [ENOENT], +                                            [ESTALE]) +                            if isinstance(rl, int): +                                rl = None +                        entries.append(edct(ty, stat=st, entry=en, gfid=gfid, +                                       link=rl))                      elif ty == 'SYMLINK':                          rl = errno_wrap(os.readlink, [en], [ENOENT], [ESTALE])                          if isinstance(rl, int): diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index a608944f9be..a6e351590f8 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -39,9 +39,11 @@ from syncdutils import NoStimeAvailable, PartialHistoryAvailable  from syncdutils import ChangelogException, ChangelogHistoryNotAvailable  from syncdutils import get_changelog_log_level, get_rsync_version  from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION +from syncdutils import GX_GFID_CANONICAL_LEN  from gsyncdstatus import GeorepStatus  from syncdutils import get_master_and_slave_data_from_args  from syncdutils import mntpt_list, lf +from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt  UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')  HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I) @@ -110,33 +112,6 @@ def parse_url(ustr):      return getattr(this, sch.upper())(path) -class _MetaXattr(object): - -    """singleton class, a lazy wrapper around the -    libcxattr module - -    libcxattr (a heavy import due to ctypes) is -    loaded only when when the single -    instance is tried to be used. - -    This reduces runtime for those invocations -    which do not need filesystem manipulation -    (eg. for config, url parsing) -    """ - -    def __getattr__(self, meth): -        from libcxattr import Xattr as LXattr -        xmeth = [m for m in dir(LXattr) if m[0] != '_'] -        if not meth in xmeth: -            return -        for m in xmeth: -            setattr(self, m, getattr(LXattr, m)) -        return getattr(self, meth) - - -Xattr = _MetaXattr() - -  class Popen(subprocess.Popen):      """customized subclass of subprocess.Popen with a ring @@ -294,7 +269,6 @@ class Server(object):      NTV_FMTSTR = "!" + "B" * 19 + "II"      FRGN_XTRA_FMT = "I"      FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT -    GX_GFID_CANONICAL_LEN = 37  # canonical gfid len + '\0'      # for backend gfid fetch, do not use GX_NSPACE_PFX      GFID_XATTR = 'trusted.gfid' @@ -304,15 +278,15 @@ class Server(object):      @classmethod      def _fmt_mknod(cls, l): -        return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l + 1) +        return "!II%dsI%dsIII" % (GX_GFID_CANONICAL_LEN, l + 1)      @classmethod      def _fmt_mkdir(cls, l): -        return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l + 1) +        return "!II%dsI%dsII" % (GX_GFID_CANONICAL_LEN, l + 1)      @classmethod      def _fmt_symlink(cls, l1, l2): -        return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1 + 1, l2 + 1) +        return "!II%dsI%ds%ds" % (GX_GFID_CANONICAL_LEN, l1 + 1, l2 + 1)      def _pathguard(f):          """decorator method that checks @@ -385,12 +359,6 @@ class Server(object):                  raise      @classmethod -    def gfid_mnt(cls, gfidpath): -        return errno_wrap(Xattr.lgetxattr, -                          [gfidpath, 'glusterfs.gfid.string', -                           cls.GX_GFID_CANONICAL_LEN], [ENOENT], [ESTALE]) - -    @classmethod      @_pathguard      def purge(cls, path, entries=None):          """force-delete subtrees @@ -653,27 +621,33 @@ class Server(object):                  return              if op == 'UNLINK': -                errno_wrap(os.unlink, [entry], [ENOENT, ESTALE], -                           [EBUSY]) +                er = errno_wrap(os.unlink, [entry], [ENOENT, ESTALE], [EBUSY]) +                return er +              elif op == 'RMDIR':                  er = errno_wrap(os.rmdir, [entry], [ENOENT, ESTALE,                                                      ENOTEMPTY], [EBUSY])                  if er == ENOTEMPTY:                      return er -        def collect_failure(e, cmd_ret): +        def collect_failure(e, cmd_ret, dst=False):              slv_entry_info = {}              slv_entry_info['gfid_mismatch'] = False +            slv_entry_info['dst'] = dst              # We do this for failing fops on Slave              # Master should be logging this              if cmd_ret is None:                  return False              if cmd_ret == EEXIST: -                disk_gfid = cls.gfid_mnt(e['entry']) +                if dst: +                    en = e['entry1'] +                else: +                    en = e['entry'] +                disk_gfid = get_gfid_from_mnt(en)                  if isinstance(disk_gfid, basestring) and e['gfid'] != disk_gfid:                      slv_entry_info['gfid_mismatch'] = True -                    st = lstat(e['entry']) +                    st = lstat(en)                      if not isinstance(st, int):                          if st and stat.S_ISDIR(st.st_mode):                              slv_entry_info['slave_isdir'] = True @@ -690,16 +664,6 @@ class Server(object):          failures = [] -        def matching_disk_gfid(gfid, entry): -            disk_gfid = cls.gfid_mnt(entry) -            if isinstance(disk_gfid, int): -                return False - -            if not gfid == disk_gfid: -                return False - -            return True -          def recursive_rmdir(gfid, entry, path):              """disk_gfid check added for original path for which              recursive_delete is called. This disk gfid check executed @@ -738,8 +702,9 @@ class Server(object):                                   "with on-disk gfid",                                   source=entry,                                   gfid=gfid, -                                 disk_gfid=cls.gfid_mnt(entry), +                                 disk_gfid=get_gfid_from_mnt(entry),                                   target=en)) +                collect_failure(e, EEXIST)                  return              cmd_ret = errno_wrap(os.rename, @@ -817,14 +782,24 @@ class Server(object):                  st = lstat(slink)                  if isinstance(st, int):                      (pg, bname) = entry2pb(entry) -                    blob = entry_pack_reg_stat(gfid, bname, e['stat']) +                    if stat.S_ISREG(e['stat']['mode']): +                        blob = entry_pack_reg_stat(gfid, bname, e['stat']) +                    elif stat.S_ISLNK(e['stat']['mode']): +                        blob = entry_pack_symlink(gfid, bname, e['link'], +                                                  e['stat'])                  else:                      cmd_ret = errno_wrap(os.link,                                           [slink, entry],                                           [ENOENT, EEXIST], [ESTALE])                      collect_failure(e, cmd_ret)              elif op == 'SYMLINK': -                blob = entry_pack_symlink(gfid, bname, e['link'], e['stat']) +                en = e['entry'] +                st = lstat(entry) +                if isinstance(st, int): +                    blob = entry_pack_symlink(gfid, bname, e['link'], +                                              e['stat']) +                elif not matching_disk_gfid(gfid, en): +                    collect_failure(e, EEXIST)              elif op == 'RENAME':                  en = e['entry1']                  st = lstat(entry) @@ -832,9 +807,13 @@ class Server(object):                      if e['stat'] and not stat.S_ISDIR(e['stat']['mode']):                          if stat.S_ISLNK(e['stat']['mode']) and \                             e['link'] is not None: -                            (pg, bname) = entry2pb(en) -                            blob = entry_pack_symlink(gfid, bname, -                                                      e['link'], e['stat']) +                            st1 = lstat(en) +                            if isinstance(st1, int): +                                (pg, bname) = entry2pb(en) +                                blob = entry_pack_symlink(gfid, bname, +                                                          e['link'], e['stat']) +                            elif not matching_disk_gfid(gfid, en): +                                collect_failure(e, EEXIST, True)                          else:                              (pg, bname) = entry2pb(en)                              blob = entry_pack_reg_stat(gfid, bname, e['stat']) @@ -865,6 +844,8 @@ class Server(object):                                              raise                                  else:                                      raise +                        elif not matching_disk_gfid(gfid, en): +                            collect_failure(e, EEXIST, True)                          else:                              rename_with_disk_gfid_confirmation(gfid, entry, en)              if blob: diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 43b21668a46..2187ecd226b 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -66,6 +66,8 @@ except ImportError:  _CL_AUX_GFID_PFX = ".gfid/"  GF_OP_RETRIES = 10 +GX_GFID_CANONICAL_LEN = 37  # canonical gfid len + '\0' +  CHANGELOG_AGENT_SERVER_VERSION = 1.0  CHANGELOG_AGENT_CLIENT_VERSION = 1.0  NodeID = None @@ -371,6 +373,33 @@ class GsyncdError(Exception):      pass +class _MetaXattr(object): + +    """singleton class, a lazy wrapper around the +    libcxattr module + +    libcxattr (a heavy import due to ctypes) is +    loaded only when when the single +    instance is tried to be used. + +    This reduces runtime for those invocations +    which do not need filesystem manipulation +    (eg. for config, url parsing) +    """ + +    def __getattr__(self, meth): +        from libcxattr import Xattr as LXattr +        xmeth = [m for m in dir(LXattr) if m[0] != '_'] +        if meth not in xmeth: +            return +        for m in xmeth: +            setattr(self, m, getattr(LXattr, m)) +        return getattr(self, meth) + + +Xattr = _MetaXattr() + +  def getusername(uid=None):      if uid is None:          uid = os.geteuid() @@ -524,6 +553,23 @@ def lstat(e):      return errno_wrap(os.lstat, [e], [ENOENT], [ESTALE, EBUSY]) +def get_gfid_from_mnt(gfidpath): +    return errno_wrap(Xattr.lgetxattr, +                      [gfidpath, 'glusterfs.gfid.string', +                       GX_GFID_CANONICAL_LEN], [ENOENT], [ESTALE]) + + +def matching_disk_gfid(gfid, entry): +    disk_gfid = get_gfid_from_mnt(entry) +    if isinstance(disk_gfid, int): +        return False + +    if not gfid == disk_gfid: +        return False + +    return True + +  class NoStimeAvailable(Exception):      pass  | 
