diff options
| author | Ajeet Jha <ajha@redhat.com> | 2013-12-02 12:37:34 +0530 | 
|---|---|---|
| committer | Vijay Bellur <vbellur@redhat.com> | 2014-01-27 09:43:19 -0800 | 
| commit | 30592e7f92515c5df620f300d6a3df6373ac6200 (patch) | |
| tree | 851c232b1e3be7f5458cf6c753b92be2f482ad17 | |
| parent | c8b9a9e9f82af7e752d4d881313374713701441d (diff) | |
gsyncd / geo-rep: geo-replication fixes
-> "threaded" hybrid crawl.
-> Enabling metatadata synchronization.
-> Handling EINVAL/ESTALE gracefully while syncing metadata.
-> Improvments to changelog crawl code.
-> Initial crawl changelog generation format.
-> No gsyncd restart when checkpoint updated.
-> Fix symlink handling in hybrid crawl.
-> Slave's xtime key is 'stime'.
-> tar+ssh as data synchronization.
-> Instead of 'raise', just log in warning level for xtime missing cases.
-> Fix for JSON object load failure
-> Get new config value after config value reset.
-> Skip already processed changelogs.
-> Saving status of each individual worker thread.
-> GFID fetch on slave for purges.
-> Add tar ssh keys and config options.
-> Fix nlink count when using backend.
-> Include "data" operation for hardlink.
-> Use changelog time prefix as slave's time.
-> Process changelogs in parallel.
Change-Id: I09fcbb2e2e418149a6d8435abd2ac6b2f015bb06
BUG: 1036539
Signed-off-by: Ajeet Jha <ajha@redhat.com>
Reviewed-on: http://review.gluster.org/6404
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Anand Avati <avati@redhat.com>
Reviewed-on: http://review.gluster.org/6809
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
| -rwxr-xr-x | geo-replication/src/peer_gsec_create.in | 10 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/configinterface.py | 41 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 10 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 799 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 176 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 2 | 
6 files changed, 769 insertions, 269 deletions
| diff --git a/geo-replication/src/peer_gsec_create.in b/geo-replication/src/peer_gsec_create.in index ef630bd44..a39fdbfb5 100755 --- a/geo-replication/src/peer_gsec_create.in +++ b/geo-replication/src/peer_gsec_create.in @@ -8,5 +8,11 @@ if [ ! -f "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem.pub ]; then      ssh-keygen -N '' -f "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem > /dev/null  fi -output=`echo command=\"@libexecdir@/glusterfs/gsyncd\" " "``cat "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem.pub` -echo $output +if [ ! -f "$GLUSTERD_WORKING_DIR"/geo-replication/tar_ssh.pem.pub ]; then +    \rm -rf "$GLUSTERD_WORKING_DIR"/geo-replication/tar_ssh.pem* +    ssh-keygen -N '' -f "$GLUSTERD_WORKING_DIR"/geo-replication/tar_ssh.pem > /dev/null +fi + +output1=`echo command=\"${exec_prefix}/libexec/glusterfs/gsyncd\" " "``cat "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem.pub` +output2=`echo command=\"tar \$\{SSH_ORIGINAL_COMMAND#* \}\" " "``cat "$GLUSTERD_WORKING_DIR"/geo-replication/tar_ssh.pem.pub` +echo -e "$output1\n$output2" diff --git a/geo-replication/syncdaemon/configinterface.py b/geo-replication/syncdaemon/configinterface.py index a326e8246..0f764c47a 100644 --- a/geo-replication/syncdaemon/configinterface.py +++ b/geo-replication/syncdaemon/configinterface.py @@ -5,6 +5,10 @@ except ImportError:      import configparser as ConfigParser  import re  from string import Template +import os +import errno +import sys +from stat import ST_DEV, ST_INO, ST_MTIME  from syncdutils import escape, unescape, norm, update_file, GsyncdError @@ -65,8 +69,38 @@ class GConffile(object):          self.auxdicts = dd          self.config = ConfigParser.RawConfigParser()          self.config.read(path) +        self.dev, self.ino, self.mtime = -1, -1, -1          self._normconfig() +    def _load(self): +        try: +            sres = os.stat(self.path) +            self.dev = sres[ST_DEV] +            self.ino = sres[ST_INO] +            self.mtime = sres[ST_MTIME] +        except (OSError, IOError): +            if sys.exc_info()[1].errno == errno.ENOENT: +                sres = None + +        self.config.read(self.path) +        self._normconfig() + +    def get_realtime(self, opt): +        try: +            sres = os.stat(self.path) +        except (OSError, IOError): +            if sys.exc_info()[1].errno == errno.ENOENT: +                sres = None +            else: +                raise + +        # compare file system stat with that of our stream file handle +        if not sres or sres[ST_DEV] != self.dev or \ +           sres[ST_INO] != self.ino or self.mtime != sres[ST_MTIME]: +            self._load() + +        return self.get(opt, printValue=False) +      def section(self, rx=False):          """get the section name of the section representing .peers in .config"""          peers = self.peers @@ -162,7 +196,7 @@ class GConffile(object):          if self.config.has_section(self.section()):              update_from_sect(self.section(), MultiDict(dct, *self.auxdicts)) -    def get(self, opt=None): +    def get(self, opt=None, printValue=True):          """print the matching key/value pairs from .config,             or if @opt given, the value for @opt (according to the             logic described in .update_to) @@ -173,7 +207,10 @@ class GConffile(object):              opt = norm(opt)              v = d.get(opt)              if v: -                print(v) +                if printValue: +                    print(v) +                else: +                    return v          else:              for k, v in d.iteritems():                  if k == '__name__': diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 7fcc3165a..64c26a5d2 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -191,6 +191,7 @@ def main_i():      op.add_option('--log-file-mbr',        metavar='LOGF',  type=str, action='callback', callback=store_abs)      op.add_option('--state-file',          metavar='STATF', type=str, action='callback', callback=store_abs)      op.add_option('--state-detail-file',   metavar='STATF', type=str, action='callback', callback=store_abs) +    op.add_option('--georep-session-working-dir', metavar='STATF', type=str, action='callback', callback=store_abs)      op.add_option('--ignore-deletes',      default=False, action='store_true')      op.add_option('--isolated-slave',      default=False, action='store_true')      op.add_option('--use-rsync-xattrs',    default=False, action='store_true') @@ -202,6 +203,7 @@ def main_i():      op.add_option('--local-id',            metavar='ID',    help=SUPPRESS_HELP, default='')      op.add_option('--local-path',          metavar='PATH',  help=SUPPRESS_HELP, default='')      op.add_option('-s', '--ssh-command',   metavar='CMD',   default='ssh') +    op.add_option('--ssh-command-tar',     metavar='CMD',   default='ssh')      op.add_option('--rsync-command',       metavar='CMD',   default='rsync')      op.add_option('--rsync-options',       metavar='OPTS',  default='')      op.add_option('--rsync-ssh-options',   metavar='OPTS',  default='--compress') @@ -228,6 +230,7 @@ def main_i():      op.add_option('--change-interval', metavar='SEC', type=int, default=3)      # working directory for changelog based mechanism      op.add_option('--working-dir', metavar='DIR', type=str, action='callback', callback=store_abs) +    op.add_option('--use-tarssh', default=False, action='store_true')      op.add_option('-c', '--config-file',   metavar='CONF',  type=str, action='callback', callback=store_local)      # duh. need to specify dest or value will be mapped to None :S @@ -474,8 +477,15 @@ def main_i():              GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf')              if confdata.op == 'set':                  logging.info('checkpoint %s set' % confdata.val) +                gcnf.delete('checkpoint_completed') +                gcnf.delete('checkpoint_target')              elif confdata.op == 'del':                  logging.info('checkpoint info was reset') +                # if it is removing 'checkpoint' then we need +                # to remove 'checkpoint_completed' and 'checkpoint_target' too +                gcnf.delete('checkpoint_completed') +                gcnf.delete('checkpoint_target') +          except IOError:              if sys.exc_info()[1].errno == ENOENT:                  # directory of log path is not present, diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 95810a61e..721fe18bd 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -10,15 +10,16 @@ import socket  import string  import errno  from shutil import copyfileobj -from errno import ENOENT, ENODATA, EPIPE, EEXIST +from errno import ENOENT, ENODATA, EPIPE, EEXIST, errorcode  from threading import currentThread, Condition, Lock  from datetime import datetime +from libcxattr import Xattr  from gconf import gconf  from tempfile import mkdtemp, NamedTemporaryFile  from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \                         unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \ -                       lstat, errno_wrap +                       lstat, errno_wrap, update_file  URXTIME = (-1, 0) @@ -59,7 +60,8 @@ def gmaster_builder(excrawl=None):      crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')      sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin      purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin -    class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin): +    syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine +    class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin, syncengine):          pass      return _GMaster @@ -101,14 +103,17 @@ class NormalMixin(object):          if not 'default_xtime' in opts:              opts['default_xtime'] = URXTIME -    def xtime_low(self, server, path, **opts): -        xt = server.xtime(path, self.uuid) +    def xtime_low(self, rsc, path, **opts): +        if rsc == self.master: +            xt = rsc.server.xtime(path, self.uuid) +        else: +            xt = rsc.server.stime(path, self.uuid)          if isinstance(xt, int) and xt != ENODATA:              return xt          if xt == ENODATA or xt < self.volmark:              if opts['create']:                  xt = _xtime_now() -                server.aggregated.set_xtime(path, self.uuid, xt) +                rsc.server.aggregated.set_xtime(path, self.uuid, xt)              else:                  xt = opts['default_xtime']          return xt @@ -140,7 +145,7 @@ class NormalMixin(object):          return xte > xtrd      def set_slave_xtime(self, path, mark): -        self.slave.server.set_xtime(path, self.uuid, mark) +        self.slave.server.set_stime(path, self.uuid, mark)          self.slave.server.set_xtime_remote(path, self.uuid, mark)  class PartialMixin(NormalMixin): @@ -190,6 +195,65 @@ class PurgeNoopMixin(object):      def purge_missing(self, path, names):          pass +class TarSSHEngine(object): +    """Sync engine that uses tar(1) piped over ssh(1) +       for data transfers. Good for lots of small files. +    """ +    def a_syncdata(self, files): +        logging.debug('files: %s' % (files)) +        for f in files: +            pb = self.syncer.add(f) +            def regjob(se, xte, pb): +                rv = pb.wait() +                if rv[0]: +                    logging.debug('synced ' + se) +                    return True +                else: +                    # stat check for file presence +                    st = lstat(se) +                    if isinstance(st, int): +                        return True +                    logging.warn('tar+ssh: %s [errcode: %d]' % (se, rv[1])) +            self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) + +    def syncdata_wait(self): +        if self.wait(self.FLAT_DIR_HIERARCHY, None): +            return True + +    def syncdata(self, files): +        self.a_syncdata(files) +        self.syncdata_wait() + +class RsyncEngine(object): +    """Sync engine that uses rsync(1) for data transfers""" +    def a_syncdata(self, files): +        logging.debug('files: %s' % (files)) +        for f in files: +            logging.debug('candidate for syncing %s' % f) +            pb = self.syncer.add(f) +            def regjob(se, xte, pb): +                rv = pb.wait() +                if rv[0]: +                    logging.debug('synced ' + se) +                    return True +                else: +                    if rv[1] in [23, 24]: +                        # stat to check if the file exist +                        st = lstat(se) +                        if isinstance(st, int): +                            # file got unlinked in the interim +                            return True +                    logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) +            self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) + +    def syncdata_wait(self): +        if self.wait(self.FLAT_DIR_HIERARCHY, None): +            return True + +    def syncdata(self, files): +        self.a_syncdata(files) +        self.syncdata_wait() +  class GMasterCommon(object):      """abstract class impementling master role""" @@ -234,7 +298,7 @@ class GMasterCommon(object):          else:              rsc = self.master          self.make_xtime_opts(rsc == self.master, opts) -        return self.xtime_low(rsc.server, path, **opts) +        return self.xtime_low(rsc, path, **opts)      def get_initial_crawl_data(self):          # while persisting only 'files_syncd' is non-zero, rest of @@ -243,18 +307,26 @@ class GMasterCommon(object):          default_data = {'files_syncd': 0,                          'files_remaining': 0,                          'bytes_remaining': 0, -                        'purges_remaining': 0} +                        'purges_remaining': 0, +                        'total_files_skipped': 0}          if getattr(gconf, 'state_detail_file', None):              try: -                return json.load(open(gconf.state_detail_file)) -            except (IOError, OSError): +                with open(gconf.state_detail_file, 'r+') as f: +                    loaded_data= json.load(f) +                    diff_data = set(default_data) - set (loaded_data) +                    if len(diff_data): +                        for i in diff_data: +                            loaded_data[i] = default_data[i] +                    return loaded_data +            except (IOError):                  ex = sys.exc_info()[1] -                if ex.errno == ENOENT: -                    # Create file with initial data +                logging.warn ('Creating new gconf.state_detail_file.') +                # Create file with initial data +                try:                      with open(gconf.state_detail_file, 'wb') as f:                          json.dump(default_data, f)                      return default_data -                else: +                except:                      raise          return default_data @@ -264,6 +336,8 @@ class GMasterCommon(object):                  same_dir = os.path.dirname(gconf.state_detail_file)                  with NamedTemporaryFile(dir=same_dir, delete=False) as tmp:                      json.dump(self.total_crawl_stats, tmp) +                    tmp.flush() +                    os.fsync(tmp.fileno())                      os.rename(tmp.name, gconf.state_detail_file)              except (IOError, OSError):                  raise @@ -272,7 +346,13 @@ class GMasterCommon(object):          self.master = master          self.slave = slave          self.jobtab = {} -        self.syncer = Syncer(slave) +        if boolify(gconf.use_tarssh): +            logging.info("using 'tar over ssh' as the sync engine") +            self.syncer = Syncer(slave, self.slave.tarssh) +        else: +            logging.info("using 'rsync' as the sync engine") +            # partial transfer (cf. rsync(1)), that's normal +            self.syncer = Syncer(slave, self.slave.rsync, [23, 24])          # crawls vs. turns:          # - self.crawls is simply the number of crawl() invocations on root          # - one turn is a maximal consecutive sequence of crawls so that each @@ -294,6 +374,8 @@ class GMasterCommon(object):          self.terminate = False          self.sleep_interval = 1          self.checkpoint_thread = None +        self.current_files_skipped_count = 0 +        self.skipped_gfid_list = []      def init_keep_alive(cls):          """start the keep-alive thread """ @@ -336,7 +418,8 @@ class GMasterCommon(object):          gconf.configinterface.set('volume_id', self.uuid)          if self.volinfo:              if self.volinfo['retval']: -                raise GsyncdError("master is corrupt") +                logging.warn("master cluster's info may not be valid %d" % \ +                             self.volinfo['retval'])              self.start_checkpoint_thread()          else:              raise GsyncdError("master volinfo unavailable") @@ -349,7 +432,7 @@ class GMasterCommon(object):          while not self.terminate:              if self.start:                  logging.debug("... crawl #%d done, took %.6f seconds" % \ -                                  (self.crawls, time.time() - self.start)) +                              (self.crawls, time.time() - self.start))              self.start = time.time()              should_display_info = self.start - self.lastreport['time'] >= 60              if should_display_info: @@ -363,9 +446,20 @@ class GMasterCommon(object):              if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds                  crawl = self.should_crawl()                  t0 = t1 +            self.update_worker_remote_node()              if not crawl: +                self.update_worker_health("Passive") +                # bring up _this_ brick to the cluster stime +                # which is min of cluster (but max of the replicas) +                brick_stime = self.xtime('.', self.slave) +                cluster_stime = self.master.server.aggregated.stime_mnt('.', '.'.join([str(self.uuid), str(gconf.slave_id)])) +                logging.debug("Cluster stime: %s | Brick stime: %s" % (repr(cluster_stime), repr(brick_stime))) +                if not isinstance(cluster_stime, int): +                    if brick_stime < cluster_stime: +                        self.slave.server.set_stime(self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)                  time.sleep(5)                  continue +            self.update_worker_health("Active")              self.crawl()              if oneshot:                  return @@ -375,7 +469,7 @@ class GMasterCommon(object):      def _checkpt_param(cls, chkpt, prm, xtimish=True):          """use config backend to lookup a parameter belonging to             checkpoint @chkpt""" -        cprm = getattr(gconf, 'checkpoint_' + prm, None) +        cprm = gconf.configinterface.get_realtime('checkpoint_' + prm)          if not cprm:              return          chkpt_mapped, val = cprm.split(':', 1) @@ -402,17 +496,6 @@ class GMasterCommon(object):              ts += '.' + str(tpair[1])          return ts -    def get_extra_info(self): -        str_info = '\nUptime=%s;FilesSyncd=%d;FilesPending=%d;BytesPending=%d;DeletesPending=%d;' % \ -            (self._crawl_time_format(datetime.now() - self.crawl_start), \ -                 self.total_crawl_stats['files_syncd'], \ -                 self.total_crawl_stats['files_remaining'], \ -                 self.total_crawl_stats['bytes_remaining'], \ -                 self.total_crawl_stats['purges_remaining']) -        str_info += '\0' -        logging.debug(str_info) -        return str_info -      def _crawl_time_format(self, crawl_time):          # Ex: 5 years, 4 days, 20:23:10          years, days = divmod(crawl_time.days, 365.25) @@ -431,27 +514,49 @@ class GMasterCommon(object):          date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2))          return date -    def checkpt_service(self, chan, chkpt, tgt): +    def checkpt_service(self, chan, chkpt):          """checkpoint service loop          monitor and verify checkpoint status for @chkpt, and listen          for incoming requests for whom we serve a pretty-formatted          status report""" -        if not chkpt: -            # dummy loop for the case when there is no checkpt set -            while True: +        while True: +            chkpt = gconf.configinterface.get_realtime("checkpoint") +            if not chkpt: +                gconf.configinterface.delete("checkpoint_completed") +                gconf.configinterface.delete("checkpoint_target") +                # dummy loop for the case when there is no checkpt set                  select([chan], [], [])                  conn, _ = chan.accept() -                conn.send(self.get_extra_info()) +                conn.send('\0')                  conn.close() -        completed = self._checkpt_param(chkpt, 'completed', xtimish=False) -        if completed: -            completed = tuple(int(x) for x in completed.split('.')) -        while True: +                continue + +            checkpt_tgt = self._checkpt_param(chkpt, 'target') +            if not checkpt_tgt: +                checkpt_tgt = self.xtime('.') +                if isinstance(checkpt_tgt, int): +                    raise GsyncdError("master root directory is unaccessible (%s)", +                                      os.strerror(checkpt_tgt)) +                self._set_checkpt_param(chkpt, 'target', checkpt_tgt) +            logging.debug("checkpoint target %s has been determined for checkpoint %s" % \ +                          (repr(checkpt_tgt), chkpt)) + +            # check if the label is 'now' +            chkpt_lbl = chkpt +            try: +                x1,x2 = chkpt.split(':') +                if x1 == 'now': +                    chkpt_lbl = "as of " + self.humantime(x2) +            except: +                pass +            completed = self._checkpt_param(chkpt, 'completed', xtimish=False) +            if completed: +                completed = tuple(int(x) for x in completed.split('.'))              s,_,_ = select([chan], [], [], (not completed) and 5 or None)              # either request made and we re-check to not              # give back stale data, or we still hunting for completion -            if self.native_xtime(tgt) and self.native_xtime(tgt) < self.volmark: +            if self.native_xtime(checkpt_tgt) and self.native_xtime(checkpt_tgt) < self.volmark:                  # indexing has been reset since setting the checkpoint                  status = "is invalid"              else: @@ -459,12 +564,12 @@ class GMasterCommon(object):                  if isinstance(xtr, int):                      raise GsyncdError("slave root directory is unaccessible (%s)",                                        os.strerror(xtr)) -                ncompleted = self.xtime_geq(xtr, tgt) +                ncompleted = self.xtime_geq(xtr, checkpt_tgt)                  if completed and not ncompleted: # stale data                      logging.warn("completion time %s for checkpoint %s became stale" % \                                   (self.humantime(*completed), chkpt))                      completed = None -                    gconf.confdata.delete('checkpoint-completed') +                    gconf.configinterface.delete('checkpoint_completed')                  if ncompleted and not completed: # just reaching completion                      completed = "%.6f" % time.time()                      self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False) @@ -478,7 +583,7 @@ class GMasterCommon(object):                  try:                      conn, _ = chan.accept()                      try: -                        conn.send("  | checkpoint %s %s %s" % (chkpt, status, self.get_extra_info())) +                        conn.send("checkpoint %s is %s\0" % (chkpt_lbl, status))                      except:                          exc = sys.exc_info()[1]                          if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \ @@ -505,18 +610,8 @@ class GMasterCommon(object):                  pass          chan.bind(state_socket)          chan.listen(1) -        checkpt_tgt = None -        if gconf.checkpoint: -            checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target') -            if not checkpt_tgt: -                checkpt_tgt = self.xtime('.') -                if isinstance(checkpt_tgt, int): -                    raise GsyncdError("master root directory is unaccessible (%s)", -                                      os.strerror(checkpt_tgt)) -                self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt) -            logging.debug("checkpoint target %s has been determined for checkpoint %s" % \ -                          (repr(checkpt_tgt), gconf.checkpoint)) -        t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt)) +        chkpt = gconf.configinterface.get_realtime("checkpoint") +        t = Thread(target=self.checkpt_service, args=(chan, chkpt))          t.start()          self.checkpoint_thread = t @@ -567,15 +662,11 @@ class GMasterChangelogMixin(GMasterCommon):      POS_GFID   = 0      POS_TYPE   = 1 -    POS_ENTRY1 = 2 -    POS_ENTRY2 = 3  # renames - -    _CL_TYPE_DATA_PFX     = "D " -    _CL_TYPE_METADATA_PFX = "M " -    _CL_TYPE_ENTRY_PFX    = "E " +    POS_ENTRY1 = -1 -    TYPE_GFID  = [_CL_TYPE_DATA_PFX] # ignoring metadata ops -    TYPE_ENTRY = [_CL_TYPE_ENTRY_PFX] +    TYPE_META  = "M " +    TYPE_GFID  = "D " +    TYPE_ENTRY = "E "      # flat directory heirarchy for gfid based access      FLAT_DIR_HIERARCHY = '.' @@ -594,39 +685,11 @@ class GMasterChangelogMixin(GMasterCommon):          logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile))          return (workdir, logfile) -    # update stats from *this* crawl -    def update_cumulative_stats(self, files_pending): -        self.total_crawl_stats['files_remaining']  = files_pending['count'] -        self.total_crawl_stats['bytes_remaining']  = files_pending['bytes'] -        self.total_crawl_stats['purges_remaining'] = files_pending['purge'] - -    # sync data -    def syncdata(self, datas): -        logging.debug('datas: %s' % (datas)) -        for data in datas: -            logging.debug('candidate for syncing %s' % data) -            pb = self.syncer.add(data) -            def regjob(se, xte, pb): -                rv = pb.wait() -                if rv[0]: -                    logging.debug('synced ' + se) -                    return True -                else: -                    if rv[1] in [23, 24]: -                        # stat to check if the file exist -                        st = lstat(se) -                        if isinstance(st, int): -                            # file got unlinked in the interim -                            return True -                    logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) -            self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb) -        if self.wait(self.FLAT_DIR_HIERARCHY, None): -            return True -      def process_change(self, change, done, retry):          pfx = gauxpfx()          clist   = []          entries = [] +        meta_gfid = set()          datas = set()          # basic crawl stats: files and bytes @@ -652,136 +715,351 @@ class GMasterChangelogMixin(GMasterCommon):                      dct[k] = ed[k]              return dct -        # regular file update: bytes & count -        def _update_reg(entry, size): -            if not entry in files_pending['files']: -                files_pending['count'] += 1 -                files_pending['bytes'] += size -                files_pending['files'].append(entry) -        # updates for directories, symlinks etc.. -        def _update_rest(): +        # entry counts (not purges) +        def entry_update():              files_pending['count'] += 1 -        # entry count -        def entry_update(entry, size, mode): -            if stat.S_ISREG(mode): -                _update_reg(entry, size) -            else: -                _update_rest()          # purge count          def purge_update():              files_pending['purge'] += 1          for e in clist:              e = e.strip() -            et = e[self.IDX_START:self.IDX_END] -            ec = e[self.IDX_END:].split(' ') -            if et in self.TYPE_ENTRY: +            et = e[self.IDX_START:self.IDX_END]   # entry type +            ec = e[self.IDX_END:].split(' ')      # rest of the bits + +            if et == self.TYPE_ENTRY: +                # extract information according to the type of +                # the entry operation. create(), mkdir() and mknod() +                # have mode, uid, gid information in the changelog +                # itself, so no need to stat()...                  ty = ec[self.POS_TYPE] + +                # PARGFID/BNAME                  en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1])) +                # GFID of the entry                  gfid = ec[self.POS_GFID] -                # definitely need a better way bucketize entry ops +                  if ty in ['UNLINK', 'RMDIR']:                      purge_update()                      entries.append(edct(ty, gfid=gfid, entry=en)) -                    continue -                go = os.path.join(pfx, gfid) -                st = lstat(go) -                if isinstance(st, int): -		    if ty == 'RENAME': -                        entries.append(edct('UNLINK', gfid=gfid, entry=en)) -		    else: -                        logging.debug('file %s got purged in the interim' % go) -                    continue -                entry_update(go, st.st_size, st.st_mode) -                if ty in ['CREATE', 'MKDIR', 'MKNOD']: -                    entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) -                elif ty == 'LINK': -                    entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) -                elif ty == 'SYMLINK': -                    rl = errno_wrap(os.readlink, [en], [ENOENT]) -                    if isinstance(rl, int): -                        continue -                    entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) -                elif ty == 'RENAME': -                    e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2])) -                    entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2, stat=st)) +                elif ty in ['CREATE', 'MKDIR', 'MKNOD']: +                    entry_update() +                    # stat information present in the changelog itself +                    entries.append(edct(ty, gfid=gfid, entry=en, mode=int(ec[2]),\ +                                        uid=int(ec[3]), gid=int(ec[4])))                  else: -                    logging.warn('ignoring %s [op %s]' % (gfid, ty)) -            elif et in self.TYPE_GFID: -                go = os.path.join(pfx, ec[0]) -                st = lstat(go) -                if isinstance(st, int): -                    logging.debug('file %s got purged in the interim' % go) -                    continue -                entry_update(go, st.st_size, st.st_mode) -                datas.update([go]) +                    # stat() to get mode and other information +                    go = os.path.join(pfx, gfid) +                    st = lstat(go) +                    if isinstance(st, int): +                        if ty == 'RENAME': # special hack for renames... +                            entries.append(edct('UNLINK', gfid=gfid, entry=en)) +                        else: +                            logging.debug('file %s got purged in the interim' % go) +                        continue + +                    if ty == 'LINK': +                        entry_update() +                        entries.append(edct(ty, stat=st, entry=en, gfid=gfid)) +                    elif ty == 'SYMLINK': +                        rl = errno_wrap(os.readlink, [en], [ENOENT]) +                        if isinstance(rl, int): +                            continue +                        entry_update() +                        entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) +                    elif ty == 'RENAME': +                        entry_update() +                        e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) +                        entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st)) +                    else: +                        logging.warn('ignoring %s [op %s]' % (gfid, ty)) +            elif et == self.TYPE_GFID: +                datas.add(os.path.join(pfx, ec[0])) +            elif et == self.TYPE_META: +                if ec[1] == 'SETATTR': # only setattr's for now... +                    meta_gfid.add(os.path.join(pfx, ec[0])) +            else: +                logging.warn('got invalid changelog type: %s' % (et))          logging.debug('entries: %s' % repr(entries))          if not retry: -            self.update_cumulative_stats(files_pending) +            self.update_worker_cumilitive_status(files_pending)          # sync namespace          if (entries):              self.slave.server.entry_ops(entries) +        # sync metadata +        if (meta_gfid): +            meta_entries = [] +            for go in meta_gfid: +                st = lstat(go) +                if isinstance(st, int): +                    logging.debug('file %s got purged in the interim' % go) +                    continue +                meta_entries.append(edct('META', go=go, stat=st)) +            if meta_entries: +                self.slave.server.meta_ops(meta_entries)          # sync data -        if self.syncdata(datas): -            if done: -                self.master.server.changelog_done(change) -            return True - -    def sync_done(self): -        self.total_crawl_stats['files_syncd'] += self.total_crawl_stats['files_remaining'] -        self.total_crawl_stats['files_remaining']  = 0 -        self.total_crawl_stats['bytes_remaining']  = 0 -        self.total_crawl_stats['purges_remaining'] = 0 -        self.update_crawl_data() +        if datas: +            self.a_syncdata(datas)      def process(self, changes, done=1): -        for change in changes: -            tries = 0 -            retry = False -            while True: -                logging.debug('processing change %s' % change) -                if self.process_change(change, done, retry): -                    self.sync_done() -                    break -                retry = True -                tries += 1 -                if tries == self.MAX_RETRIES: -                    logging.warn('changelog %s could not be processed - moving on...' % os.path.basename(change)) -                    self.sync_done() -                    if done: -                        self.master.server.changelog_done(change) -                    break -                # it's either entry_ops() or Rsync that failed to do it's -                # job. Mostly it's entry_ops() [which currently has a problem -                # of failing to create an entry but failing to return an errno] -                # Therefore we do not know if it's either Rsync or the freaking -                # entry_ops() that failed... so we retry the _whole_ changelog -                # again. -                # TODO: remove entry retries when it's gets fixed. -                logging.warn('incomplete sync, retrying changelog: %s' % change) -                time.sleep(0.5) -            self.turns += 1 +        tries = 0 +        retry = False -    def upd_stime(self, stime): +        while True: +            self.skipped_gfid_list = [] +            self.current_files_skipped_count = 0 + +            # first, fire all changelog transfers in parallel. entry and metadata +            # are performed synchronously, therefore in serial. However at the end +            # of each changelog, data is synchronized with syncdata_async() - which +            # means it is serial w.r.t entries/metadata of that changelog but +            # happens in parallel with data of other changelogs. + +            for change in changes: +                logging.debug('processing change %s' % change) +                self.process_change(change, done, retry) +                if not retry: +                    self.turns += 1 # number of changelogs processed in the batch + +            # Now we wait for all the data transfers fired off in the above step +            # to complete. Note that this is not ideal either. Ideally we want to +            # trigger the entry/meta-data transfer of the next batch while waiting +            # for the data transfer of the current batch to finish. + +            # Note that the reason to wait for the data transfer (vs doing it +            # completely in the background and call the changelog_done() +            # asynchronously) is because this waiting acts as a "backpressure" +            # and prevents a spiraling increase of wait stubs from consuming +            # unbounded memory and resources. + +            # update the slave's time with the timestamp of the _last_ changelog +            # file time suffix. Since, the changelog prefix time is the time when +            # the changelog was rolled over, introduce a tolerence of 1 second to +            # counter the small delta b/w the marker update and gettimeofday(). +            # NOTE: this is only for changelog mode, not xsync. + +            # @change is the last changelog (therefore max time for this batch) +            if self.syncdata_wait(): +                if done: +                    xtl = (int(change.split('.')[-1]) - 1, 0) +                    self.upd_stime(xtl) +                    map(self.master.server.changelog_done, changes) +                self.update_worker_files_syncd() +                break + +            # We do not know which changelog transfer failed, retry everything. +            retry = True +            tries += 1 +            if tries == self.MAX_RETRIES: +                logging.warn('changelogs %s could not be processed - moving on...' % \ +                             ' '.join(map(os.path.basename, changes))) +                self.update_worker_total_files_skipped(self.current_files_skipped_count) +                logging.warn('SKIPPED GFID = %s' % ','.join(self.skipped_gfid_list)) +                self.update_worker_files_syncd() +                if done: +                    xtl = (int(change.split('.')[-1]) - 1, 0) +                    self.upd_stime(xtl) +                    map(self.master.server.changelog_done, changes) +                break +            # it's either entry_ops() or Rsync that failed to do it's +            # job. Mostly it's entry_ops() [which currently has a problem +            # of failing to create an entry but failing to return an errno] +            # Therefore we do not know if it's either Rsync or the freaking +            # entry_ops() that failed... so we retry the _whole_ changelog +            # again. +            # TODO: remove entry retries when it's gets fixed. +            logging.warn('incomplete sync, retrying changelogs: %s' % \ +                         ' '.join(map(os.path.basename, changes))) +            time.sleep(0.5) + +    def upd_stime(self, stime, path=None): +        if not path: +            path = self.FLAT_DIR_HIERARCHY          if not stime == URXTIME: -            self.sendmark(self.FLAT_DIR_HIERARCHY, stime) +            self.sendmark(path, stime) + +    def get_worker_status_file(self): +        file_name = gconf.local_path+'.status' +        file_name = file_name.replace("/", "_") +        worker_status_file = gconf.georep_session_working_dir+file_name +        return worker_status_file + +    def update_worker_status(self, key, value): +        default_data = {"remote_node":"N/A", +                        "worker status":"Not Started", +                        "crawl status":"N/A", +                        "files_syncd": 0, +                        "files_remaining": 0, +                        "bytes_remaining": 0, +                        "purges_remaining": 0, +                        "total_files_skipped": 0} +        worker_status_file = self.get_worker_status_file() +        try: +            with open(worker_status_file, 'r+') as f: +                loaded_data = json.load(f) +                loaded_data[key] = value +                os.ftruncate(f.fileno(), 0) +                os.lseek(f.fileno(), 0, os.SEEK_SET) +                json.dump(loaded_data, f) +                f.flush() +                os.fsync(f.fileno()) +        except (IOError, OSError, ValueError): +            logging.info ('Creating new %s' % worker_status_file) +            try: +                with open(worker_status_file, 'wb') as f: +                    default_data[key] = value +                    json.dump(default_data, f) +                    f.flush() +                    os.fsync(f.fileno()) +            except: +                raise + +    def update_worker_cumilitive_status(self, files_pending): +        default_data = {"remote_node":"N/A", +                        "worker status":"Not Started", +                        "crawl status":"N/A", +                        "files_syncd": 0, +                        "files_remaining": 0, +                        "bytes_remaining": 0, +                        "purges_remaining": 0, +                        "total_files_skipped": 0} +        worker_status_file = self.get_worker_status_file() +        try: +            with open(worker_status_file, 'r+') as f: +                loaded_data = json.load(f) +                loaded_data['files_remaining']  = files_pending['count'] +                loaded_data['bytes_remaining']  = files_pending['bytes'] +                loaded_data['purges_remaining'] = files_pending['purge'] +                os.ftruncate(f.fileno(), 0) +                os.lseek(f.fileno(), 0, os.SEEK_SET) +                json.dump(loaded_data, f) +                f.flush() +                os.fsync(f.fileno()) +        except (IOError, OSError, ValueError): +            logging.info ('Creating new %s' % worker_status_file) +            try: +                with open(worker_status_file, 'wb') as f: +                    default_data['files_remaining']  = files_pending['count'] +                    default_data['bytes_remaining']  = files_pending['bytes'] +                    default_data['purges_remaining'] = files_pending['purge'] +                    json.dump(default_data, f) +                    f.flush() +                    os.fsync(f.fileno()) +            except: +                raise + +    def update_worker_remote_node (self): +        node = sys.argv[-1] +        node = node.split("@")[-1] +        remote_node_ip = node.split(":")[0] +        remote_node_vol = node.split(":")[3] +        remote_node = remote_node_ip + '::' + remote_node_vol +        self.update_worker_status ('remote_node', remote_node) + +    def update_worker_health (self, state): +        self.update_worker_status ('worker status', state) + +    def update_worker_crawl_status (self, state): +        self.update_worker_status ('crawl status', state) + +    def update_worker_files_syncd (self): +        default_data = {"remote_node":"N/A", +                        "worker status":"Not Started", +                        "crawl status":"N/A", +                        "files_syncd": 0, +                        "files_remaining": 0, +                        "bytes_remaining": 0, +                        "purges_remaining": 0, +                        "total_files_skipped": 0} +        worker_status_file = self.get_worker_status_file() +        try: +            with open(worker_status_file, 'r+') as f: +                loaded_data = json.load(f) +                loaded_data['files_syncd'] += loaded_data['files_remaining'] +                loaded_data['files_remaining']  = 0 +                loaded_data['bytes_remaining']  = 0 +                loaded_data['purges_remaining'] = 0 +                os.ftruncate(f.fileno(), 0) +                os.lseek(f.fileno(), 0, os.SEEK_SET) +                json.dump(loaded_data, f) +                f.flush() +                os.fsync(f.fileno()) +        except (IOError, OSError, ValueError): +            logging.info ('Creating new %s' % worker_status_file) +            try: +                with open(worker_status_file, 'wb') as f: +                    json.dump(default_data, f) +                    f.flush() +                    os.fsync(f.fileno()) +            except: +                raise + +    def update_worker_files_remaining (self, state): +        self.update_worker_status ('files_remaining', state) + +    def update_worker_bytes_remaining (self, state): +        self.update_worker_status ('bytes_remaining', state) + +    def update_worker_purges_remaining (self, state): +        self.update_worker_status ('purges_remaining', state) + +    def update_worker_total_files_skipped (self, value): +        default_data = {"remote_node":"N/A", +                        "worker status":"Not Started", +                        "crawl status":"N/A", +                        "files_syncd": 0, +                        "files_remaining": 0, +                        "bytes_remaining": 0, +                        "purges_remaining": 0, +                        "total_files_skipped": 0} +        worker_status_file = self.get_worker_status_file() +        try: +            with open(worker_status_file, 'r+') as f: +                loaded_data = json.load(f) +                loaded_data['total_files_skipped'] = value +                loaded_data['files_remaining'] -= value +                os.ftruncate(f.fileno(), 0) +                os.lseek(f.fileno(), 0, os.SEEK_SET) +                json.dump(loaded_data, f) +                f.flush() +                os.fsync(f.fileno()) +        except (IOError, OSError, ValueError): +            logging.info ('Creating new %s' % worker_status_file) +            try: +                with open(worker_status_file, 'wb') as f: +                    default_data['total_files_skipped'] = value +                    json.dump(default_data, f) +                    f.flush() +                    os.fsync(f.fileno()) +            except: +                raise      def crawl(self): +        self.update_worker_crawl_status("Changelog Crawl")          changes = [] +        # get stime (from the brick) and purge changelogs +        # that are _historical_ to that time. +        purge_time = self.xtime('.', self.slave) +        if isinstance(purge_time, int): +            purge_time = None          try:              self.master.server.changelog_scan()              self.crawls += 1          except OSError:              self.fallback_xsync() +            self.update_worker_crawl_status("Hybrid Crawl")          changes = self.master.server.changelog_getchanges()          if changes: -            xtl = self.xtime(self.FLAT_DIR_HIERARCHY) -            if isinstance(xtl, int): -                raise GsyncdError('master is corrupt') +            if purge_time: +                logging.info("slave's time: %s" % repr(purge_time)) +                processed = [x for x in changes if int(x.split('.')[-1]) < purge_time[0]] +                for pr in processed: +                    logging.info('skipping already processed change: %s...' % os.path.basename(pr)) +                    self.master.server.changelog_done(pr) +                    changes.remove(pr)              logging.debug('processing changes %s' % repr(changes))              self.process(changes) -            self.upd_stime(xtl)      def register(self):          (workdir, logfile) = self.setup_working_dir() @@ -799,17 +1077,20 @@ class GMasterChangelogMixin(GMasterCommon):  class GMasterXsyncMixin(GMasterChangelogMixin):      """ -      This crawl needs to be xtime based (as of now      it's not. this is beacuse we generate CHANGELOG      file during each crawl which is then processed      by process_change()).      For now it's used as a one-shot initial sync      mechanism and only syncs directories, regular -    files and symlinks. +    files, hardlinks and symlinks.      """ +    XSYNC_MAX_ENTRIES = 1<<13 +      def register(self): +        self.counter = 0 +        self.comlist = []          self.sleep_interval = 60          self.tempdir = self.setup_working_dir()[0]          self.tempdir = os.path.join(self.tempdir, 'xsync') @@ -823,6 +1104,36 @@ class GMasterXsyncMixin(GMasterChangelogMixin):              else:                  raise +    def crawl(self): +        """ +        event dispatcher thread + +        this thread dispatches either changelog or synchronizes stime. +        additionally terminates itself on recieving a 'finale' event +        """ +        def Xsyncer(): +            self.Xcrawl() +        t = Thread(target=Xsyncer) +        t.start() +        logging.info('starting hybrid crawl...') +        self.update_worker_crawl_status("Hybrid Crawl") +        while True: +            try: +                item = self.comlist.pop(0) +                if item[0] == 'finale': +                    logging.info('finished hybrid crawl syncing') +                    break +                elif item[0] == 'xsync': +                    logging.info('processing xsync changelog %s' % (item[1])) +                    self.process([item[1]], 0) +                elif item[0] == 'stime': +                    logging.debug('setting slave time: %s' % repr(item[1])) +                    self.upd_stime(item[1][1], item[1][0]) +                else: +                    logging.warn('unknown tuple in comlist (%s)' % repr(item)) +            except IndexError: +                time.sleep(1) +      def write_entry_change(self, prefix, data=[]):          self.fh.write("%s %s\n" % (prefix, ' '.join(data))) @@ -839,24 +1150,61 @@ class GMasterXsyncMixin(GMasterChangelogMixin):      def fname(self):          return self.xsync_change -    def crawl(self, path='.', xtr=None, done=0): -        """ generate a CHANGELOG file consumable by process_change """ +    def put(self, mark, item): +        self.comlist.append((mark, item)) + +    def sync_xsync(self, last): +        """schedule a processing of changelog""" +        self.close() +        self.put('xsync', self.fname()) +        self.counter = 0 +        if not last: +            time.sleep(1) # make sure changelogs are 1 second apart +            self.open() + +    def sync_stime(self, stime=None, last=False): +        """schedule a stime synchronization""" +        if stime: +            self.put('stime', stime) +        if last: +            self.put('finale', None) + +    def sync_done(self, stime=None, last=False): +        self.sync_xsync(last) +        if stime: +            self.sync_stime(stime, last) + +    def Xcrawl(self, path='.', xtr_root=None): +        """ +        generate a CHANGELOG file consumable by process_change. + +        slave's xtime (stime) is _cached_ for comparisons across +        the filesystem tree, but set after directory synchronization. +        """          if path == '.':              self.open()              self.crawls += 1 -        if not xtr: +        if not xtr_root:              # get the root stime and use it for all comparisons -            xtr = self.xtime('.', self.slave) -            if isinstance(xtr, int): -                if xtr != ENOENT: -                    raise GsyncdError('slave is corrupt') -                xtr = self.minus_infinity +            xtr_root = self.xtime('.', self.slave) +            if isinstance(xtr_root, int): +                if xtr_root != ENOENT: +                    logging.warn("slave cluster not returning the " \ +                                 "correct xtime for root (%d)" % xtr_root) +                xtr_root = self.minus_infinity          xtl = self.xtime(path)          if isinstance(xtl, int): -            raise GsyncdError('master is corrupt') -        if xtr == xtl: +            logging.warn("master cluster's xtime not found") +        xtr = self.xtime(path, self.slave) +        if isinstance(xtr, int): +            if xtr != ENOENT: +                logging.warn("slave cluster not returning the " \ +                             "correct xtime for %s (%d)" % (path, xtr)) +            xtr = self.minus_infinity +        xtr = max(xtr, xtr_root) +        if not self.need_sync(path, xtl, xtr):              if path == '.': -                self.close() +                self.sync_done((path, xtl), True)              return          self.xtime_reversion_hook(path, xtl, xtr)          logging.debug("entering " + path) @@ -867,43 +1215,42 @@ class GMasterXsyncMixin(GMasterChangelogMixin):          for e in dem:              bname = e              e = os.path.join(path, e) -            st = lstat(e) +            xte = self.xtime(e) +            if isinstance(xte, int): +                logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) +                continue +            if not self.need_sync(e, xte, xtr): +                continue +            st = self.master.server.lstat(e)              if isinstance(st, int): -                logging.warn('%s got purged in the interim..' % e) +                logging.warn('%s got purged in the interim ...' % e)                  continue              gfid = self.master.server.gfid(e)              if isinstance(gfid, int): -                logging.warn('skipping entry %s..' % (e)) -                continue -            xte = self.xtime(e) -            if isinstance(xte, int): -                raise GsyncdError('master is corrupt') -            if not self.need_sync(e, xte, xtr): +                logging.warn('skipping entry %s..' % e)                  continue              mo = st.st_mode +            self.counter += 1 +            if self.counter == self.XSYNC_MAX_ENTRIES: +                self.sync_done()              if stat.S_ISDIR(mo): -                self.write_entry_change("E", [gfid, 'MKDIR', escape(os.path.join(pargfid, bname))]) -                self.crawl(e, xtr) +                self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))]) +                self.Xcrawl(e, xtr_root) +                self.sync_done((e, xte), False)              elif stat.S_ISLNK(mo): -                rl = errno_wrap(os.readlink, [en], [ENOENT]) -                if isinstance(rl, int): -                    continue -                self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname)), rl]) -            else: +                self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))]) +            elif stat.S_ISREG(mo): +                nlink = st.st_nlink +                nlink -= 1 # fixup backend stat link count                  # if a file has a hardlink, create a Changelog entry as 'LINK' so the slave                  # side will decide if to create the new entry, or to create link. -                if st.st_nlink == 1: -                    self.write_entry_change("E", [gfid, 'MKNOD', escape(os.path.join(pargfid, bname))]) +                if nlink == 1: +                    self.write_entry_change("E", [gfid, 'MKNOD', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))])                  else:                      self.write_entry_change("E", [gfid, 'LINK', escape(os.path.join(pargfid, bname))]) -                if stat.S_ISREG(mo): -                    self.write_entry_change("D", [gfid]) - +                self.write_entry_change("D", [gfid])          if path == '.': -            logging.info('processing xsync changelog %s' % self.fname()) -            self.close() -            self.process([self.fname()], done) -            self.upd_stime(xtl) +            self.sync_done((path, xtl), True)  class BoxClosedErr(Exception):      pass @@ -979,12 +1326,13 @@ class Syncer(object):      each completed syncjob.      """ -    def __init__(self, slave): +    def __init__(self, slave, sync_engine, resilient_errnos=[]):          """spawn worker threads"""          self.slave = slave          self.lock = Lock()          self.pb = PostBox() -        self.bytes_synced = 0 +        self.sync_engine = sync_engine +        self.errnos_ok = resilient_errnos          for i in range(int(gconf.sync_jobs)):              t = Thread(target=self.syncjob)              t.start() @@ -1002,11 +1350,10 @@ class Syncer(object):                      break                  time.sleep(0.5)              pb.close() -            po = self.slave.rsync(pb) +            po = self.sync_engine(pb)              if po.returncode == 0:                  ret = (True, 0) -            elif po.returncode in (23, 24): -                # partial transfer (cf. rsync(1)), that's normal +            elif po.returncode in self.errnos_ok:                  ret = (False, po.returncode)              else:                  po.errfail() diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index faf62f868..8deb5114b 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -265,6 +265,9 @@ class Server(object):      FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT      GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' +    GFID_XATTR = 'trusted.gfid'  # for backend gfid fetch, do not use GX_NSPACE_PFX +    GFID_FMTSTR = "!" + "B"*16 +      local_path = ''      @classmethod @@ -305,6 +308,38 @@ class Server(object):              raise OSError(ENOTDIR, os.strerror(ENOTDIR))          return os.listdir(path) + +    @classmethod +    @_pathguard +    def lstat(cls, path): +        try: +            return os.lstat(path) +        except (IOError, OSError): +            ex = sys.exc_info()[1] +            if ex.errno == ENOENT: +                return ex.errno +            else: +                raise + + +    @classmethod +    @_pathguard +    def gfid(cls, path): +        try: +            buf = Xattr.lgetxattr(path, cls.GFID_XATTR, 16) +            m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)])) +            return '-'.join(m.groups()) +        except (IOError, OSError): +            ex = sys.exc_info()[1] +            if ex.errno == ENOENT: +                return ex.errno +            else: +                raise + +    @classmethod +    def gfid_mnt(cls, gfidpath): +        return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT]) +      @classmethod      @_pathguard      def purge(cls, path, entries=None): @@ -397,8 +432,42 @@ class Server(object):                  raise      @classmethod -    def gfid(cls, gfidpath): -        return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT]) +    @_pathguard +    def stime_mnt(cls, path, uuid): +        """query xtime extended attribute + +        Return xtime of @path for @uuid as a pair of integers. +        "Normal" errors due to non-existent @path or extended attribute +        are tolerated and errno is returned in such a case. +        """ + +        try: +            return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8)) +        except OSError: +            ex = sys.exc_info()[1] +            if ex.errno in (ENOENT, ENODATA, ENOTDIR): +                return ex.errno +            else: +                raise + +    @classmethod +    @_pathguard +    def stime(cls, path, uuid): +        """query xtime extended attribute + +        Return xtime of @path for @uuid as a pair of integers. +        "Normal" errors due to non-existent @path or extended attribute +        are tolerated and errno is returned in such a case. +        """ + +        try: +            return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8)) +        except OSError: +            ex = sys.exc_info()[1] +            if ex.errno in (ENOENT, ENODATA, ENOTDIR): +                return ex.errno +            else: +                raise      @classmethod      def node_uuid(cls, path='.'): @@ -409,21 +478,10 @@ class Server(object):              raise      @classmethod -    def xtime_vec(cls, path, *uuids): -        """vectored version of @xtime - -        accepts a list of uuids and returns a dictionary -        with uuid as key(s) and xtime as value(s) -        """ -        xt = {} -        for uuid in uuids: -            xtu = cls.xtime(path, uuid) -            if xtu == ENODATA: -                xtu = None -            if isinstance(xtu, int): -                return xtu -            xt[uuid] = xtu -        return xt +    @_pathguard +    def set_stime(cls, path, uuid, mark): +        """set @mark as stime for @uuid on @path""" +        Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), struct.pack('!II', *mark))      @classmethod      @_pathguard @@ -444,20 +502,16 @@ class Server(object):          Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))      @classmethod -    def set_xtime_vec(cls, path, mark_dct): -        """vectored (or dictered) version of set_xtime - -        ignore values that match @ignore -        """ -        for u,t in mark_dct.items(): -            cls.set_xtime(path, u, t) - -    @classmethod      def entry_ops(cls, entries):          pfx = gauxpfx()          logging.debug('entries: %s' % repr(entries))          # regular file -        def entry_pack_reg(gf, bn, st): +        def entry_pack_reg(gf, bn, mo, uid, gid): +            blen = len(bn) +            return struct.pack(cls._fmt_mknod(blen), +                               uid, gid, gf, mo, bn, +                               stat.S_IMODE(mo), 0, umask()) +        def entry_pack_reg_stat(gf, bn, st):              blen = len(bn)              mo = st['mode']              return struct.pack(cls._fmt_mknod(blen), @@ -465,12 +519,10 @@ class Server(object):                                 gf, mo, bn,                                 stat.S_IMODE(mo), 0, umask())          # mkdir -        def entry_pack_mkdir(gf, bn, st): +        def entry_pack_mkdir(gf, bn, mo, uid, gid):              blen = len(bn) -            mo = st['mode']              return struct.pack(cls._fmt_mkdir(blen), -                               st['uid'], st['gid'], -                               gf, mo, bn, +                               uid, gid, gf, mo, bn,                                 stat.S_IMODE(mo), umask())          #symlink          def entry_pack_symlink(gf, bn, lnk, st): @@ -485,7 +537,7 @@ class Server(object):              # to be purged is the GFID gotten from the changelog.              # (a stat(changelog_gfid) would also be valid here)              # The race here is between the GFID check and the purge. -            disk_gfid = cls.gfid(entry) +            disk_gfid = cls.gfid_mnt(entry)              if isinstance(disk_gfid, int):                  return              if not gfid == disk_gfid: @@ -510,15 +562,15 @@ class Server(object):                      else:                          break              elif op in ['CREATE', 'MKNOD']: -                blob = entry_pack_reg(gfid, bname, e['stat']) +                blob = entry_pack_reg(gfid, bname, e['mode'], e['uid'], e['uid'])              elif op == 'MKDIR': -                blob = entry_pack_mkdir(gfid, bname, e['stat']) +                blob = entry_pack_mkdir(gfid, bname, e['mode'], e['uid'], e['uid'])              elif op == 'LINK':                  slink = os.path.join(pfx, gfid)                  st = lstat(slink)                  if isinstance(st, int):                      (pg, bname) = entry2pb(entry) -                    blob = entry_pack_reg(gfid, bname, e['stat']) +                    blob = entry_pack_reg_stat(gfid, bname, e['stat'])                  else:                      errno_wrap(os.link, [slink, entry], [ENOENT, EEXIST])              elif op == 'SYMLINK': @@ -528,13 +580,24 @@ class Server(object):                  st = lstat(entry)                  if isinstance(st, int):                      (pg, bname) = entry2pb(en) -                    blob = entry_pack_reg(gfid, bname, e['stat']) +                    blob = entry_pack_reg_stat(gfid, bname, e['stat'])                  else:                      errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])              if blob:                  errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [EEXIST], [ENOENT, ESTALE, EINVAL])      @classmethod +    def meta_ops(cls, meta_entries): +        logging.debug('Meta-entries: %s' % repr(meta_entries)) +        for e in meta_entries: +            mode = e['stat']['mode'] +            uid  = e['stat']['uid'] +            gid  = e['stat']['gid'] +            go   = e['go'] +            errno_wrap(os.chmod, [go, mode], [ENOENT], [ESTALE, EINVAL]) +            errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL]) + +    @classmethod      def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0):          Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) @@ -699,6 +762,29 @@ class SlaveRemote(object):          return po +    def tarssh(self, files, slaveurl): +        """invoke tar+ssh +        -z (compress) can be use if needed, but ommitting it now +        as it results in wierd error (tar+ssh errors out (errcode: 2) +        """ +        if not files: +            raise GsyncdError("no files to sync") +        logging.debug("files: " + ", ".join(files)) +        (host, rdir) = slaveurl.split(':') +        tar_cmd = ["tar", "-cf", "-", "--files-from", "-"] +        ssh_cmd = gconf.ssh_command_tar.split() +  [host, "tar", "--overwrite", "-xf", "-", "-C", rdir] +        p0 = Popen(tar_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) +        p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE) +        for f in files: +            p0.stdin.write(f) +            p0.stdin.write('\n') +        p0.stdin.close() +        p0.wait() + +        p1.wait() +        p1.terminate_geterr(fail_on_err = False) + +        return p1  class AbstractUrl(object):      """abstract base class for url scheme classes""" @@ -1041,12 +1127,20 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                              except ValueError:                                  pass                          return e +                    @classmethod +                    def lstat(cls, e): +                        """ path based backend stat """ +                        return super(brickserver, cls).lstat(e) +                    @classmethod +                    def gfid(cls, e): +                        """ path based backend gfid fetch """ +                        return super(brickserver, cls).gfid(e)                  if gconf.slave_id:                      # define {,set_}xtime in slave, thus preempting                      # the call to remote, so that it takes data from                      # the local brick -                    slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server) -                    slave.server.set_xtime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_xtime(path, uuid + '.' + gconf.slave_id, mark), slave.server) +                    slave.server.stime = types.MethodType(lambda _self, path, uuid: brickserver.stime(path, uuid + '.' + gconf.slave_id), slave.server) +                    slave.server.set_stime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_stime(path, uuid + '.' + gconf.slave_id, mark), slave.server)                  (g1, g2) = self.gmaster_instantiate_tuple(slave)                  g1.master.server = brickserver                  g2.master.server = brickserver @@ -1067,6 +1161,9 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):      def rsync(self, files):          return sup(self, files, self.slavedir) +    def tarssh(self, files): +        return sup(self, files, self.slavedir) +  class SSH(AbstractUrl, SlaveRemote):      """scheme class for ssh:// urls @@ -1170,3 +1267,6 @@ class SSH(AbstractUrl, SlaveRemote):      def rsync(self, files):          return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args),                     *(gconf.rsync_ssh_options.split() + [self.slaveurl])) + +    def tarssh(self, files): +        return sup(self, files, self.slaveurl) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 348eb38c1..1b5684c6d 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -227,7 +227,7 @@ def log_raise_exception(excont):                      logging.warn("!!!!!!!!!!!!!")                      logging.warn('!!! getting "No such file or directory" errors '                                   "is most likely due to MISCONFIGURATION, please consult " -                                 "http://access.redhat.com/knowledge/docs/en-US/Red_Hat_Storage/2.0/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html") +                                 "https://access.redhat.com/site/documentation/en-US/Red_Hat_Storage/2.1/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html")                      logging.warn("!!!!!!!!!!!!!")                  gconf.transport.terminate_geterr()          elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED): | 
