diff options
author | Ajeet Jha <ajha@redhat.com> | 2013-12-02 12:37:34 +0530 |
---|---|---|
committer | Anand Avati <avati@redhat.com> | 2013-12-12 00:16:03 -0800 |
commit | f999c17da5a5353196e68e7a68af64f91df6b902 (patch) | |
tree | d2e1df4111c492662f1e7eb5cfc6f0236af3818e /geo-replication/syncdaemon/master.py | |
parent | 8f2fc6fb3a63ca87d82b6fa933f94fb1e3283a26 (diff) |
gsyncd / geo-rep: geo-replication fixes
-> "threaded" hybrid crawl.
-> Enabling metatadata synchronization.
-> Handling EINVAL/ESTALE gracefully while syncing metadata.
-> Improvments to changelog crawl code.
-> Initial crawl changelog generation format.
-> No gsyncd restart when checkpoint updated.
-> Fix symlink handling in hybrid crawl.
-> Slave's xtime key is 'stime'.
-> tar+ssh as data synchronization.
-> Instead of 'raise', just log in warning level for xtime missing cases.
-> Fix for JSON object load failure
-> Get new config value after config value reset.
-> Skip already processed changelogs.
-> Saving status of each individual worker thread.
-> GFID fetch on slave for purges.
-> Add tar ssh keys and config options.
-> Fix nlink count when using backend.
-> Include "data" operation for hardlink.
-> Use changelog time prefix as slave's time.
-> Process changelogs in parallel.
Change-Id: I09fcbb2e2e418149a6d8435abd2ac6b2f015bb06
BUG: 1036539
Signed-off-by: Ajeet Jha <ajha@redhat.com>
Reviewed-on: http://review.gluster.org/6404
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Anand Avati <avati@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r-- | geo-replication/syncdaemon/master.py | 799 |
1 files changed, 573 insertions, 226 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 95810a61ee1..721fe18bd18 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -10,15 +10,16 @@ import socket import string import errno from shutil import copyfileobj -from errno import ENOENT, ENODATA, EPIPE, EEXIST +from errno import ENOENT, ENODATA, EPIPE, EEXIST, errorcode from threading import currentThread, Condition, Lock from datetime import datetime +from libcxattr import Xattr from gconf import gconf from tempfile import mkdtemp, NamedTemporaryFile from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \ unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \ - lstat, errno_wrap + lstat, errno_wrap, update_file URXTIME = (-1, 0) @@ -59,7 +60,8 @@ def gmaster_builder(excrawl=None): crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin') sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin - class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin): + syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine + class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin, syncengine): pass return _GMaster @@ -101,14 +103,17 @@ class NormalMixin(object): if not 'default_xtime' in opts: opts['default_xtime'] = URXTIME - def xtime_low(self, server, path, **opts): - xt = server.xtime(path, self.uuid) + def xtime_low(self, rsc, path, **opts): + if rsc == self.master: + xt = rsc.server.xtime(path, self.uuid) + else: + xt = rsc.server.stime(path, self.uuid) if isinstance(xt, int) and xt != ENODATA: return xt if xt == ENODATA or xt < self.volmark: if opts['create']: xt = _xtime_now() - server.aggregated.set_xtime(path, self.uuid, xt) + rsc.server.aggregated.set_xtime(path, self.uuid, xt) else: xt = opts['default_xtime'] return xt @@ -140,7 +145,7 @@ class NormalMixin(object): return xte > xtrd def set_slave_xtime(self, path, mark): - self.slave.server.set_xtime(path, self.uuid, mark) + self.slave.server.set_stime(path, self.uuid, mark) self.slave.server.set_xtime_remote(path, self.uuid, mark) class PartialMixin(NormalMixin): @@ -190,6 +195,65 @@ class PurgeNoopMixin(object): def purge_missing(self, path, names): pass +class TarSSHEngine(object): + """Sync engine that uses tar(1) piped over ssh(1) + for data transfers. Good for lots of small files. + """ + def a_syncdata(self, files): + logging.debug('files: %s' % (files)) + for f in files: + pb = self.syncer.add(f) + def regjob(se, xte, pb): + rv = pb.wait() + if rv[0]: + logging.debug('synced ' + se) + return True + else: + # stat check for file presence + st = lstat(se) + if isinstance(st, int): + return True + logging.warn('tar+ssh: %s [errcode: %d]' % (se, rv[1])) + self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) + + def syncdata_wait(self): + if self.wait(self.FLAT_DIR_HIERARCHY, None): + return True + + def syncdata(self, files): + self.a_syncdata(files) + self.syncdata_wait() + +class RsyncEngine(object): + """Sync engine that uses rsync(1) for data transfers""" + def a_syncdata(self, files): + logging.debug('files: %s' % (files)) + for f in files: + logging.debug('candidate for syncing %s' % f) + pb = self.syncer.add(f) + def regjob(se, xte, pb): + rv = pb.wait() + if rv[0]: + logging.debug('synced ' + se) + return True + else: + if rv[1] in [23, 24]: + # stat to check if the file exist + st = lstat(se) + if isinstance(st, int): + # file got unlinked in the interim + return True + logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) + self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) + + def syncdata_wait(self): + if self.wait(self.FLAT_DIR_HIERARCHY, None): + return True + + def syncdata(self, files): + self.a_syncdata(files) + self.syncdata_wait() + class GMasterCommon(object): """abstract class impementling master role""" @@ -234,7 +298,7 @@ class GMasterCommon(object): else: rsc = self.master self.make_xtime_opts(rsc == self.master, opts) - return self.xtime_low(rsc.server, path, **opts) + return self.xtime_low(rsc, path, **opts) def get_initial_crawl_data(self): # while persisting only 'files_syncd' is non-zero, rest of @@ -243,18 +307,26 @@ class GMasterCommon(object): default_data = {'files_syncd': 0, 'files_remaining': 0, 'bytes_remaining': 0, - 'purges_remaining': 0} + 'purges_remaining': 0, + 'total_files_skipped': 0} if getattr(gconf, 'state_detail_file', None): try: - return json.load(open(gconf.state_detail_file)) - except (IOError, OSError): + with open(gconf.state_detail_file, 'r+') as f: + loaded_data= json.load(f) + diff_data = set(default_data) - set (loaded_data) + if len(diff_data): + for i in diff_data: + loaded_data[i] = default_data[i] + return loaded_data + except (IOError): ex = sys.exc_info()[1] - if ex.errno == ENOENT: - # Create file with initial data + logging.warn ('Creating new gconf.state_detail_file.') + # Create file with initial data + try: with open(gconf.state_detail_file, 'wb') as f: json.dump(default_data, f) return default_data - else: + except: raise return default_data @@ -264,6 +336,8 @@ class GMasterCommon(object): same_dir = os.path.dirname(gconf.state_detail_file) with NamedTemporaryFile(dir=same_dir, delete=False) as tmp: json.dump(self.total_crawl_stats, tmp) + tmp.flush() + os.fsync(tmp.fileno()) os.rename(tmp.name, gconf.state_detail_file) except (IOError, OSError): raise @@ -272,7 +346,13 @@ class GMasterCommon(object): self.master = master self.slave = slave self.jobtab = {} - self.syncer = Syncer(slave) + if boolify(gconf.use_tarssh): + logging.info("using 'tar over ssh' as the sync engine") + self.syncer = Syncer(slave, self.slave.tarssh) + else: + logging.info("using 'rsync' as the sync engine") + # partial transfer (cf. rsync(1)), that's normal + self.syncer = Syncer(slave, self.slave.rsync, [23, 24]) # crawls vs. turns: # - self.crawls is simply the number of crawl() invocations on root # - one turn is a maximal consecutive sequence of crawls so that each @@ -294,6 +374,8 @@ class GMasterCommon(object): self.terminate = False self.sleep_interval = 1 self.checkpoint_thread = None + self.current_files_skipped_count = 0 + self.skipped_gfid_list = [] def init_keep_alive(cls): """start the keep-alive thread """ @@ -336,7 +418,8 @@ class GMasterCommon(object): gconf.configinterface.set('volume_id', self.uuid) if self.volinfo: if self.volinfo['retval']: - raise GsyncdError("master is corrupt") + logging.warn("master cluster's info may not be valid %d" % \ + self.volinfo['retval']) self.start_checkpoint_thread() else: raise GsyncdError("master volinfo unavailable") @@ -349,7 +432,7 @@ class GMasterCommon(object): while not self.terminate: if self.start: logging.debug("... crawl #%d done, took %.6f seconds" % \ - (self.crawls, time.time() - self.start)) + (self.crawls, time.time() - self.start)) self.start = time.time() should_display_info = self.start - self.lastreport['time'] >= 60 if should_display_info: @@ -363,9 +446,20 @@ class GMasterCommon(object): if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds crawl = self.should_crawl() t0 = t1 + self.update_worker_remote_node() if not crawl: + self.update_worker_health("Passive") + # bring up _this_ brick to the cluster stime + # which is min of cluster (but max of the replicas) + brick_stime = self.xtime('.', self.slave) + cluster_stime = self.master.server.aggregated.stime_mnt('.', '.'.join([str(self.uuid), str(gconf.slave_id)])) + logging.debug("Cluster stime: %s | Brick stime: %s" % (repr(cluster_stime), repr(brick_stime))) + if not isinstance(cluster_stime, int): + if brick_stime < cluster_stime: + self.slave.server.set_stime(self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime) time.sleep(5) continue + self.update_worker_health("Active") self.crawl() if oneshot: return @@ -375,7 +469,7 @@ class GMasterCommon(object): def _checkpt_param(cls, chkpt, prm, xtimish=True): """use config backend to lookup a parameter belonging to checkpoint @chkpt""" - cprm = getattr(gconf, 'checkpoint_' + prm, None) + cprm = gconf.configinterface.get_realtime('checkpoint_' + prm) if not cprm: return chkpt_mapped, val = cprm.split(':', 1) @@ -402,17 +496,6 @@ class GMasterCommon(object): ts += '.' + str(tpair[1]) return ts - def get_extra_info(self): - str_info = '\nUptime=%s;FilesSyncd=%d;FilesPending=%d;BytesPending=%d;DeletesPending=%d;' % \ - (self._crawl_time_format(datetime.now() - self.crawl_start), \ - self.total_crawl_stats['files_syncd'], \ - self.total_crawl_stats['files_remaining'], \ - self.total_crawl_stats['bytes_remaining'], \ - self.total_crawl_stats['purges_remaining']) - str_info += '\0' - logging.debug(str_info) - return str_info - def _crawl_time_format(self, crawl_time): # Ex: 5 years, 4 days, 20:23:10 years, days = divmod(crawl_time.days, 365.25) @@ -431,27 +514,49 @@ class GMasterCommon(object): date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2)) return date - def checkpt_service(self, chan, chkpt, tgt): + def checkpt_service(self, chan, chkpt): """checkpoint service loop monitor and verify checkpoint status for @chkpt, and listen for incoming requests for whom we serve a pretty-formatted status report""" - if not chkpt: - # dummy loop for the case when there is no checkpt set - while True: + while True: + chkpt = gconf.configinterface.get_realtime("checkpoint") + if not chkpt: + gconf.configinterface.delete("checkpoint_completed") + gconf.configinterface.delete("checkpoint_target") + # dummy loop for the case when there is no checkpt set select([chan], [], []) conn, _ = chan.accept() - conn.send(self.get_extra_info()) + conn.send('\0') conn.close() - completed = self._checkpt_param(chkpt, 'completed', xtimish=False) - if completed: - completed = tuple(int(x) for x in completed.split('.')) - while True: + continue + + checkpt_tgt = self._checkpt_param(chkpt, 'target') + if not checkpt_tgt: + checkpt_tgt = self.xtime('.') + if isinstance(checkpt_tgt, int): + raise GsyncdError("master root directory is unaccessible (%s)", + os.strerror(checkpt_tgt)) + self._set_checkpt_param(chkpt, 'target', checkpt_tgt) + logging.debug("checkpoint target %s has been determined for checkpoint %s" % \ + (repr(checkpt_tgt), chkpt)) + + # check if the label is 'now' + chkpt_lbl = chkpt + try: + x1,x2 = chkpt.split(':') + if x1 == 'now': + chkpt_lbl = "as of " + self.humantime(x2) + except: + pass + completed = self._checkpt_param(chkpt, 'completed', xtimish=False) + if completed: + completed = tuple(int(x) for x in completed.split('.')) s,_,_ = select([chan], [], [], (not completed) and 5 or None) # either request made and we re-check to not # give back stale data, or we still hunting for completion - if self.native_xtime(tgt) and self.native_xtime(tgt) < self.volmark: + if self.native_xtime(checkpt_tgt) and self.native_xtime(checkpt_tgt) < self.volmark: # indexing has been reset since setting the checkpoint status = "is invalid" else: @@ -459,12 +564,12 @@ class GMasterCommon(object): if isinstance(xtr, int): raise GsyncdError("slave root directory is unaccessible (%s)", os.strerror(xtr)) - ncompleted = self.xtime_geq(xtr, tgt) + ncompleted = self.xtime_geq(xtr, checkpt_tgt) if completed and not ncompleted: # stale data logging.warn("completion time %s for checkpoint %s became stale" % \ (self.humantime(*completed), chkpt)) completed = None - gconf.confdata.delete('checkpoint-completed') + gconf.configinterface.delete('checkpoint_completed') if ncompleted and not completed: # just reaching completion completed = "%.6f" % time.time() self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False) @@ -478,7 +583,7 @@ class GMasterCommon(object): try: conn, _ = chan.accept() try: - conn.send(" | checkpoint %s %s %s" % (chkpt, status, self.get_extra_info())) + conn.send("checkpoint %s is %s\0" % (chkpt_lbl, status)) except: exc = sys.exc_info()[1] if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \ @@ -505,18 +610,8 @@ class GMasterCommon(object): pass chan.bind(state_socket) chan.listen(1) - checkpt_tgt = None - if gconf.checkpoint: - checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target') - if not checkpt_tgt: - checkpt_tgt = self.xtime('.') - if isinstance(checkpt_tgt, int): - raise GsyncdError("master root directory is unaccessible (%s)", - os.strerror(checkpt_tgt)) - self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt) - logging.debug("checkpoint target %s has been determined for checkpoint %s" % \ - (repr(checkpt_tgt), gconf.checkpoint)) - t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt)) + chkpt = gconf.configinterface.get_realtime("checkpoint") + t = Thread(target=self.checkpt_service, args=(chan, chkpt)) t.start() self.checkpoint_thread = t @@ -567,15 +662,11 @@ class GMasterChangelogMixin(GMasterCommon): POS_GFID = 0 POS_TYPE = 1 - POS_ENTRY1 = 2 - POS_ENTRY2 = 3 # renames - - _CL_TYPE_DATA_PFX = "D " - _CL_TYPE_METADATA_PFX = "M " - _CL_TYPE_ENTRY_PFX = "E " + POS_ENTRY1 = -1 - TYPE_GFID = [_CL_TYPE_DATA_PFX] # ignoring metadata ops - TYPE_ENTRY = [_CL_TYPE_ENTRY_PFX] + TYPE_META = "M " + TYPE_GFID = "D " + TYPE_ENTRY = "E " # flat directory heirarchy for gfid based access FLAT_DIR_HIERARCHY = '.' @@ -594,39 +685,11 @@ class GMasterChangelogMixin(GMasterCommon): logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile)) return (workdir, logfile) - # update stats from *this* crawl - def update_cumulative_stats(self, files_pending): - self.total_crawl_stats['files_remaining'] = files_pending['count'] - self.total_crawl_stats['bytes_remaining'] = files_pending['bytes'] - self.total_crawl_stats['purges_remaining'] = files_pending['purge'] - - # sync data - def syncdata(self, datas): - logging.debug('datas: %s' % (datas)) - for data in datas: - logging.debug('candidate for syncing %s' % data) - pb = self.syncer.add(data) - def regjob(se, xte, pb): - rv = pb.wait() - if rv[0]: - logging.debug('synced ' + se) - return True - else: - if rv[1] in [23, 24]: - # stat to check if the file exist - st = lstat(se) - if isinstance(st, int): - # file got unlinked in the interim - return True - logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) - self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb) - if self.wait(self.FLAT_DIR_HIERARCHY, None): - return True - def process_change(self, change, done, retry): pfx = gauxpfx() clist = [] entries = [] + meta_gfid = set() datas = set() # basic crawl stats: files and bytes @@ -652,136 +715,351 @@ class GMasterChangelogMixin(GMasterCommon): dct[k] = ed[k] return dct - # regular file update: bytes & count - def _update_reg(entry, size): - if not entry in files_pending['files']: - files_pending['count'] += 1 - files_pending['bytes'] += size - files_pending['files'].append(entry) - # updates for directories, symlinks etc.. - def _update_rest(): + # entry counts (not purges) + def entry_update(): files_pending['count'] += 1 - # entry count - def entry_update(entry, size, mode): - if stat.S_ISREG(mode): - _update_reg(entry, size) - else: - _update_rest() # purge count def purge_update(): files_pending['purge'] += 1 for e in clist: e = e.strip() - et = e[self.IDX_START:self.IDX_END] - ec = e[self.IDX_END:].split(' ') - if et in self.TYPE_ENTRY: + et = e[self.IDX_START:self.IDX_END] # entry type + ec = e[self.IDX_END:].split(' ') # rest of the bits + + if et == self.TYPE_ENTRY: + # extract information according to the type of + # the entry operation. create(), mkdir() and mknod() + # have mode, uid, gid information in the changelog + # itself, so no need to stat()... ty = ec[self.POS_TYPE] + + # PARGFID/BNAME en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1])) + # GFID of the entry gfid = ec[self.POS_GFID] - # definitely need a better way bucketize entry ops + if ty in ['UNLINK', 'RMDIR']: purge_update() entries.append(edct(ty, gfid=gfid, entry=en)) - continue - go = os.path.join(pfx, gfid) - st = lstat(go) - if isinstance(st, int): - if ty == 'RENAME': - entries.append(edct('UNLINK', gfid=gfid, entry=en)) - else: - logging.debug('file %s got purged in the interim' % go) - continue - entry_update(go, st.st_size, st.st_mode) - if ty in ['CREATE', 'MKDIR', 'MKNOD']: - entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) - elif ty == 'LINK': - entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) - elif ty == 'SYMLINK': - rl = errno_wrap(os.readlink, [en], [ENOENT]) - if isinstance(rl, int): - continue - entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) - elif ty == 'RENAME': - e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2])) - entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2, stat=st)) + elif ty in ['CREATE', 'MKDIR', 'MKNOD']: + entry_update() + # stat information present in the changelog itself + entries.append(edct(ty, gfid=gfid, entry=en, mode=int(ec[2]),\ + uid=int(ec[3]), gid=int(ec[4]))) else: - logging.warn('ignoring %s [op %s]' % (gfid, ty)) - elif et in self.TYPE_GFID: - go = os.path.join(pfx, ec[0]) - st = lstat(go) - if isinstance(st, int): - logging.debug('file %s got purged in the interim' % go) - continue - entry_update(go, st.st_size, st.st_mode) - datas.update([go]) + # stat() to get mode and other information + go = os.path.join(pfx, gfid) + st = lstat(go) + if isinstance(st, int): + if ty == 'RENAME': # special hack for renames... + entries.append(edct('UNLINK', gfid=gfid, entry=en)) + else: + logging.debug('file %s got purged in the interim' % go) + continue + + if ty == 'LINK': + entry_update() + entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) + elif ty == 'SYMLINK': + rl = errno_wrap(os.readlink, [en], [ENOENT]) + if isinstance(rl, int): + continue + entry_update() + entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) + elif ty == 'RENAME': + entry_update() + e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) + entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st)) + else: + logging.warn('ignoring %s [op %s]' % (gfid, ty)) + elif et == self.TYPE_GFID: + datas.add(os.path.join(pfx, ec[0])) + elif et == self.TYPE_META: + if ec[1] == 'SETATTR': # only setattr's for now... + meta_gfid.add(os.path.join(pfx, ec[0])) + else: + logging.warn('got invalid changelog type: %s' % (et)) logging.debug('entries: %s' % repr(entries)) if not retry: - self.update_cumulative_stats(files_pending) + self.update_worker_cumilitive_status(files_pending) # sync namespace if (entries): self.slave.server.entry_ops(entries) + # sync metadata + if (meta_gfid): + meta_entries = [] + for go in meta_gfid: + st = lstat(go) + if isinstance(st, int): + logging.debug('file %s got purged in the interim' % go) + continue + meta_entries.append(edct('META', go=go, stat=st)) + if meta_entries: + self.slave.server.meta_ops(meta_entries) # sync data - if self.syncdata(datas): - if done: - self.master.server.changelog_done(change) - return True - - def sync_done(self): - self.total_crawl_stats['files_syncd'] += self.total_crawl_stats['files_remaining'] - self.total_crawl_stats['files_remaining'] = 0 - self.total_crawl_stats['bytes_remaining'] = 0 - self.total_crawl_stats['purges_remaining'] = 0 - self.update_crawl_data() + if datas: + self.a_syncdata(datas) def process(self, changes, done=1): - for change in changes: - tries = 0 - retry = False - while True: - logging.debug('processing change %s' % change) - if self.process_change(change, done, retry): - self.sync_done() - break - retry = True - tries += 1 - if tries == self.MAX_RETRIES: - logging.warn('changelog %s could not be processed - moving on...' % os.path.basename(change)) - self.sync_done() - if done: - self.master.server.changelog_done(change) - break - # it's either entry_ops() or Rsync that failed to do it's - # job. Mostly it's entry_ops() [which currently has a problem - # of failing to create an entry but failing to return an errno] - # Therefore we do not know if it's either Rsync or the freaking - # entry_ops() that failed... so we retry the _whole_ changelog - # again. - # TODO: remove entry retries when it's gets fixed. - logging.warn('incomplete sync, retrying changelog: %s' % change) - time.sleep(0.5) - self.turns += 1 + tries = 0 + retry = False - def upd_stime(self, stime): + while True: + self.skipped_gfid_list = [] + self.current_files_skipped_count = 0 + + # first, fire all changelog transfers in parallel. entry and metadata + # are performed synchronously, therefore in serial. However at the end + # of each changelog, data is synchronized with syncdata_async() - which + # means it is serial w.r.t entries/metadata of that changelog but + # happens in parallel with data of other changelogs. + + for change in changes: + logging.debug('processing change %s' % change) + self.process_change(change, done, retry) + if not retry: + self.turns += 1 # number of changelogs processed in the batch + + # Now we wait for all the data transfers fired off in the above step + # to complete. Note that this is not ideal either. Ideally we want to + # trigger the entry/meta-data transfer of the next batch while waiting + # for the data transfer of the current batch to finish. + + # Note that the reason to wait for the data transfer (vs doing it + # completely in the background and call the changelog_done() + # asynchronously) is because this waiting acts as a "backpressure" + # and prevents a spiraling increase of wait stubs from consuming + # unbounded memory and resources. + + # update the slave's time with the timestamp of the _last_ changelog + # file time suffix. Since, the changelog prefix time is the time when + # the changelog was rolled over, introduce a tolerence of 1 second to + # counter the small delta b/w the marker update and gettimeofday(). + # NOTE: this is only for changelog mode, not xsync. + + # @change is the last changelog (therefore max time for this batch) + if self.syncdata_wait(): + if done: + xtl = (int(change.split('.')[-1]) - 1, 0) + self.upd_stime(xtl) + map(self.master.server.changelog_done, changes) + self.update_worker_files_syncd() + break + + # We do not know which changelog transfer failed, retry everything. + retry = True + tries += 1 + if tries == self.MAX_RETRIES: + logging.warn('changelogs %s could not be processed - moving on...' % \ + ' '.join(map(os.path.basename, changes))) + self.update_worker_total_files_skipped(self.current_files_skipped_count) + logging.warn('SKIPPED GFID = %s' % ','.join(self.skipped_gfid_list)) + self.update_worker_files_syncd() + if done: + xtl = (int(change.split('.')[-1]) - 1, 0) + self.upd_stime(xtl) + map(self.master.server.changelog_done, changes) + break + # it's either entry_ops() or Rsync that failed to do it's + # job. Mostly it's entry_ops() [which currently has a problem + # of failing to create an entry but failing to return an errno] + # Therefore we do not know if it's either Rsync or the freaking + # entry_ops() that failed... so we retry the _whole_ changelog + # again. + # TODO: remove entry retries when it's gets fixed. + logging.warn('incomplete sync, retrying changelogs: %s' % \ + ' '.join(map(os.path.basename, changes))) + time.sleep(0.5) + + def upd_stime(self, stime, path=None): + if not path: + path = self.FLAT_DIR_HIERARCHY if not stime == URXTIME: - self.sendmark(self.FLAT_DIR_HIERARCHY, stime) + self.sendmark(path, stime) + + def get_worker_status_file(self): + file_name = gconf.local_path+'.status' + file_name = file_name.replace("/", "_") + worker_status_file = gconf.georep_session_working_dir+file_name + return worker_status_file + + def update_worker_status(self, key, value): + default_data = {"remote_node":"N/A", + "worker status":"Not Started", + "crawl status":"N/A", + "files_syncd": 0, + "files_remaining": 0, + "bytes_remaining": 0, + "purges_remaining": 0, + "total_files_skipped": 0} + worker_status_file = self.get_worker_status_file() + try: + with open(worker_status_file, 'r+') as f: + loaded_data = json.load(f) + loaded_data[key] = value + os.ftruncate(f.fileno(), 0) + os.lseek(f.fileno(), 0, os.SEEK_SET) + json.dump(loaded_data, f) + f.flush() + os.fsync(f.fileno()) + except (IOError, OSError, ValueError): + logging.info ('Creating new %s' % worker_status_file) + try: + with open(worker_status_file, 'wb') as f: + default_data[key] = value + json.dump(default_data, f) + f.flush() + os.fsync(f.fileno()) + except: + raise + + def update_worker_cumilitive_status(self, files_pending): + default_data = {"remote_node":"N/A", + "worker status":"Not Started", + "crawl status":"N/A", + "files_syncd": 0, + "files_remaining": 0, + "bytes_remaining": 0, + "purges_remaining": 0, + "total_files_skipped": 0} + worker_status_file = self.get_worker_status_file() + try: + with open(worker_status_file, 'r+') as f: + loaded_data = json.load(f) + loaded_data['files_remaining'] = files_pending['count'] + loaded_data['bytes_remaining'] = files_pending['bytes'] + loaded_data['purges_remaining'] = files_pending['purge'] + os.ftruncate(f.fileno(), 0) + os.lseek(f.fileno(), 0, os.SEEK_SET) + json.dump(loaded_data, f) + f.flush() + os.fsync(f.fileno()) + except (IOError, OSError, ValueError): + logging.info ('Creating new %s' % worker_status_file) + try: + with open(worker_status_file, 'wb') as f: + default_data['files_remaining'] = files_pending['count'] + default_data['bytes_remaining'] = files_pending['bytes'] + default_data['purges_remaining'] = files_pending['purge'] + json.dump(default_data, f) + f.flush() + os.fsync(f.fileno()) + except: + raise + + def update_worker_remote_node (self): + node = sys.argv[-1] + node = node.split("@")[-1] + remote_node_ip = node.split(":")[0] + remote_node_vol = node.split(":")[3] + remote_node = remote_node_ip + '::' + remote_node_vol + self.update_worker_status ('remote_node', remote_node) + + def update_worker_health (self, state): + self.update_worker_status ('worker status', state) + + def update_worker_crawl_status (self, state): + self.update_worker_status ('crawl status', state) + + def update_worker_files_syncd (self): + default_data = {"remote_node":"N/A", + "worker status":"Not Started", + "crawl status":"N/A", + "files_syncd": 0, + "files_remaining": 0, + "bytes_remaining": 0, + "purges_remaining": 0, + "total_files_skipped": 0} + worker_status_file = self.get_worker_status_file() + try: + with open(worker_status_file, 'r+') as f: + loaded_data = json.load(f) + loaded_data['files_syncd'] += loaded_data['files_remaining'] + loaded_data['files_remaining'] = 0 + loaded_data['bytes_remaining'] = 0 + loaded_data['purges_remaining'] = 0 + os.ftruncate(f.fileno(), 0) + os.lseek(f.fileno(), 0, os.SEEK_SET) + json.dump(loaded_data, f) + f.flush() + os.fsync(f.fileno()) + except (IOError, OSError, ValueError): + logging.info ('Creating new %s' % worker_status_file) + try: + with open(worker_status_file, 'wb') as f: + json.dump(default_data, f) + f.flush() + os.fsync(f.fileno()) + except: + raise + + def update_worker_files_remaining (self, state): + self.update_worker_status ('files_remaining', state) + + def update_worker_bytes_remaining (self, state): + self.update_worker_status ('bytes_remaining', state) + + def update_worker_purges_remaining (self, state): + self.update_worker_status ('purges_remaining', state) + + def update_worker_total_files_skipped (self, value): + default_data = {"remote_node":"N/A", + "worker status":"Not Started", + "crawl status":"N/A", + "files_syncd": 0, + "files_remaining": 0, + "bytes_remaining": 0, + "purges_remaining": 0, + "total_files_skipped": 0} + worker_status_file = self.get_worker_status_file() + try: + with open(worker_status_file, 'r+') as f: + loaded_data = json.load(f) + loaded_data['total_files_skipped'] = value + loaded_data['files_remaining'] -= value + os.ftruncate(f.fileno(), 0) + os.lseek(f.fileno(), 0, os.SEEK_SET) + json.dump(loaded_data, f) + f.flush() + os.fsync(f.fileno()) + except (IOError, OSError, ValueError): + logging.info ('Creating new %s' % worker_status_file) + try: + with open(worker_status_file, 'wb') as f: + default_data['total_files_skipped'] = value + json.dump(default_data, f) + f.flush() + os.fsync(f.fileno()) + except: + raise def crawl(self): + self.update_worker_crawl_status("Changelog Crawl") changes = [] + # get stime (from the brick) and purge changelogs + # that are _historical_ to that time. + purge_time = self.xtime('.', self.slave) + if isinstance(purge_time, int): + purge_time = None try: self.master.server.changelog_scan() self.crawls += 1 except OSError: self.fallback_xsync() + self.update_worker_crawl_status("Hybrid Crawl") changes = self.master.server.changelog_getchanges() if changes: - xtl = self.xtime(self.FLAT_DIR_HIERARCHY) - if isinstance(xtl, int): - raise GsyncdError('master is corrupt') + if purge_time: + logging.info("slave's time: %s" % repr(purge_time)) + processed = [x for x in changes if int(x.split('.')[-1]) < purge_time[0]] + for pr in processed: + logging.info('skipping already processed change: %s...' % os.path.basename(pr)) + self.master.server.changelog_done(pr) + changes.remove(pr) logging.debug('processing changes %s' % repr(changes)) self.process(changes) - self.upd_stime(xtl) def register(self): (workdir, logfile) = self.setup_working_dir() @@ -799,17 +1077,20 @@ class GMasterChangelogMixin(GMasterCommon): class GMasterXsyncMixin(GMasterChangelogMixin): """ - This crawl needs to be xtime based (as of now it's not. this is beacuse we generate CHANGELOG file during each crawl which is then processed by process_change()). For now it's used as a one-shot initial sync mechanism and only syncs directories, regular - files and symlinks. + files, hardlinks and symlinks. """ + XSYNC_MAX_ENTRIES = 1<<13 + def register(self): + self.counter = 0 + self.comlist = [] self.sleep_interval = 60 self.tempdir = self.setup_working_dir()[0] self.tempdir = os.path.join(self.tempdir, 'xsync') @@ -823,6 +1104,36 @@ class GMasterXsyncMixin(GMasterChangelogMixin): else: raise + def crawl(self): + """ + event dispatcher thread + + this thread dispatches either changelog or synchronizes stime. + additionally terminates itself on recieving a 'finale' event + """ + def Xsyncer(): + self.Xcrawl() + t = Thread(target=Xsyncer) + t.start() + logging.info('starting hybrid crawl...') + self.update_worker_crawl_status("Hybrid Crawl") + while True: + try: + item = self.comlist.pop(0) + if item[0] == 'finale': + logging.info('finished hybrid crawl syncing') + break + elif item[0] == 'xsync': + logging.info('processing xsync changelog %s' % (item[1])) + self.process([item[1]], 0) + elif item[0] == 'stime': + logging.debug('setting slave time: %s' % repr(item[1])) + self.upd_stime(item[1][1], item[1][0]) + else: + logging.warn('unknown tuple in comlist (%s)' % repr(item)) + except IndexError: + time.sleep(1) + def write_entry_change(self, prefix, data=[]): self.fh.write("%s %s\n" % (prefix, ' '.join(data))) @@ -839,24 +1150,61 @@ class GMasterXsyncMixin(GMasterChangelogMixin): def fname(self): return self.xsync_change - def crawl(self, path='.', xtr=None, done=0): - """ generate a CHANGELOG file consumable by process_change """ + def put(self, mark, item): + self.comlist.append((mark, item)) + + def sync_xsync(self, last): + """schedule a processing of changelog""" + self.close() + self.put('xsync', self.fname()) + self.counter = 0 + if not last: + time.sleep(1) # make sure changelogs are 1 second apart + self.open() + + def sync_stime(self, stime=None, last=False): + """schedule a stime synchronization""" + if stime: + self.put('stime', stime) + if last: + self.put('finale', None) + + def sync_done(self, stime=None, last=False): + self.sync_xsync(last) + if stime: + self.sync_stime(stime, last) + + def Xcrawl(self, path='.', xtr_root=None): + """ + generate a CHANGELOG file consumable by process_change. + + slave's xtime (stime) is _cached_ for comparisons across + the filesystem tree, but set after directory synchronization. + """ if path == '.': self.open() self.crawls += 1 - if not xtr: + if not xtr_root: # get the root stime and use it for all comparisons - xtr = self.xtime('.', self.slave) - if isinstance(xtr, int): - if xtr != ENOENT: - raise GsyncdError('slave is corrupt') - xtr = self.minus_infinity + xtr_root = self.xtime('.', self.slave) + if isinstance(xtr_root, int): + if xtr_root != ENOENT: + logging.warn("slave cluster not returning the " \ + "correct xtime for root (%d)" % xtr_root) + xtr_root = self.minus_infinity xtl = self.xtime(path) if isinstance(xtl, int): - raise GsyncdError('master is corrupt') - if xtr == xtl: + logging.warn("master cluster's xtime not found") + xtr = self.xtime(path, self.slave) + if isinstance(xtr, int): + if xtr != ENOENT: + logging.warn("slave cluster not returning the " \ + "correct xtime for %s (%d)" % (path, xtr)) + xtr = self.minus_infinity + xtr = max(xtr, xtr_root) + if not self.need_sync(path, xtl, xtr): if path == '.': - self.close() + self.sync_done((path, xtl), True) return self.xtime_reversion_hook(path, xtl, xtr) logging.debug("entering " + path) @@ -867,43 +1215,42 @@ class GMasterXsyncMixin(GMasterChangelogMixin): for e in dem: bname = e e = os.path.join(path, e) - st = lstat(e) + xte = self.xtime(e) + if isinstance(xte, int): + logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) + continue + if not self.need_sync(e, xte, xtr): + continue + st = self.master.server.lstat(e) if isinstance(st, int): - logging.warn('%s got purged in the interim..' % e) + logging.warn('%s got purged in the interim ...' % e) continue gfid = self.master.server.gfid(e) if isinstance(gfid, int): - logging.warn('skipping entry %s..' % (e)) - continue - xte = self.xtime(e) - if isinstance(xte, int): - raise GsyncdError('master is corrupt') - if not self.need_sync(e, xte, xtr): + logging.warn('skipping entry %s..' % e) continue mo = st.st_mode + self.counter += 1 + if self.counter == self.XSYNC_MAX_ENTRIES: + self.sync_done() if stat.S_ISDIR(mo): - self.write_entry_change("E", [gfid, 'MKDIR', escape(os.path.join(pargfid, bname))]) - self.crawl(e, xtr) + self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))]) + self.Xcrawl(e, xtr_root) + self.sync_done((e, xte), False) elif stat.S_ISLNK(mo): - rl = errno_wrap(os.readlink, [en], [ENOENT]) - if isinstance(rl, int): - continue - self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname)), rl]) - else: + self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))]) + elif stat.S_ISREG(mo): + nlink = st.st_nlink + nlink -= 1 # fixup backend stat link count # if a file has a hardlink, create a Changelog entry as 'LINK' so the slave # side will decide if to create the new entry, or to create link. - if st.st_nlink == 1: - self.write_entry_change("E", [gfid, 'MKNOD', escape(os.path.join(pargfid, bname))]) + if nlink == 1: + self.write_entry_change("E", [gfid, 'MKNOD', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))]) else: self.write_entry_change("E", [gfid, 'LINK', escape(os.path.join(pargfid, bname))]) - if stat.S_ISREG(mo): - self.write_entry_change("D", [gfid]) - + self.write_entry_change("D", [gfid]) if path == '.': - logging.info('processing xsync changelog %s' % self.fname()) - self.close() - self.process([self.fname()], done) - self.upd_stime(xtl) + self.sync_done((path, xtl), True) class BoxClosedErr(Exception): pass @@ -979,12 +1326,13 @@ class Syncer(object): each completed syncjob. """ - def __init__(self, slave): + def __init__(self, slave, sync_engine, resilient_errnos=[]): """spawn worker threads""" self.slave = slave self.lock = Lock() self.pb = PostBox() - self.bytes_synced = 0 + self.sync_engine = sync_engine + self.errnos_ok = resilient_errnos for i in range(int(gconf.sync_jobs)): t = Thread(target=self.syncjob) t.start() @@ -1002,11 +1350,10 @@ class Syncer(object): break time.sleep(0.5) pb.close() - po = self.slave.rsync(pb) + po = self.sync_engine(pb) if po.returncode == 0: ret = (True, 0) - elif po.returncode in (23, 24): - # partial transfer (cf. rsync(1)), that's normal + elif po.returncode in self.errnos_ok: ret = (False, po.returncode) else: po.errfail() |