summaryrefslogtreecommitdiffstats
path: root/tools/glusterfind/src
diff options
context:
space:
mode:
Diffstat (limited to 'tools/glusterfind/src')
-rw-r--r--tools/glusterfind/src/Makefile.am4
-rw-r--r--tools/glusterfind/src/brickfind.py39
-rw-r--r--tools/glusterfind/src/changelog.py40
-rw-r--r--tools/glusterfind/src/main.py278
-rw-r--r--tools/glusterfind/src/nodeagent.py123
-rw-r--r--tools/glusterfind/src/nodecleanup.py51
-rw-r--r--tools/glusterfind/src/tool.conf.in3
-rw-r--r--tools/glusterfind/src/utils.py118
8 files changed, 367 insertions, 289 deletions
diff --git a/tools/glusterfind/src/Makefile.am b/tools/glusterfind/src/Makefile.am
index 458b820fd19..7b819828d97 100644
--- a/tools/glusterfind/src/Makefile.am
+++ b/tools/glusterfind/src/Makefile.am
@@ -3,12 +3,12 @@ glusterfinddir = $(libexecdir)/glusterfs/glusterfind
glusterfind_PYTHON = conf.py utils.py __init__.py \
main.py libgfchangelog.py
-glusterfind_SCRIPTS = changelog.py nodecleanup.py \
+glusterfind_SCRIPTS = changelog.py nodeagent.py \
brickfind.py
glusterfind_DATA = tool.conf
-EXTRA_DIST = changelog.py nodecleanup.py brickfind.py \
+EXTRA_DIST = changelog.py nodeagent.py brickfind.py \
tool.conf
CLEANFILES =
diff --git a/tools/glusterfind/src/brickfind.py b/tools/glusterfind/src/brickfind.py
index 1090f408e28..9758bef56ff 100644
--- a/tools/glusterfind/src/brickfind.py
+++ b/tools/glusterfind/src/brickfind.py
@@ -12,7 +12,8 @@ import os
import sys
import logging
from argparse import ArgumentParser, RawDescriptionHelpFormatter
-from errno import ENOENT
+import urllib
+import time
from utils import mkdirp, setup_logger, create_file, output_write, find
import conf
@@ -36,36 +37,17 @@ def brickfind_crawl(brick, args):
with open(args.outfile, "a+") as fout:
brick_path_len = len(brick)
- def mtime_filter(path):
- try:
- st = os.lstat(path)
- except (OSError, IOError) as e:
- if e.errno == ENOENT:
- st = None
- else:
- raise
-
- if st and (st.st_mtime > args.start or st.st_ctime > args.start):
- return True
-
- return False
-
def output_callback(path):
path = path.strip()
path = path[brick_path_len+1:]
- output_write(fout, path, args.output_prefix)
+ output_write(fout, path, args.output_prefix, encode=True)
ignore_dirs = [os.path.join(brick, dirname)
for dirname in
conf.get_opt("brick_ignore_dirs").split(",")]
- if args.full:
- find(brick, callback_func=output_callback,
- ignore_dirs=ignore_dirs)
- else:
- find(brick, callback_func=output_callback,
- filter_func=mtime_filter,
- ignore_dirs=ignore_dirs)
+ find(brick, callback_func=output_callback,
+ ignore_dirs=ignore_dirs)
fout.flush()
os.fsync(fout.fileno())
@@ -81,7 +63,6 @@ def _get_args():
parser.add_argument("outfile", help="Output File")
parser.add_argument("start", help="Start Time", type=float)
parser.add_argument("--debug", help="Debug", action="store_true")
- parser.add_argument("--full", help="Full Find", action="store_true")
parser.add_argument("--output-prefix", help="File prefix in output",
default=".")
@@ -90,6 +71,12 @@ def _get_args():
if __name__ == "__main__":
args = _get_args()
+ session_dir = os.path.join(conf.get_opt("session_dir"), args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+ status_file_pre = status_file + ".pre"
+ mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+ logger=logger)
mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
exit_on_err=True)
log_file = os.path.join(conf.get_opt("log_dir"),
@@ -97,5 +84,9 @@ if __name__ == "__main__":
args.volume,
"brickfind.log")
setup_logger(logger, log_file, args.debug)
+
+ time_to_update = int(time.time())
brickfind_crawl(args.brick, args)
+ with open(status_file_pre, "w", buffering=0) as f:
+ f.write(str(time_to_update))
sys.exit(0)
diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py
index eb73635fb32..2c4ee9106e1 100644
--- a/tools/glusterfind/src/changelog.py
+++ b/tools/glusterfind/src/changelog.py
@@ -16,10 +16,12 @@ from errno import ENOENT
import logging
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import hashlib
+import urllib
import libgfchangelog
from utils import create_file, mkdirp, execute, symlink_gfid_to_path
from utils import fail, setup_logger, output_write, find
+from utils import get_changelog_rollover_time
import conf
@@ -190,7 +192,7 @@ def sort_unique(filename):
exit_msg="Sort failed", logger=logger)
-def get_changes(brick, hash_dir, log_file, end, args):
+def get_changes(brick, hash_dir, log_file, start, end, args):
"""
Makes use of libgfchangelog's history API to get changelogs
containing changes from start and end time. Further collects
@@ -216,7 +218,7 @@ def get_changes(brick, hash_dir, log_file, end, args):
# Fail if History fails for requested Start and End
try:
actual_end = libgfchangelog.cl_history_changelog(
- cl_path, args.start, end, CHANGELOGAPI_NUM_WORKERS)
+ cl_path, start, end, CHANGELOGAPI_NUM_WORKERS)
except libgfchangelog.ChangelogException as e:
fail("%s Historical Changelogs not available: %s" % (brick, e),
logger=logger)
@@ -235,6 +237,11 @@ def get_changes(brick, hash_dir, log_file, end, args):
if changes:
with open(gfid_list_path, 'a+') as fgfid:
for change in changes:
+ # Ignore if last processed changelog comes
+ # again in list
+ if change.endswith(".%s" % start):
+ continue
+
with open(change) as f:
for line in f:
# Space delimited list, collect GFID
@@ -259,8 +266,10 @@ def get_changes(brick, hash_dir, log_file, end, args):
args.outfile, gfid_list_failures_file)
gfid_to_path_using_batchfind(brick, gfid_list_failures_file, args.outfile)
+ return actual_end
-def changelog_crawl(brick, end, args):
+
+def changelog_crawl(brick, start, end, args):
"""
Init function, prepares working dir and calls Changelog query
"""
@@ -283,8 +292,8 @@ def changelog_crawl(brick, end, args):
"changelog.%s.log" % brickhash)
logger.info("%s Started Changelog Crawl. Start: %s, End: %s"
- % (brick, args.start, end))
- get_changes(brick, working_dir, log_file, end, args)
+ % (brick, start, end))
+ return get_changes(brick, working_dir, log_file, start, end, args)
def _get_args():
@@ -312,6 +321,23 @@ if __name__ == "__main__":
args.volume,
"changelog.log")
setup_logger(logger, log_file, args.debug)
- end = int(time.time()) - int(conf.get_opt("changelog_rollover_time"))
- changelog_crawl(args.brick, end, args)
+
+ session_dir = os.path.join(conf.get_opt("session_dir"), args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+ status_file_pre = status_file + ".pre"
+ mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+ logger=logger)
+
+ try:
+ with open(status_file) as f:
+ start = int(f.read().strip())
+ except (ValueError, OSError, IOError):
+ start = args.start
+
+ end = int(time.time()) - get_changelog_rollover_time(args.volume)
+ actual_end = changelog_crawl(args.brick, start, end, args)
+ with open(status_file_pre, "w", buffering=0) as f:
+ f.write(str(actual_end))
+
sys.exit(0)
diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py
index ceaf1173897..089a3aec3c5 100644
--- a/tools/glusterfind/src/main.py
+++ b/tools/glusterfind/src/main.py
@@ -19,7 +19,8 @@ import logging
import shutil
from utils import execute, is_host_local, mkdirp, fail
-from utils import setup_logger, human_time
+from utils import setup_logger, human_time, handle_rm_error
+from utils import get_changelog_rollover_time, cache_output
import conf
@@ -29,6 +30,7 @@ GlusterFS Incremental API
ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
logger = logging.getLogger()
+node_outfiles = []
class StoreAbsPath(Action):
@@ -46,33 +48,13 @@ def get_pem_key_path(session, volume):
"%s_%s_secret.pem" % (session, volume))
-def node_run(volume, host, path, start, outfile, args, fallback=False):
+def node_cmd(host, host_uuid, task, cmd, args, opts):
"""
- If host is local node, execute the command locally. If not local
- execute the CHANGE_DETECTOR command via ssh and copy the output file from
- remote node using scp.
+ Runs command via ssh if host is not local
"""
- localdir = is_host_local(host)
- pem_key_path = get_pem_key_path(args.session, args.volume)
+ localdir = is_host_local(host_uuid)
- # If Full backup is requested or start time is zero, use brickfind
- change_detector = conf.get_change_detector(args.change_detector)
- if ((start == 0 or args.full) and args.change_detector == "changelog") or \
- fallback:
- change_detector = conf.get_change_detector("brickfind")
-
- # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug
- # --gfidpath <TYPE>
- cmd = [change_detector,
- args.session,
- volume,
- path,
- outfile,
- str(start),
- "--output-prefix",
- args.output_prefix] + \
- (["--debug"] if args.debug else []) + \
- (["--full"] if args.full else [])
+ pem_key_path = get_pem_key_path(args.session, args.volume)
if not localdir:
# prefix with ssh command if not local node
@@ -80,128 +62,112 @@ def node_run(volume, host, path, start, outfile, args, fallback=False):
"-i", pem_key_path,
"root@%s" % host] + cmd
- rc, out, err = execute(cmd, logger=logger)
- if rc == 2:
- # Partial History Fallback
- logger.info("%s %s Fallback to brickfind" % (host, err.strip()))
- # Exit only from process, handled in main.
- sys.exit(rc)
- elif rc != 0:
- fail("%s - Change detection failed" % host, logger=logger)
+ execute(cmd, exit_msg="%s - %s failed" % (host, task), logger=logger)
- if not localdir:
+ if opts.get("copy_outfile", False):
cmd_copy = ["scp",
"-i", pem_key_path,
- "root@%s:/%s" % (host, outfile),
- os.path.dirname(outfile)]
+ "root@%s:/%s" % (host, opts.get("node_outfile")),
+ os.path.dirname(opts.get("node_outfile"))]
execute(cmd_copy, exit_msg="%s - Copy command failed" % host,
logger=logger)
-def node_cleanup(host, args):
- localdir = is_host_local(host)
-
- pem_key_path = get_pem_key_path(args.session, args.volume)
-
- # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug
- # --gfidpath <TYPE>
- cmd = [conf.get_opt("nodecleanup"),
- args.session,
- args.volume] + (["--debug"] if args.debug else [])
-
- if not localdir:
- # prefix with ssh command if not local node
- cmd = ["ssh",
- "-i", pem_key_path,
- "root@%s" % host] + cmd
-
- execute(cmd, exit_msg="%s - Cleanup failed" % host, logger=logger)
-
-
-def cleanup(nodes, args):
+def run_cmd_nodes(task, args, **kwargs):
+ global node_outfiles
+ nodes = get_nodes(args.volume)
pool = []
for num, node in enumerate(nodes):
host, brick = node[1].split(":")
- # temp output file
+ host_uuid = node[0]
+ cmd = []
+ opts = {}
node_outfile = os.path.join(conf.get_opt("working_dir"),
- args.session,
- args.volume,
- "tmp_output_%s.txt" % num)
-
- try:
- os.remove(node_outfile)
- except (OSError, IOError):
- # TODO: Cleanup Failure, Handle
- pass
-
- p = Process(target=node_cleanup,
- args=(host, args))
- p.start()
- pool.append(p)
-
- exit_codes = 0
- for p in pool:
- p.join()
- exit_codes += (0 if p.exitcode == 0 else 1)
-
- if exit_codes != 0:
- sys.exit(1)
-
-
-def failback_node_run(brick_details, idx, volume, start, outfile, args):
- host, brick = brick_details.split(":")
- p = Process(target=node_run,
- args=(volume, host, brick, start, outfile, args, True))
- p.start()
- p.join()
- return p.exitcode
-
-
-def run_in_nodes(volume, start, args):
- """
- Get nodes of volume using gluster volume info, spawn a process
- each for a Node. Merge the output files once all the process
- complete their tasks.
- """
- nodes = get_nodes(volume)
- pool = []
- node_outfiles = []
- for num, node in enumerate(nodes):
- host, brick = node[1].split(":")
- # temp output file
- node_outfile = os.path.join(conf.get_opt("working_dir"),
- args.session,
- volume,
- "tmp_output_%s.txt" % num)
- node_outfiles.append(node_outfile)
- p = Process(target=node_run, args=(volume, host, brick, start,
- node_outfile, args))
- p.start()
- pool.append(p)
-
- exit_codes = 0
- for idx, p in enumerate(pool):
+ args.session, args.volume,
+ "tmp_output_%s" % num)
+
+ if task == "pre":
+ # If Full backup is requested or start time is zero, use brickfind
+ change_detector = conf.get_change_detector("changelog")
+ if args.full:
+ change_detector = conf.get_change_detector("brickfind")
+
+ node_outfiles.append(node_outfile)
+
+ cmd = [change_detector,
+ args.session,
+ args.volume,
+ brick,
+ node_outfile,
+ str(kwargs.get("start")),
+ "--output-prefix",
+ args.output_prefix] + \
+ (["--debug"] if args.debug else []) + \
+ (["--only-namespace-changes"] if args.only_namespace_changes
+ else [])
+
+ opts["node_outfile"] = node_outfile
+ opts["copy_outfile"] = True
+ elif task == "cleanup":
+ # After pre run, cleanup the working directory and other temp files
+ # Remove the copied node_outfile in main node
+ try:
+ os.remove(node_outfile)
+ except (OSError, IOError):
+ logger.warn("Failed to cleanup temporary file %s" %
+ node_outfile)
+ pass
+
+ cmd = [conf.get_opt("nodeagent"),
+ "cleanup",
+ args.session,
+ args.volume] + (["--debug"] if args.debug else [])
+ elif task == "create":
+ # When glusterfind create, create session directory in
+ # each brick nodes
+ cmd = [conf.get_opt("nodeagent"),
+ "create",
+ args.session,
+ args.volume,
+ brick,
+ kwargs.get("time_to_update")] + \
+ (["--debug"] if args.debug else []) + \
+ (["--reset-session-time"] if args.reset_session_time
+ else [])
+ elif task == "post":
+ # Rename pre status file to actual status file in each node
+ cmd = [conf.get_opt("nodeagent"),
+ "post",
+ args.session,
+ args.volume,
+ brick] + \
+ (["--debug"] if args.debug else [])
+ elif task == "delete":
+ # When glusterfind delete, cleanup all the session files/dirs
+ # from each node.
+ cmd = [conf.get_opt("nodeagent"),
+ "delete",
+ args.session,
+ args.volume] + \
+ (["--debug"] if args.debug else [])
+
+ if cmd:
+ p = Process(target=node_cmd,
+ args=(host, host_uuid, task, cmd, args, opts))
+ p.start()
+ pool.append(p)
+
+ for num, p in enumerate(pool):
p.join()
- # Handle the Changelog failure, fallback to Brickfind
- if p.exitcode == 2:
- rc = failback_node_run(nodes[idx][1], idx, volume, start,
- node_outfiles[idx], args)
- exit_codes += (0 if rc == 0 else 1)
- elif p.exitcode != 0:
- exit_codes += (0 if p.exitcode == 0 else 1)
-
- if exit_codes != 0:
- sys.exit(1)
-
- # Merge all output files
- cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
- execute(cmd,
- exit_msg="Failed to merge output files "
- "collected from nodes", logger=logger)
-
- cleanup(nodes, args)
+ if p.exitcode != 0:
+ logger.warn("Command %s failed in %s" % (task, nodes[num][1]))
+ if task in ["create", "delete"]:
+ fail("Command %s failed in %s" % (task, nodes[num][1]))
+ elif task == "pre" and args.disable_partial:
+ sys.exit(1)
+@cache_output
def get_nodes(volume):
"""
Get the gluster volume info xml output and parse to get
@@ -237,6 +203,9 @@ def _get_args():
parser_create.add_argument("--debug", help="Debug", action="store_true")
parser_create.add_argument("--force", help="Force option to recreate "
"the session", action="store_true")
+ parser_create.add_argument("--reset-session-time",
+ help="Reset Session Time to Current Time",
+ action="store_true")
# delete <SESSION> <VOLUME> [--debug]
parser_delete = subparsers.add_parser('delete')
@@ -250,7 +219,7 @@ def _get_args():
parser_list.add_argument("--volume", help="Volume Name", default="")
parser_list.add_argument("--debug", help="Debug", action="store_true")
- # pre <SESSION> <VOLUME> <OUTFILE> [--change-detector <CHANGE_DETECTOR>]
+ # pre <SESSION> <VOLUME> <OUTFILE>
# [--output-prefix <OUTPUT_PREFIX>] [--full]
parser_pre = subparsers.add_parser('pre')
parser_pre.add_argument("session", help="Session Name")
@@ -258,10 +227,8 @@ def _get_args():
parser_pre.add_argument("outfile", help="Output File", action=StoreAbsPath)
parser_pre.add_argument("--debug", help="Debug", action="store_true")
parser_pre.add_argument("--full", help="Full find", action="store_true")
- parser_pre.add_argument("--change-detector", dest="change_detector",
- help="Change detection",
- choices=conf.list_change_detectors(),
- type=str, default='changelog')
+ parser_pre.add_argument("--disable-partial", help="Disable Partial find, "
+ "Fail when one node fails", action="store_true")
parser_pre.add_argument("--output-prefix", help="File prefix in output",
default=".")
parser_pre.add_argument("--regenerate-outfile",
@@ -363,12 +330,15 @@ def mode_create(session_dir, args):
logger.info("Volume option set %s, changelog.changelog on"
% args.volume)
- if not os.path.exists(status_file):
+ # Add Rollover time to current time to make sure changelogs
+ # will be available if we use this time as start time
+ time_to_update = int(time.time()) + get_changelog_rollover_time(
+ args.volume)
+
+ run_cmd_nodes("create", args, time_to_update=str(time_to_update))
+
+ if not os.path.exists(status_file) or args.reset_session_time:
with open(status_file, "w", buffering=0) as f:
- # Add Rollover time to current time to make sure changelogs
- # will be available if we use this time as start time
- time_to_update = int(time.time()) + int(
- conf.get_opt("changelog_rollover_time"))
f.write(str(time_to_update))
sys.exit(0)
@@ -378,8 +348,8 @@ def mode_pre(session_dir, args):
"""
Read from Session file and write to session.pre file
"""
- endtime_to_update = int(time.time()) - int(
- conf.get_opt("changelog_rollover_time"))
+ endtime_to_update = int(time.time()) - get_changelog_rollover_time(
+ args.volume)
status_file = os.path.join(session_dir, args.volume, "status")
status_file_pre = status_file + ".pre"
@@ -404,7 +374,15 @@ def mode_pre(session_dir, args):
"Start time: %s, End time: %s"
% (args.session, args.volume, start, endtime_to_update))
- run_in_nodes(args.volume, start, args)
+ run_cmd_nodes("pre", args, start=start)
+
+ # Merger
+ cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
+ execute(cmd,
+ exit_msg="Failed to merge output files "
+ "collected from nodes", logger=logger)
+
+ run_cmd_nodes("cleanup", args)
with open(status_file_pre, "w", buffering=0) as f:
f.write(str(endtime_to_update))
@@ -423,6 +401,7 @@ def mode_post(session_dir, args):
status_file_pre = status_file + ".pre"
if os.path.exists(status_file_pre):
+ run_cmd_nodes("post", args)
os.rename(status_file_pre, status_file)
sys.exit(0)
else:
@@ -430,12 +409,7 @@ def mode_post(session_dir, args):
def mode_delete(session_dir, args):
- def handle_rm_error(func, path, exc_info):
- if exc_info[1].errno == ENOENT:
- return
-
- raise exc_info[1]
-
+ run_cmd_nodes("delete", args)
shutil.rmtree(os.path.join(session_dir, args.volume),
onerror=handle_rm_error)
diff --git a/tools/glusterfind/src/nodeagent.py b/tools/glusterfind/src/nodeagent.py
new file mode 100644
index 00000000000..2e8c2fc9759
--- /dev/null
+++ b/tools/glusterfind/src/nodeagent.py
@@ -0,0 +1,123 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import shutil
+import sys
+import os
+import logging
+from argparse import ArgumentParser, RawDescriptionHelpFormatter
+import urllib
+
+from utils import setup_logger, mkdirp, handle_rm_error
+import conf
+
+logger = logging.getLogger()
+
+
+def mode_cleanup(args):
+ working_dir = os.path.join(conf.get_opt("working_dir"),
+ args.session,
+ args.volume)
+
+ mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
+ exit_on_err=True)
+ log_file = os.path.join(conf.get_opt("log_dir"),
+ args.session,
+ args.volume,
+ "changelog.log")
+
+ setup_logger(logger, log_file)
+
+ try:
+ shutil.rmtree(working_dir, onerror=handle_rm_error)
+ except (OSError, IOError) as e:
+ logger.error("Failed to delete working directory: %s" % e)
+ sys.exit(1)
+
+
+def mode_create(args):
+ session_dir = os.path.join(conf.get_opt("session_dir"),
+ args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+
+ mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+ logger=logger)
+
+ if not os.path.exists(status_file) or args.reset_session_time:
+ with open(status_file, "w", buffering=0) as f:
+ f.write(args.time_to_update)
+
+ sys.exit(0)
+
+
+def mode_post(args):
+ session_dir = os.path.join(conf.get_opt("session_dir"), args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+
+ mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+ logger=logger)
+ status_file_pre = status_file + ".pre"
+
+ if os.path.exists(status_file_pre):
+ os.rename(status_file_pre, status_file)
+ sys.exit(0)
+
+
+def mode_delete(args):
+ session_dir = os.path.join(conf.get_opt("session_dir"),
+ args.session)
+ shutil.rmtree(os.path.join(session_dir, args.volume),
+ onerror=handle_rm_error)
+
+
+def _get_args():
+ parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
+ description="Node Agent")
+ subparsers = parser.add_subparsers(dest="mode")
+
+ parser_cleanup = subparsers.add_parser('cleanup')
+ parser_cleanup.add_argument("session", help="Session Name")
+ parser_cleanup.add_argument("volume", help="Volume Name")
+ parser_cleanup.add_argument("--debug", help="Debug", action="store_true")
+
+ parser_session_create = subparsers.add_parser('create')
+ parser_session_create.add_argument("session", help="Session Name")
+ parser_session_create.add_argument("volume", help="Volume Name")
+ parser_session_create.add_argument("brick", help="Brick Path")
+ parser_session_create.add_argument("time_to_update", help="Time to Update")
+ parser_session_create.add_argument("--reset-session-time",
+ help="Reset Session Time",
+ action="store_true")
+ parser_session_create.add_argument("--debug", help="Debug",
+ action="store_true")
+
+ parser_post = subparsers.add_parser('post')
+ parser_post.add_argument("session", help="Session Name")
+ parser_post.add_argument("volume", help="Volume Name")
+ parser_post.add_argument("brick", help="Brick Path")
+ parser_post.add_argument("--debug", help="Debug",
+ action="store_true")
+
+ parser_delete = subparsers.add_parser('delete')
+ parser_delete.add_argument("session", help="Session Name")
+ parser_delete.add_argument("volume", help="Volume Name")
+ parser_delete.add_argument("--debug", help="Debug",
+ action="store_true")
+ return parser.parse_args()
+
+
+if __name__ == "__main__":
+ args = _get_args()
+
+ # globals() will have all the functions already defined.
+ # mode_<args.mode> will be the function name to be called
+ globals()["mode_" + args.mode](args)
diff --git a/tools/glusterfind/src/nodecleanup.py b/tools/glusterfind/src/nodecleanup.py
deleted file mode 100644
index a31d4d83acd..00000000000
--- a/tools/glusterfind/src/nodecleanup.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
-# This file is part of GlusterFS.
-#
-# This file is licensed to you under your choice of the GNU Lesser
-# General Public License, version 3 or any later version (LGPLv3 or
-# later), or the GNU General Public License, version 2 (GPLv2), in all
-# cases as published by the Free Software Foundation.
-
-import shutil
-import sys
-import os
-import logging
-from errno import ENOENT
-
-from utils import setup_logger, mkdirp
-import conf
-
-logger = logging.getLogger()
-
-
-if __name__ == "__main__":
- # Args: <SESSION> <VOLUME>
- session = sys.argv[1]
- volume = sys.argv[2]
-
- working_dir = os.path.join(conf.get_opt("working_dir"),
- session,
- volume)
-
- mkdirp(os.path.join(conf.get_opt("log_dir"), session, volume),
- exit_on_err=True)
- log_file = os.path.join(conf.get_opt("log_dir"),
- session,
- volume,
- "changelog.log")
-
- setup_logger(logger, log_file)
-
- try:
- def handle_rm_error(func, path, exc_info):
- if exc_info[1].errno == ENOENT:
- return
-
- raise exc_info[1]
-
- shutil.rmtree(working_dir, onerror=handle_rm_error)
- except (OSError, IOError) as e:
- logger.error("Failed to delete working directory: %s" % e)
- sys.exit(1)
diff --git a/tools/glusterfind/src/tool.conf.in b/tools/glusterfind/src/tool.conf.in
index 54230cb4dca..a80f4a784c0 100644
--- a/tools/glusterfind/src/tool.conf.in
+++ b/tools/glusterfind/src/tool.conf.in
@@ -2,8 +2,7 @@
session_dir=@GLUSTERD_WORKDIR@/glusterfind/
working_dir=@GLUSTERFSD_MISCDIR@/glusterfind/
log_dir=/var/log/glusterfs/glusterfind/
-nodecleanup=@GLUSTERFS_LIBEXECDIR@/glusterfind/nodecleanup.py
-changelog_rollover_time=15
+nodeagent=@GLUSTERFS_LIBEXECDIR@/glusterfind/nodeagent.py
brick_ignore_dirs=.glusterfs,.trashcan
[change_detectors]
diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py
index de9c027e299..aea9a9dc82d 100644
--- a/tools/glusterfind/src/utils.py
+++ b/tools/glusterfind/src/utils.py
@@ -9,14 +9,36 @@
# cases as published by the Free Software Foundation.
import sys
-import socket
from subprocess import PIPE, Popen
-from errno import EPERM, EEXIST
+from errno import EEXIST, ENOENT
+import xml.etree.cElementTree as etree
import logging
import os
from datetime import datetime
+import urllib
ROOT_GFID = "00000000-0000-0000-0000-000000000001"
+DEFAULT_CHANGELOG_INTERVAL = 15
+
+ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
+cache_data = {}
+
+
+def cache_output(func):
+ def wrapper(*args, **kwargs):
+ global cache_data
+ if cache_data.get(func.func_name, None) is None:
+ cache_data[func.func_name] = func(*args, **kwargs)
+
+ return cache_data[func.func_name]
+ return wrapper
+
+
+def handle_rm_error(func, path, exc_info):
+ if exc_info[1].errno == ENOENT:
+ return
+
+ raise exc_info[1]
def find(path, callback_func=lambda x: True, filter_func=lambda x: True,
@@ -41,12 +63,16 @@ def find(path, callback_func=lambda x: True, filter_func=lambda x: True,
callback_func(full_path)
-def output_write(f, path, prefix="."):
+def output_write(f, path, prefix=".", encode=False):
if path == "":
return
if prefix != ".":
path = os.path.join(prefix, path)
+
+ if encode:
+ path = urllib.quote_plus(path)
+
f.write("%s\n" % path)
@@ -153,51 +179,41 @@ def symlink_gfid_to_path(brick, gfid):
return out_path
-def is_host_local(host):
- """
- Find if a host is local or not.
- Code copied from $GLUSTERFS/geo-replication/syncdaemon/syncdutils.py
- """
- locaddr = False
- for ai in socket.getaddrinfo(host, None):
- # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators
- # /mgmt/glusterd/src/glusterd-utils.c#L125
- if ai[0] == socket.AF_INET:
- if ai[-1][0].split(".")[0] == "127":
- locaddr = True
- break
- elif ai[0] == socket.AF_INET6:
- if ai[-1][0] == "::1":
- locaddr = True
- break
- else:
- continue
- try:
- # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue,
- # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587
- s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP)
- except socket.error:
- ex = sys.exc_info()[1]
- if ex.errno != EPERM:
- raise
- f = None
- try:
- f = open("/proc/sys/net/ipv4/ip_nonlocal_bind")
- if int(f.read()) != 0:
- logger.warning("non-local bind is set and not "
- "allowed to create "
- "raw sockets, cannot determine "
- "if %s is local" % host)
- return False
- s = socket.socket(ai[0], socket.SOCK_DGRAM)
- finally:
- if f:
- f.close()
- try:
- s.bind(ai[-1])
- locaddr = True
- break
- except:
- pass
- s.close()
- return locaddr
+@cache_output
+def get_my_uuid():
+ cmd = ["gluster", "system::", "uuid", "get", "--xml"]
+ rc, out, err = execute(cmd)
+
+ if rc != 0:
+ return None
+
+ tree = etree.fromstring(out)
+ uuid_el = tree.find("uuidGenerate/uuid")
+ return uuid_el.text
+
+
+def is_host_local(host_uuid):
+ # Get UUID only if it is not done previously
+ # else Cache the UUID value
+ my_uuid = get_my_uuid()
+ if my_uuid == host_uuid:
+ return True
+
+ return False
+
+
+def get_changelog_rollover_time(volumename):
+ cmd = ["gluster", "volume", "get", volumename,
+ "changelog.rollover-time", "--xml"]
+ rc, out, err = execute(cmd)
+
+ if rc != 0:
+ return DEFAULT_CHANGELOG_INTERVAL
+
+ try:
+ tree = etree.fromstring(out)
+ return int(tree.find('volGetopts/Value').text)
+ except ParseError:
+ return DEFAULT_CHANGELOG_INTERVAL
+
+