Diffstat (limited to 'tools/glusterfind/src')
-rw-r--r--  tools/glusterfind/src/Makefile.am       |   2
-rw-r--r--  tools/glusterfind/src/brickfind.py      |  25
-rw-r--r--  tools/glusterfind/src/changelog.py      |  72
-rw-r--r--  tools/glusterfind/src/conf.py           |   6
-rw-r--r--  tools/glusterfind/src/gfind_py2py3.py   |  88
-rw-r--r--  tools/glusterfind/src/libgfchangelog.py |  23
-rw-r--r--  tools/glusterfind/src/main.py           | 133
-rw-r--r--  tools/glusterfind/src/nodeagent.py      |   8
-rw-r--r--  tools/glusterfind/src/utils.py          |  13
9 files changed, 294 insertions, 76 deletions
diff --git a/tools/glusterfind/src/Makefile.am b/tools/glusterfind/src/Makefile.am
index c180f051933..43b6141b01c 100644
--- a/tools/glusterfind/src/Makefile.am
+++ b/tools/glusterfind/src/Makefile.am
@@ -2,7 +2,7 @@ glusterfinddir = $(GLUSTERFS_LIBEXECDIR)/glusterfind
 if WITH_SERVER
 glusterfind_PYTHON = conf.py utils.py __init__.py \
-	main.py libgfchangelog.py changelogdata.py
+	main.py libgfchangelog.py changelogdata.py gfind_py2py3.py
 glusterfind_SCRIPTS = changelog.py nodeagent.py \
 	brickfind.py
diff --git a/tools/glusterfind/src/brickfind.py b/tools/glusterfind/src/brickfind.py
index 6bfb997f5a0..73b6350188d 100644
--- a/tools/glusterfind/src/brickfind.py
+++ b/tools/glusterfind/src/brickfind.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
 # -*- coding: utf-8 -*-
 # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
@@ -41,12 +41,20 @@ def brickfind_crawl(brick, args):
     with open(args.outfile, "a+") as fout:
         brick_path_len = len(brick)
-        def output_callback(path, filter_result):
+        def output_callback(path, filter_result, is_dir):
             path = path.strip()
             path = path[brick_path_len+1:]
-            output_write(fout, path, args.output_prefix,
-                         encode=(not args.no_encode), tag=args.tag,
-                         field_separator=args.field_separator)
+
+            if args.type == "both":
+                output_write(fout, path, args.output_prefix,
+                             encode=(not args.no_encode), tag=args.tag,
+                             field_separator=args.field_separator)
+            else:
+                if (is_dir and args.type == "d") or (
+                        (not is_dir) and args.type == "f"):
+                    output_write(fout, path, args.output_prefix,
+                                 encode=(not args.no_encode), tag=args.tag,
+                                 field_separator=args.field_separator)
         ignore_dirs = [os.path.join(brick, dirname)
                        for dirname in
@@ -77,6 +85,9 @@ def _get_args():
                         action="store_true")
     parser.add_argument("--output-prefix", help="File prefix in output",
                         default=".")
+    parser.add_argument('--type', help="type: f, f-files only"
+                        " d, d-directories only, by default = both",
+                        default='both')
     parser.add_argument("--field-separator", help="Field separator",
                         default=" ")
@@ -87,7 +98,7 @@ if __name__ == "__main__":
     args = _get_args()
     session_dir = os.path.join(conf.get_opt("session_dir"), args.session)
     status_file = os.path.join(session_dir, args.volume,
-                               "%s.status" % urllib.parse.quote_plus(args.brick))
+                               "%s.status" % urllib.quote_plus(args.brick))
     status_file_pre = status_file + ".pre"
     mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
            logger=logger)
@@ -102,6 +113,6 @@ if __name__ == "__main__":
     time_to_update = int(time.time())
     brickfind_crawl(args.brick, args)
     if not args.only_query:
-        with open(status_file_pre, "w", buffering=0) as f:
+        with open(status_file_pre, "w") as f:
             f.write(str(time_to_update))
     sys.exit(0)
diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py
index 5897f790d4b..a5e9ea4288f 100644
--- a/tools/glusterfind/src/changelog.py
+++ b/tools/glusterfind/src/changelog.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
 # -*- coding: utf-8 -*-
 # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
@@ -14,6 +14,7 @@ import sys
 import time
 import xattr
 import logging
+from gfind_py2py3 import bytearray_to_str
 from argparse import ArgumentParser, RawDescriptionHelpFormatter
 import hashlib
 try:
@@ -50,7 +51,7 @@ def pgfid_to_path(brick, changelog_data):
     """
     # pgfid1 to path1 in case of CREATE/MKNOD/MKDIR/LINK/SYMLINK
     for row in changelog_data.gfidpath_get_distinct("pgfid1", {"path1": ""}):
-        # In case of Data/Metadata only, pgfid1 will not be their
+        # In case of Data/Metadata only, pgfid1 will not be there
         if row[0] == "":
             continue
@@ -105,15 +106,55 @@ def populate_pgfid_and_inodegfid(brick, changelog_data):
                 changelog_data.inodegfid_add(os.stat(p).st_ino, gfid)
                 file_xattrs = xattr.list(p)
                 for x in file_xattrs:
-                    if x.startswith("trusted.pgfid."):
+                    x_str = bytearray_to_str(x)
+                    if x_str.startswith("trusted.pgfid."):
                         # PGFID in pgfid table
-                        changelog_data.pgfid_add(x.split(".")[-1])
+                        changelog_data.pgfid_add(x_str.split(".")[-1])
             except (IOError, OSError):
                 # All OS Errors ignored, since failures will be logged
                 # in End. All GFIDs present in gfidpath table
                 continue
+
+
+def enum_hard_links_using_gfid2path(brick, gfid, args):
+    hardlinks = []
+    p = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid)
+    if not os.path.isdir(p):
+        # we have a symlink or a normal file
+        try:
+            file_xattrs = xattr.list(p)
+            for x in file_xattrs:
+                x_str = bytearray_to_str(x)
+                if x_str.startswith("trusted.gfid2path."):
+                    # get the value for the xattr i.e. <PGFID>/<BN>
+                    v = xattr.getxattr(p, x_str)
+                    v_str = bytearray_to_str(v)
+                    pgfid, bn = v_str.split(os.sep)
+                    try:
+                        path = symlink_gfid_to_path(brick, pgfid)
+                        fullpath = os.path.join(path, bn)
+                        fullpath = output_path_prepare(fullpath, args)
+                        hardlinks.append(fullpath)
+                    except (IOError, OSError) as e:
+                        logger.warn("Error converting to path: %s" % e)
+                        continue
+        except (IOError, OSError):
+            pass
+    return hardlinks
+
+
+def gfid_to_all_paths_using_gfid2path(brick, changelog_data, args):
+    path = ""
+    for row in changelog_data.gfidpath_get({"path1": "", "type": "MODIFY"}):
+        gfid = row[3].strip()
+        logger.debug("Processing gfid %s" % gfid)
+        hardlinks = enum_hard_links_using_gfid2path(brick, gfid, args)
+
+        path = ",".join(hardlinks)
+
+        changelog_data.gfidpath_update({"path1": path}, {"gfid": gfid})
+
+
 def gfid_to_path_using_pgfid(brick, changelog_data, args):
     """
     For all the pgfids collected, Converts to Path and
@@ -246,7 +287,7 @@ def get_changes(brick, hash_dir, log_file, start, end, args):
     session_dir = os.path.join(conf.get_opt("session_dir"), args.session)
     status_file = os.path.join(session_dir, args.volume,
-                               "%s.status" % urllib.parse.quote_plus(args.brick))
+                               "%s.status" % urllib.quote_plus(args.brick))
     # Get previous session
     try:
@@ -276,6 +317,7 @@ def get_changes(brick, hash_dir, log_file, start, end, args):
         fail("%s: %s Historical Changelogs not available: %s" %
              (args.node, brick, e), logger=logger)
+    logger.info("[1/4] Starting changelog parsing ...")
     try:
         # scan followed by getchanges till scan returns zero.
         # history_scan() is blocking call, till it gets the number
@@ -304,18 +346,27 @@ def get_changes(brick, hash_dir, log_file, start, end, args):
         fail("%s Error during Changelog Crawl: %s" % (brick, e),
              logger=logger)
+    logger.info("[1/4] Finished changelog parsing.")
+
     # Convert all pgfid available from Changelogs
+    logger.info("[2/4] Starting 'pgfid to path' conversions ...")
     pgfid_to_path(brick, changelog_data)
     changelog_data.commit()
+    logger.info("[2/4] Finished 'pgfid to path' conversions.")
-    # Convert all GFIDs for which no other additional details available
-    gfid_to_path_using_pgfid(brick, changelog_data, args)
+    # Convert all gfids recorded for data and metadata to all hardlink paths
+    logger.info("[3/4] Starting 'gfid2path' conversions ...")
+    gfid_to_all_paths_using_gfid2path(brick, changelog_data, args)
     changelog_data.commit()
+    logger.info("[3/4] Finished 'gfid2path' conversions.")
     # If some GFIDs fail to get converted from previous step,
     # convert using find
+    logger.info("[4/4] Starting 'gfid to path using batchfind' "
+                "conversions ...")
     gfid_to_path_using_batchfind(brick, changelog_data)
     changelog_data.commit()
+    logger.info("[4/4] Finished 'gfid to path using batchfind' conversions.")
     return actual_end
@@ -329,7 +380,7 @@ def changelog_crawl(brick, start, end, args):
     # WORKING_DIR/BRICKHASH/OUTFILE
     working_dir = os.path.dirname(args.outfile)
-    brickhash = hashlib.sha1(brick)
+    brickhash = hashlib.sha1(brick.encode())
     brickhash = str(brickhash.hexdigest())
     working_dir = os.path.join(working_dir, brickhash)
@@ -364,6 +415,7 @@ def _get_args():
                         action="store_true")
     parser.add_argument("--output-prefix", help="File prefix in output",
                         default=".")
+    parser.add_argument("--type",default="both")
     parser.add_argument("-N", "--only-namespace-changes",
                         help="List only namespace changes",
                         action="store_true")
@@ -383,7 +435,7 @@ if __name__ == "__main__":
     session_dir = os.path.join(conf.get_opt("session_dir"), args.session)
     status_file = os.path.join(session_dir, args.volume,
-                               "%s.status" % urllib.parse.quote_plus(args.brick))
+                               "%s.status" % urllib.quote_plus(args.brick))
     status_file_pre = status_file + ".pre"
     mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
            logger=logger)
@@ -409,7 +461,7 @@ if __name__ == "__main__":
                                                              end))
     actual_end = changelog_crawl(args.brick, start, end, args)
     if not args.only_query:
-        with open(status_file_pre, "w", buffering=0) as f:
+        with open(status_file_pre, "w") as f:
             f.write(str(actual_end))
     logger.info("%s Finished Changelog Crawl - End: %s" % (args.brick,
diff --git a/tools/glusterfind/src/conf.py b/tools/glusterfind/src/conf.py
index fdab38badb1..3849ba5dd1f 100644
--- a/tools/glusterfind/src/conf.py
+++ b/tools/glusterfind/src/conf.py
@@ -10,11 +10,11 @@
 import os
 try:
-    import configparser
+    from ConfigParser import ConfigParser
 except ImportError:
-    import ConfigParser as configparser
+    from configparser import ConfigParser
-config = ConfigParser.ConfigParser()
+config = ConfigParser()
 config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)),
                          "tool.conf"))
diff --git a/tools/glusterfind/src/gfind_py2py3.py b/tools/glusterfind/src/gfind_py2py3.py
new file mode 100644
index 00000000000..87324fbf350
--- /dev/null
+++ b/tools/glusterfind/src/gfind_py2py3.py
@@ -0,0 +1,88 @@
+#
+# Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+# All python2/python3 compatibility routines
+
+import os
+import sys
+from ctypes import create_string_buffer
+
+if sys.version_info >= (3,):
+
+    # Raw conversion of bytearray to string. Used in the cases where
+    # buffer is created by create_string_buffer which is a 8-bit char
+    # array and passed to syscalls to fetch results. Using encode/decode
+    # doesn't work as it converts to string altering the size.
+    # def bytearray_to_str(byte_arr):
+    def bytearray_to_str(byte_arr):
+        return ''.join([chr(b) for b in byte_arr])
+
+    def gf_create_string_buffer(size):
+        return create_string_buffer(b'\0', size)
+
+    def gfind_history_changelog(libgfc, changelog_path, start, end, num_parallel,
+                                actual_end):
+        return libgfc.gf_history_changelog(changelog_path.encode(), start, end, num_parallel,
+                                           actual_end)
+
+    def gfind_changelog_register(libgfc, brick, path, log_file, log_level,
+                                 retries):
+        return libgfc.gf_changelog_register(brick.encode(), path.encode(), log_file.encode(),
+                                            log_level, retries)
+
+    def gfind_history_changelog_done(libgfc, clfile):
+        return libgfc.gf_history_changelog_done(clfile.encode())
+
+    def gfind_write_row(f, row, field_separator, p_rep, row_2_rep):
+        f.write(u"{0}{1}{2}{3}{4}\n".format(row,
+                                            field_separator,
+                                            p_rep,
+                                            field_separator,
+                                            row_2_rep))
+
+    def gfind_write(f, row, field_separator, p_rep):
+        f.write(u"{0}{1}{2}\n".format(row,
+                                      field_separator,
+                                      p_rep))
+
+
+else:
+
+    # Raw conversion of bytearray to string
+    def bytearray_to_str(byte_arr):
+        return byte_arr
+
+    def gf_create_string_buffer(size):
+        return create_string_buffer('\0', size)
+
+    def gfind_history_changelog(libgfc, changelog_path, start, end, num_parallel,
+                                actual_end):
+        return libgfc.gf_history_changelog(changelog_path, start, end,
+                                           num_parallel, actual_end)
+
+    def gfind_changelog_register(libgfc, brick, path, log_file, log_level,
+                                 retries):
+        return libgfc.gf_changelog_register(brick, path, log_file,
+                                            log_level, retries)
+
+    def gfind_history_changelog_done(libgfc, clfile):
+        return libgfc.gf_history_changelog_done(clfile)
+
+    def gfind_write_row(f, row, field_separator, p_rep, row_2_rep):
+        f.write(u"{0}{1}{2}{3}{4}\n".format(row,
+                                            field_separator,
+                                            p_rep,
+                                            field_separator,
+                                            row_2_rep).encode())
+
+    def gfind_write(f, row, field_separator, p_rep):
+        f.write(u"{0}{1}{2}\n".format(row,
+                                      field_separator,
+                                      p_rep).encode())
diff --git a/tools/glusterfind/src/libgfchangelog.py b/tools/glusterfind/src/libgfchangelog.py
index afb3387db2d..513bb101e93 100644
--- a/tools/glusterfind/src/libgfchangelog.py
+++ b/tools/glusterfind/src/libgfchangelog.py
@@ -9,15 +9,17 @@
 # cases as published by the Free Software Foundation.
 import os
-from ctypes import CDLL, get_errno, create_string_buffer, c_ulong, byref
-from ctypes import RTLD_GLOBAL
+from ctypes import CDLL, RTLD_GLOBAL, get_errno, create_string_buffer, c_ulong, byref
+from ctypes.util import find_library
+from gfind_py2py3 import bytearray_to_str, gf_create_string_buffer
+from gfind_py2py3 import gfind_history_changelog, gfind_changelog_register
+from gfind_py2py3 import gfind_history_changelog_done
 class ChangelogException(OSError):
     pass
-
-libgfc = CDLL("libgfchangelog.so", use_errno=True, mode=RTLD_GLOBAL)
+libgfc = CDLL(find_library("gfchangelog"), mode=RTLD_GLOBAL, use_errno=True)
 def raise_oserr(prefix=None):
@@ -33,8 +35,7 @@ def cl_init():
 def cl_register(brick, path, log_file, log_level, retries=0):
-    ret = libgfc.gf_changelog_register(brick, path, log_file,
-                                       log_level, retries)
+    ret = gfind_changelog_register(libgfc, brick, path, log_file,log_level, retries)
     if ret == -1:
         raise_oserr(prefix="gf_changelog_register")
@@ -49,7 +50,7 @@ def cl_history_scan():
 def cl_history_changelog(changelog_path, start, end, num_parallel):
     actual_end = c_ulong()
-    ret = libgfc.gf_history_changelog(changelog_path, start, end,
+    ret = gfind_history_changelog(libgfc,changelog_path, start, end,
                                       num_parallel, byref(actual_end))
     if ret == -1:
@@ -70,13 +71,15 @@ def cl_history_getchanges():
         return f.split('.')[-1]
     changes = []
-    buf = create_string_buffer('\0', 4096)
+    buf = gf_create_string_buffer(4096)
     while True:
         ret = libgfc.gf_history_changelog_next_change(buf, 4096)
         if ret in (0, -1):
             break
-        changes.append(buf.raw[:ret - 1])
+        # py2 and py3 compatibility
+        result = bytearray_to_str(buf.raw[:ret - 1])
+        changes.append(result)
     if ret == -1:
         raise_oserr(prefix="gf_history_changelog_next_change")
@@ -84,6 +87,6 @@ def cl_history_getchanges():
 def cl_history_done(clfile):
-    ret = libgfc.gf_history_changelog_done(clfile)
+    ret = gfind_history_changelog_done(libgfc, clfile)
     if ret == -1:
         raise_oserr(prefix="gf_history_changelog_done")
diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py
index 2b782c8dca4..4b5466d0114 100644
--- a/tools/glusterfind/src/main.py
+++ b/tools/glusterfind/src/main.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
 # -*- coding: utf-8 -*-
 # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
@@ -16,12 +16,14 @@ from multiprocessing import Process
 import os
 import xml.etree.cElementTree as etree
 from argparse import ArgumentParser, RawDescriptionHelpFormatter, Action
+from gfind_py2py3 import gfind_write_row, gfind_write
 import logging
 import shutil
 import tempfile
 import signal
 from datetime import datetime
 import codecs
+import re
 from utils import execute, is_host_local, mkdirp, fail
 from utils import setup_logger, human_time, handle_rm_error
@@ -35,9 +37,9 @@ GlusterFS Incremental API
 ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
 logger = logging.getLogger()
-node_outfiles = []
 vol_statusStr = ""
 gtmpfilename = None
+g_pid_nodefile_map = {}
 class StoreAbsPath(Action):
@@ -111,7 +113,7 @@ def node_cmd(host, host_uuid, task, cmd, args, opts):
 def run_cmd_nodes(task, args, **kwargs):
-    global node_outfiles
+    global g_pid_nodefile_map
     nodes = get_nodes(args.volume)
     pool = []
     for num, node in enumerate(nodes):
@@ -142,7 +144,6 @@ def run_cmd_nodes(task, args, **kwargs):
             if tag == "":
                 tag = '""' if not is_host_local(host_uuid) else ""
-            node_outfiles.append(node_outfile)
             # remote file will be copied into this directory
             mkdirp(os.path.dirname(node_outfile),
                    exit_on_err=True, logger=logger)
@@ -165,6 +166,7 @@ def run_cmd_nodes(task, args, **kwargs):
                 (["--no-encode"] if args.no_encode else []) + \
                 (["--only-namespace-changes"] if args.only_namespace_changes
                  else []) + \
+                (["--type", args.type]) + \
                 (["--field-separator", FS] if args.full else [])
             opts["node_outfile"] = node_outfile
@@ -179,7 +181,6 @@ def run_cmd_nodes(task, args, **kwargs):
             if tag == "":
                 tag = '""' if not is_host_local(host_uuid) else ""
-            node_outfiles.append(node_outfile)
             # remote file will be copied into this directory
             mkdirp(os.path.dirname(node_outfile),
                    exit_on_err=True, logger=logger)
@@ -203,6 +204,7 @@ def run_cmd_nodes(task, args, **kwargs):
                 (["--no-encode"] if args.no_encode else []) + \
                 (["--only-namespace-changes"] if args.only_namespace_changes
                  else []) + \
+                (["--type", args.type]) + \
                 (["--field-separator", FS] if args.full else [])
             opts["node_outfile"] = node_outfile
@@ -262,6 +264,7 @@ def run_cmd_nodes(task, args, **kwargs):
                         args=(host, host_uuid, task, cmd, args, opts))
             p.start()
             pool.append(p)
+            g_pid_nodefile_map[p.pid] = node_outfile
     for num, p in enumerate(pool):
         p.join()
@@ -269,8 +272,11 @@ def run_cmd_nodes(task, args, **kwargs):
             logger.warn("Command %s failed in %s" % (task, nodes[num][1]))
             if task in ["create", "delete"]:
                 fail("Command %s failed in %s" % (task, nodes[num][1]))
-            elif task == "pre" and args.disable_partial:
-                sys.exit(1)
+            elif task == "pre" or task == "query":
+                if args.disable_partial:
+                    sys.exit(1)
+                else:
+                    del g_pid_nodefile_map[p.pid]
 @cache_output
@@ -320,6 +326,7 @@ def _get_args():
     parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
                             description=PROG_DESCRIPTION)
     subparsers = parser.add_subparsers(dest="mode")
+    subparsers.required = True
     # create <SESSION> <VOLUME> [--debug] [--force]
     parser_create = subparsers.add_parser('create')
@@ -370,6 +377,9 @@ def _get_args():
                             help="Tag prefix for file names emitted during"
                             " a full find operation; default: \"NEW\"",
                             default="NEW")
+    parser_pre.add_argument('--type', help="type: f, f-files only"
+                            " d, d-directories only, by default = both",
+                            default='both', choices=["f", "d", "both"])
     parser_pre.add_argument("--field-separator",
                             help="Field separator string",
                             default=" ")
@@ -381,7 +391,7 @@ def _get_args():
                               action=StoreAbsPath)
     parser_query.add_argument("--since-time", help="UNIX epoch time since "
                               "which listing is required", type=int)
-    parser_query.add_argument("--end-time", help="UNIX epoch time upto "
+    parser_query.add_argument("--end-time", help="UNIX epoch time up to "
                               "which listing is required", type=int)
     parser_query.add_argument("--no-encode",
                               help="Do not encode path in output file",
@@ -399,6 +409,9 @@ def _get_args():
                               help="Tag prefix for file names emitted during"
                               " a full find operation; default: \"NEW\"",
                               default="NEW")
+    parser_query.add_argument('--type', help="type: f, f-files only"
+                              " d, d-directories only, by default = both",
+                              default='both', choices=["f", "d", "both"])
     parser_query.add_argument("--field-separator",
                               help="Field separator string",
                               default=" ")
@@ -503,22 +516,13 @@ def write_output(outfile, outfilemerger, field_separator):
                 continue
             if row_2_rep and row_2_rep != "":
-                f.write("{0}{1}{2}{3}{4}\n".format(row[0],
-                                                   field_separator,
-                                                   p_rep,
-                                                   field_separator,
-                                                   row_2_rep))
-            else:
-                f.write("{0}{1}{2}\n".format(row[0],
-                                             field_separator,
-                                             p_rep))
-
+                gfind_write_row(f, row[0], field_separator, p_rep, row_2_rep)
-def mode_create(session_dir, args):
-    logger.debug("Init is called - Session: %s, Volume: %s"
-                 % (args.session, args.volume))
+            else:
+                gfind_write(f, row[0], field_separator, p_rep)
-    cmd = ["gluster", 'volume', 'info', args.volume, "--xml"]
+def validate_volume(volume):
+    cmd = ["gluster", 'volume', 'info', volume, "--xml"]
     _, data, _ = execute(cmd,
                          exit_msg="Failed to Run Gluster Volume Info",
                          logger=logger)
@@ -526,11 +530,42 @@
     try:
         tree = etree.fromstring(data)
         statusStr = tree.find('volInfo/volumes/volume/statusStr').text
     except (ParseError, AttributeError) as e:
-        fail("Invalid Volume: %s" % e, logger=logger)
-
+        fail("Invalid Volume: Check the Volume name! %s" % e)
     if statusStr != "Started":
-        fail("Volume %s is not online" % args.volume, logger=logger)
+        fail("Volume %s is not online" % volume)
+
+# The rules for a valid session name.
+SESSION_NAME_RULES = {
+    'min_length': 2,
+    'max_length': 256,  # same as maximum volume length
+    # Specifies all alphanumeric characters, underscore, hyphen.
+    'valid_chars': r'0-9a-zA-Z_-',
+}
+
+
+# checks valid session name, fail otherwise
+def validate_session_name(session):
+    # Check for minimum length
+    if len(session) < SESSION_NAME_RULES['min_length']:
+        fail('session_name must be at least ' +
+             str(SESSION_NAME_RULES['min_length']) + ' characters long.')
+    # Check for maximum length
+    if len(session) > SESSION_NAME_RULES['max_length']:
+        fail('session_name must not exceed ' +
+             str(SESSION_NAME_RULES['max_length']) + ' characters length.')
+
+    # Matches strings composed entirely of characters specified within
+    if not re.match(r'^[' + SESSION_NAME_RULES['valid_chars'] +
+                    ']+$', session):
+        fail('Session name can only contain these characters: ' +
+             SESSION_NAME_RULES['valid_chars'])
+
+def mode_create(session_dir, args):
+    validate_session_name(args.session)
+
+    logger.debug("Init is called - Session: %s, Volume: %s"
+                 % (args.session, args.volume))
     mkdirp(session_dir, exit_on_err=True, logger=logger)
     mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
            logger=logger)
@@ -551,7 +586,7 @@ def mode_create(session_dir, args):
         run_cmd_nodes("create", args, time_to_update=str(time_to_update))
     if not os.path.exists(status_file) or args.reset_session_time:
-        with open(status_file, "w", buffering=0) as f:
+        with open(status_file, "w") as f:
             f.write(str(time_to_update))
     sys.stdout.write("Session %s created with volume %s\n" %
@@ -562,6 +597,7 @@ def mode_create(session_dir, args):
 def mode_query(session_dir, args):
     global gtmpfilename
+    global g_pid_nodefile_map
     # Verify volume status
     cmd = ["gluster", 'volume', 'info', args.volume, "--xml"]
@@ -589,6 +625,8 @@ def mode_query(session_dir, args):
         enable_volume_options(args)
     # Test options
+    if not args.full and args.type in ["f", "d"]:
+        fail("--type can only be used with --full")
     if not args.since_time and not args.end_time and not args.full:
         fail("Please specify either {--since-time and optionally --end-time} "
              "or --full", logger=logger)
@@ -623,14 +661,20 @@ def mode_query(session_dir, args):
     # Merger
     if args.full:
-        cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
-        execute(cmd,
-                exit_msg="Failed to merge output files "
-                "collected from nodes", logger=logger)
+        if len(g_pid_nodefile_map) > 0:
+            cmd = ["sort", "-u"] + list(g_pid_nodefile_map.values()) + \
+                  ["-o", args.outfile]
+            execute(cmd,
+                    exit_msg="Failed to merge output files "
+                    "collected from nodes", logger=logger)
+        else:
+            fail("Failed to collect any output files from peers. "
+                 "Looks like all bricks are offline.", logger=logger)
     else:
         # Read each Changelogs db and generate finaldb
         create_file(args.outfile, exit_on_err=True, logger=logger)
-        outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles)
+        outfilemerger = OutputMerger(args.outfile + ".db",
+                                     list(g_pid_nodefile_map.values()))
         write_output(args.outfile, outfilemerger, args.field_separator)
     try:
@@ -645,6 +689,7 @@ def mode_query(session_dir, args):
 def mode_pre(session_dir, args):
     global gtmpfilename
+    global g_pid_nodefile_map
     """
     Read from Session file and write to session.pre file
@@ -656,6 +701,9 @@ def mode_pre(session_dir, args):
     mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger)
+    if not args.full and args.type in ["f", "d"]:
+        fail("--type can only be used with --full")
+
     # If Pre status file exists and running pre command again
     if os.path.exists(status_file_pre) and not args.regenerate_outfile:
         fail("Post command is not run after last pre, "
@@ -682,14 +730,20 @@ def mode_pre(session_dir, args):
     # Merger
     if args.full:
-        cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
-        execute(cmd,
-                exit_msg="Failed to merge output files "
-                "collected from nodes", logger=logger)
+        if len(g_pid_nodefile_map) > 0:
+            cmd = ["sort", "-u"] + list(g_pid_nodefile_map.values()) + \
+                  ["-o", args.outfile]
+            execute(cmd,
+                    exit_msg="Failed to merge output files "
+                    "collected from nodes", logger=logger)
+        else:
+            fail("Failed to collect any output files from peers. "
+                 "Looks like all bricks are offline.", logger=logger)
     else:
         # Read each Changelogs db and generate finaldb
         create_file(args.outfile, exit_on_err=True, logger=logger)
-        outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles)
+        outfilemerger = OutputMerger(args.outfile + ".db",
+                                     list(g_pid_nodefile_map.values()))
         write_output(args.outfile, outfilemerger, args.field_separator)
     try:
@@ -699,7 +753,7 @@ def mode_pre(session_dir, args):
     run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename)
-    with open(status_file_pre, "w", buffering=0) as f:
+    with open(status_file_pre, "w") as f:
         f.write(str(endtime_to_update))
     sys.stdout.write("Generated output file %s\n" % args.outfile)
@@ -825,6 +879,11 @@ def main():
             args.mode not in ["create", "list", "query"]:
         fail("Invalid session %s" % args.session)
+    # volume involved, validate the volume first
+    if args.mode not in ["list"]:
+        validate_volume(args.volume)
+
+    # "default" is a system defined session name
     if args.mode in ["create", "post", "pre", "delete"] and \
             args.session == "default":
diff --git a/tools/glusterfind/src/nodeagent.py b/tools/glusterfind/src/nodeagent.py
index 4aed0e2ad96..679daa6fa76 100644
--- a/tools/glusterfind/src/nodeagent.py
+++ b/tools/glusterfind/src/nodeagent.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python2
+#!/usr/bin/python3
 # -*- coding: utf-8 -*-
 # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
@@ -52,13 +52,13 @@ def mode_create(args):
     session_dir = os.path.join(conf.get_opt("session_dir"), args.session)
     status_file = os.path.join(session_dir, args.volume,
-                               "%s.status" % urllib.parse.quote_plus(args.brick))
+                               "%s.status" % urllib.quote_plus(args.brick))
     mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
            logger=logger)
     if not os.path.exists(status_file) or args.reset_session_time:
-        with open(status_file, "w", buffering=0) as f:
+        with open(status_file, "w") as f:
             f.write(args.time_to_update)
     sys.exit(0)
@@ -67,7 +67,7 @@ def mode_create(args):
 def mode_post(args):
     session_dir = os.path.join(conf.get_opt("session_dir"), args.session)
     status_file = os.path.join(session_dir, args.volume,
-                               "%s.status" % urllib.parse.quote_plus(args.brick))
+                               "%s.status" % urllib.quote_plus(args.brick))
     mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
            logger=logger)
diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py
index b376241820b..906ebd8f252 100644
--- a/tools/glusterfind/src/utils.py
+++ b/tools/glusterfind/src/utils.py
@@ -58,12 +58,13 @@ def find(path, callback_func=lambda x: True, filter_func=lambda x: True,
         # Capture filter_func output and pass it to callback function
         filter_result = filter_func(path)
         if filter_result is not None:
-            callback_func(path, filter_result)
+            callback_func(path, filter_result, os.path.isdir(path))
     for p in os.listdir(path):
         full_path = os.path.join(path, p)
-        if os.path.isdir(full_path):
+        is_dir = os.path.isdir(full_path)
+        if is_dir:
             if subdirs_crawl:
                 find(full_path, callback_func, filter_func, ignore_dirs)
             else:
@@ -73,7 +74,7 @@
         else:
             filter_result = filter_func(full_path)
             if filter_result is not None:
-                callback_func(full_path, filter_result)
+                callback_func(full_path, filter_result, is_dir)
 def output_write(f, path, prefix=".", encode=False, tag="",
@@ -229,7 +230,11 @@ def get_changelog_rollover_time(volumename):
     try:
         tree = etree.fromstring(out)
-        return int(tree.find('volGetopts/Opt/Value').text)
+        val = tree.find('volGetopts/Opt/Value').text
+        if val is not None:
+            # Filter the value by split, as it may be 'X (DEFAULT)'
+            # and we only need 'X'
+            return int(val.split(' ', 1)[0])
     except ParseError:
         return DEFAULT_CHANGELOG_INTERVAL
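For context, a minimal illustrative sketch (not part of the commit) of how the compatibility helpers introduced in gfind_py2py3.py fit together: gf_create_string_buffer() allocates the 8-bit ctypes buffer that is handed to libgfchangelog, and bytearray_to_str() turns the raw bytes back into a native str on both Python 2 and Python 3 without altering the buffer size. The import assumes gfind_py2py3.py is importable (it is installed under $(GLUSTERFS_LIBEXECDIR)/glusterfind per the Makefile.am change above); the changelog file name and return value below are made-up example data, not real libgfchangelog output.

    # Illustrative sketch only -- mirrors the pattern used by
    # cl_history_getchanges() in libgfchangelog.py.
    from gfind_py2py3 import bytearray_to_str, gf_create_string_buffer

    buf = gf_create_string_buffer(4096)      # 8-bit char buffer, as passed to the C library
    buf.raw = b"CHANGELOG.1545023700\x00"    # example payload a C call might leave behind
    ret = 21                                 # pretend return value: bytes written, incl. NUL

    # Slice the raw bytes and convert to a native str; encode/decode is avoided
    # because it could change the length of what came out of the C buffer.
    result = bytearray_to_str(buf.raw[:ret - 1])
    print(result)                            # -> CHANGELOG.1545023700

On the command-line side, the new --type {f,d,both} option added to the pre and query sub-commands is only honoured together with --full, as enforced by the added checks in mode_pre() and mode_query().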
