path: root/geo-replication/syncdaemon/resource.py
author    Avra Sengupta <asengupt@redhat.com>    2013-06-01 16:17:57 +0530
committer Vijay Bellur <vbellur@redhat.com>      2013-07-26 13:18:57 -0700
commit    b13c483dca20e4015b958f8959328e665a357f60 (patch)
tree      2af62fc50bae39e930fcbe09101d3e51c76eb6fc /geo-replication/syncdaemon/resource.py
parent    4944fc943efc41df1841e4e559180171f6541112 (diff)
gsyncd: distribute the crawling load
* also consume changelog for change detection.
* Status fixes
* Use new libgfchangelog done API
* process (and sync) one changelog at a time

Change-Id: I24891615bb762e0741b1819ddfdef8802326cb16
BUG: 847839
Original Author: Csaba Henk <csaba@redhat.com>
Original Author: Aravinda VK <avishwan@redhat.com>
Original Author: Venky Shankar <vshankar@redhat.com>
Original Author: Amar Tumballi <amarts@redhat.com>
Original Author: Avra Sengupta <asengupt@redhat.com>
Signed-off-by: Avra Sengupta <asengupt@redhat.com>
Reviewed-on: http://review.gluster.org/5131
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Tested-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r--  geo-replication/syncdaemon/resource.py  |  207
1 file changed, 188 insertions(+), 19 deletions(-)
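
Editor's note: the changelog consumption cycle summarized in the commit message maps onto the new Server.changelog_* wrappers added further down in this diff. The following is a rough standalone sketch of that cycle, not code from the patch: server stands for the Server class (or its RePCe proxy), the paths and log level are hypothetical, and process_changelog() is a made-up stand-in for the changelog crawl that actually lives in master.py.

    # Sketch only: hypothetical arguments and a made-up process_changelog() helper.
    server.changelog_register('/bricks/b1',                   # brick path
                              '/var/lib/misc/gsyncd/working', # working dir (hypothetical)
                              '/var/log/gsyncd/changes.log',  # changelog log file
                              7)                              # log level
    while True:
        server.changelog_scan()                       # let libgfchangelog publish new logs
        for change in server.changelog_getchanges():  # process one changelog at a time
            process_changelog(change)                 # hypothetical: replay entries, sync data
            server.changelog_done(change)             # acknowledge via the new "done" API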
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 73102fbc..52989fe2 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -5,13 +5,14 @@ import stat
import time
import fcntl
import errno
+import types
import struct
import socket
import logging
import tempfile
import threading
import subprocess
-from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR
+from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR, ENOTEMPTY
from select import error as SelectError
from gconf import gconf
@@ -19,7 +20,8 @@ import repce
from repce import RepceServer, RepceClient
from master import gmaster_builder
import syncdutils
-from syncdutils import GsyncdError, select, privileged, boolify
+from syncdutils import GsyncdError, select, privileged, boolify, funcode
+from syncdutils import umask, entry2pb, gauxpfx, errno_wrap
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
@@ -105,7 +107,18 @@ class _MetaXattr(object):
setattr(self, m, getattr(LXattr, m))
return getattr(self, meth)
+class _MetaChangelog(object):
+ def __getattr__(self, meth):
+ from libgfchangelog import Changes as LChanges
+ xmeth = [ m for m in dir(LChanges) if m[0] != '_' ]
+ if not meth in xmeth:
+ return
+ for m in xmeth:
+ setattr(self, m, getattr(LChanges, m))
+ return getattr(self, meth)
+
Xattr = _MetaXattr()
+Changes = _MetaChangelog()
class Popen(subprocess.Popen):
@@ -245,10 +258,24 @@ class Server(object):
and classmethods and is used directly, without instantiation.)
"""
- GX_NSPACE = (privileged() and "trusted" or "system") + ".glusterfs"
+ GX_NSPACE_PFX = (privileged() and "trusted" or "system")
+ GX_NSPACE = GX_NSPACE_PFX + ".glusterfs"
NTV_FMTSTR = "!" + "B"*19 + "II"
FRGN_XTRA_FMT = "I"
FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT
+ GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0'
+
+ local_path = ''
+
+ @classmethod
+ def _fmt_mknod(cls, l):
+ return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l+1)
+ @classmethod
+ def _fmt_mkdir(cls, l):
+ return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l+1)
+ @classmethod
+ def _fmt_symlink(cls, l1, l2):
+ return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1+1, l2+1)
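
Editor's note: in the format helpers above, the l+1 leaves room for the basename's trailing NUL, and GX_GFID_CANONICAL_LEN covers the 36-character canonical gfid plus its own NUL. A standalone check of what _fmt_mknod() expands to (illustration, not part of the patch):

    # What _fmt_mknod() yields for an 8-character basename.
    import struct
    GX_GFID_CANONICAL_LEN = 37                # 36-char canonical gfid + trailing '\0'
    fmt = "!II%dsI%dsIII" % (GX_GFID_CANONICAL_LEN, 8 + 1)
    print(fmt)                                # !II37sI9sIII
    print(struct.calcsize(fmt))               # 4+4 + 37 + 4 + 9 + 4+4+4 = 70 bytes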
def _pathguard(f):
"""decorator method that checks
@@ -257,22 +284,21 @@ class Server(object):
point out of the managed tree
"""
- fc = getattr(f, 'func_code', None)
- if not fc:
- # python 3
- fc = f.__code__
+ fc = funcode(f)
pi = list(fc.co_varnames).index('path')
def ff(*a):
path = a[pi]
ps = path.split('/')
if path[0] == '/' or '..' in ps:
raise ValueError('unsafe path')
+ a = list(a)
+ a[pi] = os.path.join(a[0].local_path, path)
return f(*a)
return ff
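
Editor's note: the reworked _pathguard above now also rebases every guarded path onto the server's local_path (argument 0 is the class carrying local_path, '' on Server and the brick root on the brickserver subclass added later in this diff). A minimal standalone sketch of the same checks, with made-up values:

    # Mirrors _pathguard(): reject absolute paths and '..' components,
    # then root the path at local_path.
    import os

    def guarded(local_path, path):
        if path[0] == '/' or '..' in path.split('/'):
            raise ValueError('unsafe path')
        return os.path.join(local_path, path)

    print(guarded('/bricks/b1', 'dir/file'))    # /bricks/b1/dir/file
    # guarded('/bricks/b1', '../escape')        # raises ValueError('unsafe path')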
- @staticmethod
+ @classmethod
@_pathguard
- def entries(path):
+ def entries(cls, path):
"""directory entries in an array"""
# prevent symlinks being followed
if not stat.S_ISDIR(os.lstat(path).st_mode):
@@ -371,6 +397,18 @@ class Server(object):
raise
@classmethod
+ def gfid(cls, gfidpath):
+ return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid', cls.GX_GFID_CANONICAL_LEN], [ENOENT])
+
+ @classmethod
+ def node_uuid(cls, path='.'):
+ try:
+ uuid_l = Xattr.lgetxattr_buf(path, '.'.join([cls.GX_NSPACE, 'node-uuid']))
+ return uuid_l[:-1].split(' ')
+ except OSError:
+ raise
+
+ @classmethod
def xtime_vec(cls, path, *uuids):
"""vectored version of @xtime
@@ -402,9 +440,96 @@ class Server(object):
for u,t in mark_dct.items():
cls.set_xtime(path, u, t)
- @staticmethod
+ @classmethod
+ def entry_ops(cls, entries):
+ pfx = gauxpfx()
+ logging.debug('entries: %s' % repr(entries))
+ # regular file
+ def entry_pack_reg(gf, bn, st):
+ blen = len(bn)
+ mo = st['mode']
+ return struct.pack(cls._fmt_mknod(blen),
+ st['uid'], st['gid'],
+ gf, mo, bn,
+ stat.S_IMODE(mo), 0, umask())
+ # mkdir
+ def entry_pack_mkdir(gf, bn, st):
+ blen = len(bn)
+ mo = st['mode']
+ return struct.pack(cls._fmt_mkdir(blen),
+ st['uid'], st['gid'],
+ gf, mo, bn,
+ stat.S_IMODE(mo), umask())
+ #symlink
+ def entry_pack_symlink(gf, bn, lnk, st):
+ blen = len(bn)
+ llen = len(lnk)
+ return struct.pack(cls._fmt_symlink(blen, llen),
+ st['uid'], st['gid'],
+ gf, st['mode'], bn, lnk)
+ def entry_purge(entry, gfid):
+ # This is an extremely racy code and needs to be fixed ASAP.
+ # The GFID check here is to be sure that the pargfid/bname
+ # to be purged is the GFID gotten from the changelog.
+ # (a stat(changelog_gfid) would also be valid here)
+ # The race here is between the GFID check and the purge.
+ disk_gfid = cls.gfid(entry)
+ if isinstance(disk_gfid, int):
+ return
+ if not gfid == disk_gfid:
+ return
+ er = errno_wrap(os.unlink, [entry], [ENOENT, EISDIR])
+ if isinstance(er, int):
+ if er == EISDIR:
+ er = errno_wrap(os.rmdir, [entry], [ENOENT, ENOTEMPTY])
+ if er == ENOTEMPTY:
+ return er
+ for e in entries:
+ blob = None
+ op = e['op']
+ gfid = e['gfid']
+ entry = e['entry']
+ (pg, bname) = entry2pb(entry)
+ if op in ['RMDIR', 'UNLINK']:
+ while True:
+ er = entry_purge(entry, gfid)
+ if isinstance(er, int):
+ time.sleep(1)
+ else:
+ break
+ elif op == 'CREATE':
+ blob = entry_pack_reg(gfid, bname, e['stat'])
+ elif op == 'MKDIR':
+ blob = entry_pack_mkdir(gfid, bname, e['stat'])
+ elif op == 'LINK':
+ errno_wrap(os.link, [os.path.join(pfx, gfid), entry], [ENOENT, EEXIST])
+ elif op == 'SYMLINK':
+ blob = entry_pack_symlink(gfid, bname, e['link'], e['stat'])
+ elif op == 'RENAME':
+ en = e['entry1']
+ errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])
+ if blob:
+ errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [ENOENT, EEXIST])
+
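
Editor's note: entry_ops() above consumes a list of records, each carrying the op, the gfid recorded in the changelog and the aux-gfid path of the entry (split by entry2pb() into parent and basename), plus op-specific fields; for CREATE/MKDIR/SYMLINK the packed blob is planted on the parent via the 'glusterfs.gfid.newfile' xattr. One hypothetical record, for illustration only:

    # Illustrative shape of a single entry_ops() record; every value is made up.
    record = {
        'op':    'CREATE',                                # RMDIR/UNLINK/CREATE/MKDIR/LINK/SYMLINK/RENAME
        'gfid':  'a37593a1-81f9-4a6f-9b3c-0d1e2f3a4b5c',  # gfid from the changelog
        'entry': '<aux-gfid-pfx>/<parent-gfid>/file.txt', # see gauxpfx()/entry2pb()
        'stat':  {'uid': 0, 'gid': 0, 'mode': 0o100644},  # consumed by entry_pack_reg()
        # 'link':   'target',     # only for SYMLINK
        # 'entry1': 'new/path',   # only for RENAME
    }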
+ @classmethod
+ def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0):
+ Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)
+
+ @classmethod
+ def changelog_scan(cls):
+ Changes.cl_scan()
+
+ @classmethod
+ def changelog_getchanges(cls):
+ return Changes.cl_getchanges()
+
+ @classmethod
+ def changelog_done(cls, clfile):
+ Changes.cl_done(clfile)
+
+ @classmethod
@_pathguard
- def setattr(path, adct):
+ def setattr(cls, path, adct):
"""set file attributes
@adct is a dict, where 'own', 'mode' and 'times'
@@ -537,10 +662,10 @@ class SlaveRemote(object):
raise GsyncdError("no files to sync")
logging.debug("files: " + ", ".join(files))
argv = gconf.rsync_command.split() + \
- ['-aR0', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \
+ ['-avR0', '--inplace', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \
gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \
['.'] + list(args)
- po = Popen(argv, stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
+ po = Popen(argv, stdin=subprocess.PIPE,stderr=subprocess.PIPE)
for f in files:
po.stdin.write(f)
po.stdin.write('\0')
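
Editor's note: with the flags changed above (-avR0 plus --inplace), and assuming gconf.rsync_command is plain "rsync", no extra rsync_options and use_rsync_xattrs off, the assembled argv looks roughly like the sketch below; the file list is then written to rsync's stdin NUL-separated, as the loop above shows. The slave target is hypothetical (the GLUSTER slave end points it at /proc/<pid>/cwd, as seen later in this diff).

    # Roughly what the argv built above amounts to (hypothetical slave target).
    argv = ['rsync', '-avR0', '--inplace', '--files-from=-', '--super',
            '--stats', '--numeric-ids', '--no-implied-dirs',
            '.', 'slavehost:/proc/1234/cwd']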
@@ -685,7 +810,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def can_connect_to(self, remote):
"""determine our position in the connectibility matrix"""
- return True
+ return not remote or \
+ (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER))
class Mounter(object):
"""Abstract base class for mounter backends"""
@@ -864,6 +990,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
sup(self, *a, **kw)
self.slavedir = "/proc/%d/cwd" % self.server.pid()
+ def gmaster_instantiate_tuple(self, slave):
+ """return a tuple of the 'one shot' and the 'main crawl' class instance"""
+ return (gmaster_builder('xsync')(self, slave), gmaster_builder()(self, slave))
+
def service_loop(self, *args):
"""enter service loop
@@ -873,7 +1003,41 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
- else do that's what's inherited
"""
if args:
- gmaster_builder()(self, args[0]).crawl_loop()
+ slave = args[0]
+ if gconf.local_path:
+ class brickserver(FILE.FILEServer):
+ local_path = gconf.local_path
+ aggregated = self.server
+ @classmethod
+ def entries(cls, path):
+ e = super(brickserver, cls).entries(path)
+ # on the brick don't mess with /.glusterfs
+ if path == '.':
+ try:
+ e.remove('.glusterfs')
+ except ValueError:
+ pass
+ return e
+ if gconf.slave_id:
+ # define {,set_}xtime in slave, thus preempting
+ # the call to remote, so that it takes data from
+ # the local brick
+ slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server)
+ slave.server.set_xtime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_xtime(path, uuid + '.' + gconf.slave_id, mark), slave.server)
+ (g1, g2) = self.gmaster_instantiate_tuple(slave)
+ g1.master.server = brickserver
+ g2.master.server = brickserver
+ else:
+ (g1, g2) = self.gmaster_instantiate_tuple(slave)
+ g1.master.server.aggregated = gmaster.master.server
+ g2.master.server.aggregated = gmaster.master.server
+ # bad bad bad: bad way to do things like this
+ # need to make this elegant
+ # register the crawlers and start crawling
+ g1.register()
+ g2.register()
+ g1.crawlwrap(oneshot=True)
+ g2.crawlwrap()
else:
sup(self, *args)
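
Editor's note: when gconf.slave_id is set, the branch above redirects xtime/set_xtime on that one slave.server proxy object only, using types.MethodType, so xtime bookkeeping is done against the local brick under a per-slave key. A standalone sketch of the same per-instance override pattern (class and values made up):

    # Per-instance method override, as used for slave.server.xtime above.
    import types

    class Proxy(object):
        def xtime(self, path, uuid):
            return ('remote', path, uuid)

    p = Proxy()
    p.xtime = types.MethodType(
        lambda _self, path, uuid: ('local-brick', path, uuid + '.slave-id'), p)
    print(p.xtime('.', 'master-uuid'))   # ('local-brick', '.', 'master-uuid.slave-id')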
@@ -893,13 +1057,18 @@ class SSH(AbstractUrl, SlaveRemote):
'^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ]))
self.inner_rsc = parse_url(inner_url)
- def canonical_path(self):
- m = re.match('([^@]+)@(.+)', self.remote_addr)
+ @staticmethod
+ def parse_ssh_address(addr):
+ m = re.match('([^@]+)@(.+)', addr)
if m:
u, h = m.groups()
else:
- u, h = syncdutils.getusername(), self.remote_addr
- remote_addr = '@'.join([u, gethostbyname(h)])
+ u, h = syncdutils.getusername(), addr
+ return {'user': u, 'host': h}
+
+ def canonical_path(self):
+ rap = self.parse_ssh_address(self.remote_addr)
+ remote_addr = '@'.join([rap['user'], gethostbyname(rap['host'])])
return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)])
def can_connect_to(self, remote):
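
Editor's note: for completeness, a standalone sketch of what the extracted parse_ssh_address() helper returns; the addresses are hypothetical and getpass.getuser() stands in for syncdutils.getusername().

    import re
    import getpass

    def parse_ssh_address(addr):
        m = re.match('([^@]+)@(.+)', addr)
        u, h = m.groups() if m else (getpass.getuser(), addr)
        return {'user': u, 'host': h}

    print(parse_ssh_address('geoacct@slave1'))   # {'user': 'geoacct', 'host': 'slave1'}
    print(parse_ssh_address('slave1'))           # user defaults to the local username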