diff options
-rw-r--r-- | tools/glusterfind/src/Makefile.am | 4 | ||||
-rw-r--r-- | tools/glusterfind/src/brickfind.py | 39 | ||||
-rw-r--r-- | tools/glusterfind/src/changelog.py | 40 | ||||
-rw-r--r-- | tools/glusterfind/src/main.py | 278 | ||||
-rw-r--r-- | tools/glusterfind/src/nodeagent.py | 123 | ||||
-rw-r--r-- | tools/glusterfind/src/nodecleanup.py | 51 | ||||
-rw-r--r-- | tools/glusterfind/src/tool.conf.in | 3 | ||||
-rw-r--r-- | tools/glusterfind/src/utils.py | 118 |
8 files changed, 367 insertions, 289 deletions
diff --git a/tools/glusterfind/src/Makefile.am b/tools/glusterfind/src/Makefile.am index 458b820fd19..7b819828d97 100644 --- a/tools/glusterfind/src/Makefile.am +++ b/tools/glusterfind/src/Makefile.am @@ -3,12 +3,12 @@ glusterfinddir = $(libexecdir)/glusterfs/glusterfind glusterfind_PYTHON = conf.py utils.py __init__.py \ main.py libgfchangelog.py -glusterfind_SCRIPTS = changelog.py nodecleanup.py \ +glusterfind_SCRIPTS = changelog.py nodeagent.py \ brickfind.py glusterfind_DATA = tool.conf -EXTRA_DIST = changelog.py nodecleanup.py brickfind.py \ +EXTRA_DIST = changelog.py nodeagent.py brickfind.py \ tool.conf CLEANFILES = diff --git a/tools/glusterfind/src/brickfind.py b/tools/glusterfind/src/brickfind.py index 1090f408e28..9758bef56ff 100644 --- a/tools/glusterfind/src/brickfind.py +++ b/tools/glusterfind/src/brickfind.py @@ -12,7 +12,8 @@ import os import sys import logging from argparse import ArgumentParser, RawDescriptionHelpFormatter -from errno import ENOENT +import urllib +import time from utils import mkdirp, setup_logger, create_file, output_write, find import conf @@ -36,36 +37,17 @@ def brickfind_crawl(brick, args): with open(args.outfile, "a+") as fout: brick_path_len = len(brick) - def mtime_filter(path): - try: - st = os.lstat(path) - except (OSError, IOError) as e: - if e.errno == ENOENT: - st = None - else: - raise - - if st and (st.st_mtime > args.start or st.st_ctime > args.start): - return True - - return False - def output_callback(path): path = path.strip() path = path[brick_path_len+1:] - output_write(fout, path, args.output_prefix) + output_write(fout, path, args.output_prefix, encode=True) ignore_dirs = [os.path.join(brick, dirname) for dirname in conf.get_opt("brick_ignore_dirs").split(",")] - if args.full: - find(brick, callback_func=output_callback, - ignore_dirs=ignore_dirs) - else: - find(brick, callback_func=output_callback, - filter_func=mtime_filter, - ignore_dirs=ignore_dirs) + find(brick, callback_func=output_callback, + ignore_dirs=ignore_dirs) fout.flush() os.fsync(fout.fileno()) @@ -81,7 +63,6 @@ def _get_args(): parser.add_argument("outfile", help="Output File") parser.add_argument("start", help="Start Time", type=float) parser.add_argument("--debug", help="Debug", action="store_true") - parser.add_argument("--full", help="Full Find", action="store_true") parser.add_argument("--output-prefix", help="File prefix in output", default=".") @@ -90,6 +71,12 @@ def _get_args(): if __name__ == "__main__": args = _get_args() + session_dir = os.path.join(conf.get_opt("session_dir"), args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + status_file_pre = status_file + ".pre" + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), exit_on_err=True) log_file = os.path.join(conf.get_opt("log_dir"), @@ -97,5 +84,9 @@ if __name__ == "__main__": args.volume, "brickfind.log") setup_logger(logger, log_file, args.debug) + + time_to_update = int(time.time()) brickfind_crawl(args.brick, args) + with open(status_file_pre, "w", buffering=0) as f: + f.write(str(time_to_update)) sys.exit(0) diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py index eb73635fb32..2c4ee9106e1 100644 --- a/tools/glusterfind/src/changelog.py +++ b/tools/glusterfind/src/changelog.py @@ -16,10 +16,12 @@ from errno import ENOENT import logging from argparse import ArgumentParser, RawDescriptionHelpFormatter import hashlib +import urllib import libgfchangelog from utils import create_file, mkdirp, execute, symlink_gfid_to_path from utils import fail, setup_logger, output_write, find +from utils import get_changelog_rollover_time import conf @@ -190,7 +192,7 @@ def sort_unique(filename): exit_msg="Sort failed", logger=logger) -def get_changes(brick, hash_dir, log_file, end, args): +def get_changes(brick, hash_dir, log_file, start, end, args): """ Makes use of libgfchangelog's history API to get changelogs containing changes from start and end time. Further collects @@ -216,7 +218,7 @@ def get_changes(brick, hash_dir, log_file, end, args): # Fail if History fails for requested Start and End try: actual_end = libgfchangelog.cl_history_changelog( - cl_path, args.start, end, CHANGELOGAPI_NUM_WORKERS) + cl_path, start, end, CHANGELOGAPI_NUM_WORKERS) except libgfchangelog.ChangelogException as e: fail("%s Historical Changelogs not available: %s" % (brick, e), logger=logger) @@ -235,6 +237,11 @@ def get_changes(brick, hash_dir, log_file, end, args): if changes: with open(gfid_list_path, 'a+') as fgfid: for change in changes: + # Ignore if last processed changelog comes + # again in list + if change.endswith(".%s" % start): + continue + with open(change) as f: for line in f: # Space delimited list, collect GFID @@ -259,8 +266,10 @@ def get_changes(brick, hash_dir, log_file, end, args): args.outfile, gfid_list_failures_file) gfid_to_path_using_batchfind(brick, gfid_list_failures_file, args.outfile) + return actual_end -def changelog_crawl(brick, end, args): + +def changelog_crawl(brick, start, end, args): """ Init function, prepares working dir and calls Changelog query """ @@ -283,8 +292,8 @@ def changelog_crawl(brick, end, args): "changelog.%s.log" % brickhash) logger.info("%s Started Changelog Crawl. Start: %s, End: %s" - % (brick, args.start, end)) - get_changes(brick, working_dir, log_file, end, args) + % (brick, start, end)) + return get_changes(brick, working_dir, log_file, start, end, args) def _get_args(): @@ -312,6 +321,23 @@ if __name__ == "__main__": args.volume, "changelog.log") setup_logger(logger, log_file, args.debug) - end = int(time.time()) - int(conf.get_opt("changelog_rollover_time")) - changelog_crawl(args.brick, end, args) + + session_dir = os.path.join(conf.get_opt("session_dir"), args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + status_file_pre = status_file + ".pre" + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) + + try: + with open(status_file) as f: + start = int(f.read().strip()) + except (ValueError, OSError, IOError): + start = args.start + + end = int(time.time()) - get_changelog_rollover_time(args.volume) + actual_end = changelog_crawl(args.brick, start, end, args) + with open(status_file_pre, "w", buffering=0) as f: + f.write(str(actual_end)) + sys.exit(0) diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py index ceaf1173897..089a3aec3c5 100644 --- a/tools/glusterfind/src/main.py +++ b/tools/glusterfind/src/main.py @@ -19,7 +19,8 @@ import logging import shutil from utils import execute, is_host_local, mkdirp, fail -from utils import setup_logger, human_time +from utils import setup_logger, human_time, handle_rm_error +from utils import get_changelog_rollover_time, cache_output import conf @@ -29,6 +30,7 @@ GlusterFS Incremental API ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError logger = logging.getLogger() +node_outfiles = [] class StoreAbsPath(Action): @@ -46,33 +48,13 @@ def get_pem_key_path(session, volume): "%s_%s_secret.pem" % (session, volume)) -def node_run(volume, host, path, start, outfile, args, fallback=False): +def node_cmd(host, host_uuid, task, cmd, args, opts): """ - If host is local node, execute the command locally. If not local - execute the CHANGE_DETECTOR command via ssh and copy the output file from - remote node using scp. + Runs command via ssh if host is not local """ - localdir = is_host_local(host) - pem_key_path = get_pem_key_path(args.session, args.volume) + localdir = is_host_local(host_uuid) - # If Full backup is requested or start time is zero, use brickfind - change_detector = conf.get_change_detector(args.change_detector) - if ((start == 0 or args.full) and args.change_detector == "changelog") or \ - fallback: - change_detector = conf.get_change_detector("brickfind") - - # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug - # --gfidpath <TYPE> - cmd = [change_detector, - args.session, - volume, - path, - outfile, - str(start), - "--output-prefix", - args.output_prefix] + \ - (["--debug"] if args.debug else []) + \ - (["--full"] if args.full else []) + pem_key_path = get_pem_key_path(args.session, args.volume) if not localdir: # prefix with ssh command if not local node @@ -80,128 +62,112 @@ def node_run(volume, host, path, start, outfile, args, fallback=False): "-i", pem_key_path, "root@%s" % host] + cmd - rc, out, err = execute(cmd, logger=logger) - if rc == 2: - # Partial History Fallback - logger.info("%s %s Fallback to brickfind" % (host, err.strip())) - # Exit only from process, handled in main. - sys.exit(rc) - elif rc != 0: - fail("%s - Change detection failed" % host, logger=logger) + execute(cmd, exit_msg="%s - %s failed" % (host, task), logger=logger) - if not localdir: + if opts.get("copy_outfile", False): cmd_copy = ["scp", "-i", pem_key_path, - "root@%s:/%s" % (host, outfile), - os.path.dirname(outfile)] + "root@%s:/%s" % (host, opts.get("node_outfile")), + os.path.dirname(opts.get("node_outfile"))] execute(cmd_copy, exit_msg="%s - Copy command failed" % host, logger=logger) -def node_cleanup(host, args): - localdir = is_host_local(host) - - pem_key_path = get_pem_key_path(args.session, args.volume) - - # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug - # --gfidpath <TYPE> - cmd = [conf.get_opt("nodecleanup"), - args.session, - args.volume] + (["--debug"] if args.debug else []) - - if not localdir: - # prefix with ssh command if not local node - cmd = ["ssh", - "-i", pem_key_path, - "root@%s" % host] + cmd - - execute(cmd, exit_msg="%s - Cleanup failed" % host, logger=logger) - - -def cleanup(nodes, args): +def run_cmd_nodes(task, args, **kwargs): + global node_outfiles + nodes = get_nodes(args.volume) pool = [] for num, node in enumerate(nodes): host, brick = node[1].split(":") - # temp output file + host_uuid = node[0] + cmd = [] + opts = {} node_outfile = os.path.join(conf.get_opt("working_dir"), - args.session, - args.volume, - "tmp_output_%s.txt" % num) - - try: - os.remove(node_outfile) - except (OSError, IOError): - # TODO: Cleanup Failure, Handle - pass - - p = Process(target=node_cleanup, - args=(host, args)) - p.start() - pool.append(p) - - exit_codes = 0 - for p in pool: - p.join() - exit_codes += (0 if p.exitcode == 0 else 1) - - if exit_codes != 0: - sys.exit(1) - - -def failback_node_run(brick_details, idx, volume, start, outfile, args): - host, brick = brick_details.split(":") - p = Process(target=node_run, - args=(volume, host, brick, start, outfile, args, True)) - p.start() - p.join() - return p.exitcode - - -def run_in_nodes(volume, start, args): - """ - Get nodes of volume using gluster volume info, spawn a process - each for a Node. Merge the output files once all the process - complete their tasks. - """ - nodes = get_nodes(volume) - pool = [] - node_outfiles = [] - for num, node in enumerate(nodes): - host, brick = node[1].split(":") - # temp output file - node_outfile = os.path.join(conf.get_opt("working_dir"), - args.session, - volume, - "tmp_output_%s.txt" % num) - node_outfiles.append(node_outfile) - p = Process(target=node_run, args=(volume, host, brick, start, - node_outfile, args)) - p.start() - pool.append(p) - - exit_codes = 0 - for idx, p in enumerate(pool): + args.session, args.volume, + "tmp_output_%s" % num) + + if task == "pre": + # If Full backup is requested or start time is zero, use brickfind + change_detector = conf.get_change_detector("changelog") + if args.full: + change_detector = conf.get_change_detector("brickfind") + + node_outfiles.append(node_outfile) + + cmd = [change_detector, + args.session, + args.volume, + brick, + node_outfile, + str(kwargs.get("start")), + "--output-prefix", + args.output_prefix] + \ + (["--debug"] if args.debug else []) + \ + (["--only-namespace-changes"] if args.only_namespace_changes + else []) + + opts["node_outfile"] = node_outfile + opts["copy_outfile"] = True + elif task == "cleanup": + # After pre run, cleanup the working directory and other temp files + # Remove the copied node_outfile in main node + try: + os.remove(node_outfile) + except (OSError, IOError): + logger.warn("Failed to cleanup temporary file %s" % + node_outfile) + pass + + cmd = [conf.get_opt("nodeagent"), + "cleanup", + args.session, + args.volume] + (["--debug"] if args.debug else []) + elif task == "create": + # When glusterfind create, create session directory in + # each brick nodes + cmd = [conf.get_opt("nodeagent"), + "create", + args.session, + args.volume, + brick, + kwargs.get("time_to_update")] + \ + (["--debug"] if args.debug else []) + \ + (["--reset-session-time"] if args.reset_session_time + else []) + elif task == "post": + # Rename pre status file to actual status file in each node + cmd = [conf.get_opt("nodeagent"), + "post", + args.session, + args.volume, + brick] + \ + (["--debug"] if args.debug else []) + elif task == "delete": + # When glusterfind delete, cleanup all the session files/dirs + # from each node. + cmd = [conf.get_opt("nodeagent"), + "delete", + args.session, + args.volume] + \ + (["--debug"] if args.debug else []) + + if cmd: + p = Process(target=node_cmd, + args=(host, host_uuid, task, cmd, args, opts)) + p.start() + pool.append(p) + + for num, p in enumerate(pool): p.join() - # Handle the Changelog failure, fallback to Brickfind - if p.exitcode == 2: - rc = failback_node_run(nodes[idx][1], idx, volume, start, - node_outfiles[idx], args) - exit_codes += (0 if rc == 0 else 1) - elif p.exitcode != 0: - exit_codes += (0 if p.exitcode == 0 else 1) - - if exit_codes != 0: - sys.exit(1) - - # Merge all output files - cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] - execute(cmd, - exit_msg="Failed to merge output files " - "collected from nodes", logger=logger) - - cleanup(nodes, args) + if p.exitcode != 0: + logger.warn("Command %s failed in %s" % (task, nodes[num][1])) + if task in ["create", "delete"]: + fail("Command %s failed in %s" % (task, nodes[num][1])) + elif task == "pre" and args.disable_partial: + sys.exit(1) +@cache_output def get_nodes(volume): """ Get the gluster volume info xml output and parse to get @@ -237,6 +203,9 @@ def _get_args(): parser_create.add_argument("--debug", help="Debug", action="store_true") parser_create.add_argument("--force", help="Force option to recreate " "the session", action="store_true") + parser_create.add_argument("--reset-session-time", + help="Reset Session Time to Current Time", + action="store_true") # delete <SESSION> <VOLUME> [--debug] parser_delete = subparsers.add_parser('delete') @@ -250,7 +219,7 @@ def _get_args(): parser_list.add_argument("--volume", help="Volume Name", default="") parser_list.add_argument("--debug", help="Debug", action="store_true") - # pre <SESSION> <VOLUME> <OUTFILE> [--change-detector <CHANGE_DETECTOR>] + # pre <SESSION> <VOLUME> <OUTFILE> # [--output-prefix <OUTPUT_PREFIX>] [--full] parser_pre = subparsers.add_parser('pre') parser_pre.add_argument("session", help="Session Name") @@ -258,10 +227,8 @@ def _get_args(): parser_pre.add_argument("outfile", help="Output File", action=StoreAbsPath) parser_pre.add_argument("--debug", help="Debug", action="store_true") parser_pre.add_argument("--full", help="Full find", action="store_true") - parser_pre.add_argument("--change-detector", dest="change_detector", - help="Change detection", - choices=conf.list_change_detectors(), - type=str, default='changelog') + parser_pre.add_argument("--disable-partial", help="Disable Partial find, " + "Fail when one node fails", action="store_true") parser_pre.add_argument("--output-prefix", help="File prefix in output", default=".") parser_pre.add_argument("--regenerate-outfile", @@ -363,12 +330,15 @@ def mode_create(session_dir, args): logger.info("Volume option set %s, changelog.changelog on" % args.volume) - if not os.path.exists(status_file): + # Add Rollover time to current time to make sure changelogs + # will be available if we use this time as start time + time_to_update = int(time.time()) + get_changelog_rollover_time( + args.volume) + + run_cmd_nodes("create", args, time_to_update=str(time_to_update)) + + if not os.path.exists(status_file) or args.reset_session_time: with open(status_file, "w", buffering=0) as f: - # Add Rollover time to current time to make sure changelogs - # will be available if we use this time as start time - time_to_update = int(time.time()) + int( - conf.get_opt("changelog_rollover_time")) f.write(str(time_to_update)) sys.exit(0) @@ -378,8 +348,8 @@ def mode_pre(session_dir, args): """ Read from Session file and write to session.pre file """ - endtime_to_update = int(time.time()) - int( - conf.get_opt("changelog_rollover_time")) + endtime_to_update = int(time.time()) - get_changelog_rollover_time( + args.volume) status_file = os.path.join(session_dir, args.volume, "status") status_file_pre = status_file + ".pre" @@ -404,7 +374,15 @@ def mode_pre(session_dir, args): "Start time: %s, End time: %s" % (args.session, args.volume, start, endtime_to_update)) - run_in_nodes(args.volume, start, args) + run_cmd_nodes("pre", args, start=start) + + # Merger + cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] + execute(cmd, + exit_msg="Failed to merge output files " + "collected from nodes", logger=logger) + + run_cmd_nodes("cleanup", args) with open(status_file_pre, "w", buffering=0) as f: f.write(str(endtime_to_update)) @@ -423,6 +401,7 @@ def mode_post(session_dir, args): status_file_pre = status_file + ".pre" if os.path.exists(status_file_pre): + run_cmd_nodes("post", args) os.rename(status_file_pre, status_file) sys.exit(0) else: @@ -430,12 +409,7 @@ def mode_post(session_dir, args): def mode_delete(session_dir, args): - def handle_rm_error(func, path, exc_info): - if exc_info[1].errno == ENOENT: - return - - raise exc_info[1] - + run_cmd_nodes("delete", args) shutil.rmtree(os.path.join(session_dir, args.volume), onerror=handle_rm_error) diff --git a/tools/glusterfind/src/nodeagent.py b/tools/glusterfind/src/nodeagent.py new file mode 100644 index 00000000000..2e8c2fc9759 --- /dev/null +++ b/tools/glusterfind/src/nodeagent.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import shutil +import sys +import os +import logging +from argparse import ArgumentParser, RawDescriptionHelpFormatter +import urllib + +from utils import setup_logger, mkdirp, handle_rm_error +import conf + +logger = logging.getLogger() + + +def mode_cleanup(args): + working_dir = os.path.join(conf.get_opt("working_dir"), + args.session, + args.volume) + + mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), + exit_on_err=True) + log_file = os.path.join(conf.get_opt("log_dir"), + args.session, + args.volume, + "changelog.log") + + setup_logger(logger, log_file) + + try: + shutil.rmtree(working_dir, onerror=handle_rm_error) + except (OSError, IOError) as e: + logger.error("Failed to delete working directory: %s" % e) + sys.exit(1) + + +def mode_create(args): + session_dir = os.path.join(conf.get_opt("session_dir"), + args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) + + if not os.path.exists(status_file) or args.reset_session_time: + with open(status_file, "w", buffering=0) as f: + f.write(args.time_to_update) + + sys.exit(0) + + +def mode_post(args): + session_dir = os.path.join(conf.get_opt("session_dir"), args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) + status_file_pre = status_file + ".pre" + + if os.path.exists(status_file_pre): + os.rename(status_file_pre, status_file) + sys.exit(0) + + +def mode_delete(args): + session_dir = os.path.join(conf.get_opt("session_dir"), + args.session) + shutil.rmtree(os.path.join(session_dir, args.volume), + onerror=handle_rm_error) + + +def _get_args(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description="Node Agent") + subparsers = parser.add_subparsers(dest="mode") + + parser_cleanup = subparsers.add_parser('cleanup') + parser_cleanup.add_argument("session", help="Session Name") + parser_cleanup.add_argument("volume", help="Volume Name") + parser_cleanup.add_argument("--debug", help="Debug", action="store_true") + + parser_session_create = subparsers.add_parser('create') + parser_session_create.add_argument("session", help="Session Name") + parser_session_create.add_argument("volume", help="Volume Name") + parser_session_create.add_argument("brick", help="Brick Path") + parser_session_create.add_argument("time_to_update", help="Time to Update") + parser_session_create.add_argument("--reset-session-time", + help="Reset Session Time", + action="store_true") + parser_session_create.add_argument("--debug", help="Debug", + action="store_true") + + parser_post = subparsers.add_parser('post') + parser_post.add_argument("session", help="Session Name") + parser_post.add_argument("volume", help="Volume Name") + parser_post.add_argument("brick", help="Brick Path") + parser_post.add_argument("--debug", help="Debug", + action="store_true") + + parser_delete = subparsers.add_parser('delete') + parser_delete.add_argument("session", help="Session Name") + parser_delete.add_argument("volume", help="Volume Name") + parser_delete.add_argument("--debug", help="Debug", + action="store_true") + return parser.parse_args() + + +if __name__ == "__main__": + args = _get_args() + + # globals() will have all the functions already defined. + # mode_<args.mode> will be the function name to be called + globals()["mode_" + args.mode](args) diff --git a/tools/glusterfind/src/nodecleanup.py b/tools/glusterfind/src/nodecleanup.py deleted file mode 100644 index a31d4d83acd..00000000000 --- a/tools/glusterfind/src/nodecleanup.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> -# This file is part of GlusterFS. -# -# This file is licensed to you under your choice of the GNU Lesser -# General Public License, version 3 or any later version (LGPLv3 or -# later), or the GNU General Public License, version 2 (GPLv2), in all -# cases as published by the Free Software Foundation. - -import shutil -import sys -import os -import logging -from errno import ENOENT - -from utils import setup_logger, mkdirp -import conf - -logger = logging.getLogger() - - -if __name__ == "__main__": - # Args: <SESSION> <VOLUME> - session = sys.argv[1] - volume = sys.argv[2] - - working_dir = os.path.join(conf.get_opt("working_dir"), - session, - volume) - - mkdirp(os.path.join(conf.get_opt("log_dir"), session, volume), - exit_on_err=True) - log_file = os.path.join(conf.get_opt("log_dir"), - session, - volume, - "changelog.log") - - setup_logger(logger, log_file) - - try: - def handle_rm_error(func, path, exc_info): - if exc_info[1].errno == ENOENT: - return - - raise exc_info[1] - - shutil.rmtree(working_dir, onerror=handle_rm_error) - except (OSError, IOError) as e: - logger.error("Failed to delete working directory: %s" % e) - sys.exit(1) diff --git a/tools/glusterfind/src/tool.conf.in b/tools/glusterfind/src/tool.conf.in index 54230cb4dca..a80f4a784c0 100644 --- a/tools/glusterfind/src/tool.conf.in +++ b/tools/glusterfind/src/tool.conf.in @@ -2,8 +2,7 @@ session_dir=@GLUSTERD_WORKDIR@/glusterfind/ working_dir=@GLUSTERFSD_MISCDIR@/glusterfind/ log_dir=/var/log/glusterfs/glusterfind/ -nodecleanup=@GLUSTERFS_LIBEXECDIR@/glusterfind/nodecleanup.py -changelog_rollover_time=15 +nodeagent=@GLUSTERFS_LIBEXECDIR@/glusterfind/nodeagent.py brick_ignore_dirs=.glusterfs,.trashcan [change_detectors] diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py index de9c027e299..aea9a9dc82d 100644 --- a/tools/glusterfind/src/utils.py +++ b/tools/glusterfind/src/utils.py @@ -9,14 +9,36 @@ # cases as published by the Free Software Foundation. import sys -import socket from subprocess import PIPE, Popen -from errno import EPERM, EEXIST +from errno import EEXIST, ENOENT +import xml.etree.cElementTree as etree import logging import os from datetime import datetime +import urllib ROOT_GFID = "00000000-0000-0000-0000-000000000001" +DEFAULT_CHANGELOG_INTERVAL = 15 + +ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError +cache_data = {} + + +def cache_output(func): + def wrapper(*args, **kwargs): + global cache_data + if cache_data.get(func.func_name, None) is None: + cache_data[func.func_name] = func(*args, **kwargs) + + return cache_data[func.func_name] + return wrapper + + +def handle_rm_error(func, path, exc_info): + if exc_info[1].errno == ENOENT: + return + + raise exc_info[1] def find(path, callback_func=lambda x: True, filter_func=lambda x: True, @@ -41,12 +63,16 @@ def find(path, callback_func=lambda x: True, filter_func=lambda x: True, callback_func(full_path) -def output_write(f, path, prefix="."): +def output_write(f, path, prefix=".", encode=False): if path == "": return if prefix != ".": path = os.path.join(prefix, path) + + if encode: + path = urllib.quote_plus(path) + f.write("%s\n" % path) @@ -153,51 +179,41 @@ def symlink_gfid_to_path(brick, gfid): return out_path -def is_host_local(host): - """ - Find if a host is local or not. - Code copied from $GLUSTERFS/geo-replication/syncdaemon/syncdutils.py - """ - locaddr = False - for ai in socket.getaddrinfo(host, None): - # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators - # /mgmt/glusterd/src/glusterd-utils.c#L125 - if ai[0] == socket.AF_INET: - if ai[-1][0].split(".")[0] == "127": - locaddr = True - break - elif ai[0] == socket.AF_INET6: - if ai[-1][0] == "::1": - locaddr = True - break - else: - continue - try: - # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue, - # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587 - s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP) - except socket.error: - ex = sys.exc_info()[1] - if ex.errno != EPERM: - raise - f = None - try: - f = open("/proc/sys/net/ipv4/ip_nonlocal_bind") - if int(f.read()) != 0: - logger.warning("non-local bind is set and not " - "allowed to create " - "raw sockets, cannot determine " - "if %s is local" % host) - return False - s = socket.socket(ai[0], socket.SOCK_DGRAM) - finally: - if f: - f.close() - try: - s.bind(ai[-1]) - locaddr = True - break - except: - pass - s.close() - return locaddr +@cache_output +def get_my_uuid(): + cmd = ["gluster", "system::", "uuid", "get", "--xml"] + rc, out, err = execute(cmd) + + if rc != 0: + return None + + tree = etree.fromstring(out) + uuid_el = tree.find("uuidGenerate/uuid") + return uuid_el.text + + +def is_host_local(host_uuid): + # Get UUID only if it is not done previously + # else Cache the UUID value + my_uuid = get_my_uuid() + if my_uuid == host_uuid: + return True + + return False + + +def get_changelog_rollover_time(volumename): + cmd = ["gluster", "volume", "get", volumename, + "changelog.rollover-time", "--xml"] + rc, out, err = execute(cmd) + + if rc != 0: + return DEFAULT_CHANGELOG_INTERVAL + + try: + tree = etree.fromstring(out) + return int(tree.find('volGetopts/Value').text) + except ParseError: + return DEFAULT_CHANGELOG_INTERVAL + + |