diff options
Diffstat (limited to 'tools/glusterfind/src/main.py')
-rw-r--r-- | tools/glusterfind/src/main.py | 278 |
1 files changed, 126 insertions, 152 deletions
diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py index ceaf1173897..089a3aec3c5 100644 --- a/tools/glusterfind/src/main.py +++ b/tools/glusterfind/src/main.py @@ -19,7 +19,8 @@ import logging import shutil from utils import execute, is_host_local, mkdirp, fail -from utils import setup_logger, human_time +from utils import setup_logger, human_time, handle_rm_error +from utils import get_changelog_rollover_time, cache_output import conf @@ -29,6 +30,7 @@ GlusterFS Incremental API ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError logger = logging.getLogger() +node_outfiles = [] class StoreAbsPath(Action): @@ -46,33 +48,13 @@ def get_pem_key_path(session, volume): "%s_%s_secret.pem" % (session, volume)) -def node_run(volume, host, path, start, outfile, args, fallback=False): +def node_cmd(host, host_uuid, task, cmd, args, opts): """ - If host is local node, execute the command locally. If not local - execute the CHANGE_DETECTOR command via ssh and copy the output file from - remote node using scp. + Runs command via ssh if host is not local """ - localdir = is_host_local(host) - pem_key_path = get_pem_key_path(args.session, args.volume) + localdir = is_host_local(host_uuid) - # If Full backup is requested or start time is zero, use brickfind - change_detector = conf.get_change_detector(args.change_detector) - if ((start == 0 or args.full) and args.change_detector == "changelog") or \ - fallback: - change_detector = conf.get_change_detector("brickfind") - - # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug - # --gfidpath <TYPE> - cmd = [change_detector, - args.session, - volume, - path, - outfile, - str(start), - "--output-prefix", - args.output_prefix] + \ - (["--debug"] if args.debug else []) + \ - (["--full"] if args.full else []) + pem_key_path = get_pem_key_path(args.session, args.volume) if not localdir: # prefix with ssh command if not local node @@ -80,128 +62,112 @@ def node_run(volume, host, path, start, outfile, args, fallback=False): "-i", pem_key_path, "root@%s" % host] + cmd - rc, out, err = execute(cmd, logger=logger) - if rc == 2: - # Partial History Fallback - logger.info("%s %s Fallback to brickfind" % (host, err.strip())) - # Exit only from process, handled in main. - sys.exit(rc) - elif rc != 0: - fail("%s - Change detection failed" % host, logger=logger) + execute(cmd, exit_msg="%s - %s failed" % (host, task), logger=logger) - if not localdir: + if opts.get("copy_outfile", False): cmd_copy = ["scp", "-i", pem_key_path, - "root@%s:/%s" % (host, outfile), - os.path.dirname(outfile)] + "root@%s:/%s" % (host, opts.get("node_outfile")), + os.path.dirname(opts.get("node_outfile"))] execute(cmd_copy, exit_msg="%s - Copy command failed" % host, logger=logger) -def node_cleanup(host, args): - localdir = is_host_local(host) - - pem_key_path = get_pem_key_path(args.session, args.volume) - - # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug - # --gfidpath <TYPE> - cmd = [conf.get_opt("nodecleanup"), - args.session, - args.volume] + (["--debug"] if args.debug else []) - - if not localdir: - # prefix with ssh command if not local node - cmd = ["ssh", - "-i", pem_key_path, - "root@%s" % host] + cmd - - execute(cmd, exit_msg="%s - Cleanup failed" % host, logger=logger) - - -def cleanup(nodes, args): +def run_cmd_nodes(task, args, **kwargs): + global node_outfiles + nodes = get_nodes(args.volume) pool = [] for num, node in enumerate(nodes): host, brick = node[1].split(":") - # temp output file + host_uuid = node[0] + cmd = [] + opts = {} node_outfile = os.path.join(conf.get_opt("working_dir"), - args.session, - args.volume, - "tmp_output_%s.txt" % num) - - try: - os.remove(node_outfile) - except (OSError, IOError): - # TODO: Cleanup Failure, Handle - pass - - p = Process(target=node_cleanup, - args=(host, args)) - p.start() - pool.append(p) - - exit_codes = 0 - for p in pool: - p.join() - exit_codes += (0 if p.exitcode == 0 else 1) - - if exit_codes != 0: - sys.exit(1) - - -def failback_node_run(brick_details, idx, volume, start, outfile, args): - host, brick = brick_details.split(":") - p = Process(target=node_run, - args=(volume, host, brick, start, outfile, args, True)) - p.start() - p.join() - return p.exitcode - - -def run_in_nodes(volume, start, args): - """ - Get nodes of volume using gluster volume info, spawn a process - each for a Node. Merge the output files once all the process - complete their tasks. - """ - nodes = get_nodes(volume) - pool = [] - node_outfiles = [] - for num, node in enumerate(nodes): - host, brick = node[1].split(":") - # temp output file - node_outfile = os.path.join(conf.get_opt("working_dir"), - args.session, - volume, - "tmp_output_%s.txt" % num) - node_outfiles.append(node_outfile) - p = Process(target=node_run, args=(volume, host, brick, start, - node_outfile, args)) - p.start() - pool.append(p) - - exit_codes = 0 - for idx, p in enumerate(pool): + args.session, args.volume, + "tmp_output_%s" % num) + + if task == "pre": + # If Full backup is requested or start time is zero, use brickfind + change_detector = conf.get_change_detector("changelog") + if args.full: + change_detector = conf.get_change_detector("brickfind") + + node_outfiles.append(node_outfile) + + cmd = [change_detector, + args.session, + args.volume, + brick, + node_outfile, + str(kwargs.get("start")), + "--output-prefix", + args.output_prefix] + \ + (["--debug"] if args.debug else []) + \ + (["--only-namespace-changes"] if args.only_namespace_changes + else []) + + opts["node_outfile"] = node_outfile + opts["copy_outfile"] = True + elif task == "cleanup": + # After pre run, cleanup the working directory and other temp files + # Remove the copied node_outfile in main node + try: + os.remove(node_outfile) + except (OSError, IOError): + logger.warn("Failed to cleanup temporary file %s" % + node_outfile) + pass + + cmd = [conf.get_opt("nodeagent"), + "cleanup", + args.session, + args.volume] + (["--debug"] if args.debug else []) + elif task == "create": + # When glusterfind create, create session directory in + # each brick nodes + cmd = [conf.get_opt("nodeagent"), + "create", + args.session, + args.volume, + brick, + kwargs.get("time_to_update")] + \ + (["--debug"] if args.debug else []) + \ + (["--reset-session-time"] if args.reset_session_time + else []) + elif task == "post": + # Rename pre status file to actual status file in each node + cmd = [conf.get_opt("nodeagent"), + "post", + args.session, + args.volume, + brick] + \ + (["--debug"] if args.debug else []) + elif task == "delete": + # When glusterfind delete, cleanup all the session files/dirs + # from each node. + cmd = [conf.get_opt("nodeagent"), + "delete", + args.session, + args.volume] + \ + (["--debug"] if args.debug else []) + + if cmd: + p = Process(target=node_cmd, + args=(host, host_uuid, task, cmd, args, opts)) + p.start() + pool.append(p) + + for num, p in enumerate(pool): p.join() - # Handle the Changelog failure, fallback to Brickfind - if p.exitcode == 2: - rc = failback_node_run(nodes[idx][1], idx, volume, start, - node_outfiles[idx], args) - exit_codes += (0 if rc == 0 else 1) - elif p.exitcode != 0: - exit_codes += (0 if p.exitcode == 0 else 1) - - if exit_codes != 0: - sys.exit(1) - - # Merge all output files - cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] - execute(cmd, - exit_msg="Failed to merge output files " - "collected from nodes", logger=logger) - - cleanup(nodes, args) + if p.exitcode != 0: + logger.warn("Command %s failed in %s" % (task, nodes[num][1])) + if task in ["create", "delete"]: + fail("Command %s failed in %s" % (task, nodes[num][1])) + elif task == "pre" and args.disable_partial: + sys.exit(1) +@cache_output def get_nodes(volume): """ Get the gluster volume info xml output and parse to get @@ -237,6 +203,9 @@ def _get_args(): parser_create.add_argument("--debug", help="Debug", action="store_true") parser_create.add_argument("--force", help="Force option to recreate " "the session", action="store_true") + parser_create.add_argument("--reset-session-time", + help="Reset Session Time to Current Time", + action="store_true") # delete <SESSION> <VOLUME> [--debug] parser_delete = subparsers.add_parser('delete') @@ -250,7 +219,7 @@ def _get_args(): parser_list.add_argument("--volume", help="Volume Name", default="") parser_list.add_argument("--debug", help="Debug", action="store_true") - # pre <SESSION> <VOLUME> <OUTFILE> [--change-detector <CHANGE_DETECTOR>] + # pre <SESSION> <VOLUME> <OUTFILE> # [--output-prefix <OUTPUT_PREFIX>] [--full] parser_pre = subparsers.add_parser('pre') parser_pre.add_argument("session", help="Session Name") @@ -258,10 +227,8 @@ def _get_args(): parser_pre.add_argument("outfile", help="Output File", action=StoreAbsPath) parser_pre.add_argument("--debug", help="Debug", action="store_true") parser_pre.add_argument("--full", help="Full find", action="store_true") - parser_pre.add_argument("--change-detector", dest="change_detector", - help="Change detection", - choices=conf.list_change_detectors(), - type=str, default='changelog') + parser_pre.add_argument("--disable-partial", help="Disable Partial find, " + "Fail when one node fails", action="store_true") parser_pre.add_argument("--output-prefix", help="File prefix in output", default=".") parser_pre.add_argument("--regenerate-outfile", @@ -363,12 +330,15 @@ def mode_create(session_dir, args): logger.info("Volume option set %s, changelog.changelog on" % args.volume) - if not os.path.exists(status_file): + # Add Rollover time to current time to make sure changelogs + # will be available if we use this time as start time + time_to_update = int(time.time()) + get_changelog_rollover_time( + args.volume) + + run_cmd_nodes("create", args, time_to_update=str(time_to_update)) + + if not os.path.exists(status_file) or args.reset_session_time: with open(status_file, "w", buffering=0) as f: - # Add Rollover time to current time to make sure changelogs - # will be available if we use this time as start time - time_to_update = int(time.time()) + int( - conf.get_opt("changelog_rollover_time")) f.write(str(time_to_update)) sys.exit(0) @@ -378,8 +348,8 @@ def mode_pre(session_dir, args): """ Read from Session file and write to session.pre file """ - endtime_to_update = int(time.time()) - int( - conf.get_opt("changelog_rollover_time")) + endtime_to_update = int(time.time()) - get_changelog_rollover_time( + args.volume) status_file = os.path.join(session_dir, args.volume, "status") status_file_pre = status_file + ".pre" @@ -404,7 +374,15 @@ def mode_pre(session_dir, args): "Start time: %s, End time: %s" % (args.session, args.volume, start, endtime_to_update)) - run_in_nodes(args.volume, start, args) + run_cmd_nodes("pre", args, start=start) + + # Merger + cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] + execute(cmd, + exit_msg="Failed to merge output files " + "collected from nodes", logger=logger) + + run_cmd_nodes("cleanup", args) with open(status_file_pre, "w", buffering=0) as f: f.write(str(endtime_to_update)) @@ -423,6 +401,7 @@ def mode_post(session_dir, args): status_file_pre = status_file + ".pre" if os.path.exists(status_file_pre): + run_cmd_nodes("post", args) os.rename(status_file_pre, status_file) sys.exit(0) else: @@ -430,12 +409,7 @@ def mode_post(session_dir, args): def mode_delete(session_dir, args): - def handle_rm_error(func, path, exc_info): - if exc_info[1].errno == ENOENT: - return - - raise exc_info[1] - + run_cmd_nodes("delete", args) shutil.rmtree(os.path.join(session_dir, args.volume), onerror=handle_rm_error) |