summaryrefslogtreecommitdiffstats
path: root/geo-replication
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication')
-rw-r--r--geo-replication/syncdaemon/master.py108
-rw-r--r--geo-replication/syncdaemon/resource.py49
2 files changed, 70 insertions, 87 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index e743fdf2e50..8d2158fb406 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -245,8 +245,7 @@ class TarSSHEngine(object):
def a_syncdata(self, files):
logging.debug('files: %s' % (files))
- self.current_files_skipped_count = 0
- del self.skipped_gfid_list[:]
+
for f in files:
pb = self.syncer.add(f)
@@ -260,12 +259,9 @@ class TarSSHEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
- self.unlinked_gfids.append(se)
+ self.unlinked_gfids.add(se)
return True
- se_list = se.split('/')
- self.current_files_skipped_count += 1
- self.skipped_gfid_list.append(se_list[1])
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
def syncdata_wait(self):
@@ -283,8 +279,7 @@ class RsyncEngine(object):
def a_syncdata(self, files):
logging.debug('files: %s' % (files))
- self.current_files_skipped_count = 0
- del self.skipped_gfid_list[:]
+
for f in files:
logging.debug('candidate for syncing %s' % f)
pb = self.syncer.add(f)
@@ -299,12 +294,9 @@ class RsyncEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
- self.unlinked_gfids.append(se)
+ self.unlinked_gfids.add(se)
return True
- se_list = se.split('/')
- self.current_files_skipped_count += 1
- self.skipped_gfid_list.append(se_list[1])
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
def syncdata_wait(self):
@@ -364,35 +356,6 @@ class GMasterCommon(object):
self.make_xtime_opts(rsc == self.master, opts)
return self.xtime_low(rsc, path, **opts)
- def get_initial_crawl_data(self):
- # while persisting only 'files_syncd' is non-zero, rest of
- # the stats are nulls. lets keep it that way in case they
- # are needed to be used some day...
- default_data = {'files_syncd': 0,
- 'files_remaining': 0,
- 'bytes_remaining': 0,
- 'purges_remaining': 0,
- 'total_files_skipped': 0}
- if getattr(gconf, 'state_detail_file', None):
- try:
- with open(gconf.state_detail_file, 'r+') as f:
- loaded_data = json.load(f)
- diff_data = set(default_data) - set(loaded_data)
- if len(diff_data):
- for i in diff_data:
- loaded_data[i] = default_data[i]
- return loaded_data
- except IOError:
- logging.warn('Creating new gconf.state_detail_file.')
- # Create file with initial data
- try:
- with open(gconf.state_detail_file, 'wb') as f:
- json.dump(default_data, f)
- return default_data
- except:
- raise
- return default_data
-
def __init__(self, master, slave):
self.master = master
self.slave = slave
@@ -424,9 +387,7 @@ class GMasterCommon(object):
self.volinfo = None
self.terminate = False
self.sleep_interval = 1
- self.current_files_skipped_count = 0
- self.skipped_gfid_list = []
- self.unlinked_gfids = []
+ self.unlinked_gfids = set()
def init_keep_alive(cls):
"""start the keep-alive thread """
@@ -819,7 +780,8 @@ class GMasterChangelogMixin(GMasterCommon):
st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
if not isinstance(st, int):
num_failures += 1
- logging.warn('%s FAILED: %s' % (log_prefix, repr(failure)))
+ logging.error('%s FAILED: %s' % (log_prefix,
+ repr(failure)))
self.status.inc_value("failures", num_failures)
@@ -994,17 +956,17 @@ class GMasterChangelogMixin(GMasterCommon):
# sync data
if datas:
self.a_syncdata(datas)
+ self.datas_in_batch.update(datas)
def process(self, changes, done=1):
tries = 0
retry = False
- self.unlinked_gfids = []
+ self.unlinked_gfids = set()
self.files_in_batch = 0
+ self.datas_in_batch = set()
+ self.syncer.disable_errorlog()
while True:
- self.skipped_gfid_list = []
- self.current_files_skipped_count = 0
-
# first, fire all changelog transfers in parallel. entry and
# metadata are performed synchronously, therefore in serial.
# However at the end of each changelog, data is synchronized
@@ -1012,12 +974,25 @@ class GMasterChangelogMixin(GMasterCommon):
# entries/metadata of that changelog but happens in parallel
# with data of other changelogs.
- for change in changes:
- logging.debug('processing change %s' % change)
- self.process_change(change, done, retry)
- if not retry:
- # number of changelogs processed in the batch
- self.turns += 1
+ if retry:
+ if tries == (self.MAX_RETRIES - 1):
+ # Enable Error logging if it is last retry
+ self.syncer.enable_errorlog()
+
+ # Remove Unlinked GFIDs from Queue
+ for unlinked_gfid in self.unlinked_gfids:
+ self.datas_in_batch.remove(unlinked_gfid)
+
+ # Retry only Sync. Do not retry entry ops
+ if self.datas_in_batch:
+ self.a_syncdata(self.datas_in_batch)
+ else:
+ for change in changes:
+ logging.debug('processing change %s' % change)
+ self.process_change(change, done, retry)
+ if not retry:
+ # number of changelogs processed in the batch
+ self.turns += 1
# Now we wait for all the data transfers fired off in the above
# step to complete. Note that this is not ideal either. Ideally
@@ -1040,7 +1015,7 @@ class GMasterChangelogMixin(GMasterCommon):
# @change is the last changelog (therefore max time for this batch)
if self.syncdata_wait():
- self.unlinked_gfids = []
+ self.unlinked_gfids = set()
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
@@ -1050,23 +1025,21 @@ class GMasterChangelogMixin(GMasterCommon):
# Reset Data counter after sync
self.status.dec_value("data", self.files_in_batch)
self.files_in_batch = 0
+ self.datas_in_batch = set()
break
# We do not know which changelog transfer failed, retry everything.
retry = True
tries += 1
if tries == self.MAX_RETRIES:
- logging.warn('changelogs %s could not be processed - '
- 'moving on...' %
- ' '.join(map(os.path.basename, changes)))
- self.status.inc_value("failures",
- self.current_files_skipped_count)
- logging.warn('SKIPPED GFID = %s' %
- ','.join(self.skipped_gfid_list))
+ logging.error('changelogs %s could not be processed '
+ 'completely - moving on...' %
+ ' '.join(map(os.path.basename, changes)))
# Reset data counter on failure
self.status.dec_value("data", self.files_in_batch)
self.files_in_batch = 0
+ self.datas_in_batch = set()
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
@@ -1570,6 +1543,7 @@ class Syncer(object):
def __init__(self, slave, sync_engine, resilient_errnos=[]):
"""spawn worker threads"""
+ self.log_err = False
self.slave = slave
self.lock = Lock()
self.pb = PostBox()
@@ -1592,7 +1566,7 @@ class Syncer(object):
break
time.sleep(0.5)
pb.close()
- po = self.sync_engine(pb)
+ po = self.sync_engine(pb, self.log_err)
if po.returncode == 0:
ret = (True, 0)
elif po.returncode in self.errnos_ok:
@@ -1609,3 +1583,9 @@ class Syncer(object):
return pb
except BoxClosedErr:
pass
+
+ def enable_errorlog(self):
+ self.log_err = True
+
+ def disable_errorlog(self):
+ self.log_err = False
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 19363401e65..ac697eb39ed 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -906,7 +906,7 @@ class SlaveRemote(object):
"RePCe major version mismatch: local %s, remote %s" %
(exrv, rv))
- def rsync(self, files, *args):
+ def rsync(self, files, *args, **kw):
"""invoke rsync"""
if not files:
raise GsyncdError("no files to sync")
@@ -932,12 +932,15 @@ class SlaveRemote(object):
po.stdin.write(f)
po.stdin.write('\0')
- po.stdin.close()
+ stdout, stderr = po.communicate()
+
+ if kw.get("log_err", False):
+ for errline in stderr.strip().split("\n")[:-1]:
+ logging.error("SYNC Error(Rsync): %s" % errline)
if gconf.log_rsync_performance:
- out = po.stdout.read()
rsync_msg = []
- for line in out.split("\n"):
+ for line in stdout.split("\n"):
if line.startswith("Number of files:") or \
line.startswith("Number of regular files transferred:") or \
line.startswith("Total file size:") or \
@@ -949,12 +952,10 @@ class SlaveRemote(object):
line.startswith("sent "):
rsync_msg.append(line)
logging.info("rsync performance: %s" % ", ".join(rsync_msg))
- po.wait()
- po.terminate_geterr(fail_on_err=False)
return po
- def tarssh(self, files, slaveurl):
+ def tarssh(self, files, slaveurl, log_err=False):
"""invoke tar+ssh
-z (compress) can be use if needed, but omitting it now
as it results in weird error (tar+ssh errors out (errcode: 2)
@@ -975,15 +976,16 @@ class SlaveRemote(object):
for f in files:
p0.stdin.write(f)
p0.stdin.write('\n')
- p0.stdin.close()
- # wait() for tar to terminate, collecting any errors, further
+ p0.stdin.close()
+ p0.stdout.close() # Allow p0 to receive a SIGPIPE if p1 exits.
+ # wait for tar to terminate, collecting any errors, further
# waiting for transfer to complete
- p0.wait()
- p0.terminate_geterr(fail_on_err=False)
+ _, stderr1 = p1.communicate()
- p1.wait()
- p1.terminate_geterr(fail_on_err=False)
+ if log_err:
+ for errline in stderr1.strip().split("\n")[:-1]:
+ logging.error("SYNC Error(Untar): %s" % errline)
return p1
@@ -1045,8 +1047,8 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
"""inhibit the resource beyond"""
os.chdir(self.path)
- def rsync(self, files):
- return sup(self, files, self.path)
+ def rsync(self, files, log_err=False):
+ return sup(self, files, self.path, log_err=log_err)
class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
@@ -1460,11 +1462,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
else:
sup(self, *args)
- def rsync(self, files):
- return sup(self, files, self.slavedir)
+ def rsync(self, files, log_err=False):
+ return sup(self, files, self.slavedir, log_err=log_err)
- def tarssh(self, files):
- return sup(self, files, self.slavedir)
+ def tarssh(self, files, log_err=False):
+ return sup(self, files, self.slavedir, log_err=log_err)
class SSH(AbstractUrl, SlaveRemote):
@@ -1571,12 +1573,13 @@ class SSH(AbstractUrl, SlaveRemote):
self.fd_pair = (i, o)
return 'should'
- def rsync(self, files):
+ def rsync(self, files, log_err=False):
return sup(self, files, '-e',
" ".join(gconf.ssh_command.split() +
["-p", str(gconf.ssh_port)] +
gconf.ssh_ctl_args),
- *(gconf.rsync_ssh_options.split() + [self.slaveurl]))
+ *(gconf.rsync_ssh_options.split() + [self.slaveurl]),
+ log_err=log_err)
- def tarssh(self, files):
- return sup(self, files, self.slaveurl)
+ def tarssh(self, files, log_err=False):
+ return sup(self, files, self.slaveurl, log_err=log_err)