summaryrefslogtreecommitdiffstats
path: root/tools/glusterfind/src/changelogdata.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/glusterfind/src/changelogdata.py')
-rw-r--r--tools/glusterfind/src/changelogdata.py412
1 files changed, 412 insertions, 0 deletions
diff --git a/tools/glusterfind/src/changelogdata.py b/tools/glusterfind/src/changelogdata.py
new file mode 100644
index 00000000000..c42aa2a2315
--- /dev/null
+++ b/tools/glusterfind/src/changelogdata.py
@@ -0,0 +1,412 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import sqlite3
+import urllib
+import os
+
+from utils import RecordType
+
+
+class OutputMerger(object):
+ """
+ Class to merge the output files collected from
+ different nodes
+ """
+ def __init__(self, db_path, all_dbs):
+ self.conn = sqlite3.connect(db_path)
+ self.cursor = self.conn.cursor()
+ self.cursor_reader = self.conn.cursor()
+ query = "DROP TABLE IF EXISTS finallist"
+ self.cursor.execute(query)
+
+ query = """
+ CREATE TABLE finallist(
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ ts VARCHAR,
+ type VARCHAR,
+ gfid VARCHAR,
+ path1 VARCHAR,
+ path2 VARCHAR,
+ UNIQUE (type, path1, path2) ON CONFLICT IGNORE
+ )
+ """
+ self.cursor.execute(query)
+
+ # If node database exists, read each db and insert into
+ # final table. Ignore if combination of TYPE PATH1 PATH2
+ # already exists
+ for node_db in all_dbs:
+ if os.path.exists(node_db):
+ conn = sqlite3.connect(node_db)
+ cursor = conn.cursor()
+ query = """
+ SELECT ts, type, gfid, path1, path2
+ FROM gfidpath
+ WHERE path1 != ''
+ ORDER BY id ASC
+ """
+ for row in cursor.execute(query):
+ self.add_if_not_exists(row[0], row[1], row[2],
+ row[3], row[4])
+
+ self.conn.commit()
+
+ def add_if_not_exists(self, ts, ty, gfid, path1, path2=""):
+ # Adds record to finallist only if not exists
+ query = """
+ INSERT INTO finallist(ts, type, gfid, path1, path2)
+ VALUES(?, ?, ?, ?, ?)
+ """
+ self.cursor.execute(query, (ts, ty, gfid, path1, path2))
+
+ def get(self):
+ query = """SELECT type, path1, path2 FROM finallist
+ ORDER BY ts ASC, id ASC"""
+ return self.cursor_reader.execute(query)
+
+ def get_failures(self):
+ query = """
+ SELECT gfid
+ FROM finallist
+ WHERE path1 = '' OR (path2 = '' AND type = 'RENAME')
+ """
+ return self.cursor_reader.execute(query)
+
+
+class ChangelogData(object):
+ def __init__(self, dbpath):
+ self.conn = sqlite3.connect(dbpath)
+ self.cursor = self.conn.cursor()
+ self.cursor_reader = self.conn.cursor()
+ self._create_table_gfidpath()
+ self._create_table_pgfid()
+ self._create_table_inodegfid()
+
+ def _create_table_gfidpath(self):
+ drop_table = "DROP TABLE IF EXISTS gfidpath"
+ self.cursor.execute(drop_table)
+
+ create_table = """
+ CREATE TABLE gfidpath(
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ ts VARCHAR,
+ type VARCHAR,
+ gfid VARCHAR(40),
+ pgfid1 VARCHAR(40),
+ bn1 VARCHAR(500),
+ pgfid2 VARCHAR(40),
+ bn2 VARCHAR(500),
+ path1 VARCHAR DEFAULT '',
+ path2 VARCHAR DEFAULT ''
+ )
+ """
+ self.cursor.execute(create_table)
+
+ def _create_table_inodegfid(self):
+ drop_table = "DROP TABLE IF EXISTS inodegfid"
+ self.cursor.execute(drop_table)
+
+ create_table = """
+ CREATE TABLE inodegfid(
+ inode INTEGER PRIMARY KEY,
+ gfid VARCHAR(40),
+ converted INTEGER DEFAULT 0,
+ UNIQUE (inode, gfid) ON CONFLICT IGNORE
+ )
+ """
+ self.cursor.execute(create_table)
+
+ def _create_table_pgfid(self):
+ drop_table = "DROP TABLE IF EXISTS pgfid"
+ self.cursor.execute(drop_table)
+
+ create_table = """
+ CREATE TABLE pgfid(
+ pgfid VARCHAR(40) PRIMARY KEY,
+ UNIQUE (pgfid) ON CONFLICT IGNORE
+ )
+ """
+ self.cursor.execute(create_table)
+
+ def _get(self, tablename, filters):
+ # SELECT * FROM <TABLENAME> WHERE <CONDITION>
+ params = []
+ query = "SELECT * FROM %s WHERE 1=1" % tablename
+
+ for key, value in filters.items():
+ query += " AND %s = ?" % key
+ params.append(value)
+
+ return self.cursor_reader.execute(query, params)
+
+ def _get_distinct(self, tablename, distinct_field, filters):
+ # SELECT DISTINCT <COL> FROM <TABLENAME> WHERE <CONDITION>
+ params = []
+ query = "SELECT DISTINCT %s FROM %s WHERE 1=1" % (distinct_field,
+ tablename)
+
+ for key, value in filters.items():
+ query += " AND %s = ?" % key
+ params.append(value)
+
+ return self.cursor_reader.execute(query, params)
+
+ def _delete(self, tablename, filters):
+ # DELETE FROM <TABLENAME> WHERE <CONDITIONS>
+ query = "DELETE FROM %s WHERE 1=1" % tablename
+ params = []
+
+ for key, value in filters.items():
+ query += " AND %s = ?" % key
+ params.append(value)
+
+ self.cursor.execute(query, params)
+
+ def _add(self, tablename, data):
+ # INSERT INTO <TABLENAME>(<col1>, <col2>..) VALUES(?,?..)
+ query = "INSERT INTO %s(" % tablename
+ fields = []
+ params = []
+ for key, value in data.items():
+ fields.append(key)
+ params.append(value)
+
+ values_substitute = len(fields)*["?"]
+ query += "%s) VALUES(%s)" % (",".join(fields),
+ ",".join(values_substitute))
+ self.cursor.execute(query, params)
+
+ def _update(self, tablename, data, filters):
+ # UPDATE <TABLENAME> SET col1 = ?,.. WHERE col1=? AND ..
+ params = []
+ update_fields = []
+ for key, value in data.items():
+ update_fields.append("%s = ?" % key)
+ params.append(value)
+
+ query = "UPDATE %s SET %s WHERE 1 = 1" % (tablename,
+ ", ".join(update_fields))
+
+ for key, value in filters.items():
+ query += " AND %s = ?" % key
+ params.append(value)
+
+ self.cursor.execute(query, params)
+
+ def _exists(self, tablename, filters):
+ if not filters:
+ return False
+
+ query = "SELECT COUNT(1) FROM %s WHERE 1=1" % tablename
+ params = []
+
+ for key, value in filters.items():
+ query += " AND %s = ?" % key
+ params.append(value)
+
+ self.cursor.execute(query, params)
+ row = self.cursor.fetchone()
+ return True if row[0] > 0 else False
+
+ def gfidpath_add(self, changelogfile, ty, gfid, pgfid1="", bn1="",
+ pgfid2="", bn2="", path1="", path2=""):
+ self._add("gfidpath", {
+ "ts": changelogfile.split(".")[-1],
+ "type": ty,
+ "gfid": gfid,
+ "pgfid1": pgfid1,
+ "bn1": bn1,
+ "pgfid2": pgfid2,
+ "bn2": bn2,
+ "path1": path1,
+ "path2": path2
+ })
+
+ def gfidpath_update(self, data, filters):
+ self._update("gfidpath", data, filters)
+
+ def gfidpath_delete(self, filters):
+ self._delete("gfidpath", filters)
+
+ def gfidpath_exists(self, filters):
+ return self._exists("gfidpath", filters)
+
+ def gfidpath_get(self, filters={}):
+ return self._get("gfidpath", filters)
+
+ def gfidpath_get_distinct(self, distinct_field, filters={}):
+ return self._get_distinct("gfidpath", distinct_field, filters)
+
+ def pgfid_add(self, pgfid):
+ self._add("pgfid", {
+ "pgfid": pgfid
+ })
+
+ def pgfid_update(self, data, filters):
+ self._update("pgfid", data, filters)
+
+ def pgfid_get(self, filters={}):
+ return self._get("pgfid", filters)
+
+ def pgfid_get_distinct(self, distinct_field, filters={}):
+ return self._get_distinct("pgfid", distinct_field, filters)
+
+ def pgfid_exists(self, filters):
+ return self._exists("pgfid", filters)
+
+ def inodegfid_add(self, inode, gfid, converted=0):
+ self._add("inodegfid", {
+ "inode": inode,
+ "gfid": gfid,
+ "converted": converted
+ })
+
+ def inodegfid_update(self, data, filters):
+ self._update("inodegfid", data, filters)
+
+ def inodegfid_get(self, filters={}):
+ return self._get("inodegfid", filters)
+
+ def inodegfid_get_distinct(self, distinct_field, filters={}):
+ return self._get_distinct("inodegfid", distinct_field, filters)
+
+ def inodegfid_exists(self, filters):
+ return self._exists("inodegfid", filters)
+
+ def append_path1(self, path, inode):
+ # || is for concatenate in SQL
+ query = """UPDATE gfidpath SET path1 = ',' || ?
+ WHERE gfid IN (SELECT gfid FROM inodegfid WHERE inode = ?)"""
+ self.cursor.execute(query, (path, inode))
+
+ def gfidpath_set_path1(self, path1, pgfid1):
+ # || is for concatenate in SQL
+ if path1 == "":
+ update_str1 = "? || bn1"
+ update_str2 = "? || bn2"
+ else:
+ update_str1 = "? || '%2F' || bn1"
+ update_str2 = "? || '%2F' || bn2"
+
+ query = """UPDATE gfidpath SET path1 = %s
+ WHERE pgfid1 = ?""" % update_str1
+ self.cursor.execute(query, (path1, pgfid1))
+
+ # Set Path2 if pgfid1 and pgfid2 are same
+ query = """UPDATE gfidpath SET path2 = %s
+ WHERE pgfid2 = ?""" % update_str2
+ self.cursor.execute(query, (path1, pgfid1))
+
+ def gfidpath_set_path2(self, path2, pgfid2):
+ # || is for concatenate in SQL
+ if path2 == "":
+ update_str = "? || bn2"
+ else:
+ update_str = "? || '%2F' || bn2"
+
+ query = """UPDATE gfidpath SET path2 = %s
+ WHERE pgfid2 = ?""" % update_str
+ self.cursor.execute(query, (path2, pgfid2))
+
+ def when_create_mknod_mkdir(self, changelogfile, data):
+ # E <GFID> <MKNOD|CREATE|MKDIR> <MODE> <USER> <GRP> <PGFID>/<BNAME>
+ # Add the Entry to DB
+ pgfid1, bn1 = urllib.unquote_plus(data[6]).split("/", 1)
+
+ # Quote again the basename
+ bn1 = urllib.quote_plus(bn1.strip())
+
+ self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1)
+
+ def when_rename(self, changelogfile, data):
+ # E <GFID> RENAME <OLD_PGFID>/<BNAME> <PGFID>/<BNAME>
+ pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1)
+ pgfid2, bn2 = urllib.unquote_plus(data[4]).split("/", 1)
+
+ # Quote again the basename
+ bn1 = urllib.quote_plus(bn1.strip())
+ bn2 = urllib.quote_plus(bn2.strip())
+
+ if self.gfidpath_exists({"gfid": data[1], "type": "NEW",
+ "pgfid1": pgfid1, "bn1": bn1}):
+ # If <OLD_PGFID>/<BNAME> is same as CREATE, Update
+ # <NEW_PGFID>/<BNAME> in NEW.
+ self.gfidpath_update({"pgfid1": pgfid2, "bn1": bn2},
+ {"gfid": data[1], "type": "NEW",
+ "pgfid1": pgfid1, "bn1": bn1})
+ elif self.gfidpath_exists({"gfid": data[1], "type": "RENAME",
+ "pgfid2": pgfid1, "bn2": bn1}):
+ # If <OLD_PGFID>/<BNAME> is same as <PGFID2>/<BN2>(may be previous
+ # RENAME) then UPDATE <NEW_PGFID>/<BNAME> as <PGFID2>/<BN2>
+ self.gfidpath_update({"pgfid2": pgfid2, "bn2": bn2},
+ {"gfid": data[1], "type": "RENAME",
+ "pgfid2": pgfid1, "bn2": bn1})
+ else:
+ # Else insert as RENAME
+ self.gfidpath_add(changelogfile, RecordType.RENAME, data[1],
+ pgfid1, bn1, pgfid2, bn2)
+
+ def when_link_symlink(self, changelogfile, data):
+ # E <GFID> <LINK|SYMLINK> <PGFID>/<BASENAME>
+ # Add as New record in Db as Type NEW
+ pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1)
+
+ # Quote again the basename
+ bn1 = urllib.quote_plus(bn1.strip())
+
+ self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1)
+
+ def when_data_meta(self, changelogfile, data):
+ # If GFID row exists, Ignore else Add to Db
+ if not self.gfidpath_exists({"gfid": data[1]}):
+ self.gfidpath_add(changelogfile, RecordType.MODIFY, data[1])
+
+ def when_unlink_rmdir(self, changelogfile, data):
+ # E <GFID> <UNLINK|RMDIR> <PGFID>/<BASENAME>
+ pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1)
+ # Quote again the basename
+ bn1 = urllib.quote_plus(bn1.strip())
+ deleted_path = data[4] if len(data) == 5 else ""
+
+ if self.gfidpath_exists({"gfid": data[1], "type": "NEW",
+ "pgfid1": pgfid1, "bn1": bn1}):
+ # If path exists in table as NEW with same GFID
+ # Delete that row
+ self.gfidpath_delete({"gfid": data[1], "type": "NEW",
+ "pgfid1": pgfid1, "bn1": bn1})
+ else:
+ # Else Record as DELETE
+ self.gfidpath_add(changelogfile, RecordType.DELETE, data[1],
+ pgfid1, bn1, path1=deleted_path)
+
+ # Update path1 as deleted_path if pgfid1 and bn1 is same as deleted
+ self.gfidpath_update({"path1": deleted_path}, {"gfid": data[1],
+ "pgfid1": pgfid1,
+ "bn1": bn1})
+
+ # Update path2 as deleted_path if pgfid2 and bn2 is same as deleted
+ self.gfidpath_update({"path2": deleted_path}, {
+ "type": RecordType.RENAME,
+ "gfid": data[1],
+ "pgfid2": pgfid1,
+ "bn2": bn1})
+
+ # If deleted directory is parent for somebody
+ query1 = """UPDATE gfidpath SET path1 = ? || '%2F' || bn1
+ WHERE pgfid1 = ? AND path1 != ''"""
+ self.cursor.execute(query1, (deleted_path, data[1]))
+
+ query1 = """UPDATE gfidpath SET path2 = ? || '%2F' || bn1
+ WHERE pgfid2 = ? AND path2 != ''"""
+ self.cursor.execute(query1, (deleted_path, data[1]))
+
+ def commit(self):
+ self.conn.commit()