summaryrefslogtreecommitdiffstats
path: root/geo-replication
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication')
-rw-r--r--geo-replication/syncdaemon/libcxattr.py16
-rw-r--r--geo-replication/syncdaemon/libgfchangelog.py30
-rw-r--r--geo-replication/syncdaemon/py2py3.py141
-rw-r--r--geo-replication/syncdaemon/resource.py55
4 files changed, 174 insertions, 68 deletions
diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py
index 7f3f6ce453a..c7d69d7eb2e 100644
--- a/geo-replication/syncdaemon/libcxattr.py
+++ b/geo-replication/syncdaemon/libcxattr.py
@@ -9,9 +9,9 @@
#
import os
-from ctypes import CDLL, create_string_buffer, get_errno
-import py2py3
-from py2py3 import bytearray_to_str
+from ctypes import CDLL, get_errno
+from py2py3 import bytearray_to_str, gr_create_string_buffer
+from py2py3 import gr_query_xattr, gr_lsetxattr, gr_lremovexattr
class Xattr(object):
@@ -40,7 +40,7 @@ class Xattr(object):
@classmethod
def _query_xattr(cls, path, siz, syscall, *a):
if siz:
- buf = create_string_buffer(b'\0' * siz)
+ buf = gr_create_string_buffer(siz)
else:
buf = None
ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz)))
@@ -56,7 +56,7 @@ class Xattr(object):
@classmethod
def lgetxattr(cls, path, attr, siz=0):
- return cls._query_xattr(path.encode(), siz, 'lgetxattr', attr.encode())
+ return gr_query_xattr(cls, path, siz, 'lgetxattr', attr)
@classmethod
def lgetxattr_buf(cls, path, attr):
@@ -70,7 +70,7 @@ class Xattr(object):
@classmethod
def llistxattr(cls, path, siz=0):
- ret = cls._query_xattr(path.encode(), siz, 'llistxattr')
+ ret = gr_query_xattr(cls, path, siz, 'llistxattr')
if isinstance(ret, str):
ret = ret.strip('\0')
ret = ret.split('\0') if ret else []
@@ -78,13 +78,13 @@ class Xattr(object):
@classmethod
def lsetxattr(cls, path, attr, val):
- ret = cls.libc.lsetxattr(path.encode(), attr.encode(), val, len(val), 0)
+ ret = gr_lsetxattr(cls, path, attr, val)
if ret == -1:
cls.raise_oserr()
@classmethod
def lremovexattr(cls, path, attr):
- ret = cls.libc.lremovexattr(path.encode(), attr.encode())
+ ret = gr_lremovexattr(cls, path, attr)
if ret == -1:
cls.raise_oserr()
diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py
index cc40fd5475d..fff9d24e54f 100644
--- a/geo-replication/syncdaemon/libgfchangelog.py
+++ b/geo-replication/syncdaemon/libgfchangelog.py
@@ -9,9 +9,10 @@
#
import os
-from ctypes import CDLL, RTLD_GLOBAL, create_string_buffer, \
- get_errno, byref, c_ulong
+from ctypes import CDLL, RTLD_GLOBAL, get_errno, byref, c_ulong
from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
+from py2py3 import gr_cl_history_changelog, gr_cl_done, gr_create_string_buffer
+from py2py3 import gr_cl_register, gr_cl_history_done, bytearray_to_str
class Changes(object):
@@ -39,9 +40,7 @@ class Changes(object):
@classmethod
def cl_register(cls, brick, path, log_file, log_level, retries=0):
- ret = cls._get_api('gf_changelog_register')(brick.encode(), path.encode(),
- log_file.encode(),
- log_level, retries)
+ ret = gr_cl_register(cls, brick, path, log_file, log_level, retries)
if ret == -1:
cls.raise_changelog_err()
@@ -63,14 +62,16 @@ class Changes(object):
def clsort(f):
return f.split('.')[-1]
changes = []
- buf = create_string_buffer(b'\0' * 4096)
+ buf = gr_create_string_buffer(4096)
call = cls._get_api('gf_changelog_next_change')
while True:
ret = call(buf, 4096)
if ret in (0, -1):
break
- changes.append(buf.raw[:ret - 1].decode())
+ # py2 and py3 compatibility
+ result = bytearray_to_str(buf.raw[:ret - 1])
+ changes.append(result)
if ret == -1:
cls.raise_changelog_err()
# cleanup tracker
@@ -79,7 +80,7 @@ class Changes(object):
@classmethod
def cl_done(cls, clfile):
- ret = cls._get_api('gf_changelog_done')(clfile.encode())
+ ret = gr_cl_done(cls, clfile)
if ret == -1:
cls.raise_changelog_err()
@@ -94,9 +95,8 @@ class Changes(object):
@classmethod
def cl_history_changelog(cls, changelog_path, start, end, num_parallel):
actual_end = c_ulong()
- ret = cls._get_api('gf_history_changelog')(changelog_path.encode(), start, end,
- num_parallel,
- byref(actual_end))
+ ret = gr_cl_history_changelog(cls, changelog_path, start, end,
+ num_parallel, byref(actual_end))
if ret == -1:
cls.raise_changelog_err()
@@ -118,14 +118,16 @@ class Changes(object):
return f.split('.')[-1]
changes = []
- buf = create_string_buffer(b'\0' * 4096)
+ buf = gr_create_string_buffer(4096)
call = cls._get_api('gf_history_changelog_next_change')
while True:
ret = call(buf, 4096)
if ret in (0, -1):
break
- changes.append(buf.raw[:ret - 1].decode())
+ # py2 and py3 compatibility
+ result = bytearray_to_str(buf.raw[:ret - 1])
+ changes.append(result)
if ret == -1:
cls.raise_changelog_err()
@@ -133,6 +135,6 @@ class Changes(object):
@classmethod
def cl_history_done(cls, clfile):
- ret = cls._get_api('gf_history_changelog_done')(clfile.encode())
+ ret = gr_cl_history_done(cls, clfile)
if ret == -1:
cls.raise_changelog_err()
diff --git a/geo-replication/syncdaemon/py2py3.py b/geo-replication/syncdaemon/py2py3.py
index 4c0e1152aa8..faad750059c 100644
--- a/geo-replication/syncdaemon/py2py3.py
+++ b/geo-replication/syncdaemon/py2py3.py
@@ -12,7 +12,10 @@
import sys
import os
-
+import stat
+import struct
+from syncdutils import umask
+from ctypes import create_string_buffer
if sys.version_info >= (3,):
def pipe():
@@ -35,6 +38,77 @@ if sys.version_info >= (3,):
def str_to_bytearray(string):
return bytes([ord(c) for c in string])
+ def gr_create_string_buffer(size):
+ return create_string_buffer(b'\0', size)
+
+ def gr_query_xattr(cls, path, size, syscall, attr=None):
+ if attr:
+ return cls._query_xattr(path.encode(), size, syscall,
+ attr.encode())
+ else:
+ return cls._query_xattr(path.encode(), size, syscall)
+
+ def gr_lsetxattr(cls, path, attr, val):
+ return cls.libc.lsetxattr(path.encode(), attr.encode(), val,
+ len(val), 0)
+
+ def gr_lremovexattr(cls, path, attr):
+ return cls.libc.lremovexattr(path.encode(), attr.encode())
+
+ def gr_cl_register(cls, brick, path, log_file, log_level, retries):
+ return cls._get_api('gf_changelog_register')(brick.encode(),
+ path.encode(),
+ log_file.encode(),
+ log_level, retries)
+
+ def gr_cl_done(cls, clfile):
+ return cls._get_api('gf_changelog_done')(clfile.encode())
+
+ def gr_cl_history_changelog(cls, changelog_path, start, end, num_parallel,
+ actual_end):
+ return cls._get_api('gf_history_changelog')(changelog_path.encode(),
+ start, end, num_parallel,
+ actual_end)
+
+ def gr_cl_history_done(cls, clfile):
+ return cls._get_api('gf_history_changelog_done')(clfile.encode())
+
+ # regular file
+
+ def entry_pack_reg(cls, gf, bn, mo, uid, gid):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ return struct.pack(cls._fmt_mknod(blen),
+ uid, gid, gf.encode(), mo, bn_encoded,
+ stat.S_IMODE(mo), 0, umask())
+
+ def entry_pack_reg_stat(cls, gf, bn, st):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ mo = st['mode']
+ return struct.pack(cls._fmt_mknod(blen),
+ st['uid'], st['gid'],
+ gf.encode(), mo, bn_encoded,
+ stat.S_IMODE(mo), 0, umask())
+ # mkdir
+
+ def entry_pack_mkdir(cls, gf, bn, mo, uid, gid):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ return struct.pack(cls._fmt_mkdir(blen),
+ uid, gid, gf.encode(), mo, bn_encoded,
+ stat.S_IMODE(mo), umask())
+ # symlink
+
+ def entry_pack_symlink(cls, gf, bn, lnk, st):
+ bn_encoded = bn.encode()
+ blen = len(bn_encoded)
+ lnk_encoded = lnk.encode()
+ llen = len(lnk_encoded)
+ return struct.pack(cls._fmt_symlink(blen, llen),
+ st['uid'], st['gid'],
+ gf.encode(), st['mode'], bn_encoded,
+ lnk_encoded)
else:
def pipe():
(r, w) = os.pipe()
@@ -42,8 +116,69 @@ else:
# Raw conversion of bytearray to string
def bytearray_to_str(byte_arr):
- return ''.join([b for b in byte_arr])
+ return byte_arr
# Raw conversion of string to bytearray
def str_to_bytearray(string):
- return b"".join([c for c in string])
+ return string
+
+ def gr_create_string_buffer(size):
+ return create_string_buffer('\0', size)
+
+ def gr_query_xattr(cls, path, size, syscall, attr=None):
+ if attr:
+ return cls._query_xattr(path, size, syscall, attr)
+ else:
+ return cls._query_xattr(path, size, syscall)
+
+ def gr_lsetxattr(cls, path, attr, val):
+ return cls.libc.lsetxattr(path, attr, val, len(val), 0)
+
+ def gr_lremovexattr(cls, path, attr):
+ return cls.libc.lremovexattr(path, attr)
+
+ def gr_cl_register(cls, brick, path, log_file, log_level, retries):
+ return cls._get_api('gf_changelog_register')(brick, path, log_file,
+ log_level, retries)
+
+ def gr_cl_done(cls, clfile):
+ return cls._get_api('gf_changelog_done')(clfile)
+
+ def gr_cl_history_changelog(cls, changelog_path, start, end, num_parallel,
+ actual_end):
+ return cls._get_api('gf_history_changelog')(changelog_path, start, end,
+ num_parallel, actual_end)
+
+ def gr_cl_history_done(cls, clfile):
+ return cls._get_api('gf_history_changelog_done')(clfile)
+
+ # regular file
+
+ def entry_pack_reg(cls, gf, bn, mo, uid, gid):
+ blen = len(bn)
+ return struct.pack(cls._fmt_mknod(blen),
+ uid, gid, gf, mo, bn,
+ stat.S_IMODE(mo), 0, umask())
+
+ def entry_pack_reg_stat(cls, gf, bn, st):
+ blen = len(bn)
+ mo = st['mode']
+ return struct.pack(cls._fmt_mknod(blen),
+ st['uid'], st['gid'],
+ gf, mo, bn,
+ stat.S_IMODE(mo), 0, umask())
+ # mkdir
+
+ def entry_pack_mkdir(cls, gf, bn, mo, uid, gid):
+ blen = len(bn)
+ return struct.pack(cls._fmt_mkdir(blen),
+ uid, gid, gf, mo, bn,
+ stat.S_IMODE(mo), umask())
+ # symlink
+
+ def entry_pack_symlink(cls, gf, bn, lnk, st):
+ blen = len(bn)
+ llen = len(lnk)
+ return struct.pack(cls._fmt_symlink(blen, llen),
+ st['uid'], st['gid'],
+ gf, st['mode'], bn, lnk)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index c8575f00e70..c669f7f7756 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -31,7 +31,7 @@ from repce import RepceServer, RepceClient
from master import gmaster_builder
import syncdutils
from syncdutils import GsyncdError, select, privileged, funcode
-from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
+from syncdutils import entry2pb, gauxpfx, errno_wrap, lstat
from syncdutils import NoStimeAvailable, PartialHistoryAvailable
from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
from syncdutils import get_changelog_log_level, get_rsync_version
@@ -41,8 +41,8 @@ from gsyncdstatus import GeorepStatus
from syncdutils import lf, Popen, sup
from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
from syncdutils import unshare_propagation_supported, get_slv_dir_path
-import py2py3
-from py2py3 import pipe, str_to_bytearray
+from py2py3 import pipe, str_to_bytearray, entry_pack_reg
+from py2py3 import entry_pack_reg_stat, entry_pack_mkdir, entry_pack_symlink
ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
@@ -377,37 +377,6 @@ class Server(object):
def entry_ops(cls, entries):
pfx = gauxpfx()
logging.debug('entries: %s' % repr(entries))
- # regular file
-
- def entry_pack_reg(gf, bn, mo, uid, gid):
- blen = len(bn)
- return struct.pack(cls._fmt_mknod(blen),
- uid, gid, gf.encode(), mo, bn.encode(),
- stat.S_IMODE(mo), 0, umask())
-
- def entry_pack_reg_stat(gf, bn, st):
- blen = len(bn)
- mo = st['mode']
- return struct.pack(cls._fmt_mknod(blen),
- st['uid'], st['gid'],
- gf.encode(), mo, bn.encode(),
- stat.S_IMODE(mo), 0, umask())
- # mkdir
-
- def entry_pack_mkdir(gf, bn, mo, uid, gid):
- blen = len(bn)
- return struct.pack(cls._fmt_mkdir(blen),
- uid, gid, gf.encode(), mo, bn.encode(),
- stat.S_IMODE(mo), umask())
- # symlink
-
- def entry_pack_symlink(gf, bn, lnk, st):
- blen = len(bn)
- llen = len(lnk)
- return struct.pack(cls._fmt_symlink(blen, llen),
- st['uid'], st['gid'],
- gf.encode(), st['mode'], bn.encode(),
- lnk.encode())
def entry_purge(op, entry, gfid, e, uid, gid):
# This is an extremely racy code and needs to be fixed ASAP.
@@ -581,8 +550,8 @@ class Server(object):
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
- blob = entry_pack_reg(
- gfid, bname, e['mode'], e['uid'], e['gid'])
+ blob = entry_pack_reg(cls, gfid, bname,
+ e['mode'], e['uid'], e['gid'])
# Self healed hardlinks are recorded as MKNOD.
# So if the gfid already exists, it should be
# processed as hard link not mknod.
@@ -597,8 +566,8 @@ class Server(object):
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
- blob = entry_pack_mkdir(
- gfid, bname, e['mode'], e['uid'], e['gid'])
+ blob = entry_pack_mkdir(cls, gfid, bname,
+ e['mode'], e['uid'], e['gid'])
elif (isinstance(lstat(en), int) or
not matching_disk_gfid(gfid, en)):
# If gfid of a directory exists on slave but path based
@@ -626,9 +595,9 @@ class Server(object):
if isinstance(st, int):
(pg, bname) = entry2pb(entry)
if stat.S_ISREG(e['stat']['mode']):
- blob = entry_pack_reg_stat(gfid, bname, e['stat'])
+ blob = entry_pack_reg_stat(cls, gfid, bname, e['stat'])
elif stat.S_ISLNK(e['stat']['mode']):
- blob = entry_pack_symlink(gfid, bname, e['link'],
+ blob = entry_pack_symlink(cls, gfid, bname, e['link'],
e['stat'])
else:
cmd_ret = errno_wrap(os.link,
@@ -639,7 +608,7 @@ class Server(object):
en = e['entry']
st = lstat(entry)
if isinstance(st, int):
- blob = entry_pack_symlink(gfid, bname, e['link'],
+ blob = entry_pack_symlink(cls, gfid, bname, e['link'],
e['stat'])
elif not matching_disk_gfid(gfid, en):
collect_failure(e, EEXIST, uid, gid)
@@ -659,7 +628,7 @@ class Server(object):
st1 = lstat(en)
if isinstance(st1, int):
(pg, bname) = entry2pb(en)
- blob = entry_pack_symlink(gfid, bname,
+ blob = entry_pack_symlink(cls, gfid, bname,
e['link'], e['stat'])
elif not matching_disk_gfid(gfid, en):
collect_failure(e, EEXIST, uid, gid, True)
@@ -669,7 +638,7 @@ class Server(object):
# don't create multiple entries with same gfid
if isinstance(st, int):
(pg, bname) = entry2pb(en)
- blob = entry_pack_reg_stat(gfid, bname,
+ blob = entry_pack_reg_stat(cls, gfid, bname,
e['stat'])
else:
cmd_ret = errno_wrap(os.link, [slink, en],