#!/usr/bin/env python
# Copyright (c) 2015 Red Hat, Inc.
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
import os
import sqlite3
import urllib

try:
    # Python 2
    from urllib import quote_plus, unquote_plus
except ImportError:
    # Python 3
    from urllib.parse import quote_plus, unquote_plus

from utils import RecordType
class OutputMerger(object):
    """
    Class to merge the output files collected from
    different nodes.

    On construction a ``finallist`` table is (re)created in the database at
    ``db_path`` and filled with the rows of the ``gfidpath`` table of every
    node database that exists on disk.  Rows whose (type, path1, path2)
    combination is already present are silently ignored by the table's
    ``UNIQUE ... ON CONFLICT IGNORE`` constraint.
    """

    def __init__(self, db_path, all_dbs):
        """
        Build the merged table.

        :param db_path: path of the SQLite database that receives the
                        ``finallist`` table (an existing ``finallist`` table
                        is dropped first).
        :param all_dbs: iterable of paths to per-node SQLite databases;
                        paths that do not exist are skipped.
        """
        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()
        self.cursor_reader = self.conn.cursor()

        self.cursor.execute("DROP TABLE IF EXISTS finallist")
        create_table = """
        CREATE TABLE finallist(
            id     INTEGER PRIMARY KEY AUTOINCREMENT,
            ts     VARCHAR,
            type   VARCHAR,
            gfid   VARCHAR,
            path1  VARCHAR,
            path2  VARCHAR,
            UNIQUE (type, path1, path2) ON CONFLICT IGNORE
        )
        """
        self.cursor.execute(create_table)

        # Only rows with a non-empty path1 are merged; insertion order
        # follows each node's own id order.
        select_rows = """
        SELECT   ts, type, gfid, path1, path2
        FROM     gfidpath
        WHERE    path1 != ''
        ORDER BY id ASC
        """

        # If node database exists, read each db and insert into
        # final table. Ignore if combination of TYPE PATH1 PATH2
        # already exists
        for node_db in all_dbs:
            if not os.path.exists(node_db):
                continue

            node_conn = sqlite3.connect(node_db)
            try:
                for ts, ty, gfid, path1, path2 in node_conn.execute(
                        select_rows):
                    self.add_if_not_exists(ts, ty, gfid, path1, path2)
            finally:
                # Release the node database handle even if reading fails
                node_conn.close()

        self.conn.commit()

    def add_if_not_exists(self, ts, ty, gfid, path1, path2=""):
        """
        Insert one record into ``finallist``.

        The insert is a no-op when a row with the same (type, path1, path2)
        already exists (enforced by the table's UNIQUE constraint).  The
        caller is responsible for committing.
        """
        query = """
        INSERT INTO finallist(ts, type, gfid, path1, path2)
        VALUES(?, ?, ?, ?, ?)
        """
        self.cursor.execute(query, (ts, ty, gfid, path1, path2))

    def get(self):
        """
        Return a cursor over (type, path1, path2) of all merged rows,
        ordered by ``ts`` and then by insertion id.

        The shared reader cursor is returned, so the result must be consumed
        before ``get`` or ``get_failures`` is called again.
        """
        query = """SELECT type, path1, path2 FROM finallist
        ORDER BY ts ASC, id ASC"""
        return self.cursor_reader.execute(query)

    def get_failures(self):
        """
        Return a cursor over the gfids of rows whose path is incomplete:
        an empty ``path1``, or a RENAME row with an empty ``path2``.

        The shared reader cursor is returned, so the result must be consumed
        before ``get`` or ``get_failures`` is called again.
        """
        query = """
        SELECT gfid
        FROM finallist
        WHERE path1 = '' OR (path2 = '' AND type = 'RENAME')
        """
        return self.cursor_reader.execute(query)
class ChangelogData(object):
    """
    SQLite-backed working store for entries parsed from changelog files.

    Three tables are dropped and recreated on construction:

    * ``gfidpath``  - one row per recorded operation: type, gfid, the
      parent gfid/basename pairs and the resolved ``path1``/``path2``.
    * ``pgfid``     - a set of parent gfids.
    * ``inodegfid`` - inode to gfid mapping with a ``converted`` flag.

    Writes go through ``self.cursor`` and are persisted only when
    :meth:`commit` is called.  Row-returning reads use the single shared
    ``self.cursor_reader``, so the result of one ``*_get`` call must be
    consumed before the next ``*_get`` call is made.

    Table and column names are interpolated into the SQL text (SQLite
    cannot bind identifiers), so they must come from trusted code; values
    are always bound as parameters.
    """

    def __init__(self, dbpath):
        """Open (or create) the database at ``dbpath`` and reset its tables."""
        self.conn = sqlite3.connect(dbpath)
        self.cursor = self.conn.cursor()
        self.cursor_reader = self.conn.cursor()
        self._create_table_gfidpath()
        self._create_table_pgfid()
        self._create_table_inodegfid()

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------
    def _create_table_gfidpath(self):
        """Drop and recreate the ``gfidpath`` table."""
        self.cursor.execute("DROP TABLE IF EXISTS gfidpath")
        create_table = """
        CREATE TABLE gfidpath(
            id     INTEGER PRIMARY KEY AUTOINCREMENT,
            ts     VARCHAR,
            type   VARCHAR,
            gfid   VARCHAR(40),
            pgfid1 VARCHAR(40),
            bn1    VARCHAR(500),
            pgfid2 VARCHAR(40),
            bn2    VARCHAR(500),
            path1  VARCHAR DEFAULT '',
            path2  VARCHAR DEFAULT ''
        )
        """
        self.cursor.execute(create_table)

    def _create_table_inodegfid(self):
        """Drop and recreate the ``inodegfid`` table."""
        self.cursor.execute("DROP TABLE IF EXISTS inodegfid")
        create_table = """
        CREATE TABLE inodegfid(
            inode     INTEGER PRIMARY KEY,
            gfid      VARCHAR(40),
            converted INTEGER DEFAULT 0,
            UNIQUE    (inode, gfid) ON CONFLICT IGNORE
        )
        """
        self.cursor.execute(create_table)

    def _create_table_pgfid(self):
        """Drop and recreate the ``pgfid`` table."""
        self.cursor.execute("DROP TABLE IF EXISTS pgfid")
        create_table = """
        CREATE TABLE pgfid(
            pgfid  VARCHAR(40) PRIMARY KEY,
            UNIQUE (pgfid) ON CONFLICT IGNORE
        )
        """
        self.cursor.execute(create_table)

    # ------------------------------------------------------------------
    # Generic helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _where(filters):
        """
        Build the tail of a ``WHERE 1=1`` clause from an equality filter.

        :param filters: dict of column name -> required value, or None.
        :returns: tuple ``(sql, params)`` where ``sql`` is a string of
                  `` AND <col> = ?`` terms (empty for no filters) and
                  ``params`` is the matching list of values.
        """
        filters = filters or {}
        # Keys and values of an unmodified dict iterate in the same order
        clause = "".join(" AND %s = ?" % key for key in filters)
        return clause, list(filters.values())

    @staticmethod
    def _split_entry(entry):
        """
        Split a url-encoded ``<PGFID>/<BASENAME>`` changelog field.

        The field is decoded, split on the first ``/`` and the stripped
        basename is quoted again.

        :returns: tuple ``(pgfid, quoted_basename)``.
        :raises ValueError: if the decoded field contains no ``/``.
        """
        pgfid, basename = unquote_plus(entry).split("/", 1)
        return pgfid, quote_plus(basename.strip())

    def _get(self, tablename, filters):
        """SELECT * FROM <TABLENAME> WHERE <FILTERS>; returns reader cursor."""
        clause, params = self._where(filters)
        query = "SELECT * FROM %s WHERE 1=1%s" % (tablename, clause)
        return self.cursor_reader.execute(query, params)

    def _get_distinct(self, tablename, distinct_field, filters):
        """
        SELECT DISTINCT <DISTINCT_FIELD> FROM <TABLENAME> WHERE <FILTERS>;
        returns the reader cursor.
        """
        clause, params = self._where(filters)
        query = "SELECT DISTINCT %s FROM %s WHERE 1=1%s" % (distinct_field,
                                                            tablename,
                                                            clause)
        return self.cursor_reader.execute(query, params)

    def _delete(self, tablename, filters):
        """
        DELETE FROM <TABLENAME> WHERE <FILTERS>.

        An empty filter deletes every row of the table.
        """
        clause, params = self._where(filters)
        query = "DELETE FROM %s WHERE 1=1%s" % (tablename, clause)
        self.cursor.execute(query, params)

    def _add(self, tablename, data):
        """INSERT INTO <TABLENAME>(<COL1>, ..) VALUES(?, ..) from ``data``."""
        fields = list(data)
        params = [data[field] for field in fields]
        query = "INSERT INTO %s(%s) VALUES(%s)" % (
            tablename,
            ",".join(fields),
            ",".join(["?"] * len(fields)))
        self.cursor.execute(query, params)

    def _update(self, tablename, data, filters):
        """
        UPDATE <TABLENAME> SET <COL1> = ?, .. WHERE <FILTERS>.

        An empty filter updates every row of the table.
        """
        fields = list(data)
        params = [data[field] for field in fields]
        assignments = ", ".join("%s = ?" % field for field in fields)

        clause, filter_params = self._where(filters)
        query = "UPDATE %s SET %s WHERE 1 = 1%s" % (tablename,
                                                    assignments,
                                                    clause)
        self.cursor.execute(query, params + filter_params)

    def _exists(self, tablename, filters):
        """
        Return True if at least one row of ``tablename`` matches ``filters``.

        An empty filter always yields False.  The writer cursor is used on
        purpose so that an in-progress iteration over the reader cursor is
        not disturbed.
        """
        if not filters:
            return False

        clause, params = self._where(filters)
        query = "SELECT COUNT(1) FROM %s WHERE 1=1%s" % (tablename, clause)
        self.cursor.execute(query, params)
        return self.cursor.fetchone()[0] > 0

    # ------------------------------------------------------------------
    # gfidpath table
    # ------------------------------------------------------------------
    def gfidpath_add(self, changelogfile, ty, gfid, pgfid1="", bn1="",
                     pgfid2="", bn2="", path1="", path2=""):
        """
        Insert a row into ``gfidpath``.

        ``ts`` is taken from the text after the last ``.`` in
        ``changelogfile`` (presumably the changelog's timestamp suffix).
        """
        self._add("gfidpath", {
            "ts": changelogfile.split(".")[-1],
            "type": ty,
            "gfid": gfid,
            "pgfid1": pgfid1,
            "bn1": bn1,
            "pgfid2": pgfid2,
            "bn2": bn2,
            "path1": path1,
            "path2": path2
        })

    def gfidpath_update(self, data, filters):
        """Set the columns in ``data`` on ``gfidpath`` rows matching filters."""
        self._update("gfidpath", data, filters)

    def gfidpath_delete(self, filters):
        """Delete the ``gfidpath`` rows matching ``filters``."""
        self._delete("gfidpath", filters)

    def gfidpath_exists(self, filters):
        """Return True if a ``gfidpath`` row matches the non-empty filters."""
        return self._exists("gfidpath", filters)

    def gfidpath_get(self, filters=None):
        """Return the reader cursor over matching ``gfidpath`` rows."""
        return self._get("gfidpath", filters)

    def gfidpath_get_distinct(self, distinct_field, filters=None):
        """Return the reader cursor over distinct ``distinct_field`` values."""
        return self._get_distinct("gfidpath", distinct_field, filters)

    # ------------------------------------------------------------------
    # pgfid table
    # ------------------------------------------------------------------
    def pgfid_add(self, pgfid):
        """Insert ``pgfid``; duplicates are ignored by the table constraint."""
        self._add("pgfid", {
            "pgfid": pgfid
        })

    def pgfid_update(self, data, filters):
        """Set the columns in ``data`` on ``pgfid`` rows matching filters."""
        self._update("pgfid", data, filters)

    def pgfid_get(self, filters=None):
        """Return the reader cursor over matching ``pgfid`` rows."""
        return self._get("pgfid", filters)

    def pgfid_get_distinct(self, distinct_field, filters=None):
        """Return the reader cursor over distinct ``distinct_field`` values."""
        return self._get_distinct("pgfid", distinct_field, filters)

    def pgfid_exists(self, filters):
        """Return True if a ``pgfid`` row matches the non-empty filters."""
        return self._exists("pgfid", filters)

    # ------------------------------------------------------------------
    # inodegfid table
    # ------------------------------------------------------------------
    def inodegfid_add(self, inode, gfid, converted=0):
        """Insert an inode -> gfid mapping into ``inodegfid``."""
        self._add("inodegfid", {
            "inode": inode,
            "gfid": gfid,
            "converted": converted
        })

    def inodegfid_update(self, data, filters):
        """Set the columns in ``data`` on ``inodegfid`` rows matching filters."""
        self._update("inodegfid", data, filters)

    def inodegfid_get(self, filters=None):
        """Return the reader cursor over matching ``inodegfid`` rows."""
        return self._get("inodegfid", filters)

    def inodegfid_get_distinct(self, distinct_field, filters=None):
        """Return the reader cursor over distinct ``distinct_field`` values."""
        return self._get_distinct("inodegfid", distinct_field, filters)

    def inodegfid_exists(self, filters):
        """Return True if an ``inodegfid`` row matches the non-empty filters."""
        return self._exists("inodegfid", filters)

    # ------------------------------------------------------------------
    # Path resolution
    # ------------------------------------------------------------------
    def append_path1(self, path, inode):
        """
        Append ``path`` to ``path1`` of every ``gfidpath`` row whose gfid is
        mapped to ``inode`` in ``inodegfid``.

        Each appended path is prefixed with ``,`` so that several paths of
        the same gfid accumulate instead of replacing one another.
        """
        # || is for concatenate in SQL
        query = """UPDATE gfidpath SET path1 = path1 || ',' || ?
        WHERE gfid IN (SELECT gfid FROM inodegfid WHERE inode = ?)"""
        self.cursor.execute(query, (path, inode))

    def gfidpath_set_path1(self, path1, pgfid1):
        """
        Resolve paths of the children of directory ``pgfid1``.

        Sets ``path1 = <path1>%2F<bn1>`` on rows whose ``pgfid1`` matches
        and ``path2 = <path1>%2F<bn2>`` on rows whose ``pgfid2`` matches.
        When ``path1`` is empty the ``%2F`` separator is omitted.
        """
        # || is for concatenate in SQL
        if path1 == "":
            update_str1 = "? || bn1"
            update_str2 = "? || bn2"
        else:
            update_str1 = "? || '%2F' || bn1"
            update_str2 = "? || '%2F' || bn2"

        query = """UPDATE gfidpath SET path1 = %s
        WHERE pgfid1 = ?""" % update_str1
        self.cursor.execute(query, (path1, pgfid1))

        # Set Path2 if pgfid1 and pgfid2 are same
        query = """UPDATE gfidpath SET path2 = %s
        WHERE pgfid2 = ?""" % update_str2
        self.cursor.execute(query, (path1, pgfid1))

    def gfidpath_set_path2(self, path2, pgfid2):
        """
        Set ``path2 = <path2>%2F<bn2>`` on rows whose ``pgfid2`` matches.

        When ``path2`` is empty the ``%2F`` separator is omitted.
        """
        # || is for concatenate in SQL
        if path2 == "":
            update_str = "? || bn2"
        else:
            update_str = "? || '%2F' || bn2"

        query = """UPDATE gfidpath SET path2 = %s
        WHERE pgfid2 = ?""" % update_str
        self.cursor.execute(query, (path2, pgfid2))

    # ------------------------------------------------------------------
    # Changelog record handlers
    # ------------------------------------------------------------------
    def when_create_mknod_mkdir(self, changelogfile, data):
        """
        Record a create/mknod/mkdir entry as a NEW row.

        ``data[1]`` is used as the gfid and ``data[6]`` as the url-encoded
        ``<PGFID>/<BASENAME>`` field.
        """
        pgfid1, bn1 = self._split_entry(data[6])
        self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1)

    def when_rename(self, changelogfile, data):
        """
        Record a rename entry, folding it into an earlier row if possible.

        ``data[1]`` is used as the gfid, ``data[3]`` as the url-encoded old
        ``<PGFID>/<BASENAME>`` and ``data[4]`` as the new one.
        """
        pgfid1, bn1 = self._split_entry(data[3])
        pgfid2, bn2 = self._split_entry(data[4])

        created_here = {"gfid": data[1], "type": "NEW",
                        "pgfid1": pgfid1, "bn1": bn1}
        renamed_to_here = {"gfid": data[1], "type": "RENAME",
                           "pgfid2": pgfid1, "bn2": bn1}

        if self.gfidpath_exists(created_here):
            # The old name was created in this run: the NEW row simply
            # takes the new parent/basename.
            self.gfidpath_update({"pgfid1": pgfid2, "bn1": bn2},
                                 created_here)
        elif self.gfidpath_exists(renamed_to_here):
            # The old name is the target of a previous RENAME: extend that
            # RENAME row to point at the new parent/basename.
            self.gfidpath_update({"pgfid2": pgfid2, "bn2": bn2},
                                 renamed_to_here)
        else:
            # Else insert as RENAME
            self.gfidpath_add(changelogfile, RecordType.RENAME, data[1],
                              pgfid1, bn1, pgfid2, bn2)

    def when_link_symlink(self, changelogfile, data):
        """
        Record a link/symlink entry as a NEW row.

        ``data[1]`` is used as the gfid and ``data[3]`` as the url-encoded
        ``<PGFID>/<BASENAME>`` field.
        """
        pgfid1, bn1 = self._split_entry(data[3])
        self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1)

    def when_data_meta(self, changelogfile, data):
        """
        Record a data/metadata change as MODIFY unless any row for the gfid
        (``data[1]``) already exists.
        """
        if not self.gfidpath_exists({"gfid": data[1]}):
            self.gfidpath_add(changelogfile, RecordType.MODIFY, data[1])

    def when_unlink_rmdir(self, changelogfile, data):
        """
        Record an unlink/rmdir entry.

        ``data[1]`` is used as the gfid and ``data[3]`` as the url-encoded
        ``<PGFID>/<BASENAME>`` field; when ``data`` has exactly five items
        ``data[4]`` is taken as the path of the deleted entry.
        """
        pgfid1, bn1 = self._split_entry(data[3])
        deleted_path = data[4] if len(data) == 5 else ""

        created_here = {"gfid": data[1], "type": "NEW",
                        "pgfid1": pgfid1, "bn1": bn1}
        if self.gfidpath_exists(created_here):
            # If path exists in table as NEW with same GFID
            # Delete that row
            self.gfidpath_delete(created_here)
        else:
            # Else Record as DELETE
            self.gfidpath_add(changelogfile, RecordType.DELETE, data[1],
                              pgfid1, bn1, path1=deleted_path)

        # Update path1 as deleted_path if pgfid1 and bn1 is same as deleted
        self.gfidpath_update({"path1": deleted_path}, {"gfid": data[1],
                                                       "pgfid1": pgfid1,
                                                       "bn1": bn1})

        # Update path2 as deleted_path if pgfid2 and bn2 is same as deleted
        self.gfidpath_update({"path2": deleted_path}, {
            "type": RecordType.RENAME,
            "gfid": data[1],
            "pgfid2": pgfid1,
            "bn2": bn1})

        # If deleted directory is parent for somebody, rebuild the already
        # resolved child paths below the deleted path.
        query1 = """UPDATE gfidpath SET path1 = ? || '%2F' || bn1
        WHERE pgfid1 = ? AND path1 != ''"""
        self.cursor.execute(query1, (deleted_path, data[1]))

        # path2 belongs to the (pgfid2, bn2) pair, so it is rebuilt from
        # bn2, consistent with gfidpath_set_path1/gfidpath_set_path2.
        query2 = """UPDATE gfidpath SET path2 = ? || '%2F' || bn2
        WHERE pgfid2 = ? AND path2 != ''"""
        self.cursor.execute(query2, (deleted_path, data[1]))

    def commit(self):
        """Persist all pending changes to the database."""
        self.conn.commit()