Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r--  geo-replication/syncdaemon/resource.py  191
1 file changed, 99 insertions, 92 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index c8575f00e70..f12c7ceaa36 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -19,30 +19,31 @@
 import struct
 import logging
 import tempfile
 import subprocess
-from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES
-from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM
+from errno import (EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES,
+                   EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM)
 import errno

 from rconf import rconf
 import gsyncdconfig as gconf
+import libgfchangelog
 import repce
 from repce import RepceServer, RepceClient
 from master import gmaster_builder
 import syncdutils
-from syncdutils import GsyncdError, select, privileged, funcode
-from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
-from syncdutils import NoStimeAvailable, PartialHistoryAvailable
-from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
-from syncdutils import get_changelog_log_level, get_rsync_version
-from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
-from syncdutils import GX_GFID_CANONICAL_LEN
+from syncdutils import (GsyncdError, select, privileged, funcode,
+                        entry2pb, gauxpfx, errno_wrap, lstat,
+                        NoStimeAvailable, PartialHistoryAvailable,
+                        ChangelogException, ChangelogHistoryNotAvailable,
+                        get_changelog_log_level, get_rsync_version,
+                        GX_GFID_CANONICAL_LEN,
+                        gf_mount_ready, lf, Popen, sup,
+                        Xattr, matching_disk_gfid, get_gfid_from_mnt,
+                        unshare_propagation_supported, get_slv_dir_path)
 from gsyncdstatus import GeorepStatus
-from syncdutils import lf, Popen, sup
-from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
-from syncdutils import unshare_propagation_supported, get_slv_dir_path
-import py2py3
-from py2py3 import pipe, str_to_bytearray
+from py2py3 import (pipe, str_to_bytearray, entry_pack_reg,
+                    entry_pack_reg_stat, entry_pack_mkdir,
+                    entry_pack_symlink)


 ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
@@ -377,37 +378,7 @@ class Server(object):
     def entry_ops(cls, entries):
         pfx = gauxpfx()
         logging.debug('entries: %s' % repr(entries))
-        # regular file
-
-        def entry_pack_reg(gf, bn, mo, uid, gid):
-            blen = len(bn)
-            return struct.pack(cls._fmt_mknod(blen),
-                               uid, gid, gf.encode(), mo, bn.encode(),
-                               stat.S_IMODE(mo), 0, umask())
-
-        def entry_pack_reg_stat(gf, bn, st):
-            blen = len(bn)
-            mo = st['mode']
-            return struct.pack(cls._fmt_mknod(blen),
-                               st['uid'], st['gid'],
-                               gf.encode(), mo, bn.encode(),
-                               stat.S_IMODE(mo), 0, umask())
-        # mkdir
-
-        def entry_pack_mkdir(gf, bn, mo, uid, gid):
-            blen = len(bn)
-            return struct.pack(cls._fmt_mkdir(blen),
-                               uid, gid, gf.encode(), mo, bn.encode(),
-                               stat.S_IMODE(mo), umask())
-        # symlink
-
-        def entry_pack_symlink(gf, bn, lnk, st):
-            blen = len(bn)
-            llen = len(lnk)
-            return struct.pack(cls._fmt_symlink(blen, llen),
-                               st['uid'], st['gid'],
-                               gf.encode(), st['mode'], bn.encode(),
-                               lnk.encode())
+        dist_count = rconf.args.master_dist_count

         def entry_purge(op, entry, gfid, e, uid, gid):
             # This is an extremely racy code and needs to be fixed ASAP.
@@ -457,7 +428,7 @@ class Server(object):
             e['stat']['uid'] = uid
             e['stat']['gid'] = gid

-        if cmd_ret == EEXIST:
+        if cmd_ret in [EEXIST, ESTALE]:
             if dst:
                 en = e['entry1']
             else:
@@ -541,6 +512,12 @@ class Server(object):
             entry = e['entry']
             uid = 0
             gid = 0
+
+            # Skip entry processing if it's marked true during gfid
+            # conflict resolution
+            if e['skip_entry']:
+                continue
+
             if e.get("stat", {}):
                 # Copy UID/GID value and then reset to zero. Copied UID/GID
                 # will be used to run chown once entry is created.
@@ -581,8 +558,8 @@ class Server(object):
                     st = lstat(slink)
                     # don't create multiple entries with same gfid
                     if isinstance(st, int):
-                        blob = entry_pack_reg(
-                            gfid, bname, e['mode'], e['uid'], e['gid'])
+                        blob = entry_pack_reg(cls, gfid, bname,
+                                              e['mode'], e['uid'], e['gid'])
                     # Self healed hardlinks are recorded as MKNOD.
                     # So if the gfid already exists, it should be
                     # processed as hard link not mknod.
@@ -597,8 +574,8 @@ class Server(object):
                     st = lstat(slink)
                     # don't create multiple entries with same gfid
                     if isinstance(st, int):
-                        blob = entry_pack_mkdir(
-                            gfid, bname, e['mode'], e['uid'], e['gid'])
+                        blob = entry_pack_mkdir(cls, gfid, bname,
+                                                e['mode'], e['uid'], e['gid'])
                     elif (isinstance(lstat(en), int) or
                           not matching_disk_gfid(gfid, en)):
                         # If gfid of a directory exists on slave but path based
@@ -610,6 +587,8 @@ class Server(object):
                         logging.info(lf("Special case: rename on mkdir",
                                         gfid=gfid, entry=repr(entry)))
                         src_entry = get_slv_dir_path(slv_host, slv_volume, gfid)
+                        if src_entry is None:
+                            collect_failure(e, ENOENT, uid, gid)
                         if src_entry is not None and src_entry != entry:
                             slv_entry_info = {}
                             slv_entry_info['gfid_mismatch'] = False
@@ -626,9 +605,9 @@ class Server(object):
                 if isinstance(st, int):
                     (pg, bname) = entry2pb(entry)
                     if stat.S_ISREG(e['stat']['mode']):
-                        blob = entry_pack_reg_stat(gfid, bname, e['stat'])
+                        blob = entry_pack_reg_stat(cls, gfid, bname, e['stat'])
                     elif stat.S_ISLNK(e['stat']['mode']):
-                        blob = entry_pack_symlink(gfid, bname, e['link'],
+                        blob = entry_pack_symlink(cls, gfid, bname, e['link'],
                                                   e['stat'])
                 else:
                     cmd_ret = errno_wrap(os.link,
@@ -639,7 +618,7 @@ class Server(object):
                 en = e['entry']
                 st = lstat(entry)
                 if isinstance(st, int):
-                    blob = entry_pack_symlink(gfid, bname, e['link'],
+                    blob = entry_pack_symlink(cls, gfid, bname, e['link'],
                                               e['stat'])
                 elif not matching_disk_gfid(gfid, en):
                     collect_failure(e, EEXIST, uid, gid)
@@ -654,22 +633,27 @@ class Server(object):
                 # exist with different gfid.
                 if not matching_disk_gfid(gfid, entry):
                     if e['stat'] and not stat.S_ISDIR(e['stat']['mode']):
-                        if stat.S_ISLNK(e['stat']['mode']) and \
-                           e['link'] is not None:
-                            st1 = lstat(en)
-                            if isinstance(st1, int):
-                                (pg, bname) = entry2pb(en)
-                                blob = entry_pack_symlink(gfid, bname,
-                                                          e['link'], e['stat'])
-                            elif not matching_disk_gfid(gfid, en):
-                                collect_failure(e, EEXIST, uid, gid, True)
+                        if stat.S_ISLNK(e['stat']['mode']):
+                            # src is not present, so don't sync symlink as
+                            # we don't know target. It's ok to ignore. If
+                            # it's unlinked, it's fine. If it's renamed to
+                            # something else, it will be synced then.
+                            if e['link'] is not None:
+                                st1 = lstat(en)
+                                if isinstance(st1, int):
+                                    (pg, bname) = entry2pb(en)
+                                    blob = entry_pack_symlink(cls, gfid, bname,
+                                                              e['link'],
+                                                              e['stat'])
+                                elif not matching_disk_gfid(gfid, en):
+                                    collect_failure(e, EEXIST, uid, gid, True)
                         else:
                             slink = os.path.join(pfx, gfid)
                             st = lstat(slink)
                             # don't create multiple entries with same gfid
                             if isinstance(st, int):
                                 (pg, bname) = entry2pb(en)
-                                blob = entry_pack_reg_stat(gfid, bname,
+                                blob = entry_pack_reg_stat(cls, gfid, bname,
                                                            e['stat'])
                             else:
                                 cmd_ret = errno_wrap(os.link, [slink, en],
@@ -704,15 +688,21 @@ class Server(object):
                                 raise
                         else:
                             raise
-                elif not matching_disk_gfid(gfid, en):
+                elif not matching_disk_gfid(gfid, en) and dist_count > 1:
                     collect_failure(e, EEXIST, uid, gid, True)
                 else:
+                    # We are here which means matching_disk_gfid for
+                    # both source and destination has returned false
+                    # and distribution count for master vol is greater
+                    # than one, which basically says both the source and
+                    # destination exist and are not hardlinks.
+                    # So we are safe to go ahead with rename here.
                     rename_with_disk_gfid_confirmation(gfid, entry, en,
                                                        uid, gid)

            if blob:
                cmd_ret = errno_wrap(Xattr.lsetxattr,
                                     [pg, 'glusterfs.gfid.newfile', blob],
-                                    [EEXIST, ENOENT],
+                                    [EEXIST, ENOENT, ESTALE],
                                     [ESTALE, EINVAL, EBUSY])
                collect_failure(e, cmd_ret, uid, gid)
@@ -961,6 +951,16 @@ class Mounter(object):
                             logging.exception('mount cleanup failure:')
                             rv = 200
                 os._exit(rv)
+
+        # Polling the dht.subvol.status value.
+        RETRIES = 10
+        while not gf_mount_ready():
+            if RETRIES < 0:
+                logging.error('Subvols are not up')
+                break
+            RETRIES -= 1
+            time.sleep(0.2)
+
         logging.debug('auxiliary glusterfs mount prepared')
@@ -1256,9 +1256,6 @@ class GLUSTER(object):
         # register the crawlers and start crawling
         # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
         # g3 ==> changelog History
-        (inf, ouf, ra, wa) = rconf.args.rpc_fd.split(',')
-        changelog_agent = RepceClient(int(inf), int(ouf))
-
         status = GeorepStatus(gconf.get("state-file"),
                               rconf.args.local_node,
                               rconf.args.local_path,
@@ -1266,12 +1263,6 @@ class GLUSTER(object):
                               rconf.args.master,
                               rconf.args.slave)
         status.reset_on_worker_start()
-        rv = changelog_agent.version()
-        if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:
-            raise GsyncdError(
-                "RePCe major version mismatch(changelog agent): "
-                "local %s, remote %s" %
-                (CHANGELOG_AGENT_CLIENT_VERSION, rv))

         try:
             workdir = g2.setup_working_dir()
@@ -1282,17 +1273,16 @@ class GLUSTER(object):
             # register with the changelog library
             # 9 == log level (DEBUG)
             # 5 == connection retries
-            changelog_agent.init()
-            changelog_agent.register(rconf.args.local_path,
-                                     workdir,
-                                     gconf.get("changelog-log-file"),
-                                     get_changelog_log_level(
-                                         gconf.get("changelog-log-level")),
-                                     g2.CHANGELOG_CONN_RETRIES)
+            libgfchangelog.register(rconf.args.local_path,
+                                    workdir,
+                                    gconf.get("changelog-log-file"),
+                                    get_changelog_log_level(
+                                        gconf.get("changelog-log-level")),
+                                    g2.CHANGELOG_CONN_RETRIES)

             register_time = int(time.time())
-            g2.register(register_time, changelog_agent, status)
-            g3.register(register_time, changelog_agent, status)
+            g2.register(register_time, status)
+            g3.register(register_time, status)
         except ChangelogException as e:
             logging.error(lf("Changelog register failed", error=e))
             sys.exit(1)
@@ -1427,7 +1417,9 @@ class SSH(object):
                          '--slave-gluster-log-level',
                          gconf.get("slave-gluster-log-level"),
                          '--slave-gluster-command-dir',
-                         gconf.get("slave-gluster-command-dir")]
+                         gconf.get("slave-gluster-command-dir"),
+                         '--master-dist-count',
+                         str(gconf.get("master-distribution-count"))]

         if gconf.get("slave-access-mount"):
             args_to_slave.append('--slave-access-mount')
@@ -1492,7 +1484,7 @@ class SSH(object):

         if log_rsync_performance:
             # use stdout=PIPE only when log_rsync_performance enabled
-            # Else rsync will write to stdout and nobody is their
+            # Else rsync will write to stdout and nobody is there
             # to consume. If PIPE is full rsync hangs.
             po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE, universal_newlines=True)
@@ -1530,7 +1522,7 @@ class SSH(object):

         return po

-    def tarssh(self, files, slaveurl, log_err=False):
+    def tarssh(self, files, log_err=False):
         """invoke tar+ssh
         -z (compress) can be used if needed, but omitting it now
         as it results in weird error (tar+ssh errors out (errcode: 2)
@@ -1538,10 +1530,11 @@ class SSH(object):
         if not files:
             raise GsyncdError("no files to sync")
         logging.debug("files: " + ", ".join(files))
-        (host, rdir) = slaveurl.split(':')
+        (host, rdir) = self.slaveurl.split(':')
+
         tar_cmd = ["tar"] + \
             ["--sparse", "-cf", "-", "--files-from", "-"]
-        ssh_cmd = gconf.get("ssh-command-tar").split() + \
+        ssh_cmd = gconf.get("ssh-command").split() + \
             gconf.get("ssh-options-tar").split() + \
             ["-p", str(gconf.get("ssh-port"))] + \
             [host, "tar"] + \
@@ -1557,15 +1550,29 @@ class SSH(object):
         p0.stdin.close()
         p0.stdout.close()  # Allow p0 to receive a SIGPIPE if p1 exits.
-        # wait for tar to terminate, collecting any errors, further
-        # waiting for transfer to complete
-        _, stderr1 = p1.communicate()
         # stdin and stdout of p0 is already closed, Reset to None and
         # wait for child process to complete
         p0.stdin = None
         p0.stdout = None
-        p0.communicate()
+
+        def wait_for_tar(p0):
+            _, stderr = p0.communicate()
+            if log_err:
+                for errline in stderr.strip().split("\n")[:-1]:
+                    if "No such file or directory" not in errline:
+                        logging.error(lf("SYNC Error",
+                                         sync_engine="Tarssh",
+                                         error=errline))
+
+        t = syncdutils.Thread(target=wait_for_tar, args=(p0, ))
+        # wait for tar to terminate, collecting any errors, further
+        # waiting for transfer to complete
+        t.start()
+
+        # wait for ssh process
+        _, stderr1 = p1.communicate()
+        t.join()

         if log_err:
             for errline in stderr1.strip().split("\n")[:-1]:
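
Two patterns in this patch may be easier to follow outside of diff context. First, the Mounter hunk above makes the worker poll the fresh auxiliary mount until DHT reports its subvolumes up, instead of proceeding immediately. Below is a minimal sketch of that retry loop, assuming a mount_ready_check callable standing in for syncdutils.gf_mount_ready(); the helper name wait_for_subvols is hypothetical:

    import time
    import logging

    def wait_for_subvols(mount_ready_check, retries=10, interval=0.2):
        # Poll the dht.subvol.status value via the supplied callable,
        # giving up once the retry budget is spent (same shape as the
        # RETRIES loop added to Mounter above).
        while not mount_ready_check():
            if retries < 0:
                logging.error('Subvols are not up')
                return False
            retries -= 1
            time.sleep(interval)
        return True

Second, the tarssh() hunk stops collecting tar's errors with a blocking communicate() before ssh has finished; a helper thread now drains tar's stderr while the main thread waits on ssh, so a full pipe cannot stall either process. Here is a self-contained sketch of the same producer/consumer pipeline, assuming plain tar and ssh binaries rather than gsyncd's configured commands; tar_over_ssh is a hypothetical name:

    import subprocess
    import threading

    def tar_over_ssh(files, host, rdir):
        # Producer: tar reads the file list from stdin, archives to stdout.
        tar = subprocess.Popen(["tar", "--sparse", "-cf", "-",
                                "--files-from", "-"],
                               stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE, universal_newlines=True)
        # Consumer: a remote tar unpacks the stream arriving over ssh.
        ssh = subprocess.Popen(["ssh", host, "tar", "-xf", "-", "-C", rdir],
                               stdin=tar.stdout, stderr=subprocess.PIPE,
                               universal_newlines=True)
        tar.stdin.write("\n".join(files) + "\n")
        tar.stdin.close()
        tar.stdout.close()  # let tar receive SIGPIPE if ssh exits early

        # Both pipe ends are closed in this process; reset them so
        # communicate() only drains stderr (the same reset the patch
        # applies to p0.stdin and p0.stdout).
        tar.stdin = None
        tar.stdout = None

        tar_stderr = []

        def wait_for_tar():
            # Drain tar's stderr on a side thread: blocking on ssh first
            # while tar fills its stderr pipe would deadlock both.
            _, err = tar.communicate()
            tar_stderr.append(err)

        t = threading.Thread(target=wait_for_tar)
        t.start()
        _, ssh_stderr = ssh.communicate()  # wait for the transfer to finish
        t.join()
        return tar.returncode, ssh.returncode, tar_stderr[0], ssh_stderr

The t.join() before returning mirrors the patch's ordering: tar's error lines are fully collected before the caller inspects the results, which is what lets the real code filter benign "No such file or directory" messages for files deleted between changelog recording and sync.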
