summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAjeet Jha <ajha@redhat.com>2013-12-02 12:37:34 +0530
committerVijay Bellur <vbellur@redhat.com>2014-01-27 09:43:19 -0800
commit30592e7f92515c5df620f300d6a3df6373ac6200 (patch)
tree851c232b1e3be7f5458cf6c753b92be2f482ad17
parentc8b9a9e9f82af7e752d4d881313374713701441d (diff)
gsyncd / geo-rep: geo-replication fixes
-> "threaded" hybrid crawl. -> Enabling metatadata synchronization. -> Handling EINVAL/ESTALE gracefully while syncing metadata. -> Improvments to changelog crawl code. -> Initial crawl changelog generation format. -> No gsyncd restart when checkpoint updated. -> Fix symlink handling in hybrid crawl. -> Slave's xtime key is 'stime'. -> tar+ssh as data synchronization. -> Instead of 'raise', just log in warning level for xtime missing cases. -> Fix for JSON object load failure -> Get new config value after config value reset. -> Skip already processed changelogs. -> Saving status of each individual worker thread. -> GFID fetch on slave for purges. -> Add tar ssh keys and config options. -> Fix nlink count when using backend. -> Include "data" operation for hardlink. -> Use changelog time prefix as slave's time. -> Process changelogs in parallel. Change-Id: I09fcbb2e2e418149a6d8435abd2ac6b2f015bb06 BUG: 1036539 Signed-off-by: Ajeet Jha <ajha@redhat.com> Reviewed-on: http://review.gluster.org/6404 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Anand Avati <avati@redhat.com> Reviewed-on: http://review.gluster.org/6809 Reviewed-by: Vijay Bellur <vbellur@redhat.com>
-rwxr-xr-xgeo-replication/src/peer_gsec_create.in10
-rw-r--r--geo-replication/syncdaemon/configinterface.py41
-rw-r--r--geo-replication/syncdaemon/gsyncd.py10
-rw-r--r--geo-replication/syncdaemon/master.py799
-rw-r--r--geo-replication/syncdaemon/resource.py176
-rw-r--r--geo-replication/syncdaemon/syncdutils.py2
6 files changed, 769 insertions, 269 deletions
diff --git a/geo-replication/src/peer_gsec_create.in b/geo-replication/src/peer_gsec_create.in
index ef630bd44..a39fdbfb5 100755
--- a/geo-replication/src/peer_gsec_create.in
+++ b/geo-replication/src/peer_gsec_create.in
@@ -8,5 +8,11 @@ if [ ! -f "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem.pub ]; then
ssh-keygen -N '' -f "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem > /dev/null
fi
-output=`echo command=\"@libexecdir@/glusterfs/gsyncd\" " "``cat "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem.pub`
-echo $output
+if [ ! -f "$GLUSTERD_WORKING_DIR"/geo-replication/tar_ssh.pem.pub ]; then
+ \rm -rf "$GLUSTERD_WORKING_DIR"/geo-replication/tar_ssh.pem*
+ ssh-keygen -N '' -f "$GLUSTERD_WORKING_DIR"/geo-replication/tar_ssh.pem > /dev/null
+fi
+
+output1=`echo command=\"${exec_prefix}/libexec/glusterfs/gsyncd\" " "``cat "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem.pub`
+output2=`echo command=\"tar \$\{SSH_ORIGINAL_COMMAND#* \}\" " "``cat "$GLUSTERD_WORKING_DIR"/geo-replication/tar_ssh.pem.pub`
+echo -e "$output1\n$output2"
diff --git a/geo-replication/syncdaemon/configinterface.py b/geo-replication/syncdaemon/configinterface.py
index a326e8246..0f764c47a 100644
--- a/geo-replication/syncdaemon/configinterface.py
+++ b/geo-replication/syncdaemon/configinterface.py
@@ -5,6 +5,10 @@ except ImportError:
import configparser as ConfigParser
import re
from string import Template
+import os
+import errno
+import sys
+from stat import ST_DEV, ST_INO, ST_MTIME
from syncdutils import escape, unescape, norm, update_file, GsyncdError
@@ -65,8 +69,38 @@ class GConffile(object):
self.auxdicts = dd
self.config = ConfigParser.RawConfigParser()
self.config.read(path)
+ self.dev, self.ino, self.mtime = -1, -1, -1
self._normconfig()
+ def _load(self):
+ try:
+ sres = os.stat(self.path)
+ self.dev = sres[ST_DEV]
+ self.ino = sres[ST_INO]
+ self.mtime = sres[ST_MTIME]
+ except (OSError, IOError):
+ if sys.exc_info()[1].errno == errno.ENOENT:
+ sres = None
+
+ self.config.read(self.path)
+ self._normconfig()
+
+ def get_realtime(self, opt):
+ try:
+ sres = os.stat(self.path)
+ except (OSError, IOError):
+ if sys.exc_info()[1].errno == errno.ENOENT:
+ sres = None
+ else:
+ raise
+
+ # compare file system stat with that of our stream file handle
+ if not sres or sres[ST_DEV] != self.dev or \
+ sres[ST_INO] != self.ino or self.mtime != sres[ST_MTIME]:
+ self._load()
+
+ return self.get(opt, printValue=False)
+
def section(self, rx=False):
"""get the section name of the section representing .peers in .config"""
peers = self.peers
@@ -162,7 +196,7 @@ class GConffile(object):
if self.config.has_section(self.section()):
update_from_sect(self.section(), MultiDict(dct, *self.auxdicts))
- def get(self, opt=None):
+ def get(self, opt=None, printValue=True):
"""print the matching key/value pairs from .config,
or if @opt given, the value for @opt (according to the
logic described in .update_to)
@@ -173,7 +207,10 @@ class GConffile(object):
opt = norm(opt)
v = d.get(opt)
if v:
- print(v)
+ if printValue:
+ print(v)
+ else:
+ return v
else:
for k, v in d.iteritems():
if k == '__name__':
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 7fcc3165a..64c26a5d2 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -191,6 +191,7 @@ def main_i():
op.add_option('--log-file-mbr', metavar='LOGF', type=str, action='callback', callback=store_abs)
op.add_option('--state-file', metavar='STATF', type=str, action='callback', callback=store_abs)
op.add_option('--state-detail-file', metavar='STATF', type=str, action='callback', callback=store_abs)
+ op.add_option('--georep-session-working-dir', metavar='STATF', type=str, action='callback', callback=store_abs)
op.add_option('--ignore-deletes', default=False, action='store_true')
op.add_option('--isolated-slave', default=False, action='store_true')
op.add_option('--use-rsync-xattrs', default=False, action='store_true')
@@ -202,6 +203,7 @@ def main_i():
op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='')
op.add_option('--local-path', metavar='PATH', help=SUPPRESS_HELP, default='')
op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh')
+ op.add_option('--ssh-command-tar', metavar='CMD', default='ssh')
op.add_option('--rsync-command', metavar='CMD', default='rsync')
op.add_option('--rsync-options', metavar='OPTS', default='')
op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress')
@@ -228,6 +230,7 @@ def main_i():
op.add_option('--change-interval', metavar='SEC', type=int, default=3)
# working directory for changelog based mechanism
op.add_option('--working-dir', metavar='DIR', type=str, action='callback', callback=store_abs)
+ op.add_option('--use-tarssh', default=False, action='store_true')
op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local)
# duh. need to specify dest or value will be mapped to None :S
@@ -474,8 +477,15 @@ def main_i():
GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf')
if confdata.op == 'set':
logging.info('checkpoint %s set' % confdata.val)
+ gcnf.delete('checkpoint_completed')
+ gcnf.delete('checkpoint_target')
elif confdata.op == 'del':
logging.info('checkpoint info was reset')
+ # if it is removing 'checkpoint' then we need
+ # to remove 'checkpoint_completed' and 'checkpoint_target' too
+ gcnf.delete('checkpoint_completed')
+ gcnf.delete('checkpoint_target')
+
except IOError:
if sys.exc_info()[1].errno == ENOENT:
# directory of log path is not present,
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 95810a61e..721fe18bd 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -10,15 +10,16 @@ import socket
import string
import errno
from shutil import copyfileobj
-from errno import ENOENT, ENODATA, EPIPE, EEXIST
+from errno import ENOENT, ENODATA, EPIPE, EEXIST, errorcode
from threading import currentThread, Condition, Lock
from datetime import datetime
+from libcxattr import Xattr
from gconf import gconf
from tempfile import mkdtemp, NamedTemporaryFile
from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \
unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \
- lstat, errno_wrap
+ lstat, errno_wrap, update_file
URXTIME = (-1, 0)
@@ -59,7 +60,8 @@ def gmaster_builder(excrawl=None):
crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
- class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin):
+ syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine
+ class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin, syncengine):
pass
return _GMaster
@@ -101,14 +103,17 @@ class NormalMixin(object):
if not 'default_xtime' in opts:
opts['default_xtime'] = URXTIME
- def xtime_low(self, server, path, **opts):
- xt = server.xtime(path, self.uuid)
+ def xtime_low(self, rsc, path, **opts):
+ if rsc == self.master:
+ xt = rsc.server.xtime(path, self.uuid)
+ else:
+ xt = rsc.server.stime(path, self.uuid)
if isinstance(xt, int) and xt != ENODATA:
return xt
if xt == ENODATA or xt < self.volmark:
if opts['create']:
xt = _xtime_now()
- server.aggregated.set_xtime(path, self.uuid, xt)
+ rsc.server.aggregated.set_xtime(path, self.uuid, xt)
else:
xt = opts['default_xtime']
return xt
@@ -140,7 +145,7 @@ class NormalMixin(object):
return xte > xtrd
def set_slave_xtime(self, path, mark):
- self.slave.server.set_xtime(path, self.uuid, mark)
+ self.slave.server.set_stime(path, self.uuid, mark)
self.slave.server.set_xtime_remote(path, self.uuid, mark)
class PartialMixin(NormalMixin):
@@ -190,6 +195,65 @@ class PurgeNoopMixin(object):
def purge_missing(self, path, names):
pass
+class TarSSHEngine(object):
+ """Sync engine that uses tar(1) piped over ssh(1)
+ for data transfers. Good for lots of small files.
+ """
+ def a_syncdata(self, files):
+ logging.debug('files: %s' % (files))
+ for f in files:
+ pb = self.syncer.add(f)
+ def regjob(se, xte, pb):
+ rv = pb.wait()
+ if rv[0]:
+ logging.debug('synced ' + se)
+ return True
+ else:
+ # stat check for file presence
+ st = lstat(se)
+ if isinstance(st, int):
+ return True
+ logging.warn('tar+ssh: %s [errcode: %d]' % (se, rv[1]))
+ self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
+
+ def syncdata_wait(self):
+ if self.wait(self.FLAT_DIR_HIERARCHY, None):
+ return True
+
+ def syncdata(self, files):
+ self.a_syncdata(files)
+ self.syncdata_wait()
+
+class RsyncEngine(object):
+ """Sync engine that uses rsync(1) for data transfers"""
+ def a_syncdata(self, files):
+ logging.debug('files: %s' % (files))
+ for f in files:
+ logging.debug('candidate for syncing %s' % f)
+ pb = self.syncer.add(f)
+ def regjob(se, xte, pb):
+ rv = pb.wait()
+ if rv[0]:
+ logging.debug('synced ' + se)
+ return True
+ else:
+ if rv[1] in [23, 24]:
+ # stat to check if the file exist
+ st = lstat(se)
+ if isinstance(st, int):
+ # file got unlinked in the interim
+ return True
+ logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))
+ self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
+
+ def syncdata_wait(self):
+ if self.wait(self.FLAT_DIR_HIERARCHY, None):
+ return True
+
+ def syncdata(self, files):
+ self.a_syncdata(files)
+ self.syncdata_wait()
+
class GMasterCommon(object):
"""abstract class impementling master role"""
@@ -234,7 +298,7 @@ class GMasterCommon(object):
else:
rsc = self.master
self.make_xtime_opts(rsc == self.master, opts)
- return self.xtime_low(rsc.server, path, **opts)
+ return self.xtime_low(rsc, path, **opts)
def get_initial_crawl_data(self):
# while persisting only 'files_syncd' is non-zero, rest of
@@ -243,18 +307,26 @@ class GMasterCommon(object):
default_data = {'files_syncd': 0,
'files_remaining': 0,
'bytes_remaining': 0,
- 'purges_remaining': 0}
+ 'purges_remaining': 0,
+ 'total_files_skipped': 0}
if getattr(gconf, 'state_detail_file', None):
try:
- return json.load(open(gconf.state_detail_file))
- except (IOError, OSError):
+ with open(gconf.state_detail_file, 'r+') as f:
+ loaded_data= json.load(f)
+ diff_data = set(default_data) - set (loaded_data)
+ if len(diff_data):
+ for i in diff_data:
+ loaded_data[i] = default_data[i]
+ return loaded_data
+ except (IOError):
ex = sys.exc_info()[1]
- if ex.errno == ENOENT:
- # Create file with initial data
+ logging.warn ('Creating new gconf.state_detail_file.')
+ # Create file with initial data
+ try:
with open(gconf.state_detail_file, 'wb') as f:
json.dump(default_data, f)
return default_data
- else:
+ except:
raise
return default_data
@@ -264,6 +336,8 @@ class GMasterCommon(object):
same_dir = os.path.dirname(gconf.state_detail_file)
with NamedTemporaryFile(dir=same_dir, delete=False) as tmp:
json.dump(self.total_crawl_stats, tmp)
+ tmp.flush()
+ os.fsync(tmp.fileno())
os.rename(tmp.name, gconf.state_detail_file)
except (IOError, OSError):
raise
@@ -272,7 +346,13 @@ class GMasterCommon(object):
self.master = master
self.slave = slave
self.jobtab = {}
- self.syncer = Syncer(slave)
+ if boolify(gconf.use_tarssh):
+ logging.info("using 'tar over ssh' as the sync engine")
+ self.syncer = Syncer(slave, self.slave.tarssh)
+ else:
+ logging.info("using 'rsync' as the sync engine")
+ # partial transfer (cf. rsync(1)), that's normal
+ self.syncer = Syncer(slave, self.slave.rsync, [23, 24])
# crawls vs. turns:
# - self.crawls is simply the number of crawl() invocations on root
# - one turn is a maximal consecutive sequence of crawls so that each
@@ -294,6 +374,8 @@ class GMasterCommon(object):
self.terminate = False
self.sleep_interval = 1
self.checkpoint_thread = None
+ self.current_files_skipped_count = 0
+ self.skipped_gfid_list = []
def init_keep_alive(cls):
"""start the keep-alive thread """
@@ -336,7 +418,8 @@ class GMasterCommon(object):
gconf.configinterface.set('volume_id', self.uuid)
if self.volinfo:
if self.volinfo['retval']:
- raise GsyncdError("master is corrupt")
+ logging.warn("master cluster's info may not be valid %d" % \
+ self.volinfo['retval'])
self.start_checkpoint_thread()
else:
raise GsyncdError("master volinfo unavailable")
@@ -349,7 +432,7 @@ class GMasterCommon(object):
while not self.terminate:
if self.start:
logging.debug("... crawl #%d done, took %.6f seconds" % \
- (self.crawls, time.time() - self.start))
+ (self.crawls, time.time() - self.start))
self.start = time.time()
should_display_info = self.start - self.lastreport['time'] >= 60
if should_display_info:
@@ -363,9 +446,20 @@ class GMasterCommon(object):
if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds
crawl = self.should_crawl()
t0 = t1
+ self.update_worker_remote_node()
if not crawl:
+ self.update_worker_health("Passive")
+ # bring up _this_ brick to the cluster stime
+ # which is min of cluster (but max of the replicas)
+ brick_stime = self.xtime('.', self.slave)
+ cluster_stime = self.master.server.aggregated.stime_mnt('.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
+ logging.debug("Cluster stime: %s | Brick stime: %s" % (repr(cluster_stime), repr(brick_stime)))
+ if not isinstance(cluster_stime, int):
+ if brick_stime < cluster_stime:
+ self.slave.server.set_stime(self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
time.sleep(5)
continue
+ self.update_worker_health("Active")
self.crawl()
if oneshot:
return
@@ -375,7 +469,7 @@ class GMasterCommon(object):
def _checkpt_param(cls, chkpt, prm, xtimish=True):
"""use config backend to lookup a parameter belonging to
checkpoint @chkpt"""
- cprm = getattr(gconf, 'checkpoint_' + prm, None)
+ cprm = gconf.configinterface.get_realtime('checkpoint_' + prm)
if not cprm:
return
chkpt_mapped, val = cprm.split(':', 1)
@@ -402,17 +496,6 @@ class GMasterCommon(object):
ts += '.' + str(tpair[1])
return ts
- def get_extra_info(self):
- str_info = '\nUptime=%s;FilesSyncd=%d;FilesPending=%d;BytesPending=%d;DeletesPending=%d;' % \
- (self._crawl_time_format(datetime.now() - self.crawl_start), \
- self.total_crawl_stats['files_syncd'], \
- self.total_crawl_stats['files_remaining'], \
- self.total_crawl_stats['bytes_remaining'], \
- self.total_crawl_stats['purges_remaining'])
- str_info += '\0'
- logging.debug(str_info)
- return str_info
-
def _crawl_time_format(self, crawl_time):
# Ex: 5 years, 4 days, 20:23:10
years, days = divmod(crawl_time.days, 365.25)
@@ -431,27 +514,49 @@ class GMasterCommon(object):
date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2))
return date
- def checkpt_service(self, chan, chkpt, tgt):
+ def checkpt_service(self, chan, chkpt):
"""checkpoint service loop
monitor and verify checkpoint status for @chkpt, and listen
for incoming requests for whom we serve a pretty-formatted
status report"""
- if not chkpt:
- # dummy loop for the case when there is no checkpt set
- while True:
+ while True:
+ chkpt = gconf.configinterface.get_realtime("checkpoint")
+ if not chkpt:
+ gconf.configinterface.delete("checkpoint_completed")
+ gconf.configinterface.delete("checkpoint_target")
+ # dummy loop for the case when there is no checkpt set
select([chan], [], [])
conn, _ = chan.accept()
- conn.send(self.get_extra_info())
+ conn.send('\0')
conn.close()
- completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
- if completed:
- completed = tuple(int(x) for x in completed.split('.'))
- while True:
+ continue
+
+ checkpt_tgt = self._checkpt_param(chkpt, 'target')
+ if not checkpt_tgt:
+ checkpt_tgt = self.xtime('.')
+ if isinstance(checkpt_tgt, int):
+ raise GsyncdError("master root directory is unaccessible (%s)",
+ os.strerror(checkpt_tgt))
+ self._set_checkpt_param(chkpt, 'target', checkpt_tgt)
+ logging.debug("checkpoint target %s has been determined for checkpoint %s" % \
+ (repr(checkpt_tgt), chkpt))
+
+ # check if the label is 'now'
+ chkpt_lbl = chkpt
+ try:
+ x1,x2 = chkpt.split(':')
+ if x1 == 'now':
+ chkpt_lbl = "as of " + self.humantime(x2)
+ except:
+ pass
+ completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
+ if completed:
+ completed = tuple(int(x) for x in completed.split('.'))
s,_,_ = select([chan], [], [], (not completed) and 5 or None)
# either request made and we re-check to not
# give back stale data, or we still hunting for completion
- if self.native_xtime(tgt) and self.native_xtime(tgt) < self.volmark:
+ if self.native_xtime(checkpt_tgt) and self.native_xtime(checkpt_tgt) < self.volmark:
# indexing has been reset since setting the checkpoint
status = "is invalid"
else:
@@ -459,12 +564,12 @@ class GMasterCommon(object):
if isinstance(xtr, int):
raise GsyncdError("slave root directory is unaccessible (%s)",
os.strerror(xtr))
- ncompleted = self.xtime_geq(xtr, tgt)
+ ncompleted = self.xtime_geq(xtr, checkpt_tgt)
if completed and not ncompleted: # stale data
logging.warn("completion time %s for checkpoint %s became stale" % \
(self.humantime(*completed), chkpt))
completed = None
- gconf.confdata.delete('checkpoint-completed')
+ gconf.configinterface.delete('checkpoint_completed')
if ncompleted and not completed: # just reaching completion
completed = "%.6f" % time.time()
self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False)
@@ -478,7 +583,7 @@ class GMasterCommon(object):
try:
conn, _ = chan.accept()
try:
- conn.send(" | checkpoint %s %s %s" % (chkpt, status, self.get_extra_info()))
+ conn.send("checkpoint %s is %s\0" % (chkpt_lbl, status))
except:
exc = sys.exc_info()[1]
if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \
@@ -505,18 +610,8 @@ class GMasterCommon(object):
pass
chan.bind(state_socket)
chan.listen(1)
- checkpt_tgt = None
- if gconf.checkpoint:
- checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target')
- if not checkpt_tgt:
- checkpt_tgt = self.xtime('.')
- if isinstance(checkpt_tgt, int):
- raise GsyncdError("master root directory is unaccessible (%s)",
- os.strerror(checkpt_tgt))
- self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt)
- logging.debug("checkpoint target %s has been determined for checkpoint %s" % \
- (repr(checkpt_tgt), gconf.checkpoint))
- t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt))
+ chkpt = gconf.configinterface.get_realtime("checkpoint")
+ t = Thread(target=self.checkpt_service, args=(chan, chkpt))
t.start()
self.checkpoint_thread = t
@@ -567,15 +662,11 @@ class GMasterChangelogMixin(GMasterCommon):
POS_GFID = 0
POS_TYPE = 1
- POS_ENTRY1 = 2
- POS_ENTRY2 = 3 # renames
-
- _CL_TYPE_DATA_PFX = "D "
- _CL_TYPE_METADATA_PFX = "M "
- _CL_TYPE_ENTRY_PFX = "E "
+ POS_ENTRY1 = -1
- TYPE_GFID = [_CL_TYPE_DATA_PFX] # ignoring metadata ops
- TYPE_ENTRY = [_CL_TYPE_ENTRY_PFX]
+ TYPE_META = "M "
+ TYPE_GFID = "D "
+ TYPE_ENTRY = "E "
# flat directory heirarchy for gfid based access
FLAT_DIR_HIERARCHY = '.'
@@ -594,39 +685,11 @@ class GMasterChangelogMixin(GMasterCommon):
logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile))
return (workdir, logfile)
- # update stats from *this* crawl
- def update_cumulative_stats(self, files_pending):
- self.total_crawl_stats['files_remaining'] = files_pending['count']
- self.total_crawl_stats['bytes_remaining'] = files_pending['bytes']
- self.total_crawl_stats['purges_remaining'] = files_pending['purge']
-
- # sync data
- def syncdata(self, datas):
- logging.debug('datas: %s' % (datas))
- for data in datas:
- logging.debug('candidate for syncing %s' % data)
- pb = self.syncer.add(data)
- def regjob(se, xte, pb):
- rv = pb.wait()
- if rv[0]:
- logging.debug('synced ' + se)
- return True
- else:
- if rv[1] in [23, 24]:
- # stat to check if the file exist
- st = lstat(se)
- if isinstance(st, int):
- # file got unlinked in the interim
- return True
- logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))
- self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb)
- if self.wait(self.FLAT_DIR_HIERARCHY, None):
- return True
-
def process_change(self, change, done, retry):
pfx = gauxpfx()
clist = []
entries = []
+ meta_gfid = set()
datas = set()
# basic crawl stats: files and bytes
@@ -652,136 +715,351 @@ class GMasterChangelogMixin(GMasterCommon):
dct[k] = ed[k]
return dct
- # regular file update: bytes & count
- def _update_reg(entry, size):
- if not entry in files_pending['files']:
- files_pending['count'] += 1
- files_pending['bytes'] += size
- files_pending['files'].append(entry)
- # updates for directories, symlinks etc..
- def _update_rest():
+ # entry counts (not purges)
+ def entry_update():
files_pending['count'] += 1
- # entry count
- def entry_update(entry, size, mode):
- if stat.S_ISREG(mode):
- _update_reg(entry, size)
- else:
- _update_rest()
# purge count
def purge_update():
files_pending['purge'] += 1
for e in clist:
e = e.strip()
- et = e[self.IDX_START:self.IDX_END]
- ec = e[self.IDX_END:].split(' ')
- if et in self.TYPE_ENTRY:
+ et = e[self.IDX_START:self.IDX_END] # entry type
+ ec = e[self.IDX_END:].split(' ') # rest of the bits
+
+ if et == self.TYPE_ENTRY:
+ # extract information according to the type of
+ # the entry operation. create(), mkdir() and mknod()
+ # have mode, uid, gid information in the changelog
+ # itself, so no need to stat()...
ty = ec[self.POS_TYPE]
+
+ # PARGFID/BNAME
en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1]))
+ # GFID of the entry
gfid = ec[self.POS_GFID]
- # definitely need a better way bucketize entry ops
+
if ty in ['UNLINK', 'RMDIR']:
purge_update()
entries.append(edct(ty, gfid=gfid, entry=en))
- continue
- go = os.path.join(pfx, gfid)
- st = lstat(go)
- if isinstance(st, int):
- if ty == 'RENAME':
- entries.append(edct('UNLINK', gfid=gfid, entry=en))
- else:
- logging.debug('file %s got purged in the interim' % go)
- continue
- entry_update(go, st.st_size, st.st_mode)
- if ty in ['CREATE', 'MKDIR', 'MKNOD']:
- entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
- elif ty == 'LINK':
- entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
- elif ty == 'SYMLINK':
- rl = errno_wrap(os.readlink, [en], [ENOENT])
- if isinstance(rl, int):
- continue
- entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
- elif ty == 'RENAME':
- e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2]))
- entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2, stat=st))
+ elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
+ entry_update()
+ # stat information present in the changelog itself
+ entries.append(edct(ty, gfid=gfid, entry=en, mode=int(ec[2]),\
+ uid=int(ec[3]), gid=int(ec[4])))
else:
- logging.warn('ignoring %s [op %s]' % (gfid, ty))
- elif et in self.TYPE_GFID:
- go = os.path.join(pfx, ec[0])
- st = lstat(go)
- if isinstance(st, int):
- logging.debug('file %s got purged in the interim' % go)
- continue
- entry_update(go, st.st_size, st.st_mode)
- datas.update([go])
+ # stat() to get mode and other information
+ go = os.path.join(pfx, gfid)
+ st = lstat(go)
+ if isinstance(st, int):
+ if ty == 'RENAME': # special hack for renames...
+ entries.append(edct('UNLINK', gfid=gfid, entry=en))
+ else:
+ logging.debug('file %s got purged in the interim' % go)
+ continue
+
+ if ty == 'LINK':
+ entry_update()
+ entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
+ elif ty == 'SYMLINK':
+ rl = errno_wrap(os.readlink, [en], [ENOENT])
+ if isinstance(rl, int):
+ continue
+ entry_update()
+ entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
+ elif ty == 'RENAME':
+ entry_update()
+ e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
+ entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st))
+ else:
+ logging.warn('ignoring %s [op %s]' % (gfid, ty))
+ elif et == self.TYPE_GFID:
+ datas.add(os.path.join(pfx, ec[0]))
+ elif et == self.TYPE_META:
+ if ec[1] == 'SETATTR': # only setattr's for now...
+ meta_gfid.add(os.path.join(pfx, ec[0]))
+ else:
+ logging.warn('got invalid changelog type: %s' % (et))
logging.debug('entries: %s' % repr(entries))
if not retry:
- self.update_cumulative_stats(files_pending)
+ self.update_worker_cumilitive_status(files_pending)
# sync namespace
if (entries):
self.slave.server.entry_ops(entries)
+ # sync metadata
+ if (meta_gfid):
+ meta_entries = []
+ for go in meta_gfid:
+ st = lstat(go)
+ if isinstance(st, int):
+ logging.debug('file %s got purged in the interim' % go)
+ continue
+ meta_entries.append(edct('META', go=go, stat=st))
+ if meta_entries:
+ self.slave.server.meta_ops(meta_entries)
# sync data
- if self.syncdata(datas):
- if done:
- self.master.server.changelog_done(change)
- return True
-
- def sync_done(self):
- self.total_crawl_stats['files_syncd'] += self.total_crawl_stats['files_remaining']
- self.total_crawl_stats['files_remaining'] = 0
- self.total_crawl_stats['bytes_remaining'] = 0
- self.total_crawl_stats['purges_remaining'] = 0
- self.update_crawl_data()
+ if datas:
+ self.a_syncdata(datas)
def process(self, changes, done=1):
- for change in changes:
- tries = 0
- retry = False
- while True:
- logging.debug('processing change %s' % change)
- if self.process_change(change, done, retry):
- self.sync_done()
- break
- retry = True
- tries += 1
- if tries == self.MAX_RETRIES:
- logging.warn('changelog %s could not be processed - moving on...' % os.path.basename(change))
- self.sync_done()
- if done:
- self.master.server.changelog_done(change)
- break
- # it's either entry_ops() or Rsync that failed to do it's
- # job. Mostly it's entry_ops() [which currently has a problem
- # of failing to create an entry but failing to return an errno]
- # Therefore we do not know if it's either Rsync or the freaking
- # entry_ops() that failed... so we retry the _whole_ changelog
- # again.
- # TODO: remove entry retries when it's gets fixed.
- logging.warn('incomplete sync, retrying changelog: %s' % change)
- time.sleep(0.5)
- self.turns += 1
+ tries = 0
+ retry = False
- def upd_stime(self, stime):
+ while True:
+ self.skipped_gfid_list = []
+ self.current_files_skipped_count = 0
+
+ # first, fire all changelog transfers in parallel. entry and metadata
+ # are performed synchronously, therefore in serial. However at the end
+ # of each changelog, data is synchronized with syncdata_async() - which
+ # means it is serial w.r.t entries/metadata of that changelog but
+ # happens in parallel with data of other changelogs.
+
+ for change in changes:
+ logging.debug('processing change %s' % change)
+ self.process_change(change, done, retry)
+ if not retry:
+ self.turns += 1 # number of changelogs processed in the batch
+
+ # Now we wait for all the data transfers fired off in the above step
+ # to complete. Note that this is not ideal either. Ideally we want to
+ # trigger the entry/meta-data transfer of the next batch while waiting
+ # for the data transfer of the current batch to finish.
+
+ # Note that the reason to wait for the data transfer (vs doing it
+ # completely in the background and call the changelog_done()
+ # asynchronously) is because this waiting acts as a "backpressure"
+ # and prevents a spiraling increase of wait stubs from consuming
+ # unbounded memory and resources.
+
+ # update the slave's time with the timestamp of the _last_ changelog
+ # file time suffix. Since, the changelog prefix time is the time when
+ # the changelog was rolled over, introduce a tolerence of 1 second to
+ # counter the small delta b/w the marker update and gettimeofday().
+ # NOTE: this is only for changelog mode, not xsync.
+
+ # @change is the last changelog (therefore max time for this batch)
+ if self.syncdata_wait():
+ if done:
+ xtl = (int(change.split('.')[-1]) - 1, 0)
+ self.upd_stime(xtl)
+ map(self.master.server.changelog_done, changes)
+ self.update_worker_files_syncd()
+ break
+
+ # We do not know which changelog transfer failed, retry everything.
+ retry = True
+ tries += 1
+ if tries == self.MAX_RETRIES:
+ logging.warn('changelogs %s could not be processed - moving on...' % \
+ ' '.join(map(os.path.basename, changes)))
+ self.update_worker_total_files_skipped(self.current_files_skipped_count)
+ logging.warn('SKIPPED GFID = %s' % ','.join(self.skipped_gfid_list))
+ self.update_worker_files_syncd()
+ if done:
+ xtl = (int(change.split('.')[-1]) - 1, 0)
+ self.upd_stime(xtl)
+ map(self.master.server.changelog_done, changes)
+ break
+ # it's either entry_ops() or Rsync that failed to do it's
+ # job. Mostly it's entry_ops() [which currently has a problem
+ # of failing to create an entry but failing to return an errno]
+ # Therefore we do not know if it's either Rsync or the freaking
+ # entry_ops() that failed... so we retry the _whole_ changelog
+ # again.
+ # TODO: remove entry retries when it's gets fixed.
+ logging.warn('incomplete sync, retrying changelogs: %s' % \
+ ' '.join(map(os.path.basename, changes)))
+ time.sleep(0.5)
+
+ def upd_stime(self, stime, path=None):
+ if not path:
+ path = self.FLAT_DIR_HIERARCHY
if not stime == URXTIME:
- self.sendmark(self.FLAT_DIR_HIERARCHY, stime)
+ self.sendmark(path, stime)
+
+ def get_worker_status_file(self):
+ file_name = gconf.local_path+'.status'
+ file_name = file_name.replace("/", "_")
+ worker_status_file = gconf.georep_session_working_dir+file_name
+ return worker_status_file
+
+ def update_worker_status(self, key, value):
+ default_data = {"remote_node":"N/A",
+ "worker status":"Not Started",
+ "crawl status":"N/A",
+ "files_syncd": 0,
+ "files_remaining": 0,
+ "bytes_remaining": 0,
+ "purges_remaining": 0,
+ "total_files_skipped": 0}
+ worker_status_file = self.get_worker_status_file()
+ try:
+ with open(worker_status_file, 'r+') as f:
+ loaded_data = json.load(f)
+ loaded_data[key] = value
+ os.ftruncate(f.fileno(), 0)
+ os.lseek(f.fileno(), 0, os.SEEK_SET)
+ json.dump(loaded_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except (IOError, OSError, ValueError):
+ logging.info ('Creating new %s' % worker_status_file)
+ try:
+ with open(worker_status_file, 'wb') as f:
+ default_data[key] = value
+ json.dump(default_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except:
+ raise
+
+ def update_worker_cumilitive_status(self, files_pending):
+ default_data = {"remote_node":"N/A",
+ "worker status":"Not Started",
+ "crawl status":"N/A",
+ "files_syncd": 0,
+ "files_remaining": 0,
+ "bytes_remaining": 0,
+ "purges_remaining": 0,
+ "total_files_skipped": 0}
+ worker_status_file = self.get_worker_status_file()
+ try:
+ with open(worker_status_file, 'r+') as f:
+ loaded_data = json.load(f)
+ loaded_data['files_remaining'] = files_pending['count']
+ loaded_data['bytes_remaining'] = files_pending['bytes']
+ loaded_data['purges_remaining'] = files_pending['purge']
+ os.ftruncate(f.fileno(), 0)
+ os.lseek(f.fileno(), 0, os.SEEK_SET)
+ json.dump(loaded_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except (IOError, OSError, ValueError):
+ logging.info ('Creating new %s' % worker_status_file)
+ try:
+ with open(worker_status_file, 'wb') as f:
+ default_data['files_remaining'] = files_pending['count']
+ default_data['bytes_remaining'] = files_pending['bytes']
+ default_data['purges_remaining'] = files_pending['purge']
+ json.dump(default_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except:
+ raise
+
+ def update_worker_remote_node (self):
+ node = sys.argv[-1]
+ node = node.split("@")[-1]
+ remote_node_ip = node.split(":")[0]
+ remote_node_vol = node.split(":")[3]
+ remote_node = remote_node_ip + '::' + remote_node_vol
+ self.update_worker_status ('remote_node', remote_node)
+
+ def update_worker_health (self, state):
+ self.update_worker_status ('worker status', state)
+
+ def update_worker_crawl_status (self, state):
+ self.update_worker_status ('crawl status', state)
+
+ def update_worker_files_syncd (self):
+ default_data = {"remote_node":"N/A",
+ "worker status":"Not Started",
+ "crawl status":"N/A",
+ "files_syncd": 0,
+ "files_remaining": 0,
+ "bytes_remaining": 0,
+ "purges_remaining": 0,
+ "total_files_skipped": 0}
+ worker_status_file = self.get_worker_status_file()
+ try:
+ with open(worker_status_file, 'r+') as f:
+ loaded_data = json.load(f)
+ loaded_data['files_syncd'] += loaded_data['files_remaining']
+ loaded_data['files_remaining'] = 0
+ loaded_data['bytes_remaining'] = 0
+ loaded_data['purges_remaining'] = 0
+ os.ftruncate(f.fileno(), 0)
+ os.lseek(f.fileno(), 0, os.SEEK_SET)
+ json.dump(loaded_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except (IOError, OSError, ValueError):
+ logging.info ('Creating new %s' % worker_status_file)
+ try:
+ with open(worker_status_file, 'wb') as f:
+ json.dump(default_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except:
+ raise
+
+ def update_worker_files_remaining (self, state):
+ self.update_worker_status ('files_remaining', state)
+
+ def update_worker_bytes_remaining (self, state):
+ self.update_worker_status ('bytes_remaining', state)
+
+ def update_worker_purges_remaining (self, state):
+ self.update_worker_status ('purges_remaining', state)
+
+ def update_worker_total_files_skipped (self, value):
+ default_data = {"remote_node":"N/A",
+ "worker status":"Not Started",
+ "crawl status":"N/A",
+ "files_syncd": 0,
+ "files_remaining": 0,
+ "bytes_remaining": 0,
+ "purges_remaining": 0,
+ "total_files_skipped": 0}
+ worker_status_file = self.get_worker_status_file()
+ try:
+ with open(worker_status_file, 'r+') as f:
+ loaded_data = json.load(f)
+ loaded_data['total_files_skipped'] = value
+ loaded_data['files_remaining'] -= value
+ os.ftruncate(f.fileno(), 0)
+ os.lseek(f.fileno(), 0, os.SEEK_SET)
+ json.dump(loaded_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except (IOError, OSError, ValueError):
+ logging.info ('Creating new %s' % worker_status_file)
+ try:
+ with open(worker_status_file, 'wb') as f:
+ default_data['total_files_skipped'] = value
+ json.dump(default_data, f)
+ f.flush()
+ os.fsync(f.fileno())
+ except:
+ raise
def crawl(self):
+ self.update_worker_crawl_status("Changelog Crawl")
changes = []
+ # get stime (from the brick) and purge changelogs
+ # that are _historical_ to that time.
+ purge_time = self.xtime('.', self.slave)
+ if isinstance(purge_time, int):
+ purge_time = None
try:
self.master.server.changelog_scan()
self.crawls += 1
except OSError:
self.fallback_xsync()
+ self.update_worker_crawl_status("Hybrid Crawl")
changes = self.master.server.changelog_getchanges()
if changes:
- xtl = self.xtime(self.FLAT_DIR_HIERARCHY)
- if isinstance(xtl, int):
- raise GsyncdError('master is corrupt')
+ if purge_time:
+ logging.info("slave's time: %s" % repr(purge_time))
+ processed = [x for x in changes if int(x.split('.')[-1]) < purge_time[0]]
+ for pr in processed:
+ logging.info('skipping already processed change: %s...' % os.path.basename(pr))
+ self.master.server.changelog_done(pr)
+ changes.remove(pr)
logging.debug('processing changes %s' % repr(changes))
self.process(changes)
- self.upd_stime(xtl)
def register(self):
(workdir, logfile) = self.setup_working_dir()
@@ -799,17 +1077,20 @@ class GMasterChangelogMixin(GMasterCommon):
class GMasterXsyncMixin(GMasterChangelogMixin):
"""
-
This crawl needs to be xtime based (as of now
it's not. this is beacuse we generate CHANGELOG
file during each crawl which is then processed
by process_change()).
For now it's used as a one-shot initial sync
mechanism and only syncs directories, regular
- files and symlinks.
+ files, hardlinks and symlinks.
"""
+ XSYNC_MAX_ENTRIES = 1<<13
+
def register(self):
+ self.counter = 0
+ self.comlist = []
self.sleep_interval = 60
self.tempdir = self.setup_working_dir()[0]
self.tempdir = os.path.join(self.tempdir, 'xsync')
@@ -823,6 +1104,36 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
else:
raise
+ def crawl(self):
+ """
+ event dispatcher thread
+
+ this thread dispatches either changelog or synchronizes stime.
+ additionally terminates itself on recieving a 'finale' event
+ """
+ def Xsyncer():
+ self.Xcrawl()
+ t = Thread(target=Xsyncer)
+ t.start()
+ logging.info('starting hybrid crawl...')
+ self.update_worker_crawl_status("Hybrid Crawl")
+ while True:
+ try:
+ item = self.comlist.pop(0)
+ if item[0] == 'finale':
+ logging.info('finished hybrid crawl syncing')
+ break
+ elif item[0] == 'xsync':
+ logging.info('processing xsync changelog %s' % (item[1]))
+ self.process([item[1]], 0)
+ elif item[0] == 'stime':
+ logging.debug('setting slave time: %s' % repr(item[1]))
+ self.upd_stime(item[1][1], item[1][0])
+ else:
+ logging.warn('unknown tuple in comlist (%s)' % repr(item))
+ except IndexError:
+ time.sleep(1)
+
def write_entry_change(self, prefix, data=[]):
self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
@@ -839,24 +1150,61 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
def fname(self):
return self.xsync_change
- def crawl(self, path='.', xtr=None, done=0):
- """ generate a CHANGELOG file consumable by process_change """
+ def put(self, mark, item):
+ self.comlist.append((mark, item))
+
+ def sync_xsync(self, last):
+ """schedule a processing of changelog"""
+ self.close()
+ self.put('xsync', self.fname())
+ self.counter = 0
+ if not last:
+ time.sleep(1) # make sure changelogs are 1 second apart
+ self.open()
+
+ def sync_stime(self, stime=None, last=False):
+ """schedule a stime synchronization"""
+ if stime:
+ self.put('stime', stime)
+ if last:
+ self.put('finale', None)
+
+ def sync_done(self, stime=None, last=False):
+ self.sync_xsync(last)
+ if stime:
+ self.sync_stime(stime, last)
+
+ def Xcrawl(self, path='.', xtr_root=None):
+ """
+ generate a CHANGELOG file consumable by process_change.
+
+ slave's xtime (stime) is _cached_ for comparisons across
+ the filesystem tree, but set after directory synchronization.
+ """
if path == '.':
self.open()
self.crawls += 1
- if not xtr:
+ if not xtr_root:
# get the root stime and use it for all comparisons
- xtr = self.xtime('.', self.slave)
- if isinstance(xtr, int):
- if xtr != ENOENT:
- raise GsyncdError('slave is corrupt')
- xtr = self.minus_infinity
+ xtr_root = self.xtime('.', self.slave)
+ if isinstance(xtr_root, int):
+ if xtr_root != ENOENT:
+ logging.warn("slave cluster not returning the " \
+ "correct xtime for root (%d)" % xtr_root)
+ xtr_root = self.minus_infinity
xtl = self.xtime(path)
if isinstance(xtl, int):
- raise GsyncdError('master is corrupt')
- if xtr == xtl:
+ logging.warn("master cluster's xtime not found")
+ xtr = self.xtime(path, self.slave)
+ if isinstance(xtr, int):
+ if xtr != ENOENT:
+ logging.warn("slave cluster not returning the " \
+ "correct xtime for %s (%d)" % (path, xtr))
+ xtr = self.minus_infinity
+ xtr = max(xtr, xtr_root)
+ if not self.need_sync(path, xtl, xtr):
if path == '.':
- self.close()
+ self.sync_done((path, xtl), True)
return
self.xtime_reversion_hook(path, xtl, xtr)
logging.debug("entering " + path)
@@ -867,43 +1215,42 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
for e in dem:
bname = e
e = os.path.join(path, e)
- st = lstat(e)
+ xte = self.xtime(e)
+ if isinstance(xte, int):
+ logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte]))
+ continue
+ if not self.need_sync(e, xte, xtr):
+ continue
+ st = self.master.server.lstat(e)
if isinstance(st, int):
- logging.warn('%s got purged in the interim..' % e)
+ logging.warn('%s got purged in the interim ...' % e)
continue
gfid = self.master.server.gfid(e)
if isinstance(gfid, int):
- logging.warn('skipping entry %s..' % (e))
- continue
- xte = self.xtime(e)
- if isinstance(xte, int):
- raise GsyncdError('master is corrupt')
- if not self.need_sync(e, xte, xtr):
+ logging.warn('skipping entry %s..' % e)
continue
mo = st.st_mode
+ self.counter += 1
+ if self.counter == self.XSYNC_MAX_ENTRIES:
+ self.sync_done()
if stat.S_ISDIR(mo):
- self.write_entry_change("E", [gfid, 'MKDIR', escape(os.path.join(pargfid, bname))])
- self.crawl(e, xtr)
+ self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))])
+ self.Xcrawl(e, xtr_root)
+ self.sync_done((e, xte), False)
elif stat.S_ISLNK(mo):
- rl = errno_wrap(os.readlink, [en], [ENOENT])
- if isinstance(rl, int):
- continue
- self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname)), rl])
- else:
+ self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))])
+ elif stat.S_ISREG(mo):
+ nlink = st.st_nlink
+ nlink -= 1 # fixup backend stat link count
# if a file has a hardlink, create a Changelog entry as 'LINK' so the slave
# side will decide if to create the new entry, or to create link.
- if st.st_nlink == 1:
- self.write_entry_change("E", [gfid, 'MKNOD', escape(os.path.join(pargfid, bname))])
+ if nlink == 1:
+ self.write_entry_change("E", [gfid, 'MKNOD', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))])
else:
self.write_entry_change("E", [gfid, 'LINK', escape(os.path.join(pargfid, bname))])
- if stat.S_ISREG(mo):
- self.write_entry_change("D", [gfid])
-
+ self.write_entry_change("D", [gfid])
if path == '.':
- logging.info('processing xsync changelog %s' % self.fname())
- self.close()
- self.process([self.fname()], done)
- self.upd_stime(xtl)
+ self.sync_done((path, xtl), True)
class BoxClosedErr(Exception):
pass
@@ -979,12 +1326,13 @@ class Syncer(object):
each completed syncjob.
"""
- def __init__(self, slave):
+ def __init__(self, slave, sync_engine, resilient_errnos=[]):
"""spawn worker threads"""
self.slave = slave
self.lock = Lock()
self.pb = PostBox()
- self.bytes_synced = 0
+ self.sync_engine = sync_engine
+ self.errnos_ok = resilient_errnos
for i in range(int(gconf.sync_jobs)):
t = Thread(target=self.syncjob)
t.start()
@@ -1002,11 +1350,10 @@ class Syncer(object):
break
time.sleep(0.5)
pb.close()
- po = self.slave.rsync(pb)
+ po = self.sync_engine(pb)
if po.returncode == 0:
ret = (True, 0)
- elif po.returncode in (23, 24):
- # partial transfer (cf. rsync(1)), that's normal
+ elif po.returncode in self.errnos_ok:
ret = (False, po.returncode)
else:
po.errfail()
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index faf62f868..8deb5114b 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -265,6 +265,9 @@ class Server(object):
FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT
GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0'
+ GFID_XATTR = 'trusted.gfid' # for backend gfid fetch, do not use GX_NSPACE_PFX
+ GFID_FMTSTR = "!" + "B"*16
+
local_path = ''
@classmethod
@@ -305,6 +308,38 @@ class Server(object):
raise OSError(ENOTDIR, os.strerror(ENOTDIR))
return os.listdir(path)
+
+ @classmethod
+ @_pathguard
+ def lstat(cls, path):
+ try:
+ return os.lstat(path)
+ except (IOError, OSError):
+ ex = sys.exc_info()[1]
+ if ex.errno == ENOENT:
+ return ex.errno
+ else:
+ raise
+
+
+ @classmethod
+ @_pathguard
+ def gfid(cls, path):
+ try:
+ buf = Xattr.lgetxattr(path, cls.GFID_XATTR, 16)
+ m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)]))
+ return '-'.join(m.groups())
+ except (IOError, OSError):
+ ex = sys.exc_info()[1]
+ if ex.errno == ENOENT:
+ return ex.errno
+ else:
+ raise
+
+ @classmethod
+ def gfid_mnt(cls, gfidpath):
+ return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT])
+
@classmethod
@_pathguard
def purge(cls, path, entries=None):
@@ -397,8 +432,42 @@ class Server(object):
raise
@classmethod
- def gfid(cls, gfidpath):
- return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT])
+ @_pathguard
+ def stime_mnt(cls, path, uuid):
+ """query xtime extended attribute
+
+ Return xtime of @path for @uuid as a pair of integers.
+ "Normal" errors due to non-existent @path or extended attribute
+ are tolerated and errno is returned in such a case.
+ """
+
+ try:
+ return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8))
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno in (ENOENT, ENODATA, ENOTDIR):
+ return ex.errno
+ else:
+ raise
+
+ @classmethod
+ @_pathguard
+ def stime(cls, path, uuid):
+ """query xtime extended attribute
+
+ Return xtime of @path for @uuid as a pair of integers.
+ "Normal" errors due to non-existent @path or extended attribute
+ are tolerated and errno is returned in such a case.
+ """
+
+ try:
+ return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8))
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno in (ENOENT, ENODATA, ENOTDIR):
+ return ex.errno
+ else:
+ raise
@classmethod
def node_uuid(cls, path='.'):
@@ -409,21 +478,10 @@ class Server(object):
raise
@classmethod
- def xtime_vec(cls, path, *uuids):
- """vectored version of @xtime
-
- accepts a list of uuids and returns a dictionary
- with uuid as key(s) and xtime as value(s)
- """
- xt = {}
- for uuid in uuids:
- xtu = cls.xtime(path, uuid)
- if xtu == ENODATA:
- xtu = None
- if isinstance(xtu, int):
- return xtu
- xt[uuid] = xtu
- return xt
+ @_pathguard
+ def set_stime(cls, path, uuid, mark):
+ """set @mark as stime for @uuid on @path"""
+ Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), struct.pack('!II', *mark))
@classmethod
@_pathguard
@@ -444,20 +502,16 @@ class Server(object):
Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))
@classmethod
- def set_xtime_vec(cls, path, mark_dct):
- """vectored (or dictered) version of set_xtime
-
- ignore values that match @ignore
- """
- for u,t in mark_dct.items():
- cls.set_xtime(path, u, t)
-
- @classmethod
def entry_ops(cls, entries):
pfx = gauxpfx()
logging.debug('entries: %s' % repr(entries))
# regular file
- def entry_pack_reg(gf, bn, st):
+ def entry_pack_reg(gf, bn, mo, uid, gid):
+ blen = len(bn)
+ return struct.pack(cls._fmt_mknod(blen),
+ uid, gid, gf, mo, bn,
+ stat.S_IMODE(mo), 0, umask())
+ def entry_pack_reg_stat(gf, bn, st):
blen = len(bn)
mo = st['mode']
return struct.pack(cls._fmt_mknod(blen),
@@ -465,12 +519,10 @@ class Server(object):
gf, mo, bn,
stat.S_IMODE(mo), 0, umask())
# mkdir
- def entry_pack_mkdir(gf, bn, st):
+ def entry_pack_mkdir(gf, bn, mo, uid, gid):
blen = len(bn)
- mo = st['mode']
return struct.pack(cls._fmt_mkdir(blen),
- st['uid'], st['gid'],
- gf, mo, bn,
+ uid, gid, gf, mo, bn,
stat.S_IMODE(mo), umask())
#symlink
def entry_pack_symlink(gf, bn, lnk, st):
@@ -485,7 +537,7 @@ class Server(object):
# to be purged is the GFID gotten from the changelog.
# (a stat(changelog_gfid) would also be valid here)
# The race here is between the GFID check and the purge.
- disk_gfid = cls.gfid(entry)
+ disk_gfid = cls.gfid_mnt(entry)
if isinstance(disk_gfid, int):
return
if not gfid == disk_gfid:
@@ -510,15 +562,15 @@ class Server(object):
else:
break
elif op in ['CREATE', 'MKNOD']:
- blob = entry_pack_reg(gfid, bname, e['stat'])
+ blob = entry_pack_reg(gfid, bname, e['mode'], e['uid'], e['uid'])
elif op == 'MKDIR':
- blob = entry_pack_mkdir(gfid, bname, e['stat'])
+ blob = entry_pack_mkdir(gfid, bname, e['mode'], e['uid'], e['uid'])
elif op == 'LINK':
slink = os.path.join(pfx, gfid)
st = lstat(slink)
if isinstance(st, int):
(pg, bname) = entry2pb(entry)
- blob = entry_pack_reg(gfid, bname, e['stat'])
+ blob = entry_pack_reg_stat(gfid, bname, e['stat'])
else:
errno_wrap(os.link, [slink, entry], [ENOENT, EEXIST])
elif op == 'SYMLINK':
@@ -528,13 +580,24 @@ class Server(object):
st = lstat(entry)
if isinstance(st, int):
(pg, bname) = entry2pb(en)
- blob = entry_pack_reg(gfid, bname, e['stat'])
+ blob = entry_pack_reg_stat(gfid, bname, e['stat'])
else:
errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])
if blob:
errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [EEXIST], [ENOENT, ESTALE, EINVAL])
@classmethod
+ def meta_ops(cls, meta_entries):
+ logging.debug('Meta-entries: %s' % repr(meta_entries))
+ for e in meta_entries:
+ mode = e['stat']['mode']
+ uid = e['stat']['uid']
+ gid = e['stat']['gid']
+ go = e['go']
+ errno_wrap(os.chmod, [go, mode], [ENOENT], [ESTALE, EINVAL])
+ errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL])
+
+ @classmethod
def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0):
Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)
@@ -699,6 +762,29 @@ class SlaveRemote(object):
return po
+ def tarssh(self, files, slaveurl):
+ """invoke tar+ssh
+ -z (compress) can be use if needed, but ommitting it now
+ as it results in wierd error (tar+ssh errors out (errcode: 2)
+ """
+ if not files:
+ raise GsyncdError("no files to sync")
+ logging.debug("files: " + ", ".join(files))
+ (host, rdir) = slaveurl.split(':')
+ tar_cmd = ["tar", "-cf", "-", "--files-from", "-"]
+ ssh_cmd = gconf.ssh_command_tar.split() + [host, "tar", "--overwrite", "-xf", "-", "-C", rdir]
+ p0 = Popen(tar_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+ p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE)
+ for f in files:
+ p0.stdin.write(f)
+ p0.stdin.write('\n')
+ p0.stdin.close()
+ p0.wait()
+
+ p1.wait()
+ p1.terminate_geterr(fail_on_err = False)
+
+ return p1
class AbstractUrl(object):
"""abstract base class for url scheme classes"""
@@ -1041,12 +1127,20 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
except ValueError:
pass
return e
+ @classmethod
+ def lstat(cls, e):
+ """ path based backend stat """
+ return super(brickserver, cls).lstat(e)
+ @classmethod
+ def gfid(cls, e):
+ """ path based backend gfid fetch """
+ return super(brickserver, cls).gfid(e)
if gconf.slave_id:
# define {,set_}xtime in slave, thus preempting
# the call to remote, so that it takes data from
# the local brick
- slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server)
- slave.server.set_xtime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_xtime(path, uuid + '.' + gconf.slave_id, mark), slave.server)
+ slave.server.stime = types.MethodType(lambda _self, path, uuid: brickserver.stime(path, uuid + '.' + gconf.slave_id), slave.server)
+ slave.server.set_stime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_stime(path, uuid + '.' + gconf.slave_id, mark), slave.server)
(g1, g2) = self.gmaster_instantiate_tuple(slave)
g1.master.server = brickserver
g2.master.server = brickserver
@@ -1067,6 +1161,9 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def rsync(self, files):
return sup(self, files, self.slavedir)
+ def tarssh(self, files):
+ return sup(self, files, self.slavedir)
+
class SSH(AbstractUrl, SlaveRemote):
"""scheme class for ssh:// urls
@@ -1170,3 +1267,6 @@ class SSH(AbstractUrl, SlaveRemote):
def rsync(self, files):
return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args),
*(gconf.rsync_ssh_options.split() + [self.slaveurl]))
+
+ def tarssh(self, files):
+ return sup(self, files, self.slaveurl)
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 348eb38c1..1b5684c6d 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -227,7 +227,7 @@ def log_raise_exception(excont):
logging.warn("!!!!!!!!!!!!!")
logging.warn('!!! getting "No such file or directory" errors '
"is most likely due to MISCONFIGURATION, please consult "
- "http://access.redhat.com/knowledge/docs/en-US/Red_Hat_Storage/2.0/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html")
+ "https://access.redhat.com/site/documentation/en-US/Red_Hat_Storage/2.1/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html")
logging.warn("!!!!!!!!!!!!!")
gconf.transport.terminate_geterr()
elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED):