diff options
| author | Csaba Henk <csaba@gluster.com> | 2011-04-02 19:40:47 +0000 | 
|---|---|---|
| committer | Vijay Bellur <vijay@dev.gluster.com> | 2011-04-04 08:02:18 -0700 | 
| commit | cfb9c834f96dc57c47dac8d27da4266d0dab1f3f (patch) | |
| tree | 7a2ec3fb364030298e5b59f3ba6e380512fe0533 /xlators/features | |
| parent | f007eb1a0701cd3e13e6ba67208cd1db9325a370 (diff) | |
syncdaemon: give some refactoring to cascading code
- expiry check of foreign volinfo moved back to GLUSTERServer,
  so that under the hood we can removexattr the expired ones;
  a nice side-effect is that we can use the same dict layout
  for foreign and native volinfo (ie., foreign needs no
  timeout field)
- get_volinfo() is renamed to get_sys_volinfo() and most of the logic
  is stripped off of it (what remained there is the check
  against foreign master ambiguity)
- volinfo transition logic is cut out to an almost purely functional
  static method (the only impurity is the exception raised upon a
  forbidden volinfo change)
- ping renamed to keep-alive, as something called "ping" is
  not supposed to have a payload (yeah, keep-alive is a bit fishy
  on this front too, but could not come up with anything better...)
Signed-off-by: Csaba Henk <csaba@gluster.com>
Signed-off-by: Vijay Bellur <vijay@dev.gluster.com>
BUG: 2535 (gsync cascading)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=2535
Diffstat (limited to 'xlators/features')
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/master.py | 150 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/resource.py | 50 | 
2 files changed, 116 insertions, 84 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py index 28b014e121a..2df1470d5f7 100644 --- a/xlators/features/marker/utils/syncdaemon/master.py +++ b/xlators/features/marker/utils/syncdaemon/master.py @@ -15,51 +15,32 @@ URXTIME = (-1, 0)  class GMaster(object): -    def get_volinfo(self): -        vol_mark_dict_list = self.master.server.foreign_marks() -        return_dict = None -        if vol_mark_dict_list: -            for i in range(0, len(vol_mark_dict_list)): -                present_time = int (time.time()) -                if (present_time < vol_mark_dict_list[i]['timeout']): -                    logging.debug('syncing as intermediate-master with master as %s till: %d (time)' % \ -                                  (vol_mark_dict_list[i]['uuid'], vol_mark_dict_list[i]['timeout'])) -                    if self.inter_master: -                        if (self.forgn_uuid != vol_mark_dict_list[i]['uuid']): -                            raise RuntimeError ('more than one master present') -                    else: -                        self.inter_master = True -                        self.forgn_uuid = vol_mark_dict_list[i]['uuid'] -                    return_dict = vol_mark_dict_list[i] -                else: -                    logging.debug('an expired master (%s) with time-out: %d, present time: %d' % \ -                                  (vol_mark_dict_list[i]['uuid'], vol_mark_dict_list[i]['timeout'], -                                    present_time)) -        if self.inter_master: -            self.volume_info = return_dict -            if return_dict: -                if self.volume_info['retval']: -                    raise RuntimeError ("master is corrupt") -            return self.volume_info +    KFGN = 0 +    KNAT = 1 -        self.volume_info =  self.master.server.native_mark() -        logging.debug('returning volume-mark from glusterfs: %s' %(self.volume_info)) -        if 
self.volume_info: -            if self.volume_info['retval']: -                raise RuntimeError("master is corrupt") -            return self.volume_info +    def get_sys_volinfo(self): +        fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \ +                          self.master.server.native_volume_info() +        fgn_vi = None +        if fgn_vis: +            if len(fgn_vis) > 1: +                raise RuntimeError("cannot work with multiple foreign masters") +            fgn_vi = fgn_vis[0] +        return fgn_vi, nat_vi      @property      def uuid(self): -        if not getattr(self, '_uuid', None): -            if self.volume_info: -                self._uuid = self.volume_info['uuid'] -        return self._uuid +        if self.volinfo: +            return self.volinfo['uuid']      @property      def volmark(self): -        if self.volume_info: -            return self.volume_info['volume_mark'] +        if self.volinfo: +            return self.volinfo['volume_mark'] + +    @property +    def inter_master(self): +        return self.volinfo_state[self.KFGN] and True or False      def xtime(self, path, *a, **opts):          if a: @@ -96,28 +77,36 @@ class GMaster(object):          self.turns = 0          self.start = None          self.change_seen = None -        self.forgn_uuid = None -        self.orig_master = False -        self.inter_master = False -        self.get_volinfo() -        if self.volume_info: -            logging.info('master started on(UUID) : ' + self.uuid) +        # the authorative (foreign, native) volinfo pair +        # which lets us deduce what to do when we refetch +        # the volinfos from system +        self.volinfo_state = (None, None) +        # the actual volinfo we make use of +        self.volinfo = None -        #pinger -        if gconf.timeout and int(gconf.timeout) > 0: -            def pinger(): +        timo = int(gconf.timeout or 0) +        if timo > 0: +            def keep_alive():           
       while True: -                    volmark = self.get_volinfo() -                    if volmark: -                        volmark['forgn_uuid'] = True -                        timeout = int (time.time()) + 2 * gconf.timeout -                        volmark['timeout'] = timeout - -                    self.slave.server.ping(volmark) -                    time.sleep(int(gconf.timeout) * 0.5) -        t = threading.Thread(target=pinger) -        t.setDaemon(True) -        t.start() +                    gap = timo * 0.5 +                    # first grab a reference as self.volinfo +                    # can be changed in main thread +                    vi = self.volinfo +                    if vi: +                        # then have a private copy which we can mod +                        vi = vi.copy() +                        vi['timeout'] = int(time.time()) + timo +                    else: +                        # send keep-alives more frequently to +                        # avoid a delay in announcing our volume info +                        # to slave if it becomes established in the +                        # meantime +                        gap = min(10, gap) +                    self.slave.server.keep_alive(vi) +                    time.sleep(gap) +            t = threading.Thread(target=keep_alive) +            t.setDaemon(True) +            t.start()          while True:              self.crawl() @@ -146,20 +135,53 @@ class GMaster(object):              self.slave.server.setattr(path, adct)          self.slave.server.set_xtime(path, self.uuid, mark) +    @staticmethod +    def volinfo_state_machine(volinfo_state, volinfo_sys): +        # store the value below "boxed" to emulate proper closures +        # (variables of the enclosing scope are available inner functions +        # provided they are no reassigned; mutation is OK). 
+        relax_mismatch = [False] +        def select_vi(vi0, vi): +            if vi and (not vi0 or vi0['uuid'] == vi['uuid']): +                # valid new value found; for the rest, we are graceful about +                # uuid mismatch +                relax_mismatch[0] = True +                return vi +            if vi0 and vi and vi0['uuid'] != vi['uuid'] and not relax_mismatch[0]: +                # uuid mismatch for master candidate, bail out +                raise RuntimeError("aborting on uuid change from %s to %s" % \ +                                   (vi0['uuid'], vi['uuid'])) +            # fall back to old +            return vi0 +        newstate = tuple(select_vi(*vip) for vip in zip(volinfo_state, volinfo_sys)) +        srep = lambda vi: vi and vi['uuid'][0:8] +        logging.debug('(%s, %s) << (%s, %s) -> (%s, %s)' % \ +                      tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate)) +        return newstate +      def crawl(self, path='.', xtl=None):          if path == '.':              if self.start:                  logging.info("crawl took %.6f" % (time.time() - self.start))              time.sleep(1)              self.start = time.time() -            volinfo = self.get_volinfo() -            if volinfo: -                if volinfo['uuid'] != self.uuid: -                    raise RuntimeError("master uuid mismatch") +            volinfo_sys = self.get_sys_volinfo() +            self.volinfo_state = self.volinfo_state_machine(self.volinfo_state, volinfo_sys) +            if self.inter_master: +                self.volinfo = volinfo_sys[self.KFGN] +            else: +                self.volinfo = volinfo_sys[self.KNAT] +            if self.volinfo: +                if self.volinfo['retval']: +                    raise RuntimeError ("master is corrupt")                  logging.info("Crawling as %s (%s master mode) ..." 
% \ -                             (self.uuid,self.inter_master and "intermediate" or "primary")) +                             (self.uuid, self.inter_master and "intermediate" or "primary"))              else: -                logging.info("Crawling: waiting for valid key for %s" % self.uuid) +                if self.inter_master: +                    logging.info("Crawling: waiting for being synced from %s" % \ +                                 self.volinfo_state[self.KFGN]['uuid']) +                else: +                    logging.info("Crawling: waiting for volume info")                  return          logging.debug("entering " + path)          if not xtl: diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py index bebe5c22b92..7083b56cff4 100644 --- a/xlators/features/marker/utils/syncdaemon/resource.py +++ b/xlators/features/marker/utils/syncdaemon/resource.py @@ -99,6 +99,12 @@ class Xattr(object):              cls.raise_oserr()      @classmethod +    def lremovexattr(cls, path, attr): +        ret = cls.libc.lremovexattr(path, attr) +        if ret == -1: +            cls.raise_oserr() + +    @classmethod      def llistxattr_buf(cls, path):          size = cls.llistxattr(path)          if size  == -1: @@ -106,7 +112,6 @@ class Xattr(object):          return cls.llistxattr(path, size) -  class Server(object):      GX_NSPACE = "trusted.glusterfs" @@ -205,9 +210,9 @@ class Server(object):      def pid():          return os.getpid() -    lastping = 0 +    last_keep_alive = 0      @classmethod -    def ping(cls, dct): +    def keep_alive(cls, dct):          if dct:              key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']])              val = struct.pack(cls.FRGN_FMTSTR, @@ -217,8 +222,8 @@ class Server(object):              Xattr.lsetxattr('.', key, val)          else:              logging.info('no volume-mark, if the behaviour persists have to check if master gsyncd is running') 
-        cls.lastping += 1 -        return cls.lastping +        cls.last_keep_alive += 1 +        return cls.last_keep_alive      @staticmethod      def version(): @@ -238,9 +243,9 @@ class SlaveLocal(object):          logging.info("slave listening")          if gconf.timeout and int(gconf.timeout) > 0:              while True: -                lp = self.server.lastping +                lp = self.server.last_keep_alive                  time.sleep(int(gconf.timeout)) -                if lp == self.server.lastping: +                if lp == self.server.last_keep_alive:                      logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout))                      break          else: @@ -339,7 +344,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):      class GLUSTERServer(Server):          @classmethod -        def attr_unpack_dict(cls, xattr, extra_fields = ''): +        def _attr_unpack_dict(cls, xattr, extra_fields = ''):              fmt_string = cls.NTV_FMTSTR + extra_fields              buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))              vm = struct.unpack(fmt_string, buf) @@ -356,27 +361,32 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                  return volinfo          @classmethod -        def foreign_marks(cls): +        def foreign_volume_infos(cls):              dict_list = []              xattr_list = Xattr.llistxattr_buf('.')              for ele in xattr_list: -                if ele.find('trusted.glusterfs.volume-mark.') == 0: -                    d, x = cls.attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) -                    d['timeout'] = x[0] -                    dict_list.append(d) +                if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0: +                    d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) +                    now = int(time.time()) +                    if x[0] > now: +                        logging.debug("volinfo[%s] expires: %d (%d 
sec later)" % \ +                                      (d['uuid'], x[0], x[0] - now)) +                        dict_list.append(d) +                    else: +                        try: +                            Xattr.lremovexattr('.', ele) +                        except OSError: +                            pass              return dict_list          @classmethod -        def native_mark(cls): +        def native_volume_info(cls):              try: -                return cls.attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark'])) +                return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark']))              except OSError:                  ex = sys.exc_info()[1] -                if ex.errno == ENODATA: -                    logging.warn("volume-mark not found") -                    return -                else: -                    raise RuntimeError("master is corrupt") +                if ex.errno != ENODATA: +                    raise      server = GLUSTERServer  | 
