diff options
Diffstat (limited to 'tools/glusterfind/src/changelogdata.py')
-rw-r--r-- | tools/glusterfind/src/changelogdata.py | 412 |
1 files changed, 412 insertions, 0 deletions
diff --git a/tools/glusterfind/src/changelogdata.py b/tools/glusterfind/src/changelogdata.py new file mode 100644 index 00000000000..c42aa2a2315 --- /dev/null +++ b/tools/glusterfind/src/changelogdata.py @@ -0,0 +1,412 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import sqlite3 +import urllib +import os + +from utils import RecordType + + +class OutputMerger(object): + """ + Class to merge the output files collected from + different nodes + """ + def __init__(self, db_path, all_dbs): + self.conn = sqlite3.connect(db_path) + self.cursor = self.conn.cursor() + self.cursor_reader = self.conn.cursor() + query = "DROP TABLE IF EXISTS finallist" + self.cursor.execute(query) + + query = """ + CREATE TABLE finallist( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts VARCHAR, + type VARCHAR, + gfid VARCHAR, + path1 VARCHAR, + path2 VARCHAR, + UNIQUE (type, path1, path2) ON CONFLICT IGNORE + ) + """ + self.cursor.execute(query) + + # If node database exists, read each db and insert into + # final table. Ignore if combination of TYPE PATH1 PATH2 + # already exists + for node_db in all_dbs: + if os.path.exists(node_db): + conn = sqlite3.connect(node_db) + cursor = conn.cursor() + query = """ + SELECT ts, type, gfid, path1, path2 + FROM gfidpath + WHERE path1 != '' + ORDER BY id ASC + """ + for row in cursor.execute(query): + self.add_if_not_exists(row[0], row[1], row[2], + row[3], row[4]) + + self.conn.commit() + + def add_if_not_exists(self, ts, ty, gfid, path1, path2=""): + # Adds record to finallist only if not exists + query = """ + INSERT INTO finallist(ts, type, gfid, path1, path2) + VALUES(?, ?, ?, ?, ?) + """ + self.cursor.execute(query, (ts, ty, gfid, path1, path2)) + + def get(self): + query = """SELECT type, path1, path2 FROM finallist + ORDER BY ts ASC, id ASC""" + return self.cursor_reader.execute(query) + + def get_failures(self): + query = """ + SELECT gfid + FROM finallist + WHERE path1 = '' OR (path2 = '' AND type = 'RENAME') + """ + return self.cursor_reader.execute(query) + + +class ChangelogData(object): + def __init__(self, dbpath): + self.conn = sqlite3.connect(dbpath) + self.cursor = self.conn.cursor() + self.cursor_reader = self.conn.cursor() + self._create_table_gfidpath() + self._create_table_pgfid() + self._create_table_inodegfid() + + def _create_table_gfidpath(self): + drop_table = "DROP TABLE IF EXISTS gfidpath" + self.cursor.execute(drop_table) + + create_table = """ + CREATE TABLE gfidpath( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts VARCHAR, + type VARCHAR, + gfid VARCHAR(40), + pgfid1 VARCHAR(40), + bn1 VARCHAR(500), + pgfid2 VARCHAR(40), + bn2 VARCHAR(500), + path1 VARCHAR DEFAULT '', + path2 VARCHAR DEFAULT '' + ) + """ + self.cursor.execute(create_table) + + def _create_table_inodegfid(self): + drop_table = "DROP TABLE IF EXISTS inodegfid" + self.cursor.execute(drop_table) + + create_table = """ + CREATE TABLE inodegfid( + inode INTEGER PRIMARY KEY, + gfid VARCHAR(40), + converted INTEGER DEFAULT 0, + UNIQUE (inode, gfid) ON CONFLICT IGNORE + ) + """ + self.cursor.execute(create_table) + + def _create_table_pgfid(self): + drop_table = "DROP TABLE IF EXISTS pgfid" + self.cursor.execute(drop_table) + + create_table = """ + CREATE TABLE pgfid( + pgfid VARCHAR(40) PRIMARY KEY, + UNIQUE (pgfid) ON CONFLICT IGNORE + ) + """ + self.cursor.execute(create_table) + + def _get(self, tablename, filters): + # SELECT * FROM <TABLENAME> WHERE <CONDITION> + params = [] + query = "SELECT * FROM %s WHERE 1=1" % tablename + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + return self.cursor_reader.execute(query, params) + + def _get_distinct(self, tablename, distinct_field, filters): + # SELECT DISTINCT <COL> FROM <TABLENAME> WHERE <CONDITION> + params = [] + query = "SELECT DISTINCT %s FROM %s WHERE 1=1" % (distinct_field, + tablename) + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + return self.cursor_reader.execute(query, params) + + def _delete(self, tablename, filters): + # DELETE FROM <TABLENAME> WHERE <CONDITIONS> + query = "DELETE FROM %s WHERE 1=1" % tablename + params = [] + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + self.cursor.execute(query, params) + + def _add(self, tablename, data): + # INSERT INTO <TABLENAME>(<col1>, <col2>..) VALUES(?,?..) + query = "INSERT INTO %s(" % tablename + fields = [] + params = [] + for key, value in data.items(): + fields.append(key) + params.append(value) + + values_substitute = len(fields)*["?"] + query += "%s) VALUES(%s)" % (",".join(fields), + ",".join(values_substitute)) + self.cursor.execute(query, params) + + def _update(self, tablename, data, filters): + # UPDATE <TABLENAME> SET col1 = ?,.. WHERE col1=? AND .. + params = [] + update_fields = [] + for key, value in data.items(): + update_fields.append("%s = ?" % key) + params.append(value) + + query = "UPDATE %s SET %s WHERE 1 = 1" % (tablename, + ", ".join(update_fields)) + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + self.cursor.execute(query, params) + + def _exists(self, tablename, filters): + if not filters: + return False + + query = "SELECT COUNT(1) FROM %s WHERE 1=1" % tablename + params = [] + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + self.cursor.execute(query, params) + row = self.cursor.fetchone() + return True if row[0] > 0 else False + + def gfidpath_add(self, changelogfile, ty, gfid, pgfid1="", bn1="", + pgfid2="", bn2="", path1="", path2=""): + self._add("gfidpath", { + "ts": changelogfile.split(".")[-1], + "type": ty, + "gfid": gfid, + "pgfid1": pgfid1, + "bn1": bn1, + "pgfid2": pgfid2, + "bn2": bn2, + "path1": path1, + "path2": path2 + }) + + def gfidpath_update(self, data, filters): + self._update("gfidpath", data, filters) + + def gfidpath_delete(self, filters): + self._delete("gfidpath", filters) + + def gfidpath_exists(self, filters): + return self._exists("gfidpath", filters) + + def gfidpath_get(self, filters={}): + return self._get("gfidpath", filters) + + def gfidpath_get_distinct(self, distinct_field, filters={}): + return self._get_distinct("gfidpath", distinct_field, filters) + + def pgfid_add(self, pgfid): + self._add("pgfid", { + "pgfid": pgfid + }) + + def pgfid_update(self, data, filters): + self._update("pgfid", data, filters) + + def pgfid_get(self, filters={}): + return self._get("pgfid", filters) + + def pgfid_get_distinct(self, distinct_field, filters={}): + return self._get_distinct("pgfid", distinct_field, filters) + + def pgfid_exists(self, filters): + return self._exists("pgfid", filters) + + def inodegfid_add(self, inode, gfid, converted=0): + self._add("inodegfid", { + "inode": inode, + "gfid": gfid, + "converted": converted + }) + + def inodegfid_update(self, data, filters): + self._update("inodegfid", data, filters) + + def inodegfid_get(self, filters={}): + return self._get("inodegfid", filters) + + def inodegfid_get_distinct(self, distinct_field, filters={}): + return self._get_distinct("inodegfid", distinct_field, filters) + + def inodegfid_exists(self, filters): + return self._exists("inodegfid", filters) + + def append_path1(self, path, inode): + # || is for concatenate in SQL + query = """UPDATE gfidpath SET path1 = ',' || ? + WHERE gfid IN (SELECT gfid FROM inodegfid WHERE inode = ?)""" + self.cursor.execute(query, (path, inode)) + + def gfidpath_set_path1(self, path1, pgfid1): + # || is for concatenate in SQL + if path1 == "": + update_str1 = "? || bn1" + update_str2 = "? || bn2" + else: + update_str1 = "? || '%2F' || bn1" + update_str2 = "? || '%2F' || bn2" + + query = """UPDATE gfidpath SET path1 = %s + WHERE pgfid1 = ?""" % update_str1 + self.cursor.execute(query, (path1, pgfid1)) + + # Set Path2 if pgfid1 and pgfid2 are same + query = """UPDATE gfidpath SET path2 = %s + WHERE pgfid2 = ?""" % update_str2 + self.cursor.execute(query, (path1, pgfid1)) + + def gfidpath_set_path2(self, path2, pgfid2): + # || is for concatenate in SQL + if path2 == "": + update_str = "? || bn2" + else: + update_str = "? || '%2F' || bn2" + + query = """UPDATE gfidpath SET path2 = %s + WHERE pgfid2 = ?""" % update_str + self.cursor.execute(query, (path2, pgfid2)) + + def when_create_mknod_mkdir(self, changelogfile, data): + # E <GFID> <MKNOD|CREATE|MKDIR> <MODE> <USER> <GRP> <PGFID>/<BNAME> + # Add the Entry to DB + pgfid1, bn1 = urllib.unquote_plus(data[6]).split("/", 1) + + # Quote again the basename + bn1 = urllib.quote_plus(bn1.strip()) + + self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1) + + def when_rename(self, changelogfile, data): + # E <GFID> RENAME <OLD_PGFID>/<BNAME> <PGFID>/<BNAME> + pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1) + pgfid2, bn2 = urllib.unquote_plus(data[4]).split("/", 1) + + # Quote again the basename + bn1 = urllib.quote_plus(bn1.strip()) + bn2 = urllib.quote_plus(bn2.strip()) + + if self.gfidpath_exists({"gfid": data[1], "type": "NEW", + "pgfid1": pgfid1, "bn1": bn1}): + # If <OLD_PGFID>/<BNAME> is same as CREATE, Update + # <NEW_PGFID>/<BNAME> in NEW. + self.gfidpath_update({"pgfid1": pgfid2, "bn1": bn2}, + {"gfid": data[1], "type": "NEW", + "pgfid1": pgfid1, "bn1": bn1}) + elif self.gfidpath_exists({"gfid": data[1], "type": "RENAME", + "pgfid2": pgfid1, "bn2": bn1}): + # If <OLD_PGFID>/<BNAME> is same as <PGFID2>/<BN2>(may be previous + # RENAME) then UPDATE <NEW_PGFID>/<BNAME> as <PGFID2>/<BN2> + self.gfidpath_update({"pgfid2": pgfid2, "bn2": bn2}, + {"gfid": data[1], "type": "RENAME", + "pgfid2": pgfid1, "bn2": bn1}) + else: + # Else insert as RENAME + self.gfidpath_add(changelogfile, RecordType.RENAME, data[1], + pgfid1, bn1, pgfid2, bn2) + + def when_link_symlink(self, changelogfile, data): + # E <GFID> <LINK|SYMLINK> <PGFID>/<BASENAME> + # Add as New record in Db as Type NEW + pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1) + + # Quote again the basename + bn1 = urllib.quote_plus(bn1.strip()) + + self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1) + + def when_data_meta(self, changelogfile, data): + # If GFID row exists, Ignore else Add to Db + if not self.gfidpath_exists({"gfid": data[1]}): + self.gfidpath_add(changelogfile, RecordType.MODIFY, data[1]) + + def when_unlink_rmdir(self, changelogfile, data): + # E <GFID> <UNLINK|RMDIR> <PGFID>/<BASENAME> + pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1) + # Quote again the basename + bn1 = urllib.quote_plus(bn1.strip()) + deleted_path = data[4] if len(data) == 5 else "" + + if self.gfidpath_exists({"gfid": data[1], "type": "NEW", + "pgfid1": pgfid1, "bn1": bn1}): + # If path exists in table as NEW with same GFID + # Delete that row + self.gfidpath_delete({"gfid": data[1], "type": "NEW", + "pgfid1": pgfid1, "bn1": bn1}) + else: + # Else Record as DELETE + self.gfidpath_add(changelogfile, RecordType.DELETE, data[1], + pgfid1, bn1, path1=deleted_path) + + # Update path1 as deleted_path if pgfid1 and bn1 is same as deleted + self.gfidpath_update({"path1": deleted_path}, {"gfid": data[1], + "pgfid1": pgfid1, + "bn1": bn1}) + + # Update path2 as deleted_path if pgfid2 and bn2 is same as deleted + self.gfidpath_update({"path2": deleted_path}, { + "type": RecordType.RENAME, + "gfid": data[1], + "pgfid2": pgfid1, + "bn2": bn1}) + + # If deleted directory is parent for somebody + query1 = """UPDATE gfidpath SET path1 = ? || '%2F' || bn1 + WHERE pgfid1 = ? AND path1 != ''""" + self.cursor.execute(query1, (deleted_path, data[1])) + + query1 = """UPDATE gfidpath SET path2 = ? || '%2F' || bn1 + WHERE pgfid2 = ? AND path2 != ''""" + self.cursor.execute(query1, (deleted_path, data[1])) + + def commit(self): + self.conn.commit() |