diff options
Diffstat (limited to 'geo-replication')
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 12 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 44 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 2 | 
3 files changed, 48 insertions, 10 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index e0a3604071c..e34def6f6ab 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -928,6 +928,12 @@ class GMasterChangelogMixin(GMasterCommon):
         def purge_update():
             files_pending['purge'] += 1
 
+        def log_failures(failures, entry_key, gfid_prefix, log_prefix):
+            for failure in failures:
+                st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
+                if not isinstance(st, int):
+                    logging.warn('%s FAILED: %s' % (log_prefix, repr(failure)))
+
         for e in clist:
             e = e.strip()
             et = e[self.IDX_START:self.IDX_END]   # entry type
@@ -1030,7 +1036,8 @@ class GMasterChangelogMixin(GMasterCommon):
             self.update_worker_cumilitive_status(files_pending)
         # sync namespace
         if entries:
-            self.slave.server.entry_ops(entries)
+            failures = self.slave.server.entry_ops(entries)
+            log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
         # sync metadata
         if meta_gfid:
             meta_entries = []
@@ -1044,7 +1051,8 @@ class GMasterChangelogMixin(GMasterCommon):
                     continue
                 meta_entries.append(edct('META', go=go[0], stat=st))
             if meta_entries:
-                self.slave.server.meta_ops(meta_entries)
+                failures = self.slave.server.meta_ops(meta_entries)
+                log_failures(failures, 'go', '', 'META')
         # sync data
         if datas:
             self.a_syncdata(datas)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index fe86044d546..d3d1ee36e01 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -607,6 +607,19 @@ class Server(object):
                     er = errno_wrap(os.rmdir, [entry], [ENOENT, ENOTEMPTY])
                     if er == ENOTEMPTY:
                         return er
+
+        def collect_failure(e, cmd_ret):
+            # We do this for failing fops on Slave
+            # Master should be logging this
+            if cmd_ret == EEXIST:
+                disk_gfid = cls.gfid_mnt(e['entry'])
+                if isinstance(disk_gfid, basestring):
+                    if e['gfid'] != disk_gfid:
+                        failures.append((e, cmd_ret, disk_gfid))
+            else:
+                failures.append((e, cmd_ret))
+
+        failures = []
         for e in entries:
             blob = None
             op = e['op']
@@ -644,7 +657,10 @@ class Server(object):
                     (pg, bname) = entry2pb(entry)
                     blob = entry_pack_reg_stat(gfid, bname, e['stat'])
                 else:
-                    errno_wrap(os.link, [slink, entry], [ENOENT, EEXIST])
+                    cmd_ret = errno_wrap(os.link,
+                                         [slink, entry],
+                                         [ENOENT, EEXIST])
+                    collect_failure(e, cmd_ret)
             elif op == 'SYMLINK':
                 blob = entry_pack_symlink(gfid, bname, e['link'], e['stat'])
             elif op == 'RENAME':
@@ -655,16 +671,22 @@ class Server(object):
                         (pg, bname) = entry2pb(en)
                         blob = entry_pack_reg_stat(gfid, bname, e['stat'])
                 else:
-                    errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])
+                    cmd_ret = errno_wrap(os.rename,
+                                         [entry, en],
+                                         [ENOENT, EEXIST])
+                    collect_failure(e, cmd_ret)
             if blob:
-                errno_wrap(Xattr.lsetxattr,
-                           [pg, 'glusterfs.gfid.newfile', blob],
-                           [EEXIST],
-                           [ENOENT, ESTALE, EINVAL])
+                cmd_ret = errno_wrap(Xattr.lsetxattr,
+                                     [pg, 'glusterfs.gfid.newfile', blob],
+                                     [EEXIST],
+                                     [ENOENT, ESTALE, EINVAL])
+                collect_failure(e, cmd_ret)
+        return failures
 
     @classmethod
     def meta_ops(cls, meta_entries):
         logging.debug('Meta-entries: %s' % repr(meta_entries))
+        failures = []
         for e in meta_entries:
             mode = e['stat']['mode']
             uid = e['stat']['uid']
@@ -672,10 +694,18 @@ class Server(object):
             atime = e['stat']['atime']
             mtime = e['stat']['mtime']
             go = e['go']
-            errno_wrap(os.chmod, [go, mode], [ENOENT], [ESTALE, EINVAL])
+            cmd_ret = errno_wrap(os.chmod, [go, mode],
+                                 [ENOENT], [ESTALE, EINVAL])
+            # This is a fail fast mechanism
+            # We do this for failing fops on Slave
+            # Master should be logging this
+            if isinstance(cmd_ret, int):
+                failures.append((e, cmd_ret))
+                continue
             errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL])
             errno_wrap(os.utime, [go, (atime, mtime)],
                        [ENOENT], [ESTALE, EINVAL])
+        return failures
 
     @classmethod
     @_pathguard
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index dbbc31deb2a..2614c828104 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -484,7 +484,7 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
             if nr_tries == GF_OP_RETRIES:
                 # probably a screwed state, cannot do much...
                 logging.warn('reached maximum retries (%s)...' % repr(arg))
-                return
+                return ex.errno
             time.sleep(0.250)  # retry the call
 
 
