summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKaushik BV <kaushikbv@gluster.com>2011-03-29 09:10:07 +0000
committerVijay Bellur <vijay@dev.gluster.com>2011-03-29 08:46:20 -0700
commit4597929cc527f8abaf9ef9e1d5499ea416e5c7ff (patch)
tree29f96ebe322d250188e4c3692201a2d574dd70ed
parent4c246c02f4ab569fca92255b7efb819243711d6b (diff)
Gsyncd: Cascading of gsync daemons
This patch allows the slave of a gsyncd to be started as the master of another slave gsyncd. Signed-off-by: Kaushik BV <kaushikbv@gluster.com> Signed-off-by: Vijay Bellur <vijay@dev.gluster.com> BUG: 2535 (gsync cascading) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=2535
-rw-r--r--xlators/features/marker/utils/syncdaemon/master.py94
-rw-r--r--xlators/features/marker/utils/syncdaemon/resource.py95
2 files changed, 155 insertions, 34 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py
index a275f55fbe6..dfa7a2e6f56 100644
--- a/xlators/features/marker/utils/syncdaemon/master.py
+++ b/xlators/features/marker/utils/syncdaemon/master.py
@@ -1,5 +1,6 @@
import os
import sys
+import threading
import time
import stat
import signal
@@ -15,20 +16,50 @@ URXTIME = (-1, 0)
class GMaster(object):
def get_volinfo(self):
- self.volume_info = self.master.server.volume_info()
- if self.volume_info['retval']:
- raise RuntimeError("master is corrupt")
- return self.volume_info
+ vol_mark_dict_list = self.master.server.foreign_marks()
+ return_dict = None
+ if vol_mark_dict_list:
+ for i in range(0, len(vol_mark_dict_list)):
+ present_time = int (time.time())
+ if (present_time < vol_mark_dict_list[i]['timeout']):
+ logging.debug('syncing as intermediate-master with master as %s till: %d (time)' % \
+ (vol_mark_dict_list[i]['uuid'], vol_mark_dict_list[i]['timeout']))
+ if self.inter_master:
+ if (self.forgn_uuid != vol_mark_dict_list[i]['uuid']):
+ raise RuntimeError ('more than one master present')
+ else:
+ self.inter_master = True
+ self.forgn_uuid = vol_mark_dict_list[i]['uuid']
+ return_dict = vol_mark_dict_list[i]
+ else:
+ logging.debug('an expired master (%s) with time-out: %d, present time: %d' % \
+ (vol_mark_dict_list[i]['uuid'], vol_mark_dict_list[i]['timeout'],
+ present_time))
+ if self.inter_master:
+ self.volume_info = return_dict
+ if return_dict:
+ if self.volume_info['retval']:
+ raise RuntimeError ("master is corrupt")
+ return self.volume_info
+
+ self.volume_info = self.master.server.native_mark()
+ logging.debug('returning volume-mark from glusterfs: %s' %(self.volume_info))
+ if self.volume_info:
+ if self.volume_info['retval']:
+ raise RuntimeError("master is corrupt")
+ return self.volume_info
@property
def uuid(self):
if not getattr(self, '_uuid', None):
- self._uuid = self.volume_info['uuid']
+ if self.volume_info:
+ self._uuid = self.volume_info['uuid']
return self._uuid
@property
def volmark(self):
- return self.volume_info['volume_mark']
+ if self.volume_info:
+ return self.volume_info['volume_mark']
def xtime(self, path, *a, **opts):
if a:
@@ -36,31 +67,57 @@ class GMaster(object):
else:
rsc = self.master
if not 'create' in opts:
- opts['create'] = rsc == self.master
+ opts['create'] = (rsc == self.master and not self.inter_master)
+ if not 'default_xtime' in opts:
+ if self.inter_master:
+ opts['default_xtime'] = ENODATA
+ else:
+ opts['default_xtime'] = URXTIME
xt = rsc.server.xtime(path, self.uuid)
if isinstance(xt, int) and xt != ENODATA:
return xt
- if (xt == ENODATA or xt < self.volmark) and opts['create']:
+ invalid_xtime = (xt == ENODATA or xt < self.volmark)
+ if invalid_xtime and opts['create']:
t = time.time()
sec = int(t)
nsec = int((t - sec) * 1000000)
xt = (sec, nsec)
rsc.server.set_xtime(path, self.uuid, xt)
- if xt == ENODATA:
- xt = URXTIME
+ if invalid_xtime:
+ xt = opts['default_xtime']
return xt
def __init__(self, master, slave):
self.master = master
self.slave = slave
- self.get_volinfo()
self.jobtab = {}
self.syncer = Syncer(slave)
self.total_turns = int(gconf.turns)
self.turns = 0
self.start = None
self.change_seen = None
- logging.info('master started on ' + self.uuid)
+ self.forgn_uuid = None
+ self.orig_master = False
+ self.inter_master = False
+ self.get_volinfo()
+ if self.volume_info:
+ logging.info('master started on(UUID) : ' + self.uuid)
+
+ #pinger
+ if gconf.timeout and int(gconf.timeout) > 0:
+ def pinger():
+ while True:
+ volmark = self.get_volinfo()
+ if volmark:
+ volmark['forgn_uuid'] = True
+ timeout = int (time.time()) + 2 * gconf.timeout
+ volmark['timeout'] = timeout
+
+ self.slave.server.ping(volmark)
+ time.sleep(int(gconf.timeout) * 0.5)
+ t = threading.Thread(target=pinger)
+ t.setDaemon(True)
+ t.start()
while True:
self.crawl()
@@ -95,10 +152,15 @@ class GMaster(object):
logging.info("crawl took %.6f" % (time.time() - self.start))
time.sleep(1)
self.start = time.time()
- logging.info("crawling...")
- self.get_volinfo()
- if self.volume_info['uuid'] != self.uuid:
- raise RuntimeError("master uuid mismatch")
+ volinfo = self.get_volinfo()
+ if volinfo:
+ if volinfo['uuid'] != self.uuid:
+ raise RuntimeError("master uuid mismatch")
+ logging.info("Crawling as %s (%s master mode) ..." % \
+ (self.uuid,self.inter_master and "intermediate" or "primary"))
+ else:
+ logging.info("Crawling: waiting for valid key for %s" % self.uuid)
+ return
logging.debug("entering " + path)
if not xtl:
xtl = self.xtime(path)
diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py
index ad0d98bad13..8556e4246f7 100644
--- a/xlators/features/marker/utils/syncdaemon/resource.py
+++ b/xlators/features/marker/utils/syncdaemon/resource.py
@@ -10,6 +10,7 @@ import socket
import logging
import tempfile
import threading
+import time
from ctypes import *
from ctypes.util import find_library
from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR
@@ -67,12 +68,12 @@ class Xattr(object):
raise OSError(errn, os.strerror(errn))
@classmethod
- def lgetxattr(cls, path, attr, siz=0):
+ def _query_xattr(cls, path, siz, syscall, *a):
if siz:
buf = create_string_buffer('\0' * siz)
else:
buf = None
- ret = cls.libc.lgetxattr(path, attr, buf, siz)
+ ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz)))
if ret == -1:
cls.raise_oserr()
if siz:
@@ -81,15 +82,37 @@ class Xattr(object):
return ret
@classmethod
+ def lgetxattr(cls, path, attr, siz=0):
+ return cls._query_xattr( path, siz, 'lgetxattr', attr)
+
+ @classmethod
+ def llistxattr(cls, path, siz=0):
+ ret = cls._query_xattr(path, siz, 'llistxattr')
+ if isinstance(ret, str):
+ ret = ret.split('\0')
+ return ret
+
+ @classmethod
def lsetxattr(cls, path, attr, val):
ret = cls.libc.lsetxattr(path, attr, val, len(val), 0)
if ret == -1:
cls.raise_oserr()
+ @classmethod
+ def llistxattr_buf(cls, path):
+ size = cls.llistxattr(path)
+ if size == -1:
+ raise_oserr()
+ return cls.llistxattr(path, size)
+
+
class Server(object):
GX_NSPACE = "trusted.glusterfs"
+ NTV_FMTSTR = "!" + "B"*19 + "II"
+ FRGN_XTRA_FMT = "I"
+ FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT
@staticmethod
def entries(path):
@@ -184,7 +207,16 @@ class Server(object):
lastping = 0
@classmethod
- def ping(cls):
+ def ping(cls, dct):
+ if dct:
+ key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']])
+ val = struct.pack(cls.FRGN_FMTSTR,
+ *(dct['version'] +
+ tuple(int(x,16) for x in re.findall('(?:[\da-f]){2}', dct['uuid'])) +
+ (dct['retval'],) + dct['volume_mark'][0:2] + (dct['timeout'],)))
+ Xattr.lsetxattr('.', key, val)
+ else:
+ logging.info('no volume-mark, if the behaviour persists have to check if master gsyncd is running')
cls.lastping += 1
return cls.lastping
@@ -243,14 +275,6 @@ class SlaveRemote(object):
da1[i][k] = int(v)
if da1[0] != da1[1]:
raise RuntimeError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv))
- if gconf.timeout and int(gconf.timeout) > 0:
- def pinger():
- while True:
- self.server.ping()
- time.sleep(int(gconf.timeout) * 0.5)
- t = threading.Thread(target=pinger)
- t.setDaemon(True)
- t.start()
def rsync(self, files, *args):
if not files:
@@ -314,16 +338,51 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
class GLUSTERServer(Server):
+ forgn_mark_size = struct.calcsize(Server.FRGN_FMTSTR)
+ nativ_mark_size = struct.calcsize(Server.NTV_FMTSTR)
+
@classmethod
- def volume_info(cls):
- vm = struct.unpack('!' + 'B'*19 + 'II',
- Xattr.lgetxattr('.', '.'.join([cls.GX_NSPACE, 'volume-mark']), 27))
+ def attr_unpack_dict(cls, xattr, extra_fields = ''):
+ fmt_string = cls.NTV_FMTSTR + extra_fields
+ buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))
+ vm = struct.unpack(fmt_string, buf)
+ logging.info("str: %s" % `vm`)
m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]]))
uuid = '-'.join(m.groups())
- return { 'version': vm[0:2],
- 'uuid' : uuid,
- 'retval' : vm[18],
- 'volume_mark': vm[-2:] }
+ volinfo = { 'version': vm[0:2],
+ 'uuid' : uuid,
+ 'retval' : vm[18],
+ 'volume_mark': vm[18:20],
+ }
+ logging.info("volinfo: %s" % `volinfo`)
+ if extra_fields:
+ return volinfo, vm[-len(extra_fields):]
+ else:
+ return volinfo
+
+ @classmethod
+ def foreign_marks(cls):
+ dict_list = []
+ xattr_list = Xattr.llistxattr_buf('.')
+ for ele in xattr_list:
+ if (ele.find('trusted.glusterfs.volume-mark') != -1):
+ #buf = Xattr.lgetxattr('.', ele, cls.forgn_mark_size)
+ d, x = cls.attr_unpack_dict(ele, cls.FRGN_XTRA_FMT)
+ d['timeout'] = x[0]
+ dict_list.append(d)
+ return dict_list
+
+ @classmethod
+ def native_mark(cls):
+ try:
+ return cls.attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark']))
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno == ENODATA:
+ logging.warn("volume-mark not found")
+ return
+ else:
+ raise RuntimeError("master is corrupt")
server = GLUSTERServer