diff options
-rw-r--r-- | geo-replication/syncdaemon/master.py | 156 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 14 | ||||
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 10 | ||||
-rw-r--r-- | xlators/features/changelog/src/changelog.c | 4 |
4 files changed, 104 insertions, 80 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 12eadb1073a..cf2f7db0706 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -17,7 +17,8 @@ from datetime import datetime from gconf import gconf from tempfile import mkdtemp, NamedTemporaryFile from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \ - unescape, select, gauxpfx, md5hex, selfkill, entry2pb + unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \ + lstat URXTIME = (-1, 0) @@ -380,7 +381,13 @@ class GMasterCommon(object): return self.xtime_low(rsc.server, path, **opts) def get_initial_crawl_data(self): - default_data = {'sync_time': 0, 'files_synced': 0, 'bytes_synced': 0} + # while persisting only 'files_syncd' is non-zero, rest of + # the stats are nulls. lets keep it that way in case they + # are needed to be used some day... + default_data = {'files_syncd': 0, + 'files_remaining': 0, + 'bytes_remaining': 0, + 'purges_remaining': 0} if getattr(gconf, 'state_detail_file', None): try: return json.load(open(gconf.state_detail_file)) @@ -393,7 +400,6 @@ class GMasterCommon(object): return default_data else: raise - return default_data def update_crawl_data(self): @@ -422,10 +428,9 @@ class GMasterCommon(object): self.crawls = 0 self.turns = 0 self.total_turns = int(gconf.turns) + self.crawl_start = datetime.now() self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0} - self.crawl_stats = {'sync_time': 0, 'last_synctime': 0, 'crawl_starttime': 0, - 'crawl_time': 0, 'files_synced': 0, 'bytes_synced' :0} - self.total_crawl_stats = self.get_initial_crawl_data() + self.total_crawl_stats = None self.start = None self.change_seen = None # the authoritative (foreign, native) volinfo pair @@ -491,9 +496,8 @@ class GMasterCommon(object): # for a passive gsyncd (ie. in a replicate scenario) # the keepalive thread would keep the connection alive. self.init_keep_alive() + self.total_crawl_stats = self.get_initial_crawl_data() self.lastreport['time'] = time.time() - self.crawl_stats['crawl_starttime'] = datetime.now() - logging.info('crawl interval: %d seconds' % self.sleep_interval) t0 = time.time() crawl = self.should_crawl() @@ -556,17 +560,13 @@ class GMasterCommon(object): return ts def get_extra_info(self): - str_info = "\nFilesSynced=%d;" % (self.crawl_stats['files_synced']) - str_info += "BytesSynced=%s;" % (self.crawl_stats['bytes_synced']) - - self.crawl_stats['crawl_time'] = datetime.now() - self.crawl_stats['crawl_starttime'] - - str_info += "Uptime=%s;" % (self._crawl_time_format(self.crawl_stats['crawl_time'])) - str_info += "SyncTime=%s;" % (self.crawl_stats['sync_time']) - str_info += "TotalSyncTime=%s;" % (self.total_crawl_stats['sync_time']) - str_info += "TotalFilesSynced=%d;" % (self.total_crawl_stats['files_synced']) - str_info += "TotalBytesSynced=%s;" % (self.total_crawl_stats['bytes_synced']) - str_info += "\0" + str_info = '\nUptime=%s;FilesSyncd=%d;FilesPending=%d;BytesPending=%d;DeletesPending=%d;' % \ + (self._crawl_time_format(datetime.now() - self.crawl_start), \ + self.total_crawl_stats['files_syncd'], \ + self.total_crawl_stats['files_remaining'], \ + self.total_crawl_stats['bytes_remaining'], \ + self.total_crawl_stats['purges_remaining']) + str_info += '\0' logging.debug(str_info) return str_info @@ -788,15 +788,11 @@ class GMasterChangelogMixin(GMasterCommon): logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile)) return (workdir, logfile) - def lstat(self, e): - try: - return os.lstat(e) - except (IOError, OSError): - ex = sys.exc_info()[1] - if ex.errno == ENOENT: - return ex.errno - else: - raise + # update stats from *this* crawl + def update_cumulative_stats(self, files_pending): + self.total_crawl_stats['files_remaining'] = files_pending['count'] + self.total_crawl_stats['bytes_remaining'] = files_pending['bytes'] + self.total_crawl_stats['purges_remaining'] = files_pending['purge'] # sync data def syncdata(self, datas): @@ -804,43 +800,31 @@ class GMasterChangelogMixin(GMasterCommon): for data in datas: logging.debug('candidate for syncing %s' % data) pb = self.syncer.add(data) - timeA = datetime.now() def regjob(se, xte, pb): rv = pb.wait() if rv[0]: logging.debug('synced ' + se) - # update stats - timeB = datetime.now() - self.crawl_stats['last_synctime'] = timeB - timeA - self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) - self.crawl_stats['files_synced'] += 1 - self.crawl_stats['bytes_synced'] += self.syncer.bytes_synced - - # cumulative statistics - self.total_crawl_stats['bytes_synced'] += self.syncer.bytes_synced - self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) - self.total_crawl_stats['files_synced'] += 1 return True else: if rv[1] in [23, 24]: # stat to check if the file exist - st = self.lstat(se) + st = lstat(se) if isinstance(st, int): # file got unlinked in the interim return True logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb) if self.wait(self.FLAT_DIR_HIERARCHY, None): - self.update_crawl_data() return True - def process_change(self, change, done): + def process_change(self, change, done, retry): + pfx = gauxpfx() clist = [] entries = [] - purges = set() - links = set() datas = set() - pfx = gauxpfx() + + # basic crawl stats: files and bytes + files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []} try: f = open(change, "r") clist = f.readlines() @@ -861,6 +845,27 @@ class GMasterChangelogMixin(GMasterCommon): else: dct[k] = ed[k] return dct + + # regular file update: bytes & count + def _update_reg(entry, size): + if not entry in files_pending['files']: + files_pending['count'] += 1 + files_pending['bytes'] += size + files_pending['files'].append(entry) + # updates for directories, symlinks etc.. + def _update_rest(): + files_pending['count'] += 1 + + # entry count + def entry_update(entry, size, mode): + if stat.S_ISREG(mode): + _update_reg(entry, size) + else: + _update_rest() + # purge count + def purge_update(): + files_pending['purge'] += 1 + for e in clist: e = e.strip() et = e[self.IDX_START:self.IDX_END] @@ -871,20 +876,19 @@ class GMasterChangelogMixin(GMasterCommon): gfid = ec[self.POS_GFID] # definitely need a better way bucketize entry ops if ty in ['UNLINK', 'RMDIR']: - entries.append(edct(ty, gfid=gfid, entry=en)) - purges.update([os.path.join(pfx, gfid)]) - continue - if not ty == 'RENAME': - go = os.path.join(pfx, gfid) - st = self.lstat(go) - if isinstance(st, int): - logging.debug('file %s got purged in the interim' % go) - continue + purge_update() + entries.append(edct(ty, gfid=gfid, entry=en)) + continue + go = os.path.join(pfx, gfid) + st = lstat(go) + if isinstance(st, int): + logging.debug('file %s got purged in the interim' % go) + continue + entry_update(go, st.st_size, st.st_mode) if ty in ['CREATE', 'MKDIR', 'MKNOD']: entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) elif ty == 'LINK': entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) - links.update([os.path.join(pfx, gfid)]) elif ty == 'SYMLINK': entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=os.readlink(en))) elif ty == 'RENAME': @@ -893,30 +897,41 @@ class GMasterChangelogMixin(GMasterCommon): else: pass elif et in self.TYPE_GFID: - da = os.path.join(pfx, ec[0]) - st = self.lstat(da) + go = os.path.join(pfx, ec[0]) + st = lstat(go) if isinstance(st, int): - logging.debug('file %s got purged in the interim' % da) + logging.debug('file %s got purged in the interim' % go) continue - datas.update([da]) + entry_update(go, st.st_size, st.st_mode) + datas.update([go]) logging.debug('entries: %s' % repr(entries)) + if not retry: + self.update_cumulative_stats(files_pending) # sync namespace if (entries): self.slave.server.entry_ops(entries) # sync data - if self.syncdata(datas - (purges - links)): + if self.syncdata(datas): if done: self.master.server.changelog_done(change) return True + def sync_done(self): + self.total_crawl_stats['files_syncd'] += self.total_crawl_stats['files_remaining'] + self.total_crawl_stats['files_remaining'] = 0 + self.total_crawl_stats['bytes_remaining'] = 0 + self.total_crawl_stats['purges_remaining'] = 0 + self.update_crawl_data() + def process(self, changes, done=1): for change in changes: - times = 0 + retry = False while True: - times += 1 - logging.debug('processing change %s [%d time(s)]' % (change, times)) - if self.process_change(change, done): + logging.debug('processing change %s' % change) + if self.process_change(change, done, retry): + self.sync_done() break + retry = True # it's either entry_ops() or Rsync that failed to do it's # job. Mostly it's entry_ops() [which currently has a problem # of failing to create an entry but failing to return an errno] @@ -1032,7 +1047,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): for e in dem: bname = e e = os.path.join(path, e) - st = self.lstat(e) + st = lstat(e) if isinstance(st, int): logging.warn('%s got purged in the interim..' % e) continue @@ -1191,19 +1206,10 @@ class GMasterXtimeMixin(GMasterCommon): elif stat.S_ISREG(mo): logging.debug("syncing %s ..." % e) pb = self.syncer.add(e) - timeA = datetime.now() def regjob(e, xte, pb): if pb.wait()[0]: logging.debug("synced " + e) self.sendmark_regular(e, xte) - # update stats - timeB = datetime.now() - self.crawl_stats['last_synctime'] = timeB - timeA - self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) - self.crawl_stats['files_synced'] += 1 - self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6)) - self.total_crawl_stats['files_synced'] += 1 - self.update_crawl_data() return True else: logging.warn("failed to sync " + e) diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 1010247aed1..2357b4f914c 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -21,7 +21,7 @@ from repce import RepceServer, RepceClient from master import gmaster_builder import syncdutils from syncdutils import GsyncdError, select, privileged, boolify, funcode -from syncdutils import umask, entry2pb, gauxpfx, errno_wrap +from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) @@ -514,12 +514,20 @@ class Server(object): elif op == 'MKDIR': blob = entry_pack_mkdir(gfid, bname, e['stat']) elif op == 'LINK': - errno_wrap(os.link, [os.path.join(pfx, gfid), entry], [ENOENT, EEXIST]) + st = lstat(entry) + if isinstance(st, int): + blob = entry_pack_reg(gfid, bname, e['stat']) + else: + errno_wrap(os.link, [os.path.join(pfx, gfid), entry], [ENOENT, EEXIST]) elif op == 'SYMLINK': blob = entry_pack_symlink(gfid, bname, e['link'], e['stat']) elif op == 'RENAME': en = e['entry1'] - errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST]) + st = lstat(entry) + if isinstance(st, int): + blob = entry_pack_reg(gfid, bname, e['stat']) + else: + errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST]) if blob: errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [ENOENT, EEXIST]) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index c09b2505ddd..2655dd9835e 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -419,3 +419,13 @@ def errno_wrap(call, arg=[], errnos=[]): if not ex.errno == ESTALE: raise time.sleep(0.5) # retry the call + +def lstat(e): + try: + return os.lstat(e) + except (IOError, OSError): + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + return ex.errno + else: + raise diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c index fe064342911..0e85ee7a0ab 100644 --- a/xlators/features/changelog/src/changelog.c +++ b/xlators/features/changelog/src/changelog.c @@ -203,7 +203,6 @@ changelog_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, dict_t *xdata) { size_t xtra_len = 0; - uuid_t null_uuid = {0,}; changelog_priv_t *priv = NULL; changelog_opt_t *co = NULL; @@ -211,7 +210,8 @@ changelog_rename (call_frame_t *frame, xlator_t *this, CHANGELOG_NOT_ACTIVE_THEN_GOTO (frame, priv, wind); /* 3 == fop + oldloc + newloc */ - CHANGELOG_INIT_NOCHECK (this, frame->local, NULL, null_uuid, 3); + CHANGELOG_INIT_NOCHECK (this, frame->local, + NULL, oldloc->inode->gfid, 3); co = changelog_get_usable_buffer (frame->local); if (!co) |