summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--tools/glusterfind/src/Makefile.am4
-rw-r--r--tools/glusterfind/src/brickfind.py2
-rw-r--r--tools/glusterfind/src/changelog.py394
-rw-r--r--tools/glusterfind/src/changelogdata.py412
-rw-r--r--tools/glusterfind/src/main.py34
-rw-r--r--tools/glusterfind/src/utils.py25
6 files changed, 683 insertions, 188 deletions
diff --git a/tools/glusterfind/src/Makefile.am b/tools/glusterfind/src/Makefile.am
index 7b819828d97..541ff946c04 100644
--- a/tools/glusterfind/src/Makefile.am
+++ b/tools/glusterfind/src/Makefile.am
@@ -1,7 +1,7 @@
glusterfinddir = $(libexecdir)/glusterfs/glusterfind
glusterfind_PYTHON = conf.py utils.py __init__.py \
- main.py libgfchangelog.py
+ main.py libgfchangelog.py changelogdata.py
glusterfind_SCRIPTS = changelog.py nodeagent.py \
brickfind.py
@@ -9,6 +9,6 @@ glusterfind_SCRIPTS = changelog.py nodeagent.py \
glusterfind_DATA = tool.conf
EXTRA_DIST = changelog.py nodeagent.py brickfind.py \
- tool.conf
+ tool.conf changelogdata.py
CLEANFILES =
diff --git a/tools/glusterfind/src/brickfind.py b/tools/glusterfind/src/brickfind.py
index 9758bef56ff..f300638d602 100644
--- a/tools/glusterfind/src/brickfind.py
+++ b/tools/glusterfind/src/brickfind.py
@@ -37,7 +37,7 @@ def brickfind_crawl(brick, args):
with open(args.outfile, "a+") as fout:
brick_path_len = len(brick)
- def output_callback(path):
+ def output_callback(path, filter_result):
path = path.strip()
path = path[brick_path_len+1:]
output_write(fout, path, args.output_prefix, encode=True)
diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py
index 2c4ee9106e1..b5f71c7c0ee 100644
--- a/tools/glusterfind/src/changelog.py
+++ b/tools/glusterfind/src/changelog.py
@@ -12,16 +12,16 @@ import os
import sys
import time
import xattr
-from errno import ENOENT
import logging
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import hashlib
import urllib
import libgfchangelog
-from utils import create_file, mkdirp, execute, symlink_gfid_to_path
-from utils import fail, setup_logger, output_write, find
+from utils import mkdirp, symlink_gfid_to_path
+from utils import fail, setup_logger, find
from utils import get_changelog_rollover_time
+from changelogdata import ChangelogData
import conf
@@ -37,159 +37,202 @@ history_turn_time = 0
logger = logging.getLogger()
-def gfid_to_path_using_batchfind(brick, gfids_file, output_file):
+def output_path_prepare(path, output_prefix):
"""
- find -samefile gets the inode number and crawls entire namespace
- to get the list of files/dirs having same inode number.
- Do find without any option, except the ignore directory option,
- print the output in <INODE_NUM> <PATH> format, use this output
- to look into in-memory dictionary of inode numbers got from the
- list of GFIDs
+ If Prefix is set, joins to Path, removes ending slash
+ and encodes it.
"""
- with open(output_file, "a+") as fout:
- inode_dict = {}
- with open(gfids_file) as f:
- for gfid in f:
- gfid = gfid.strip()
- backend_path = os.path.join(brick, ".glusterfs",
- gfid[0:2], gfid[2:4], gfid)
-
- try:
- inode_dict[str(os.stat(backend_path).st_ino)] = 1
- except (IOError, OSError) as e:
- if e.errno == ENOENT:
- continue
- else:
- fail("%s Failed to convert to path from "
- "GFID %s: %s" % (brick, gfid, e), logger=logger)
-
- if not inode_dict:
- return
-
- def inode_filter(path):
- try:
- st = os.lstat(path)
- except (OSError, IOError) as e:
- if e.errno == ENOENT:
- st = None
- else:
- raise
-
- if st and inode_dict.get(str(st.st_ino), None):
- return True
-
- return False
-
- brick_path_len = len(brick)
-
- def output_callback(path):
- path = path.strip()
- path = path[brick_path_len+1:]
- output_write(fout, path, args.output_prefix)
-
- ignore_dirs = [os.path.join(brick, dirname)
- for dirname in
- conf.get_opt("brick_ignore_dirs").split(",")]
- # Length of brick path, to remove from output path
- find(brick, callback_func=output_callback,
- filter_func=inode_filter,
- ignore_dirs=ignore_dirs)
+ if output_prefix != ".":
+ path = os.path.join(output_prefix, path)
+ if path.endswith("/"):
+ path = path[0:len(path)-1]
+
+ return urllib.quote_plus(path)
+
+
+def pgfid_to_path(brick, changelog_data):
+ """
+ For all the pgfids in table, converts into path using recursive
+ readlink.
+ """
+ # pgfid1 to path1 in case of CREATE/MKNOD/MKDIR/LINK/SYMLINK
+ for row in changelog_data.gfidpath_get_distinct("pgfid1", {"path1": ""}):
+ # In case of Data/Metadata only, pgfid1 will not be their
+ if row[0] == "":
+ continue
+
+ path = symlink_gfid_to_path(brick, row[0])
+ path = output_path_prepare(path, args.output_prefix)
+
+ changelog_data.gfidpath_set_path1(path, row[0])
+
+ # pgfid2 to path2 in case of RENAME
+ for row in changelog_data.gfidpath_get_distinct("pgfid2",
+ {"type": "RENAME",
+ "path2": ""}):
+ # Only in case of Rename pgfid2 exists
+ if row[0] == "":
+ continue
- fout.flush()
- os.fsync(fout.fileno())
+ path = symlink_gfid_to_path(brick, row[0])
+ if path == "":
+ continue
+ path = output_path_prepare(path, args.output_prefix)
+ changelog_data.gfidpath_set_path2(path, row[0])
-def gfid_to_path_using_pgfid(brick, gfids_file, output_file, outfile_failures):
+
+def populate_pgfid_and_inodegfid(brick, changelog_data):
"""
- Parent GFID is saved as xattr, collect Parent GFIDs from all
- the files from gfids_file. Convert parent GFID to path and Crawl
- each directories to get the list of files/dirs having same inode number.
- Do find with maxdepth as 1 and print the output in <INODE_NUM> <PATH>
- format, use this output to look into in memory dictionary of inode
- numbers got from the list of GFIDs
+ For all the DATA/METADATA modifications GFID,
+ If symlink, directly convert to Path using Readlink.
+ If not symlink, try to get PGFIDs via xattr query and populate it
+ to pgfid table, collect inodes in inodegfid table
"""
- with open(output_file, "a+") as fout:
- pgfids = set()
- inode_dict = {}
- with open(gfids_file) as f:
- for gfid in f:
- gfid = gfid.strip()
- p = os.path.join(brick,
- ".glusterfs",
- gfid[0:2],
- gfid[2:4],
- gfid)
- if os.path.islink(p):
- path = symlink_gfid_to_path(brick, gfid)
- output_write(fout, path, args.output_prefix)
- else:
- try:
- inode_dict[str(os.stat(p).st_ino)] = 1
- file_xattrs = xattr.list(p)
- num_parent_gfid = 0
- for x in file_xattrs:
- if x.startswith("trusted.pgfid."):
- num_parent_gfid += 1
- pgfids.add(x.split(".")[-1])
-
- if num_parent_gfid == 0:
- with open(outfile_failures, "a") as f:
- f.write("%s\n" % gfid)
- f.flush()
- os.fsync(f.fileno())
-
- except (IOError, OSError) as e:
- if e.errno == ENOENT:
- continue
- else:
- fail("%s Failed to convert to path from "
- "GFID %s: %s" % (brick, gfid, e),
- logger=logger)
-
- if not inode_dict:
- return
-
- def inode_filter(path):
+ for row in changelog_data.gfidpath_get({"path1": "", "type": "MODIFY"}):
+ gfid = row[3].strip()
+ p = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid)
+ if os.path.islink(p):
+ # It is a Directory if GFID backend path is symlink
+ path = symlink_gfid_to_path(brick, gfid)
+ if path == "":
+ continue
+
+ path = output_path_prepare(path, args.output_prefix)
+
+ changelog_data.gfidpath_update({"path1": path},
+ {"gfid": row[0]})
+ else:
try:
- st = os.lstat(path)
- except (OSError, IOError) as e:
- if e.errno == ENOENT:
- st = None
- else:
- raise
+ # INODE and GFID to inodegfid table
+ changelog_data.inodegfid_add(os.stat(p).st_ino, gfid)
+ file_xattrs = xattr.list(p)
+ for x in file_xattrs:
+ if x.startswith("trusted.pgfid."):
+ # PGFID in pgfid table
+ changelog_data.pgfid_add(x.split(".")[-1])
+ except (IOError, OSError):
+ # All OS Errors ignored, since failures will be logged
+ # in End. All GFIDs present in gfidpath table
+ continue
+
+
+def gfid_to_path_using_pgfid(brick, changelog_data, args):
+ """
+ For all the pgfids collected, Converts to Path and
+ does readdir on those directories and looks up inodegfid
+ table for matching inode number.
+ """
+ populate_pgfid_and_inodegfid(brick, changelog_data)
+
+ # If no GFIDs needs conversion to Path
+ if not changelog_data.inodegfid_exists({"converted": 0}):
+ return
+
+ def inode_filter(path):
+ # Looks in inodegfid table, if exists returns
+ # inode number else None
+ try:
+ st = os.lstat(path)
+ except (OSError, IOError):
+ st = None
+
+ if st and changelog_data.inodegfid_exists({"inode": st.st_ino}):
+ return st.st_ino
+
+ return None
+
+ # Length of brick path, to remove from output path
+ brick_path_len = len(brick)
+
+ def output_callback(path, inode):
+ # For each path found, encodes it and updates path1
+ # Also updates converted flag in inodegfid table as 1
+ path = path.strip()
+ path = path[brick_path_len+1:]
+
+ path = output_path_prepare(path, args.output_prefix)
- if st and inode_dict.get(str(st.st_ino), None):
- return True
+ changelog_data.append_path1(path, inode)
+ changelog_data.inodegfid_update({"converted": 1}, {"inode": inode})
- return False
+ ignore_dirs = [os.path.join(brick, dirname)
+ for dirname in
+ conf.get_opt("brick_ignore_dirs").split(",")]
- # Length of brick path, to remove from output path
- brick_path_len = len(brick)
+ for row in changelog_data.pgfid_get():
+ path = symlink_gfid_to_path(brick, row[0])
+ find(os.path.join(brick, path),
+ callback_func=output_callback,
+ filter_func=inode_filter,
+ ignore_dirs=ignore_dirs,
+ subdirs_crawl=False)
+
+
+def gfid_to_path_using_batchfind(brick, changelog_data):
+ # If all the GFIDs converted using gfid_to_path_using_pgfid
+ if not changelog_data.inodegfid_exists({"converted": 0}):
+ return
+
+ def inode_filter(path):
+ # Looks in inodegfid table, if exists returns
+ # inode number else None
+ try:
+ st = os.lstat(path)
+ except (OSError, IOError):
+ st = None
+
+ if st and changelog_data.inodegfid_exists({"inode": st.st_ino}):
+ return st.st_ino
- def output_callback(path):
- path = path.strip()
- path = path[brick_path_len+1:]
- output_write(fout, path, args.output_prefix)
+ return None
- ignore_dirs = [os.path.join(brick, dirname)
- for dirname in
- conf.get_opt("brick_ignore_dirs").split(",")]
+ # Length of brick path, to remove from output path
+ brick_path_len = len(brick)
- for pgfid in pgfids:
- path = symlink_gfid_to_path(brick, pgfid)
- find(os.path.join(brick, path),
- callback_func=output_callback,
- filter_func=inode_filter,
- ignore_dirs=ignore_dirs,
- subdirs_crawl=False)
+ def output_callback(path, inode):
+ # For each path found, encodes it and updates path1
+ # Also updates converted flag in inodegfid table as 1
+ path = path.strip()
+ path = path[brick_path_len+1:]
+ path = output_path_prepare(path, args.output_prefix)
- fout.flush()
- os.fsync(fout.fileno())
+ changelog_data.append_path1(path, inode)
+ ignore_dirs = [os.path.join(brick, dirname)
+ for dirname in
+ conf.get_opt("brick_ignore_dirs").split(",")]
-def sort_unique(filename):
- execute(["sort", "-u", "-o", filename, filename],
- exit_msg="Sort failed", logger=logger)
+ # Full Namespace Crawl
+ find(brick, callback_func=output_callback,
+ filter_func=inode_filter,
+ ignore_dirs=ignore_dirs)
+
+
+def parse_changelog_to_db(changelog_data, filename):
+ """
+ Parses a Changelog file and populates data in gfidpath table
+ """
+ with open(filename) as f:
+ changelogfile = os.path.basename(filename)
+ for line in f:
+ data = line.strip().split(" ")
+ if data[0] == "E" and data[2] in ["CREATE", "MKNOD", "MKDIR"]:
+ # CREATE/MKDIR/MKNOD
+ changelog_data.when_create_mknod_mkdir(changelogfile, data)
+ elif data[0] in ["D", "M"]:
+ # DATA/META
+ if not args.only_namespace_changes:
+ changelog_data.when_data_meta(changelogfile, data)
+ elif data[0] == "E" and data[2] in ["LINK", "SYMLINK"]:
+ # LINK/SYMLINK
+ changelog_data.when_link_symlink(changelogfile, data)
+ elif data[0] == "E" and data[2] == "RENAME":
+ # RENAME
+ changelog_data.when_rename(changelogfile, data)
+ elif data[0] == "E" and data[2] in ["UNLINK", "RMDIR"]:
+ # UNLINK/RMDIR
+ changelog_data.when_unlink_rmdir(changelogfile, data)
def get_changes(brick, hash_dir, log_file, start, end, args):
@@ -199,6 +242,18 @@ def get_changes(brick, hash_dir, log_file, start, end, args):
the modified gfids from the changelogs and writes the list
of gfid to 'gfid_list' file.
"""
+ session_dir = os.path.join(conf.get_opt("session_dir"),
+ args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+
+ # Get previous session
+ try:
+ with open(status_file) as f:
+ start = int(f.read().strip())
+ except (ValueError, OSError, IOError):
+ start = args.start
+
try:
libgfchangelog.cl_init()
libgfchangelog.cl_register(brick, hash_dir, log_file,
@@ -207,10 +262,7 @@ def get_changes(brick, hash_dir, log_file, start, end, args):
fail("%s Changelog register failed: %s" % (brick, e), logger=logger)
# Output files to record GFIDs and GFID to Path failure GFIDs
- gfid_list_path = args.outfile + ".gfids"
- gfid_list_failures_file = gfid_list_path + ".failures"
- create_file(gfid_list_path, exit_on_err=True, logger=logger)
- create_file(gfid_list_failures_file, exit_on_err=True, logger=logger)
+ changelog_data = ChangelogData(args.outfile)
# Changelogs path(Hard coded to BRICK/.glusterfs/changelogs
cl_path = os.path.join(brick, ".glusterfs/changelogs")
@@ -234,37 +286,31 @@ def get_changes(brick, hash_dir, log_file, start, end, args):
while libgfchangelog.cl_history_scan() > 0:
changes += libgfchangelog.cl_history_getchanges()
- if changes:
- with open(gfid_list_path, 'a+') as fgfid:
- for change in changes:
- # Ignore if last processed changelog comes
- # again in list
- if change.endswith(".%s" % start):
- continue
-
- with open(change) as f:
- for line in f:
- # Space delimited list, collect GFID
- details = line.split()
- fgfid.write("%s\n" % details[1])
-
- libgfchangelog.cl_history_done(change)
- fgfid.flush()
- os.fsync(fgfid.fileno())
+ for change in changes:
+ # Ignore if last processed changelog comes
+ # again in list
+ if change.endswith(".%s" % start):
+ continue
+ parse_changelog_to_db(changelog_data, change)
+ libgfchangelog.cl_history_done(change)
+
+ changelog_data.commit()
except libgfchangelog.ChangelogException as e:
fail("%s Error during Changelog Crawl: %s" % (brick, e),
logger=logger)
- # If TS returned from history_changelog is < end time
- # then FS crawl may be required, since history is only available
- # till TS returned from history_changelog
- if actual_end < end:
- fail("Partial History available with Changelog", 2, logger=logger)
+ # Convert all pgfid available from Changelogs
+ pgfid_to_path(brick, changelog_data)
+ changelog_data.commit()
+
+ # Convert all GFIDs for which no other additional details available
+ gfid_to_path_using_pgfid(brick, changelog_data, args)
+ changelog_data.commit()
- sort_unique(gfid_list_path)
- gfid_to_path_using_pgfid(brick, gfid_list_path,
- args.outfile, gfid_list_failures_file)
- gfid_to_path_using_batchfind(brick, gfid_list_failures_file, args.outfile)
+ # If some GFIDs fail to get converted from previous step,
+ # convert using find
+ gfid_to_path_using_batchfind(brick, changelog_data)
+ changelog_data.commit()
return actual_end
@@ -283,8 +329,6 @@ def changelog_crawl(brick, start, end, args):
working_dir = os.path.join(working_dir, brickhash)
mkdirp(working_dir, exit_on_err=True, logger=logger)
- create_file(args.outfile, exit_on_err=True, logger=logger)
- create_file(args.outfile + ".gfids", exit_on_err=True, logger=logger)
log_file = os.path.join(conf.get_opt("log_dir"),
args.session,
@@ -308,6 +352,9 @@ def _get_args():
parser.add_argument("--debug", help="Debug", action="store_true")
parser.add_argument("--output-prefix", help="File prefix in output",
default=".")
+ parser.add_argument("-N", "--only-namespace-changes",
+ help="List only namespace changes",
+ action="store_true")
return parser.parse_args()
@@ -336,8 +383,13 @@ if __name__ == "__main__":
start = args.start
end = int(time.time()) - get_changelog_rollover_time(args.volume)
+ logger.info("%s Started Changelog Crawl - Start: %s End: %s" % (args.brick,
+ start,
+ end))
actual_end = changelog_crawl(args.brick, start, end, args)
with open(status_file_pre, "w", buffering=0) as f:
f.write(str(actual_end))
+ logger.info("%s Finished Changelog Crawl - End: %s" % (args.brick,
+ actual_end))
sys.exit(0)
diff --git a/tools/glusterfind/src/changelogdata.py b/tools/glusterfind/src/changelogdata.py
new file mode 100644
index 00000000000..c42aa2a2315
--- /dev/null
+++ b/tools/glusterfind/src/changelogdata.py
@@ -0,0 +1,412 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import sqlite3
+import urllib
+import os
+
+from utils import RecordType
+
+
+class OutputMerger(object):
+ """
+ Class to merge the output files collected from
+ different nodes
+ """
+ def __init__(self, db_path, all_dbs):
+ self.conn = sqlite3.connect(db_path)
+ self.cursor = self.conn.cursor()
+ self.cursor_reader = self.conn.cursor()
+ query = "DROP TABLE IF EXISTS finallist"
+ self.cursor.execute(query)
+
+ query = """
+ CREATE TABLE finallist(
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ ts VARCHAR,
+ type VARCHAR,
+ gfid VARCHAR,
+ path1 VARCHAR,
+ path2 VARCHAR,
+ UNIQUE (type, path1, path2) ON CONFLICT IGNORE
+ )
+ """
+ self.cursor.execute(query)
+
+ # If node database exists, read each db and insert into
+ # final table. Ignore if combination of TYPE PATH1 PATH2
+ # already exists
+ for node_db in all_dbs:
+ if os.path.exists(node_db):
+ conn = sqlite3.connect(node_db)
+ cursor = conn.cursor()
+ query = """
+ SELECT ts, type, gfid, path1, path2
+ FROM gfidpath
+ WHERE path1 != ''
+ ORDER BY id ASC
+ """
+ for row in cursor.execute(query):
+ self.add_if_not_exists(row[0], row[1], row[2],
+ row[3], row[4])
+
+ self.conn.commit()
+
+ def add_if_not_exists(self, ts, ty, gfid, path1, path2=""):
+ # Adds record to finallist only if not exists
+ query = """
+ INSERT INTO finallist(ts, type, gfid, path1, path2)
+ VALUES(?, ?, ?, ?, ?)
+ """
+ self.cursor.execute(query, (ts, ty, gfid, path1, path2))
+
+ def get(self):
+ query = """SELECT type, path1, path2 FROM finallist
+ ORDER BY ts ASC, id ASC"""
+ return self.cursor_reader.execute(query)
+
+ def get_failures(self):
+ query = """
+ SELECT gfid
+ FROM finallist
+ WHERE path1 = '' OR (path2 = '' AND type = 'RENAME')
+ """
+ return self.cursor_reader.execute(query)
+
+
+class ChangelogData(object):
+ def __init__(self, dbpath):
+ self.conn = sqlite3.connect(dbpath)
+ self.cursor = self.conn.cursor()
+ self.cursor_reader = self.conn.cursor()
+ self._create_table_gfidpath()
+ self._create_table_pgfid()
+ self._create_table_inodegfid()
+
+ def _create_table_gfidpath(self):
+ drop_table = "DROP TABLE IF EXISTS gfidpath"
+ self.cursor.execute(drop_table)
+
+ create_table = """
+ CREATE TABLE gfidpath(
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ ts VARCHAR,
+ type VARCHAR,
+ gfid VARCHAR(40),
+ pgfid1 VARCHAR(40),
+ bn1 VARCHAR(500),
+ pgfid2 VARCHAR(40),
+ bn2 VARCHAR(500),
+ path1 VARCHAR DEFAULT '',
+ path2 VARCHAR DEFAULT ''
+ )
+ """
+ self.cursor.execute(create_table)
+
+ def _create_table_inodegfid(self):
+ drop_table = "DROP TABLE IF EXISTS inodegfid"
+ self.cursor.execute(drop_table)
+
+ create_table = """
+ CREATE TABLE inodegfid(
+ inode INTEGER PRIMARY KEY,
+ gfid VARCHAR(40),
+ converted INTEGER DEFAULT 0,
+ UNIQUE (inode, gfid) ON CONFLICT IGNORE
+ )
+ """
+ self.cursor.execute(create_table)
+
+ def _create_table_pgfid(self):
+ drop_table = "DROP TABLE IF EXISTS pgfid"
+ self.cursor.execute(drop_table)
+
+ create_table = """
+ CREATE TABLE pgfid(
+ pgfid VARCHAR(40) PRIMARY KEY,
+ UNIQUE (pgfid) ON CONFLICT IGNORE
+ )
+ """
+ self.cursor.execute(create_table)
+
+ def _get(self, tablename, filters):
+ # SELECT * FROM <TABLENAME> WHERE <CONDITION>
+ params = []
+ query = "SELECT * FROM %s WHERE 1=1" % tablename
+
+ for key, value in filters.items():
+ query += " AND %s = ?" % key
+ params.append(value)
+
+ return self.cursor_reader.execute(query, params)
+
+ def _get_distinct(self, tablename, distinct_field, filters):
+ # SELECT DISTINCT <COL> FROM <TABLENAME> WHERE <CONDITION>
+ params = []
+ query = "SELECT DISTINCT %s FROM %s WHERE 1=1" % (distinct_field,
+ tablename)
+
+ for key, value in filters.items():
+ query += " AND %s = ?" % key
+ params.append(value)
+
+ return self.cursor_reader.execute(query, params)
+
+ def _delete(self, tablename, filters):
+ # DELETE FROM <TABLENAME> WHERE <CONDITIONS>
+ query = "DELETE FROM %s WHERE 1=1" % tablename
+ params = []
+
+ for key, value in filters.items():
+ query += " AND %s = ?" % key
+ params.append(value)
+
+ self.cursor.execute(query, params)
+
+ def _add(self, tablename, data):
+ # INSERT INTO <TABLENAME>(<col1>, <col2>..) VALUES(?,?..)
+ query = "INSERT INTO %s(" % tablename
+ fields = []
+ params = []
+ for key, value in data.items():
+ fields.append(key)
+ params.append(value)
+
+ values_substitute = len(fields)*["?"]
+ query += "%s) VALUES(%s)" % (",".join(fields),
+ ",".join(values_substitute))
+ self.cursor.execute(query, params)
+
+ def _update(self, tablename, data, filters):
+ # UPDATE <TABLENAME> SET col1 = ?,.. WHERE col1=? AND ..
+ params = []
+ update_fields = []
+ for key, value in data.items():
+ update_fields.append("%s = ?" % key)
+ params.append(value)
+
+ query = "UPDATE %s SET %s WHERE 1 = 1" % (tablename,
+ ", ".join(update_fields))
+
+ for key, value in filters.items():
+ query += " AND %s = ?" % key
+ params.append(value)
+
+ self.cursor.execute(query, params)
+
+ def _exists(self, tablename, filters):
+ if not filters:
+ return False
+
+ query = "SELECT COUNT(1) FROM %s WHERE 1=1" % tablename
+ params = []
+
+ for key, value in filters.items():
+ query += " AND %s = ?" % key
+ params.append(value)
+
+ self.cursor.execute(query, params)
+ row = self.cursor.fetchone()
+ return True if row[0] > 0 else False
+
+ def gfidpath_add(self, changelogfile, ty, gfid, pgfid1="", bn1="",
+ pgfid2="", bn2="", path1="", path2=""):
+ self._add("gfidpath", {
+ "ts": changelogfile.split(".")[-1],
+ "type": ty,
+ "gfid": gfid,
+ "pgfid1": pgfid1,
+ "bn1": bn1,
+ "pgfid2": pgfid2,
+ "bn2": bn2,
+ "path1": path1,
+ "path2": path2
+ })
+
+ def gfidpath_update(self, data, filters):
+ self._update("gfidpath", data, filters)
+
+ def gfidpath_delete(self, filters):
+ self._delete("gfidpath", filters)
+
+ def gfidpath_exists(self, filters):
+ return self._exists("gfidpath", filters)
+
+ def gfidpath_get(self, filters={}):
+ return self._get("gfidpath", filters)
+
+ def gfidpath_get_distinct(self, distinct_field, filters={}):
+ return self._get_distinct("gfidpath", distinct_field, filters)
+
+ def pgfid_add(self, pgfid):
+ self._add("pgfid", {
+ "pgfid": pgfid
+ })
+
+ def pgfid_update(self, data, filters):
+ self._update("pgfid", data, filters)
+
+ def pgfid_get(self, filters={}):
+ return self._get("pgfid", filters)
+
+ def pgfid_get_distinct(self, distinct_field, filters={}):
+ return self._get_distinct("pgfid", distinct_field, filters)
+
+ def pgfid_exists(self, filters):
+ return self._exists("pgfid", filters)
+
+ def inodegfid_add(self, inode, gfid, converted=0):
+ self._add("inodegfid", {
+ "inode": inode,
+ "gfid": gfid,
+ "converted": converted
+ })
+
+ def inodegfid_update(self, data, filters):
+ self._update("inodegfid", data, filters)
+
+ def inodegfid_get(self, filters={}):
+ return self._get("inodegfid", filters)
+
+ def inodegfid_get_distinct(self, distinct_field, filters={}):
+ return self._get_distinct("inodegfid", distinct_field, filters)
+
+ def inodegfid_exists(self, filters):
+ return self._exists("inodegfid", filters)
+
+ def append_path1(self, path, inode):
+ # || is for concatenate in SQL
+ query = """UPDATE gfidpath SET path1 = ',' || ?
+ WHERE gfid IN (SELECT gfid FROM inodegfid WHERE inode = ?)"""
+ self.cursor.execute(query, (path, inode))
+
+ def gfidpath_set_path1(self, path1, pgfid1):
+ # || is for concatenate in SQL
+ if path1 == "":
+ update_str1 = "? || bn1"
+ update_str2 = "? || bn2"
+ else:
+ update_str1 = "? || '%2F' || bn1"
+ update_str2 = "? || '%2F' || bn2"
+
+ query = """UPDATE gfidpath SET path1 = %s
+ WHERE pgfid1 = ?""" % update_str1
+ self.cursor.execute(query, (path1, pgfid1))
+
+ # Set Path2 if pgfid1 and pgfid2 are same
+ query = """UPDATE gfidpath SET path2 = %s
+ WHERE pgfid2 = ?""" % update_str2
+ self.cursor.execute(query, (path1, pgfid1))
+
+ def gfidpath_set_path2(self, path2, pgfid2):
+ # || is for concatenate in SQL
+ if path2 == "":
+ update_str = "? || bn2"
+ else:
+ update_str = "? || '%2F' || bn2"
+
+ query = """UPDATE gfidpath SET path2 = %s
+ WHERE pgfid2 = ?""" % update_str
+ self.cursor.execute(query, (path2, pgfid2))
+
+ def when_create_mknod_mkdir(self, changelogfile, data):
+ # E <GFID> <MKNOD|CREATE|MKDIR> <MODE> <USER> <GRP> <PGFID>/<BNAME>
+ # Add the Entry to DB
+ pgfid1, bn1 = urllib.unquote_plus(data[6]).split("/", 1)
+
+ # Quote again the basename
+ bn1 = urllib.quote_plus(bn1.strip())
+
+ self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1)
+
+ def when_rename(self, changelogfile, data):
+ # E <GFID> RENAME <OLD_PGFID>/<BNAME> <PGFID>/<BNAME>
+ pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1)
+ pgfid2, bn2 = urllib.unquote_plus(data[4]).split("/", 1)
+
+ # Quote again the basename
+ bn1 = urllib.quote_plus(bn1.strip())
+ bn2 = urllib.quote_plus(bn2.strip())
+
+ if self.gfidpath_exists({"gfid": data[1], "type": "NEW",
+ "pgfid1": pgfid1, "bn1": bn1}):
+ # If <OLD_PGFID>/<BNAME> is same as CREATE, Update
+ # <NEW_PGFID>/<BNAME> in NEW.
+ self.gfidpath_update({"pgfid1": pgfid2, "bn1": bn2},
+ {"gfid": data[1], "type": "NEW",
+ "pgfid1": pgfid1, "bn1": bn1})
+ elif self.gfidpath_exists({"gfid": data[1], "type": "RENAME",
+ "pgfid2": pgfid1, "bn2": bn1}):
+ # If <OLD_PGFID>/<BNAME> is same as <PGFID2>/<BN2>(may be previous
+ # RENAME) then UPDATE <NEW_PGFID>/<BNAME> as <PGFID2>/<BN2>
+ self.gfidpath_update({"pgfid2": pgfid2, "bn2": bn2},
+ {"gfid": data[1], "type": "RENAME",
+ "pgfid2": pgfid1, "bn2": bn1})
+ else:
+ # Else insert as RENAME
+ self.gfidpath_add(changelogfile, RecordType.RENAME, data[1],
+ pgfid1, bn1, pgfid2, bn2)
+
+ def when_link_symlink(self, changelogfile, data):
+ # E <GFID> <LINK|SYMLINK> <PGFID>/<BASENAME>
+ # Add as New record in Db as Type NEW
+ pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1)
+
+ # Quote again the basename
+ bn1 = urllib.quote_plus(bn1.strip())
+
+ self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1)
+
+ def when_data_meta(self, changelogfile, data):
+ # If GFID row exists, Ignore else Add to Db
+ if not self.gfidpath_exists({"gfid": data[1]}):
+ self.gfidpath_add(changelogfile, RecordType.MODIFY, data[1])
+
+ def when_unlink_rmdir(self, changelogfile, data):
+ # E <GFID> <UNLINK|RMDIR> <PGFID>/<BASENAME>
+ pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1)
+ # Quote again the basename
+ bn1 = urllib.quote_plus(bn1.strip())
+ deleted_path = data[4] if len(data) == 5 else ""
+
+ if self.gfidpath_exists({"gfid": data[1], "type": "NEW",
+ "pgfid1": pgfid1, "bn1": bn1}):
+ # If path exists in table as NEW with same GFID
+ # Delete that row
+ self.gfidpath_delete({"gfid": data[1], "type": "NEW",
+ "pgfid1": pgfid1, "bn1": bn1})
+ else:
+ # Else Record as DELETE
+ self.gfidpath_add(changelogfile, RecordType.DELETE, data[1],
+ pgfid1, bn1, path1=deleted_path)
+
+ # Update path1 as deleted_path if pgfid1 and bn1 is same as deleted
+ self.gfidpath_update({"path1": deleted_path}, {"gfid": data[1],
+ "pgfid1": pgfid1,
+ "bn1": bn1})
+
+ # Update path2 as deleted_path if pgfid2 and bn2 is same as deleted
+ self.gfidpath_update({"path2": deleted_path}, {
+ "type": RecordType.RENAME,
+ "gfid": data[1],
+ "pgfid2": pgfid1,
+ "bn2": bn1})
+
+ # If deleted directory is parent for somebody
+ query1 = """UPDATE gfidpath SET path1 = ? || '%2F' || bn1
+ WHERE pgfid1 = ? AND path1 != ''"""
+ self.cursor.execute(query1, (deleted_path, data[1]))
+
+ query1 = """UPDATE gfidpath SET path2 = ? || '%2F' || bn1
+ WHERE pgfid2 = ? AND path2 != ''"""
+ self.cursor.execute(query1, (deleted_path, data[1]))
+
+ def commit(self):
+ self.conn.commit()
diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py
index 089a3aec3c5..d9936eebde1 100644
--- a/tools/glusterfind/src/main.py
+++ b/tools/glusterfind/src/main.py
@@ -20,9 +20,9 @@ import shutil
from utils import execute, is_host_local, mkdirp, fail
from utils import setup_logger, human_time, handle_rm_error
-from utils import get_changelog_rollover_time, cache_output
+from utils import get_changelog_rollover_time, cache_output, create_file
import conf
-
+from changelogdata import OutputMerger
PROG_DESCRIPTION = """
GlusterFS Incremental API
@@ -235,6 +235,9 @@ def _get_args():
help="Regenerate outfile, discard the outfile "
"generated from last pre command",
action="store_true")
+ parser_pre.add_argument("-N", "--only-namespace-changes",
+ help="List only namespace changes",
+ action="store_true")
# post <SESSION> <VOLUME>
parser_post = subparsers.add_parser('post')
@@ -377,10 +380,29 @@ def mode_pre(session_dir, args):
run_cmd_nodes("pre", args, start=start)
# Merger
- cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
- execute(cmd,
- exit_msg="Failed to merge output files "
- "collected from nodes", logger=logger)
+ if args.full:
+ cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
+ execute(cmd,
+ exit_msg="Failed to merge output files "
+ "collected from nodes", logger=logger)
+ else:
+ # Read each Changelogs db and generate finaldb
+ create_file(args.outfile, exit_on_err=True, logger=logger)
+ outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles)
+
+ with open(args.outfile, "a") as f:
+ for row in outfilemerger.get():
+ # Multiple paths in case of Hardlinks
+ paths = row[1].split(",")
+ for p in paths:
+ if p == "":
+ continue
+ f.write("%s %s %s\n" % (row[0], p, row[2]))
+
+ try:
+ os.remove(args.outfile + ".db")
+ except (IOError, OSError):
+ pass
run_cmd_nodes("cleanup", args)
diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py
index aea9a9dc82d..cda5ea6378e 100644
--- a/tools/glusterfind/src/utils.py
+++ b/tools/glusterfind/src/utils.py
@@ -24,6 +24,13 @@ ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
cache_data = {}
+class RecordType(object):
+ NEW = "NEW"
+ MODIFY = "MODIFY"
+ RENAME = "RENAME"
+ DELETE = "DELETE"
+
+
def cache_output(func):
def wrapper(*args, **kwargs):
global cache_data
@@ -46,8 +53,10 @@ def find(path, callback_func=lambda x: True, filter_func=lambda x: True,
if path in ignore_dirs:
return
- if filter_func(path):
- callback_func(path)
+ # Capture filter_func output and pass it to callback function
+ filter_result = filter_func(path)
+ if filter_result is not None:
+ callback_func(path, filter_result)
for p in os.listdir(path):
full_path = os.path.join(path, p)
@@ -56,11 +65,13 @@ def find(path, callback_func=lambda x: True, filter_func=lambda x: True,
if subdirs_crawl:
find(full_path, callback_func, filter_func, ignore_dirs)
else:
- if filter_func(full_path):
- callback_func(full_path)
+ filter_result = filter_func(full_path)
+ if filter_result is not None:
+ callback_func(full_path, filter_result)
else:
- if filter_func(full_path):
- callback_func(full_path)
+ filter_result = filter_func(full_path)
+ if filter_result is not None:
+ callback_func(full_path, filter_result)
def output_write(f, path, prefix=".", encode=False):
@@ -215,5 +226,3 @@ def get_changelog_rollover_time(volumename):
return int(tree.find('volGetopts/Value').text)
except ParseError:
return DEFAULT_CHANGELOG_INTERVAL
-
-