diff options
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 97 |
1 files changed, 39 insertions, 58 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index a608944f9be..a6e351590f8 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -39,9 +39,11 @@ from syncdutils import NoStimeAvailable, PartialHistoryAvailable from syncdutils import ChangelogException, ChangelogHistoryNotAvailable from syncdutils import get_changelog_log_level, get_rsync_version from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION +from syncdutils import GX_GFID_CANONICAL_LEN from gsyncdstatus import GeorepStatus from syncdutils import get_master_and_slave_data_from_args from syncdutils import mntpt_list, lf +from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I) @@ -110,33 +112,6 @@ def parse_url(ustr): return getattr(this, sch.upper())(path) -class _MetaXattr(object): - - """singleton class, a lazy wrapper around the - libcxattr module - - libcxattr (a heavy import due to ctypes) is - loaded only when when the single - instance is tried to be used. - - This reduces runtime for those invocations - which do not need filesystem manipulation - (eg. for config, url parsing) - """ - - def __getattr__(self, meth): - from libcxattr import Xattr as LXattr - xmeth = [m for m in dir(LXattr) if m[0] != '_'] - if not meth in xmeth: - return - for m in xmeth: - setattr(self, m, getattr(LXattr, m)) - return getattr(self, meth) - - -Xattr = _MetaXattr() - - class Popen(subprocess.Popen): """customized subclass of subprocess.Popen with a ring @@ -294,7 +269,6 @@ class Server(object): NTV_FMTSTR = "!" + "B" * 19 + "II" FRGN_XTRA_FMT = "I" FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT - GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' # for backend gfid fetch, do not use GX_NSPACE_PFX GFID_XATTR = 'trusted.gfid' @@ -304,15 +278,15 @@ class Server(object): @classmethod def _fmt_mknod(cls, l): - return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l + 1) + return "!II%dsI%dsIII" % (GX_GFID_CANONICAL_LEN, l + 1) @classmethod def _fmt_mkdir(cls, l): - return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l + 1) + return "!II%dsI%dsII" % (GX_GFID_CANONICAL_LEN, l + 1) @classmethod def _fmt_symlink(cls, l1, l2): - return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1 + 1, l2 + 1) + return "!II%dsI%ds%ds" % (GX_GFID_CANONICAL_LEN, l1 + 1, l2 + 1) def _pathguard(f): """decorator method that checks @@ -385,12 +359,6 @@ class Server(object): raise @classmethod - def gfid_mnt(cls, gfidpath): - return errno_wrap(Xattr.lgetxattr, - [gfidpath, 'glusterfs.gfid.string', - cls.GX_GFID_CANONICAL_LEN], [ENOENT], [ESTALE]) - - @classmethod @_pathguard def purge(cls, path, entries=None): """force-delete subtrees @@ -653,27 +621,33 @@ class Server(object): return if op == 'UNLINK': - errno_wrap(os.unlink, [entry], [ENOENT, ESTALE], - [EBUSY]) + er = errno_wrap(os.unlink, [entry], [ENOENT, ESTALE], [EBUSY]) + return er + elif op == 'RMDIR': er = errno_wrap(os.rmdir, [entry], [ENOENT, ESTALE, ENOTEMPTY], [EBUSY]) if er == ENOTEMPTY: return er - def collect_failure(e, cmd_ret): + def collect_failure(e, cmd_ret, dst=False): slv_entry_info = {} slv_entry_info['gfid_mismatch'] = False + slv_entry_info['dst'] = dst # We do this for failing fops on Slave # Master should be logging this if cmd_ret is None: return False if cmd_ret == EEXIST: - disk_gfid = cls.gfid_mnt(e['entry']) + if dst: + en = e['entry1'] + else: + en = e['entry'] + disk_gfid = get_gfid_from_mnt(en) if isinstance(disk_gfid, basestring) and e['gfid'] != disk_gfid: slv_entry_info['gfid_mismatch'] = True - st = lstat(e['entry']) + st = lstat(en) if not isinstance(st, int): if st and stat.S_ISDIR(st.st_mode): slv_entry_info['slave_isdir'] = True @@ -690,16 +664,6 @@ class Server(object): failures = [] - def matching_disk_gfid(gfid, entry): - disk_gfid = cls.gfid_mnt(entry) - if isinstance(disk_gfid, int): - return False - - if not gfid == disk_gfid: - return False - - return True - def recursive_rmdir(gfid, entry, path): """disk_gfid check added for original path for which recursive_delete is called. This disk gfid check executed @@ -738,8 +702,9 @@ class Server(object): "with on-disk gfid", source=entry, gfid=gfid, - disk_gfid=cls.gfid_mnt(entry), + disk_gfid=get_gfid_from_mnt(entry), target=en)) + collect_failure(e, EEXIST) return cmd_ret = errno_wrap(os.rename, @@ -817,14 +782,24 @@ class Server(object): st = lstat(slink) if isinstance(st, int): (pg, bname) = entry2pb(entry) - blob = entry_pack_reg_stat(gfid, bname, e['stat']) + if stat.S_ISREG(e['stat']['mode']): + blob = entry_pack_reg_stat(gfid, bname, e['stat']) + elif stat.S_ISLNK(e['stat']['mode']): + blob = entry_pack_symlink(gfid, bname, e['link'], + e['stat']) else: cmd_ret = errno_wrap(os.link, [slink, entry], [ENOENT, EEXIST], [ESTALE]) collect_failure(e, cmd_ret) elif op == 'SYMLINK': - blob = entry_pack_symlink(gfid, bname, e['link'], e['stat']) + en = e['entry'] + st = lstat(entry) + if isinstance(st, int): + blob = entry_pack_symlink(gfid, bname, e['link'], + e['stat']) + elif not matching_disk_gfid(gfid, en): + collect_failure(e, EEXIST) elif op == 'RENAME': en = e['entry1'] st = lstat(entry) @@ -832,9 +807,13 @@ class Server(object): if e['stat'] and not stat.S_ISDIR(e['stat']['mode']): if stat.S_ISLNK(e['stat']['mode']) and \ e['link'] is not None: - (pg, bname) = entry2pb(en) - blob = entry_pack_symlink(gfid, bname, - e['link'], e['stat']) + st1 = lstat(en) + if isinstance(st1, int): + (pg, bname) = entry2pb(en) + blob = entry_pack_symlink(gfid, bname, + e['link'], e['stat']) + elif not matching_disk_gfid(gfid, en): + collect_failure(e, EEXIST, True) else: (pg, bname) = entry2pb(en) blob = entry_pack_reg_stat(gfid, bname, e['stat']) @@ -865,6 +844,8 @@ class Server(object): raise else: raise + elif not matching_disk_gfid(gfid, en): + collect_failure(e, EEXIST, True) else: rename_with_disk_gfid_confirmation(gfid, entry, en) if blob: |