diff options
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 176 |
1 files changed, 138 insertions, 38 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index faf62f868c7..8deb5114b50 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -265,6 +265,9 @@ class Server(object): FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' + GFID_XATTR = 'trusted.gfid' # for backend gfid fetch, do not use GX_NSPACE_PFX + GFID_FMTSTR = "!" + "B"*16 + local_path = '' @classmethod @@ -305,6 +308,38 @@ class Server(object): raise OSError(ENOTDIR, os.strerror(ENOTDIR)) return os.listdir(path) + + @classmethod + @_pathguard + def lstat(cls, path): + try: + return os.lstat(path) + except (IOError, OSError): + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + return ex.errno + else: + raise + + + @classmethod + @_pathguard + def gfid(cls, path): + try: + buf = Xattr.lgetxattr(path, cls.GFID_XATTR, 16) + m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)])) + return '-'.join(m.groups()) + except (IOError, OSError): + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + return ex.errno + else: + raise + + @classmethod + def gfid_mnt(cls, gfidpath): + return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT]) + @classmethod @_pathguard def purge(cls, path, entries=None): @@ -397,8 +432,42 @@ class Server(object): raise @classmethod - def gfid(cls, gfidpath): - return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT]) + @_pathguard + def stime_mnt(cls, path, uuid): + """query xtime extended attribute + + Return xtime of @path for @uuid as a pair of integers. + "Normal" errors due to non-existent @path or extended attribute + are tolerated and errno is returned in such a case. + """ + + try: + return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8)) + except OSError: + ex = sys.exc_info()[1] + if ex.errno in (ENOENT, ENODATA, ENOTDIR): + return ex.errno + else: + raise + + @classmethod + @_pathguard + def stime(cls, path, uuid): + """query xtime extended attribute + + Return xtime of @path for @uuid as a pair of integers. + "Normal" errors due to non-existent @path or extended attribute + are tolerated and errno is returned in such a case. + """ + + try: + return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8)) + except OSError: + ex = sys.exc_info()[1] + if ex.errno in (ENOENT, ENODATA, ENOTDIR): + return ex.errno + else: + raise @classmethod def node_uuid(cls, path='.'): @@ -409,21 +478,10 @@ class Server(object): raise @classmethod - def xtime_vec(cls, path, *uuids): - """vectored version of @xtime - - accepts a list of uuids and returns a dictionary - with uuid as key(s) and xtime as value(s) - """ - xt = {} - for uuid in uuids: - xtu = cls.xtime(path, uuid) - if xtu == ENODATA: - xtu = None - if isinstance(xtu, int): - return xtu - xt[uuid] = xtu - return xt + @_pathguard + def set_stime(cls, path, uuid, mark): + """set @mark as stime for @uuid on @path""" + Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), struct.pack('!II', *mark)) @classmethod @_pathguard @@ -444,20 +502,16 @@ class Server(object): Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) @classmethod - def set_xtime_vec(cls, path, mark_dct): - """vectored (or dictered) version of set_xtime - - ignore values that match @ignore - """ - for u,t in mark_dct.items(): - cls.set_xtime(path, u, t) - - @classmethod def entry_ops(cls, entries): pfx = gauxpfx() logging.debug('entries: %s' % repr(entries)) # regular file - def entry_pack_reg(gf, bn, st): + def entry_pack_reg(gf, bn, mo, uid, gid): + blen = len(bn) + return struct.pack(cls._fmt_mknod(blen), + uid, gid, gf, mo, bn, + stat.S_IMODE(mo), 0, umask()) + def entry_pack_reg_stat(gf, bn, st): blen = len(bn) mo = st['mode'] return struct.pack(cls._fmt_mknod(blen), @@ -465,12 +519,10 @@ class Server(object): gf, mo, bn, stat.S_IMODE(mo), 0, umask()) # mkdir - def entry_pack_mkdir(gf, bn, st): + def entry_pack_mkdir(gf, bn, mo, uid, gid): blen = len(bn) - mo = st['mode'] return struct.pack(cls._fmt_mkdir(blen), - st['uid'], st['gid'], - gf, mo, bn, + uid, gid, gf, mo, bn, stat.S_IMODE(mo), umask()) #symlink def entry_pack_symlink(gf, bn, lnk, st): @@ -485,7 +537,7 @@ class Server(object): # to be purged is the GFID gotten from the changelog. # (a stat(changelog_gfid) would also be valid here) # The race here is between the GFID check and the purge. - disk_gfid = cls.gfid(entry) + disk_gfid = cls.gfid_mnt(entry) if isinstance(disk_gfid, int): return if not gfid == disk_gfid: @@ -510,15 +562,15 @@ class Server(object): else: break elif op in ['CREATE', 'MKNOD']: - blob = entry_pack_reg(gfid, bname, e['stat']) + blob = entry_pack_reg(gfid, bname, e['mode'], e['uid'], e['uid']) elif op == 'MKDIR': - blob = entry_pack_mkdir(gfid, bname, e['stat']) + blob = entry_pack_mkdir(gfid, bname, e['mode'], e['uid'], e['uid']) elif op == 'LINK': slink = os.path.join(pfx, gfid) st = lstat(slink) if isinstance(st, int): (pg, bname) = entry2pb(entry) - blob = entry_pack_reg(gfid, bname, e['stat']) + blob = entry_pack_reg_stat(gfid, bname, e['stat']) else: errno_wrap(os.link, [slink, entry], [ENOENT, EEXIST]) elif op == 'SYMLINK': @@ -528,13 +580,24 @@ class Server(object): st = lstat(entry) if isinstance(st, int): (pg, bname) = entry2pb(en) - blob = entry_pack_reg(gfid, bname, e['stat']) + blob = entry_pack_reg_stat(gfid, bname, e['stat']) else: errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST]) if blob: errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [EEXIST], [ENOENT, ESTALE, EINVAL]) @classmethod + def meta_ops(cls, meta_entries): + logging.debug('Meta-entries: %s' % repr(meta_entries)) + for e in meta_entries: + mode = e['stat']['mode'] + uid = e['stat']['uid'] + gid = e['stat']['gid'] + go = e['go'] + errno_wrap(os.chmod, [go, mode], [ENOENT], [ESTALE, EINVAL]) + errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL]) + + @classmethod def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0): Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) @@ -699,6 +762,29 @@ class SlaveRemote(object): return po + def tarssh(self, files, slaveurl): + """invoke tar+ssh + -z (compress) can be use if needed, but ommitting it now + as it results in wierd error (tar+ssh errors out (errcode: 2) + """ + if not files: + raise GsyncdError("no files to sync") + logging.debug("files: " + ", ".join(files)) + (host, rdir) = slaveurl.split(':') + tar_cmd = ["tar", "-cf", "-", "--files-from", "-"] + ssh_cmd = gconf.ssh_command_tar.split() + [host, "tar", "--overwrite", "-xf", "-", "-C", rdir] + p0 = Popen(tar_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) + p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE) + for f in files: + p0.stdin.write(f) + p0.stdin.write('\n') + p0.stdin.close() + p0.wait() + + p1.wait() + p1.terminate_geterr(fail_on_err = False) + + return p1 class AbstractUrl(object): """abstract base class for url scheme classes""" @@ -1041,12 +1127,20 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): except ValueError: pass return e + @classmethod + def lstat(cls, e): + """ path based backend stat """ + return super(brickserver, cls).lstat(e) + @classmethod + def gfid(cls, e): + """ path based backend gfid fetch """ + return super(brickserver, cls).gfid(e) if gconf.slave_id: # define {,set_}xtime in slave, thus preempting # the call to remote, so that it takes data from # the local brick - slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server) - slave.server.set_xtime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_xtime(path, uuid + '.' + gconf.slave_id, mark), slave.server) + slave.server.stime = types.MethodType(lambda _self, path, uuid: brickserver.stime(path, uuid + '.' + gconf.slave_id), slave.server) + slave.server.set_stime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_stime(path, uuid + '.' + gconf.slave_id, mark), slave.server) (g1, g2) = self.gmaster_instantiate_tuple(slave) g1.master.server = brickserver g2.master.server = brickserver @@ -1067,6 +1161,9 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def rsync(self, files): return sup(self, files, self.slavedir) + def tarssh(self, files): + return sup(self, files, self.slavedir) + class SSH(AbstractUrl, SlaveRemote): """scheme class for ssh:// urls @@ -1170,3 +1267,6 @@ class SSH(AbstractUrl, SlaveRemote): def rsync(self, files): return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), *(gconf.rsync_ssh_options.split() + [self.slaveurl])) + + def tarssh(self, files): + return sup(self, files, self.slaveurl) |