diff options
author | Aravinda VK <avishwan@redhat.com> | 2015-02-18 19:07:23 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2015-03-18 02:54:25 -0700 |
commit | 7e98a0e0b1e346f89047b57e495f66f1b4223997 (patch) | |
tree | 0524dda8ad7066acab8d50537fe4f6e08f19fd29 /tools/glusterfind | |
parent | c9b1aea9b3f7087336c75548b59574f251813136 (diff) |
feature/glusterfind: A tool to find incremental changes
Documentation is available in patch:
http://review.gluster.org/#/c/9800/
A tool which helps to get the list of modified files, or the list of all
files, in a GlusterFS Volume using the Changelog or the find command.
Usage
=====
glusterfind --help
Create:
-------
glusterfind create --help
The tool creates status file $GLUSTERD_WORKDIR/SESSION/VOLUME/status
and records current timestamp to initiate the session. This timestamp
will be used as start time for next runs.
As part of create, the tool also generates an ssh key, distributes it to all
peers, and enables build-pgfid and changelog using the volume set command.
Pre:
----
glusterfind pre --help
This command is used to generate the list of files modified after session
creation time or after last run. To get list of all files/dirs in Volume,
run pre command with `--full` argument.
The tool gets the details of all nodes using gluster volume info and runs a
node agent for each brick on the respective node via ssh. Once these node
agents generate their output files, the tool copies them to the local node
using scp and merges all the output files to generate the final output file.
Post:
-----
glusterfind post --help
After consuming the list, this sub command is called to update the session
time based on pre command status file.
List:
-----
glusterfind list --help
To view all the sessions
Delete:
-------
glusterfind delete --help
Delete session.
Known Issues
------------
1. Deleted files will not get listed, since we can't convert GFID to
Path if file/dir is deleted.
2. Only new name will get listed if Renamed.
3. All hardlinks will get listed.
Change-Id: I82991feb0aea85cb6ec035fddbf80a2b276e86b0
BUG: 1193893
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/9682
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Venky Shankar <vshankar@redhat.com>
Reviewed-by: Prashanth Pai <ppai@redhat.com>
Reviewed-by: Kotresh HR <khiremat@redhat.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'tools/glusterfind')
-rw-r--r-- | tools/glusterfind/Makefile.am | 7 | ||||
-rw-r--r-- | tools/glusterfind/glusterfind.in | 17 | ||||
-rw-r--r-- | tools/glusterfind/src/Makefile.am | 14 | ||||
-rw-r--r-- | tools/glusterfind/src/__init__.py | 9 | ||||
-rw-r--r-- | tools/glusterfind/src/brickfind.py | 97 | ||||
-rw-r--r-- | tools/glusterfind/src/changelog.py | 309 | ||||
-rw-r--r-- | tools/glusterfind/src/conf.py | 28 | ||||
-rw-r--r-- | tools/glusterfind/src/libgfchangelog.py | 83 | ||||
-rw-r--r-- | tools/glusterfind/src/main.py | 468 | ||||
-rw-r--r-- | tools/glusterfind/src/nodecleanup.py | 51 | ||||
-rw-r--r-- | tools/glusterfind/src/tool.conf.in | 11 | ||||
-rw-r--r-- | tools/glusterfind/src/utils.py | 203 |
12 files changed, 1297 insertions, 0 deletions
# ---- tools/glusterfind/Makefile.am ----
# Build the top-level glusterfind wrapper script; the python sources
# live in the src/ subdirectory.
SUBDIRS = src

EXTRA_DIST =

bin_SCRIPTS = glusterfind

CLEANFILES = $(bin_SCRIPTS)

# ---- tools/glusterfind/glusterfind.in ----
#!/usr/bin/env python

# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.

import sys

# Make the installed glusterfind package importable; the placeholder is
# substituted by configure.
sys.path.insert(1, '@GLUSTERFS_LIBEXECDIR@/')

from glusterfind.main import main

if __name__ == "__main__":
    main()

# ---- tools/glusterfind/src/Makefile.am ----
glusterfinddir = $(libexecdir)/glusterfs/glusterfind

glusterfind_PYTHON = conf.py utils.py __init__.py \
	main.py libgfchangelog.py

glusterfind_SCRIPTS = changelog.py nodecleanup.py \
	brickfind.py

glusterfind_DATA = tool.conf

EXTRA_DIST = changelog.py nodecleanup.py brickfind.py \
	tool.conf

CLEANFILES =

# ---- tools/glusterfind/src/__init__.py ----
#!/usr/bin/env python

# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#!/usr/bin/env python

# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.

import os
import sys
import logging
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from errno import ENOENT

from utils import mkdirp, setup_logger, create_file, output_write, find
import conf


# Fixed: this is the filesystem-crawl change detector, not the changelog
# one (the description was copy-pasted from changelog.py).
PROG_DESCRIPTION = """
Brick Crawler
"""

logger = logging.getLogger()


def brickfind_crawl(brick, args):
    """
    Crawl one brick backend and append matching paths to args.outfile.

    When args.full is set every file/dir is listed; otherwise only
    entries whose mtime or ctime is newer than args.start are listed.
    The .glusterfs internal directory is always skipped.
    """
    # Normalize: drop a single trailing slash so path slicing below
    # removes exactly the brick prefix plus the separator.
    if brick.endswith("/"):
        brick = brick[0:len(brick)-1]

    working_dir = os.path.dirname(args.outfile)
    mkdirp(working_dir, exit_on_err=True, logger=logger)
    create_file(args.outfile, exit_on_err=True, logger=logger)

    with open(args.outfile, "a+") as fout:
        # Length of brick path, to strip the brick prefix from output paths
        brick_path_len = len(brick)

        def mtime_filter(path):
            """True when path changed (mtime/ctime) after args.start."""
            try:
                st = os.lstat(path)
            except (OSError, IOError) as e:
                if e.errno == ENOENT:
                    # Raced with a deletion; treat as not modified.
                    st = None
                else:
                    raise

            if st and (st.st_mtime > args.start or st.st_ctime > args.start):
                return True

            return False

        def output_callback(path):
            path = path.strip()
            path = path[brick_path_len+1:]
            output_write(fout, path, args.output_prefix)

        if args.full:
            # Full crawl: no time filter, list everything.
            find(brick, callback_func=output_callback,
                 ignore_dirs=[".glusterfs"])
        else:
            find(brick,
                 callback_func=output_callback,
                 filter_func=mtime_filter,
                 ignore_dirs=[".glusterfs"])

        # Make sure the node agent's output is durable before the caller
        # copies it with scp.
        fout.flush()
        os.fsync(fout.fileno())


def _get_args():
    """Parse the node-agent command line (invoked by main.py via ssh)."""
    parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
                            description=PROG_DESCRIPTION)

    parser.add_argument("session", help="Session Name")
    parser.add_argument("volume", help="Volume Name")
    parser.add_argument("brick", help="Brick Name")
    parser.add_argument("outfile", help="Output File")
    parser.add_argument("start", help="Start Time", type=float)
    parser.add_argument("--debug", help="Debug", action="store_true")
    parser.add_argument("--full", help="Full Find", action="store_true")
    parser.add_argument("--output-prefix", help="File prefix in output",
                        default=".")

    return parser.parse_args()


if __name__ == "__main__":
    args = _get_args()
    mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
           exit_on_err=True)
    log_file = os.path.join(conf.get_opt("log_dir"),
                            args.session,
                            args.volume,
                            "brickfind.log")
    setup_logger(logger, log_file, args.debug)
    brickfind_crawl(args.brick, args)
    sys.exit(0)
#!/usr/bin/env python

# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.

import os
import sys
import time
import xattr
from errno import ENOENT
import logging
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import hashlib

import libgfchangelog
from utils import create_file, mkdirp, execute, symlink_gfid_to_path
from utils import fail, setup_logger, output_write, find
import conf


CHANGELOG_LOG_LEVEL = 9
CHANGELOG_CONN_RETRIES = 5
CHANGELOGAPI_NUM_WORKERS = 3
PROG_DESCRIPTION = """
Changelog Crawler
"""
history_turns = 0
history_turn_time = 0

logger = logging.getLogger()


def _output_prefix(args):
    """Output prefix from parsed args; '.' when no args supplied."""
    return args.output_prefix if args is not None else "."


def gfid_to_path_using_batchfind(brick, gfids_file, output_file, args=None):
    """
    find -samefile gets the inode number and crawls entire namespace
    to get the list of files/dirs having same inode number.
    Do find without any option, except the ignore directory option,
    print the output in <INODE_NUM> <PATH> format, use this output
    to look into in-memory dictionary of inode numbers got from the
    list of GFIDs

    Fixed: previously this relied on a module-global ``args`` (set only
    when run as a script); ``args`` is now an optional parameter so the
    function also works when imported as a library.
    """
    output_prefix = _output_prefix(args)
    with open(output_file, "a+") as fout:
        inode_dict = {}
        with open(gfids_file) as f:
            for gfid in f:
                gfid = gfid.strip()
                # Backend symlink/file for a GFID lives under
                # BRICK/.glusterfs/AA/BB/GFID (AA, BB = first two bytes)
                backend_path = os.path.join(brick, ".glusterfs",
                                            gfid[0:2], gfid[2:4], gfid)

                try:
                    inode_dict[str(os.stat(backend_path).st_ino)] = 1
                except (IOError, OSError) as e:
                    if e.errno == ENOENT:
                        # GFID no longer exists (deleted); skip.
                        continue
                    else:
                        fail("%s Failed to convert to path from "
                             "GFID %s: %s" % (brick, gfid, e), logger=logger)

        if not inode_dict:
            return

        def inode_filter(path):
            """True when path's inode matches one of the collected GFIDs."""
            try:
                st = os.lstat(path)
            except (OSError, IOError) as e:
                if e.errno == ENOENT:
                    st = None
                else:
                    raise

            if st and inode_dict.get(str(st.st_ino), None):
                return True

            return False

        # Length of brick path, to remove from output path
        brick_path_len = len(brick)

        def output_callback(path):
            path = path.strip()
            path = path[brick_path_len+1:]
            output_write(fout, path, output_prefix)

        find(brick, callback_func=output_callback,
             filter_func=inode_filter,
             ignore_dirs=[".glusterfs"])

        fout.flush()
        os.fsync(fout.fileno())


def gfid_to_path_using_pgfid(brick, gfids_file, output_file, outfile_failures,
                             args=None):
    """
    Parent GFID is saved as xattr, collect Parent GFIDs from all
    the files from gfids_file. Convert parent GFID to path and Crawl
    each directories to get the list of files/dirs having same inode number.
    Do find with maxdepth as 1 and print the output in <INODE_NUM> <PATH>
    format, use this output to look into in memory dictionary of inode
    numbers got from the list of GFIDs

    GFIDs that carry no trusted.pgfid.* xattr are written to
    outfile_failures for the slower batchfind fallback.

    Fixed: ``args`` is now an explicit optional parameter instead of a
    module-global reference.
    """
    output_prefix = _output_prefix(args)
    with open(output_file, "a+") as fout:
        pgfids = set()
        inode_dict = {}
        with open(gfids_file) as f:
            for gfid in f:
                gfid = gfid.strip()
                p = os.path.join(brick,
                                 ".glusterfs",
                                 gfid[0:2],
                                 gfid[2:4],
                                 gfid)
                if os.path.islink(p):
                    # Directories are stored as symlinks in .glusterfs;
                    # resolve directly to a path.
                    path = symlink_gfid_to_path(brick, gfid)
                    output_write(fout, path, output_prefix)
                else:
                    try:
                        inode_dict[str(os.stat(p).st_ino)] = 1
                        file_xattrs = xattr.list(p)
                        num_parent_gfid = 0
                        for x in file_xattrs:
                            if x.startswith("trusted.pgfid."):
                                num_parent_gfid += 1
                                pgfids.add(x.split(".")[-1])

                        if num_parent_gfid == 0:
                            # No parent GFID xattr: record for the
                            # batchfind fallback. (Renamed handle so it
                            # does not shadow the gfids_file handle f.)
                            with open(outfile_failures, "a") as ffail:
                                ffail.write("%s\n" % gfid)
                                ffail.flush()
                                os.fsync(ffail.fileno())

                    except (IOError, OSError) as e:
                        if e.errno == ENOENT:
                            continue
                        else:
                            fail("%s Failed to convert to path from "
                                 "GFID %s: %s" % (brick, gfid, e),
                                 logger=logger)

        if not inode_dict:
            return

        def inode_filter(path):
            try:
                st = os.lstat(path)
            except (OSError, IOError) as e:
                if e.errno == ENOENT:
                    st = None
                else:
                    raise

            if st and inode_dict.get(str(st.st_ino), None):
                return True

            return False

        # Length of brick path, to remove from output path
        brick_path_len = len(brick)

        def output_callback(path):
            path = path.strip()
            path = path[brick_path_len+1:]
            output_write(fout, path, output_prefix)

        # Crawl only the parent directories (no recursion), matching by
        # inode against the collected GFIDs.
        for pgfid in pgfids:
            path = symlink_gfid_to_path(brick, pgfid)
            find(os.path.join(brick, path),
                 callback_func=output_callback,
                 filter_func=inode_filter,
                 ignore_dirs=[".glusterfs"],
                 subdirs_crawl=False)

        fout.flush()
        os.fsync(fout.fileno())


def sort_unique(filename):
    """Sort filename in place, dropping duplicate lines."""
    execute(["sort", "-u", "-o", filename, filename],
            exit_msg="Sort failed", logger=logger)


def get_changes(brick, hash_dir, log_file, end, args):
    """
    Makes use of libgfchangelog's history API to get changelogs
    containing changes from start and end time. Further collects
    the modified gfids from the changelogs and writes the list
    of gfid to 'gfid_list' file.
    """
    try:
        libgfchangelog.cl_register(brick, hash_dir, log_file,
                                   CHANGELOG_LOG_LEVEL,
                                   CHANGELOG_CONN_RETRIES)
    except libgfchangelog.ChangelogException as e:
        fail("%s Changelog register failed: %s" % (brick, e), logger=logger)

    # Output files to record GFIDs and GFID to Path failure GFIDs
    gfid_list_path = args.outfile + ".gfids"
    gfid_list_failures_file = gfid_list_path + ".failures"
    create_file(gfid_list_path, exit_on_err=True, logger=logger)
    create_file(gfid_list_failures_file, exit_on_err=True, logger=logger)

    # Changelogs path(Hard coded to BRICK/.glusterfs/changelogs
    cl_path = os.path.join(brick, ".glusterfs/changelogs")

    # Fail if History fails for requested Start and End
    try:
        actual_end = libgfchangelog.cl_history_changelog(
            cl_path, args.start, end, CHANGELOGAPI_NUM_WORKERS)
    except libgfchangelog.ChangelogException as e:
        fail("%s Historical Changelogs not available: %s" % (brick, e),
             logger=logger)

    try:
        # scan followed by getchanges till scan returns zero.
        # history_scan() is blocking call, till it gets the number
        # of changelogs to process. Returns zero when no changelogs
        # to be processed. returns positive value as number of changelogs
        # to be processed, which will be fetched using
        # history_getchanges()
        changes = []
        while libgfchangelog.cl_history_scan() > 0:
            changes += libgfchangelog.cl_history_getchanges()

            if changes:
                with open(gfid_list_path, 'a+') as fgfid:
                    for change in changes:
                        with open(change) as f:
                            for line in f:
                                # Space delimited list, collect GFID
                                details = line.split()
                                fgfid.write("%s\n" % details[1])

                        libgfchangelog.cl_history_done(change)
                    fgfid.flush()
                    os.fsync(fgfid.fileno())
    except libgfchangelog.ChangelogException as e:
        fail("%s Error during Changelog Crawl: %s" % (brick, e),
             logger=logger)

    # If TS returned from history_changelog is < end time
    # then FS crawl may be required, since history is only available
    # till TS returned from history_changelog
    if actual_end < end:
        # Exit code 2 signals "partial history" to main.py, which then
        # falls back to the brickfind crawler.
        fail("Partial History available with Changelog", 2, logger=logger)

    sort_unique(gfid_list_path)
    gfid_to_path_using_pgfid(brick, gfid_list_path,
                             args.outfile, gfid_list_failures_file, args)
    gfid_to_path_using_batchfind(brick, gfid_list_failures_file,
                                 args.outfile, args)


def changelog_crawl(brick, end, args):
    """
    Init function, prepares working dir and calls Changelog query
    """
    if brick.endswith("/"):
        brick = brick[0:len(brick)-1]

    # WORKING_DIR/BRICKHASH/OUTFILE
    working_dir = os.path.dirname(args.outfile)
    brickhash = hashlib.sha1(brick)
    brickhash = str(brickhash.hexdigest())
    working_dir = os.path.join(working_dir, brickhash)

    mkdirp(working_dir, exit_on_err=True, logger=logger)
    create_file(args.outfile, exit_on_err=True, logger=logger)
    create_file(args.outfile + ".gfids", exit_on_err=True, logger=logger)

    log_file = os.path.join(conf.get_opt("log_dir"),
                            args.session,
                            args.volume,
                            "changelog.%s.log" % brickhash)

    logger.info("%s Started Changelog Crawl. Start: %s, End: %s"
                % (brick, args.start, end))
    get_changes(brick, working_dir, log_file, end, args)


def _get_args():
    """Parse the node-agent command line (invoked by main.py via ssh)."""
    parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
                            description=PROG_DESCRIPTION)

    parser.add_argument("session", help="Session Name")
    parser.add_argument("volume", help="Volume Name")
    parser.add_argument("brick", help="Brick Name")
    parser.add_argument("outfile", help="Output File")
    parser.add_argument("start", help="Start Time", type=int)
    parser.add_argument("--debug", help="Debug", action="store_true")
    parser.add_argument("--output-prefix", help="File prefix in output",
                        default=".")

    return parser.parse_args()


if __name__ == "__main__":
    args = _get_args()
    mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
           exit_on_err=True)
    log_file = os.path.join(conf.get_opt("log_dir"),
                            args.session,
                            args.volume,
                            "changelog.log")
    setup_logger(logger, log_file, args.debug)
    # History is reliable only up to one rollover period in the past.
    end = int(time.time()) - int(conf.get_opt("changelog_rollover_time"))
    changelog_crawl(args.brick, end, args)
    sys.exit(0)
# ---- tools/glusterfind/src/conf.py ----
#!/usr/bin/env python

# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.

import os
import ConfigParser

# tool.conf is installed next to this module; it carries the [vars]
# settings and the [change_detectors] script table.
config = ConfigParser.ConfigParser()
config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)),
                         "tool.conf"))


def list_change_detectors():
    """Names of all configured change detectors."""
    return dict(config.items("change_detectors")).keys()


def get_opt(opt):
    """Value of a [vars] option."""
    return config.get("vars", opt)


def get_change_detector(opt):
    """Script path of the named change detector."""
    return config.get("change_detectors", opt)

# ---- tools/glusterfind/src/libgfchangelog.py ----
#!/usr/bin/env python

# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.

import os
from ctypes import CDLL, get_errno, create_string_buffer, c_ulong, byref
from ctypes import RTLD_GLOBAL
from ctypes.util import find_library


class ChangelogException(OSError):
    """Raised when a libgfchangelog call fails; carries errno/strerror."""
    pass


libgfc = CDLL(find_library("gfchangelog"), use_errno=True, mode=RTLD_GLOBAL)


def raise_oserr():
    """Convert the current C errno into a ChangelogException."""
    errn = get_errno()
    raise ChangelogException(errn, os.strerror(errn))


def cl_register(brick, path, log_file, log_level, retries=0):
    """Register a changelog consumer for the given brick."""
    if libgfc.gf_changelog_register(brick, path, log_file,
                                    log_level, retries) == -1:
        raise_oserr()


def cl_history_scan():
    """Blocking scan; returns the number of changelogs to process."""
    ret = libgfc.gf_history_changelog_scan()
    if ret == -1:
        raise_oserr()

    return ret


def cl_history_changelog(changelog_path, start, end, num_parallel):
    """Request history between start and end; returns the actual end TS."""
    actual_end = c_ulong()
    ret = libgfc.gf_history_changelog(changelog_path, start, end,
                                      num_parallel,
                                      byref(actual_end))
    if ret == -1:
        raise_oserr()

    return actual_end.value


def cl_history_startfresh():
    """Restart history consumption from the beginning."""
    if libgfc.gf_history_changelog_start_fresh() == -1:
        raise_oserr()


def cl_history_getchanges():
    """ remove hardcoding for path name length """
    def clsort(f):
        # Changelog files are named CHANGELOG.<TS>; sort by timestamp.
        return f.split('.')[-1]

    changes = []
    buf = create_string_buffer('\0', 4096)

    while True:
        ret = libgfc.gf_history_changelog_next_change(buf, 4096)
        if ret in (0, -1):
            break
        # ret includes the trailing NUL; strip it.
        changes.append(buf.raw[:ret - 1])
    if ret == -1:
        raise_oserr()

    return sorted(changes, key=clsort)


def cl_history_done(clfile):
    """Mark a changelog file as fully consumed."""
    if libgfc.gf_history_changelog_done(clfile) == -1:
        raise_oserr()
+ """ + localdir = is_host_local(host) + + # If Full backup is requested or start time is zero, use brickfind + change_detector = conf.get_change_detector(args.change_detector) + if ((start == 0 or args.full) and args.change_detector == "changelog") or \ + fallback: + change_detector = conf.get_change_detector("brickfind") + + # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug + # --gfidpath <TYPE> + cmd = [change_detector, + args.session, + volume, + path, + outfile, + str(start), + "--output-prefix", + args.output_prefix] + \ + (["--debug"] if args.debug else []) + \ + (["--full"] if args.full else []) + + if not localdir: + # prefix with ssh command if not local node + cmd = ["ssh", + "-i", conf.get_opt("secret_pem"), + "root@%s" % host] + cmd + + rc, out, err = execute(cmd, logger=logger) + if rc == 2: + # Partial History Fallback + logger.info("%s %s Fallback to brickfind" % (host, err.strip())) + # Exit only from process, handled in main. + sys.exit(rc) + elif rc != 0: + fail("%s - Change detection failed" % host, logger=logger) + + if not localdir: + cmd_copy = ["scp", + "-i", conf.get_opt("secret_pem"), + "root@%s:/%s" % (host, outfile), + os.path.dirname(outfile)] + execute(cmd_copy, exit_msg="%s - Copy command failed" % host, + logger=logger) + + +def node_cleanup(host, args): + localdir = is_host_local(host) + + # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug + # --gfidpath <TYPE> + cmd = [conf.get_opt("nodecleanup"), + args.session, + args.volume] + (["--debug"] if args.debug else []) + + if not localdir: + # prefix with ssh command if not local node + cmd = ["ssh", + "-i", conf.get_opt("secret_pem"), + "root@%s" % host] + cmd + + execute(cmd, exit_msg="%s - Cleanup failed" % host, logger=logger) + + +def cleanup(nodes, args): + pool = [] + for num, node in enumerate(nodes): + host, brick = node[1].split(":") + # temp output file + node_outfile = os.path.join(conf.get_opt("working_dir"), + args.session, + 
args.volume, + "tmp_output_%s.txt" % num) + + try: + os.remove(node_outfile) + except (OSError, IOError): + # TODO: Cleanup Failure, Handle + pass + + p = Process(target=node_cleanup, + args=(host, args)) + p.start() + pool.append(p) + + exit_codes = 0 + for p in pool: + p.join() + exit_codes += (0 if p.exitcode == 0 else 1) + + if exit_codes != 0: + sys.exit(1) + + +def failback_node_run(brick_details, idx, volume, start, outfile, args): + host, brick = brick_details.split(":") + p = Process(target=node_run, + args=(volume, host, brick, start, outfile, args, True)) + p.start() + p.join() + return p.exitcode + + +def run_in_nodes(volume, start, args): + """ + Get nodes of volume using gluster volume info, spawn a process + each for a Node. Merge the output files once all the process + complete their tasks. + """ + nodes = get_nodes(volume) + pool = [] + node_outfiles = [] + for num, node in enumerate(nodes): + host, brick = node[1].split(":") + # temp output file + node_outfile = os.path.join(conf.get_opt("working_dir"), + args.session, + volume, + "tmp_output_%s.txt" % num) + node_outfiles.append(node_outfile) + p = Process(target=node_run, args=(volume, host, brick, start, + node_outfile, args)) + p.start() + pool.append(p) + + exit_codes = 0 + for idx, p in enumerate(pool): + p.join() + # Handle the Changelog failure, fallback to Brickfind + if p.exitcode == 2: + rc = failback_node_run(nodes[idx][1], idx, volume, start, + node_outfiles[idx], args) + exit_codes += (0 if rc == 0 else 1) + elif p.exitcode != 0: + exit_codes += (0 if p.exitcode == 0 else 1) + + if exit_codes != 0: + sys.exit(1) + + # Merge all output files + cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] + execute(cmd, + exit_msg="Failed to merge output files " + "collected from nodes", logger=logger) + + cleanup(nodes, args) + + +def get_nodes(volume): + """ + Get the gluster volume info xml output and parse to get + the brick details. 
+ """ + cmd = ["gluster", 'volume', 'info', volume, "--xml"] + _, data, _ = execute(cmd, + exit_msg="Failed to Run Gluster Volume Info", + logger=logger) + tree = etree.fromstring(data) + + nodes = [] + volume_el = tree.find('volInfo/volumes/volume') + try: + for b in volume_el.findall('bricks/brick'): + nodes.append((b.find('hostUuid').text, + b.find('name').text)) + except (ParseError, AttributeError, ValueError) as e: + fail("Failed to parse Volume Info: %s" % e, logger=logger) + + return nodes + + +def _get_args(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description=PROG_DESCRIPTION) + subparsers = parser.add_subparsers(dest="mode") + + # create <SESSION> <VOLUME> [--debug] [--force] + parser_create = subparsers.add_parser('create') + parser_create.add_argument("session", help="Session Name") + parser_create.add_argument("volume", help="Volume Name") + parser_create.add_argument("--debug", help="Debug", action="store_true") + parser_create.add_argument("--force", help="Force option to recreate " + "the session", action="store_true") + + # delete <SESSION> <VOLUME> [--debug] + parser_delete = subparsers.add_parser('delete') + parser_delete.add_argument("session", help="Session Name") + parser_delete.add_argument("volume", help="Volume Name") + parser_delete.add_argument("--debug", help="Debug", action="store_true") + + # list [--session <SESSION>] [--volume <VOLUME>] + parser_list = subparsers.add_parser('list') + parser_list.add_argument("--session", help="Session Name", default="") + parser_list.add_argument("--volume", help="Volume Name", default="") + parser_list.add_argument("--debug", help="Debug", action="store_true") + + # pre <SESSION> <VOLUME> <OUTFILE> [--change-detector <CHANGE_DETECTOR>] + # [--output-prefix <OUTPUT_PREFIX>] [--full] + parser_pre = subparsers.add_parser('pre') + parser_pre.add_argument("session", help="Session Name") + parser_pre.add_argument("volume", help="Volume Name") + 
parser_pre.add_argument("outfile", help="Output File") + parser_pre.add_argument("--debug", help="Debug", action="store_true") + parser_pre.add_argument("--full", help="Full find", action="store_true") + parser_pre.add_argument("--change-detector", dest="change_detector", + help="Change detection", + choices=conf.list_change_detectors(), + type=str, default='changelog') + parser_pre.add_argument("--output-prefix", help="File prefix in output", + default=".") + + # post <SESSION> <VOLUME> + parser_post = subparsers.add_parser('post') + parser_post.add_argument("session", help="Session Name") + parser_post.add_argument("volume", help="Volume Name") + parser_post.add_argument("--debug", help="Debug", action="store_true") + + return parser.parse_args() + + +def ssh_setup(): + if not os.path.exists(conf.get_opt("secret_pem")): + # Generate ssh-key + cmd = ["ssh-keygen", + "-N", + "", + "-f", + conf.get_opt("secret_pem")] + execute(cmd, + exit_msg="Unable to generate ssh key %s" + % conf.get_opt("secret_pem"), + logger=logger) + + logger.info("Ssh key generated %s" % conf.get_opt("secret_pem")) + + # Copy pub file to all nodes + cmd = ["gluster", + "system::", + "copy", + "file", + "/" + os.path.basename(conf.get_opt("secret_pem")) + ".pub"] + execute(cmd, exit_msg="Failed to distribute ssh keys", logger=logger) + + logger.info("Distributed ssh key to all nodes of Volume") + + # Add to authorized_keys file in each node + cmd = ["gluster", + "system::", + "execute", + "add_secret_pub", + "root", + os.path.basename(conf.get_opt("secret_pem")) + ".pub"] + execute(cmd, + exit_msg="Failed to add ssh keys to authorized_keys file", + logger=logger) + + logger.info("Ssh key added to authorized_keys of Volume nodes") + + +def mode_create(session_dir, args): + logger.debug("Init is called - Session: %s, Volume: %s" + % (args.session, args.volume)) + + mkdirp(session_dir, exit_on_err=True, logger=logger) + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + 
logger=logger) + status_file = os.path.join(session_dir, args.volume, "status") + + if os.path.exists(status_file) and not args.force: + fail("Session %s already created" % args.session, logger=logger) + + if not os.path.exists(status_file) or args.force: + ssh_setup() + + execute(["gluster", "volume", "set", + args.volume, "build-pgfid", "on"], + exit_msg="Failed to set volume option build-pgfid on", + logger=logger) + logger.info("Volume option set %s, build-pgfid on" % args.volume) + + execute(["gluster", "volume", "set", + args.volume, "changelog.changelog", "on"], + exit_msg="Failed to set volume option " + "changelog.changelog on", logger=logger) + logger.info("Volume option set %s, changelog.changelog on" + % args.volume) + + if not os.path.exists(status_file): + with open(status_file, "w", buffering=0) as f: + # Add Rollover time to current time to make sure changelogs + # will be available if we use this time as start time + time_to_update = int(time.time()) + int( + conf.get_opt("changelog_rollover_time")) + f.write(str(time_to_update)) + + sys.exit(0) + + +def mode_pre(session_dir, args): + """ + Read from Session file and write to session.pre file + """ + endtime_to_update = int(time.time()) - int( + conf.get_opt("changelog_rollover_time")) + status_file = os.path.join(session_dir, args.volume, "status") + status_file_pre = status_file + ".pre" + + mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger) + + start = 0 + try: + with open(status_file) as f: + start = int(f.read().strip()) + except ValueError: + pass + except (OSError, IOError) as e: + fail("Error Opening Session file %s: %s" + % (status_file, e), logger=logger) + + logger.debug("Pre is called - Session: %s, Volume: %s, " + "Start time: %s, End time: %s" + % (args.session, args.volume, start, endtime_to_update)) + + run_in_nodes(args.volume, start, args) + + with open(status_file_pre, "w", buffering=0) as f: + f.write(str(endtime_to_update)) + + sys.stdout.write("Generated 
output file %s\n" % args.outfile) + + +def mode_post(session_dir, args): + """ + If pre session file exists, overwrite session file + If pre session file does not exists, return ERROR + """ + status_file = os.path.join(session_dir, args.volume, "status") + logger.debug("Post is called - Session: %s, Volume: %s" + % (args.session, args.volume)) + status_file_pre = status_file + ".pre" + + if os.path.exists(status_file_pre): + os.rename(status_file_pre, status_file) + sys.exit(0) + else: + fail("Pre script is not run", logger=logger) + + +def mode_delete(session_dir, args): + def handle_rm_error(func, path, exc_info): + if exc_info[1].errno == ENOENT: + return + + raise exc_info[1] + + shutil.rmtree(os.path.join(session_dir, args.volume), + onerror=handle_rm_error) + + +def mode_list(session_dir, args): + """ + List available sessions to stdout, if session name is set + only list that session. + """ + if args.session: + if not os.path.exists(os.path.join(session_dir, args.session)): + fail("Invalid Session", logger=logger) + sessions = [args.session] + else: + sessions = [] + for d in os.listdir(session_dir): + sessions.append(d) + + output = [] + for session in sessions: + # Session Volume Last Processed + volnames = os.listdir(os.path.join(session_dir, session)) + + for volname in volnames: + if args.volume and args.volume != volname: + continue + + status_file = os.path.join(session_dir, session, volname, "status") + last_processed = None + try: + with open(status_file) as f: + last_processed = f.read().strip() + except (OSError, IOError) as e: + if e.errno == ENOENT: + pass + else: + raise + output.append((session, volname, last_processed)) + + if output: + sys.stdout.write("%s %s %s\n" % ("SESSION".ljust(25), + "VOLUME".ljust(25), + "SESSION TIME".ljust(25))) + sys.stdout.write("-"*75) + sys.stdout.write("\n") + for session, volname, last_processed in output: + sys.stdout.write("%s %s %s\n" % (session.ljust(25), + volname.ljust(25), + 
def main():
    """CLI entry point: parse args, set up logging, dispatch to a mode."""
    args = _get_args()
    session_base = conf.get_opt("session_dir")
    mkdirp(session_base, exit_on_err=True)

    # "list" operates over every session, so it works on the base dir.
    if args.mode == "list":
        session_dir = session_base
    else:
        session_dir = os.path.join(session_base, args.session)

    # create/list may run before the session dir exists; others may not.
    if not os.path.exists(session_dir) and args.mode not in ["create", "list"]:
        fail("Invalid session %s" % args.session)

    log_dir = os.path.join(conf.get_opt("log_dir"), args.session, args.volume)
    mkdirp(log_dir, exit_on_err=True)
    setup_logger(logger, os.path.join(log_dir, "cli.log"), args.debug)

    # globals() holds the mode_* functions defined above; dispatch by
    # name so adding a new sub-command needs no change here.
    globals()["mode_" + args.mode](session_dir, args)
import shutil
import sys
import os
import logging
from errno import ENOENT

from utils import setup_logger, mkdirp
import conf

logger = logging.getLogger()


if __name__ == "__main__":
    # Node agent invoked over ssh.  Args: <SESSION> <VOLUME>
    session = sys.argv[1]
    volume = sys.argv[2]

    working_dir = os.path.join(conf.get_opt("working_dir"),
                               session,
                               volume)

    mkdirp(os.path.join(conf.get_opt("log_dir"), session, volume),
           exit_on_err=True)
    log_file = os.path.join(conf.get_opt("log_dir"),
                            session,
                            volume,
                            "changelog.log")
    setup_logger(logger, log_file)

    def handle_rm_error(func, path, exc_info):
        # Entries removed behind our back are fine; re-raise the rest.
        if exc_info[1].errno != ENOENT:
            raise exc_info[1]

    try:
        shutil.rmtree(working_dir, onerror=handle_rm_error)
    except (OSError, IOError) as e:
        logger.error("Failed to delete working directory: %s" % e)
        sys.exit(1)
def find(path, callback_func=lambda x: True, filter_func=lambda x: True,
         ignore_dirs=None, subdirs_crawl=True):
    """
    Walk `path` depth-first, calling callback_func(p) for every path
    (including `path` itself) for which filter_func(p) is true.

    :param ignore_dirs: basenames of directories to skip entirely
                        (defaults to no ignores).
    :param subdirs_crawl: when False, only the immediate children of
                          `path` are visited (no recursion).
    """
    # Fix: the original used a mutable default (ignore_dirs=[]), which
    # is shared across calls; use a None sentinel instead.
    if ignore_dirs is None:
        ignore_dirs = []

    if os.path.basename(path) in ignore_dirs:
        return

    if filter_func(path):
        callback_func(path)

    for p in os.listdir(path):
        full_path = os.path.join(path, p)

        if os.path.isdir(full_path):
            if subdirs_crawl:
                find(full_path, callback_func, filter_func, ignore_dirs)
            else:
                if filter_func(full_path):
                    callback_func(full_path)
        else:
            if filter_func(full_path):
                callback_func(full_path)


def output_write(f, path, prefix="."):
    """
    Write one path per line to file object `f`.

    Empty paths are skipped; a prefix other than "." is joined in front
    of the path.
    """
    if path == "":
        return

    if prefix != ".":
        path = os.path.join(prefix, path)
    f.write("%s\n" % path)


def human_time(ts):
    """Convert epoch seconds (str/number) to 'YYYY-mm-dd HH:MM:SS' local time."""
    return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S")


def setup_logger(logger, path, debug=False):
    """
    Attach a FileHandler writing to `path`; log level is DEBUG when
    `debug` is set, INFO otherwise.
    """
    if debug:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    # create the logging file handler
    fh = logging.FileHandler(path)

    formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
                                  "[%(module)s - %(lineno)s:%(funcName)s] "
                                  "- %(message)s")

    fh.setFormatter(formatter)

    # add handler to logger object
    logger.addHandler(fh)
def create_file(path, exit_on_err=False, logger=None):
    """
    Create an empty file at `path`, truncating any existing content.

    On failure: exit via fail() when exit_on_err is set, otherwise
    re-raise so the consumer can handle the exception.
    """
    try:
        open(path, 'w').close()
    except (OSError, IOError) as e:
        if exit_on_err:
            fail("Failed to create file %s: %s" % (path, e), logger=logger)
        else:
            raise


def mkdirp(path, exit_on_err=False, logger=None):
    """
    Try creating required directory structure.

    EEXIST on an already-existing directory is ignored; any other error
    exits (exit_on_err=True) or is re-raised.
    """
    try:
        os.makedirs(path)
    except (OSError, IOError) as e:
        if e.errno == EEXIST and os.path.isdir(path):
            pass
        else:
            if exit_on_err:
                fail("Fail to create dir %s: %s" % (path, e), logger=logger)
            else:
                raise


def fail(msg, code=1, logger=None):
    """
    Log (when a logger is given), write `msg` to stderr and exit
    with `code`.
    """
    if logger:
        logger.error(msg)
    sys.stderr.write("%s\n" % msg)
    sys.exit(code)


def execute(cmd, exit_msg=None, logger=None):
    """
    Run `cmd` (an argv list) and return (returncode, out, err).

    Fix: the original docstring described a nonexistent `failure_msg`
    parameter with inverted semantics.  Actual behavior: when `exit_msg`
    is set and the command fails, "<exit_msg>: <stderr>" is written to
    stderr and the process exits with the command's return code; when
    `exit_msg` is None the tuple is returned regardless of status.
    """
    p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True)

    (out, err) = p.communicate()
    if p.returncode != 0 and exit_msg is not None:
        fail("%s: %s" % (exit_msg, err), p.returncode, logger=logger)

    return (p.returncode, out, err)
+ """ + if gfid == ROOT_GFID: + return "" + + out_path = "" + while True: + path = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid) + path_readlink = os.readlink(path) + pgfid = os.path.dirname(path_readlink) + out_path = os.path.join(os.path.basename(path_readlink), out_path) + if pgfid == "../../00/00/%s" % ROOT_GFID: + break + gfid = os.path.basename(pgfid) + return out_path + + +def is_host_local(host): + """ + Find if a host is local or not. + Code copied from $GLUSTERFS/geo-replication/syncdaemon/syncdutils.py + """ + locaddr = False + for ai in socket.getaddrinfo(host, None): + # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators + # /mgmt/glusterd/src/glusterd-utils.c#L125 + if ai[0] == socket.AF_INET: + if ai[-1][0].split(".")[0] == "127": + locaddr = True + break + elif ai[0] == socket.AF_INET6: + if ai[-1][0] == "::1": + locaddr = True + break + else: + continue + try: + # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue, + # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587 + s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP) + except socket.error: + ex = sys.exc_info()[1] + if ex.errno != EPERM: + raise + f = None + try: + f = open("/proc/sys/net/ipv4/ip_nonlocal_bind") + if int(f.read()) != 0: + logger.warning("non-local bind is set and not " + "allowed to create " + "raw sockets, cannot determine " + "if %s is local" % host) + return False + s = socket.socket(ai[0], socket.SOCK_DGRAM) + finally: + if f: + f.close() + try: + s.bind(ai[-1]) + locaddr = True + break + except: + pass + s.close() + return locaddr |