diff options
Diffstat (limited to 'geo-replication')
-rw-r--r-- | geo-replication/syncdaemon/libcxattr.py | 16 | ||||
-rw-r--r-- | geo-replication/syncdaemon/libgfchangelog.py | 30 | ||||
-rw-r--r-- | geo-replication/syncdaemon/py2py3.py | 141 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 55 |
4 files changed, 174 insertions, 68 deletions
diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py index 7f3f6ce453a..c7d69d7eb2e 100644 --- a/geo-replication/syncdaemon/libcxattr.py +++ b/geo-replication/syncdaemon/libcxattr.py @@ -9,9 +9,9 @@ # import os -from ctypes import CDLL, create_string_buffer, get_errno -import py2py3 -from py2py3 import bytearray_to_str +from ctypes import CDLL, get_errno +from py2py3 import bytearray_to_str, gr_create_string_buffer +from py2py3 import gr_query_xattr, gr_lsetxattr, gr_lremovexattr class Xattr(object): @@ -40,7 +40,7 @@ class Xattr(object): @classmethod def _query_xattr(cls, path, siz, syscall, *a): if siz: - buf = create_string_buffer(b'\0' * siz) + buf = gr_create_string_buffer(siz) else: buf = None ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz))) @@ -56,7 +56,7 @@ class Xattr(object): @classmethod def lgetxattr(cls, path, attr, siz=0): - return cls._query_xattr(path.encode(), siz, 'lgetxattr', attr.encode()) + return gr_query_xattr(cls, path, siz, 'lgetxattr', attr) @classmethod def lgetxattr_buf(cls, path, attr): @@ -70,7 +70,7 @@ class Xattr(object): @classmethod def llistxattr(cls, path, siz=0): - ret = cls._query_xattr(path.encode(), siz, 'llistxattr') + ret = gr_query_xattr(cls, path, siz, 'llistxattr') if isinstance(ret, str): ret = ret.strip('\0') ret = ret.split('\0') if ret else [] @@ -78,13 +78,13 @@ class Xattr(object): @classmethod def lsetxattr(cls, path, attr, val): - ret = cls.libc.lsetxattr(path.encode(), attr.encode(), val, len(val), 0) + ret = gr_lsetxattr(cls, path, attr, val) if ret == -1: cls.raise_oserr() @classmethod def lremovexattr(cls, path, attr): - ret = cls.libc.lremovexattr(path.encode(), attr.encode()) + ret = gr_lremovexattr(cls, path, attr) if ret == -1: cls.raise_oserr() diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py index cc40fd5475d..fff9d24e54f 100644 --- a/geo-replication/syncdaemon/libgfchangelog.py +++ b/geo-replication/syncdaemon/libgfchangelog.py @@ -9,9 +9,10 @@ # import os -from ctypes import CDLL, RTLD_GLOBAL, create_string_buffer, \ - get_errno, byref, c_ulong +from ctypes import CDLL, RTLD_GLOBAL, get_errno, byref, c_ulong from syncdutils import ChangelogException, ChangelogHistoryNotAvailable +from py2py3 import gr_cl_history_changelog, gr_cl_done, gr_create_string_buffer +from py2py3 import gr_cl_register, gr_cl_history_done, bytearray_to_str class Changes(object): @@ -39,9 +40,7 @@ class Changes(object): @classmethod def cl_register(cls, brick, path, log_file, log_level, retries=0): - ret = cls._get_api('gf_changelog_register')(brick.encode(), path.encode(), - log_file.encode(), - log_level, retries) + ret = gr_cl_register(cls, brick, path, log_file, log_level, retries) if ret == -1: cls.raise_changelog_err() @@ -63,14 +62,16 @@ class Changes(object): def clsort(f): return f.split('.')[-1] changes = [] - buf = create_string_buffer(b'\0' * 4096) + buf = gr_create_string_buffer(4096) call = cls._get_api('gf_changelog_next_change') while True: ret = call(buf, 4096) if ret in (0, -1): break - changes.append(buf.raw[:ret - 1].decode()) + # py2 and py3 compatibility + result = bytearray_to_str(buf.raw[:ret - 1]) + changes.append(result) if ret == -1: cls.raise_changelog_err() # cleanup tracker @@ -79,7 +80,7 @@ class Changes(object): @classmethod def cl_done(cls, clfile): - ret = cls._get_api('gf_changelog_done')(clfile.encode()) + ret = gr_cl_done(cls, clfile) if ret == -1: cls.raise_changelog_err() @@ -94,9 +95,8 @@ class Changes(object): @classmethod def cl_history_changelog(cls, changelog_path, start, end, num_parallel): actual_end = c_ulong() - ret = cls._get_api('gf_history_changelog')(changelog_path.encode(), start, end, - num_parallel, - byref(actual_end)) + ret = gr_cl_history_changelog(cls, changelog_path, start, end, + num_parallel, byref(actual_end)) if ret == -1: cls.raise_changelog_err() @@ -118,14 +118,16 @@ class Changes(object): return f.split('.')[-1] changes = [] - buf = create_string_buffer(b'\0' * 4096) + buf = gr_create_string_buffer(4096) call = cls._get_api('gf_history_changelog_next_change') while True: ret = call(buf, 4096) if ret in (0, -1): break - changes.append(buf.raw[:ret - 1].decode()) + # py2 and py3 compatibility + result = bytearray_to_str(buf.raw[:ret - 1]) + changes.append(result) if ret == -1: cls.raise_changelog_err() @@ -133,6 +135,6 @@ class Changes(object): @classmethod def cl_history_done(cls, clfile): - ret = cls._get_api('gf_history_changelog_done')(clfile.encode()) + ret = gr_cl_history_done(cls, clfile) if ret == -1: cls.raise_changelog_err() diff --git a/geo-replication/syncdaemon/py2py3.py b/geo-replication/syncdaemon/py2py3.py index 4c0e1152aa8..faad750059c 100644 --- a/geo-replication/syncdaemon/py2py3.py +++ b/geo-replication/syncdaemon/py2py3.py @@ -12,7 +12,10 @@ import sys import os - +import stat +import struct +from syncdutils import umask +from ctypes import create_string_buffer if sys.version_info >= (3,): def pipe(): @@ -35,6 +38,77 @@ if sys.version_info >= (3,): def str_to_bytearray(string): return bytes([ord(c) for c in string]) + def gr_create_string_buffer(size): + return create_string_buffer(b'\0', size) + + def gr_query_xattr(cls, path, size, syscall, attr=None): + if attr: + return cls._query_xattr(path.encode(), size, syscall, + attr.encode()) + else: + return cls._query_xattr(path.encode(), size, syscall) + + def gr_lsetxattr(cls, path, attr, val): + return cls.libc.lsetxattr(path.encode(), attr.encode(), val, + len(val), 0) + + def gr_lremovexattr(cls, path, attr): + return cls.libc.lremovexattr(path.encode(), attr.encode()) + + def gr_cl_register(cls, brick, path, log_file, log_level, retries): + return cls._get_api('gf_changelog_register')(brick.encode(), + path.encode(), + log_file.encode(), + log_level, retries) + + def gr_cl_done(cls, clfile): + return cls._get_api('gf_changelog_done')(clfile.encode()) + + def gr_cl_history_changelog(cls, changelog_path, start, end, num_parallel, + actual_end): + return cls._get_api('gf_history_changelog')(changelog_path.encode(), + start, end, num_parallel, + actual_end) + + def gr_cl_history_done(cls, clfile): + return cls._get_api('gf_history_changelog_done')(clfile.encode()) + + # regular file + + def entry_pack_reg(cls, gf, bn, mo, uid, gid): + bn_encoded = bn.encode() + blen = len(bn_encoded) + return struct.pack(cls._fmt_mknod(blen), + uid, gid, gf.encode(), mo, bn_encoded, + stat.S_IMODE(mo), 0, umask()) + + def entry_pack_reg_stat(cls, gf, bn, st): + bn_encoded = bn.encode() + blen = len(bn_encoded) + mo = st['mode'] + return struct.pack(cls._fmt_mknod(blen), + st['uid'], st['gid'], + gf.encode(), mo, bn_encoded, + stat.S_IMODE(mo), 0, umask()) + # mkdir + + def entry_pack_mkdir(cls, gf, bn, mo, uid, gid): + bn_encoded = bn.encode() + blen = len(bn_encoded) + return struct.pack(cls._fmt_mkdir(blen), + uid, gid, gf.encode(), mo, bn_encoded, + stat.S_IMODE(mo), umask()) + # symlink + + def entry_pack_symlink(cls, gf, bn, lnk, st): + bn_encoded = bn.encode() + blen = len(bn_encoded) + lnk_encoded = lnk.encode() + llen = len(lnk_encoded) + return struct.pack(cls._fmt_symlink(blen, llen), + st['uid'], st['gid'], + gf.encode(), st['mode'], bn_encoded, + lnk_encoded) else: def pipe(): (r, w) = os.pipe() @@ -42,8 +116,69 @@ else: # Raw conversion of bytearray to string def bytearray_to_str(byte_arr): - return ''.join([b for b in byte_arr]) + return byte_arr # Raw conversion of string to bytearray def str_to_bytearray(string): - return b"".join([c for c in string]) + return string + + def gr_create_string_buffer(size): + return create_string_buffer('\0', size) + + def gr_query_xattr(cls, path, size, syscall, attr=None): + if attr: + return cls._query_xattr(path, size, syscall, attr) + else: + return cls._query_xattr(path, size, syscall) + + def gr_lsetxattr(cls, path, attr, val): + return cls.libc.lsetxattr(path, attr, val, len(val), 0) + + def gr_lremovexattr(cls, path, attr): + return cls.libc.lremovexattr(path, attr) + + def gr_cl_register(cls, brick, path, log_file, log_level, retries): + return cls._get_api('gf_changelog_register')(brick, path, log_file, + log_level, retries) + + def gr_cl_done(cls, clfile): + return cls._get_api('gf_changelog_done')(clfile) + + def gr_cl_history_changelog(cls, changelog_path, start, end, num_parallel, + actual_end): + return cls._get_api('gf_history_changelog')(changelog_path, start, end, + num_parallel, actual_end) + + def gr_cl_history_done(cls, clfile): + return cls._get_api('gf_history_changelog_done')(clfile) + + # regular file + + def entry_pack_reg(cls, gf, bn, mo, uid, gid): + blen = len(bn) + return struct.pack(cls._fmt_mknod(blen), + uid, gid, gf, mo, bn, + stat.S_IMODE(mo), 0, umask()) + + def entry_pack_reg_stat(cls, gf, bn, st): + blen = len(bn) + mo = st['mode'] + return struct.pack(cls._fmt_mknod(blen), + st['uid'], st['gid'], + gf, mo, bn, + stat.S_IMODE(mo), 0, umask()) + # mkdir + + def entry_pack_mkdir(cls, gf, bn, mo, uid, gid): + blen = len(bn) + return struct.pack(cls._fmt_mkdir(blen), + uid, gid, gf, mo, bn, + stat.S_IMODE(mo), umask()) + # symlink + + def entry_pack_symlink(cls, gf, bn, lnk, st): + blen = len(bn) + llen = len(lnk) + return struct.pack(cls._fmt_symlink(blen, llen), + st['uid'], st['gid'], + gf, st['mode'], bn, lnk) diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index c8575f00e70..c669f7f7756 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -31,7 +31,7 @@ from repce import RepceServer, RepceClient from master import gmaster_builder import syncdutils from syncdutils import GsyncdError, select, privileged, funcode -from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat +from syncdutils import entry2pb, gauxpfx, errno_wrap, lstat from syncdutils import NoStimeAvailable, PartialHistoryAvailable from syncdutils import ChangelogException, ChangelogHistoryNotAvailable from syncdutils import get_changelog_log_level, get_rsync_version @@ -41,8 +41,8 @@ from gsyncdstatus import GeorepStatus from syncdutils import lf, Popen, sup from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt from syncdutils import unshare_propagation_supported, get_slv_dir_path -import py2py3 -from py2py3 import pipe, str_to_bytearray +from py2py3 import pipe, str_to_bytearray, entry_pack_reg +from py2py3 import entry_pack_reg_stat, entry_pack_mkdir, entry_pack_symlink ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') @@ -377,37 +377,6 @@ class Server(object): def entry_ops(cls, entries): pfx = gauxpfx() logging.debug('entries: %s' % repr(entries)) - # regular file - - def entry_pack_reg(gf, bn, mo, uid, gid): - blen = len(bn) - return struct.pack(cls._fmt_mknod(blen), - uid, gid, gf.encode(), mo, bn.encode(), - stat.S_IMODE(mo), 0, umask()) - - def entry_pack_reg_stat(gf, bn, st): - blen = len(bn) - mo = st['mode'] - return struct.pack(cls._fmt_mknod(blen), - st['uid'], st['gid'], - gf.encode(), mo, bn.encode(), - stat.S_IMODE(mo), 0, umask()) - # mkdir - - def entry_pack_mkdir(gf, bn, mo, uid, gid): - blen = len(bn) - return struct.pack(cls._fmt_mkdir(blen), - uid, gid, gf.encode(), mo, bn.encode(), - stat.S_IMODE(mo), umask()) - # symlink - - def entry_pack_symlink(gf, bn, lnk, st): - blen = len(bn) - llen = len(lnk) - return struct.pack(cls._fmt_symlink(blen, llen), - st['uid'], st['gid'], - gf.encode(), st['mode'], bn.encode(), - lnk.encode()) def entry_purge(op, entry, gfid, e, uid, gid): # This is an extremely racy code and needs to be fixed ASAP. @@ -581,8 +550,8 @@ class Server(object): st = lstat(slink) # don't create multiple entries with same gfid if isinstance(st, int): - blob = entry_pack_reg( - gfid, bname, e['mode'], e['uid'], e['gid']) + blob = entry_pack_reg(cls, gfid, bname, + e['mode'], e['uid'], e['gid']) # Self healed hardlinks are recorded as MKNOD. # So if the gfid already exists, it should be # processed as hard link not mknod. @@ -597,8 +566,8 @@ class Server(object): st = lstat(slink) # don't create multiple entries with same gfid if isinstance(st, int): - blob = entry_pack_mkdir( - gfid, bname, e['mode'], e['uid'], e['gid']) + blob = entry_pack_mkdir(cls, gfid, bname, + e['mode'], e['uid'], e['gid']) elif (isinstance(lstat(en), int) or not matching_disk_gfid(gfid, en)): # If gfid of a directory exists on slave but path based @@ -626,9 +595,9 @@ class Server(object): if isinstance(st, int): (pg, bname) = entry2pb(entry) if stat.S_ISREG(e['stat']['mode']): - blob = entry_pack_reg_stat(gfid, bname, e['stat']) + blob = entry_pack_reg_stat(cls, gfid, bname, e['stat']) elif stat.S_ISLNK(e['stat']['mode']): - blob = entry_pack_symlink(gfid, bname, e['link'], + blob = entry_pack_symlink(cls, gfid, bname, e['link'], e['stat']) else: cmd_ret = errno_wrap(os.link, @@ -639,7 +608,7 @@ class Server(object): en = e['entry'] st = lstat(entry) if isinstance(st, int): - blob = entry_pack_symlink(gfid, bname, e['link'], + blob = entry_pack_symlink(cls, gfid, bname, e['link'], e['stat']) elif not matching_disk_gfid(gfid, en): collect_failure(e, EEXIST, uid, gid) @@ -659,7 +628,7 @@ class Server(object): st1 = lstat(en) if isinstance(st1, int): (pg, bname) = entry2pb(en) - blob = entry_pack_symlink(gfid, bname, + blob = entry_pack_symlink(cls, gfid, bname, e['link'], e['stat']) elif not matching_disk_gfid(gfid, en): collect_failure(e, EEXIST, uid, gid, True) @@ -669,7 +638,7 @@ class Server(object): # don't create multiple entries with same gfid if isinstance(st, int): (pg, bname) = entry2pb(en) - blob = entry_pack_reg_stat(gfid, bname, + blob = entry_pack_reg_stat(cls, gfid, bname, e['stat']) else: cmd_ret = errno_wrap(os.link, [slink, en], |