Diffstat (limited to 'tools/glusterfind/src/changelog.py')
-rw-r--r-- | tools/glusterfind/src/changelog.py | 309 |
1 files changed, 309 insertions, 0 deletions
diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py
new file mode 100644
index 00000000000..b7697ea5030
--- /dev/null
+++ b/tools/glusterfind/src/changelog.py
@@ -0,0 +1,309 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import os
+import sys
+import time
+import xattr
+from errno import ENOENT
+import logging
+from argparse import ArgumentParser, RawDescriptionHelpFormatter
+import hashlib
+
+import libgfchangelog
+from utils import create_file, mkdirp, execute, symlink_gfid_to_path
+from utils import fail, setup_logger, output_write, find
+import conf
+
+
+CHANGELOG_LOG_LEVEL = 9
+CHANGELOG_CONN_RETRIES = 5
+CHANGELOGAPI_NUM_WORKERS = 3
+PROG_DESCRIPTION = """
+Changelog Crawler
+"""
+history_turns = 0
+history_turn_time = 0
+
+logger = logging.getLogger()
+
+
+def gfid_to_path_using_batchfind(brick, gfids_file, output_file):
+    """
+    find -samefile gets the inode number and crawls entire namespace
+    to get the list of files/dirs having same inode number.
+    Do find without any option, except the ignore directory option,
+    print the output in <INODE_NUM> <PATH> format, use this output
+    to look into in-memory dictionary of inode numbers got from the
+    list of GFIDs
+    """
+    with open(output_file, "a+") as fout:
+        inode_dict = {}
+        with open(gfids_file) as f:
+            for gfid in f:
+                gfid = gfid.strip()
+                backend_path = os.path.join(brick, ".glusterfs",
+                                            gfid[0:2], gfid[2:4], gfid)
+
+                try:
+                    inode_dict[str(os.stat(backend_path).st_ino)] = 1
+                except (IOError, OSError) as e:
+                    if e.errno == ENOENT:
+                        continue
+                    else:
+                        fail("%s Failed to convert to path from "
+                             "GFID %s: %s" % (brick, gfid, e), logger=logger)
+
+        if not inode_dict:
+            return
+
+        def inode_filter(path):
+            try:
+                st = os.lstat(path)
+            except (OSError, IOError) as e:
+                if e.errno == ENOENT:
+                    st = None
+                else:
+                    raise
+
+            if st and inode_dict.get(str(st.st_ino), None):
+                return True
+
+            return False
+
+        brick_path_len = len(brick)
+
+        def output_callback(path):
+            path = path.strip()
+            path = path[brick_path_len+1:]
+            output_write(fout, path, args.output_prefix)
+
+        # Length of brick path, to remove from output path
+        find(brick, callback_func=output_callback,
+             filter_func=inode_filter,
+             ignore_dirs=[".glusterfs"])
+
+        fout.flush()
+        os.fsync(fout.fileno())
+
+
+def gfid_to_path_using_pgfid(brick, gfids_file, output_file, outfile_failures):
+    """
+    Parent GFID is saved as xattr, collect Parent GFIDs from all
+    the files from gfids_file. Convert parent GFID to path and Crawl
+    each directories to get the list of files/dirs having same inode number.
+    Do find with maxdepth as 1 and print the output in <INODE_NUM> <PATH>
+    format, use this output to look into in memory dictionary of inode
+    numbers got from the list of GFIDs
+    """
+    with open(output_file, "a+") as fout:
+        pgfids = set()
+        inode_dict = {}
+        with open(gfids_file) as f:
+            for gfid in f:
+                gfid = gfid.strip()
+                p = os.path.join(brick,
+                                 ".glusterfs",
+                                 gfid[0:2],
+                                 gfid[2:4],
+                                 gfid)
+                if os.path.islink(p):
+                    path = symlink_gfid_to_path(brick, gfid)
+                    output_write(fout, path, args.output_prefix)
+                else:
+                    try:
+                        inode_dict[str(os.stat(p).st_ino)] = 1
+                        file_xattrs = xattr.list(p)
+                        num_parent_gfid = 0
+                        for x in file_xattrs:
+                            if x.startswith("trusted.pgfid."):
+                                num_parent_gfid += 1
+                                pgfids.add(x.split(".")[-1])
+
+                        if num_parent_gfid == 0:
+                            with open(outfile_failures, "a") as f:
+                                f.write("%s\n" % gfid)
+                                f.flush()
+                                os.fsync(f.fileno())
+
+                    except (IOError, OSError) as e:
+                        if e.errno == ENOENT:
+                            continue
+                        else:
+                            fail("%s Failed to convert to path from "
+                                 "GFID %s: %s" % (brick, gfid, e),
+                                 logger=logger)
+
+        if not inode_dict:
+            return
+
+        def inode_filter(path):
+            try:
+                st = os.lstat(path)
+            except (OSError, IOError) as e:
+                if e.errno == ENOENT:
+                    st = None
+                else:
+                    raise
+
+            if st and inode_dict.get(str(st.st_ino), None):
+                return True
+
+            return False
+
+        # Length of brick path, to remove from output path
+        brick_path_len = len(brick)
+
+        def output_callback(path):
+            path = path.strip()
+            path = path[brick_path_len+1:]
+            output_write(fout, path, args.output_prefix)
+
+        for pgfid in pgfids:
+            path = symlink_gfid_to_path(brick, pgfid)
+            find(os.path.join(brick, path),
+                 callback_func=output_callback,
+                 filter_func=inode_filter,
+                 ignore_dirs=[".glusterfs"],
+                 subdirs_crawl=False)
+
+        fout.flush()
+        os.fsync(fout.fileno())
+
+
+def sort_unique(filename):
+    execute(["sort", "-u", "-o", filename, filename],
+            exit_msg="Sort failed", logger=logger)
+
+
+def get_changes(brick, hash_dir, log_file, end, args):
+    """
+    Makes use of libgfchangelog's history API to get changelogs
+    containing changes from start and end time. Further collects
+    the modified gfids from the changelogs and writes the list
+    of gfid to 'gfid_list' file.
+    """
+    try:
+        libgfchangelog.cl_register(brick, hash_dir, log_file,
+                                   CHANGELOG_LOG_LEVEL, CHANGELOG_CONN_RETRIES)
+    except libgfchangelog.ChangelogException as e:
+        fail("%s Changelog register failed: %s" % (brick, e), logger=logger)
+
+    # Output files to record GFIDs and GFID to Path failure GFIDs
+    gfid_list_path = args.outfile + ".gfids"
+    gfid_list_failures_file = gfid_list_path + ".failures"
+    create_file(gfid_list_path, exit_on_err=True, logger=logger)
+    create_file(gfid_list_failures_file, exit_on_err=True, logger=logger)
+
+    # Changelogs path(Hard coded to BRICK/.glusterfs/changelogs
+    cl_path = os.path.join(brick, ".glusterfs/changelogs")
+
+    # Fail if History fails for requested Start and End
+    try:
+        actual_end = libgfchangelog.cl_history_changelog(
+            cl_path, args.start, end, CHANGELOGAPI_NUM_WORKERS)
+    except libgfchangelog.ChangelogException as e:
+        fail("%s Historical Changelogs not available: %s" % (brick, e),
+             logger=logger)
+
+    try:
+        # scan followed by getchanges till scan returns zero.
+        # history_scan() is blocking call, till it gets the number
+        # of changelogs to process. Returns zero when no changelogs
+        # to be processed. returns positive value as number of changelogs
+        # to be processed, which will be fetched using
+        # history_getchanges()
+        changes = []
+        while libgfchangelog.cl_history_scan() > 0:
+            changes += libgfchangelog.cl_history_getchanges()
+
+            if changes:
+                with open(gfid_list_path, 'a+') as fgfid:
+                    for change in changes:
+                        with open(change) as f:
+                            for line in f:
+                                # Space delimited list, collect GFID
+                                details = line.split()
+                                fgfid.write("%s\n" % details[1])
+
+                        libgfchangelog.cl_history_done(change)
+                    fgfid.flush()
+                    os.fsync(fgfid.fileno())
+    except libgfchangelog.ChangelogException as e:
+        fail("%s Error during Changelog Crawl: %s" % (brick, e),
+             logger=logger)
+
+    # If TS returned from history_changelog is < end time
+    # then FS crawl may be required, since history is only available
+    # till TS returned from history_changelog
+    if actual_end < end:
+        fail("Partial History available with Changelog", 2, logger=logger)
+
+    sort_unique(gfid_list_path)
+    gfid_to_path_using_pgfid(brick, gfid_list_path,
+                             args.outfile, gfid_list_failures_file)
+    gfid_to_path_using_batchfind(brick, gfid_list_failures_file, args.outfile)
+
+
+def changelog_crawl(brick, end, args):
+    """
+    Init function, prepares working dir and calls Changelog query
+    """
+    if brick.endswith("/"):
+        brick = brick[0:len(brick)-1]
+
+    # WORKING_DIR/BRICKHASH/OUTFILE
+    working_dir = os.path.dirname(args.outfile)
+    brickhash = hashlib.sha1(brick)
+    brickhash = str(brickhash.hexdigest())
+    working_dir = os.path.join(working_dir, brickhash)
+
+    mkdirp(working_dir, exit_on_err=True, logger=logger)
+    create_file(args.outfile, exit_on_err=True, logger=logger)
+    create_file(args.outfile + ".gfids", exit_on_err=True, logger=logger)
+
+    log_file = os.path.join(conf.get_opt("log_dir"),
+                            args.session,
+                            args.volume,
+                            "changelog.%s.log" % brickhash)
+
+    logger.info("%s Started Changelog Crawl. Start: %s, End: %s"
+                % (brick, args.start, end))
+    get_changes(brick, working_dir, log_file, end, args)
+
+
+def _get_args():
+    parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
+                            description=PROG_DESCRIPTION)
+
+    parser.add_argument("session", help="Session Name")
+    parser.add_argument("volume", help="Volume Name")
+    parser.add_argument("brick", help="Brick Name")
+    parser.add_argument("outfile", help="Output File")
+    parser.add_argument("start", help="Start Time", type=int)
+    parser.add_argument("--debug", help="Debug", action="store_true")
+    parser.add_argument("--output-prefix", help="File prefix in output",
+                        default=".")
+
+    return parser.parse_args()
+
+
+if __name__ == "__main__":
+    args = _get_args()
+    mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
+           exit_on_err=True)
+    log_file = os.path.join(conf.get_opt("log_dir"),
+                            args.session,
+                            args.volume,
+                            "changelog.log")
+    setup_logger(logger, log_file, args.debug)
+    end = int(time.time()) - int(conf.get_opt("changelog_rollover_time"))
+    changelog_crawl(args.brick, end, args)
+    sys.exit(0)
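
Both GFID-to-path converters in the patch lean on the brick's .glusterfs backend layout, and the pgfid-based one additionally on trusted.pgfid.* xattrs. The snippet below is a minimal standalone sketch of that layout check, not part of the committed file; the brick path and GFID values are hypothetical, and it uses the same pyxattr xattr.list() call the script imports (reading trusted.* xattrs requires root on the brick node).

# Sketch only: shows the backend layout that gfid_to_path_using_pgfid()
# and gfid_to_path_using_batchfind() rely on. BRICK and GFID are
# hypothetical example values.
import os
import xattr

BRICK = "/bricks/b1"                           # hypothetical brick root
GFID = "6ba7b810-9dad-11d1-80b4-00c04fd430c8"  # hypothetical GFID

# Every GFID has a backend entry at BRICK/.glusterfs/<g[0:2]>/<g[2:4]>/<gfid>
backend_path = os.path.join(BRICK, ".glusterfs", GFID[0:2], GFID[2:4], GFID)

if os.path.islink(backend_path):
    # Directory GFIDs are symlinks in the backend; the script resolves
    # them with symlink_gfid_to_path() from utils.
    print("directory GFID; resolve via the backend symlink")
else:
    # Regular files carry trusted.pgfid.<PARENT_GFID> xattrs; the crawler
    # collects these parent GFIDs so find() only descends those directories.
    for name in xattr.list(backend_path):
        if name.startswith("trusted.pgfid."):
            print("parent GFID: %s" % name.split(".")[-1])

For reference, the argparse block means a brick-side run looks roughly like "changelog.py SESSION VOLUME BRICK OUTFILE START_TIME" with START_TIME as an epoch timestamp; in practice this script is expected to be driven per brick by the glusterfind CLI rather than invoked by hand.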