summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/master.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--geo-replication/syncdaemon/master.py108
1 files changed, 44 insertions, 64 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index e743fdf2e50..8d2158fb406 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -245,8 +245,7 @@ class TarSSHEngine(object):
def a_syncdata(self, files):
logging.debug('files: %s' % (files))
- self.current_files_skipped_count = 0
- del self.skipped_gfid_list[:]
+
for f in files:
pb = self.syncer.add(f)
@@ -260,12 +259,9 @@ class TarSSHEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
- self.unlinked_gfids.append(se)
+ self.unlinked_gfids.add(se)
return True
- se_list = se.split('/')
- self.current_files_skipped_count += 1
- self.skipped_gfid_list.append(se_list[1])
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
def syncdata_wait(self):
@@ -283,8 +279,7 @@ class RsyncEngine(object):
def a_syncdata(self, files):
logging.debug('files: %s' % (files))
- self.current_files_skipped_count = 0
- del self.skipped_gfid_list[:]
+
for f in files:
logging.debug('candidate for syncing %s' % f)
pb = self.syncer.add(f)
@@ -299,12 +294,9 @@ class RsyncEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
- self.unlinked_gfids.append(se)
+ self.unlinked_gfids.add(se)
return True
- se_list = se.split('/')
- self.current_files_skipped_count += 1
- self.skipped_gfid_list.append(se_list[1])
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
def syncdata_wait(self):
@@ -364,35 +356,6 @@ class GMasterCommon(object):
self.make_xtime_opts(rsc == self.master, opts)
return self.xtime_low(rsc, path, **opts)
- def get_initial_crawl_data(self):
- # while persisting only 'files_syncd' is non-zero, rest of
- # the stats are nulls. lets keep it that way in case they
- # are needed to be used some day...
- default_data = {'files_syncd': 0,
- 'files_remaining': 0,
- 'bytes_remaining': 0,
- 'purges_remaining': 0,
- 'total_files_skipped': 0}
- if getattr(gconf, 'state_detail_file', None):
- try:
- with open(gconf.state_detail_file, 'r+') as f:
- loaded_data = json.load(f)
- diff_data = set(default_data) - set(loaded_data)
- if len(diff_data):
- for i in diff_data:
- loaded_data[i] = default_data[i]
- return loaded_data
- except IOError:
- logging.warn('Creating new gconf.state_detail_file.')
- # Create file with initial data
- try:
- with open(gconf.state_detail_file, 'wb') as f:
- json.dump(default_data, f)
- return default_data
- except:
- raise
- return default_data
-
def __init__(self, master, slave):
self.master = master
self.slave = slave
@@ -424,9 +387,7 @@ class GMasterCommon(object):
self.volinfo = None
self.terminate = False
self.sleep_interval = 1
- self.current_files_skipped_count = 0
- self.skipped_gfid_list = []
- self.unlinked_gfids = []
+ self.unlinked_gfids = set()
def init_keep_alive(cls):
"""start the keep-alive thread """
@@ -819,7 +780,8 @@ class GMasterChangelogMixin(GMasterCommon):
st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
if not isinstance(st, int):
num_failures += 1
- logging.warn('%s FAILED: %s' % (log_prefix, repr(failure)))
+ logging.error('%s FAILED: %s' % (log_prefix,
+ repr(failure)))
self.status.inc_value("failures", num_failures)
@@ -994,17 +956,17 @@ class GMasterChangelogMixin(GMasterCommon):
# sync data
if datas:
self.a_syncdata(datas)
+ self.datas_in_batch.update(datas)
def process(self, changes, done=1):
tries = 0
retry = False
- self.unlinked_gfids = []
+ self.unlinked_gfids = set()
self.files_in_batch = 0
+ self.datas_in_batch = set()
+ self.syncer.disable_errorlog()
while True:
- self.skipped_gfid_list = []
- self.current_files_skipped_count = 0
-
# first, fire all changelog transfers in parallel. entry and
# metadata are performed synchronously, therefore in serial.
# However at the end of each changelog, data is synchronized
@@ -1012,12 +974,25 @@ class GMasterChangelogMixin(GMasterCommon):
# entries/metadata of that changelog but happens in parallel
# with data of other changelogs.
- for change in changes:
- logging.debug('processing change %s' % change)
- self.process_change(change, done, retry)
- if not retry:
- # number of changelogs processed in the batch
- self.turns += 1
+ if retry:
+ if tries == (self.MAX_RETRIES - 1):
+ # Enable Error logging if it is last retry
+ self.syncer.enable_errorlog()
+
+ # Remove Unlinked GFIDs from Queue
+ for unlinked_gfid in self.unlinked_gfids:
+ self.datas_in_batch.remove(unlinked_gfid)
+
+ # Retry only Sync. Do not retry entry ops
+ if self.datas_in_batch:
+ self.a_syncdata(self.datas_in_batch)
+ else:
+ for change in changes:
+ logging.debug('processing change %s' % change)
+ self.process_change(change, done, retry)
+ if not retry:
+ # number of changelogs processed in the batch
+ self.turns += 1
# Now we wait for all the data transfers fired off in the above
# step to complete. Note that this is not ideal either. Ideally
@@ -1040,7 +1015,7 @@ class GMasterChangelogMixin(GMasterCommon):
# @change is the last changelog (therefore max time for this batch)
if self.syncdata_wait():
- self.unlinked_gfids = []
+ self.unlinked_gfids = set()
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
@@ -1050,23 +1025,21 @@ class GMasterChangelogMixin(GMasterCommon):
# Reset Data counter after sync
self.status.dec_value("data", self.files_in_batch)
self.files_in_batch = 0
+ self.datas_in_batch = set()
break
# We do not know which changelog transfer failed, retry everything.
retry = True
tries += 1
if tries == self.MAX_RETRIES:
- logging.warn('changelogs %s could not be processed - '
- 'moving on...' %
- ' '.join(map(os.path.basename, changes)))
- self.status.inc_value("failures",
- self.current_files_skipped_count)
- logging.warn('SKIPPED GFID = %s' %
- ','.join(self.skipped_gfid_list))
+ logging.error('changelogs %s could not be processed '
+ 'completely - moving on...' %
+ ' '.join(map(os.path.basename, changes)))
# Reset data counter on failure
self.status.dec_value("data", self.files_in_batch)
self.files_in_batch = 0
+ self.datas_in_batch = set()
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
@@ -1570,6 +1543,7 @@ class Syncer(object):
def __init__(self, slave, sync_engine, resilient_errnos=[]):
"""spawn worker threads"""
+ self.log_err = False
self.slave = slave
self.lock = Lock()
self.pb = PostBox()
@@ -1592,7 +1566,7 @@ class Syncer(object):
break
time.sleep(0.5)
pb.close()
- po = self.sync_engine(pb)
+ po = self.sync_engine(pb, self.log_err)
if po.returncode == 0:
ret = (True, 0)
elif po.returncode in self.errnos_ok:
@@ -1609,3 +1583,9 @@ class Syncer(object):
return pb
except BoxClosedErr:
pass
+
+ def enable_errorlog(self):
+ self.log_err = True
+
+ def disable_errorlog(self):
+ self.log_err = False