diff options
Diffstat (limited to 'geo-replication')
-rw-r--r-- | geo-replication/syncdaemon/master.py | 108 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 49 |
2 files changed, 70 insertions, 87 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index e743fdf2e50..8d2158fb406 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -245,8 +245,7 @@ class TarSSHEngine(object): def a_syncdata(self, files): logging.debug('files: %s' % (files)) - self.current_files_skipped_count = 0 - del self.skipped_gfid_list[:] + for f in files: pb = self.syncer.add(f) @@ -260,12 +259,9 @@ class TarSSHEngine(object): st = lstat(se) if isinstance(st, int): # file got unlinked in the interim - self.unlinked_gfids.append(se) + self.unlinked_gfids.add(se) return True - se_list = se.split('/') - self.current_files_skipped_count += 1 - self.skipped_gfid_list.append(se_list[1]) self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) def syncdata_wait(self): @@ -283,8 +279,7 @@ class RsyncEngine(object): def a_syncdata(self, files): logging.debug('files: %s' % (files)) - self.current_files_skipped_count = 0 - del self.skipped_gfid_list[:] + for f in files: logging.debug('candidate for syncing %s' % f) pb = self.syncer.add(f) @@ -299,12 +294,9 @@ class RsyncEngine(object): st = lstat(se) if isinstance(st, int): # file got unlinked in the interim - self.unlinked_gfids.append(se) + self.unlinked_gfids.add(se) return True - se_list = se.split('/') - self.current_files_skipped_count += 1 - self.skipped_gfid_list.append(se_list[1]) self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) def syncdata_wait(self): @@ -364,35 +356,6 @@ class GMasterCommon(object): self.make_xtime_opts(rsc == self.master, opts) return self.xtime_low(rsc, path, **opts) - def get_initial_crawl_data(self): - # while persisting only 'files_syncd' is non-zero, rest of - # the stats are nulls. lets keep it that way in case they - # are needed to be used some day... - default_data = {'files_syncd': 0, - 'files_remaining': 0, - 'bytes_remaining': 0, - 'purges_remaining': 0, - 'total_files_skipped': 0} - if getattr(gconf, 'state_detail_file', None): - try: - with open(gconf.state_detail_file, 'r+') as f: - loaded_data = json.load(f) - diff_data = set(default_data) - set(loaded_data) - if len(diff_data): - for i in diff_data: - loaded_data[i] = default_data[i] - return loaded_data - except IOError: - logging.warn('Creating new gconf.state_detail_file.') - # Create file with initial data - try: - with open(gconf.state_detail_file, 'wb') as f: - json.dump(default_data, f) - return default_data - except: - raise - return default_data - def __init__(self, master, slave): self.master = master self.slave = slave @@ -424,9 +387,7 @@ class GMasterCommon(object): self.volinfo = None self.terminate = False self.sleep_interval = 1 - self.current_files_skipped_count = 0 - self.skipped_gfid_list = [] - self.unlinked_gfids = [] + self.unlinked_gfids = set() def init_keep_alive(cls): """start the keep-alive thread """ @@ -819,7 +780,8 @@ class GMasterChangelogMixin(GMasterCommon): st = lstat(os.path.join(gfid_prefix, failure[0][entry_key])) if not isinstance(st, int): num_failures += 1 - logging.warn('%s FAILED: %s' % (log_prefix, repr(failure))) + logging.error('%s FAILED: %s' % (log_prefix, + repr(failure))) self.status.inc_value("failures", num_failures) @@ -994,17 +956,17 @@ class GMasterChangelogMixin(GMasterCommon): # sync data if datas: self.a_syncdata(datas) + self.datas_in_batch.update(datas) def process(self, changes, done=1): tries = 0 retry = False - self.unlinked_gfids = [] + self.unlinked_gfids = set() self.files_in_batch = 0 + self.datas_in_batch = set() + self.syncer.disable_errorlog() while True: - self.skipped_gfid_list = [] - self.current_files_skipped_count = 0 - # first, fire all changelog transfers in parallel. entry and # metadata are performed synchronously, therefore in serial. # However at the end of each changelog, data is synchronized @@ -1012,12 +974,25 @@ class GMasterChangelogMixin(GMasterCommon): # entries/metadata of that changelog but happens in parallel # with data of other changelogs. - for change in changes: - logging.debug('processing change %s' % change) - self.process_change(change, done, retry) - if not retry: - # number of changelogs processed in the batch - self.turns += 1 + if retry: + if tries == (self.MAX_RETRIES - 1): + # Enable Error logging if it is last retry + self.syncer.enable_errorlog() + + # Remove Unlinked GFIDs from Queue + for unlinked_gfid in self.unlinked_gfids: + self.datas_in_batch.remove(unlinked_gfid) + + # Retry only Sync. Do not retry entry ops + if self.datas_in_batch: + self.a_syncdata(self.datas_in_batch) + else: + for change in changes: + logging.debug('processing change %s' % change) + self.process_change(change, done, retry) + if not retry: + # number of changelogs processed in the batch + self.turns += 1 # Now we wait for all the data transfers fired off in the above # step to complete. Note that this is not ideal either. Ideally @@ -1040,7 +1015,7 @@ class GMasterChangelogMixin(GMasterCommon): # @change is the last changelog (therefore max time for this batch) if self.syncdata_wait(): - self.unlinked_gfids = [] + self.unlinked_gfids = set() if done: xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) @@ -1050,23 +1025,21 @@ class GMasterChangelogMixin(GMasterCommon): # Reset Data counter after sync self.status.dec_value("data", self.files_in_batch) self.files_in_batch = 0 + self.datas_in_batch = set() break # We do not know which changelog transfer failed, retry everything. retry = True tries += 1 if tries == self.MAX_RETRIES: - logging.warn('changelogs %s could not be processed - ' - 'moving on...' % - ' '.join(map(os.path.basename, changes))) - self.status.inc_value("failures", - self.current_files_skipped_count) - logging.warn('SKIPPED GFID = %s' % - ','.join(self.skipped_gfid_list)) + logging.error('changelogs %s could not be processed ' + 'completely - moving on...' % + ' '.join(map(os.path.basename, changes))) # Reset data counter on failure self.status.dec_value("data", self.files_in_batch) self.files_in_batch = 0 + self.datas_in_batch = set() if done: xtl = (int(change.split('.')[-1]) - 1, 0) @@ -1570,6 +1543,7 @@ class Syncer(object): def __init__(self, slave, sync_engine, resilient_errnos=[]): """spawn worker threads""" + self.log_err = False self.slave = slave self.lock = Lock() self.pb = PostBox() @@ -1592,7 +1566,7 @@ class Syncer(object): break time.sleep(0.5) pb.close() - po = self.sync_engine(pb) + po = self.sync_engine(pb, self.log_err) if po.returncode == 0: ret = (True, 0) elif po.returncode in self.errnos_ok: @@ -1609,3 +1583,9 @@ class Syncer(object): return pb except BoxClosedErr: pass + + def enable_errorlog(self): + self.log_err = True + + def disable_errorlog(self): + self.log_err = False diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 19363401e65..ac697eb39ed 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -906,7 +906,7 @@ class SlaveRemote(object): "RePCe major version mismatch: local %s, remote %s" % (exrv, rv)) - def rsync(self, files, *args): + def rsync(self, files, *args, **kw): """invoke rsync""" if not files: raise GsyncdError("no files to sync") @@ -932,12 +932,15 @@ class SlaveRemote(object): po.stdin.write(f) po.stdin.write('\0') - po.stdin.close() + stdout, stderr = po.communicate() + + if kw.get("log_err", False): + for errline in stderr.strip().split("\n")[:-1]: + logging.error("SYNC Error(Rsync): %s" % errline) if gconf.log_rsync_performance: - out = po.stdout.read() rsync_msg = [] - for line in out.split("\n"): + for line in stdout.split("\n"): if line.startswith("Number of files:") or \ line.startswith("Number of regular files transferred:") or \ line.startswith("Total file size:") or \ @@ -949,12 +952,10 @@ class SlaveRemote(object): line.startswith("sent "): rsync_msg.append(line) logging.info("rsync performance: %s" % ", ".join(rsync_msg)) - po.wait() - po.terminate_geterr(fail_on_err=False) return po - def tarssh(self, files, slaveurl): + def tarssh(self, files, slaveurl, log_err=False): """invoke tar+ssh -z (compress) can be use if needed, but omitting it now as it results in weird error (tar+ssh errors out (errcode: 2) @@ -975,15 +976,16 @@ class SlaveRemote(object): for f in files: p0.stdin.write(f) p0.stdin.write('\n') - p0.stdin.close() - # wait() for tar to terminate, collecting any errors, further + p0.stdin.close() + p0.stdout.close() # Allow p0 to receive a SIGPIPE if p1 exits. + # wait for tar to terminate, collecting any errors, further # waiting for transfer to complete - p0.wait() - p0.terminate_geterr(fail_on_err=False) + _, stderr1 = p1.communicate() - p1.wait() - p1.terminate_geterr(fail_on_err=False) + if log_err: + for errline in stderr1.strip().split("\n")[:-1]: + logging.error("SYNC Error(Untar): %s" % errline) return p1 @@ -1045,8 +1047,8 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote): """inhibit the resource beyond""" os.chdir(self.path) - def rsync(self, files): - return sup(self, files, self.path) + def rsync(self, files, log_err=False): + return sup(self, files, self.path, log_err=log_err) class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): @@ -1460,11 +1462,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): else: sup(self, *args) - def rsync(self, files): - return sup(self, files, self.slavedir) + def rsync(self, files, log_err=False): + return sup(self, files, self.slavedir, log_err=log_err) - def tarssh(self, files): - return sup(self, files, self.slavedir) + def tarssh(self, files, log_err=False): + return sup(self, files, self.slavedir, log_err=log_err) class SSH(AbstractUrl, SlaveRemote): @@ -1571,12 +1573,13 @@ class SSH(AbstractUrl, SlaveRemote): self.fd_pair = (i, o) return 'should' - def rsync(self, files): + def rsync(self, files, log_err=False): return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + ["-p", str(gconf.ssh_port)] + gconf.ssh_ctl_args), - *(gconf.rsync_ssh_options.split() + [self.slaveurl])) + *(gconf.rsync_ssh_options.split() + [self.slaveurl]), + log_err=log_err) - def tarssh(self, files): - return sup(self, files, self.slaveurl) + def tarssh(self, files, log_err=False): + return sup(self, files, self.slaveurl, log_err=log_err) |