summaryrefslogtreecommitdiffstats
path: root/tools/glusterfind/src/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/glusterfind/src/main.py')
-rw-r--r--tools/glusterfind/src/main.py287
1 files changed, 224 insertions, 63 deletions
diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py
index 37d6c38cc49..4b5466d0114 100644
--- a/tools/glusterfind/src/main.py
+++ b/tools/glusterfind/src/main.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
@@ -16,15 +16,20 @@ from multiprocessing import Process
import os
import xml.etree.cElementTree as etree
from argparse import ArgumentParser, RawDescriptionHelpFormatter, Action
+from gfind_py2py3 import gfind_write_row, gfind_write
import logging
import shutil
+import tempfile
+import signal
+from datetime import datetime
+import codecs
+import re
from utils import execute, is_host_local, mkdirp, fail
from utils import setup_logger, human_time, handle_rm_error
from utils import get_changelog_rollover_time, cache_output, create_file
import conf
from changelogdata import OutputMerger
-import codecs
PROG_DESCRIPTION = """
GlusterFS Incremental API
@@ -32,8 +37,9 @@ GlusterFS Incremental API
ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
logger = logging.getLogger()
-node_outfiles = []
vol_statusStr = ""
+gtmpfilename = None
+g_pid_nodefile_map = {}
class StoreAbsPath(Action):
@@ -71,10 +77,27 @@ def node_cmd(host, host_uuid, task, cmd, args, opts):
cmd = ["ssh",
"-oNumberOfPasswordPrompts=0",
"-oStrictHostKeyChecking=no",
+ # We force TTY allocation (-t -t) so that Ctrl+C is handed
+ # through; see:
+ # https://bugzilla.redhat.com/show_bug.cgi?id=1382236
+ # Note that this turns stderr of the remote `cmd`
+ # into stdout locally.
+ "-t",
+ "-t",
"-i", pem_key_path,
"root@%s" % host] + cmd
- execute(cmd, exit_msg="%s - %s failed" % (host, task), logger=logger)
+ (returncode, err, out) = execute(cmd, logger=logger)
+ if returncode != 0:
+ # Because the `-t -t` above turns the remote stderr into
+ # local stdout, we need to log both stderr and stdout
+ # here to print all error messages.
+ fail("%s - %s failed; stdout (including remote stderr):\n"
+ "%s\n"
+ "stderr:\n"
+ "%s" % (host, task, out, err),
+ returncode,
+ logger=logger)
if opts.get("copy_outfile", False) and not localdir:
cmd_copy = ["scp",
@@ -90,7 +113,7 @@ def node_cmd(host, host_uuid, task, cmd, args, opts):
def run_cmd_nodes(task, args, **kwargs):
- global node_outfiles
+ global g_pid_nodefile_map
nodes = get_nodes(args.volume)
pool = []
for num, node in enumerate(nodes):
@@ -98,8 +121,13 @@ def run_cmd_nodes(task, args, **kwargs):
host_uuid = node[0]
cmd = []
opts = {}
+
+ # tmpfilename is valid only for tasks: pre, query and cleanup
+ tmpfilename = kwargs.get("tmpfilename", "BADNAME")
+
node_outfile = os.path.join(conf.get_opt("working_dir"),
args.session, args.volume,
+ tmpfilename,
"tmp_output_%s" % num)
if task == "pre":
@@ -116,20 +144,30 @@ def run_cmd_nodes(task, args, **kwargs):
if tag == "":
tag = '""' if not is_host_local(host_uuid) else ""
- node_outfiles.append(node_outfile)
+ # remote file will be copied into this directory
+ mkdirp(os.path.dirname(node_outfile),
+ exit_on_err=True, logger=logger)
+
+ FS = args.field_separator
+ if not is_host_local(host_uuid):
+ FS = "'" + FS + "'"
cmd = [change_detector,
args.session,
args.volume,
+ host,
brick,
- node_outfile,
- str(kwargs.get("start"))] + \
+ node_outfile] + \
+ ([str(kwargs.get("start")), str(kwargs.get("end"))]
+ if not args.full else []) + \
([tag] if tag is not None else []) + \
["--output-prefix", args.output_prefix] + \
(["--debug"] if args.debug else []) + \
(["--no-encode"] if args.no_encode else []) + \
(["--only-namespace-changes"] if args.only_namespace_changes
- else [])
+ else []) + \
+ (["--type", args.type]) + \
+ (["--field-separator", FS] if args.full else [])
opts["node_outfile"] = node_outfile
opts["copy_outfile"] = True
@@ -143,27 +181,38 @@ def run_cmd_nodes(task, args, **kwargs):
if tag == "":
tag = '""' if not is_host_local(host_uuid) else ""
- node_outfiles.append(node_outfile)
+ # remote file will be copied into this directory
+ mkdirp(os.path.dirname(node_outfile),
+ exit_on_err=True, logger=logger)
+
+ FS = args.field_separator
+ if not is_host_local(host_uuid):
+ FS = "'" + FS + "'"
cmd = [change_detector,
args.session,
args.volume,
+ host,
brick,
- node_outfile,
- str(kwargs.get("start"))] + \
+ node_outfile] + \
+ ([str(kwargs.get("start")), str(kwargs.get("end"))]
+ if not args.full else []) + \
([tag] if tag is not None else []) + \
["--only-query"] + \
["--output-prefix", args.output_prefix] + \
(["--debug"] if args.debug else []) + \
(["--no-encode"] if args.no_encode else []) + \
(["--only-namespace-changes"]
- if args.only_namespace_changes else [])
+ if args.only_namespace_changes else []) + \
+ (["--type", args.type]) + \
+ (["--field-separator", FS] if args.full else [])
opts["node_outfile"] = node_outfile
opts["copy_outfile"] = True
elif task == "cleanup":
- # After pre run, cleanup the working directory and other temp files
- # Remove the copied node_outfile in main node
+ # After pre/query run, cleanup the working directory and other
+ # temp files. Remove the directory to which node_outfile has
+ # been copied in main node
try:
os.remove(node_outfile)
except (OSError, IOError):
@@ -174,7 +223,9 @@ def run_cmd_nodes(task, args, **kwargs):
cmd = [conf.get_opt("nodeagent"),
"cleanup",
args.session,
- args.volume] + (["--debug"] if args.debug else [])
+ args.volume,
+ os.path.dirname(node_outfile)] + \
+ (["--debug"] if args.debug else [])
elif task == "create":
if vol_statusStr != "Started":
fail("Volume %s is not online" % args.volume,
@@ -213,6 +264,7 @@ def run_cmd_nodes(task, args, **kwargs):
args=(host, host_uuid, task, cmd, args, opts))
p.start()
pool.append(p)
+ g_pid_nodefile_map[p.pid] = node_outfile
for num, p in enumerate(pool):
p.join()
@@ -220,8 +272,11 @@ def run_cmd_nodes(task, args, **kwargs):
logger.warn("Command %s failed in %s" % (task, nodes[num][1]))
if task in ["create", "delete"]:
fail("Command %s failed in %s" % (task, nodes[num][1]))
- elif task == "pre" and args.disable_partial:
- sys.exit(1)
+ elif task == "pre" or task == "query":
+ if args.disable_partial:
+ sys.exit(1)
+ else:
+ del g_pid_nodefile_map[p.pid]
@cache_output
@@ -245,13 +300,22 @@ def get_nodes(volume):
# this status is used in caller: run_cmd_nodes
vol_statusStr = tree.find('volInfo/volumes/volume/statusStr').text
+ vol_typeStr = tree.find('volInfo/volumes/volume/typeStr').text
nodes = []
volume_el = tree.find('volInfo/volumes/volume')
try:
- for b in volume_el.findall('bricks/brick'):
- nodes.append((b.find('hostUuid').text,
- b.find('name').text))
+ brick_elems = []
+ if vol_typeStr == "Tier":
+ brick_elems.append('bricks/hotBricks/brick')
+ brick_elems.append('bricks/coldBricks/brick')
+ else:
+ brick_elems.append('bricks/brick')
+
+ for elem in brick_elems:
+ for b in volume_el.findall(elem):
+ nodes.append((b.find('hostUuid').text,
+ b.find('name').text))
except (ParseError, AttributeError, ValueError) as e:
fail("Failed to parse Volume Info: %s" % e, logger=logger)
@@ -262,6 +326,7 @@ def _get_args():
parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
description=PROG_DESCRIPTION)
subparsers = parser.add_subparsers(dest="mode")
+ subparsers.required = True
# create <SESSION> <VOLUME> [--debug] [--force]
parser_create = subparsers.add_parser('create')
@@ -312,6 +377,11 @@ def _get_args():
help="Tag prefix for file names emitted during"
" a full find operation; default: \"NEW\"",
default="NEW")
+ parser_pre.add_argument('--type', help="type: f, f-files only"
+ " d, d-directories only, by default = both",
+ default='both', choices=["f", "d", "both"])
+ parser_pre.add_argument("--field-separator", help="Field separator string",
+ default=" ")
# query <VOLUME> <OUTFILE> --since-time <SINCE_TIME>
# [--output-prefix <OUTPUT_PREFIX>] [--full]
@@ -321,6 +391,8 @@ def _get_args():
action=StoreAbsPath)
parser_query.add_argument("--since-time", help="UNIX epoch time since "
"which listing is required", type=int)
+ parser_query.add_argument("--end-time", help="UNIX epoch time up to "
+ "which listing is required", type=int)
parser_query.add_argument("--no-encode",
help="Do not encode path in output file",
action="store_true")
@@ -337,6 +409,12 @@ def _get_args():
help="Tag prefix for file names emitted during"
" a full find operation; default: \"NEW\"",
default="NEW")
+ parser_query.add_argument('--type', help="type: f, f-files only"
+ " d, d-directories only, by default = both",
+ default='both', choices=["f", "d", "both"])
+ parser_query.add_argument("--field-separator",
+ help="Field separator string",
+ default=" ")
# post <SESSION> <VOLUME>
parser_post = subparsers.add_parser('post')
@@ -422,8 +500,8 @@ def enable_volume_options(args):
% args.volume)
-def write_output(args, outfilemerger):
- with codecs.open(args.outfile, "a", encoding="utf-8") as f:
+def write_output(outfile, outfilemerger, field_separator):
+ with codecs.open(outfile, "a", encoding="utf-8") as f:
for row in outfilemerger.get():
# Multiple paths in case of Hardlinks
paths = row[1].split(",")
@@ -431,23 +509,20 @@ def write_output(args, outfilemerger):
for p in paths:
if p == "":
continue
- p_rep = p.replace("%2F%2F", "%2F").replace("//", "/")
+ p_rep = p.replace("//", "/")
if not row_2_rep:
- row_2_rep = row[2].replace("%2F%2F", "%2F").replace("//",
- "/")
+ row_2_rep = row[2].replace("//", "/")
if p_rep == row_2_rep:
continue
- f.write(u"{0} {1} {2}\n".format(row[0],
- p_rep,
- row_2_rep))
-
+ if row_2_rep and row_2_rep != "":
+ gfind_write_row(f, row[0], field_separator, p_rep, row_2_rep)
-def mode_create(session_dir, args):
- logger.debug("Init is called - Session: %s, Volume: %s"
- % (args.session, args.volume))
+ else:
+ gfind_write(f, row[0], field_separator, p_rep)
- cmd = ["gluster", 'volume', 'info', args.volume, "--xml"]
+def validate_volume(volume):
+ cmd = ["gluster", 'volume', 'info', volume, "--xml"]
_, data, _ = execute(cmd,
exit_msg="Failed to Run Gluster Volume Info",
logger=logger)
@@ -455,11 +530,42 @@ def mode_create(session_dir, args):
tree = etree.fromstring(data)
statusStr = tree.find('volInfo/volumes/volume/statusStr').text
except (ParseError, AttributeError) as e:
- fail("Invalid Volume: %s" % e, logger=logger)
-
+ fail("Invalid Volume: Check the Volume name! %s" % e)
if statusStr != "Started":
- fail("Volume %s is not online" % args.volume, logger=logger)
+ fail("Volume %s is not online" % volume)
+
+# The rules for a valid session name.
+SESSION_NAME_RULES = {
+ 'min_length': 2,
+ 'max_length': 256, # same as maximum volume length
+ # Specifies all alphanumeric characters, underscore, hyphen.
+ 'valid_chars': r'0-9a-zA-Z_-',
+}
+
+
+# checks valid session name, fail otherwise
+def validate_session_name(session):
+ # Check for minimum length
+ if len(session) < SESSION_NAME_RULES['min_length']:
+ fail('session_name must be at least ' +
+ str(SESSION_NAME_RULES['min_length']) + ' characters long.')
+ # Check for maximum length
+ if len(session) > SESSION_NAME_RULES['max_length']:
+ fail('session_name must not exceed ' +
+ str(SESSION_NAME_RULES['max_length']) + ' characters length.')
+
+ # Matches strings composed entirely of characters specified within
+ if not re.match(r'^[' + SESSION_NAME_RULES['valid_chars'] +
+ ']+$', session):
+ fail('Session name can only contain these characters: ' +
+ SESSION_NAME_RULES['valid_chars'])
+
+def mode_create(session_dir, args):
+ validate_session_name(args.session)
+
+ logger.debug("Init is called - Session: %s, Volume: %s"
+ % (args.session, args.volume))
mkdirp(session_dir, exit_on_err=True, logger=logger)
mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
logger=logger)
@@ -480,7 +586,7 @@ def mode_create(session_dir, args):
run_cmd_nodes("create", args, time_to_update=str(time_to_update))
if not os.path.exists(status_file) or args.reset_session_time:
- with open(status_file, "w", buffering=0) as f:
+ with open(status_file, "w") as f:
f.write(str(time_to_update))
sys.stdout.write("Session %s created with volume %s\n" %
@@ -490,6 +596,9 @@ def mode_create(session_dir, args):
def mode_query(session_dir, args):
+ global gtmpfilename
+ global g_pid_nodefile_map
+
# Verify volume status
cmd = ["gluster", 'volume', 'info', args.volume, "--xml"]
_, data, _ = execute(cmd,
@@ -516,48 +625,72 @@ def mode_query(session_dir, args):
enable_volume_options(args)
# Test options
- if not args.since_time and not args.full:
- fail("Please specify either --since-time or --full", logger=logger)
-
- if args.since_time and args.full:
- fail("Please specify either --since-time or --full, but not both",
+ if not args.full and args.type in ["f", "d"]:
+ fail("--type can only be used with --full")
+ if not args.since_time and not args.end_time and not args.full:
+ fail("Please specify either {--since-time and optionally --end-time} "
+ "or --full", logger=logger)
+
+ if args.since_time and args.end_time and args.full:
+ fail("Please specify either {--since-time and optionally --end-time} "
+ "or --full, but not both",
logger=logger)
+ if args.end_time and not args.since_time:
+ fail("Please specify --since-time as well", logger=logger)
+
# Start query command processing
+ start = -1
+ end = -1
if args.since_time:
start = args.since_time
+ if args.end_time:
+ end = args.end_time
else:
start = 0 # --full option is handled separately
logger.debug("Query is called - Session: %s, Volume: %s, "
- "Start time: %s"
- % ("default", args.volume, start))
+ "Start time: %s, End time: %s"
+ % ("default", args.volume, start, end))
- run_cmd_nodes("query", args, start=start)
+ prefix = datetime.now().strftime("%Y%m%d-%H%M%S-%f-")
+ gtmpfilename = prefix + next(tempfile._get_candidate_names())
+
+ run_cmd_nodes("query", args, start=start, end=end,
+ tmpfilename=gtmpfilename)
# Merger
if args.full:
- cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
- execute(cmd,
- exit_msg="Failed to merge output files "
- "collected from nodes", logger=logger)
+ if len(g_pid_nodefile_map) > 0:
+ cmd = ["sort", "-u"] + list(g_pid_nodefile_map.values()) + \
+ ["-o", args.outfile]
+ execute(cmd,
+ exit_msg="Failed to merge output files "
+ "collected from nodes", logger=logger)
+ else:
+ fail("Failed to collect any output files from peers. "
+ "Looks like all bricks are offline.", logger=logger)
else:
# Read each Changelogs db and generate finaldb
create_file(args.outfile, exit_on_err=True, logger=logger)
- outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles)
- write_output(args, outfilemerger)
+ outfilemerger = OutputMerger(args.outfile + ".db",
+ list(g_pid_nodefile_map.values()))
+ write_output(args.outfile, outfilemerger, args.field_separator)
try:
os.remove(args.outfile + ".db")
except (IOError, OSError):
pass
- run_cmd_nodes("cleanup", args)
+ run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename)
sys.stdout.write("Generated output file %s\n" % args.outfile)
def mode_pre(session_dir, args):
+ global gtmpfilename
+ global g_pid_nodefile_map
+
"""
Read from Session file and write to session.pre file
"""
@@ -568,6 +701,9 @@ def mode_pre(session_dir, args):
mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger)
+ if not args.full and args.type in ["f", "d"]:
+ fail("--type can only be used with --full")
+
# If Pre status file exists and running pre command again
if os.path.exists(status_file_pre) and not args.regenerate_outfile:
fail("Post command is not run after last pre, "
@@ -587,29 +723,37 @@ def mode_pre(session_dir, args):
"Start time: %s, End time: %s"
% (args.session, args.volume, start, endtime_to_update))
- run_cmd_nodes("pre", args, start=start)
+ prefix = datetime.now().strftime("%Y%m%d-%H%M%S-%f-")
+ gtmpfilename = prefix + next(tempfile._get_candidate_names())
+
+ run_cmd_nodes("pre", args, start=start, end=-1, tmpfilename=gtmpfilename)
# Merger
if args.full:
- cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
- execute(cmd,
- exit_msg="Failed to merge output files "
- "collected from nodes", logger=logger)
+ if len(g_pid_nodefile_map) > 0:
+ cmd = ["sort", "-u"] + list(g_pid_nodefile_map.values()) + \
+ ["-o", args.outfile]
+ execute(cmd,
+ exit_msg="Failed to merge output files "
+ "collected from nodes", logger=logger)
+ else:
+ fail("Failed to collect any output files from peers. "
+ "Looks like all bricks are offline.", logger=logger)
else:
# Read each Changelogs db and generate finaldb
create_file(args.outfile, exit_on_err=True, logger=logger)
- outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles)
-
- write_output(args, outfilemerger)
+ outfilemerger = OutputMerger(args.outfile + ".db",
+ list(g_pid_nodefile_map.values()))
+ write_output(args.outfile, outfilemerger, args.field_separator)
try:
os.remove(args.outfile + ".db")
except (IOError, OSError):
pass
- run_cmd_nodes("cleanup", args)
+ run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename)
- with open(status_file_pre, "w", buffering=0) as f:
+ with open(status_file_pre, "w") as f:
f.write(str(endtime_to_update))
sys.stdout.write("Generated output file %s\n" % args.outfile)
@@ -713,6 +857,10 @@ def mode_list(session_dir, args):
def main():
+ global gtmpfilename
+
+ args = None
+
try:
args = _get_args()
mkdirp(conf.get_opt("session_dir"), exit_on_err=True)
@@ -731,6 +879,11 @@ def main():
args.mode not in ["create", "list", "query"]:
fail("Invalid session %s" % args.session)
+ # volume involved, validate the volume first
+ if args.mode not in ["list"]:
+ validate_volume(args.volume)
+
+
# "default" is a system defined session name
if args.mode in ["create", "post", "pre", "delete"] and \
args.session == "default":
@@ -756,5 +909,13 @@ def main():
# mode_<args.mode> will be the function name to be called
globals()["mode_" + args.mode](session_dir, args)
except KeyboardInterrupt:
+ if args is not None:
+ if args.mode == "pre" or args.mode == "query":
+ # cleanup session
+ if gtmpfilename is not None:
+ # no more interrupts until we clean up
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename)
+
# Interrupted, exit with non zero error code
sys.exit(2)