summaryrefslogtreecommitdiffstats
path: root/geo-replication
diff options
context:
space:
mode:
authorKotresh HR <khiremat@redhat.com>2018-11-17 13:14:24 +0530
committerAmar Tumballi <amarts@redhat.com>2018-12-04 09:15:44 +0000
commit54ebd47e4154c8b37680b2bac43775fc92ced153 (patch)
treed7ea19c495c2798d76f12b714b2e06eebe6a021b /geo-replication
parent748e993d1f30197c533933ddae889b317ccd00d3 (diff)
geo-rep: Fix syncing of files with non-ascii filenames
Problem: Creation of files/directories with non-ascii names fails to sync to the slave. It crashes with below traceback on slave. ... File "/usr/lib/x86_64-linux-gnu/glusterfs/python/syncdaemon/repce.py", line 118, in worker res = getattr(self.obj, rmeth)(*in_data[2:]) File "/usr/lib/x86_64-linux-gnu/glusterfs/python/syncdaemon/resource.py", line 709, in entry_ops [ESTALE, EINVAL, EBUSY]) File "/usr/lib/x86_64-linux-gnu/glusterfs/python/syncdaemon/syncdutils.py", line 546, in errno_wrap return call(*arg) File "/usr/lib/x86_64-linux-gnu/glusterfs/python/syncdaemon/libcxattr.py", line 83, in lsetxattr cls.raise_oserr() File "/usr/lib/x86_64-linux-gnu/glusterfs/python/syncdaemon/libcxattr.py", line 38, in raise_oserr raise OSError(errn, os.strerror(errn)) OSError: [Errno 12] Cannot allocate memory Cause: The length calculation arguments passed to blob creation was done before encoding. Hence was failing in gfid-access layer. Fix: It appears that the calculating lenght properly fixes this issue. But it will cause issues in other places in 'python2' and not in 'python3'. So encoding and decoding each required string to make geo-rep compatible with both 'python2' and 'python3' is a nightmare and is not fool proof. Hence kept 'python2' code as is with out encode/decode and applied encode/decode only to 'python3' Added non-ascii filename tests to regression fixes: bz#1650893 Change-Id: I35cfaf848e07b1a0b5cb93c01b98b472f08271a6 Signed-off-by: Kotresh HR <khiremat@redhat.com>
Diffstat (limited to 'geo-replication')
-rw-r--r--geo-replication/syncdaemon/libcxattr.py16
-rw-r--r--geo-replication/syncdaemon/libgfchangelog.py30
-rw-r--r--geo-replication/syncdaemon/py2py3.py141
-rw-r--r--geo-replication/syncdaemon/resource.py55
4 files changed, 174 insertions, 68 deletions
diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py
index 7f3f6ce453a..c7d69d7eb2e 100644
--- a/geo-replication/syncdaemon/libcxattr.py
+++ b/geo-replication/syncdaemon/libcxattr.py
@@ -9,9 +9,9 @@
#
import os
-from ctypes import CDLL, create_string_buffer, get_errno
-import py2py3
-from py2py3 import bytearray_to_str
+from ctypes import CDLL, get_errno
+from py2py3 import bytearray_to_str, gr_create_string_buffer
+from py2py3 import gr_query_xattr, gr_lsetxattr, gr_lremovexattr
class Xattr(object):
@@ -40,7 +40,7 @@ class Xattr(object):
@classmethod
def _query_xattr(cls, path, siz, syscall, *a):
if siz:
- buf = create_string_buffer(b'\0' * siz)
+ buf = gr_create_string_buffer(siz)
else:
buf = None
ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz)))
@@ -56,7 +56,7 @@ class Xattr(object):
@classmethod
def lgetxattr(cls, path, attr, siz=0):
- return cls._query_xattr(path.encode(), siz, 'lgetxattr', attr.encode())
+ return gr_query_xattr(cls, path, siz, 'lgetxattr', attr)
@classmethod
def lgetxattr_buf(cls, path, attr):
@@ -70,7 +70,7 @@ class Xattr(object):
@classmethod
def llistxattr(cls, path, siz=0):
- ret = cls._query_xattr(path.encode(), siz, 'llistxattr')
+ ret = gr_query_xattr(cls, path, siz, 'llistxattr')
if isinstance(ret, str):
ret = ret.strip('\0')
ret = ret.split('\0') if ret else []
@@ -78,13 +78,13 @@ class Xattr(object):
@classmethod
def lsetxattr(cls, path, attr, val):
- ret = cls.libc.lsetxattr(path.encode(), attr.encode(), val, len(val), 0)
+ ret = gr_lsetxattr(cls, path, attr, val)
if ret == -1:
cls.raise_oserr()
@classmethod
def lremovexattr(cls, path, attr):
- ret = cls.libc.lremovexattr(path.encode(), attr.encode())
+ ret = gr_lremovexattr(cls, path, attr)
if ret == -1:
cls.raise_oserr()
diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py
index cc40fd5475d..fff9d24e54f 100644
--- a/geo-replication/syncdaemon/libgfchangelog.py
+++ b/geo-replication/syncdaemon/libgfchangelog.py
@@ -9,9 +9,10 @@
#
import os
-from ctypes import CDLL, RTLD_GLOBAL, create_string_buffer, \
- get_errno, byref, c_ulong
+from ctypes import CDLL, RTLD_GLOBAL, get_errno, byref, c_ulong
from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
+from py2py3 import gr_cl_history_changelog, gr_cl_done, gr_create_string_buffer
+from py2py3 import gr_cl_register, gr_cl_history_done, bytearray_to_str
class Changes(object):
@@ -39,9 +40,7 @@ class Changes(object):
@classmethod
def cl_register(cls, brick, path, log_file, log_level, retries=0):
- ret = cls._get_api('gf_changelog_register')(brick.encode(), path.encode(),
- log_file.encode(),
- log_level, retries)
+ ret = gr_cl_register(cls, brick, path, log_file, log_level, retries)
if ret == -1:
cls.raise_changelog_err()
@@ -63,14 +62,16 @@ class Changes(object):
def clsort(f):
return f.split('.')[-1]
changes = []
- buf = create_string_buffer(b'\0' * 4096)
+ buf = gr_create_string_buffer(4096)
call = cls._get_api('gf_changelog_next_change')
while True:
ret = call(buf, 4096)
if ret in (0, -1):
break
- changes.append(buf.raw[:ret - 1].decode())
+ # py2 and py3 compatibility
+ result = bytearray_to_str(buf.raw[:ret - 1])
+ changes.append(result)
if ret == -1:
cls.raise_changelog_err()
# cleanup tracker
@@ -79,7 +80,7 @@ class Changes(object):
@classmethod
def cl_done(cls, clfile):
- ret = cls._get_api('gf_changelog_done')(clfile.encode())
+ ret = gr_cl_done(cls, clfile)
if ret == -1:
cls.raise_changelog_err()
@@ -94,9 +95,8 @@ class Changes(object):
@classmethod
def cl_history_changelog(cls, changelog_path, start, end, num_parallel):
actual_end = c_ulong()
- ret = cls._get_api('gf_history_changelog')(changelog_path.encode(), start, end,
- num_parallel,
- byref(actual_end))
+ ret = gr_cl_history_changelog(cls, changelog_path, start, end,
+ num_parallel, byref(actual_end))
if ret == -1:
cls.raise_changelog_err()
@@ -118,14 +118,16 @@ class Changes(object):
return f.split('.')[-1]
changes = []
- buf = create_string_buffer(b'\0' * 4096)
+ buf = gr_create_string_buffer(4096)
call = cls._get_api('gf_history_changelog_next_change')
while True:
ret = call(buf, 4096)
if ret in (0, -1):
break
- changes.append(buf.raw[:ret - 1].decode())
+ # py2 and py3 compatibility
+ result = bytearray_to_str(buf.raw[:ret - 1])
+ changes.append(result)
if ret == -1:
cls.raise_changelog_err()
@@ -133,6 +135,6 @@ class Changes(object):
@classmethod
def cl_history_done(cls, clfile):
- ret = cls._get_api('gf_history_changelog_done')(clfile.encode())
+ ret = gr_cl_history_done(cls, clfile)
if ret == -1:
cls.raise_changelog_err()
diff --git a/geo-replication/syncdaemon/py2py3.py b/geo-replication/syncdaemon/py2py3.py
index 4c0e1152aa8..faad750059c 100644
--- a/geo-replication/syncdaemon/py2py3.py
+++ b/geo-replication/syncdaemon/py2py3.py
@@ -12,7 +12,10 @@
import sys
import os
-
+import stat
+import struct
+from syncdutils import umask
+from ctypes import create_string_buffer
if sys.version_info >= (3,):
def pipe():
@@ -35,6 +38,77 @@ if sys.version_info >= (3,):
def str_to_bytearray(string):
return bytes([ord(c) for c in string])
+ def gr_create_string_buffer(size):
+ return create_string_buffer(b'\0', size)
+
+ def gr_query_xattr(cls, path, size, syscall, attr=None):
+ if attr:
+ return cls._query_xattr(path.encode(), size, syscall,
+ attr.encode())
+ else:
+ return cls._query_xattr(path.encode(), size, syscall)
+
+ def gr_lsetxattr(cls, path, attr, val):
+ return cls.libc.lsetxattr(path.encode(), attr.encode(), val,
+ len(val), 0)
+
+ def gr_lremovexattr(cls, path, attr):
+ return cls.libc.lremovexattr(path.encode(), attr.encode())
+
+ def gr_cl_register(cls, brick, path, log_file, log_level, retries):
+ return cls._get_api('gf_changelog_register')(brick.encode(),
+ path.encode(),
+ log_file.encode(),
+ log_level, retries)
+
+ def gr_cl_done(cls, clfile):
+ return cls._get_api('gf_changelog_done')(clfile.encode())
+
+ def gr_cl_history_changelog(cls, changelog_path, start, end, num_parallel,
+ actual_end):
+ return cls._get_api('gf_history_changelog')(changelog_path.encode(),
+ start, end, num_parallel,
+ actual_end)
+
+ def gr_cl_history_done(cls, clfile):
+ return cls._get_api('gf_history_changelog_done')(clfile.encode())
+
+ # regular file
+
+ def entry_pack_reg(cls, gf, bn, mo, uid, gid):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ return struct.pack(cls._fmt_mknod(blen),
+ uid, gid, gf.encode(), mo, bn_encoded,
+ stat.S_IMODE(mo), 0, umask())
+
+ def entry_pack_reg_stat(cls, gf, bn, st):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ mo = st['mode']
+ return struct.pack(cls._fmt_mknod(blen),
+ st['uid'], st['gid'],
+ gf.encode(), mo, bn_encoded,
+ stat.S_IMODE(mo), 0, umask())
+ # mkdir
+
+ def entry_pack_mkdir(cls, gf, bn, mo, uid, gid):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ return struct.pack(cls._fmt_mkdir(blen),
+ uid, gid, gf.encode(), mo, bn_encoded,
+ stat.S_IMODE(mo), umask())
+ # symlink
+
+ def entry_pack_symlink(cls, gf, bn, lnk, st):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ lnk_encoded = lnk.encode()
+ llen = len(lnk_encoded)
+ return struct.pack(cls._fmt_symlink(blen, llen),
+ st['uid'], st['gid'],
+ gf.encode(), st['mode'], bn_encoded,
+ lnk_encoded)
else:
def pipe():
(r, w) = os.pipe()
@@ -42,8 +116,69 @@ else:
# Raw conversion of bytearray to string
def bytearray_to_str(byte_arr):
- return ''.join([b for b in byte_arr])
+ return byte_arr
# Raw conversion of string to bytearray
def str_to_bytearray(string):
- return b"".join([c for c in string])
+ return string
+
+ def gr_create_string_buffer(size):
+ return create_string_buffer('\0', size)
+
+ def gr_query_xattr(cls, path, size, syscall, attr=None):
+ if attr:
+ return cls._query_xattr(path, size, syscall, attr)
+ else:
+ return cls._query_xattr(path, size, syscall)
+
+ def gr_lsetxattr(cls, path, attr, val):
+ return cls.libc.lsetxattr(path, attr, val, len(val), 0)
+
+ def gr_lremovexattr(cls, path, attr):
+ return cls.libc.lremovexattr(path, attr)
+
+ def gr_cl_register(cls, brick, path, log_file, log_level, retries):
+ return cls._get_api('gf_changelog_register')(brick, path, log_file,
+ log_level, retries)
+
+ def gr_cl_done(cls, clfile):
+ return cls._get_api('gf_changelog_done')(clfile)
+
+ def gr_cl_history_changelog(cls, changelog_path, start, end, num_parallel,
+ actual_end):
+ return cls._get_api('gf_history_changelog')(changelog_path, start, end,
+ num_parallel, actual_end)
+
+ def gr_cl_history_done(cls, clfile):
+ return cls._get_api('gf_history_changelog_done')(clfile)
+
+ # regular file
+
+ def entry_pack_reg(cls, gf, bn, mo, uid, gid):
+ blen = len(bn)
+ return struct.pack(cls._fmt_mknod(blen),
+ uid, gid, gf, mo, bn,
+ stat.S_IMODE(mo), 0, umask())
+
+ def entry_pack_reg_stat(cls, gf, bn, st):
+ blen = len(bn)
+ mo = st['mode']
+ return struct.pack(cls._fmt_mknod(blen),
+ st['uid'], st['gid'],
+ gf, mo, bn,
+ stat.S_IMODE(mo), 0, umask())
+ # mkdir
+
+ def entry_pack_mkdir(cls, gf, bn, mo, uid, gid):
+ blen = len(bn)
+ return struct.pack(cls._fmt_mkdir(blen),
+ uid, gid, gf, mo, bn,
+ stat.S_IMODE(mo), umask())
+ # symlink
+
+ def entry_pack_symlink(cls, gf, bn, lnk, st):
+ blen = len(bn)
+ llen = len(lnk)
+ return struct.pack(cls._fmt_symlink(blen, llen),
+ st['uid'], st['gid'],
+ gf, st['mode'], bn, lnk)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index c8575f00e70..c669f7f7756 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -31,7 +31,7 @@ from repce import RepceServer, RepceClient
from master import gmaster_builder
import syncdutils
from syncdutils import GsyncdError, select, privileged, funcode
-from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
+from syncdutils import entry2pb, gauxpfx, errno_wrap, lstat
from syncdutils import NoStimeAvailable, PartialHistoryAvailable
from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
from syncdutils import get_changelog_log_level, get_rsync_version
@@ -41,8 +41,8 @@ from gsyncdstatus import GeorepStatus
from syncdutils import lf, Popen, sup
from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
from syncdutils import unshare_propagation_supported, get_slv_dir_path
-import py2py3
-from py2py3 import pipe, str_to_bytearray
+from py2py3 import pipe, str_to_bytearray, entry_pack_reg
+from py2py3 import entry_pack_reg_stat, entry_pack_mkdir, entry_pack_symlink
ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
@@ -377,37 +377,6 @@ class Server(object):
def entry_ops(cls, entries):
pfx = gauxpfx()
logging.debug('entries: %s' % repr(entries))
- # regular file
-
- def entry_pack_reg(gf, bn, mo, uid, gid):
- blen = len(bn)
- return struct.pack(cls._fmt_mknod(blen),
- uid, gid, gf.encode(), mo, bn.encode(),
- stat.S_IMODE(mo), 0, umask())
-
- def entry_pack_reg_stat(gf, bn, st):
- blen = len(bn)
- mo = st['mode']
- return struct.pack(cls._fmt_mknod(blen),
- st['uid'], st['gid'],
- gf.encode(), mo, bn.encode(),
- stat.S_IMODE(mo), 0, umask())
- # mkdir
-
- def entry_pack_mkdir(gf, bn, mo, uid, gid):
- blen = len(bn)
- return struct.pack(cls._fmt_mkdir(blen),
- uid, gid, gf.encode(), mo, bn.encode(),
- stat.S_IMODE(mo), umask())
- # symlink
-
- def entry_pack_symlink(gf, bn, lnk, st):
- blen = len(bn)
- llen = len(lnk)
- return struct.pack(cls._fmt_symlink(blen, llen),
- st['uid'], st['gid'],
- gf.encode(), st['mode'], bn.encode(),
- lnk.encode())
def entry_purge(op, entry, gfid, e, uid, gid):
# This is an extremely racy code and needs to be fixed ASAP.
@@ -581,8 +550,8 @@ class Server(object):
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
- blob = entry_pack_reg(
- gfid, bname, e['mode'], e['uid'], e['gid'])
+ blob = entry_pack_reg(cls, gfid, bname,
+ e['mode'], e['uid'], e['gid'])
# Self healed hardlinks are recorded as MKNOD.
# So if the gfid already exists, it should be
# processed as hard link not mknod.
@@ -597,8 +566,8 @@ class Server(object):
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
- blob = entry_pack_mkdir(
- gfid, bname, e['mode'], e['uid'], e['gid'])
+ blob = entry_pack_mkdir(cls, gfid, bname,
+ e['mode'], e['uid'], e['gid'])
elif (isinstance(lstat(en), int) or
not matching_disk_gfid(gfid, en)):
# If gfid of a directory exists on slave but path based
@@ -626,9 +595,9 @@ class Server(object):
if isinstance(st, int):
(pg, bname) = entry2pb(entry)
if stat.S_ISREG(e['stat']['mode']):
- blob = entry_pack_reg_stat(gfid, bname, e['stat'])
+ blob = entry_pack_reg_stat(cls, gfid, bname, e['stat'])
elif stat.S_ISLNK(e['stat']['mode']):
- blob = entry_pack_symlink(gfid, bname, e['link'],
+ blob = entry_pack_symlink(cls, gfid, bname, e['link'],
e['stat'])
else:
cmd_ret = errno_wrap(os.link,
@@ -639,7 +608,7 @@ class Server(object):
en = e['entry']
st = lstat(entry)
if isinstance(st, int):
- blob = entry_pack_symlink(gfid, bname, e['link'],
+ blob = entry_pack_symlink(cls, gfid, bname, e['link'],
e['stat'])
elif not matching_disk_gfid(gfid, en):
collect_failure(e, EEXIST, uid, gid)
@@ -659,7 +628,7 @@ class Server(object):
st1 = lstat(en)
if isinstance(st1, int):
(pg, bname) = entry2pb(en)
- blob = entry_pack_symlink(gfid, bname,
+ blob = entry_pack_symlink(cls, gfid, bname,
e['link'], e['stat'])
elif not matching_disk_gfid(gfid, en):
collect_failure(e, EEXIST, uid, gid, True)
@@ -669,7 +638,7 @@ class Server(object):
# don't create multiple entries with same gfid
if isinstance(st, int):
(pg, bname) = entry2pb(en)
- blob = entry_pack_reg_stat(gfid, bname,
+ blob = entry_pack_reg_stat(cls, gfid, bname,
e['stat'])
else:
cmd_ret = errno_wrap(os.link, [slink, en],