Diffstat (limited to 'tools')
-rw-r--r--  tools/Makefile.am                         |   2
-rw-r--r--  tools/glusterfind/Makefile.am             |   7
-rw-r--r--  tools/glusterfind/glusterfind.in          |  17
-rw-r--r--  tools/glusterfind/src/Makefile.am         |  14
-rw-r--r--  tools/glusterfind/src/__init__.py         |   9
-rw-r--r--  tools/glusterfind/src/brickfind.py        |  97
-rw-r--r--  tools/glusterfind/src/changelog.py        | 309
-rw-r--r--  tools/glusterfind/src/conf.py             |  28
-rw-r--r--  tools/glusterfind/src/libgfchangelog.py   |  83
-rw-r--r--  tools/glusterfind/src/main.py             | 468
-rw-r--r--  tools/glusterfind/src/nodecleanup.py      |  51
-rw-r--r--  tools/glusterfind/src/tool.conf.in        |  11
-rw-r--r--  tools/glusterfind/src/utils.py            | 203
13 files changed, 1298 insertions(+), 1 deletion(-)
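Based on the argparse subcommands added in tools/glusterfind/src/main.py below (create, pre, post, list, delete), the intended workflow appears to be: create a session, run pre to collect the changed paths into an output file, consume that file, then run post to advance the session timestamp. A rough sketch of the invocations — the session name, volume name, and output path are placeholders, not part of this change:

    glusterfind create backup_session myvol
    glusterfind pre backup_session myvol /root/myvol_changes.txt
    glusterfind post backup_session myvol
    glusterfind list --session backup_session --volume myvol
    glusterfind delete backup_session myvol

Note that pre falls back to a full brickfind crawl when the recorded start time is zero, --full is passed, or changelog history is only partially available (node_run exits with code 2 and run_in_nodes retries that brick with the brickfind change detector).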
diff --git a/tools/Makefile.am b/tools/Makefile.am index 74229ab41e7..d689f60fa52 100644 --- a/tools/Makefile.am +++ b/tools/Makefile.am @@ -1,3 +1,3 @@ -SUBDIRS = gfind_missing_files +SUBDIRS = gfind_missing_files glusterfind CLEANFILES = diff --git a/tools/glusterfind/Makefile.am b/tools/glusterfind/Makefile.am new file mode 100644 index 00000000000..c99a3ddcb37 --- /dev/null +++ b/tools/glusterfind/Makefile.am @@ -0,0 +1,7 @@ +SUBDIRS = src + +EXTRA_DIST = + +bin_SCRIPTS = glusterfind + +CLEANFILES = $(bin_SCRIPTS) diff --git a/tools/glusterfind/glusterfind.in b/tools/glusterfind/glusterfind.in new file mode 100644 index 00000000000..cff8973980a --- /dev/null +++ b/tools/glusterfind/glusterfind.in @@ -0,0 +1,17 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import sys +sys.path.insert(1, '@GLUSTERFS_LIBEXECDIR@/') + +from glusterfind.main import main + +if __name__ == "__main__": + main() diff --git a/tools/glusterfind/src/Makefile.am b/tools/glusterfind/src/Makefile.am new file mode 100644 index 00000000000..458b820fd19 --- /dev/null +++ b/tools/glusterfind/src/Makefile.am @@ -0,0 +1,14 @@ +glusterfinddir = $(libexecdir)/glusterfs/glusterfind + +glusterfind_PYTHON = conf.py utils.py __init__.py \ + main.py libgfchangelog.py + +glusterfind_SCRIPTS = changelog.py nodecleanup.py \ + brickfind.py + +glusterfind_DATA = tool.conf + +EXTRA_DIST = changelog.py nodecleanup.py brickfind.py \ + tool.conf + +CLEANFILES = diff --git a/tools/glusterfind/src/__init__.py b/tools/glusterfind/src/__init__.py new file mode 100644 index 00000000000..eb941c6d67c --- /dev/null +++ b/tools/glusterfind/src/__init__.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. diff --git a/tools/glusterfind/src/brickfind.py b/tools/glusterfind/src/brickfind.py new file mode 100644 index 00000000000..4aee225d22e --- /dev/null +++ b/tools/glusterfind/src/brickfind.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+ +import os +import sys +import logging +from argparse import ArgumentParser, RawDescriptionHelpFormatter +from errno import ENOENT + +from utils import mkdirp, setup_logger, create_file, output_write, find +import conf + + +PROG_DESCRIPTION = """ +Changelog Crawler +""" + +logger = logging.getLogger() + + +def brickfind_crawl(brick, args): + if brick.endswith("/"): + brick = brick[0:len(brick)-1] + + working_dir = os.path.dirname(args.outfile) + mkdirp(working_dir, exit_on_err=True, logger=logger) + create_file(args.outfile, exit_on_err=True, logger=logger) + + with open(args.outfile, "a+") as fout: + brick_path_len = len(brick) + + def mtime_filter(path): + try: + st = os.lstat(path) + except (OSError, IOError) as e: + if e.errno == ENOENT: + st = None + else: + raise + + if st and (st.st_mtime > args.start or st.st_ctime > args.start): + return True + + return False + + def output_callback(path): + path = path.strip() + path = path[brick_path_len+1:] + output_write(fout, path, args.output_prefix) + + if args.full: + find(brick, callback_func=output_callback, + ignore_dirs=[".glusterfs"]) + else: + find(brick, callback_func=output_callback, + filter_func=mtime_filter, + ignore_dirs=[".glusterfs"]) + + fout.flush() + os.fsync(fout.fileno()) + + +def _get_args(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description=PROG_DESCRIPTION) + + parser.add_argument("session", help="Session Name") + parser.add_argument("volume", help="Volume Name") + parser.add_argument("brick", help="Brick Name") + parser.add_argument("outfile", help="Output File") + parser.add_argument("start", help="Start Time", type=float) + parser.add_argument("--debug", help="Debug", action="store_true") + parser.add_argument("--full", help="Full Find", action="store_true") + parser.add_argument("--output-prefix", help="File prefix in output", + default=".") + + return parser.parse_args() + + +if __name__ == "__main__": + args = _get_args() + mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), + exit_on_err=True) + log_file = os.path.join(conf.get_opt("log_dir"), + args.session, + args.volume, + "brickfind.log") + setup_logger(logger, log_file, args.debug) + brickfind_crawl(args.brick, args) + sys.exit(0) diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py new file mode 100644 index 00000000000..b7697ea5030 --- /dev/null +++ b/tools/glusterfind/src/changelog.py @@ -0,0 +1,309 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+ +import os +import sys +import time +import xattr +from errno import ENOENT +import logging +from argparse import ArgumentParser, RawDescriptionHelpFormatter +import hashlib + +import libgfchangelog +from utils import create_file, mkdirp, execute, symlink_gfid_to_path +from utils import fail, setup_logger, output_write, find +import conf + + +CHANGELOG_LOG_LEVEL = 9 +CHANGELOG_CONN_RETRIES = 5 +CHANGELOGAPI_NUM_WORKERS = 3 +PROG_DESCRIPTION = """ +Changelog Crawler +""" +history_turns = 0 +history_turn_time = 0 + +logger = logging.getLogger() + + +def gfid_to_path_using_batchfind(brick, gfids_file, output_file): + """ + find -samefile gets the inode number and crawls entire namespace + to get the list of files/dirs having same inode number. + Do find without any option, except the ignore directory option, + print the output in <INODE_NUM> <PATH> format, use this output + to look into in-memory dictionary of inode numbers got from the + list of GFIDs + """ + with open(output_file, "a+") as fout: + inode_dict = {} + with open(gfids_file) as f: + for gfid in f: + gfid = gfid.strip() + backend_path = os.path.join(brick, ".glusterfs", + gfid[0:2], gfid[2:4], gfid) + + try: + inode_dict[str(os.stat(backend_path).st_ino)] = 1 + except (IOError, OSError) as e: + if e.errno == ENOENT: + continue + else: + fail("%s Failed to convert to path from " + "GFID %s: %s" % (brick, gfid, e), logger=logger) + + if not inode_dict: + return + + def inode_filter(path): + try: + st = os.lstat(path) + except (OSError, IOError) as e: + if e.errno == ENOENT: + st = None + else: + raise + + if st and inode_dict.get(str(st.st_ino), None): + return True + + return False + + brick_path_len = len(brick) + + def output_callback(path): + path = path.strip() + path = path[brick_path_len+1:] + output_write(fout, path, args.output_prefix) + + # Length of brick path, to remove from output path + find(brick, callback_func=output_callback, + filter_func=inode_filter, + ignore_dirs=[".glusterfs"]) + + fout.flush() + os.fsync(fout.fileno()) + + +def gfid_to_path_using_pgfid(brick, gfids_file, output_file, outfile_failures): + """ + Parent GFID is saved as xattr, collect Parent GFIDs from all + the files from gfids_file. Convert parent GFID to path and Crawl + each directories to get the list of files/dirs having same inode number. 
+ Do find with maxdepth as 1 and print the output in <INODE_NUM> <PATH> + format, use this output to look into in memory dictionary of inode + numbers got from the list of GFIDs + """ + with open(output_file, "a+") as fout: + pgfids = set() + inode_dict = {} + with open(gfids_file) as f: + for gfid in f: + gfid = gfid.strip() + p = os.path.join(brick, + ".glusterfs", + gfid[0:2], + gfid[2:4], + gfid) + if os.path.islink(p): + path = symlink_gfid_to_path(brick, gfid) + output_write(fout, path, args.output_prefix) + else: + try: + inode_dict[str(os.stat(p).st_ino)] = 1 + file_xattrs = xattr.list(p) + num_parent_gfid = 0 + for x in file_xattrs: + if x.startswith("trusted.pgfid."): + num_parent_gfid += 1 + pgfids.add(x.split(".")[-1]) + + if num_parent_gfid == 0: + with open(outfile_failures, "a") as f: + f.write("%s\n" % gfid) + f.flush() + os.fsync(f.fileno()) + + except (IOError, OSError) as e: + if e.errno == ENOENT: + continue + else: + fail("%s Failed to convert to path from " + "GFID %s: %s" % (brick, gfid, e), + logger=logger) + + if not inode_dict: + return + + def inode_filter(path): + try: + st = os.lstat(path) + except (OSError, IOError) as e: + if e.errno == ENOENT: + st = None + else: + raise + + if st and inode_dict.get(str(st.st_ino), None): + return True + + return False + + # Length of brick path, to remove from output path + brick_path_len = len(brick) + + def output_callback(path): + path = path.strip() + path = path[brick_path_len+1:] + output_write(fout, path, args.output_prefix) + + for pgfid in pgfids: + path = symlink_gfid_to_path(brick, pgfid) + find(os.path.join(brick, path), + callback_func=output_callback, + filter_func=inode_filter, + ignore_dirs=[".glusterfs"], + subdirs_crawl=False) + + fout.flush() + os.fsync(fout.fileno()) + + +def sort_unique(filename): + execute(["sort", "-u", "-o", filename, filename], + exit_msg="Sort failed", logger=logger) + + +def get_changes(brick, hash_dir, log_file, end, args): + """ + Makes use of libgfchangelog's history API to get changelogs + containing changes from start and end time. Further collects + the modified gfids from the changelogs and writes the list + of gfid to 'gfid_list' file. + """ + try: + libgfchangelog.cl_register(brick, hash_dir, log_file, + CHANGELOG_LOG_LEVEL, CHANGELOG_CONN_RETRIES) + except libgfchangelog.ChangelogException as e: + fail("%s Changelog register failed: %s" % (brick, e), logger=logger) + + # Output files to record GFIDs and GFID to Path failure GFIDs + gfid_list_path = args.outfile + ".gfids" + gfid_list_failures_file = gfid_list_path + ".failures" + create_file(gfid_list_path, exit_on_err=True, logger=logger) + create_file(gfid_list_failures_file, exit_on_err=True, logger=logger) + + # Changelogs path(Hard coded to BRICK/.glusterfs/changelogs + cl_path = os.path.join(brick, ".glusterfs/changelogs") + + # Fail if History fails for requested Start and End + try: + actual_end = libgfchangelog.cl_history_changelog( + cl_path, args.start, end, CHANGELOGAPI_NUM_WORKERS) + except libgfchangelog.ChangelogException as e: + fail("%s Historical Changelogs not available: %s" % (brick, e), + logger=logger) + + try: + # scan followed by getchanges till scan returns zero. + # history_scan() is blocking call, till it gets the number + # of changelogs to process. Returns zero when no changelogs + # to be processed. 
returns positive value as number of changelogs + # to be processed, which will be fetched using + # history_getchanges() + changes = [] + while libgfchangelog.cl_history_scan() > 0: + changes += libgfchangelog.cl_history_getchanges() + + if changes: + with open(gfid_list_path, 'a+') as fgfid: + for change in changes: + with open(change) as f: + for line in f: + # Space delimited list, collect GFID + details = line.split() + fgfid.write("%s\n" % details[1]) + + libgfchangelog.cl_history_done(change) + fgfid.flush() + os.fsync(fgfid.fileno()) + except libgfchangelog.ChangelogException as e: + fail("%s Error during Changelog Crawl: %s" % (brick, e), + logger=logger) + + # If TS returned from history_changelog is < end time + # then FS crawl may be required, since history is only available + # till TS returned from history_changelog + if actual_end < end: + fail("Partial History available with Changelog", 2, logger=logger) + + sort_unique(gfid_list_path) + gfid_to_path_using_pgfid(brick, gfid_list_path, + args.outfile, gfid_list_failures_file) + gfid_to_path_using_batchfind(brick, gfid_list_failures_file, args.outfile) + + +def changelog_crawl(brick, end, args): + """ + Init function, prepares working dir and calls Changelog query + """ + if brick.endswith("/"): + brick = brick[0:len(brick)-1] + + # WORKING_DIR/BRICKHASH/OUTFILE + working_dir = os.path.dirname(args.outfile) + brickhash = hashlib.sha1(brick) + brickhash = str(brickhash.hexdigest()) + working_dir = os.path.join(working_dir, brickhash) + + mkdirp(working_dir, exit_on_err=True, logger=logger) + create_file(args.outfile, exit_on_err=True, logger=logger) + create_file(args.outfile + ".gfids", exit_on_err=True, logger=logger) + + log_file = os.path.join(conf.get_opt("log_dir"), + args.session, + args.volume, + "changelog.%s.log" % brickhash) + + logger.info("%s Started Changelog Crawl. Start: %s, End: %s" + % (brick, args.start, end)) + get_changes(brick, working_dir, log_file, end, args) + + +def _get_args(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description=PROG_DESCRIPTION) + + parser.add_argument("session", help="Session Name") + parser.add_argument("volume", help="Volume Name") + parser.add_argument("brick", help="Brick Name") + parser.add_argument("outfile", help="Output File") + parser.add_argument("start", help="Start Time", type=int) + parser.add_argument("--debug", help="Debug", action="store_true") + parser.add_argument("--output-prefix", help="File prefix in output", + default=".") + + return parser.parse_args() + + +if __name__ == "__main__": + args = _get_args() + mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), + exit_on_err=True) + log_file = os.path.join(conf.get_opt("log_dir"), + args.session, + args.volume, + "changelog.log") + setup_logger(logger, log_file, args.debug) + end = int(time.time()) - int(conf.get_opt("changelog_rollover_time")) + changelog_crawl(args.brick, end, args) + sys.exit(0) diff --git a/tools/glusterfind/src/conf.py b/tools/glusterfind/src/conf.py new file mode 100644 index 00000000000..2c6eac2bb14 --- /dev/null +++ b/tools/glusterfind/src/conf.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. 
+# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import os +import ConfigParser + +config = ConfigParser.ConfigParser() +config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)), + "tool.conf")) + + +def list_change_detectors(): + return dict(config.items("change_detectors")).keys() + + +def get_opt(opt): + return config.get("vars", opt) + + +def get_change_detector(opt): + return config.get("change_detectors", opt) diff --git a/tools/glusterfind/src/libgfchangelog.py b/tools/glusterfind/src/libgfchangelog.py new file mode 100644 index 00000000000..e54a16a4742 --- /dev/null +++ b/tools/glusterfind/src/libgfchangelog.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import os +from ctypes import CDLL, get_errno, create_string_buffer, c_ulong, byref +from ctypes import RTLD_GLOBAL +from ctypes.util import find_library + + +class ChangelogException(OSError): + pass + + +libgfc = CDLL(find_library("gfchangelog"), use_errno=True, mode=RTLD_GLOBAL) + + +def raise_oserr(): + errn = get_errno() + raise ChangelogException(errn, os.strerror(errn)) + + +def cl_register(brick, path, log_file, log_level, retries=0): + ret = libgfc.gf_changelog_register(brick, path, log_file, + log_level, retries) + if ret == -1: + raise_oserr() + + +def cl_history_scan(): + ret = libgfc.gf_history_changelog_scan() + if ret == -1: + raise_oserr() + + return ret + + +def cl_history_changelog(changelog_path, start, end, num_parallel): + actual_end = c_ulong() + ret = libgfc.gf_history_changelog(changelog_path, start, end, + num_parallel, + byref(actual_end)) + if ret == -1: + raise_oserr() + + return actual_end.value + + +def cl_history_startfresh(): + ret = libgfc.gf_history_changelog_start_fresh() + if ret == -1: + raise_oserr() + + +def cl_history_getchanges(): + """ remove hardcoding for path name length """ + def clsort(f): + return f.split('.')[-1] + + changes = [] + buf = create_string_buffer('\0', 4096) + + while True: + ret = libgfc.gf_history_changelog_next_change(buf, 4096) + if ret in (0, -1): + break + changes.append(buf.raw[:ret - 1]) + if ret == -1: + raise_oserr() + + return sorted(changes, key=clsort) + + +def cl_history_done(clfile): + ret = libgfc.gf_history_changelog_done(clfile) + if ret == -1: + raise_oserr() diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py new file mode 100644 index 00000000000..d6b9a24dec9 --- /dev/null +++ b/tools/glusterfind/src/main.py @@ -0,0 +1,468 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+ +import sys +from errno import ENOENT +import time +from multiprocessing import Process +import os +import xml.etree.cElementTree as etree +from argparse import ArgumentParser, RawDescriptionHelpFormatter +import logging +import shutil + +from utils import execute, is_host_local, mkdirp, fail +from utils import setup_logger, human_time +import conf + + +PROG_DESCRIPTION = """ +GlusterFS Incremental API +""" +ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError + +logger = logging.getLogger() + + +def node_run(volume, host, path, start, outfile, args, fallback=False): + """ + If host is local node, execute the command locally. If not local + execute the CHANGE_DETECTOR command via ssh and copy the output file from + remote node using scp. + """ + localdir = is_host_local(host) + + # If Full backup is requested or start time is zero, use brickfind + change_detector = conf.get_change_detector(args.change_detector) + if ((start == 0 or args.full) and args.change_detector == "changelog") or \ + fallback: + change_detector = conf.get_change_detector("brickfind") + + # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug + # --gfidpath <TYPE> + cmd = [change_detector, + args.session, + volume, + path, + outfile, + str(start), + "--output-prefix", + args.output_prefix] + \ + (["--debug"] if args.debug else []) + \ + (["--full"] if args.full else []) + + if not localdir: + # prefix with ssh command if not local node + cmd = ["ssh", + "-i", conf.get_opt("secret_pem"), + "root@%s" % host] + cmd + + rc, out, err = execute(cmd, logger=logger) + if rc == 2: + # Partial History Fallback + logger.info("%s %s Fallback to brickfind" % (host, err.strip())) + # Exit only from process, handled in main. + sys.exit(rc) + elif rc != 0: + fail("%s - Change detection failed" % host, logger=logger) + + if not localdir: + cmd_copy = ["scp", + "-i", conf.get_opt("secret_pem"), + "root@%s:/%s" % (host, outfile), + os.path.dirname(outfile)] + execute(cmd_copy, exit_msg="%s - Copy command failed" % host, + logger=logger) + + +def node_cleanup(host, args): + localdir = is_host_local(host) + + # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug + # --gfidpath <TYPE> + cmd = [conf.get_opt("nodecleanup"), + args.session, + args.volume] + (["--debug"] if args.debug else []) + + if not localdir: + # prefix with ssh command if not local node + cmd = ["ssh", + "-i", conf.get_opt("secret_pem"), + "root@%s" % host] + cmd + + execute(cmd, exit_msg="%s - Cleanup failed" % host, logger=logger) + + +def cleanup(nodes, args): + pool = [] + for num, node in enumerate(nodes): + host, brick = node[1].split(":") + # temp output file + node_outfile = os.path.join(conf.get_opt("working_dir"), + args.session, + args.volume, + "tmp_output_%s.txt" % num) + + try: + os.remove(node_outfile) + except (OSError, IOError): + # TODO: Cleanup Failure, Handle + pass + + p = Process(target=node_cleanup, + args=(host, args)) + p.start() + pool.append(p) + + exit_codes = 0 + for p in pool: + p.join() + exit_codes += (0 if p.exitcode == 0 else 1) + + if exit_codes != 0: + sys.exit(1) + + +def failback_node_run(brick_details, idx, volume, start, outfile, args): + host, brick = brick_details.split(":") + p = Process(target=node_run, + args=(volume, host, brick, start, outfile, args, True)) + p.start() + p.join() + return p.exitcode + + +def run_in_nodes(volume, start, args): + """ + Get nodes of volume using gluster volume info, spawn a process + each for a Node. 
Merge the output files once all the process + complete their tasks. + """ + nodes = get_nodes(volume) + pool = [] + node_outfiles = [] + for num, node in enumerate(nodes): + host, brick = node[1].split(":") + # temp output file + node_outfile = os.path.join(conf.get_opt("working_dir"), + args.session, + volume, + "tmp_output_%s.txt" % num) + node_outfiles.append(node_outfile) + p = Process(target=node_run, args=(volume, host, brick, start, + node_outfile, args)) + p.start() + pool.append(p) + + exit_codes = 0 + for idx, p in enumerate(pool): + p.join() + # Handle the Changelog failure, fallback to Brickfind + if p.exitcode == 2: + rc = failback_node_run(nodes[idx][1], idx, volume, start, + node_outfiles[idx], args) + exit_codes += (0 if rc == 0 else 1) + elif p.exitcode != 0: + exit_codes += (0 if p.exitcode == 0 else 1) + + if exit_codes != 0: + sys.exit(1) + + # Merge all output files + cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] + execute(cmd, + exit_msg="Failed to merge output files " + "collected from nodes", logger=logger) + + cleanup(nodes, args) + + +def get_nodes(volume): + """ + Get the gluster volume info xml output and parse to get + the brick details. + """ + cmd = ["gluster", 'volume', 'info', volume, "--xml"] + _, data, _ = execute(cmd, + exit_msg="Failed to Run Gluster Volume Info", + logger=logger) + tree = etree.fromstring(data) + + nodes = [] + volume_el = tree.find('volInfo/volumes/volume') + try: + for b in volume_el.findall('bricks/brick'): + nodes.append((b.find('hostUuid').text, + b.find('name').text)) + except (ParseError, AttributeError, ValueError) as e: + fail("Failed to parse Volume Info: %s" % e, logger=logger) + + return nodes + + +def _get_args(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description=PROG_DESCRIPTION) + subparsers = parser.add_subparsers(dest="mode") + + # create <SESSION> <VOLUME> [--debug] [--force] + parser_create = subparsers.add_parser('create') + parser_create.add_argument("session", help="Session Name") + parser_create.add_argument("volume", help="Volume Name") + parser_create.add_argument("--debug", help="Debug", action="store_true") + parser_create.add_argument("--force", help="Force option to recreate " + "the session", action="store_true") + + # delete <SESSION> <VOLUME> [--debug] + parser_delete = subparsers.add_parser('delete') + parser_delete.add_argument("session", help="Session Name") + parser_delete.add_argument("volume", help="Volume Name") + parser_delete.add_argument("--debug", help="Debug", action="store_true") + + # list [--session <SESSION>] [--volume <VOLUME>] + parser_list = subparsers.add_parser('list') + parser_list.add_argument("--session", help="Session Name", default="") + parser_list.add_argument("--volume", help="Volume Name", default="") + parser_list.add_argument("--debug", help="Debug", action="store_true") + + # pre <SESSION> <VOLUME> <OUTFILE> [--change-detector <CHANGE_DETECTOR>] + # [--output-prefix <OUTPUT_PREFIX>] [--full] + parser_pre = subparsers.add_parser('pre') + parser_pre.add_argument("session", help="Session Name") + parser_pre.add_argument("volume", help="Volume Name") + parser_pre.add_argument("outfile", help="Output File") + parser_pre.add_argument("--debug", help="Debug", action="store_true") + parser_pre.add_argument("--full", help="Full find", action="store_true") + parser_pre.add_argument("--change-detector", dest="change_detector", + help="Change detection", + choices=conf.list_change_detectors(), + type=str, default='changelog') + 
parser_pre.add_argument("--output-prefix", help="File prefix in output", + default=".") + + # post <SESSION> <VOLUME> + parser_post = subparsers.add_parser('post') + parser_post.add_argument("session", help="Session Name") + parser_post.add_argument("volume", help="Volume Name") + parser_post.add_argument("--debug", help="Debug", action="store_true") + + return parser.parse_args() + + +def ssh_setup(): + if not os.path.exists(conf.get_opt("secret_pem")): + # Generate ssh-key + cmd = ["ssh-keygen", + "-N", + "", + "-f", + conf.get_opt("secret_pem")] + execute(cmd, + exit_msg="Unable to generate ssh key %s" + % conf.get_opt("secret_pem"), + logger=logger) + + logger.info("Ssh key generated %s" % conf.get_opt("secret_pem")) + + # Copy pub file to all nodes + cmd = ["gluster", + "system::", + "copy", + "file", + "/" + os.path.basename(conf.get_opt("secret_pem")) + ".pub"] + execute(cmd, exit_msg="Failed to distribute ssh keys", logger=logger) + + logger.info("Distributed ssh key to all nodes of Volume") + + # Add to authorized_keys file in each node + cmd = ["gluster", + "system::", + "execute", + "add_secret_pub", + "root", + os.path.basename(conf.get_opt("secret_pem")) + ".pub"] + execute(cmd, + exit_msg="Failed to add ssh keys to authorized_keys file", + logger=logger) + + logger.info("Ssh key added to authorized_keys of Volume nodes") + + +def mode_create(session_dir, args): + logger.debug("Init is called - Session: %s, Volume: %s" + % (args.session, args.volume)) + + mkdirp(session_dir, exit_on_err=True, logger=logger) + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) + status_file = os.path.join(session_dir, args.volume, "status") + + if os.path.exists(status_file) and not args.force: + fail("Session %s already created" % args.session, logger=logger) + + if not os.path.exists(status_file) or args.force: + ssh_setup() + + execute(["gluster", "volume", "set", + args.volume, "build-pgfid", "on"], + exit_msg="Failed to set volume option build-pgfid on", + logger=logger) + logger.info("Volume option set %s, build-pgfid on" % args.volume) + + execute(["gluster", "volume", "set", + args.volume, "changelog.changelog", "on"], + exit_msg="Failed to set volume option " + "changelog.changelog on", logger=logger) + logger.info("Volume option set %s, changelog.changelog on" + % args.volume) + + if not os.path.exists(status_file): + with open(status_file, "w", buffering=0) as f: + # Add Rollover time to current time to make sure changelogs + # will be available if we use this time as start time + time_to_update = int(time.time()) + int( + conf.get_opt("changelog_rollover_time")) + f.write(str(time_to_update)) + + sys.exit(0) + + +def mode_pre(session_dir, args): + """ + Read from Session file and write to session.pre file + """ + endtime_to_update = int(time.time()) - int( + conf.get_opt("changelog_rollover_time")) + status_file = os.path.join(session_dir, args.volume, "status") + status_file_pre = status_file + ".pre" + + mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger) + + start = 0 + try: + with open(status_file) as f: + start = int(f.read().strip()) + except ValueError: + pass + except (OSError, IOError) as e: + fail("Error Opening Session file %s: %s" + % (status_file, e), logger=logger) + + logger.debug("Pre is called - Session: %s, Volume: %s, " + "Start time: %s, End time: %s" + % (args.session, args.volume, start, endtime_to_update)) + + run_in_nodes(args.volume, start, args) + + with open(status_file_pre, "w", buffering=0) as f: + 
f.write(str(endtime_to_update)) + + sys.stdout.write("Generated output file %s\n" % args.outfile) + + +def mode_post(session_dir, args): + """ + If pre session file exists, overwrite session file + If pre session file does not exists, return ERROR + """ + status_file = os.path.join(session_dir, args.volume, "status") + logger.debug("Post is called - Session: %s, Volume: %s" + % (args.session, args.volume)) + status_file_pre = status_file + ".pre" + + if os.path.exists(status_file_pre): + os.rename(status_file_pre, status_file) + sys.exit(0) + else: + fail("Pre script is not run", logger=logger) + + +def mode_delete(session_dir, args): + def handle_rm_error(func, path, exc_info): + if exc_info[1].errno == ENOENT: + return + + raise exc_info[1] + + shutil.rmtree(os.path.join(session_dir, args.volume), + onerror=handle_rm_error) + + +def mode_list(session_dir, args): + """ + List available sessions to stdout, if session name is set + only list that session. + """ + if args.session: + if not os.path.exists(os.path.join(session_dir, args.session)): + fail("Invalid Session", logger=logger) + sessions = [args.session] + else: + sessions = [] + for d in os.listdir(session_dir): + sessions.append(d) + + output = [] + for session in sessions: + # Session Volume Last Processed + volnames = os.listdir(os.path.join(session_dir, session)) + + for volname in volnames: + if args.volume and args.volume != volname: + continue + + status_file = os.path.join(session_dir, session, volname, "status") + last_processed = None + try: + with open(status_file) as f: + last_processed = f.read().strip() + except (OSError, IOError) as e: + if e.errno == ENOENT: + pass + else: + raise + output.append((session, volname, last_processed)) + + if output: + sys.stdout.write("%s %s %s\n" % ("SESSION".ljust(25), + "VOLUME".ljust(25), + "SESSION TIME".ljust(25))) + sys.stdout.write("-"*75) + sys.stdout.write("\n") + for session, volname, last_processed in output: + sys.stdout.write("%s %s %s\n" % (session.ljust(25), + volname.ljust(25), + human_time(last_processed).ljust(25))) + + +def main(): + args = _get_args() + mkdirp(conf.get_opt("session_dir"), exit_on_err=True) + + if args.mode == "list": + session_dir = conf.get_opt("session_dir") + else: + session_dir = os.path.join(conf.get_opt("session_dir"), + args.session) + + if not os.path.exists(session_dir) and args.mode not in ["create", "list"]: + fail("Invalid session %s" % args.session) + + mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), + exit_on_err=True) + log_file = os.path.join(conf.get_opt("log_dir"), + args.session, + args.volume, + "cli.log") + setup_logger(logger, log_file, args.debug) + + # globals() will have all the functions already defined. + # mode_<args.mode> will be the function name to be called + globals()["mode_" + args.mode](session_dir, args) diff --git a/tools/glusterfind/src/nodecleanup.py b/tools/glusterfind/src/nodecleanup.py new file mode 100644 index 00000000000..a31d4d83acd --- /dev/null +++ b/tools/glusterfind/src/nodecleanup.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+ +import shutil +import sys +import os +import logging +from errno import ENOENT + +from utils import setup_logger, mkdirp +import conf + +logger = logging.getLogger() + + +if __name__ == "__main__": + # Args: <SESSION> <VOLUME> + session = sys.argv[1] + volume = sys.argv[2] + + working_dir = os.path.join(conf.get_opt("working_dir"), + session, + volume) + + mkdirp(os.path.join(conf.get_opt("log_dir"), session, volume), + exit_on_err=True) + log_file = os.path.join(conf.get_opt("log_dir"), + session, + volume, + "changelog.log") + + setup_logger(logger, log_file) + + try: + def handle_rm_error(func, path, exc_info): + if exc_info[1].errno == ENOENT: + return + + raise exc_info[1] + + shutil.rmtree(working_dir, onerror=handle_rm_error) + except (OSError, IOError) as e: + logger.error("Failed to delete working directory: %s" % e) + sys.exit(1) diff --git a/tools/glusterfind/src/tool.conf.in b/tools/glusterfind/src/tool.conf.in new file mode 100644 index 00000000000..bae46499aa0 --- /dev/null +++ b/tools/glusterfind/src/tool.conf.in @@ -0,0 +1,11 @@ +[vars] +session_dir=@GLUSTERD_WORKDIR@/glusterfind/ +secret_pem=@GLUSTERD_WORKDIR@/glusterfind.secret.pem +working_dir=@GLUSTERFSD_MISCDIR@/glusterfind/ +log_dir=/var/log/glusterfs/glusterfind/ +nodecleanup=@GLUSTERFS_LIBEXECDIR@/glusterfind/nodecleanup.py +changelog_rollover_time=15 + +[change_detectors] +changelog=@GLUSTERFS_LIBEXECDIR@/glusterfind/changelog.py +brickfind=@GLUSTERFS_LIBEXECDIR@/glusterfind/brickfind.py
\ No newline at end of file diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py new file mode 100644 index 00000000000..c503a2b9f58 --- /dev/null +++ b/tools/glusterfind/src/utils.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import sys +import socket +from subprocess import PIPE, Popen +from errno import EPERM, EEXIST +import logging +import os +from datetime import datetime + +ROOT_GFID = "00000000-0000-0000-0000-000000000001" + + +def find(path, callback_func=lambda x: True, filter_func=lambda x: True, + ignore_dirs=[], subdirs_crawl=True): + if os.path.basename(path) in ignore_dirs: + return + + if filter_func(path): + callback_func(path) + + for p in os.listdir(path): + full_path = os.path.join(path, p) + + if os.path.isdir(full_path): + if subdirs_crawl: + find(full_path, callback_func, filter_func, ignore_dirs) + else: + if filter_func(full_path): + callback_func(full_path) + else: + if filter_func(full_path): + callback_func(full_path) + + +def output_write(f, path, prefix="."): + if path == "": + return + + if prefix != ".": + path = os.path.join(prefix, path) + f.write("%s\n" % path) + + +def human_time(ts): + return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S") + + +def setup_logger(logger, path, debug=False): + if debug: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + + # create the logging file handler + fh = logging.FileHandler(path) + + formatter = logging.Formatter("[%(asctime)s] %(levelname)s " + "[%(module)s - %(lineno)s:%(funcName)s] " + "- %(message)s") + + fh.setFormatter(formatter) + + # add handler to logger object + logger.addHandler(fh) + + +def create_file(path, exit_on_err=False, logger=None): + """ + If file exists overwrite. Print error to stderr and exit + if exit_on_err is set, else raise the exception. Consumer + should handle the exception. + """ + try: + open(path, 'w').close() + except (OSError, IOError) as e: + if exit_on_err: + fail("Failed to create file %s: %s" % (path, e), logger=logger) + else: + raise + + +def mkdirp(path, exit_on_err=False, logger=None): + """ + Try creating required directory structure + ignore EEXIST and raise exception for rest of the errors. + Print error in stderr and exit if exit_on_err is set, else + raise exception. + """ + try: + os.makedirs(path) + except (OSError, IOError) as e: + if e.errno == EEXIST and os.path.isdir(path): + pass + else: + if exit_on_err: + fail("Fail to create dir %s: %s" % (path, e), logger=logger) + else: + raise + + +def fail(msg, code=1, logger=None): + """ + Write error to stderr and exit + """ + if logger: + logger.error(msg) + sys.stderr.write("%s\n" % msg) + sys.exit(code) + + +def execute(cmd, exit_msg=None, logger=None): + """ + If failure_msg is not None then return returncode, out and error. + If failure msg is set, write to stderr and exit. 
+ """ + p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True) + + (out, err) = p.communicate() + if p.returncode != 0 and exit_msg is not None: + fail("%s: %s" % (exit_msg, err), p.returncode, logger=logger) + + return (p.returncode, out, err) + + +def symlink_gfid_to_path(brick, gfid): + """ + Each directories are symlinked to file named GFID + in .glusterfs directory of brick backend. Using readlink + we get PARGFID/basename of dir. readlink recursively till + we get PARGFID as ROOT_GFID. + """ + if gfid == ROOT_GFID: + return "" + + out_path = "" + while True: + path = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid) + path_readlink = os.readlink(path) + pgfid = os.path.dirname(path_readlink) + out_path = os.path.join(os.path.basename(path_readlink), out_path) + if pgfid == "../../00/00/%s" % ROOT_GFID: + break + gfid = os.path.basename(pgfid) + return out_path + + +def is_host_local(host): + """ + Find if a host is local or not. + Code copied from $GLUSTERFS/geo-replication/syncdaemon/syncdutils.py + """ + locaddr = False + for ai in socket.getaddrinfo(host, None): + # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators + # /mgmt/glusterd/src/glusterd-utils.c#L125 + if ai[0] == socket.AF_INET: + if ai[-1][0].split(".")[0] == "127": + locaddr = True + break + elif ai[0] == socket.AF_INET6: + if ai[-1][0] == "::1": + locaddr = True + break + else: + continue + try: + # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue, + # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587 + s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP) + except socket.error: + ex = sys.exc_info()[1] + if ex.errno != EPERM: + raise + f = None + try: + f = open("/proc/sys/net/ipv4/ip_nonlocal_bind") + if int(f.read()) != 0: + logger.warning("non-local bind is set and not " + "allowed to create " + "raw sockets, cannot determine " + "if %s is local" % host) + return False + s = socket.socket(ai[0], socket.SOCK_DGRAM) + finally: + if f: + f.close() + try: + s.bind(ai[-1]) + locaddr = True + break + except: + pass + s.close() + return locaddr |
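For context on the .glusterfs backend layout that both crawlers depend on: changelog.py and utils.symlink_gfid_to_path() above build the backend path of a GFID as BRICK/.glusterfs/<gfid[0:2]>/<gfid[2:4]>/<gfid>. A minimal Python sketch of that mapping — the brick path and GFID below are made-up values for illustration only:

    import os

    def backend_path(brick, gfid):
        # Same join used by changelog.py and utils.symlink_gfid_to_path()
        return os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid)

    # Hypothetical example values
    print(backend_path("/bricks/b1", "a7f3c1d2-1111-4c2e-9b1a-1234567890ab"))
    # -> /bricks/b1/.glusterfs/a7/f3/a7f3c1d2-1111-4c2e-9b1a-1234567890ab

For a directory GFID this backend path is a symlink whose target has the form ../../<pgfid[0:2]>/<pgfid[2:4]>/<parent-gfid>/<dirname>, which is the chain symlink_gfid_to_path() follows until it reaches ROOT_GFID.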