diff options
| -rwxr-xr-x | extras/rebalance.py | 299 | ||||
| -rw-r--r-- | extras/volfilter.py | 167 | 
2 files changed, 466 insertions, 0 deletions
| diff --git a/extras/rebalance.py b/extras/rebalance.py new file mode 100755 index 000000000..80c614c5d --- /dev/null +++ b/extras/rebalance.py @@ -0,0 +1,299 @@ +#!/usr/bin/python + +import atexit +import copy +import optparse +import os +import pipes +import shutil +import string +import subprocess +import sys +import tempfile +import volfilter + +# It's just more convenient to have named fields. +class Brick: +        def __init__ (self, path, name): +                self.path = path +                self.sv_name = name +                self.size = 0 +                self.curr_size = 0 +                self.good_size = 0 +        def set_size (self, size): +                self.size = size +        def set_range (self, rs, re): +                self.r_start = rs +                self.r_end = re +                self.curr_size = self.r_end - self.r_start + 1 +        def __repr__ (self): +                value = self.path[:] +                value += "(%d," % self.size +                if self.curr_size: +                        value += "0x%x,0x%x)" % (self.r_start, self.r_end) +                else: +                        value += "-)" +                return value + +def get_bricks (host, vol): +        t = pipes.Template() +        t.prepend("gluster --remote-host=%s system getspec %s"%(host,vol),".-") +        return t.open(None,"r") + +def generate_stanza (vf, all_xlators, cur_subvol): +        sv_list = [] +        for sv in cur_subvol.subvols: +                generate_stanza(vf,all_xlators,sv) +                sv_list.append(sv.name) +        vf.write("volume %s\n"%cur_subvol.name) +        vf.write("  type %s\n"%cur_subvol.type) +        for kvpair in cur_subvol.opts.iteritems(): +                vf.write("  option %s %s\n"%kvpair) +        if sv_list: +                vf.write("  subvolumes %s\n"%string.join(sv_list)) +        vf.write("end-volume\n\n") + + +def mount_brick (localpath, all_xlators, dht_subvol): + +        # Generate a volfile. +        vf_name = localpath + ".vol" +        vf = open(vf_name,"w") +        generate_stanza(vf,all_xlators,dht_subvol) +        vf.flush() +        vf.close() + +        # Create a brick directory and mount the brick there. +        os.mkdir(localpath) +        subprocess.call(["glusterfs","-f",vf_name,localpath]) + +# We use the command-line tools because there's no getxattr support in the +# Python standard library (which is ridiculous IMO).  Adding the xattr package +# from PyPI would create a new and difficult dependency because the bits to +# satisfy it don't seem to exist in Fedora.  We already expect the command-line +# tools to be there, so it's safer just to rely on them. +# +# We might have to revisit this if we get as far as actually issuing millions +# of setxattr requests.  Even then, it might be better to do that part with a C +# program which has only a build-time dependency. +def get_range (brick): +        t = pipes.Template() +        cmd = "getfattr -e hex -n trusted.glusterfs.dht %s 2> /dev/null" +        t.prepend(cmd%brick,".-") +        t.append("grep ^trusted.glusterfs.dht=","--") +        f = t.open(None,"r") +        try: +                value = f.readline().rstrip().split('=')[1][2:] +        except: +                print "could not get layout for %s (might be OK)" % brick +                return None +        v_start = int("0x"+value[16:24],16) +        v_end = int("0x"+value[24:32],16) +        return (v_start, v_end) + +def calc_sizes (bricks, total): +        leftover = 1 << 32 +        for b in bricks: +               if b.size: +                        b.good_size = (b.size << 32) / total +                        leftover -= b.good_size +               else: +                        b.good_size = 0 +        if leftover: +                # Add the leftover to an old brick if we can. +                for b in bricks: +                        if b.good_size: +                                b.good_size += leftover +                                break +                else: +                        # Fine, just add it wherever. +                        bricks[0].good_size += leftover + +# Normalization means sorting the bricks by r_start and (b) ensuring that there +# are no gaps. +def normalize (in_bricks): +        out_bricks = [] +        curr_hash = 0 +        used = 0 +        while curr_hash < (1<<32): +                curr_best = None +                for b in in_bricks: +                        if b.r_start == curr_hash: +                                used += 1 +                                out_bricks.append(b) +                                in_bricks.remove(b) +                                curr_hash = b.r_end + 1 +                                break +                else: +                        print "gap found at 0x%08x" % curr_hash +                        sys.exit(1) +        return out_bricks + in_bricks, used + +def get_score (bricks): +        score = 0 +        curr_hash = 0 +        for b in bricks: +                if not b.curr_size: +                        curr_hash += b.good_size +                        continue +                new_start = curr_hash +                curr_hash += b.good_size +                new_end = curr_hash - 1 +                if new_start > b.r_start: +                        max_start = new_start +                else: +                        max_start = b.r_start +                if new_end < b.r_end: +                        min_end = new_end +                else: +                        min_end = b.r_end +                if max_start <= min_end: +                        score += (min_end - max_start + 1) +        return score + +if __name__ == "__main__": + +	my_usage = "%prog [options] server volume [directory]" +	parser = optparse.OptionParser(usage=my_usage) +        parser.add_option("-f", "--free-space", dest="free_space", +                          default=False, action="store_true", +                          help="use free space instead of total space") +        parser.add_option("-l", "--leave-mounted", dest="leave_mounted", +                          default=False, action="store_true", +                          help="leave subvolumes mounted") +        parser.add_option("-v", "--verbose", dest="verbose", +                          default=False, action="store_true", +                          help="verbose output") +	options, args = parser.parse_args() + +        if len(args) == 3: +                fix_dir = args[2] +        else: +                if len(args) != 2: +                        parser.print_help() +                        sys.exit(1) +                fix_dir = None +        hostname, volname = args[:2] + +        # Make sure stuff gets cleaned up, even if there are exceptions. +        orig_dir = os.getcwd() +        work_dir = tempfile.mkdtemp() +        bricks = [] +        def cleanup_workdir (): +                os.chdir(orig_dir) +                if options.verbose: +                        print "Cleaning up %s" % work_dir +                for b in bricks: +                        subprocess.call(["umount",b.path]) +                shutil.rmtree(work_dir) +        if not options.leave_mounted: +                atexit.register(cleanup_workdir) +        os.chdir(work_dir) + +        # Mount each brick individually, so we can issue brick-specific calls. +        if options.verbose: +                print "Mounting subvolumes..." +        index = 0 +        volfile_pipe = get_bricks(hostname,volname) +        all_xlators, last_xlator = volfilter.load(volfile_pipe) +        for dht_vol in all_xlators.itervalues(): +                if dht_vol.type == "cluster/distribute": +                        break +        else: +                print "no DHT volume found" +                sys.exit(1) +        for sv in dht_vol.subvols: +                #print "found subvol %s" % sv.name +                lpath = "%s/brick%s" % (work_dir, index) +                index += 1 +                mount_brick(lpath,all_xlators,sv) +                bricks.append(Brick(lpath,sv.name)) +        if index == 0: +                print "no bricks" +                sys.exit(1) + +        # Collect all of the sizes. +        if options.verbose: +                print "Collecting information..." +        total = 0 +        for b in bricks: +                info = os.statvfs(b.path) +                # We want a standard unit even if different bricks use +                # different block sizes.  The size is chosen to avoid overflows +                # for very large bricks with very small block sizes, but also +                # accommodate filesystems which use very large block sizes to +                # cheat on benchmarks. +                blocksper100mb = 104857600 / info[0] +                if options.free_space: +                        size = info[3] / blocksper100mb +                else: +                        size = info[2] / blocksper100mb +                if size <= 0: +                        print "brick %s has invalid size %d" % (b.path, size) +                        sys.exit(1) +                b.set_size(size) +                total += size + +        # Collect all of the layout information. +        for b in bricks: +                hash_range = get_range(b.path) +                if hash_range is not None: +                        rs, re = hash_range +                        if rs > re: +                                print "%s has backwards hash range" % b.path +                                sys.exit(1) +                        b.set_range(hash_range[0],hash_range[1]) + +        if options.verbose: +                print "Calculating new layouts..." +        calc_sizes(bricks,total) +        bricks, used = normalize(bricks) + +        # We can't afford O(n!) here, but O(n^2) should be OK and the result +        # should be almost as good. +        while used < len(bricks): +                best_place = used +                best_score = get_score(bricks) +                for i in xrange(used): +                        new_bricks = bricks[:] +                        del new_bricks[used] +                        new_bricks.insert(i,bricks[used]) +                        new_score = get_score(new_bricks) +                        if new_score > best_score: +                                best_place = i +                                best_score = new_score +                if best_place != used: +                        nb = bricks[used] +                        del bricks[used] +                        bricks.insert(best_place,nb) +                used += 1 + +        # Finalize whatever we decided on. +        curr_hash = 0 +        for b in bricks: +                b.r_start = curr_hash +                curr_hash += b.good_size +                b.r_end = curr_hash - 1 + +        print "Here are the xattr values for your size-weighted layout:" +        for b in bricks: +                print "  %s: 0x0000000200000000%08x%08x" % ( +                        b.sv_name, b.r_start, b.r_end) + +        if fix_dir: +                if options.verbose: +                        print "Fixing layout for %s" % fix_dir +                for b in bricks: +                        value = "0x0000000200000000%08x%08x" % ( +                                b.r_start, b.r_end) +                        path = "%s/%s" % (b.path, fix_dir) +                        cmd = "setfattr -n trusted.glusterfs.dht -v %s %s" % ( +                                value, path) +                        print cmd + +        if options.leave_mounted: +                print "The following subvolumes are still mounted:" +                for b in bricks: +                        print "%s on %s" % (b.sv_name, b.path) +                print "Don't forget to clean up when you're done." + diff --git a/extras/volfilter.py b/extras/volfilter.py new file mode 100644 index 000000000..0ca456a78 --- /dev/null +++ b/extras/volfilter.py @@ -0,0 +1,167 @@ +# Copyright (c) 2010-2011 Red Hat, Inc. +# +# This file is part of HekaFS. +# +# HekaFS is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License, version 3, as published by the Free +# Software Foundation. +# +# HekaFS is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE.  See the GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License * along +# with HekaFS.  If not, see <http://www.gnu.org/licenses/>. + +import copy +import string +import sys +import types + +good_xlators = [ +	"cluster/afr", +	"cluster/dht", +	"cluster/distribute", +	"cluster/replicate", +	"cluster/stripe", +	"debug/io-stats", +	"features/access-control", +	"features/locks", +	"features/marker", +	"features/uidmap", +	"performance/io-threads", +	"protocol/client", +	"protocol/server", +	"storage/posix", +] + +def copy_stack (old_xl,suffix,recursive=False): +	if recursive: +		new_name = old_xl.name + "-" + suffix +	else: +		new_name = suffix +	new_xl = Translator(new_name) +	new_xl.type = old_xl.type +	# The results with normal assignment here are . . . amusing. +	new_xl.opts = copy.deepcopy(old_xl.opts) +	for sv in old_xl.subvols: +		new_xl.subvols.append(copy_stack(sv,suffix,True)) +	# Patch up the path at the bottom. +	if new_xl.type == "storage/posix": +		new_xl.opts["directory"] += ("/" + suffix) +	return new_xl + +def cleanup (parent, graph): +	if parent.type in good_xlators: +		# Temporary fix so that HekaFS volumes can use the +		# SSL-enabled multi-threaded socket transport. +		if parent.type == "protocol/server": +			parent.type = "protocol/server2" +			parent.opts["transport-type"] = "ssl" +		elif parent.type == "protocol/client": +			parent.type = "protocol/client2" +			parent.opts["transport-type"] = "ssl" +		sv = [] +		for child in parent.subvols: +			sv.append(cleanup(child,graph)) +		parent.subvols = sv +	else: +		parent = cleanup(parent.subvols[0],graph) +	return parent + +class Translator: +	def __init__ (self, name): +		self.name = name +		self.type = "" +		self.opts = {} +		self.subvols = [] +		self.dumped = False +	def __repr__ (self): +		return "<Translator %s>" % self.name + +def load (path): +	# If it's a string, open it; otherwise, assume it's already a +	# file-like object (most notably from urllib*). +	if type(path) in types.StringTypes: +		fp = file(path,"r") +	else: +		fp = path +	all_xlators = {} +	xlator = None +	last_xlator = None +	while True: +		text = fp.readline() +		if text == "": +			break +		text = text.split() +		if not len(text): +			continue +		if text[0] == "volume": +			if xlator: +				raise RuntimeError, "nested volume definition" +			xlator = Translator(text[1]) +			continue +		if not xlator: +			raise RuntimeError, "text outside volume definition" +		if text[0] == "type": +			xlator.type = text[1] +			continue +		if text[0] == "option": +			xlator.opts[text[1]] = string.join(text[2:]) +			continue +		if text[0] == "subvolumes": +			for sv in text[1:]: +				xlator.subvols.append(all_xlators[sv]) +			continue +		if text[0] == "end-volume": +			all_xlators[xlator.name] = xlator +			last_xlator = xlator +			xlator = None +			continue +		raise RuntimeError, "unrecognized keyword %s" % text[0] +	if xlator: +		raise RuntimeError, "unclosed volume definition" +	return all_xlators, last_xlator + +def generate (graph, last, stream=sys.stdout): +	for sv in last.subvols: +		if not sv.dumped: +			generate(graph,sv,stream) +			print >> stream, "" +			sv.dumped = True +	print >> stream, "volume %s" % last.name +	print >> stream, "    type %s" % last.type +	for k, v in last.opts.iteritems(): +		print >> stream, "    option %s %s" % (k, v) +	if last.subvols: +		print >> stream, "    subvolumes %s" % string.join( +			[ sv.name for sv in last.subvols ]) +	print >> stream, "end-volume" + +def push_filter (graph, old_xl, filt_type, opts={}): +	suffix = "-" + old_xl.type.split("/")[1] +	if len(old_xl.name) > len(suffix): +		if old_xl.name[-len(suffix):] == suffix: +			old_xl.name = old_xl.name[:-len(suffix)] +	new_xl = Translator(old_xl.name+suffix) +	new_xl.type = old_xl.type +	new_xl.opts = old_xl.opts +	new_xl.subvols = old_xl.subvols +	graph[new_xl.name] = new_xl +	old_xl.name += ("-" + filt_type.split("/")[1]) +	old_xl.type = filt_type +	old_xl.opts = opts +	old_xl.subvols = [new_xl] +	graph[old_xl.name] = old_xl + +def delete (graph, victim): +	if len(victim.subvols) != 1: +		raise RuntimeError, "attempt to delete non-unary translator" +	for xl in graph.itervalues(): +		while xl.subvols.count(victim): +			i = xl.subvols.index(victim) +			xl.subvols[i] = victim.subvols[0] + +if __name__ == "__main__": +	graph, last = load(sys.argv[1]) +	generate(graph,last) | 
