diff options
Diffstat (limited to 'tools/glusterfind/src')
-rw-r--r-- | tools/glusterfind/src/Makefile.am | 4 | ||||
-rw-r--r-- | tools/glusterfind/src/brickfind.py | 2 | ||||
-rw-r--r-- | tools/glusterfind/src/changelog.py | 394 | ||||
-rw-r--r-- | tools/glusterfind/src/changelogdata.py | 412 | ||||
-rw-r--r-- | tools/glusterfind/src/main.py | 34 | ||||
-rw-r--r-- | tools/glusterfind/src/utils.py | 25 |
6 files changed, 683 insertions, 188 deletions
diff --git a/tools/glusterfind/src/Makefile.am b/tools/glusterfind/src/Makefile.am index 7b819828d97..541ff946c04 100644 --- a/tools/glusterfind/src/Makefile.am +++ b/tools/glusterfind/src/Makefile.am @@ -1,7 +1,7 @@ glusterfinddir = $(libexecdir)/glusterfs/glusterfind glusterfind_PYTHON = conf.py utils.py __init__.py \ - main.py libgfchangelog.py + main.py libgfchangelog.py changelogdata.py glusterfind_SCRIPTS = changelog.py nodeagent.py \ brickfind.py @@ -9,6 +9,6 @@ glusterfind_SCRIPTS = changelog.py nodeagent.py \ glusterfind_DATA = tool.conf EXTRA_DIST = changelog.py nodeagent.py brickfind.py \ - tool.conf + tool.conf changelogdata.py CLEANFILES = diff --git a/tools/glusterfind/src/brickfind.py b/tools/glusterfind/src/brickfind.py index 9758bef56ff..f300638d602 100644 --- a/tools/glusterfind/src/brickfind.py +++ b/tools/glusterfind/src/brickfind.py @@ -37,7 +37,7 @@ def brickfind_crawl(brick, args): with open(args.outfile, "a+") as fout: brick_path_len = len(brick) - def output_callback(path): + def output_callback(path, filter_result): path = path.strip() path = path[brick_path_len+1:] output_write(fout, path, args.output_prefix, encode=True) diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py index 2c4ee9106e1..b5f71c7c0ee 100644 --- a/tools/glusterfind/src/changelog.py +++ b/tools/glusterfind/src/changelog.py @@ -12,16 +12,16 @@ import os import sys import time import xattr -from errno import ENOENT import logging from argparse import ArgumentParser, RawDescriptionHelpFormatter import hashlib import urllib import libgfchangelog -from utils import create_file, mkdirp, execute, symlink_gfid_to_path -from utils import fail, setup_logger, output_write, find +from utils import mkdirp, symlink_gfid_to_path +from utils import fail, setup_logger, find from utils import get_changelog_rollover_time +from changelogdata import ChangelogData import conf @@ -37,159 +37,202 @@ history_turn_time = 0 logger = logging.getLogger() -def gfid_to_path_using_batchfind(brick, gfids_file, output_file): +def output_path_prepare(path, output_prefix): """ - find -samefile gets the inode number and crawls entire namespace - to get the list of files/dirs having same inode number. - Do find without any option, except the ignore directory option, - print the output in <INODE_NUM> <PATH> format, use this output - to look into in-memory dictionary of inode numbers got from the - list of GFIDs + If Prefix is set, joins to Path, removes ending slash + and encodes it. """ - with open(output_file, "a+") as fout: - inode_dict = {} - with open(gfids_file) as f: - for gfid in f: - gfid = gfid.strip() - backend_path = os.path.join(brick, ".glusterfs", - gfid[0:2], gfid[2:4], gfid) - - try: - inode_dict[str(os.stat(backend_path).st_ino)] = 1 - except (IOError, OSError) as e: - if e.errno == ENOENT: - continue - else: - fail("%s Failed to convert to path from " - "GFID %s: %s" % (brick, gfid, e), logger=logger) - - if not inode_dict: - return - - def inode_filter(path): - try: - st = os.lstat(path) - except (OSError, IOError) as e: - if e.errno == ENOENT: - st = None - else: - raise - - if st and inode_dict.get(str(st.st_ino), None): - return True - - return False - - brick_path_len = len(brick) - - def output_callback(path): - path = path.strip() - path = path[brick_path_len+1:] - output_write(fout, path, args.output_prefix) - - ignore_dirs = [os.path.join(brick, dirname) - for dirname in - conf.get_opt("brick_ignore_dirs").split(",")] - # Length of brick path, to remove from output path - find(brick, callback_func=output_callback, - filter_func=inode_filter, - ignore_dirs=ignore_dirs) + if output_prefix != ".": + path = os.path.join(output_prefix, path) + if path.endswith("/"): + path = path[0:len(path)-1] + + return urllib.quote_plus(path) + + +def pgfid_to_path(brick, changelog_data): + """ + For all the pgfids in table, converts into path using recursive + readlink. + """ + # pgfid1 to path1 in case of CREATE/MKNOD/MKDIR/LINK/SYMLINK + for row in changelog_data.gfidpath_get_distinct("pgfid1", {"path1": ""}): + # In case of Data/Metadata only, pgfid1 will not be their + if row[0] == "": + continue + + path = symlink_gfid_to_path(brick, row[0]) + path = output_path_prepare(path, args.output_prefix) + + changelog_data.gfidpath_set_path1(path, row[0]) + + # pgfid2 to path2 in case of RENAME + for row in changelog_data.gfidpath_get_distinct("pgfid2", + {"type": "RENAME", + "path2": ""}): + # Only in case of Rename pgfid2 exists + if row[0] == "": + continue - fout.flush() - os.fsync(fout.fileno()) + path = symlink_gfid_to_path(brick, row[0]) + if path == "": + continue + path = output_path_prepare(path, args.output_prefix) + changelog_data.gfidpath_set_path2(path, row[0]) -def gfid_to_path_using_pgfid(brick, gfids_file, output_file, outfile_failures): + +def populate_pgfid_and_inodegfid(brick, changelog_data): """ - Parent GFID is saved as xattr, collect Parent GFIDs from all - the files from gfids_file. Convert parent GFID to path and Crawl - each directories to get the list of files/dirs having same inode number. - Do find with maxdepth as 1 and print the output in <INODE_NUM> <PATH> - format, use this output to look into in memory dictionary of inode - numbers got from the list of GFIDs + For all the DATA/METADATA modifications GFID, + If symlink, directly convert to Path using Readlink. + If not symlink, try to get PGFIDs via xattr query and populate it + to pgfid table, collect inodes in inodegfid table """ - with open(output_file, "a+") as fout: - pgfids = set() - inode_dict = {} - with open(gfids_file) as f: - for gfid in f: - gfid = gfid.strip() - p = os.path.join(brick, - ".glusterfs", - gfid[0:2], - gfid[2:4], - gfid) - if os.path.islink(p): - path = symlink_gfid_to_path(brick, gfid) - output_write(fout, path, args.output_prefix) - else: - try: - inode_dict[str(os.stat(p).st_ino)] = 1 - file_xattrs = xattr.list(p) - num_parent_gfid = 0 - for x in file_xattrs: - if x.startswith("trusted.pgfid."): - num_parent_gfid += 1 - pgfids.add(x.split(".")[-1]) - - if num_parent_gfid == 0: - with open(outfile_failures, "a") as f: - f.write("%s\n" % gfid) - f.flush() - os.fsync(f.fileno()) - - except (IOError, OSError) as e: - if e.errno == ENOENT: - continue - else: - fail("%s Failed to convert to path from " - "GFID %s: %s" % (brick, gfid, e), - logger=logger) - - if not inode_dict: - return - - def inode_filter(path): + for row in changelog_data.gfidpath_get({"path1": "", "type": "MODIFY"}): + gfid = row[3].strip() + p = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid) + if os.path.islink(p): + # It is a Directory if GFID backend path is symlink + path = symlink_gfid_to_path(brick, gfid) + if path == "": + continue + + path = output_path_prepare(path, args.output_prefix) + + changelog_data.gfidpath_update({"path1": path}, + {"gfid": row[0]}) + else: try: - st = os.lstat(path) - except (OSError, IOError) as e: - if e.errno == ENOENT: - st = None - else: - raise + # INODE and GFID to inodegfid table + changelog_data.inodegfid_add(os.stat(p).st_ino, gfid) + file_xattrs = xattr.list(p) + for x in file_xattrs: + if x.startswith("trusted.pgfid."): + # PGFID in pgfid table + changelog_data.pgfid_add(x.split(".")[-1]) + except (IOError, OSError): + # All OS Errors ignored, since failures will be logged + # in End. All GFIDs present in gfidpath table + continue + + +def gfid_to_path_using_pgfid(brick, changelog_data, args): + """ + For all the pgfids collected, Converts to Path and + does readdir on those directories and looks up inodegfid + table for matching inode number. + """ + populate_pgfid_and_inodegfid(brick, changelog_data) + + # If no GFIDs needs conversion to Path + if not changelog_data.inodegfid_exists({"converted": 0}): + return + + def inode_filter(path): + # Looks in inodegfid table, if exists returns + # inode number else None + try: + st = os.lstat(path) + except (OSError, IOError): + st = None + + if st and changelog_data.inodegfid_exists({"inode": st.st_ino}): + return st.st_ino + + return None + + # Length of brick path, to remove from output path + brick_path_len = len(brick) + + def output_callback(path, inode): + # For each path found, encodes it and updates path1 + # Also updates converted flag in inodegfid table as 1 + path = path.strip() + path = path[brick_path_len+1:] + + path = output_path_prepare(path, args.output_prefix) - if st and inode_dict.get(str(st.st_ino), None): - return True + changelog_data.append_path1(path, inode) + changelog_data.inodegfid_update({"converted": 1}, {"inode": inode}) - return False + ignore_dirs = [os.path.join(brick, dirname) + for dirname in + conf.get_opt("brick_ignore_dirs").split(",")] - # Length of brick path, to remove from output path - brick_path_len = len(brick) + for row in changelog_data.pgfid_get(): + path = symlink_gfid_to_path(brick, row[0]) + find(os.path.join(brick, path), + callback_func=output_callback, + filter_func=inode_filter, + ignore_dirs=ignore_dirs, + subdirs_crawl=False) + + +def gfid_to_path_using_batchfind(brick, changelog_data): + # If all the GFIDs converted using gfid_to_path_using_pgfid + if not changelog_data.inodegfid_exists({"converted": 0}): + return + + def inode_filter(path): + # Looks in inodegfid table, if exists returns + # inode number else None + try: + st = os.lstat(path) + except (OSError, IOError): + st = None + + if st and changelog_data.inodegfid_exists({"inode": st.st_ino}): + return st.st_ino - def output_callback(path): - path = path.strip() - path = path[brick_path_len+1:] - output_write(fout, path, args.output_prefix) + return None - ignore_dirs = [os.path.join(brick, dirname) - for dirname in - conf.get_opt("brick_ignore_dirs").split(",")] + # Length of brick path, to remove from output path + brick_path_len = len(brick) - for pgfid in pgfids: - path = symlink_gfid_to_path(brick, pgfid) - find(os.path.join(brick, path), - callback_func=output_callback, - filter_func=inode_filter, - ignore_dirs=ignore_dirs, - subdirs_crawl=False) + def output_callback(path, inode): + # For each path found, encodes it and updates path1 + # Also updates converted flag in inodegfid table as 1 + path = path.strip() + path = path[brick_path_len+1:] + path = output_path_prepare(path, args.output_prefix) - fout.flush() - os.fsync(fout.fileno()) + changelog_data.append_path1(path, inode) + ignore_dirs = [os.path.join(brick, dirname) + for dirname in + conf.get_opt("brick_ignore_dirs").split(",")] -def sort_unique(filename): - execute(["sort", "-u", "-o", filename, filename], - exit_msg="Sort failed", logger=logger) + # Full Namespace Crawl + find(brick, callback_func=output_callback, + filter_func=inode_filter, + ignore_dirs=ignore_dirs) + + +def parse_changelog_to_db(changelog_data, filename): + """ + Parses a Changelog file and populates data in gfidpath table + """ + with open(filename) as f: + changelogfile = os.path.basename(filename) + for line in f: + data = line.strip().split(" ") + if data[0] == "E" and data[2] in ["CREATE", "MKNOD", "MKDIR"]: + # CREATE/MKDIR/MKNOD + changelog_data.when_create_mknod_mkdir(changelogfile, data) + elif data[0] in ["D", "M"]: + # DATA/META + if not args.only_namespace_changes: + changelog_data.when_data_meta(changelogfile, data) + elif data[0] == "E" and data[2] in ["LINK", "SYMLINK"]: + # LINK/SYMLINK + changelog_data.when_link_symlink(changelogfile, data) + elif data[0] == "E" and data[2] == "RENAME": + # RENAME + changelog_data.when_rename(changelogfile, data) + elif data[0] == "E" and data[2] in ["UNLINK", "RMDIR"]: + # UNLINK/RMDIR + changelog_data.when_unlink_rmdir(changelogfile, data) def get_changes(brick, hash_dir, log_file, start, end, args): @@ -199,6 +242,18 @@ def get_changes(brick, hash_dir, log_file, start, end, args): the modified gfids from the changelogs and writes the list of gfid to 'gfid_list' file. """ + session_dir = os.path.join(conf.get_opt("session_dir"), + args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + + # Get previous session + try: + with open(status_file) as f: + start = int(f.read().strip()) + except (ValueError, OSError, IOError): + start = args.start + try: libgfchangelog.cl_init() libgfchangelog.cl_register(brick, hash_dir, log_file, @@ -207,10 +262,7 @@ def get_changes(brick, hash_dir, log_file, start, end, args): fail("%s Changelog register failed: %s" % (brick, e), logger=logger) # Output files to record GFIDs and GFID to Path failure GFIDs - gfid_list_path = args.outfile + ".gfids" - gfid_list_failures_file = gfid_list_path + ".failures" - create_file(gfid_list_path, exit_on_err=True, logger=logger) - create_file(gfid_list_failures_file, exit_on_err=True, logger=logger) + changelog_data = ChangelogData(args.outfile) # Changelogs path(Hard coded to BRICK/.glusterfs/changelogs cl_path = os.path.join(brick, ".glusterfs/changelogs") @@ -234,37 +286,31 @@ def get_changes(brick, hash_dir, log_file, start, end, args): while libgfchangelog.cl_history_scan() > 0: changes += libgfchangelog.cl_history_getchanges() - if changes: - with open(gfid_list_path, 'a+') as fgfid: - for change in changes: - # Ignore if last processed changelog comes - # again in list - if change.endswith(".%s" % start): - continue - - with open(change) as f: - for line in f: - # Space delimited list, collect GFID - details = line.split() - fgfid.write("%s\n" % details[1]) - - libgfchangelog.cl_history_done(change) - fgfid.flush() - os.fsync(fgfid.fileno()) + for change in changes: + # Ignore if last processed changelog comes + # again in list + if change.endswith(".%s" % start): + continue + parse_changelog_to_db(changelog_data, change) + libgfchangelog.cl_history_done(change) + + changelog_data.commit() except libgfchangelog.ChangelogException as e: fail("%s Error during Changelog Crawl: %s" % (brick, e), logger=logger) - # If TS returned from history_changelog is < end time - # then FS crawl may be required, since history is only available - # till TS returned from history_changelog - if actual_end < end: - fail("Partial History available with Changelog", 2, logger=logger) + # Convert all pgfid available from Changelogs + pgfid_to_path(brick, changelog_data) + changelog_data.commit() + + # Convert all GFIDs for which no other additional details available + gfid_to_path_using_pgfid(brick, changelog_data, args) + changelog_data.commit() - sort_unique(gfid_list_path) - gfid_to_path_using_pgfid(brick, gfid_list_path, - args.outfile, gfid_list_failures_file) - gfid_to_path_using_batchfind(brick, gfid_list_failures_file, args.outfile) + # If some GFIDs fail to get converted from previous step, + # convert using find + gfid_to_path_using_batchfind(brick, changelog_data) + changelog_data.commit() return actual_end @@ -283,8 +329,6 @@ def changelog_crawl(brick, start, end, args): working_dir = os.path.join(working_dir, brickhash) mkdirp(working_dir, exit_on_err=True, logger=logger) - create_file(args.outfile, exit_on_err=True, logger=logger) - create_file(args.outfile + ".gfids", exit_on_err=True, logger=logger) log_file = os.path.join(conf.get_opt("log_dir"), args.session, @@ -308,6 +352,9 @@ def _get_args(): parser.add_argument("--debug", help="Debug", action="store_true") parser.add_argument("--output-prefix", help="File prefix in output", default=".") + parser.add_argument("-N", "--only-namespace-changes", + help="List only namespace changes", + action="store_true") return parser.parse_args() @@ -336,8 +383,13 @@ if __name__ == "__main__": start = args.start end = int(time.time()) - get_changelog_rollover_time(args.volume) + logger.info("%s Started Changelog Crawl - Start: %s End: %s" % (args.brick, + start, + end)) actual_end = changelog_crawl(args.brick, start, end, args) with open(status_file_pre, "w", buffering=0) as f: f.write(str(actual_end)) + logger.info("%s Finished Changelog Crawl - End: %s" % (args.brick, + actual_end)) sys.exit(0) diff --git a/tools/glusterfind/src/changelogdata.py b/tools/glusterfind/src/changelogdata.py new file mode 100644 index 00000000000..c42aa2a2315 --- /dev/null +++ b/tools/glusterfind/src/changelogdata.py @@ -0,0 +1,412 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import sqlite3 +import urllib +import os + +from utils import RecordType + + +class OutputMerger(object): + """ + Class to merge the output files collected from + different nodes + """ + def __init__(self, db_path, all_dbs): + self.conn = sqlite3.connect(db_path) + self.cursor = self.conn.cursor() + self.cursor_reader = self.conn.cursor() + query = "DROP TABLE IF EXISTS finallist" + self.cursor.execute(query) + + query = """ + CREATE TABLE finallist( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts VARCHAR, + type VARCHAR, + gfid VARCHAR, + path1 VARCHAR, + path2 VARCHAR, + UNIQUE (type, path1, path2) ON CONFLICT IGNORE + ) + """ + self.cursor.execute(query) + + # If node database exists, read each db and insert into + # final table. Ignore if combination of TYPE PATH1 PATH2 + # already exists + for node_db in all_dbs: + if os.path.exists(node_db): + conn = sqlite3.connect(node_db) + cursor = conn.cursor() + query = """ + SELECT ts, type, gfid, path1, path2 + FROM gfidpath + WHERE path1 != '' + ORDER BY id ASC + """ + for row in cursor.execute(query): + self.add_if_not_exists(row[0], row[1], row[2], + row[3], row[4]) + + self.conn.commit() + + def add_if_not_exists(self, ts, ty, gfid, path1, path2=""): + # Adds record to finallist only if not exists + query = """ + INSERT INTO finallist(ts, type, gfid, path1, path2) + VALUES(?, ?, ?, ?, ?) + """ + self.cursor.execute(query, (ts, ty, gfid, path1, path2)) + + def get(self): + query = """SELECT type, path1, path2 FROM finallist + ORDER BY ts ASC, id ASC""" + return self.cursor_reader.execute(query) + + def get_failures(self): + query = """ + SELECT gfid + FROM finallist + WHERE path1 = '' OR (path2 = '' AND type = 'RENAME') + """ + return self.cursor_reader.execute(query) + + +class ChangelogData(object): + def __init__(self, dbpath): + self.conn = sqlite3.connect(dbpath) + self.cursor = self.conn.cursor() + self.cursor_reader = self.conn.cursor() + self._create_table_gfidpath() + self._create_table_pgfid() + self._create_table_inodegfid() + + def _create_table_gfidpath(self): + drop_table = "DROP TABLE IF EXISTS gfidpath" + self.cursor.execute(drop_table) + + create_table = """ + CREATE TABLE gfidpath( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts VARCHAR, + type VARCHAR, + gfid VARCHAR(40), + pgfid1 VARCHAR(40), + bn1 VARCHAR(500), + pgfid2 VARCHAR(40), + bn2 VARCHAR(500), + path1 VARCHAR DEFAULT '', + path2 VARCHAR DEFAULT '' + ) + """ + self.cursor.execute(create_table) + + def _create_table_inodegfid(self): + drop_table = "DROP TABLE IF EXISTS inodegfid" + self.cursor.execute(drop_table) + + create_table = """ + CREATE TABLE inodegfid( + inode INTEGER PRIMARY KEY, + gfid VARCHAR(40), + converted INTEGER DEFAULT 0, + UNIQUE (inode, gfid) ON CONFLICT IGNORE + ) + """ + self.cursor.execute(create_table) + + def _create_table_pgfid(self): + drop_table = "DROP TABLE IF EXISTS pgfid" + self.cursor.execute(drop_table) + + create_table = """ + CREATE TABLE pgfid( + pgfid VARCHAR(40) PRIMARY KEY, + UNIQUE (pgfid) ON CONFLICT IGNORE + ) + """ + self.cursor.execute(create_table) + + def _get(self, tablename, filters): + # SELECT * FROM <TABLENAME> WHERE <CONDITION> + params = [] + query = "SELECT * FROM %s WHERE 1=1" % tablename + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + return self.cursor_reader.execute(query, params) + + def _get_distinct(self, tablename, distinct_field, filters): + # SELECT DISTINCT <COL> FROM <TABLENAME> WHERE <CONDITION> + params = [] + query = "SELECT DISTINCT %s FROM %s WHERE 1=1" % (distinct_field, + tablename) + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + return self.cursor_reader.execute(query, params) + + def _delete(self, tablename, filters): + # DELETE FROM <TABLENAME> WHERE <CONDITIONS> + query = "DELETE FROM %s WHERE 1=1" % tablename + params = [] + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + self.cursor.execute(query, params) + + def _add(self, tablename, data): + # INSERT INTO <TABLENAME>(<col1>, <col2>..) VALUES(?,?..) + query = "INSERT INTO %s(" % tablename + fields = [] + params = [] + for key, value in data.items(): + fields.append(key) + params.append(value) + + values_substitute = len(fields)*["?"] + query += "%s) VALUES(%s)" % (",".join(fields), + ",".join(values_substitute)) + self.cursor.execute(query, params) + + def _update(self, tablename, data, filters): + # UPDATE <TABLENAME> SET col1 = ?,.. WHERE col1=? AND .. + params = [] + update_fields = [] + for key, value in data.items(): + update_fields.append("%s = ?" % key) + params.append(value) + + query = "UPDATE %s SET %s WHERE 1 = 1" % (tablename, + ", ".join(update_fields)) + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + self.cursor.execute(query, params) + + def _exists(self, tablename, filters): + if not filters: + return False + + query = "SELECT COUNT(1) FROM %s WHERE 1=1" % tablename + params = [] + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + self.cursor.execute(query, params) + row = self.cursor.fetchone() + return True if row[0] > 0 else False + + def gfidpath_add(self, changelogfile, ty, gfid, pgfid1="", bn1="", + pgfid2="", bn2="", path1="", path2=""): + self._add("gfidpath", { + "ts": changelogfile.split(".")[-1], + "type": ty, + "gfid": gfid, + "pgfid1": pgfid1, + "bn1": bn1, + "pgfid2": pgfid2, + "bn2": bn2, + "path1": path1, + "path2": path2 + }) + + def gfidpath_update(self, data, filters): + self._update("gfidpath", data, filters) + + def gfidpath_delete(self, filters): + self._delete("gfidpath", filters) + + def gfidpath_exists(self, filters): + return self._exists("gfidpath", filters) + + def gfidpath_get(self, filters={}): + return self._get("gfidpath", filters) + + def gfidpath_get_distinct(self, distinct_field, filters={}): + return self._get_distinct("gfidpath", distinct_field, filters) + + def pgfid_add(self, pgfid): + self._add("pgfid", { + "pgfid": pgfid + }) + + def pgfid_update(self, data, filters): + self._update("pgfid", data, filters) + + def pgfid_get(self, filters={}): + return self._get("pgfid", filters) + + def pgfid_get_distinct(self, distinct_field, filters={}): + return self._get_distinct("pgfid", distinct_field, filters) + + def pgfid_exists(self, filters): + return self._exists("pgfid", filters) + + def inodegfid_add(self, inode, gfid, converted=0): + self._add("inodegfid", { + "inode": inode, + "gfid": gfid, + "converted": converted + }) + + def inodegfid_update(self, data, filters): + self._update("inodegfid", data, filters) + + def inodegfid_get(self, filters={}): + return self._get("inodegfid", filters) + + def inodegfid_get_distinct(self, distinct_field, filters={}): + return self._get_distinct("inodegfid", distinct_field, filters) + + def inodegfid_exists(self, filters): + return self._exists("inodegfid", filters) + + def append_path1(self, path, inode): + # || is for concatenate in SQL + query = """UPDATE gfidpath SET path1 = ',' || ? + WHERE gfid IN (SELECT gfid FROM inodegfid WHERE inode = ?)""" + self.cursor.execute(query, (path, inode)) + + def gfidpath_set_path1(self, path1, pgfid1): + # || is for concatenate in SQL + if path1 == "": + update_str1 = "? || bn1" + update_str2 = "? || bn2" + else: + update_str1 = "? || '%2F' || bn1" + update_str2 = "? || '%2F' || bn2" + + query = """UPDATE gfidpath SET path1 = %s + WHERE pgfid1 = ?""" % update_str1 + self.cursor.execute(query, (path1, pgfid1)) + + # Set Path2 if pgfid1 and pgfid2 are same + query = """UPDATE gfidpath SET path2 = %s + WHERE pgfid2 = ?""" % update_str2 + self.cursor.execute(query, (path1, pgfid1)) + + def gfidpath_set_path2(self, path2, pgfid2): + # || is for concatenate in SQL + if path2 == "": + update_str = "? || bn2" + else: + update_str = "? || '%2F' || bn2" + + query = """UPDATE gfidpath SET path2 = %s + WHERE pgfid2 = ?""" % update_str + self.cursor.execute(query, (path2, pgfid2)) + + def when_create_mknod_mkdir(self, changelogfile, data): + # E <GFID> <MKNOD|CREATE|MKDIR> <MODE> <USER> <GRP> <PGFID>/<BNAME> + # Add the Entry to DB + pgfid1, bn1 = urllib.unquote_plus(data[6]).split("/", 1) + + # Quote again the basename + bn1 = urllib.quote_plus(bn1.strip()) + + self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1) + + def when_rename(self, changelogfile, data): + # E <GFID> RENAME <OLD_PGFID>/<BNAME> <PGFID>/<BNAME> + pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1) + pgfid2, bn2 = urllib.unquote_plus(data[4]).split("/", 1) + + # Quote again the basename + bn1 = urllib.quote_plus(bn1.strip()) + bn2 = urllib.quote_plus(bn2.strip()) + + if self.gfidpath_exists({"gfid": data[1], "type": "NEW", + "pgfid1": pgfid1, "bn1": bn1}): + # If <OLD_PGFID>/<BNAME> is same as CREATE, Update + # <NEW_PGFID>/<BNAME> in NEW. + self.gfidpath_update({"pgfid1": pgfid2, "bn1": bn2}, + {"gfid": data[1], "type": "NEW", + "pgfid1": pgfid1, "bn1": bn1}) + elif self.gfidpath_exists({"gfid": data[1], "type": "RENAME", + "pgfid2": pgfid1, "bn2": bn1}): + # If <OLD_PGFID>/<BNAME> is same as <PGFID2>/<BN2>(may be previous + # RENAME) then UPDATE <NEW_PGFID>/<BNAME> as <PGFID2>/<BN2> + self.gfidpath_update({"pgfid2": pgfid2, "bn2": bn2}, + {"gfid": data[1], "type": "RENAME", + "pgfid2": pgfid1, "bn2": bn1}) + else: + # Else insert as RENAME + self.gfidpath_add(changelogfile, RecordType.RENAME, data[1], + pgfid1, bn1, pgfid2, bn2) + + def when_link_symlink(self, changelogfile, data): + # E <GFID> <LINK|SYMLINK> <PGFID>/<BASENAME> + # Add as New record in Db as Type NEW + pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1) + + # Quote again the basename + bn1 = urllib.quote_plus(bn1.strip()) + + self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1) + + def when_data_meta(self, changelogfile, data): + # If GFID row exists, Ignore else Add to Db + if not self.gfidpath_exists({"gfid": data[1]}): + self.gfidpath_add(changelogfile, RecordType.MODIFY, data[1]) + + def when_unlink_rmdir(self, changelogfile, data): + # E <GFID> <UNLINK|RMDIR> <PGFID>/<BASENAME> + pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1) + # Quote again the basename + bn1 = urllib.quote_plus(bn1.strip()) + deleted_path = data[4] if len(data) == 5 else "" + + if self.gfidpath_exists({"gfid": data[1], "type": "NEW", + "pgfid1": pgfid1, "bn1": bn1}): + # If path exists in table as NEW with same GFID + # Delete that row + self.gfidpath_delete({"gfid": data[1], "type": "NEW", + "pgfid1": pgfid1, "bn1": bn1}) + else: + # Else Record as DELETE + self.gfidpath_add(changelogfile, RecordType.DELETE, data[1], + pgfid1, bn1, path1=deleted_path) + + # Update path1 as deleted_path if pgfid1 and bn1 is same as deleted + self.gfidpath_update({"path1": deleted_path}, {"gfid": data[1], + "pgfid1": pgfid1, + "bn1": bn1}) + + # Update path2 as deleted_path if pgfid2 and bn2 is same as deleted + self.gfidpath_update({"path2": deleted_path}, { + "type": RecordType.RENAME, + "gfid": data[1], + "pgfid2": pgfid1, + "bn2": bn1}) + + # If deleted directory is parent for somebody + query1 = """UPDATE gfidpath SET path1 = ? || '%2F' || bn1 + WHERE pgfid1 = ? AND path1 != ''""" + self.cursor.execute(query1, (deleted_path, data[1])) + + query1 = """UPDATE gfidpath SET path2 = ? || '%2F' || bn1 + WHERE pgfid2 = ? AND path2 != ''""" + self.cursor.execute(query1, (deleted_path, data[1])) + + def commit(self): + self.conn.commit() diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py index 089a3aec3c5..d9936eebde1 100644 --- a/tools/glusterfind/src/main.py +++ b/tools/glusterfind/src/main.py @@ -20,9 +20,9 @@ import shutil from utils import execute, is_host_local, mkdirp, fail from utils import setup_logger, human_time, handle_rm_error -from utils import get_changelog_rollover_time, cache_output +from utils import get_changelog_rollover_time, cache_output, create_file import conf - +from changelogdata import OutputMerger PROG_DESCRIPTION = """ GlusterFS Incremental API @@ -235,6 +235,9 @@ def _get_args(): help="Regenerate outfile, discard the outfile " "generated from last pre command", action="store_true") + parser_pre.add_argument("-N", "--only-namespace-changes", + help="List only namespace changes", + action="store_true") # post <SESSION> <VOLUME> parser_post = subparsers.add_parser('post') @@ -377,10 +380,29 @@ def mode_pre(session_dir, args): run_cmd_nodes("pre", args, start=start) # Merger - cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] - execute(cmd, - exit_msg="Failed to merge output files " - "collected from nodes", logger=logger) + if args.full: + cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] + execute(cmd, + exit_msg="Failed to merge output files " + "collected from nodes", logger=logger) + else: + # Read each Changelogs db and generate finaldb + create_file(args.outfile, exit_on_err=True, logger=logger) + outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles) + + with open(args.outfile, "a") as f: + for row in outfilemerger.get(): + # Multiple paths in case of Hardlinks + paths = row[1].split(",") + for p in paths: + if p == "": + continue + f.write("%s %s %s\n" % (row[0], p, row[2])) + + try: + os.remove(args.outfile + ".db") + except (IOError, OSError): + pass run_cmd_nodes("cleanup", args) diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py index aea9a9dc82d..cda5ea6378e 100644 --- a/tools/glusterfind/src/utils.py +++ b/tools/glusterfind/src/utils.py @@ -24,6 +24,13 @@ ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError cache_data = {} +class RecordType(object): + NEW = "NEW" + MODIFY = "MODIFY" + RENAME = "RENAME" + DELETE = "DELETE" + + def cache_output(func): def wrapper(*args, **kwargs): global cache_data @@ -46,8 +53,10 @@ def find(path, callback_func=lambda x: True, filter_func=lambda x: True, if path in ignore_dirs: return - if filter_func(path): - callback_func(path) + # Capture filter_func output and pass it to callback function + filter_result = filter_func(path) + if filter_result is not None: + callback_func(path, filter_result) for p in os.listdir(path): full_path = os.path.join(path, p) @@ -56,11 +65,13 @@ def find(path, callback_func=lambda x: True, filter_func=lambda x: True, if subdirs_crawl: find(full_path, callback_func, filter_func, ignore_dirs) else: - if filter_func(full_path): - callback_func(full_path) + filter_result = filter_func(full_path) + if filter_result is not None: + callback_func(full_path, filter_result) else: - if filter_func(full_path): - callback_func(full_path) + filter_result = filter_func(full_path) + if filter_result is not None: + callback_func(full_path, filter_result) def output_write(f, path, prefix=".", encode=False): @@ -215,5 +226,3 @@ def get_changelog_rollover_time(volumename): return int(tree.find('volGetopts/Value').text) except ParseError: return DEFAULT_CHANGELOG_INTERVAL - - |