author     Aravinda VK <avishwan@redhat.com>    2015-04-20 14:03:58 +0530
committer  Vijay Bellur <vbellur@redhat.com>    2015-05-08 21:19:06 -0700
commit     e561935d0153f00f2ddacde093c12284affe9538
tree       27c031039b27d5bf31c3e40d6366cd4b2f5d1b07 /tools/glusterfind/src/main.py
parent     bbff9e1ef72e2eab63e5d7ecd5dfa36497b642ed
tools/glusterfind: Partial Find
Partial find is enabled by default: if one node fails, glusterfind still returns the list of files from the other nodes. This behavior can be changed with --disable-partial.

The session is now maintained on each node as well as on the initiator node. Every pre command picks up the status file from the respective node and collects the list of changes that happened after the status time.

--reset-session-time is a new option to force a reset of the session time; the next incremental run will start from this time.

The --change-detector argument is removed, since changelog mode is required to detect deletes and renames.

Change-Id: I0b83bc7c0e1b30b13de772b2d21fe968db4ff964
Signed-off-by: Aravinda VK <avishwan@redhat.com>
BUG: 1201289
Reviewed-on: http://review.gluster.org/10320
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Kotresh HR <khiremat@redhat.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
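The failure policy described above reduces to a small pattern. The following is an illustrative Python sketch of the behaviour that run_cmd_nodes() implements further down in this patch; run_on_nodes, worker and the node list are invented names, and only the exit-code handling mirrors the patch.

# Sketch of the per-node failure policy (illustrative; names are hypothetical).
import sys
from multiprocessing import Process

def run_on_nodes(task, nodes, worker, disable_partial=False):
    pool = [Process(target=worker, args=(node,)) for node in nodes]
    for p in pool:
        p.start()
    for node, p in zip(nodes, pool):
        p.join()
        if p.exitcode != 0:
            # "create" and "delete" must succeed on every node; "pre"
            # tolerates node failures unless --disable-partial was given.
            if task in ("create", "delete"):
                sys.exit("%s failed in %s" % (task, node))
            elif task == "pre" and disable_partial:
                sys.exit(1)
            # otherwise: partial find, continue with the remaining nodes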
Diffstat (limited to 'tools/glusterfind/src/main.py')
-rw-r--r--  tools/glusterfind/src/main.py | 278
1 file changed, 126 insertions(+), 152 deletions(-)
diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py
index ceaf1173897..089a3aec3c5 100644
--- a/tools/glusterfind/src/main.py
+++ b/tools/glusterfind/src/main.py
@@ -19,7 +19,8 @@ import logging
import shutil
from utils import execute, is_host_local, mkdirp, fail
-from utils import setup_logger, human_time
+from utils import setup_logger, human_time, handle_rm_error
+from utils import get_changelog_rollover_time, cache_output
import conf
@@ -29,6 +30,7 @@ GlusterFS Incremental API
ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
logger = logging.getLogger()
+node_outfiles = []
class StoreAbsPath(Action):
@@ -46,33 +48,13 @@ def get_pem_key_path(session, volume):
"%s_%s_secret.pem" % (session, volume))
-def node_run(volume, host, path, start, outfile, args, fallback=False):
+def node_cmd(host, host_uuid, task, cmd, args, opts):
"""
- If host is local node, execute the command locally. If not local
- execute the CHANGE_DETECTOR command via ssh and copy the output file from
- remote node using scp.
+ Runs command via ssh if host is not local
"""
- localdir = is_host_local(host)
- pem_key_path = get_pem_key_path(args.session, args.volume)
+ localdir = is_host_local(host_uuid)
- # If Full backup is requested or start time is zero, use brickfind
- change_detector = conf.get_change_detector(args.change_detector)
- if ((start == 0 or args.full) and args.change_detector == "changelog") or \
- fallback:
- change_detector = conf.get_change_detector("brickfind")
-
- # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug
- # --gfidpath <TYPE>
- cmd = [change_detector,
- args.session,
- volume,
- path,
- outfile,
- str(start),
- "--output-prefix",
- args.output_prefix] + \
- (["--debug"] if args.debug else []) + \
- (["--full"] if args.full else [])
+ pem_key_path = get_pem_key_path(args.session, args.volume)
if not localdir:
# prefix with ssh command if not local node
@@ -80,128 +62,112 @@ def node_run(volume, host, path, start, outfile, args, fallback=False):
"-i", pem_key_path,
"root@%s" % host] + cmd
- rc, out, err = execute(cmd, logger=logger)
- if rc == 2:
- # Partial History Fallback
- logger.info("%s %s Fallback to brickfind" % (host, err.strip()))
- # Exit only from process, handled in main.
- sys.exit(rc)
- elif rc != 0:
- fail("%s - Change detection failed" % host, logger=logger)
+ execute(cmd, exit_msg="%s - %s failed" % (host, task), logger=logger)
- if not localdir:
+ if opts.get("copy_outfile", False):
cmd_copy = ["scp",
"-i", pem_key_path,
- "root@%s:/%s" % (host, outfile),
- os.path.dirname(outfile)]
+ "root@%s:/%s" % (host, opts.get("node_outfile")),
+ os.path.dirname(opts.get("node_outfile"))]
execute(cmd_copy, exit_msg="%s - Copy command failed" % host,
logger=logger)
-def node_cleanup(host, args):
- localdir = is_host_local(host)
-
- pem_key_path = get_pem_key_path(args.session, args.volume)
-
- # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug
- # --gfidpath <TYPE>
- cmd = [conf.get_opt("nodecleanup"),
- args.session,
- args.volume] + (["--debug"] if args.debug else [])
-
- if not localdir:
- # prefix with ssh command if not local node
- cmd = ["ssh",
- "-i", pem_key_path,
- "root@%s" % host] + cmd
-
- execute(cmd, exit_msg="%s - Cleanup failed" % host, logger=logger)
-
-
-def cleanup(nodes, args):
+def run_cmd_nodes(task, args, **kwargs):
+ global node_outfiles
+ nodes = get_nodes(args.volume)
pool = []
for num, node in enumerate(nodes):
host, brick = node[1].split(":")
- # temp output file
+ host_uuid = node[0]
+ cmd = []
+ opts = {}
node_outfile = os.path.join(conf.get_opt("working_dir"),
- args.session,
- args.volume,
- "tmp_output_%s.txt" % num)
-
- try:
- os.remove(node_outfile)
- except (OSError, IOError):
- # TODO: Cleanup Failure, Handle
- pass
-
- p = Process(target=node_cleanup,
- args=(host, args))
- p.start()
- pool.append(p)
-
- exit_codes = 0
- for p in pool:
- p.join()
- exit_codes += (0 if p.exitcode == 0 else 1)
-
- if exit_codes != 0:
- sys.exit(1)
-
-
-def failback_node_run(brick_details, idx, volume, start, outfile, args):
- host, brick = brick_details.split(":")
- p = Process(target=node_run,
- args=(volume, host, brick, start, outfile, args, True))
- p.start()
- p.join()
- return p.exitcode
-
-
-def run_in_nodes(volume, start, args):
- """
- Get nodes of volume using gluster volume info, spawn a process
- each for a Node. Merge the output files once all the process
- complete their tasks.
- """
- nodes = get_nodes(volume)
- pool = []
- node_outfiles = []
- for num, node in enumerate(nodes):
- host, brick = node[1].split(":")
- # temp output file
- node_outfile = os.path.join(conf.get_opt("working_dir"),
- args.session,
- volume,
- "tmp_output_%s.txt" % num)
- node_outfiles.append(node_outfile)
- p = Process(target=node_run, args=(volume, host, brick, start,
- node_outfile, args))
- p.start()
- pool.append(p)
-
- exit_codes = 0
- for idx, p in enumerate(pool):
+ args.session, args.volume,
+ "tmp_output_%s" % num)
+
+ if task == "pre":
+ # If Full backup is requested or start time is zero, use brickfind
+ change_detector = conf.get_change_detector("changelog")
+ if args.full:
+ change_detector = conf.get_change_detector("brickfind")
+
+ node_outfiles.append(node_outfile)
+
+ cmd = [change_detector,
+ args.session,
+ args.volume,
+ brick,
+ node_outfile,
+ str(kwargs.get("start")),
+ "--output-prefix",
+ args.output_prefix] + \
+ (["--debug"] if args.debug else []) + \
+ (["--only-namespace-changes"] if args.only_namespace_changes
+ else [])
+
+ opts["node_outfile"] = node_outfile
+ opts["copy_outfile"] = True
+ elif task == "cleanup":
+ # After pre run, cleanup the working directory and other temp files
+ # Remove the copied node_outfile in main node
+ try:
+ os.remove(node_outfile)
+ except (OSError, IOError):
+ logger.warn("Failed to cleanup temporary file %s" %
+ node_outfile)
+ pass
+
+ cmd = [conf.get_opt("nodeagent"),
+ "cleanup",
+ args.session,
+ args.volume] + (["--debug"] if args.debug else [])
+ elif task == "create":
+ # When glusterfind create, create session directory in
+ # each brick nodes
+ cmd = [conf.get_opt("nodeagent"),
+ "create",
+ args.session,
+ args.volume,
+ brick,
+ kwargs.get("time_to_update")] + \
+ (["--debug"] if args.debug else []) + \
+ (["--reset-session-time"] if args.reset_session_time
+ else [])
+ elif task == "post":
+ # Rename pre status file to actual status file in each node
+ cmd = [conf.get_opt("nodeagent"),
+ "post",
+ args.session,
+ args.volume,
+ brick] + \
+ (["--debug"] if args.debug else [])
+ elif task == "delete":
+ # When glusterfind delete, cleanup all the session files/dirs
+ # from each node.
+ cmd = [conf.get_opt("nodeagent"),
+ "delete",
+ args.session,
+ args.volume] + \
+ (["--debug"] if args.debug else [])
+
+ if cmd:
+ p = Process(target=node_cmd,
+ args=(host, host_uuid, task, cmd, args, opts))
+ p.start()
+ pool.append(p)
+
+ for num, p in enumerate(pool):
p.join()
- # Handle the Changelog failure, fallback to Brickfind
- if p.exitcode == 2:
- rc = failback_node_run(nodes[idx][1], idx, volume, start,
- node_outfiles[idx], args)
- exit_codes += (0 if rc == 0 else 1)
- elif p.exitcode != 0:
- exit_codes += (0 if p.exitcode == 0 else 1)
-
- if exit_codes != 0:
- sys.exit(1)
-
- # Merge all output files
- cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
- execute(cmd,
- exit_msg="Failed to merge output files "
- "collected from nodes", logger=logger)
-
- cleanup(nodes, args)
+ if p.exitcode != 0:
+ logger.warn("Command %s failed in %s" % (task, nodes[num][1]))
+ if task in ["create", "delete"]:
+ fail("Command %s failed in %s" % (task, nodes[num][1]))
+ elif task == "pre" and args.disable_partial:
+ sys.exit(1)
+@cache_output
def get_nodes(volume):
"""
Get the gluster volume info xml output and parse to get
@@ -237,6 +203,9 @@ def _get_args():
parser_create.add_argument("--debug", help="Debug", action="store_true")
parser_create.add_argument("--force", help="Force option to recreate "
"the session", action="store_true")
+ parser_create.add_argument("--reset-session-time",
+ help="Reset Session Time to Current Time",
+ action="store_true")
# delete <SESSION> <VOLUME> [--debug]
parser_delete = subparsers.add_parser('delete')
@@ -250,7 +219,7 @@ def _get_args():
parser_list.add_argument("--volume", help="Volume Name", default="")
parser_list.add_argument("--debug", help="Debug", action="store_true")
- # pre <SESSION> <VOLUME> <OUTFILE> [--change-detector <CHANGE_DETECTOR>]
+ # pre <SESSION> <VOLUME> <OUTFILE>
# [--output-prefix <OUTPUT_PREFIX>] [--full]
parser_pre = subparsers.add_parser('pre')
parser_pre.add_argument("session", help="Session Name")
@@ -258,10 +227,8 @@ def _get_args():
parser_pre.add_argument("outfile", help="Output File", action=StoreAbsPath)
parser_pre.add_argument("--debug", help="Debug", action="store_true")
parser_pre.add_argument("--full", help="Full find", action="store_true")
- parser_pre.add_argument("--change-detector", dest="change_detector",
- help="Change detection",
- choices=conf.list_change_detectors(),
- type=str, default='changelog')
+ parser_pre.add_argument("--disable-partial", help="Disable Partial find, "
+ "Fail when one node fails", action="store_true")
parser_pre.add_argument("--output-prefix", help="File prefix in output",
default=".")
parser_pre.add_argument("--regenerate-outfile",
@@ -363,12 +330,15 @@ def mode_create(session_dir, args):
logger.info("Volume option set %s, changelog.changelog on"
% args.volume)
- if not os.path.exists(status_file):
+ # Add Rollover time to current time to make sure changelogs
+ # will be available if we use this time as start time
+ time_to_update = int(time.time()) + get_changelog_rollover_time(
+ args.volume)
+
+ run_cmd_nodes("create", args, time_to_update=str(time_to_update))
+
+ if not os.path.exists(status_file) or args.reset_session_time:
with open(status_file, "w", buffering=0) as f:
- # Add Rollover time to current time to make sure changelogs
- # will be available if we use this time as start time
- time_to_update = int(time.time()) + int(
- conf.get_opt("changelog_rollover_time"))
f.write(str(time_to_update))
sys.exit(0)
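The start-time arithmetic in mode_create can be read in isolation as follows. This is a minimal sketch; the 15-second rollover interval is only an assumed illustration, while the patch queries the real value per volume via get_changelog_rollover_time().

# Sketch of the session start-time calculation (rollover value assumed).
import time

changelog_rollover_time = 15   # seconds, assumed here for illustration only
time_to_update = int(time.time()) + changelog_rollover_time
# Recording "now + rollover" guarantees that a changelog covering the
# recorded start time will exist when the first incremental run reads it.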
@@ -378,8 +348,8 @@ def mode_pre(session_dir, args):
"""
Read from Session file and write to session.pre file
"""
- endtime_to_update = int(time.time()) - int(
- conf.get_opt("changelog_rollover_time"))
+ endtime_to_update = int(time.time()) - get_changelog_rollover_time(
+ args.volume)
status_file = os.path.join(session_dir, args.volume, "status")
status_file_pre = status_file + ".pre"
@@ -404,7 +374,15 @@ def mode_pre(session_dir, args):
"Start time: %s, End time: %s"
% (args.session, args.volume, start, endtime_to_update))
- run_in_nodes(args.volume, start, args)
+ run_cmd_nodes("pre", args, start=start)
+
+ # Merger
+ cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
+ execute(cmd,
+ exit_msg="Failed to merge output files "
+ "collected from nodes", logger=logger)
+
+ run_cmd_nodes("cleanup", args)
with open(status_file_pre, "w", buffering=0) as f:
f.write(str(endtime_to_update))
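For readability, the merge step above ("sort -u" over the per-node output files) is equivalent to this pure-Python sketch; the patch itself shells out to sort(1) through execute().

# Pure-Python equivalent of the "sort -u <files> -o <outfile>" merge (sketch).
def merge_node_outfiles(node_outfiles, outfile):
    merged = set()
    for path in node_outfiles:
        with open(path) as f:
            merged.update(f)
    with open(outfile, "w") as f:
        f.writelines(sorted(merged))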
@@ -423,6 +401,7 @@ def mode_post(session_dir, args):
status_file_pre = status_file + ".pre"
if os.path.exists(status_file_pre):
+ run_cmd_nodes("post", args)
os.rename(status_file_pre, status_file)
sys.exit(0)
else:
@@ -430,12 +409,7 @@ def mode_post(session_dir, args):
def mode_delete(session_dir, args):
- def handle_rm_error(func, path, exc_info):
- if exc_info[1].errno == ENOENT:
- return
-
- raise exc_info[1]
-
+ run_cmd_nodes("delete", args)
shutil.rmtree(os.path.join(session_dir, args.volume),
onerror=handle_rm_error)
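A hypothetical end-to-end run with the reworked interface could look like the sketch below. The session name, volume name and output path are invented, and the post invocation assumes the pre-existing "post <session> <volume>" subcommand that is not shown in this hunk.

# Hypothetical glusterfind usage (names and paths are made up).
import subprocess

# Create a session; --reset-session-time forces the session time to be reset.
subprocess.check_call(["glusterfind", "create", "backup1", "gv0",
                       "--reset-session-time"])

# Incremental find; without --disable-partial a single failed node no
# longer aborts the run, its files are simply missing from the output.
subprocess.check_call(["glusterfind", "pre", "backup1", "gv0",
                       "/tmp/gv0-changes.txt"])

# Acknowledge the consumed changes so the next run starts from this point.
subprocess.check_call(["glusterfind", "post", "backup1", "gv0"])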