diff options
author | Ajeet Jha <ajha@redhat.com> | 2013-12-02 12:37:34 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2014-01-27 09:43:19 -0800 |
commit | 30592e7f92515c5df620f300d6a3df6373ac6200 (patch) | |
tree | 851c232b1e3be7f5458cf6c753b92be2f482ad17 | |
parent | c8b9a9e9f82af7e752d4d881313374713701441d (diff) |
gsyncd / geo-rep: geo-replication fixes
-> "threaded" hybrid crawl.
-> Enabling metatadata synchronization.
-> Handling EINVAL/ESTALE gracefully while syncing metadata.
-> Improvments to changelog crawl code.
-> Initial crawl changelog generation format.
-> No gsyncd restart when checkpoint updated.
-> Fix symlink handling in hybrid crawl.
-> Slave's xtime key is 'stime'.
-> tar+ssh as data synchronization.
-> Instead of 'raise', just log in warning level for xtime missing cases.
-> Fix for JSON object load failure
-> Get new config value after config value reset.
-> Skip already processed changelogs.
-> Saving status of each individual worker thread.
-> GFID fetch on slave for purges.
-> Add tar ssh keys and config options.
-> Fix nlink count when using backend.
-> Include "data" operation for hardlink.
-> Use changelog time prefix as slave's time.
-> Process changelogs in parallel.
Change-Id: I09fcbb2e2e418149a6d8435abd2ac6b2f015bb06
BUG: 1036539
Signed-off-by: Ajeet Jha <ajha@redhat.com>
Reviewed-on: http://review.gluster.org/6404
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Anand Avati <avati@redhat.com>
Reviewed-on: http://review.gluster.org/6809
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
-rwxr-xr-x | geo-replication/src/peer_gsec_create.in | 10 | ||||
-rw-r--r-- | geo-replication/syncdaemon/configinterface.py | 41 | ||||
-rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 10 | ||||
-rw-r--r-- | geo-replication/syncdaemon/master.py | 799 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 176 | ||||
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 2 |
6 files changed, 769 insertions, 269 deletions
diff --git a/geo-replication/src/peer_gsec_create.in b/geo-replication/src/peer_gsec_create.in index ef630bd4417..a39fdbfb5f7 100755 --- a/geo-replication/src/peer_gsec_create.in +++ b/geo-replication/src/peer_gsec_create.in @@ -8,5 +8,11 @@ if [ ! -f "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem.pub ]; then ssh-keygen -N '' -f "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem > /dev/null fi -output=`echo command=\"@libexecdir@/glusterfs/gsyncd\" " "``cat "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem.pub` -echo $output +if [ ! -f "$GLUSTERD_WORKING_DIR"/geo-replication/tar_ssh.pem.pub ]; then + \rm -rf "$GLUSTERD_WORKING_DIR"/geo-replication/tar_ssh.pem* + ssh-keygen -N '' -f "$GLUSTERD_WORKING_DIR"/geo-replication/tar_ssh.pem > /dev/null +fi + +output1=`echo command=\"${exec_prefix}/libexec/glusterfs/gsyncd\" " "``cat "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem.pub` +output2=`echo command=\"tar \$\{SSH_ORIGINAL_COMMAND#* \}\" " "``cat "$GLUSTERD_WORKING_DIR"/geo-replication/tar_ssh.pem.pub` +echo -e "$output1\n$output2" diff --git a/geo-replication/syncdaemon/configinterface.py b/geo-replication/syncdaemon/configinterface.py index a326e824681..0f764c47a7e 100644 --- a/geo-replication/syncdaemon/configinterface.py +++ b/geo-replication/syncdaemon/configinterface.py @@ -5,6 +5,10 @@ except ImportError: import configparser as ConfigParser import re from string import Template +import os +import errno +import sys +from stat import ST_DEV, ST_INO, ST_MTIME from syncdutils import escape, unescape, norm, update_file, GsyncdError @@ -65,8 +69,38 @@ class GConffile(object): self.auxdicts = dd self.config = ConfigParser.RawConfigParser() self.config.read(path) + self.dev, self.ino, self.mtime = -1, -1, -1 self._normconfig() + def _load(self): + try: + sres = os.stat(self.path) + self.dev = sres[ST_DEV] + self.ino = sres[ST_INO] + self.mtime = sres[ST_MTIME] + except (OSError, IOError): + if sys.exc_info()[1].errno == errno.ENOENT: + sres = None + + self.config.read(self.path) + self._normconfig() + + def get_realtime(self, opt): + try: + sres = os.stat(self.path) + except (OSError, IOError): + if sys.exc_info()[1].errno == errno.ENOENT: + sres = None + else: + raise + + # compare file system stat with that of our stream file handle + if not sres or sres[ST_DEV] != self.dev or \ + sres[ST_INO] != self.ino or self.mtime != sres[ST_MTIME]: + self._load() + + return self.get(opt, printValue=False) + def section(self, rx=False): """get the section name of the section representing .peers in .config""" peers = self.peers @@ -162,7 +196,7 @@ class GConffile(object): if self.config.has_section(self.section()): update_from_sect(self.section(), MultiDict(dct, *self.auxdicts)) - def get(self, opt=None): + def get(self, opt=None, printValue=True): """print the matching key/value pairs from .config, or if @opt given, the value for @opt (according to the logic described in .update_to) @@ -173,7 +207,10 @@ class GConffile(object): opt = norm(opt) v = d.get(opt) if v: - print(v) + if printValue: + print(v) + else: + return v else: for k, v in d.iteritems(): if k == '__name__': diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 7fcc3165ac9..64c26a5d29d 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -191,6 +191,7 @@ def main_i(): op.add_option('--log-file-mbr', metavar='LOGF', type=str, action='callback', callback=store_abs) op.add_option('--state-file', metavar='STATF', type=str, action='callback', callback=store_abs) op.add_option('--state-detail-file', metavar='STATF', type=str, action='callback', callback=store_abs) + op.add_option('--georep-session-working-dir', metavar='STATF', type=str, action='callback', callback=store_abs) op.add_option('--ignore-deletes', default=False, action='store_true') op.add_option('--isolated-slave', default=False, action='store_true') op.add_option('--use-rsync-xattrs', default=False, action='store_true') @@ -202,6 +203,7 @@ def main_i(): op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='') op.add_option('--local-path', metavar='PATH', help=SUPPRESS_HELP, default='') op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh') + op.add_option('--ssh-command-tar', metavar='CMD', default='ssh') op.add_option('--rsync-command', metavar='CMD', default='rsync') op.add_option('--rsync-options', metavar='OPTS', default='') op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress') @@ -228,6 +230,7 @@ def main_i(): op.add_option('--change-interval', metavar='SEC', type=int, default=3) # working directory for changelog based mechanism op.add_option('--working-dir', metavar='DIR', type=str, action='callback', callback=store_abs) + op.add_option('--use-tarssh', default=False, action='store_true') op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local) # duh. need to specify dest or value will be mapped to None :S @@ -474,8 +477,15 @@ def main_i(): GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf') if confdata.op == 'set': logging.info('checkpoint %s set' % confdata.val) + gcnf.delete('checkpoint_completed') + gcnf.delete('checkpoint_target') elif confdata.op == 'del': logging.info('checkpoint info was reset') + # if it is removing 'checkpoint' then we need + # to remove 'checkpoint_completed' and 'checkpoint_target' too + gcnf.delete('checkpoint_completed') + gcnf.delete('checkpoint_target') + except IOError: if sys.exc_info()[1].errno == ENOENT: # directory of log path is not present, diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 95810a61ee1..721fe18bd18 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -10,15 +10,16 @@ import socket import string import errno from shutil import copyfileobj -from errno import ENOENT, ENODATA, EPIPE, EEXIST +from errno import ENOENT, ENODATA, EPIPE, EEXIST, errorcode from threading import currentThread, Condition, Lock from datetime import datetime +from libcxattr import Xattr from gconf import gconf from tempfile import mkdtemp, NamedTemporaryFile from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \ unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \ - lstat, errno_wrap + lstat, errno_wrap, update_file URXTIME = (-1, 0) @@ -59,7 +60,8 @@ def gmaster_builder(excrawl=None): crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin') sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin - class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin): + syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine + class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin, syncengine): pass return _GMaster @@ -101,14 +103,17 @@ class NormalMixin(object): if not 'default_xtime' in opts: opts['default_xtime'] = URXTIME - def xtime_low(self, server, path, **opts): - xt = server.xtime(path, self.uuid) + def xtime_low(self, rsc, path, **opts): + if rsc == self.master: + xt = rsc.server.xtime(path, self.uuid) + else: + xt = rsc.server.stime(path, self.uuid) if isinstance(xt, int) and xt != ENODATA: return xt if xt == ENODATA or xt < self.volmark: if opts['create']: xt = _xtime_now() - server.aggregated.set_xtime(path, self.uuid, xt) + rsc.server.aggregated.set_xtime(path, self.uuid, xt) else: xt = opts['default_xtime'] return xt @@ -140,7 +145,7 @@ class NormalMixin(object): return xte > xtrd def set_slave_xtime(self, path, mark): - self.slave.server.set_xtime(path, self.uuid, mark) + self.slave.server.set_stime(path, self.uuid, mark) self.slave.server.set_xtime_remote(path, self.uuid, mark) class PartialMixin(NormalMixin): @@ -190,6 +195,65 @@ class PurgeNoopMixin(object): def purge_missing(self, path, names): pass +class TarSSHEngine(object): + """Sync engine that uses tar(1) piped over ssh(1) + for data transfers. Good for lots of small files. + """ + def a_syncdata(self, files): + logging.debug('files: %s' % (files)) + for f in files: + pb = self.syncer.add(f) + def regjob(se, xte, pb): + rv = pb.wait() + if rv[0]: + logging.debug('synced ' + se) + return True + else: + # stat check for file presence + st = lstat(se) + if isinstance(st, int): + return True + logging.warn('tar+ssh: %s [errcode: %d]' % (se, rv[1])) + self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) + + def syncdata_wait(self): + if self.wait(self.FLAT_DIR_HIERARCHY, None): + return True + + def syncdata(self, files): + self.a_syncdata(files) + self.syncdata_wait() + +class RsyncEngine(object): + """Sync engine that uses rsync(1) for data transfers""" + def a_syncdata(self, files): + logging.debug('files: %s' % (files)) + for f in files: + logging.debug('candidate for syncing %s' % f) + pb = self.syncer.add(f) + def regjob(se, xte, pb): + rv = pb.wait() + if rv[0]: + logging.debug('synced ' + se) + return True + else: + if rv[1] in [23, 24]: + # stat to check if the file exist + st = lstat(se) + if isinstance(st, int): + # file got unlinked in the interim + return True + logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) + self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) + + def syncdata_wait(self): + if self.wait(self.FLAT_DIR_HIERARCHY, None): + return True + + def syncdata(self, files): + self.a_syncdata(files) + self.syncdata_wait() + class GMasterCommon(object): """abstract class impementling master role""" @@ -234,7 +298,7 @@ class GMasterCommon(object): else: rsc = self.master self.make_xtime_opts(rsc == self.master, opts) - return self.xtime_low(rsc.server, path, **opts) + return self.xtime_low(rsc, path, **opts) def get_initial_crawl_data(self): # while persisting only 'files_syncd' is non-zero, rest of @@ -243,18 +307,26 @@ class GMasterCommon(object): default_data = {'files_syncd': 0, 'files_remaining': 0, 'bytes_remaining': 0, - 'purges_remaining': 0} + 'purges_remaining': 0, + 'total_files_skipped': 0} if getattr(gconf, 'state_detail_file', None): try: - return json.load(open(gconf.state_detail_file)) - except (IOError, OSError): + with open(gconf.state_detail_file, 'r+') as f: + loaded_data= json.load(f) + diff_data = set(default_data) - set (loaded_data) + if len(diff_data): + for i in diff_data: + loaded_data[i] = default_data[i] + return loaded_data + except (IOError): ex = sys.exc_info()[1] - if ex.errno == ENOENT: - # Create file with initial data + logging.warn ('Creating new gconf.state_detail_file.') + # Create file with initial data + try: with open(gconf.state_detail_file, 'wb') as f: json.dump(default_data, f) return default_data - else: + except: raise return default_data @@ -264,6 +336,8 @@ class GMasterCommon(object): same_dir = os.path.dirname(gconf.state_detail_file) with NamedTemporaryFile(dir=same_dir, delete=False) as tmp: json.dump(self.total_crawl_stats, tmp) + tmp.flush() + os.fsync(tmp.fileno()) os.rename(tmp.name, gconf.state_detail_file) except (IOError, OSError): raise @@ -272,7 +346,13 @@ class GMasterCommon(object): self.master = master self.slave = slave self.jobtab = {} - self.syncer = Syncer(slave) + if boolify(gconf.use_tarssh): + logging.info("using 'tar over ssh' as the sync engine") + self.syncer = Syncer(slave, self.slave.tarssh) + else: + logging.info("using 'rsync' as the sync engine") + # partial transfer (cf. rsync(1)), that's normal + self.syncer = Syncer(slave, self.slave.rsync, [23, 24]) # crawls vs. turns: # - self.crawls is simply the number of crawl() invocations on root # - one turn is a maximal consecutive sequence of crawls so that each @@ -294,6 +374,8 @@ class GMasterCommon(object): self.terminate = False self.sleep_interval = 1 self.checkpoint_thread = None + self.current_files_skipped_count = 0 + self.skipped_gfid_list = [] def init_keep_alive(cls): """start the keep-alive thread """ @@ -336,7 +418,8 @@ class GMasterCommon(object): gconf.configinterface.set('volume_id', self.uuid) if self.volinfo: if self.volinfo['retval']: - raise GsyncdError("master is corrupt") + logging.warn("master cluster's info may not be valid %d" % \ + self.volinfo['retval']) self.start_checkpoint_thread() else: raise GsyncdError("master volinfo unavailable") @@ -349,7 +432,7 @@ class GMasterCommon(object): while not self.terminate: if self.start: logging.debug("... crawl #%d done, took %.6f seconds" % \ - (self.crawls, time.time() - self.start)) + (self.crawls, time.time() - self.start)) self.start = time.time() should_display_info = self.start - self.lastreport['time'] >= 60 if should_display_info: @@ -363,9 +446,20 @@ class GMasterCommon(object): if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds crawl = self.should_crawl() t0 = t1 + self.update_worker_remote_node() if not crawl: + self.update_worker_health("Passive") + # bring up _this_ brick to the cluster stime + # which is min of cluster (but max of the replicas) + brick_stime = self.xtime('.', self.slave) + cluster_stime = self.master.server.aggregated.stime_mnt('.', '.'.join([str(self.uuid), str(gconf.slave_id)])) + logging.debug("Cluster stime: %s | Brick stime: %s" % (repr(cluster_stime), repr(brick_stime))) + if not isinstance(cluster_stime, int): + if brick_stime < cluster_stime: + self.slave.server.set_stime(self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime) time.sleep(5) continue + self.update_worker_health("Active") self.crawl() if oneshot: return @@ -375,7 +469,7 @@ class GMasterCommon(object): def _checkpt_param(cls, chkpt, prm, xtimish=True): """use config backend to lookup a parameter belonging to checkpoint @chkpt""" - cprm = getattr(gconf, 'checkpoint_' + prm, None) + cprm = gconf.configinterface.get_realtime('checkpoint_' + prm) if not cprm: return chkpt_mapped, val = cprm.split(':', 1) @@ -402,17 +496,6 @@ class GMasterCommon(object): ts += '.' + str(tpair[1]) return ts - def get_extra_info(self): - str_info = '\nUptime=%s;FilesSyncd=%d;FilesPending=%d;BytesPending=%d;DeletesPending=%d;' % \ - (self._crawl_time_format(datetime.now() - self.crawl_start), \ - self.total_crawl_stats['files_syncd'], \ - self.total_crawl_stats['files_remaining'], \ - self.total_crawl_stats['bytes_remaining'], \ - self.total_crawl_stats['purges_remaining']) - str_info += '\0' - logging.debug(str_info) - return str_info - def _crawl_time_format(self, crawl_time): # Ex: 5 years, 4 days, 20:23:10 years, days = divmod(crawl_time.days, 365.25) @@ -431,27 +514,49 @@ class GMasterCommon(object): date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2)) return date - def checkpt_service(self, chan, chkpt, tgt): + def checkpt_service(self, chan, chkpt): """checkpoint service loop monitor and verify checkpoint status for @chkpt, and listen for incoming requests for whom we serve a pretty-formatted status report""" - if not chkpt: - # dummy loop for the case when there is no checkpt set - while True: + while True: + chkpt = gconf.configinterface.get_realtime("checkpoint") + if not chkpt: + gconf.configinterface.delete("checkpoint_completed") + gconf.configinterface.delete("checkpoint_target") + # dummy loop for the case when there is no checkpt set select([chan], [], []) conn, _ = chan.accept() - conn.send(self.get_extra_info()) + conn.send('\0') conn.close() - completed = self._checkpt_param(chkpt, 'completed', xtimish=False) - if completed: - completed = tuple(int(x) for x in completed.split('.')) - while True: + continue + + checkpt_tgt = self._checkpt_param(chkpt, 'target') + if not checkpt_tgt: + checkpt_tgt = self.xtime('.') + if isinstance(checkpt_tgt, int): + raise GsyncdError("master root directory is unaccessible (%s)", + os.strerror(checkpt_tgt)) + self._set_checkpt_param(chkpt, 'target', checkpt_tgt) + logging.debug("checkpoint target %s has been determined for checkpoint %s" % \ + (repr(checkpt_tgt), chkpt)) + + # check if the label is 'now' + chkpt_lbl = chkpt + try: + x1,x2 = chkpt.split(':') + if x1 == 'now': + chkpt_lbl = "as of " + self.humantime(x2) + except: + pass + completed = self._checkpt_param(chkpt, 'completed', xtimish=False) + if completed: + completed = tuple(int(x) for x in completed.split('.')) s,_,_ = select([chan], [], [], (not completed) and 5 or None) # either request made and we re-check to not # give back stale data, or we still hunting for completion - if self.native_xtime(tgt) and self.native_xtime(tgt) < self.volmark: + if self.native_xtime(checkpt_tgt) and self.native_xtime(checkpt_tgt) < self.volmark: # indexing has been reset since setting the checkpoint status = "is invalid" else: @@ -459,12 +564,12 @@ class GMasterCommon(object): if isinstance(xtr, int): raise GsyncdError("slave root directory is unaccessible (%s)", os.strerror(xtr)) - ncompleted = self.xtime_geq(xtr, tgt) + ncompleted = self.xtime_geq(xtr, checkpt_tgt) if completed and not ncompleted: # stale data logging.warn("completion time %s for checkpoint %s became stale" % \ (self.humantime(*completed), chkpt)) completed = None - gconf.confdata.delete('checkpoint-completed') + gconf.configinterface.delete('checkpoint_completed') if ncompleted and not completed: # just reaching completion completed = "%.6f" % time.time() self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False) @@ -478,7 +583,7 @@ class GMasterCommon(object): try: conn, _ = chan.accept() try: - conn.send(" | checkpoint %s %s %s" % (chkpt, status, self.get_extra_info())) + conn.send("checkpoint %s is %s\0" % (chkpt_lbl, status)) except: exc = sys.exc_info()[1] if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \ @@ -505,18 +610,8 @@ class GMasterCommon(object): pass chan.bind(state_socket) chan.listen(1) - checkpt_tgt = None - if gconf.checkpoint: - checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target') - if not checkpt_tgt: - checkpt_tgt = self.xtime('.') - if isinstance(checkpt_tgt, int): - raise GsyncdError("master root directory is unaccessible (%s)", - os.strerror(checkpt_tgt)) - self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt) - logging.debug("checkpoint target %s has been determined for checkpoint %s" % \ - (repr(checkpt_tgt), gconf.checkpoint)) - t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt)) + chkpt = gconf.configinterface.get_realtime("checkpoint") + t = Thread(target=self.checkpt_service, args=(chan, chkpt)) t.start() self.checkpoint_thread = t @@ -567,15 +662,11 @@ class GMasterChangelogMixin(GMasterCommon): POS_GFID = 0 POS_TYPE = 1 - POS_ENTRY1 = 2 - POS_ENTRY2 = 3 # renames - - _CL_TYPE_DATA_PFX = "D " - _CL_TYPE_METADATA_PFX = "M " - _CL_TYPE_ENTRY_PFX = "E " + POS_ENTRY1 = -1 - TYPE_GFID = [_CL_TYPE_DATA_PFX] # ignoring metadata ops - TYPE_ENTRY = [_CL_TYPE_ENTRY_PFX] + TYPE_META = "M " + TYPE_GFID = "D " + TYPE_ENTRY = "E " # flat directory heirarchy for gfid based access FLAT_DIR_HIERARCHY = '.' @@ -594,39 +685,11 @@ class GMasterChangelogMixin(GMasterCommon): logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile)) return (workdir, logfile) - # update stats from *this* crawl - def update_cumulative_stats(self, files_pending): - self.total_crawl_stats['files_remaining'] = files_pending['count'] - self.total_crawl_stats['bytes_remaining'] = files_pending['bytes'] - self.total_crawl_stats['purges_remaining'] = files_pending['purge'] - - # sync data - def syncdata(self, datas): - logging.debug('datas: %s' % (datas)) - for data in datas: - logging.debug('candidate for syncing %s' % data) - pb = self.syncer.add(data) - def regjob(se, xte, pb): - rv = pb.wait() - if rv[0]: - logging.debug('synced ' + se) - return True - else: - if rv[1] in [23, 24]: - # stat to check if the file exist - st = lstat(se) - if isinstance(st, int): - # file got unlinked in the interim - return True - logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) - self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb) - if self.wait(self.FLAT_DIR_HIERARCHY, None): - return True - def process_change(self, change, done, retry): pfx = gauxpfx() clist = [] entries = [] + meta_gfid = set() datas = set() # basic crawl stats: files and bytes @@ -652,136 +715,351 @@ class GMasterChangelogMixin(GMasterCommon): dct[k] = ed[k] return dct - # regular file update: bytes & count - def _update_reg(entry, size): - if not entry in files_pending['files']: - files_pending['count'] += 1 - files_pending['bytes'] += size - files_pending['files'].append(entry) - # updates for directories, symlinks etc.. - def _update_rest(): + # entry counts (not purges) + def entry_update(): files_pending['count'] += 1 - # entry count - def entry_update(entry, size, mode): - if stat.S_ISREG(mode): - _update_reg(entry, size) - else: - _update_rest() # purge count def purge_update(): files_pending['purge'] += 1 for e in clist: e = e.strip() - et = e[self.IDX_START:self.IDX_END] - ec = e[self.IDX_END:].split(' ') - if et in self.TYPE_ENTRY: + et = e[self.IDX_START:self.IDX_END] # entry type + ec = e[self.IDX_END:].split(' ') # rest of the bits + + if et == self.TYPE_ENTRY: + # extract information according to the type of + # the entry operation. create(), mkdir() and mknod() + # have mode, uid, gid information in the changelog + # itself, so no need to stat()... ty = ec[self.POS_TYPE] + + # PARGFID/BNAME en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1])) + # GFID of the entry gfid = ec[self.POS_GFID] - # definitely need a better way bucketize entry ops + if ty in ['UNLINK', 'RMDIR']: purge_update() entries.append(edct(ty, gfid=gfid, entry=en)) - continue - go = os.path.join(pfx, gfid) - st = lstat(go) - if isinstance(st, int): - if ty == 'RENAME': - entries.append(edct('UNLINK', gfid=gfid, entry=en)) - else: - logging.debug('file %s got purged in the interim' % go) - continue - entry_update(go, st.st_size, st.st_mode) - if ty in ['CREATE', 'MKDIR', 'MKNOD']: - entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) - elif ty == 'LINK': - entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) - elif ty == 'SYMLINK': - rl = errno_wrap(os.readlink, [en], [ENOENT]) - if isinstance(rl, int): - continue - entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) - elif ty == 'RENAME': - e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2])) - entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2, stat=st)) + elif ty in ['CREATE', 'MKDIR', 'MKNOD']: + entry_update() + # stat information present in the changelog itself + entries.append(edct(ty, gfid=gfid, entry=en, mode=int(ec[2]),\ + uid=int(ec[3]), gid=int(ec[4]))) else: - logging.warn('ignoring %s [op %s]' % (gfid, ty)) - elif et in self.TYPE_GFID: - go = os.path.join(pfx, ec[0]) - st = lstat(go) - if isinstance(st, int): - logging.debug('file %s got purged in the interim' % go) - continue - entry_update(go, st.st_size, st.st_mode) - datas.update([go]) + # stat() to get mode and other information + go = os.path.join(pfx, gfid) + st = lstat(go) + if isinstance(st, int): + if ty == 'RENAME': # special hack for renames... + entries.append(edct('UNLINK', gfid=gfid, entry=en)) + else: + logging.debug('file %s got purged in the interim' % go) + continue + + if ty == 'LINK': + entry_update() + entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) + elif ty == 'SYMLINK': + rl = errno_wrap(os.readlink, [en], [ENOENT]) + if isinstance(rl, int): + continue + entry_update() + entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) + elif ty == 'RENAME': + entry_update() + e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) + entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st)) + else: + logging.warn('ignoring %s [op %s]' % (gfid, ty)) + elif et == self.TYPE_GFID: + datas.add(os.path.join(pfx, ec[0])) + elif et == self.TYPE_META: + if ec[1] == 'SETATTR': # only setattr's for now... + meta_gfid.add(os.path.join(pfx, ec[0])) + else: + logging.warn('got invalid changelog type: %s' % (et)) logging.debug('entries: %s' % repr(entries)) if not retry: - self.update_cumulative_stats(files_pending) + self.update_worker_cumilitive_status(files_pending) # sync namespace if (entries): self.slave.server.entry_ops(entries) + # sync metadata + if (meta_gfid): + meta_entries = [] + for go in meta_gfid: + st = lstat(go) + if isinstance(st, int): + logging.debug('file %s got purged in the interim' % go) + continue + meta_entries.append(edct('META', go=go, stat=st)) + if meta_entries: + self.slave.server.meta_ops(meta_entries) # sync data - if self.syncdata(datas): - if done: - self.master.server.changelog_done(change) - return True - - def sync_done(self): - self.total_crawl_stats['files_syncd'] += self.total_crawl_stats['files_remaining'] - self.total_crawl_stats['files_remaining'] = 0 - self.total_crawl_stats['bytes_remaining'] = 0 - self.total_crawl_stats['purges_remaining'] = 0 - self.update_crawl_data() + if datas: + self.a_syncdata(datas) def process(self, changes, done=1): - for change in changes: - tries = 0 - retry = False - while True: - logging.debug('processing change %s' % change) - if self.process_change(change, done, retry): - self.sync_done() - break - retry = True - tries += 1 - if tries == self.MAX_RETRIES: - logging.warn('changelog %s could not be processed - moving on...' % os.path.basename(change)) - self.sync_done() - if done: - self.master.server.changelog_done(change) - break - # it's either entry_ops() or Rsync that failed to do it's - # job. Mostly it's entry_ops() [which currently has a problem - # of failing to create an entry but failing to return an errno] - # Therefore we do not know if it's either Rsync or the freaking - # entry_ops() that failed... so we retry the _whole_ changelog - # again. - # TODO: remove entry retries when it's gets fixed. - logging.warn('incomplete sync, retrying changelog: %s' % change) - time.sleep(0.5) - self.turns += 1 + tries = 0 + retry = False - def upd_stime(self, stime): + while True: + self.skipped_gfid_list = [] + self.current_files_skipped_count = 0 + + # first, fire all changelog transfers in parallel. entry and metadata + # are performed synchronously, therefore in serial. However at the end + # of each changelog, data is synchronized with syncdata_async() - which + # means it is serial w.r.t entries/metadata of that changelog but + # happens in parallel with data of other changelogs. + + for change in changes: + logging.debug('processing change %s' % change) + self.process_change(change, done, retry) + if not retry: + self.turns += 1 # number of changelogs processed in the batch + + # Now we wait for all the data transfers fired off in the above step + # to complete. Note that this is not ideal either. Ideally we want to + # trigger the entry/meta-data transfer of the next batch while waiting + # for the data transfer of the current batch to finish. + + # Note that the reason to wait for the data transfer (vs doing it + # completely in the background and call the changelog_done() + # asynchronously) is because this waiting acts as a "backpressure" + # and prevents a spiraling increase of wait stubs from consuming + # unbounded memory and resources. + + # update the slave's time with the timestamp of the _last_ changelog + # file time suffix. Since, the changelog prefix time is the time when + # the changelog was rolled over, introduce a tolerence of 1 second to + # counter the small delta b/w the marker update and gettimeofday(). + # NOTE: this is only for changelog mode, not xsync. + + # @change is the last changelog (therefore max time for this batch) + if self.syncdata_wait(): + if done: + xtl = (int(change.split('.')[-1]) - 1, 0) + self.upd_stime(xtl) + map(self.master.server.changelog_done, changes) + self.update_worker_files_syncd() + break + + # We do not know which changelog transfer failed, retry everything. + retry = True + tries += 1 + if tries == self.MAX_RETRIES: + logging.warn('changelogs %s could not be processed - moving on...' % \ + ' '.join(map(os.path.basename, changes))) + self.update_worker_total_files_skipped(self.current_files_skipped_count) + logging.warn('SKIPPED GFID = %s' % ','.join(self.skipped_gfid_list)) + self.update_worker_files_syncd() + if done: + xtl = (int(change.split('.')[-1]) - 1, 0) + self.upd_stime(xtl) + map(self.master.server.changelog_done, changes) + break + # it's either entry_ops() or Rsync that failed to do it's + # job. Mostly it's entry_ops() [which currently has a problem + # of failing to create an entry but failing to return an errno] + # Therefore we do not know if it's either Rsync or the freaking + # entry_ops() that failed... so we retry the _whole_ changelog + # again. + # TODO: remove entry retries when it's gets fixed. + logging.warn('incomplete sync, retrying changelogs: %s' % \ + ' '.join(map(os.path.basename, changes))) + time.sleep(0.5) + + def upd_stime(self, stime, path=None): + if not path: + path = self.FLAT_DIR_HIERARCHY if not stime == URXTIME: - self.sendmark(self.FLAT_DIR_HIERARCHY, stime) + self.sendmark(path, stime) + + def get_worker_status_file(self): + file_name = gconf.local_path+'.status' + file_name = file_name.replace("/", "_") + worker_status_file = gconf.georep_session_working_dir+file_name + return worker_status_file + + def update_worker_status(self, key, value): + default_data = {"remote_node":"N/A", + "worker status":"Not Started", + "crawl status":"N/A", + "files_syncd": 0, + "files_remaining": 0, + "bytes_remaining": 0, + "purges_remaining": 0, + "total_files_skipped": 0} + worker_status_file = self.get_worker_status_file() + try: + with open(worker_status_file, 'r+') as f: + loaded_data = json.load(f) + loaded_data[key] = value + os.ftruncate(f.fileno(), 0) + os.lseek(f.fileno(), 0, os.SEEK_SET) + json.dump(loaded_data, f) + f.flush() + os.fsync(f.fileno()) + except (IOError, OSError, ValueError): + logging.info ('Creating new %s' % worker_status_file) + try: + with open(worker_status_file, 'wb') as f: + default_data[key] = value + json.dump(default_data, f) + f.flush() + os.fsync(f.fileno()) + except: + raise + + def update_worker_cumilitive_status(self, files_pending): + default_data = {"remote_node":"N/A", + "worker status":"Not Started", + "crawl status":"N/A", + "files_syncd": 0, + "files_remaining": 0, + "bytes_remaining": 0, + "purges_remaining": 0, + "total_files_skipped": 0} + worker_status_file = self.get_worker_status_file() + try: + with open(worker_status_file, 'r+') as f: + loaded_data = json.load(f) + loaded_data['files_remaining'] = files_pending['count'] + loaded_data['bytes_remaining'] = files_pending['bytes'] + loaded_data['purges_remaining'] = files_pending['purge'] + os.ftruncate(f.fileno(), 0) + os.lseek(f.fileno(), 0, os.SEEK_SET) + json.dump(loaded_data, f) + f.flush() + os.fsync(f.fileno()) + except (IOError, OSError, ValueError): + logging.info ('Creating new %s' % worker_status_file) + try: + with open(worker_status_file, 'wb') as f: + default_data['files_remaining'] = files_pending['count'] + default_data['bytes_remaining'] = files_pending['bytes'] + default_data['purges_remaining'] = files_pending['purge'] + json.dump(default_data, f) + f.flush() + os.fsync(f.fileno()) + except: + raise + + def update_worker_remote_node (self): + node = sys.argv[-1] + node = node.split("@")[-1] + remote_node_ip = node.split(":")[0] + remote_node_vol = node.split(":")[3] + remote_node = remote_node_ip + '::' + remote_node_vol + self.update_worker_status ('remote_node', remote_node) + + def update_worker_health (self, state): + self.update_worker_status ('worker status', state) + + def update_worker_crawl_status (self, state): + self.update_worker_status ('crawl status', state) + + def update_worker_files_syncd (self): + default_data = {"remote_node":"N/A", + "worker status":"Not Started", + "crawl status":"N/A", + "files_syncd": 0, + "files_remaining": 0, + "bytes_remaining": 0, + "purges_remaining": 0, + "total_files_skipped": 0} + worker_status_file = self.get_worker_status_file() + try: + with open(worker_status_file, 'r+') as f: + loaded_data = json.load(f) + loaded_data['files_syncd'] += loaded_data['files_remaining'] + loaded_data['files_remaining'] = 0 + loaded_data['bytes_remaining'] = 0 + loaded_data['purges_remaining'] = 0 + os.ftruncate(f.fileno(), 0) + os.lseek(f.fileno(), 0, os.SEEK_SET) + json.dump(loaded_data, f) + f.flush() + os.fsync(f.fileno()) + except (IOError, OSError, ValueError): + logging.info ('Creating new %s' % worker_status_file) + try: + with open(worker_status_file, 'wb') as f: + json.dump(default_data, f) + f.flush() + os.fsync(f.fileno()) + except: + raise + + def update_worker_files_remaining (self, state): + self.update_worker_status ('files_remaining', state) + + def update_worker_bytes_remaining (self, state): + self.update_worker_status ('bytes_remaining', state) + + def update_worker_purges_remaining (self, state): + self.update_worker_status ('purges_remaining', state) + + def update_worker_total_files_skipped (self, value): + default_data = {"remote_node":"N/A", + "worker status":"Not Started", + "crawl status":"N/A", + "files_syncd": 0, + "files_remaining": 0, + "bytes_remaining": 0, + "purges_remaining": 0, + "total_files_skipped": 0} + worker_status_file = self.get_worker_status_file() + try: + with open(worker_status_file, 'r+') as f: + loaded_data = json.load(f) + loaded_data['total_files_skipped'] = value + loaded_data['files_remaining'] -= value + os.ftruncate(f.fileno(), 0) + os.lseek(f.fileno(), 0, os.SEEK_SET) + json.dump(loaded_data, f) + f.flush() + os.fsync(f.fileno()) + except (IOError, OSError, ValueError): + logging.info ('Creating new %s' % worker_status_file) + try: + with open(worker_status_file, 'wb') as f: + default_data['total_files_skipped'] = value + json.dump(default_data, f) + f.flush() + os.fsync(f.fileno()) + except: + raise def crawl(self): + self.update_worker_crawl_status("Changelog Crawl") changes = [] + # get stime (from the brick) and purge changelogs + # that are _historical_ to that time. + purge_time = self.xtime('.', self.slave) + if isinstance(purge_time, int): + purge_time = None try: self.master.server.changelog_scan() self.crawls += 1 except OSError: self.fallback_xsync() + self.update_worker_crawl_status("Hybrid Crawl") changes = self.master.server.changelog_getchanges() if changes: - xtl = self.xtime(self.FLAT_DIR_HIERARCHY) - if isinstance(xtl, int): - raise GsyncdError('master is corrupt') + if purge_time: + logging.info("slave's time: %s" % repr(purge_time)) + processed = [x for x in changes if int(x.split('.')[-1]) < purge_time[0]] + for pr in processed: + logging.info('skipping already processed change: %s...' % os.path.basename(pr)) + self.master.server.changelog_done(pr) + changes.remove(pr) logging.debug('processing changes %s' % repr(changes)) self.process(changes) - self.upd_stime(xtl) def register(self): (workdir, logfile) = self.setup_working_dir() @@ -799,17 +1077,20 @@ class GMasterChangelogMixin(GMasterCommon): class GMasterXsyncMixin(GMasterChangelogMixin): """ - This crawl needs to be xtime based (as of now it's not. this is beacuse we generate CHANGELOG file during each crawl which is then processed by process_change()). For now it's used as a one-shot initial sync mechanism and only syncs directories, regular - files and symlinks. + files, hardlinks and symlinks. """ + XSYNC_MAX_ENTRIES = 1<<13 + def register(self): + self.counter = 0 + self.comlist = [] self.sleep_interval = 60 self.tempdir = self.setup_working_dir()[0] self.tempdir = os.path.join(self.tempdir, 'xsync') @@ -823,6 +1104,36 @@ class GMasterXsyncMixin(GMasterChangelogMixin): else: raise + def crawl(self): + """ + event dispatcher thread + + this thread dispatches either changelog or synchronizes stime. + additionally terminates itself on recieving a 'finale' event + """ + def Xsyncer(): + self.Xcrawl() + t = Thread(target=Xsyncer) + t.start() + logging.info('starting hybrid crawl...') + self.update_worker_crawl_status("Hybrid Crawl") + while True: + try: + item = self.comlist.pop(0) + if item[0] == 'finale': + logging.info('finished hybrid crawl syncing') + break + elif item[0] == 'xsync': + logging.info('processing xsync changelog %s' % (item[1])) + self.process([item[1]], 0) + elif item[0] == 'stime': + logging.debug('setting slave time: %s' % repr(item[1])) + self.upd_stime(item[1][1], item[1][0]) + else: + logging.warn('unknown tuple in comlist (%s)' % repr(item)) + except IndexError: + time.sleep(1) + def write_entry_change(self, prefix, data=[]): self.fh.write("%s %s\n" % (prefix, ' '.join(data))) @@ -839,24 +1150,61 @@ class GMasterXsyncMixin(GMasterChangelogMixin): def fname(self): return self.xsync_change - def crawl(self, path='.', xtr=None, done=0): - """ generate a CHANGELOG file consumable by process_change """ + def put(self, mark, item): + self.comlist.append((mark, item)) + + def sync_xsync(self, last): + """schedule a processing of changelog""" + self.close() + self.put('xsync', self.fname()) + self.counter = 0 + if not last: + time.sleep(1) # make sure changelogs are 1 second apart + self.open() + + def sync_stime(self, stime=None, last=False): + """schedule a stime synchronization""" + if stime: + self.put('stime', stime) + if last: + self.put('finale', None) + + def sync_done(self, stime=None, last=False): + self.sync_xsync(last) + if stime: + self.sync_stime(stime, last) + + def Xcrawl(self, path='.', xtr_root=None): + """ + generate a CHANGELOG file consumable by process_change. + + slave's xtime (stime) is _cached_ for comparisons across + the filesystem tree, but set after directory synchronization. + """ if path == '.': self.open() self.crawls += 1 - if not xtr: + if not xtr_root: # get the root stime and use it for all comparisons - xtr = self.xtime('.', self.slave) - if isinstance(xtr, int): - if xtr != ENOENT: - raise GsyncdError('slave is corrupt') - xtr = self.minus_infinity + xtr_root = self.xtime('.', self.slave) + if isinstance(xtr_root, int): + if xtr_root != ENOENT: + logging.warn("slave cluster not returning the " \ + "correct xtime for root (%d)" % xtr_root) + xtr_root = self.minus_infinity xtl = self.xtime(path) if isinstance(xtl, int): - raise GsyncdError('master is corrupt') - if xtr == xtl: + logging.warn("master cluster's xtime not found") + xtr = self.xtime(path, self.slave) + if isinstance(xtr, int): + if xtr != ENOENT: + logging.warn("slave cluster not returning the " \ + "correct xtime for %s (%d)" % (path, xtr)) + xtr = self.minus_infinity + xtr = max(xtr, xtr_root) + if not self.need_sync(path, xtl, xtr): if path == '.': - self.close() + self.sync_done((path, xtl), True) return self.xtime_reversion_hook(path, xtl, xtr) logging.debug("entering " + path) @@ -867,43 +1215,42 @@ class GMasterXsyncMixin(GMasterChangelogMixin): for e in dem: bname = e e = os.path.join(path, e) - st = lstat(e) + xte = self.xtime(e) + if isinstance(xte, int): + logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) + continue + if not self.need_sync(e, xte, xtr): + continue + st = self.master.server.lstat(e) if isinstance(st, int): - logging.warn('%s got purged in the interim..' % e) + logging.warn('%s got purged in the interim ...' % e) continue gfid = self.master.server.gfid(e) if isinstance(gfid, int): - logging.warn('skipping entry %s..' % (e)) - continue - xte = self.xtime(e) - if isinstance(xte, int): - raise GsyncdError('master is corrupt') - if not self.need_sync(e, xte, xtr): + logging.warn('skipping entry %s..' % e) continue mo = st.st_mode + self.counter += 1 + if self.counter == self.XSYNC_MAX_ENTRIES: + self.sync_done() if stat.S_ISDIR(mo): - self.write_entry_change("E", [gfid, 'MKDIR', escape(os.path.join(pargfid, bname))]) - self.crawl(e, xtr) + self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))]) + self.Xcrawl(e, xtr_root) + self.sync_done((e, xte), False) elif stat.S_ISLNK(mo): - rl = errno_wrap(os.readlink, [en], [ENOENT]) - if isinstance(rl, int): - continue - self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname)), rl]) - else: + self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))]) + elif stat.S_ISREG(mo): + nlink = st.st_nlink + nlink -= 1 # fixup backend stat link count # if a file has a hardlink, create a Changelog entry as 'LINK' so the slave # side will decide if to create the new entry, or to create link. - if st.st_nlink == 1: - self.write_entry_change("E", [gfid, 'MKNOD', escape(os.path.join(pargfid, bname))]) + if nlink == 1: + self.write_entry_change("E", [gfid, 'MKNOD', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))]) else: self.write_entry_change("E", [gfid, 'LINK', escape(os.path.join(pargfid, bname))]) - if stat.S_ISREG(mo): - self.write_entry_change("D", [gfid]) - + self.write_entry_change("D", [gfid]) if path == '.': - logging.info('processing xsync changelog %s' % self.fname()) - self.close() - self.process([self.fname()], done) - self.upd_stime(xtl) + self.sync_done((path, xtl), True) class BoxClosedErr(Exception): pass @@ -979,12 +1326,13 @@ class Syncer(object): each completed syncjob. """ - def __init__(self, slave): + def __init__(self, slave, sync_engine, resilient_errnos=[]): """spawn worker threads""" self.slave = slave self.lock = Lock() self.pb = PostBox() - self.bytes_synced = 0 + self.sync_engine = sync_engine + self.errnos_ok = resilient_errnos for i in range(int(gconf.sync_jobs)): t = Thread(target=self.syncjob) t.start() @@ -1002,11 +1350,10 @@ class Syncer(object): break time.sleep(0.5) pb.close() - po = self.slave.rsync(pb) + po = self.sync_engine(pb) if po.returncode == 0: ret = (True, 0) - elif po.returncode in (23, 24): - # partial transfer (cf. rsync(1)), that's normal + elif po.returncode in self.errnos_ok: ret = (False, po.returncode) else: po.errfail() diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index faf62f868c7..8deb5114b50 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -265,6 +265,9 @@ class Server(object): FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' + GFID_XATTR = 'trusted.gfid' # for backend gfid fetch, do not use GX_NSPACE_PFX + GFID_FMTSTR = "!" + "B"*16 + local_path = '' @classmethod @@ -305,6 +308,38 @@ class Server(object): raise OSError(ENOTDIR, os.strerror(ENOTDIR)) return os.listdir(path) + + @classmethod + @_pathguard + def lstat(cls, path): + try: + return os.lstat(path) + except (IOError, OSError): + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + return ex.errno + else: + raise + + + @classmethod + @_pathguard + def gfid(cls, path): + try: + buf = Xattr.lgetxattr(path, cls.GFID_XATTR, 16) + m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)])) + return '-'.join(m.groups()) + except (IOError, OSError): + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + return ex.errno + else: + raise + + @classmethod + def gfid_mnt(cls, gfidpath): + return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT]) + @classmethod @_pathguard def purge(cls, path, entries=None): @@ -397,8 +432,42 @@ class Server(object): raise @classmethod - def gfid(cls, gfidpath): - return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT]) + @_pathguard + def stime_mnt(cls, path, uuid): + """query xtime extended attribute + + Return xtime of @path for @uuid as a pair of integers. + "Normal" errors due to non-existent @path or extended attribute + are tolerated and errno is returned in such a case. + """ + + try: + return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8)) + except OSError: + ex = sys.exc_info()[1] + if ex.errno in (ENOENT, ENODATA, ENOTDIR): + return ex.errno + else: + raise + + @classmethod + @_pathguard + def stime(cls, path, uuid): + """query xtime extended attribute + + Return xtime of @path for @uuid as a pair of integers. + "Normal" errors due to non-existent @path or extended attribute + are tolerated and errno is returned in such a case. + """ + + try: + return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8)) + except OSError: + ex = sys.exc_info()[1] + if ex.errno in (ENOENT, ENODATA, ENOTDIR): + return ex.errno + else: + raise @classmethod def node_uuid(cls, path='.'): @@ -409,21 +478,10 @@ class Server(object): raise @classmethod - def xtime_vec(cls, path, *uuids): - """vectored version of @xtime - - accepts a list of uuids and returns a dictionary - with uuid as key(s) and xtime as value(s) - """ - xt = {} - for uuid in uuids: - xtu = cls.xtime(path, uuid) - if xtu == ENODATA: - xtu = None - if isinstance(xtu, int): - return xtu - xt[uuid] = xtu - return xt + @_pathguard + def set_stime(cls, path, uuid, mark): + """set @mark as stime for @uuid on @path""" + Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), struct.pack('!II', *mark)) @classmethod @_pathguard @@ -444,20 +502,16 @@ class Server(object): Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) @classmethod - def set_xtime_vec(cls, path, mark_dct): - """vectored (or dictered) version of set_xtime - - ignore values that match @ignore - """ - for u,t in mark_dct.items(): - cls.set_xtime(path, u, t) - - @classmethod def entry_ops(cls, entries): pfx = gauxpfx() logging.debug('entries: %s' % repr(entries)) # regular file - def entry_pack_reg(gf, bn, st): + def entry_pack_reg(gf, bn, mo, uid, gid): + blen = len(bn) + return struct.pack(cls._fmt_mknod(blen), + uid, gid, gf, mo, bn, + stat.S_IMODE(mo), 0, umask()) + def entry_pack_reg_stat(gf, bn, st): blen = len(bn) mo = st['mode'] return struct.pack(cls._fmt_mknod(blen), @@ -465,12 +519,10 @@ class Server(object): gf, mo, bn, stat.S_IMODE(mo), 0, umask()) # mkdir - def entry_pack_mkdir(gf, bn, st): + def entry_pack_mkdir(gf, bn, mo, uid, gid): blen = len(bn) - mo = st['mode'] return struct.pack(cls._fmt_mkdir(blen), - st['uid'], st['gid'], - gf, mo, bn, + uid, gid, gf, mo, bn, stat.S_IMODE(mo), umask()) #symlink def entry_pack_symlink(gf, bn, lnk, st): @@ -485,7 +537,7 @@ class Server(object): # to be purged is the GFID gotten from the changelog. # (a stat(changelog_gfid) would also be valid here) # The race here is between the GFID check and the purge. - disk_gfid = cls.gfid(entry) + disk_gfid = cls.gfid_mnt(entry) if isinstance(disk_gfid, int): return if not gfid == disk_gfid: @@ -510,15 +562,15 @@ class Server(object): else: break elif op in ['CREATE', 'MKNOD']: - blob = entry_pack_reg(gfid, bname, e['stat']) + blob = entry_pack_reg(gfid, bname, e['mode'], e['uid'], e['uid']) elif op == 'MKDIR': - blob = entry_pack_mkdir(gfid, bname, e['stat']) + blob = entry_pack_mkdir(gfid, bname, e['mode'], e['uid'], e['uid']) elif op == 'LINK': slink = os.path.join(pfx, gfid) st = lstat(slink) if isinstance(st, int): (pg, bname) = entry2pb(entry) - blob = entry_pack_reg(gfid, bname, e['stat']) + blob = entry_pack_reg_stat(gfid, bname, e['stat']) else: errno_wrap(os.link, [slink, entry], [ENOENT, EEXIST]) elif op == 'SYMLINK': @@ -528,13 +580,24 @@ class Server(object): st = lstat(entry) if isinstance(st, int): (pg, bname) = entry2pb(en) - blob = entry_pack_reg(gfid, bname, e['stat']) + blob = entry_pack_reg_stat(gfid, bname, e['stat']) else: errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST]) if blob: errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [EEXIST], [ENOENT, ESTALE, EINVAL]) @classmethod + def meta_ops(cls, meta_entries): + logging.debug('Meta-entries: %s' % repr(meta_entries)) + for e in meta_entries: + mode = e['stat']['mode'] + uid = e['stat']['uid'] + gid = e['stat']['gid'] + go = e['go'] + errno_wrap(os.chmod, [go, mode], [ENOENT], [ESTALE, EINVAL]) + errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL]) + + @classmethod def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0): Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) @@ -699,6 +762,29 @@ class SlaveRemote(object): return po + def tarssh(self, files, slaveurl): + """invoke tar+ssh + -z (compress) can be use if needed, but ommitting it now + as it results in wierd error (tar+ssh errors out (errcode: 2) + """ + if not files: + raise GsyncdError("no files to sync") + logging.debug("files: " + ", ".join(files)) + (host, rdir) = slaveurl.split(':') + tar_cmd = ["tar", "-cf", "-", "--files-from", "-"] + ssh_cmd = gconf.ssh_command_tar.split() + [host, "tar", "--overwrite", "-xf", "-", "-C", rdir] + p0 = Popen(tar_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) + p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE) + for f in files: + p0.stdin.write(f) + p0.stdin.write('\n') + p0.stdin.close() + p0.wait() + + p1.wait() + p1.terminate_geterr(fail_on_err = False) + + return p1 class AbstractUrl(object): """abstract base class for url scheme classes""" @@ -1041,12 +1127,20 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): except ValueError: pass return e + @classmethod + def lstat(cls, e): + """ path based backend stat """ + return super(brickserver, cls).lstat(e) + @classmethod + def gfid(cls, e): + """ path based backend gfid fetch """ + return super(brickserver, cls).gfid(e) if gconf.slave_id: # define {,set_}xtime in slave, thus preempting # the call to remote, so that it takes data from # the local brick - slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server) - slave.server.set_xtime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_xtime(path, uuid + '.' + gconf.slave_id, mark), slave.server) + slave.server.stime = types.MethodType(lambda _self, path, uuid: brickserver.stime(path, uuid + '.' + gconf.slave_id), slave.server) + slave.server.set_stime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_stime(path, uuid + '.' + gconf.slave_id, mark), slave.server) (g1, g2) = self.gmaster_instantiate_tuple(slave) g1.master.server = brickserver g2.master.server = brickserver @@ -1067,6 +1161,9 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def rsync(self, files): return sup(self, files, self.slavedir) + def tarssh(self, files): + return sup(self, files, self.slavedir) + class SSH(AbstractUrl, SlaveRemote): """scheme class for ssh:// urls @@ -1170,3 +1267,6 @@ class SSH(AbstractUrl, SlaveRemote): def rsync(self, files): return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), *(gconf.rsync_ssh_options.split() + [self.slaveurl])) + + def tarssh(self, files): + return sup(self, files, self.slaveurl) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 348eb38c1d0..1b5684c6d0c 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -227,7 +227,7 @@ def log_raise_exception(excont): logging.warn("!!!!!!!!!!!!!") logging.warn('!!! getting "No such file or directory" errors ' "is most likely due to MISCONFIGURATION, please consult " - "http://access.redhat.com/knowledge/docs/en-US/Red_Hat_Storage/2.0/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html") + "https://access.redhat.com/site/documentation/en-US/Red_Hat_Storage/2.1/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html") logging.warn("!!!!!!!!!!!!!") gconf.transport.terminate_geterr() elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED): |