diff options
Diffstat (limited to 'tools/glusterfind/src/changelog.py')
-rw-r--r-- | tools/glusterfind/src/changelog.py | 394 |
1 files changed, 223 insertions, 171 deletions
diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py index 2c4ee9106e1..b5f71c7c0ee 100644 --- a/tools/glusterfind/src/changelog.py +++ b/tools/glusterfind/src/changelog.py @@ -12,16 +12,16 @@ import os import sys import time import xattr -from errno import ENOENT import logging from argparse import ArgumentParser, RawDescriptionHelpFormatter import hashlib import urllib import libgfchangelog -from utils import create_file, mkdirp, execute, symlink_gfid_to_path -from utils import fail, setup_logger, output_write, find +from utils import mkdirp, symlink_gfid_to_path +from utils import fail, setup_logger, find from utils import get_changelog_rollover_time +from changelogdata import ChangelogData import conf @@ -37,159 +37,202 @@ history_turn_time = 0 logger = logging.getLogger() -def gfid_to_path_using_batchfind(brick, gfids_file, output_file): +def output_path_prepare(path, output_prefix): """ - find -samefile gets the inode number and crawls entire namespace - to get the list of files/dirs having same inode number. - Do find without any option, except the ignore directory option, - print the output in <INODE_NUM> <PATH> format, use this output - to look into in-memory dictionary of inode numbers got from the - list of GFIDs + If Prefix is set, joins to Path, removes ending slash + and encodes it. """ - with open(output_file, "a+") as fout: - inode_dict = {} - with open(gfids_file) as f: - for gfid in f: - gfid = gfid.strip() - backend_path = os.path.join(brick, ".glusterfs", - gfid[0:2], gfid[2:4], gfid) - - try: - inode_dict[str(os.stat(backend_path).st_ino)] = 1 - except (IOError, OSError) as e: - if e.errno == ENOENT: - continue - else: - fail("%s Failed to convert to path from " - "GFID %s: %s" % (brick, gfid, e), logger=logger) - - if not inode_dict: - return - - def inode_filter(path): - try: - st = os.lstat(path) - except (OSError, IOError) as e: - if e.errno == ENOENT: - st = None - else: - raise - - if st and inode_dict.get(str(st.st_ino), None): - return True - - return False - - brick_path_len = len(brick) - - def output_callback(path): - path = path.strip() - path = path[brick_path_len+1:] - output_write(fout, path, args.output_prefix) - - ignore_dirs = [os.path.join(brick, dirname) - for dirname in - conf.get_opt("brick_ignore_dirs").split(",")] - # Length of brick path, to remove from output path - find(brick, callback_func=output_callback, - filter_func=inode_filter, - ignore_dirs=ignore_dirs) + if output_prefix != ".": + path = os.path.join(output_prefix, path) + if path.endswith("/"): + path = path[0:len(path)-1] + + return urllib.quote_plus(path) + + +def pgfid_to_path(brick, changelog_data): + """ + For all the pgfids in table, converts into path using recursive + readlink. + """ + # pgfid1 to path1 in case of CREATE/MKNOD/MKDIR/LINK/SYMLINK + for row in changelog_data.gfidpath_get_distinct("pgfid1", {"path1": ""}): + # In case of Data/Metadata only, pgfid1 will not be their + if row[0] == "": + continue + + path = symlink_gfid_to_path(brick, row[0]) + path = output_path_prepare(path, args.output_prefix) + + changelog_data.gfidpath_set_path1(path, row[0]) + + # pgfid2 to path2 in case of RENAME + for row in changelog_data.gfidpath_get_distinct("pgfid2", + {"type": "RENAME", + "path2": ""}): + # Only in case of Rename pgfid2 exists + if row[0] == "": + continue - fout.flush() - os.fsync(fout.fileno()) + path = symlink_gfid_to_path(brick, row[0]) + if path == "": + continue + path = output_path_prepare(path, args.output_prefix) + changelog_data.gfidpath_set_path2(path, row[0]) -def gfid_to_path_using_pgfid(brick, gfids_file, output_file, outfile_failures): + +def populate_pgfid_and_inodegfid(brick, changelog_data): """ - Parent GFID is saved as xattr, collect Parent GFIDs from all - the files from gfids_file. Convert parent GFID to path and Crawl - each directories to get the list of files/dirs having same inode number. - Do find with maxdepth as 1 and print the output in <INODE_NUM> <PATH> - format, use this output to look into in memory dictionary of inode - numbers got from the list of GFIDs + For all the DATA/METADATA modifications GFID, + If symlink, directly convert to Path using Readlink. + If not symlink, try to get PGFIDs via xattr query and populate it + to pgfid table, collect inodes in inodegfid table """ - with open(output_file, "a+") as fout: - pgfids = set() - inode_dict = {} - with open(gfids_file) as f: - for gfid in f: - gfid = gfid.strip() - p = os.path.join(brick, - ".glusterfs", - gfid[0:2], - gfid[2:4], - gfid) - if os.path.islink(p): - path = symlink_gfid_to_path(brick, gfid) - output_write(fout, path, args.output_prefix) - else: - try: - inode_dict[str(os.stat(p).st_ino)] = 1 - file_xattrs = xattr.list(p) - num_parent_gfid = 0 - for x in file_xattrs: - if x.startswith("trusted.pgfid."): - num_parent_gfid += 1 - pgfids.add(x.split(".")[-1]) - - if num_parent_gfid == 0: - with open(outfile_failures, "a") as f: - f.write("%s\n" % gfid) - f.flush() - os.fsync(f.fileno()) - - except (IOError, OSError) as e: - if e.errno == ENOENT: - continue - else: - fail("%s Failed to convert to path from " - "GFID %s: %s" % (brick, gfid, e), - logger=logger) - - if not inode_dict: - return - - def inode_filter(path): + for row in changelog_data.gfidpath_get({"path1": "", "type": "MODIFY"}): + gfid = row[3].strip() + p = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid) + if os.path.islink(p): + # It is a Directory if GFID backend path is symlink + path = symlink_gfid_to_path(brick, gfid) + if path == "": + continue + + path = output_path_prepare(path, args.output_prefix) + + changelog_data.gfidpath_update({"path1": path}, + {"gfid": row[0]}) + else: try: - st = os.lstat(path) - except (OSError, IOError) as e: - if e.errno == ENOENT: - st = None - else: - raise + # INODE and GFID to inodegfid table + changelog_data.inodegfid_add(os.stat(p).st_ino, gfid) + file_xattrs = xattr.list(p) + for x in file_xattrs: + if x.startswith("trusted.pgfid."): + # PGFID in pgfid table + changelog_data.pgfid_add(x.split(".")[-1]) + except (IOError, OSError): + # All OS Errors ignored, since failures will be logged + # in End. All GFIDs present in gfidpath table + continue + + +def gfid_to_path_using_pgfid(brick, changelog_data, args): + """ + For all the pgfids collected, Converts to Path and + does readdir on those directories and looks up inodegfid + table for matching inode number. + """ + populate_pgfid_and_inodegfid(brick, changelog_data) + + # If no GFIDs needs conversion to Path + if not changelog_data.inodegfid_exists({"converted": 0}): + return + + def inode_filter(path): + # Looks in inodegfid table, if exists returns + # inode number else None + try: + st = os.lstat(path) + except (OSError, IOError): + st = None + + if st and changelog_data.inodegfid_exists({"inode": st.st_ino}): + return st.st_ino + + return None + + # Length of brick path, to remove from output path + brick_path_len = len(brick) + + def output_callback(path, inode): + # For each path found, encodes it and updates path1 + # Also updates converted flag in inodegfid table as 1 + path = path.strip() + path = path[brick_path_len+1:] + + path = output_path_prepare(path, args.output_prefix) - if st and inode_dict.get(str(st.st_ino), None): - return True + changelog_data.append_path1(path, inode) + changelog_data.inodegfid_update({"converted": 1}, {"inode": inode}) - return False + ignore_dirs = [os.path.join(brick, dirname) + for dirname in + conf.get_opt("brick_ignore_dirs").split(",")] - # Length of brick path, to remove from output path - brick_path_len = len(brick) + for row in changelog_data.pgfid_get(): + path = symlink_gfid_to_path(brick, row[0]) + find(os.path.join(brick, path), + callback_func=output_callback, + filter_func=inode_filter, + ignore_dirs=ignore_dirs, + subdirs_crawl=False) + + +def gfid_to_path_using_batchfind(brick, changelog_data): + # If all the GFIDs converted using gfid_to_path_using_pgfid + if not changelog_data.inodegfid_exists({"converted": 0}): + return + + def inode_filter(path): + # Looks in inodegfid table, if exists returns + # inode number else None + try: + st = os.lstat(path) + except (OSError, IOError): + st = None + + if st and changelog_data.inodegfid_exists({"inode": st.st_ino}): + return st.st_ino - def output_callback(path): - path = path.strip() - path = path[brick_path_len+1:] - output_write(fout, path, args.output_prefix) + return None - ignore_dirs = [os.path.join(brick, dirname) - for dirname in - conf.get_opt("brick_ignore_dirs").split(",")] + # Length of brick path, to remove from output path + brick_path_len = len(brick) - for pgfid in pgfids: - path = symlink_gfid_to_path(brick, pgfid) - find(os.path.join(brick, path), - callback_func=output_callback, - filter_func=inode_filter, - ignore_dirs=ignore_dirs, - subdirs_crawl=False) + def output_callback(path, inode): + # For each path found, encodes it and updates path1 + # Also updates converted flag in inodegfid table as 1 + path = path.strip() + path = path[brick_path_len+1:] + path = output_path_prepare(path, args.output_prefix) - fout.flush() - os.fsync(fout.fileno()) + changelog_data.append_path1(path, inode) + ignore_dirs = [os.path.join(brick, dirname) + for dirname in + conf.get_opt("brick_ignore_dirs").split(",")] -def sort_unique(filename): - execute(["sort", "-u", "-o", filename, filename], - exit_msg="Sort failed", logger=logger) + # Full Namespace Crawl + find(brick, callback_func=output_callback, + filter_func=inode_filter, + ignore_dirs=ignore_dirs) + + +def parse_changelog_to_db(changelog_data, filename): + """ + Parses a Changelog file and populates data in gfidpath table + """ + with open(filename) as f: + changelogfile = os.path.basename(filename) + for line in f: + data = line.strip().split(" ") + if data[0] == "E" and data[2] in ["CREATE", "MKNOD", "MKDIR"]: + # CREATE/MKDIR/MKNOD + changelog_data.when_create_mknod_mkdir(changelogfile, data) + elif data[0] in ["D", "M"]: + # DATA/META + if not args.only_namespace_changes: + changelog_data.when_data_meta(changelogfile, data) + elif data[0] == "E" and data[2] in ["LINK", "SYMLINK"]: + # LINK/SYMLINK + changelog_data.when_link_symlink(changelogfile, data) + elif data[0] == "E" and data[2] == "RENAME": + # RENAME + changelog_data.when_rename(changelogfile, data) + elif data[0] == "E" and data[2] in ["UNLINK", "RMDIR"]: + # UNLINK/RMDIR + changelog_data.when_unlink_rmdir(changelogfile, data) def get_changes(brick, hash_dir, log_file, start, end, args): @@ -199,6 +242,18 @@ def get_changes(brick, hash_dir, log_file, start, end, args): the modified gfids from the changelogs and writes the list of gfid to 'gfid_list' file. """ + session_dir = os.path.join(conf.get_opt("session_dir"), + args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + + # Get previous session + try: + with open(status_file) as f: + start = int(f.read().strip()) + except (ValueError, OSError, IOError): + start = args.start + try: libgfchangelog.cl_init() libgfchangelog.cl_register(brick, hash_dir, log_file, @@ -207,10 +262,7 @@ def get_changes(brick, hash_dir, log_file, start, end, args): fail("%s Changelog register failed: %s" % (brick, e), logger=logger) # Output files to record GFIDs and GFID to Path failure GFIDs - gfid_list_path = args.outfile + ".gfids" - gfid_list_failures_file = gfid_list_path + ".failures" - create_file(gfid_list_path, exit_on_err=True, logger=logger) - create_file(gfid_list_failures_file, exit_on_err=True, logger=logger) + changelog_data = ChangelogData(args.outfile) # Changelogs path(Hard coded to BRICK/.glusterfs/changelogs cl_path = os.path.join(brick, ".glusterfs/changelogs") @@ -234,37 +286,31 @@ def get_changes(brick, hash_dir, log_file, start, end, args): while libgfchangelog.cl_history_scan() > 0: changes += libgfchangelog.cl_history_getchanges() - if changes: - with open(gfid_list_path, 'a+') as fgfid: - for change in changes: - # Ignore if last processed changelog comes - # again in list - if change.endswith(".%s" % start): - continue - - with open(change) as f: - for line in f: - # Space delimited list, collect GFID - details = line.split() - fgfid.write("%s\n" % details[1]) - - libgfchangelog.cl_history_done(change) - fgfid.flush() - os.fsync(fgfid.fileno()) + for change in changes: + # Ignore if last processed changelog comes + # again in list + if change.endswith(".%s" % start): + continue + parse_changelog_to_db(changelog_data, change) + libgfchangelog.cl_history_done(change) + + changelog_data.commit() except libgfchangelog.ChangelogException as e: fail("%s Error during Changelog Crawl: %s" % (brick, e), logger=logger) - # If TS returned from history_changelog is < end time - # then FS crawl may be required, since history is only available - # till TS returned from history_changelog - if actual_end < end: - fail("Partial History available with Changelog", 2, logger=logger) + # Convert all pgfid available from Changelogs + pgfid_to_path(brick, changelog_data) + changelog_data.commit() + + # Convert all GFIDs for which no other additional details available + gfid_to_path_using_pgfid(brick, changelog_data, args) + changelog_data.commit() - sort_unique(gfid_list_path) - gfid_to_path_using_pgfid(brick, gfid_list_path, - args.outfile, gfid_list_failures_file) - gfid_to_path_using_batchfind(brick, gfid_list_failures_file, args.outfile) + # If some GFIDs fail to get converted from previous step, + # convert using find + gfid_to_path_using_batchfind(brick, changelog_data) + changelog_data.commit() return actual_end @@ -283,8 +329,6 @@ def changelog_crawl(brick, start, end, args): working_dir = os.path.join(working_dir, brickhash) mkdirp(working_dir, exit_on_err=True, logger=logger) - create_file(args.outfile, exit_on_err=True, logger=logger) - create_file(args.outfile + ".gfids", exit_on_err=True, logger=logger) log_file = os.path.join(conf.get_opt("log_dir"), args.session, @@ -308,6 +352,9 @@ def _get_args(): parser.add_argument("--debug", help="Debug", action="store_true") parser.add_argument("--output-prefix", help="File prefix in output", default=".") + parser.add_argument("-N", "--only-namespace-changes", + help="List only namespace changes", + action="store_true") return parser.parse_args() @@ -336,8 +383,13 @@ if __name__ == "__main__": start = args.start end = int(time.time()) - get_changelog_rollover_time(args.volume) + logger.info("%s Started Changelog Crawl - Start: %s End: %s" % (args.brick, + start, + end)) actual_end = changelog_crawl(args.brick, start, end, args) with open(status_file_pre, "w", buffering=0) as f: f.write(str(actual_end)) + logger.info("%s Finished Changelog Crawl - End: %s" % (args.brick, + actual_end)) sys.exit(0) |