diff options
-rw-r--r-- | xlators/features/marker/utils/syncdaemon/master.py | 94 | ||||
-rw-r--r-- | xlators/features/marker/utils/syncdaemon/resource.py | 95 |
2 files changed, 155 insertions, 34 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py index a275f55fb..dfa7a2e6f 100644 --- a/xlators/features/marker/utils/syncdaemon/master.py +++ b/xlators/features/marker/utils/syncdaemon/master.py @@ -1,5 +1,6 @@ import os import sys +import threading import time import stat import signal @@ -15,20 +16,50 @@ URXTIME = (-1, 0) class GMaster(object): def get_volinfo(self): - self.volume_info = self.master.server.volume_info() - if self.volume_info['retval']: - raise RuntimeError("master is corrupt") - return self.volume_info + vol_mark_dict_list = self.master.server.foreign_marks() + return_dict = None + if vol_mark_dict_list: + for i in range(0, len(vol_mark_dict_list)): + present_time = int (time.time()) + if (present_time < vol_mark_dict_list[i]['timeout']): + logging.debug('syncing as intermediate-master with master as %s till: %d (time)' % \ + (vol_mark_dict_list[i]['uuid'], vol_mark_dict_list[i]['timeout'])) + if self.inter_master: + if (self.forgn_uuid != vol_mark_dict_list[i]['uuid']): + raise RuntimeError ('more than one master present') + else: + self.inter_master = True + self.forgn_uuid = vol_mark_dict_list[i]['uuid'] + return_dict = vol_mark_dict_list[i] + else: + logging.debug('an expired master (%s) with time-out: %d, present time: %d' % \ + (vol_mark_dict_list[i]['uuid'], vol_mark_dict_list[i]['timeout'], + present_time)) + if self.inter_master: + self.volume_info = return_dict + if return_dict: + if self.volume_info['retval']: + raise RuntimeError ("master is corrupt") + return self.volume_info + + self.volume_info = self.master.server.native_mark() + logging.debug('returning volume-mark from glusterfs: %s' %(self.volume_info)) + if self.volume_info: + if self.volume_info['retval']: + raise RuntimeError("master is corrupt") + return self.volume_info @property def uuid(self): if not getattr(self, '_uuid', None): - self._uuid = self.volume_info['uuid'] + if self.volume_info: + self._uuid = self.volume_info['uuid'] return self._uuid @property def volmark(self): - return self.volume_info['volume_mark'] + if self.volume_info: + return self.volume_info['volume_mark'] def xtime(self, path, *a, **opts): if a: @@ -36,31 +67,57 @@ class GMaster(object): else: rsc = self.master if not 'create' in opts: - opts['create'] = rsc == self.master + opts['create'] = (rsc == self.master and not self.inter_master) + if not 'default_xtime' in opts: + if self.inter_master: + opts['default_xtime'] = ENODATA + else: + opts['default_xtime'] = URXTIME xt = rsc.server.xtime(path, self.uuid) if isinstance(xt, int) and xt != ENODATA: return xt - if (xt == ENODATA or xt < self.volmark) and opts['create']: + invalid_xtime = (xt == ENODATA or xt < self.volmark) + if invalid_xtime and opts['create']: t = time.time() sec = int(t) nsec = int((t - sec) * 1000000) xt = (sec, nsec) rsc.server.set_xtime(path, self.uuid, xt) - if xt == ENODATA: - xt = URXTIME + if invalid_xtime: + xt = opts['default_xtime'] return xt def __init__(self, master, slave): self.master = master self.slave = slave - self.get_volinfo() self.jobtab = {} self.syncer = Syncer(slave) self.total_turns = int(gconf.turns) self.turns = 0 self.start = None self.change_seen = None - logging.info('master started on ' + self.uuid) + self.forgn_uuid = None + self.orig_master = False + self.inter_master = False + self.get_volinfo() + if self.volume_info: + logging.info('master started on(UUID) : ' + self.uuid) + + #pinger + if gconf.timeout and int(gconf.timeout) > 0: + def pinger(): + while True: + volmark = self.get_volinfo() + if volmark: + volmark['forgn_uuid'] = True + timeout = int (time.time()) + 2 * gconf.timeout + volmark['timeout'] = timeout + + self.slave.server.ping(volmark) + time.sleep(int(gconf.timeout) * 0.5) + t = threading.Thread(target=pinger) + t.setDaemon(True) + t.start() while True: self.crawl() @@ -95,10 +152,15 @@ class GMaster(object): logging.info("crawl took %.6f" % (time.time() - self.start)) time.sleep(1) self.start = time.time() - logging.info("crawling...") - self.get_volinfo() - if self.volume_info['uuid'] != self.uuid: - raise RuntimeError("master uuid mismatch") + volinfo = self.get_volinfo() + if volinfo: + if volinfo['uuid'] != self.uuid: + raise RuntimeError("master uuid mismatch") + logging.info("Crawling as %s (%s master mode) ..." % \ + (self.uuid,self.inter_master and "intermediate" or "primary")) + else: + logging.info("Crawling: waiting for valid key for %s" % self.uuid) + return logging.debug("entering " + path) if not xtl: xtl = self.xtime(path) diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py index ad0d98bad..8556e4246 100644 --- a/xlators/features/marker/utils/syncdaemon/resource.py +++ b/xlators/features/marker/utils/syncdaemon/resource.py @@ -10,6 +10,7 @@ import socket import logging import tempfile import threading +import time from ctypes import * from ctypes.util import find_library from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR @@ -67,12 +68,12 @@ class Xattr(object): raise OSError(errn, os.strerror(errn)) @classmethod - def lgetxattr(cls, path, attr, siz=0): + def _query_xattr(cls, path, siz, syscall, *a): if siz: buf = create_string_buffer('\0' * siz) else: buf = None - ret = cls.libc.lgetxattr(path, attr, buf, siz) + ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz))) if ret == -1: cls.raise_oserr() if siz: @@ -81,15 +82,37 @@ class Xattr(object): return ret @classmethod + def lgetxattr(cls, path, attr, siz=0): + return cls._query_xattr( path, siz, 'lgetxattr', attr) + + @classmethod + def llistxattr(cls, path, siz=0): + ret = cls._query_xattr(path, siz, 'llistxattr') + if isinstance(ret, str): + ret = ret.split('\0') + return ret + + @classmethod def lsetxattr(cls, path, attr, val): ret = cls.libc.lsetxattr(path, attr, val, len(val), 0) if ret == -1: cls.raise_oserr() + @classmethod + def llistxattr_buf(cls, path): + size = cls.llistxattr(path) + if size == -1: + raise_oserr() + return cls.llistxattr(path, size) + + class Server(object): GX_NSPACE = "trusted.glusterfs" + NTV_FMTSTR = "!" + "B"*19 + "II" + FRGN_XTRA_FMT = "I" + FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT @staticmethod def entries(path): @@ -184,7 +207,16 @@ class Server(object): lastping = 0 @classmethod - def ping(cls): + def ping(cls, dct): + if dct: + key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']]) + val = struct.pack(cls.FRGN_FMTSTR, + *(dct['version'] + + tuple(int(x,16) for x in re.findall('(?:[\da-f]){2}', dct['uuid'])) + + (dct['retval'],) + dct['volume_mark'][0:2] + (dct['timeout'],))) + Xattr.lsetxattr('.', key, val) + else: + logging.info('no volume-mark, if the behaviour persists have to check if master gsyncd is running') cls.lastping += 1 return cls.lastping @@ -243,14 +275,6 @@ class SlaveRemote(object): da1[i][k] = int(v) if da1[0] != da1[1]: raise RuntimeError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv)) - if gconf.timeout and int(gconf.timeout) > 0: - def pinger(): - while True: - self.server.ping() - time.sleep(int(gconf.timeout) * 0.5) - t = threading.Thread(target=pinger) - t.setDaemon(True) - t.start() def rsync(self, files, *args): if not files: @@ -314,16 +338,51 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): class GLUSTERServer(Server): + forgn_mark_size = struct.calcsize(Server.FRGN_FMTSTR) + nativ_mark_size = struct.calcsize(Server.NTV_FMTSTR) + @classmethod - def volume_info(cls): - vm = struct.unpack('!' + 'B'*19 + 'II', - Xattr.lgetxattr('.', '.'.join([cls.GX_NSPACE, 'volume-mark']), 27)) + def attr_unpack_dict(cls, xattr, extra_fields = ''): + fmt_string = cls.NTV_FMTSTR + extra_fields + buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) + vm = struct.unpack(fmt_string, buf) + logging.info("str: %s" % `vm`) m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]])) uuid = '-'.join(m.groups()) - return { 'version': vm[0:2], - 'uuid' : uuid, - 'retval' : vm[18], - 'volume_mark': vm[-2:] } + volinfo = { 'version': vm[0:2], + 'uuid' : uuid, + 'retval' : vm[18], + 'volume_mark': vm[18:20], + } + logging.info("volinfo: %s" % `volinfo`) + if extra_fields: + return volinfo, vm[-len(extra_fields):] + else: + return volinfo + + @classmethod + def foreign_marks(cls): + dict_list = [] + xattr_list = Xattr.llistxattr_buf('.') + for ele in xattr_list: + if (ele.find('trusted.glusterfs.volume-mark') != -1): + #buf = Xattr.lgetxattr('.', ele, cls.forgn_mark_size) + d, x = cls.attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) + d['timeout'] = x[0] + dict_list.append(d) + return dict_list + + @classmethod + def native_mark(cls): + try: + return cls.attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark'])) + except OSError: + ex = sys.exc_info()[1] + if ex.errno == ENODATA: + logging.warn("volume-mark not found") + return + else: + raise RuntimeError("master is corrupt") server = GLUSTERServer |