-rw-r--r--  tools/glusterfind/src/Makefile.am       |   4
-rw-r--r--  tools/glusterfind/src/brickfind.py      |   2
-rw-r--r--  tools/glusterfind/src/changelog.py      | 394
-rw-r--r--  tools/glusterfind/src/changelogdata.py  | 412
-rw-r--r--  tools/glusterfind/src/main.py           |  34
-rw-r--r--  tools/glusterfind/src/utils.py          |  25
6 files changed, 683 insertions(+), 188 deletions(-)
diff --git a/tools/glusterfind/src/Makefile.am b/tools/glusterfind/src/Makefile.am
index 7b819828d97..541ff946c04 100644
--- a/tools/glusterfind/src/Makefile.am
+++ b/tools/glusterfind/src/Makefile.am
@@ -1,7 +1,7 @@
 glusterfinddir = $(libexecdir)/glusterfs/glusterfind
 
 glusterfind_PYTHON = conf.py utils.py __init__.py \
-	main.py libgfchangelog.py
+	main.py libgfchangelog.py changelogdata.py
 
 glusterfind_SCRIPTS = changelog.py nodeagent.py \
 	brickfind.py
@@ -9,6 +9,6 @@ glusterfind_SCRIPTS = changelog.py nodeagent.py \
 glusterfind_DATA = tool.conf
 
 EXTRA_DIST = changelog.py nodeagent.py brickfind.py \
-	tool.conf
+	tool.conf changelogdata.py
 
 CLEANFILES =
diff --git a/tools/glusterfind/src/brickfind.py b/tools/glusterfind/src/brickfind.py
index 9758bef56ff..f300638d602 100644
--- a/tools/glusterfind/src/brickfind.py
+++ b/tools/glusterfind/src/brickfind.py
@@ -37,7 +37,7 @@ def brickfind_crawl(brick, args):
     with open(args.outfile, "a+") as fout:
         brick_path_len = len(brick)
 
-        def output_callback(path):
+        def output_callback(path, filter_result):
             path = path.strip()
             path = path[brick_path_len+1:]
             output_write(fout, path, args.output_prefix, encode=True)
diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py
index 2c4ee9106e1..b5f71c7c0ee 100644
--- a/tools/glusterfind/src/changelog.py
+++ b/tools/glusterfind/src/changelog.py
@@ -12,16 +12,16 @@ import os
 import sys
 import time
 import xattr
-from errno import ENOENT
 import logging
 from argparse import ArgumentParser, RawDescriptionHelpFormatter
 import hashlib
 import urllib
 
 import libgfchangelog
-from utils import create_file, mkdirp, execute, symlink_gfid_to_path
-from utils import fail, setup_logger, output_write, find
+from utils import mkdirp, symlink_gfid_to_path
+from utils import fail, setup_logger, find
 from utils import get_changelog_rollover_time
+from changelogdata import ChangelogData
 import conf
@@ -37,159 +37,202 @@ history_turn_time = 0
 
 logger = logging.getLogger()
 
 
-def gfid_to_path_using_batchfind(brick, gfids_file, output_file):
+def output_path_prepare(path, output_prefix):
     """
-    find -samefile gets the inode number and crawls entire namespace
-    to get the list of files/dirs having same inode number.
-    Do find without any option, except the ignore directory option,
-    print the output in <INODE_NUM> <PATH> format, use this output
-    to look into in-memory dictionary of inode numbers got from the
-    list of GFIDs
+    If Prefix is set, joins to Path, removes ending slash
+    and encodes it.
""" -    with open(output_file, "a+") as fout: -        inode_dict = {} -        with open(gfids_file) as f: -            for gfid in f: -                gfid = gfid.strip() -                backend_path = os.path.join(brick, ".glusterfs", -                                            gfid[0:2], gfid[2:4], gfid) - -                try: -                    inode_dict[str(os.stat(backend_path).st_ino)] = 1 -                except (IOError, OSError) as e: -                    if e.errno == ENOENT: -                        continue -                    else: -                        fail("%s Failed to convert to path from " -                             "GFID %s: %s" % (brick, gfid, e), logger=logger) - -        if not inode_dict: -            return - -        def inode_filter(path): -            try: -                st = os.lstat(path) -            except (OSError, IOError) as e: -                if e.errno == ENOENT: -                    st = None -                else: -                    raise - -            if st and inode_dict.get(str(st.st_ino), None): -                return True - -            return False - -        brick_path_len = len(brick) - -        def output_callback(path): -            path = path.strip() -            path = path[brick_path_len+1:] -            output_write(fout, path, args.output_prefix) - -        ignore_dirs = [os.path.join(brick, dirname) -                       for dirname in -                       conf.get_opt("brick_ignore_dirs").split(",")] -        # Length of brick path, to remove from output path -        find(brick, callback_func=output_callback, -             filter_func=inode_filter, -             ignore_dirs=ignore_dirs) +    if output_prefix != ".": +        path = os.path.join(output_prefix, path) +        if path.endswith("/"): +            path = path[0:len(path)-1] + +    return urllib.quote_plus(path) + + +def pgfid_to_path(brick, changelog_data): +    """ +    For all the pgfids in table, converts into path using recursive +    readlink. +    """ +    # pgfid1 to path1 in case of CREATE/MKNOD/MKDIR/LINK/SYMLINK +    for row in changelog_data.gfidpath_get_distinct("pgfid1", {"path1": ""}): +        # In case of Data/Metadata only, pgfid1 will not be their +        if row[0] == "": +            continue + +        path = symlink_gfid_to_path(brick, row[0]) +        path = output_path_prepare(path, args.output_prefix) + +        changelog_data.gfidpath_set_path1(path, row[0]) + +    # pgfid2 to path2 in case of RENAME +    for row in changelog_data.gfidpath_get_distinct("pgfid2", +                                                    {"type": "RENAME", +                                                     "path2": ""}): +        # Only in case of Rename pgfid2 exists +        if row[0] == "": +            continue -        fout.flush() -        os.fsync(fout.fileno()) +        path = symlink_gfid_to_path(brick, row[0]) +        if path == "": +            continue +        path = output_path_prepare(path, args.output_prefix) +        changelog_data.gfidpath_set_path2(path, row[0]) -def gfid_to_path_using_pgfid(brick, gfids_file, output_file, outfile_failures): + +def populate_pgfid_and_inodegfid(brick, changelog_data):      """ -    Parent GFID is saved as xattr, collect Parent GFIDs from all -    the files from gfids_file. Convert parent GFID to path and Crawl -    each directories to get the list of files/dirs having same inode number. 
-    Do find with maxdepth as 1 and print the output in <INODE_NUM> <PATH>
-    format, use this output to look into in memory dictionary of inode
-    numbers got from the list of GFIDs
+    For all the DATA/METADATA modifications GFID,
+    If symlink, directly convert to Path using Readlink.
+    If not symlink, try to get PGFIDs via xattr query and populate it
+    to pgfid table, collect inodes in inodegfid table
     """
-    with open(output_file, "a+") as fout:
-        pgfids = set()
-        inode_dict = {}
-        with open(gfids_file) as f:
-            for gfid in f:
-                gfid = gfid.strip()
-                p = os.path.join(brick,
-                                 ".glusterfs",
-                                 gfid[0:2],
-                                 gfid[2:4],
-                                 gfid)
-                if os.path.islink(p):
-                    path = symlink_gfid_to_path(brick, gfid)
-                    output_write(fout, path, args.output_prefix)
-                else:
-                    try:
-                        inode_dict[str(os.stat(p).st_ino)] = 1
-                        file_xattrs = xattr.list(p)
-                        num_parent_gfid = 0
-                        for x in file_xattrs:
-                            if x.startswith("trusted.pgfid."):
-                                num_parent_gfid += 1
-                                pgfids.add(x.split(".")[-1])
-
-                        if num_parent_gfid == 0:
-                            with open(outfile_failures, "a") as f:
-                                f.write("%s\n" % gfid)
-                                f.flush()
-                                os.fsync(f.fileno())
-
-                    except (IOError, OSError) as e:
-                        if e.errno == ENOENT:
-                            continue
-                        else:
-                            fail("%s Failed to convert to path from "
-                                 "GFID %s: %s" % (brick, gfid, e),
-                                 logger=logger)
-
-        if not inode_dict:
-            return
-
-        def inode_filter(path):
+    for row in changelog_data.gfidpath_get({"path1": "", "type": "MODIFY"}):
+        gfid = row[3].strip()
+        p = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid)
+        if os.path.islink(p):
+            # It is a Directory if GFID backend path is symlink
+            path = symlink_gfid_to_path(brick, gfid)
+            if path == "":
+                continue
+
+            path = output_path_prepare(path, args.output_prefix)
+
+            changelog_data.gfidpath_update({"path1": path},
+                                           {"gfid": row[0]})
+        else:
             try:
-                st = os.lstat(path)
-            except (OSError, IOError) as e:
-                if e.errno == ENOENT:
-                    st = None
-                else:
-                    raise
+                # INODE and GFID to inodegfid table
+                changelog_data.inodegfid_add(os.stat(p).st_ino, gfid)
+                file_xattrs = xattr.list(p)
+                for x in file_xattrs:
+                    if x.startswith("trusted.pgfid."):
+                        # PGFID in pgfid table
+                        changelog_data.pgfid_add(x.split(".")[-1])
+            except (IOError, OSError):
+                # All OS Errors ignored, since failures will be logged
+                # in End. All GFIDs present in gfidpath table
+                continue
+
+
+def gfid_to_path_using_pgfid(brick, changelog_data, args):
+    """
+    For all the pgfids collected, Converts to Path and
+    does readdir on those directories and looks up inodegfid
+    table for matching inode number.
+    """
+    populate_pgfid_and_inodegfid(brick, changelog_data)
+
+    # If no GFIDs need conversion to Path
+    if not changelog_data.inodegfid_exists({"converted": 0}):
+        return
+
+    def inode_filter(path):
+        # Looks in inodegfid table, if exists returns
+        # inode number else None
+        try:
+            st = os.lstat(path)
+        except (OSError, IOError):
+            st = None
+
+        if st and changelog_data.inodegfid_exists({"inode": st.st_ino}):
+            return st.st_ino
+
+        return None
+
+    # Length of brick path, to remove from output path
+    brick_path_len = len(brick)
+
+    def output_callback(path, inode):
+        # For each path found, encodes it and updates path1
+        # Also updates converted flag in inodegfid table as 1
+        path = path.strip()
+        path = path[brick_path_len+1:]
+
+        path = output_path_prepare(path, args.output_prefix)
 
-            if st and inode_dict.get(str(st.st_ino), None):
-                return True
+        changelog_data.append_path1(path, inode)
+        changelog_data.inodegfid_update({"converted": 1}, {"inode": inode})
 
-            return False
+    ignore_dirs = [os.path.join(brick, dirname)
+                   for dirname in
+                   conf.get_opt("brick_ignore_dirs").split(",")]
 
-        # Length of brick path, to remove from output path
-        brick_path_len = len(brick)
+    for row in changelog_data.pgfid_get():
+        path = symlink_gfid_to_path(brick, row[0])
+        find(os.path.join(brick, path),
+             callback_func=output_callback,
+             filter_func=inode_filter,
+             ignore_dirs=ignore_dirs,
+             subdirs_crawl=False)
+
+
+def gfid_to_path_using_batchfind(brick, changelog_data):
+    # If all the GFIDs converted using gfid_to_path_using_pgfid
+    if not changelog_data.inodegfid_exists({"converted": 0}):
+        return
+
+    def inode_filter(path):
+        # Looks in inodegfid table, if exists returns
+        # inode number else None
+        try:
+            st = os.lstat(path)
+        except (OSError, IOError):
+            st = None
+
+        if st and changelog_data.inodegfid_exists({"inode": st.st_ino}):
+            return st.st_ino
 
-        def output_callback(path):
-            path = path.strip()
-            path = path[brick_path_len+1:]
-            output_write(fout, path, args.output_prefix)
+        return None
 
-        ignore_dirs = [os.path.join(brick, dirname)
-                       for dirname in
-                       conf.get_opt("brick_ignore_dirs").split(",")]
+    # Length of brick path, to remove from output path
+    brick_path_len = len(brick)
 
-        for pgfid in pgfids:
-            path = symlink_gfid_to_path(brick, pgfid)
-            find(os.path.join(brick, path),
-                 callback_func=output_callback,
-                 filter_func=inode_filter,
-                 ignore_dirs=ignore_dirs,
-                 subdirs_crawl=False)
+    def output_callback(path, inode):
+        # For each path found, encodes it and updates path1
+        # Also updates converted flag in inodegfid table as 1
+        path = path.strip()
+        path = path[brick_path_len+1:]
+        path = output_path_prepare(path, args.output_prefix)
 
-        fout.flush()
-        os.fsync(fout.fileno())
+        changelog_data.append_path1(path, inode)
 
+    ignore_dirs = [os.path.join(brick, dirname)
+                   for dirname in
+                   conf.get_opt("brick_ignore_dirs").split(",")]
 
-def sort_unique(filename):
-    execute(["sort", "-u", "-o", filename, filename],
-            exit_msg="Sort failed", logger=logger)
+    # Full Namespace Crawl
+    find(brick, callback_func=output_callback,
+         filter_func=inode_filter,
+         ignore_dirs=ignore_dirs)
+
+
+def parse_changelog_to_db(changelog_data, filename):
+    """
+    Parses a Changelog file and populates data in gfidpath table
+    """
+    with open(filename) as f:
+        changelogfile = os.path.basename(filename)
+        for line in f:
+            data = line.strip().split(" ")
+            if data[0] == "E" and data[2] in ["CREATE", "MKNOD", "MKDIR"]:
+                # CREATE/MKDIR/MKNOD
+                changelog_data.when_create_mknod_mkdir(changelogfile, data)
+            elif data[0] in ["D", "M"]:
+                # DATA/META
+                if not args.only_namespace_changes:
+                    changelog_data.when_data_meta(changelogfile, data)
+            elif data[0] == "E" and data[2] in ["LINK", "SYMLINK"]:
+                # LINK/SYMLINK
+                changelog_data.when_link_symlink(changelogfile, data)
+            elif data[0] == "E" and data[2] == "RENAME":
+                # RENAME
+                changelog_data.when_rename(changelogfile, data)
+            elif data[0] == "E" and data[2] in ["UNLINK", "RMDIR"]:
+                # UNLINK/RMDIR
+                changelog_data.when_unlink_rmdir(changelogfile, data)
 
 
 def get_changes(brick, hash_dir, log_file, start, end, args):
@@ -199,6 +242,18 @@ def get_changes(brick, hash_dir, log_file, start, end, args):
     the modified gfids from the changelogs and writes the list
     of gfid to 'gfid_list' file.
""" +    session_dir = os.path.join(conf.get_opt("session_dir"), +                               args.session) +    status_file = os.path.join(session_dir, args.volume, +                               "%s.status" % urllib.quote_plus(args.brick)) + +    # Get previous session +    try: +        with open(status_file) as f: +            start = int(f.read().strip()) +    except (ValueError, OSError, IOError): +        start = args.start +      try:          libgfchangelog.cl_init()          libgfchangelog.cl_register(brick, hash_dir, log_file, @@ -207,10 +262,7 @@ def get_changes(brick, hash_dir, log_file, start, end, args):          fail("%s Changelog register failed: %s" % (brick, e), logger=logger)      # Output files to record GFIDs and GFID to Path failure GFIDs -    gfid_list_path = args.outfile + ".gfids" -    gfid_list_failures_file = gfid_list_path + ".failures" -    create_file(gfid_list_path, exit_on_err=True, logger=logger) -    create_file(gfid_list_failures_file, exit_on_err=True, logger=logger) +    changelog_data = ChangelogData(args.outfile)      # Changelogs path(Hard coded to BRICK/.glusterfs/changelogs      cl_path = os.path.join(brick, ".glusterfs/changelogs") @@ -234,37 +286,31 @@ def get_changes(brick, hash_dir, log_file, start, end, args):          while libgfchangelog.cl_history_scan() > 0:              changes += libgfchangelog.cl_history_getchanges() -            if changes: -                with open(gfid_list_path, 'a+') as fgfid: -                    for change in changes: -                        # Ignore if last processed changelog comes -                        # again in list -                        if change.endswith(".%s" % start): -                            continue - -                        with open(change) as f: -                            for line in f: -                                # Space delimited list, collect GFID -                                details = line.split() -                                fgfid.write("%s\n" % details[1]) - -                        libgfchangelog.cl_history_done(change) -                    fgfid.flush() -                    os.fsync(fgfid.fileno()) +            for change in changes: +                # Ignore if last processed changelog comes +                # again in list +                if change.endswith(".%s" % start): +                    continue +                parse_changelog_to_db(changelog_data, change) +                libgfchangelog.cl_history_done(change) + +            changelog_data.commit()      except libgfchangelog.ChangelogException as e:          fail("%s Error during Changelog Crawl: %s" % (brick, e),               logger=logger) -    # If TS returned from history_changelog is < end time -    # then FS crawl may be required, since history is only available -    # till TS returned from history_changelog -    if actual_end < end: -        fail("Partial History available with Changelog", 2, logger=logger) +    # Convert all pgfid available from Changelogs +    pgfid_to_path(brick, changelog_data) +    changelog_data.commit() + +    # Convert all GFIDs for which no other additional details available +    gfid_to_path_using_pgfid(brick, changelog_data, args) +    changelog_data.commit() -    sort_unique(gfid_list_path) -    gfid_to_path_using_pgfid(brick, gfid_list_path, -                             args.outfile, gfid_list_failures_file) -    gfid_to_path_using_batchfind(brick, gfid_list_failures_file, args.outfile) +    # If some GFIDs fail to get converted from previous step, +    # 
+    gfid_to_path_using_batchfind(brick, changelog_data)
+    changelog_data.commit()
 
     return actual_end
 
@@ -283,8 +329,6 @@ def changelog_crawl(brick, start, end, args):
     working_dir = os.path.join(working_dir, brickhash)
 
     mkdirp(working_dir, exit_on_err=True, logger=logger)
-    create_file(args.outfile, exit_on_err=True, logger=logger)
-    create_file(args.outfile + ".gfids", exit_on_err=True, logger=logger)
 
     log_file = os.path.join(conf.get_opt("log_dir"),
                             args.session,
@@ -308,6 +352,9 @@ def _get_args():
     parser.add_argument("--debug", help="Debug", action="store_true")
     parser.add_argument("--output-prefix", help="File prefix in output",
                         default=".")
+    parser.add_argument("-N", "--only-namespace-changes",
+                        help="List only namespace changes",
+                        action="store_true")
 
     return parser.parse_args()
 
@@ -336,8 +383,13 @@ if __name__ == "__main__":
         start = args.start
 
     end = int(time.time()) - get_changelog_rollover_time(args.volume)
+    logger.info("%s Started Changelog Crawl - Start: %s End: %s" % (args.brick,
+                                                                    start,
+                                                                    end))
     actual_end = changelog_crawl(args.brick, start, end, args)
     with open(status_file_pre, "w", buffering=0) as f:
         f.write(str(actual_end))
 
+    logger.info("%s Finished Changelog Crawl - End: %s" % (args.brick,
+                                                           actual_end))
     sys.exit(0)
diff --git a/tools/glusterfind/src/changelogdata.py b/tools/glusterfind/src/changelogdata.py
new file mode 100644
index 00000000000..c42aa2a2315
--- /dev/null
+++ b/tools/glusterfind/src/changelogdata.py
@@ -0,0 +1,412 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import sqlite3
+import urllib
+import os
+
+from utils import RecordType
+
+
+class OutputMerger(object):
+    """
+    Class to merge the output files collected from
+    different nodes
+    """
+    def __init__(self, db_path, all_dbs):
+        self.conn = sqlite3.connect(db_path)
+        self.cursor = self.conn.cursor()
+        self.cursor_reader = self.conn.cursor()
+        query = "DROP TABLE IF EXISTS finallist"
+        self.cursor.execute(query)
+
+        query = """
+        CREATE TABLE finallist(
+          id     INTEGER PRIMARY KEY AUTOINCREMENT,
+          ts     VARCHAR,
+          type   VARCHAR,
+          gfid   VARCHAR,
+          path1  VARCHAR,
+          path2  VARCHAR,
+          UNIQUE (type, path1, path2) ON CONFLICT IGNORE
+        )
+        """
+        self.cursor.execute(query)
+
+        # If node database exists, read each db and insert into
+        # final table. Ignore if combination of TYPE PATH1 PATH2
+        # already exists
+        for node_db in all_dbs:
+            if os.path.exists(node_db):
+                conn = sqlite3.connect(node_db)
+                cursor = conn.cursor()
+                query = """
+                SELECT   ts, type, gfid, path1, path2
+                FROM     gfidpath
+                WHERE    path1 != ''
+                ORDER BY id ASC
+                """
+                for row in cursor.execute(query):
+                    self.add_if_not_exists(row[0], row[1], row[2],
+                                           row[3], row[4])
+
+        self.conn.commit()
+
+    def add_if_not_exists(self, ts, ty, gfid, path1, path2=""):
+        # Adds record to finallist only if not exists
+        query = """
+        INSERT INTO finallist(ts, type, gfid, path1, path2)
+        VALUES(?, ?, ?, ?, ?)
+        """
+        self.cursor.execute(query, (ts, ty, gfid, path1, path2))
+
+    def get(self):
+        query = """SELECT type, path1, path2 FROM finallist
+        ORDER BY ts ASC, id ASC"""
+        return self.cursor_reader.execute(query)
+
+    def get_failures(self):
+        query = """
+        SELECT   gfid
+        FROM     finallist
+        WHERE path1 = '' OR (path2 = '' AND type = 'RENAME')
+        """
+        return self.cursor_reader.execute(query)
+
+
+class ChangelogData(object):
+    def __init__(self, dbpath):
+        self.conn = sqlite3.connect(dbpath)
+        self.cursor = self.conn.cursor()
+        self.cursor_reader = self.conn.cursor()
+        self._create_table_gfidpath()
+        self._create_table_pgfid()
+        self._create_table_inodegfid()
+
+    def _create_table_gfidpath(self):
+        drop_table = "DROP TABLE IF EXISTS gfidpath"
+        self.cursor.execute(drop_table)
+
+        create_table = """
+        CREATE TABLE gfidpath(
+            id     INTEGER PRIMARY KEY AUTOINCREMENT,
+            ts     VARCHAR,
+            type   VARCHAR,
+            gfid   VARCHAR(40),
+            pgfid1 VARCHAR(40),
+            bn1    VARCHAR(500),
+            pgfid2 VARCHAR(40),
+            bn2    VARCHAR(500),
+            path1  VARCHAR DEFAULT '',
+            path2  VARCHAR DEFAULT ''
+        )
+        """
+        self.cursor.execute(create_table)
+
+    def _create_table_inodegfid(self):
+        drop_table = "DROP TABLE IF EXISTS inodegfid"
+        self.cursor.execute(drop_table)
+
+        create_table = """
+        CREATE TABLE inodegfid(
+            inode     INTEGER PRIMARY KEY,
+            gfid      VARCHAR(40),
+            converted INTEGER DEFAULT 0,
+            UNIQUE    (inode, gfid) ON CONFLICT IGNORE
+        )
+        """
+        self.cursor.execute(create_table)
+
+    def _create_table_pgfid(self):
+        drop_table = "DROP TABLE IF EXISTS pgfid"
+        self.cursor.execute(drop_table)
+
+        create_table = """
+        CREATE TABLE pgfid(
+            pgfid  VARCHAR(40) PRIMARY KEY,
+            UNIQUE (pgfid) ON CONFLICT IGNORE
+        )
+        """
+        self.cursor.execute(create_table)
+
+    def _get(self, tablename, filters):
+        # SELECT * FROM <TABLENAME> WHERE <CONDITION>
+        params = []
+        query = "SELECT * FROM %s WHERE 1=1" % tablename
+
+        for key, value in filters.items():
+            query += " AND %s = ?" % key
+            params.append(value)
+
+        return self.cursor_reader.execute(query, params)
+
+    def _get_distinct(self, tablename, distinct_field, filters):
+        # SELECT DISTINCT <COL> FROM <TABLENAME> WHERE <CONDITION>
+        params = []
+        query = "SELECT DISTINCT %s FROM %s WHERE 1=1" % (distinct_field,
+                                                          tablename)
+
+        for key, value in filters.items():
+            query += " AND %s = ?" % key
+            params.append(value)
+
+        return self.cursor_reader.execute(query, params)
+
+    def _delete(self, tablename, filters):
+        # DELETE FROM <TABLENAME> WHERE <CONDITIONS>
+        query = "DELETE FROM %s WHERE 1=1" % tablename
+        params = []
+
+        for key, value in filters.items():
+            query += " AND %s = ?" % key
+            params.append(value)
+
+        self.cursor.execute(query, params)
+
+    def _add(self, tablename, data):
+        # INSERT INTO <TABLENAME>(<col1>, <col2>..) VALUES(?,?..)
+        query = "INSERT INTO %s(" % tablename
+        fields = []
+        params = []
+        for key, value in data.items():
+            fields.append(key)
+            params.append(value)
+
+        values_substitute = len(fields)*["?"]
+        query += "%s) VALUES(%s)" % (",".join(fields),
+                                     ",".join(values_substitute))
+        self.cursor.execute(query, params)
+
+    def _update(self, tablename, data, filters):
+        # UPDATE <TABLENAME> SET col1 = ?,.. WHERE col1=? AND ..
+        params = []
+        update_fields = []
+        for key, value in data.items():
+            update_fields.append("%s = ?" % key)
+            params.append(value)
+
+        query = "UPDATE %s SET %s WHERE 1 = 1" % (tablename,
+                                                  ", ".join(update_fields))
+
+        for key, value in filters.items():
+            query += " AND %s = ?" % key
+            params.append(value)
+
+        self.cursor.execute(query, params)
+
+    def _exists(self, tablename, filters):
+        if not filters:
+            return False
+
+        query = "SELECT COUNT(1) FROM %s WHERE 1=1" % tablename
+        params = []
+
+        for key, value in filters.items():
+            query += " AND %s = ?" % key
+            params.append(value)
+
+        self.cursor.execute(query, params)
+        row = self.cursor.fetchone()
+        return True if row[0] > 0 else False
+
+    def gfidpath_add(self, changelogfile, ty, gfid, pgfid1="", bn1="",
+                     pgfid2="", bn2="", path1="", path2=""):
+        self._add("gfidpath", {
+            "ts": changelogfile.split(".")[-1],
+            "type": ty,
+            "gfid": gfid,
+            "pgfid1": pgfid1,
+            "bn1": bn1,
+            "pgfid2": pgfid2,
+            "bn2": bn2,
+            "path1": path1,
+            "path2": path2
+        })
+
+    def gfidpath_update(self, data, filters):
+        self._update("gfidpath", data, filters)
+
+    def gfidpath_delete(self, filters):
+        self._delete("gfidpath", filters)
+
+    def gfidpath_exists(self, filters):
+        return self._exists("gfidpath", filters)
+
+    def gfidpath_get(self, filters={}):
+        return self._get("gfidpath", filters)
+
+    def gfidpath_get_distinct(self, distinct_field, filters={}):
+        return self._get_distinct("gfidpath", distinct_field, filters)
+
+    def pgfid_add(self, pgfid):
+        self._add("pgfid", {
+            "pgfid": pgfid
+        })
+
+    def pgfid_update(self, data, filters):
+        self._update("pgfid", data, filters)
+
+    def pgfid_get(self, filters={}):
+        return self._get("pgfid", filters)
+
+    def pgfid_get_distinct(self, distinct_field, filters={}):
+        return self._get_distinct("pgfid", distinct_field, filters)
+
+    def pgfid_exists(self, filters):
+        return self._exists("pgfid", filters)
+
+    def inodegfid_add(self, inode, gfid, converted=0):
+        self._add("inodegfid", {
+            "inode": inode,
+            "gfid": gfid,
+            "converted": converted
+        })
+
+    def inodegfid_update(self, data, filters):
+        self._update("inodegfid", data, filters)
+
+    def inodegfid_get(self, filters={}):
+        return self._get("inodegfid", filters)
+
+    def inodegfid_get_distinct(self, distinct_field, filters={}):
+        return self._get_distinct("inodegfid", distinct_field, filters)
+
+    def inodegfid_exists(self, filters):
+        return self._exists("inodegfid", filters)
+
+    def append_path1(self, path, inode):
+        # || is for concatenate in SQL
+        query = """UPDATE gfidpath SET path1 = ',' || ?
+        WHERE gfid IN (SELECT gfid FROM inodegfid WHERE inode = ?)"""
+        self.cursor.execute(query, (path, inode))
+
+    def gfidpath_set_path1(self, path1, pgfid1):
+        # || is for concatenate in SQL
+        if path1 == "":
+            update_str1 = "? || bn1"
+            update_str2 = "? || bn2"
+        else:
+            update_str1 = "? || '%2F' || bn1"
+            update_str2 = "? || '%2F' || bn2"
+
+        query = """UPDATE gfidpath SET path1 = %s
+        WHERE pgfid1 = ?""" % update_str1
+        self.cursor.execute(query, (path1, pgfid1))
+
+        # Set Path2 if pgfid1 and pgfid2 are same
+        query = """UPDATE gfidpath SET path2 = %s
+        WHERE pgfid2 = ?""" % update_str2
+        self.cursor.execute(query, (path1, pgfid1))
+
+    def gfidpath_set_path2(self, path2, pgfid2):
+        # || is for concatenate in SQL
+        if path2 == "":
+            update_str = "? || bn2"
+        else:
+            update_str = "? || '%2F' || bn2"
|| '%2F' || bn2" + +        query = """UPDATE gfidpath SET path2 = %s +        WHERE pgfid2 = ?""" % update_str +        self.cursor.execute(query, (path2, pgfid2)) + +    def when_create_mknod_mkdir(self, changelogfile, data): +        # E <GFID> <MKNOD|CREATE|MKDIR> <MODE> <USER> <GRP> <PGFID>/<BNAME> +        # Add the Entry to DB +        pgfid1, bn1 = urllib.unquote_plus(data[6]).split("/", 1) + +        # Quote again the basename +        bn1 = urllib.quote_plus(bn1.strip()) + +        self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1) + +    def when_rename(self, changelogfile, data): +        # E <GFID> RENAME <OLD_PGFID>/<BNAME> <PGFID>/<BNAME> +        pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1) +        pgfid2, bn2 = urllib.unquote_plus(data[4]).split("/", 1) + +        # Quote again the basename +        bn1 = urllib.quote_plus(bn1.strip()) +        bn2 = urllib.quote_plus(bn2.strip()) + +        if self.gfidpath_exists({"gfid": data[1], "type": "NEW", +                                 "pgfid1": pgfid1, "bn1": bn1}): +            # If <OLD_PGFID>/<BNAME> is same as CREATE, Update +            # <NEW_PGFID>/<BNAME> in NEW. +            self.gfidpath_update({"pgfid1": pgfid2, "bn1": bn2}, +                                 {"gfid": data[1], "type": "NEW", +                                  "pgfid1": pgfid1, "bn1": bn1}) +        elif self.gfidpath_exists({"gfid": data[1], "type": "RENAME", +                                   "pgfid2": pgfid1, "bn2": bn1}): +            # If <OLD_PGFID>/<BNAME> is same as <PGFID2>/<BN2>(may be previous +            # RENAME) then UPDATE <NEW_PGFID>/<BNAME> as <PGFID2>/<BN2> +            self.gfidpath_update({"pgfid2": pgfid2, "bn2": bn2}, +                                 {"gfid": data[1], "type": "RENAME", +                                 "pgfid2": pgfid1, "bn2": bn1}) +        else: +            # Else insert as RENAME +            self.gfidpath_add(changelogfile, RecordType.RENAME, data[1], +                              pgfid1, bn1, pgfid2, bn2) + +    def when_link_symlink(self, changelogfile, data): +        # E <GFID> <LINK|SYMLINK> <PGFID>/<BASENAME> +        # Add as New record in Db as Type NEW +        pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1) + +        # Quote again the basename +        bn1 = urllib.quote_plus(bn1.strip()) + +        self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1) + +    def when_data_meta(self, changelogfile, data): +        # If GFID row exists, Ignore else Add to Db +        if not self.gfidpath_exists({"gfid": data[1]}): +            self.gfidpath_add(changelogfile, RecordType.MODIFY, data[1]) + +    def when_unlink_rmdir(self, changelogfile, data): +        # E <GFID> <UNLINK|RMDIR> <PGFID>/<BASENAME> +        pgfid1, bn1 = urllib.unquote_plus(data[3]).split("/", 1) +        # Quote again the basename +        bn1 = urllib.quote_plus(bn1.strip()) +        deleted_path = data[4] if len(data) == 5 else "" + +        if self.gfidpath_exists({"gfid": data[1], "type": "NEW", +                                 "pgfid1": pgfid1, "bn1": bn1}): +            # If path exists in table as NEW with same GFID +            # Delete that row +            self.gfidpath_delete({"gfid": data[1], "type": "NEW", +                                  "pgfid1": pgfid1, "bn1": bn1}) +        else: +            # Else Record as DELETE +            self.gfidpath_add(changelogfile, RecordType.DELETE, data[1], +                              pgfid1, bn1, path1=deleted_path) 
+
+        # Update path1 as deleted_path if pgfid1 and bn1 is same as deleted
+        self.gfidpath_update({"path1": deleted_path}, {"gfid": data[1],
+                                                       "pgfid1": pgfid1,
+                                                       "bn1": bn1})
+
+        # Update path2 as deleted_path if pgfid2 and bn2 is same as deleted
+        self.gfidpath_update({"path2": deleted_path}, {
+            "type": RecordType.RENAME,
+            "gfid": data[1],
+            "pgfid2": pgfid1,
+            "bn2": bn1})
+
+        # If deleted directory is parent for somebody
+        query1 = """UPDATE gfidpath SET path1 = ? || '%2F' || bn1
+        WHERE pgfid1 = ? AND path1 != ''"""
+        self.cursor.execute(query1, (deleted_path, data[1]))
+
+        query1 = """UPDATE gfidpath SET path2 = ? || '%2F' || bn1
+        WHERE pgfid2 = ? AND path2 != ''"""
+        self.cursor.execute(query1, (deleted_path, data[1]))
+
+    def commit(self):
+        self.conn.commit()
diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py
index 089a3aec3c5..d9936eebde1 100644
--- a/tools/glusterfind/src/main.py
+++ b/tools/glusterfind/src/main.py
@@ -20,9 +20,9 @@ import shutil
 
 from utils import execute, is_host_local, mkdirp, fail
 from utils import setup_logger, human_time, handle_rm_error
-from utils import get_changelog_rollover_time, cache_output
+from utils import get_changelog_rollover_time, cache_output, create_file
 import conf
-
+from changelogdata import OutputMerger
 
 PROG_DESCRIPTION = """
 GlusterFS Incremental API
@@ -235,6 +235,9 @@ def _get_args():
                             help="Regenerate outfile, discard the outfile "
                             "generated from last pre command",
                             action="store_true")
+    parser_pre.add_argument("-N", "--only-namespace-changes",
+                            help="List only namespace changes",
+                            action="store_true")
 
     # post <SESSION> <VOLUME>
     parser_post = subparsers.add_parser('post')
@@ -377,10 +380,29 @@ def mode_pre(session_dir, args):
     run_cmd_nodes("pre", args, start=start)
 
     # Merger
-    cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
-    execute(cmd,
-            exit_msg="Failed to merge output files "
-            "collected from nodes", logger=logger)
+    if args.full:
+        cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
+        execute(cmd,
+                exit_msg="Failed to merge output files "
+                "collected from nodes", logger=logger)
+    else:
+        # Read each Changelogs db and generate finaldb
+        create_file(args.outfile, exit_on_err=True, logger=logger)
+        outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles)
+
+        with open(args.outfile, "a") as f:
+            for row in outfilemerger.get():
+                # Multiple paths in case of Hardlinks
+                paths = row[1].split(",")
+                for p in paths:
+                    if p == "":
+                        continue
+                    f.write("%s %s %s\n" % (row[0], p, row[2]))
+
+    try:
+        os.remove(args.outfile + ".db")
+    except (IOError, OSError):
+        pass
 
     run_cmd_nodes("cleanup", args)
diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py
index aea9a9dc82d..cda5ea6378e 100644
--- a/tools/glusterfind/src/utils.py
+++ b/tools/glusterfind/src/utils.py
@@ -24,6 +24,13 @@ ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
 
 cache_data = {}
 
+class RecordType(object):
+    NEW = "NEW"
+    MODIFY = "MODIFY"
+    RENAME = "RENAME"
+    DELETE = "DELETE"
+
+
 def cache_output(func):
     def wrapper(*args, **kwargs):
         global cache_data
@@ -46,8 +53,10 @@ def find(path, callback_func=lambda x: True, filter_func=lambda x: True,
     if path in ignore_dirs:
         return
 
-    if filter_func(path):
-        callback_func(path)
+    # Capture filter_func output and pass it to callback function
+    filter_result = filter_func(path)
+    if filter_result is not None:
+        callback_func(path, filter_result)
 
     for p in os.listdir(path):
         full_path = os.path.join(path, p)
@@ -56,11 +65,13 @@ def find(path, callback_func=lambda x: True, filter_func=lambda x: True,
             if subdirs_crawl:
                 find(full_path, callback_func, filter_func, ignore_dirs)
             else:
-                if filter_func(full_path):
-                    callback_func(full_path)
+                filter_result = filter_func(full_path)
+                if filter_result is not None:
+                    callback_func(full_path, filter_result)
         else:
-            if filter_func(full_path):
-                callback_func(full_path)
+            filter_result = filter_func(full_path)
+            if filter_result is not None:
+                callback_func(full_path, filter_result)
 
 
 def output_write(f, path, prefix=".", encode=False):
@@ -215,5 +226,3 @@ def get_changelog_rollover_time(volumename):
         return int(tree.find('volGetopts/Value').text)
     except ParseError:
         return DEFAULT_CHANGELOG_INTERVAL
-
-
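
The per-brick flow introduced above can be exercised on its own. Below is a minimal sketch (Python 2, matching the tool) of feeding one parsed changelog record through ChangelogData; the database path, GFIDs and changelog filename are hypothetical placeholders, and in the tool itself these calls are made by parse_changelog_to_db() and get_changes() in changelog.py.

    from changelogdata import ChangelogData

    db = ChangelogData("/tmp/brick1-outfile.db")  # hypothetical db path

    # One changelog record, already split on spaces:
    # E <GFID> CREATE <MODE> <USER> <GRP> <PGFID>/<BNAME>
    data = ["E", "766318e9-gfid", "CREATE", "33188", "0", "0",
            "d4f2f234-pgfid%2Fhello.txt"]
    db.when_create_mknod_mkdir("CHANGELOG.1424270532", data)
    db.commit()

    # path1 stays '' until pgfid_to_path() resolves pgfid1 via readlink
    for row in db.gfidpath_get({"path1": ""}):
        print row

The record lands in the gfidpath table with ts "1424270532", type NEW, pgfid1 "d4f2f234-pgfid" and bn1 "hello.txt"; the path columns are filled in later by the conversion passes.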
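Note also the tightened find() contract in utils.py that the crawlers rely on: filter_func now returns a value instead of a boolean, None means skip, and the returned value is passed to callback_func as its second argument; this is how the inode number reaches output_callback() in changelog.py. A standalone sketch of that contract, with a hypothetical brick path and ignore list:

    import os
    from utils import find

    def inode_filter(path):
        # Return a value to visit this path, None to skip it
        try:
            return os.lstat(path).st_ino
        except (OSError, IOError):
            return None

    def output_callback(path, filter_result):
        # filter_result is whatever inode_filter returned
        print "%s %s" % (filter_result, path)

    find("/exports/brick1", callback_func=output_callback,
         filter_func=inode_filter,
         ignore_dirs=["/exports/brick1/.glusterfs"])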