Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r--  geo-replication/syncdaemon/resource.py  191
1 file changed, 99 insertions(+), 92 deletions(-)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index c8575f00e70..f12c7ceaa36 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -19,30 +19,31 @@ import struct
import logging
import tempfile
import subprocess
-from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES
-from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM
+from errno import (EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES,
+ EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM)
import errno
from rconf import rconf
import gsyncdconfig as gconf
+import libgfchangelog
import repce
from repce import RepceServer, RepceClient
from master import gmaster_builder
import syncdutils
-from syncdutils import GsyncdError, select, privileged, funcode
-from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
-from syncdutils import NoStimeAvailable, PartialHistoryAvailable
-from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
-from syncdutils import get_changelog_log_level, get_rsync_version
-from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
-from syncdutils import GX_GFID_CANONICAL_LEN
+from syncdutils import (GsyncdError, select, privileged, funcode,
+ entry2pb, gauxpfx, errno_wrap, lstat,
+ NoStimeAvailable, PartialHistoryAvailable,
+ ChangelogException, ChangelogHistoryNotAvailable,
+ get_changelog_log_level, get_rsync_version,
+ GX_GFID_CANONICAL_LEN,
+ gf_mount_ready, lf, Popen, sup,
+ Xattr, matching_disk_gfid, get_gfid_from_mnt,
+ unshare_propagation_supported, get_slv_dir_path)
from gsyncdstatus import GeorepStatus
-from syncdutils import lf, Popen, sup
-from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
-from syncdutils import unshare_propagation_supported, get_slv_dir_path
-import py2py3
-from py2py3 import pipe, str_to_bytearray
+from py2py3 import (pipe, str_to_bytearray, entry_pack_reg,
+ entry_pack_reg_stat, entry_pack_mkdir,
+ entry_pack_symlink)
ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
@@ -377,37 +378,7 @@ class Server(object):
def entry_ops(cls, entries):
pfx = gauxpfx()
logging.debug('entries: %s' % repr(entries))
- # regular file
-
- def entry_pack_reg(gf, bn, mo, uid, gid):
- blen = len(bn)
- return struct.pack(cls._fmt_mknod(blen),
- uid, gid, gf.encode(), mo, bn.encode(),
- stat.S_IMODE(mo), 0, umask())
-
- def entry_pack_reg_stat(gf, bn, st):
- blen = len(bn)
- mo = st['mode']
- return struct.pack(cls._fmt_mknod(blen),
- st['uid'], st['gid'],
- gf.encode(), mo, bn.encode(),
- stat.S_IMODE(mo), 0, umask())
- # mkdir
-
- def entry_pack_mkdir(gf, bn, mo, uid, gid):
- blen = len(bn)
- return struct.pack(cls._fmt_mkdir(blen),
- uid, gid, gf.encode(), mo, bn.encode(),
- stat.S_IMODE(mo), umask())
- # symlink
-
- def entry_pack_symlink(gf, bn, lnk, st):
- blen = len(bn)
- llen = len(lnk)
- return struct.pack(cls._fmt_symlink(blen, llen),
- st['uid'], st['gid'],
- gf.encode(), st['mode'], bn.encode(),
- lnk.encode())
+ dist_count = rconf.args.master_dist_count
def entry_purge(op, entry, gfid, e, uid, gid):
# This is an extremely racy code and needs to be fixed ASAP.
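
The four entry_pack_* helpers deleted above now live in py2py3 and take the server class as their first argument so they can still reach the cls._fmt_* format builders. A minimal sketch of the relocated entry_pack_reg, reconstructed from the deleted code (the actual py2py3 module may handle str/bytes differences beyond this):

    import stat
    import struct
    from syncdutils import umask

    def entry_pack_reg(cls, gf, bn, mo, uid, gid):
        # Pack a MKNOD blob for the glusterfs.gfid.newfile xattr:
        # owner, gfid, mode, basename, permission bits, rdev
        # (0 for regular files) and the current umask.
        blen = len(bn)
        return struct.pack(cls._fmt_mknod(blen),
                           uid, gid, gf.encode(), mo, bn.encode(),
                           stat.S_IMODE(mo), 0, umask())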
@@ -457,7 +428,7 @@ class Server(object):
e['stat']['uid'] = uid
e['stat']['gid'] = gid
- if cmd_ret == EEXIST:
+ if cmd_ret in [EEXIST, ESTALE]:
if dst:
en = e['entry1']
else:
@@ -541,6 +512,12 @@ class Server(object):
entry = e['entry']
uid = 0
gid = 0
+
+ # Skip entry processing if skip_entry was marked True during
+ # gfid conflict resolution
+ if e['skip_entry']:
+ continue
+
if e.get("stat", {}):
# Copy UID/GID value and then reset to zero. Copied UID/GID
# will be used to run chown once entry is created.
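
Each element of entries is a dict describing one journaled operation, and the new skip_entry flag is now checked before anything else. A hypothetical entry showing only the keys this loop consults (all values invented for illustration):

    e = {
        'op': 'MKNOD',        # or CREATE/MKDIR/LINK/SYMLINK/RENAME/UNLINK/RMDIR
        'gfid': 'fa2b828b-6b33-4842-9a4b-5b02a181a2e1',
        'entry': '.gfid/9a4b5b02-a181-a2e1-fa2b-828b6b334842/fname',
        'mode': 0o100644,
        'uid': 1000,
        'gid': 1000,
        'stat': {},           # stat fields when recorded by the master
        'link': None,         # symlink target for SYMLINK ops
        'skip_entry': False,  # True when gfid conflict resolution says skip
    }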
@@ -581,8 +558,8 @@ class Server(object):
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
- blob = entry_pack_reg(
- gfid, bname, e['mode'], e['uid'], e['gid'])
+ blob = entry_pack_reg(cls, gfid, bname,
+ e['mode'], e['uid'], e['gid'])
# Self healed hardlinks are recorded as MKNOD.
# So if the gfid already exists, it should be
# processed as hard link not mknod.
@@ -597,8 +574,8 @@ class Server(object):
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
- blob = entry_pack_mkdir(
- gfid, bname, e['mode'], e['uid'], e['gid'])
+ blob = entry_pack_mkdir(cls, gfid, bname,
+ e['mode'], e['uid'], e['gid'])
elif (isinstance(lstat(en), int) or
not matching_disk_gfid(gfid, en)):
# If gfid of a directory exists on slave but path based
@@ -610,6 +587,8 @@ class Server(object):
logging.info(lf("Special case: rename on mkdir",
gfid=gfid, entry=repr(entry)))
src_entry = get_slv_dir_path(slv_host, slv_volume, gfid)
+ if src_entry is None:
+ collect_failure(e, ENOENT, uid, gid)
if src_entry is not None and src_entry != entry:
slv_entry_info = {}
slv_entry_info['gfid_mismatch'] = False
@@ -626,9 +605,9 @@ class Server(object):
if isinstance(st, int):
(pg, bname) = entry2pb(entry)
if stat.S_ISREG(e['stat']['mode']):
- blob = entry_pack_reg_stat(gfid, bname, e['stat'])
+ blob = entry_pack_reg_stat(cls, gfid, bname, e['stat'])
elif stat.S_ISLNK(e['stat']['mode']):
- blob = entry_pack_symlink(gfid, bname, e['link'],
+ blob = entry_pack_symlink(cls, gfid, bname, e['link'],
e['stat'])
else:
cmd_ret = errno_wrap(os.link,
@@ -639,7 +618,7 @@ class Server(object):
en = e['entry']
st = lstat(entry)
if isinstance(st, int):
- blob = entry_pack_symlink(gfid, bname, e['link'],
+ blob = entry_pack_symlink(cls, gfid, bname, e['link'],
e['stat'])
elif not matching_disk_gfid(gfid, en):
collect_failure(e, EEXIST, uid, gid)
@@ -654,22 +633,27 @@ class Server(object):
# exist with different gfid.
if not matching_disk_gfid(gfid, entry):
if e['stat'] and not stat.S_ISDIR(e['stat']['mode']):
- if stat.S_ISLNK(e['stat']['mode']) and \
- e['link'] is not None:
- st1 = lstat(en)
- if isinstance(st1, int):
- (pg, bname) = entry2pb(en)
- blob = entry_pack_symlink(gfid, bname,
- e['link'], e['stat'])
- elif not matching_disk_gfid(gfid, en):
- collect_failure(e, EEXIST, uid, gid, True)
+ if stat.S_ISLNK(e['stat']['mode']):
+ # src is not present, so don't sync the symlink
+ # as we don't know its target. It's ok to ignore;
+ # if it's unlinked, that's fine, and if it's
+ # renamed to something else, it will be synced then.
+ if e['link'] is not None:
+ st1 = lstat(en)
+ if isinstance(st1, int):
+ (pg, bname) = entry2pb(en)
+ blob = entry_pack_symlink(cls, gfid, bname,
+ e['link'],
+ e['stat'])
+ elif not matching_disk_gfid(gfid, en):
+ collect_failure(e, EEXIST, uid, gid, True)
else:
slink = os.path.join(pfx, gfid)
st = lstat(slink)
# don't create multiple entries with same gfid
if isinstance(st, int):
(pg, bname) = entry2pb(en)
- blob = entry_pack_reg_stat(gfid, bname,
+ blob = entry_pack_reg_stat(cls, gfid, bname,
e['stat'])
else:
cmd_ret = errno_wrap(os.link, [slink, en],
@@ -704,15 +688,21 @@ class Server(object):
raise
else:
raise
- elif not matching_disk_gfid(gfid, en):
+ elif not matching_disk_gfid(gfid, en) and dist_count > 1:
collect_failure(e, EEXIST, uid, gid, True)
else:
+ # Reaching here means matching_disk_gfid has returned
+ # False for both source and destination, and the
+ # distribution count of the master volume is greater
+ # than one. That means both source and destination
+ # exist and are not hardlinks, so it is safe to go
+ # ahead with the rename here.
rename_with_disk_gfid_confirmation(gfid, entry, en,
uid, gid)
if blob:
cmd_ret = errno_wrap(Xattr.lsetxattr,
[pg, 'glusterfs.gfid.newfile', blob],
- [EEXIST, ENOENT],
+ [EEXIST, ENOENT, ESTALE],
[ESTALE, EINVAL, EBUSY])
collect_failure(e, cmd_ret, uid, gid)
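
errno_wrap (imported from syncdutils) runs the call, treats the first errno list as tolerable failures to be reported through collect_failure, and retries on the second list; adding ESTALE to the tolerated list above makes stale-handle races during the setxattr non-fatal. A rough sketch of that contract, assuming this calling convention:

    import time

    def errno_wrap(call, args=(), tolerate=(), retry=(), retries=5):
        # Sketch only: return the errno for "tolerated" failures so
        # the caller can record them, retry transient ones briefly,
        # and re-raise anything unexpected.
        while True:
            try:
                return call(*args)
            except OSError as ex:
                if ex.errno in tolerate:
                    return ex.errno
                if ex.errno in retry and retries > 0:
                    retries -= 1
                    time.sleep(0.25)
                    continue
                raise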
@@ -961,6 +951,16 @@ class Mounter(object):
logging.exception('mount cleanup failure:')
rv = 200
os._exit(rv)
+
+ # Poll the dht.subvol.status value until the subvols are up.
+ RETRIES = 10
+ while not gf_mount_ready():
+ if RETRIES < 0:
+ logging.error('Subvols are not up')
+ break
+ RETRIES -= 1
+ time.sleep(0.2)
+
logging.debug('auxiliary glusterfs mount prepared')
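
gf_mount_ready, newly imported from syncdutils above, presumably reads the virtual dht.subvol.status xattr on the mount root to check that all DHT subvolumes are connected; the loop above retries roughly ten times at 0.2s intervals before giving up. A minimal sketch under that assumption:

    import errno
    from syncdutils import Xattr, errno_wrap

    ENOTSUP = getattr(errno, 'ENOTSUP', errno.EOPNOTSUPP)

    def gf_mount_ready():
        # Query the dht xlator's virtual xattr on the mount root
        # (the worker has already chdir'd into the mount).
        ret = errno_wrap(Xattr.lgetxattr,
                         ['.', 'dht.subvol.status', 16],
                         [errno.ENOENT, ENOTSUP, errno.ENODATA], [])
        if isinstance(ret, int):
            return False
        # value is "1" when every subvolume is up (assuming bytes here)
        return ret.rstrip(b'\0') == b"1"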
@@ -1256,9 +1256,6 @@ class GLUSTER(object):
# register the crawlers and start crawling
# g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
# g3 ==> changelog History
- (inf, ouf, ra, wa) = rconf.args.rpc_fd.split(',')
- changelog_agent = RepceClient(int(inf), int(ouf))
-
status = GeorepStatus(gconf.get("state-file"),
rconf.args.local_node,
rconf.args.local_path,
@@ -1266,12 +1263,6 @@ class GLUSTER(object):
rconf.args.master,
rconf.args.slave)
status.reset_on_worker_start()
- rv = changelog_agent.version()
- if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:
- raise GsyncdError(
- "RePCe major version mismatch(changelog agent): "
- "local %s, remote %s" %
- (CHANGELOG_AGENT_CLIENT_VERSION, rv))
try:
workdir = g2.setup_working_dir()
@@ -1282,17 +1273,16 @@ class GLUSTER(object):
# register with the changelog library
# 9 == log level (DEBUG)
# 5 == connection retries
- changelog_agent.init()
- changelog_agent.register(rconf.args.local_path,
- workdir,
- gconf.get("changelog-log-file"),
- get_changelog_log_level(
- gconf.get("changelog-log-level")),
- g2.CHANGELOG_CONN_RETRIES)
+ libgfchangelog.register(rconf.args.local_path,
+ workdir,
+ gconf.get("changelog-log-file"),
+ get_changelog_log_level(
+ gconf.get("changelog-log-level")),
+ g2.CHANGELOG_CONN_RETRIES)
register_time = int(time.time())
- g2.register(register_time, changelog_agent, status)
- g3.register(register_time, changelog_agent, status)
+ g2.register(register_time, status)
+ g3.register(register_time, status)
except ChangelogException as e:
logging.error(lf("Changelog register failed", error=e))
sys.exit(1)
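
These hunks retire the out-of-process changelog agent: the RepceClient over the rpc_fd pair and its version handshake go away, and the worker calls libgfchangelog in-process. That module is presumably a thin ctypes wrapper over libgfchangelog.so; a hedged sketch of the register call it would need (symbol name and argument order assumed from the C API's naming convention):

    import os
    from ctypes import CDLL, RTLD_GLOBAL, get_errno
    from ctypes.util import find_library

    from syncdutils import ChangelogException

    libgfc = CDLL(find_library("gfchangelog"), mode=RTLD_GLOBAL,
                  use_errno=True)

    def register(brick, path, log_file, log_level, retries=0):
        # Direct in-process call replacing the agent RPC round trip.
        ret = libgfc.gf_changelog_register(brick.encode(), path.encode(),
                                           log_file.encode(), log_level,
                                           retries)
        if ret == -1:
            errn = get_errno()
            # ChangelogException subclasses OSError in syncdutils
            raise ChangelogException(errn, os.strerror(errn))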
@@ -1427,7 +1417,9 @@ class SSH(object):
'--slave-gluster-log-level',
gconf.get("slave-gluster-log-level"),
'--slave-gluster-command-dir',
- gconf.get("slave-gluster-command-dir")]
+ gconf.get("slave-gluster-command-dir"),
+ '--master-dist-count',
+ str(gconf.get("master-distribution-count"))]
if gconf.get("slave-access-mount"):
args_to_slave.append('--slave-access-mount')
@@ -1492,7 +1484,7 @@ class SSH(object):
if log_rsync_performance:
# use stdout=PIPE only when log_rsync_performance enabled
- # Else rsync will write to stdout and nobody is their
+ # Else rsync will write to stdout and nobody is there
# to consume. If PIPE is full rsync hangs.
po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, universal_newlines=True)
@@ -1530,7 +1522,7 @@ class SSH(object):
return po
- def tarssh(self, files, slaveurl, log_err=False):
+ def tarssh(self, files, log_err=False):
"""invoke tar+ssh
-z (compress) can be used if needed, but omitting it now
as it results in weird error (tar+ssh errors out (errcode: 2)
@@ -1538,10 +1530,11 @@ class SSH(object):
if not files:
raise GsyncdError("no files to sync")
logging.debug("files: " + ", ".join(files))
- (host, rdir) = slaveurl.split(':')
+ (host, rdir) = self.slaveurl.split(':')
+
tar_cmd = ["tar"] + \
["--sparse", "-cf", "-", "--files-from", "-"]
- ssh_cmd = gconf.get("ssh-command-tar").split() + \
+ ssh_cmd = gconf.get("ssh-command").split() + \
gconf.get("ssh-options-tar").split() + \
["-p", str(gconf.get("ssh-port"))] + \
[host, "tar"] + \
@@ -1557,15 +1550,29 @@ class SSH(object):
p0.stdin.close()
p0.stdout.close() # Allow p0 to receive a SIGPIPE if p1 exits.
- # wait for tar to terminate, collecting any errors, further
- # waiting for transfer to complete
- _, stderr1 = p1.communicate()
# stdin and stdout of p0 is already closed, Reset to None and
# wait for child process to complete
p0.stdin = None
p0.stdout = None
- p0.communicate()
+
+ def wait_for_tar(p0):
+ _, stderr = p0.communicate()
+ if log_err:
+ for errline in stderr.strip().split("\n")[:-1]:
+ if "No such file or directory" not in errline:
+ logging.error(lf("SYNC Error",
+ sync_engine="Tarssh",
+ error=errline))
+
+ t = syncdutils.Thread(target=wait_for_tar, args=(p0, ))
+ # wait for tar to terminate in the background, collecting any
+ # errors, while the transfer completes
+ t.start()
+
+ # wait for ssh process
+ _, stderr1 = p1.communicate()
+ t.join()
if log_err:
for errline in stderr1.strip().split("\n")[:-1]:
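
The rework above drains both ends of the tar|ssh pipeline concurrently: tar (p0) is reaped in a thread while the main thread waits on ssh (p1). The old serialized communicate() calls could deadlock once either process filled a pipe buffer nobody was reading. A self-contained sketch of the pattern with placeholder commands:

    import subprocess
    import threading

    def run_pipeline(producer_cmd, consumer_cmd):
        # producer | consumer, collecting both stderr streams in
        # parallel so neither side blocks on a full pipe buffer.
        p0 = subprocess.Popen(producer_cmd, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
        p1 = subprocess.Popen(consumer_cmd, stdin=p0.stdout,
                              stderr=subprocess.PIPE)
        p0.stdout.close()  # let p0 see SIGPIPE if p1 exits early

        errs = {}

        def drain_producer():
            _, errs['producer'] = p0.communicate()

        t = threading.Thread(target=drain_producer)
        t.start()
        _, errs['consumer'] = p1.communicate()
        t.join()
        return errs

    # e.g. run_pipeline(["tar", "-cf", "-", "."], ["wc", "-c"])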