diff options
Diffstat (limited to 'tools/glusterfind/src/main.py')
-rw-r--r-- | tools/glusterfind/src/main.py | 468 |
1 files changed, 468 insertions, 0 deletions
#!/usr/bin/env python

# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.

import sys
from errno import ENOENT
import time
from multiprocessing import Process
import os
import xml.etree.cElementTree as etree
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import logging
import shutil

from utils import execute, is_host_local, mkdirp, fail
from utils import setup_logger, human_time
import conf


PROG_DESCRIPTION = """
GlusterFS Incremental API
"""
# Older ElementTree versions do not define ParseError; they raise
# SyntaxError (via expat) instead, so alias whichever is available.
ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError

logger = logging.getLogger()


def node_run(volume, host, path, start, outfile, args, fallback=False):
    """
    Run the change detector for a single brick.

    If host is the local node, execute the command locally. If not,
    execute the CHANGE_DETECTOR command via ssh and copy the output file
    from the remote node using scp.

    Runs inside a multiprocessing.Process; failures are reported to the
    parent through the process exit code. Exit code 2 means the
    changelog history was only partially available and the caller
    should retry this brick with the brickfind detector.
    """
    localdir = is_host_local(host)

    # If a full crawl is requested or the start time is zero, changelogs
    # cannot provide the data - fall back to the brickfind detector.
    change_detector = conf.get_change_detector(args.change_detector)
    if ((start == 0 or args.full) and args.change_detector == "changelog") or \
            fallback:
        change_detector = conf.get_change_detector("brickfind")

    # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START>
    #     --output-prefix <PREFIX> [--debug] [--full]
    cmd = [change_detector,
           args.session,
           volume,
           path,
           outfile,
           str(start),
           "--output-prefix",
           args.output_prefix] + \
        (["--debug"] if args.debug else []) + \
        (["--full"] if args.full else [])

    if not localdir:
        # prefix with ssh command if not local node
        cmd = ["ssh",
               "-i", conf.get_opt("secret_pem"),
               "root@%s" % host] + cmd

    rc, out, err = execute(cmd, logger=logger)
    if rc == 2:
        # Partial history: exit (only from this worker process) with
        # code 2 so the parent retries with brickfind.
        logger.info("%s %s Fallback to brickfind" % (host, err.strip()))
        sys.exit(rc)
    elif rc != 0:
        fail("%s - Change detection failed" % host, logger=logger)

    if not localdir:
        # Fetch the generated output file from the remote node.
        cmd_copy = ["scp",
                    "-i", conf.get_opt("secret_pem"),
                    "root@%s:/%s" % (host, outfile),
                    os.path.dirname(outfile)]
        execute(cmd_copy, exit_msg="%s - Copy command failed" % host,
                logger=logger)


def node_cleanup(host, args):
    """
    Run the nodecleanup helper for one node, locally or over ssh.

    Runs inside a multiprocessing.Process; execute() exits the worker
    with a non-zero code on failure.
    """
    localdir = is_host_local(host)

    # NODECLEANUP <SESSION> <VOLUME> [--debug]
    cmd = [conf.get_opt("nodecleanup"),
           args.session,
           args.volume] + (["--debug"] if args.debug else [])

    if not localdir:
        # prefix with ssh command if not local node
        cmd = ["ssh",
               "-i", conf.get_opt("secret_pem"),
               "root@%s" % host] + cmd

    execute(cmd, exit_msg="%s - Cleanup failed" % host, logger=logger)


def cleanup(nodes, args):
    """
    Remove the per-brick temporary output files on this node and spawn
    a cleanup process for every node of the volume. Exits with status 1
    if any node cleanup fails.
    """
    pool = []
    for num, node in enumerate(nodes):
        host, brick = node[1].split(":")
        # temp output file written by node_run for this brick
        node_outfile = os.path.join(conf.get_opt("working_dir"),
                                    args.session,
                                    args.volume,
                                    "tmp_output_%s.txt" % num)

        try:
            os.remove(node_outfile)
        except (OSError, IOError):
            # TODO: Cleanup Failure, Handle
            pass

        p = Process(target=node_cleanup,
                    args=(host, args))
        p.start()
        pool.append(p)

    exit_codes = 0
    for p in pool:
        p.join()
        exit_codes += (0 if p.exitcode == 0 else 1)

    if exit_codes != 0:
        sys.exit(1)


def failback_node_run(brick_details, idx, volume, start, outfile, args):
    """
    Rerun one brick's change detection with fallback=True (brickfind)
    after the changelog detector reported partial history.

    Returns the worker process exit code. (idx is accepted for parity
    with the caller's loop but is not used here.)
    """
    host, brick = brick_details.split(":")
    p = Process(target=node_run,
                args=(volume, host, brick, start, outfile, args, True))
    p.start()
    p.join()
    return p.exitcode


def run_in_nodes(volume, start, args):
    """
    Get nodes of volume using gluster volume info, spawn a process
    each for a Node. Merge the output files once all the processes
    complete their tasks.
    """
    nodes = get_nodes(volume)
    pool = []
    node_outfiles = []
    for num, node in enumerate(nodes):
        host, brick = node[1].split(":")
        # temp output file for this brick's results
        node_outfile = os.path.join(conf.get_opt("working_dir"),
                                    args.session,
                                    volume,
                                    "tmp_output_%s.txt" % num)
        node_outfiles.append(node_outfile)
        p = Process(target=node_run, args=(volume, host, brick, start,
                                           node_outfile, args))
        p.start()
        pool.append(p)

    exit_codes = 0
    for idx, p in enumerate(pool):
        p.join()
        if p.exitcode == 2:
            # Changelog had only partial history: retry this brick with
            # the brickfind detector.
            rc = failback_node_run(nodes[idx][1], idx, volume, start,
                                   node_outfiles[idx], args)
            exit_codes += (0 if rc == 0 else 1)
        elif p.exitcode != 0:
            # exitcode is known to be non-zero in this branch, so count
            # the failure directly.
            exit_codes += 1

    if exit_codes != 0:
        sys.exit(1)

    # Merge all the per-brick output files, deduplicated, into the
    # user-requested output file.
    cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
    execute(cmd,
            exit_msg="Failed to merge output files "
            "collected from nodes", logger=logger)

    cleanup(nodes, args)


def get_nodes(volume):
    """
    Get the gluster volume info xml output and parse to get
    the brick details.

    Returns a list of (hostUuid, "host:brickpath") tuples.
    """
    cmd = ["gluster", 'volume', 'info', volume, "--xml"]
    _, data, _ = execute(cmd,
                         exit_msg="Failed to Run Gluster Volume Info",
                         logger=logger)
    tree = etree.fromstring(data)

    nodes = []
    volume_el = tree.find('volInfo/volumes/volume')
    try:
        for b in volume_el.findall('bricks/brick'):
            nodes.append((b.find('hostUuid').text,
                          b.find('name').text))
    except (ParseError, AttributeError, ValueError) as e:
        fail("Failed to parse Volume Info: %s" % e, logger=logger)

    return nodes


def _get_args():
    """
    Build the argument parser with one subcommand per mode
    (create/delete/list/pre/post) and return the parsed arguments.
    """
    parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
                            description=PROG_DESCRIPTION)
    subparsers = parser.add_subparsers(dest="mode")

    # create <SESSION> <VOLUME> [--debug] [--force]
    parser_create = subparsers.add_parser('create')
    parser_create.add_argument("session", help="Session Name")
    parser_create.add_argument("volume", help="Volume Name")
    parser_create.add_argument("--debug", help="Debug", action="store_true")
    parser_create.add_argument("--force", help="Force option to recreate "
                               "the session", action="store_true")

    # delete <SESSION> <VOLUME> [--debug]
    parser_delete = subparsers.add_parser('delete')
    parser_delete.add_argument("session", help="Session Name")
    parser_delete.add_argument("volume", help="Volume Name")
    parser_delete.add_argument("--debug", help="Debug", action="store_true")

    # list [--session <SESSION>] [--volume <VOLUME>]
    parser_list = subparsers.add_parser('list')
    parser_list.add_argument("--session", help="Session Name", default="")
    parser_list.add_argument("--volume", help="Volume Name", default="")
    parser_list.add_argument("--debug", help="Debug", action="store_true")

    # pre <SESSION> <VOLUME> <OUTFILE> [--change-detector <CHANGE_DETECTOR>]
    # [--output-prefix <OUTPUT_PREFIX>] [--full]
    parser_pre = subparsers.add_parser('pre')
    parser_pre.add_argument("session", help="Session Name")
    parser_pre.add_argument("volume", help="Volume Name")
    parser_pre.add_argument("outfile", help="Output File")
    parser_pre.add_argument("--debug", help="Debug", action="store_true")
    parser_pre.add_argument("--full", help="Full find", action="store_true")
    parser_pre.add_argument("--change-detector", dest="change_detector",
                            help="Change detection",
                            choices=conf.list_change_detectors(),
                            type=str, default='changelog')
    parser_pre.add_argument("--output-prefix", help="File prefix in output",
                            default=".")

    # post <SESSION> <VOLUME>
    parser_post = subparsers.add_parser('post')
    parser_post.add_argument("session", help="Session Name")
    parser_post.add_argument("volume", help="Volume Name")
    parser_post.add_argument("--debug", help="Debug", action="store_true")

    return parser.parse_args()


def ssh_setup():
    """
    Generate the glusterfind ssh key pair if missing, then distribute
    the public key to all nodes of the volume and add it to their
    authorized_keys via gluster's system:: helpers.
    """
    if not os.path.exists(conf.get_opt("secret_pem")):
        # Generate ssh key with an empty passphrase for non-interactive
        # use by node_run/node_cleanup.
        cmd = ["ssh-keygen",
               "-N",
               "",
               "-f",
               conf.get_opt("secret_pem")]
        execute(cmd,
                exit_msg="Unable to generate ssh key %s"
                % conf.get_opt("secret_pem"),
                logger=logger)

        logger.info("Ssh key generated %s" % conf.get_opt("secret_pem"))

    # Copy pub file to all nodes
    cmd = ["gluster",
           "system::",
           "copy",
           "file",
           "/" + os.path.basename(conf.get_opt("secret_pem")) + ".pub"]
    execute(cmd, exit_msg="Failed to distribute ssh keys", logger=logger)

    logger.info("Distributed ssh key to all nodes of Volume")

    # Add to authorized_keys file in each node
    cmd = ["gluster",
           "system::",
           "execute",
           "add_secret_pub",
           "root",
           os.path.basename(conf.get_opt("secret_pem")) + ".pub"]
    execute(cmd,
            exit_msg="Failed to add ssh keys to authorized_keys file",
            logger=logger)

    logger.info("Ssh key added to authorized_keys of Volume nodes")


def mode_create(session_dir, args):
    """
    Create a glusterfind session: set up ssh keys, enable the required
    volume options (build-pgfid, changelog) and record the session
    start time in the status file. Exits 0 on success.
    """
    logger.debug("Init is called - Session: %s, Volume: %s"
                 % (args.session, args.volume))

    mkdirp(session_dir, exit_on_err=True, logger=logger)
    mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
           logger=logger)
    status_file = os.path.join(session_dir, args.volume, "status")

    if os.path.exists(status_file) and not args.force:
        fail("Session %s already created" % args.session, logger=logger)

    if not os.path.exists(status_file) or args.force:
        ssh_setup()

        execute(["gluster", "volume", "set",
                 args.volume, "build-pgfid", "on"],
                exit_msg="Failed to set volume option build-pgfid on",
                logger=logger)
        logger.info("Volume option set %s, build-pgfid on" % args.volume)

        execute(["gluster", "volume", "set",
                 args.volume, "changelog.changelog", "on"],
                exit_msg="Failed to set volume option "
                "changelog.changelog on", logger=logger)
        logger.info("Volume option set %s, changelog.changelog on"
                    % args.volume)

    if not os.path.exists(status_file):
        # buffering=0 gives an unbuffered write so the timestamp hits
        # disk immediately. NOTE(review): valid for Python 2 text mode;
        # under Python 3 unbuffered requires binary mode - confirm the
        # target interpreter before porting.
        with open(status_file, "w", buffering=0) as f:
            # Add Rollover time to current time to make sure changelogs
            # will be available if we use this time as start time
            time_to_update = int(time.time()) + int(
                conf.get_opt("changelog_rollover_time"))
            f.write(str(time_to_update))

    sys.exit(0)


def mode_pre(session_dir, args):
    """
    Read the start time from the session status file, run change
    detection on all nodes, and write the tentative end time to the
    session.pre file (promoted to the status file by `post`).
    """
    # Subtract the rollover time so changes recorded in a changelog
    # that has not yet rolled over are not missed by the next run.
    endtime_to_update = int(time.time()) - int(
        conf.get_opt("changelog_rollover_time"))
    status_file = os.path.join(session_dir, args.volume, "status")
    status_file_pre = status_file + ".pre"

    mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger)

    # Default start time zero means "full find" (see node_run).
    start = 0
    try:
        with open(status_file) as f:
            start = int(f.read().strip())
    except ValueError:
        pass
    except (OSError, IOError) as e:
        fail("Error Opening Session file %s: %s"
             % (status_file, e), logger=logger)

    logger.debug("Pre is called - Session: %s, Volume: %s, "
                 "Start time: %s, End time: %s"
                 % (args.session, args.volume, start, endtime_to_update))

    run_in_nodes(args.volume, start, args)

    with open(status_file_pre, "w", buffering=0) as f:
        f.write(str(endtime_to_update))

    sys.stdout.write("Generated output file %s\n" % args.outfile)


def mode_post(session_dir, args):
    """
    If pre session file exists, overwrite session file.
    If pre session file does not exist, fail with an error.
    """
    status_file = os.path.join(session_dir, args.volume, "status")
    logger.debug("Post is called - Session: %s, Volume: %s"
                 % (args.session, args.volume))
    status_file_pre = status_file + ".pre"

    if os.path.exists(status_file_pre):
        os.rename(status_file_pre, status_file)
        sys.exit(0)
    else:
        fail("Pre script is not run", logger=logger)


def mode_delete(session_dir, args):
    """
    Delete the session directory for the given volume, ignoring
    already-missing paths.
    """
    def handle_rm_error(func, path, exc_info):
        # A missing path is fine during deletion; re-raise anything else.
        if exc_info[1].errno == ENOENT:
            return

        raise exc_info[1]

    shutil.rmtree(os.path.join(session_dir, args.volume),
                  onerror=handle_rm_error)


def mode_list(session_dir, args):
    """
    List available sessions to stdout. If a session name is set,
    only list that session; if a volume is set, only that volume.
    """
    if args.session:
        if not os.path.exists(os.path.join(session_dir, args.session)):
            fail("Invalid Session", logger=logger)
        sessions = [args.session]
    else:
        sessions = []
        for d in os.listdir(session_dir):
            sessions.append(d)

    output = []
    for session in sessions:
        # Session Volume Last Processed
        volnames = os.listdir(os.path.join(session_dir, session))

        for volname in volnames:
            if args.volume and args.volume != volname:
                continue

            status_file = os.path.join(session_dir, session, volname, "status")
            last_processed = None
            try:
                with open(status_file) as f:
                    last_processed = f.read().strip()
            except (OSError, IOError) as e:
                if e.errno == ENOENT:
                    pass
                else:
                    raise
            output.append((session, volname, last_processed))

    if output:
        sys.stdout.write("%s %s %s\n" % ("SESSION".ljust(25),
                                         "VOLUME".ljust(25),
                                         "SESSION TIME".ljust(25)))
        sys.stdout.write("-"*75)
        sys.stdout.write("\n")
        for session, volname, last_processed in output:
            # The status file may be missing (ENOENT above); show a
            # placeholder instead of passing None to human_time().
            if last_processed is not None:
                session_time = human_time(last_processed)
            else:
                session_time = "Unknown"
            sys.stdout.write("%s %s %s\n" % (session.ljust(25),
                                             volname.ljust(25),
                                             session_time.ljust(25)))


def main():
    """
    Entry point: parse arguments, resolve the session directory, set up
    logging, and dispatch to the mode_<mode> handler.
    """
    args = _get_args()
    mkdirp(conf.get_opt("session_dir"), exit_on_err=True)

    # `list` operates across all sessions, so it uses the top-level
    # session directory rather than a per-session one.
    if args.mode == "list":
        session_dir = conf.get_opt("session_dir")
    else:
        session_dir = os.path.join(conf.get_opt("session_dir"),
                                   args.session)

    if not os.path.exists(session_dir) and args.mode not in ["create", "list"]:
        fail("Invalid session %s" % args.session)

    mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
           exit_on_err=True)
    log_file = os.path.join(conf.get_opt("log_dir"),
                            args.session,
                            args.volume,
                            "cli.log")
    setup_logger(logger, log_file, args.debug)

    # globals() will have all the functions already defined.
    # mode_<args.mode> will be the function name to be called
    globals()["mode_" + args.mode](session_dir, args)