summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2015-04-20 14:03:58 +0530
committerVijay Bellur <vbellur@redhat.com>2015-05-08 21:19:06 -0700
commite561935d0153f00f2ddacde093c12284affe9538 (patch)
tree27c031039b27d5bf31c3e40d6366cd4b2f5d1b07
parentbbff9e1ef72e2eab63e5d7ecd5dfa36497b642ed (diff)
tools/glusterfind: Partial Find
Partial find is enabled by default: if one node fails, glusterfind will still return the list of files from the other nodes. This behavior can be changed using --disable-partial. The session is now maintained on each node as well as on the initiator node. Every pre command will pick up the status file from the respective node and start collecting the list of changes that happened after the status time. --reset-session-time is a new option to force a reset of the session time; the next incremental run will start from this time. The change-detector argument is removed, since Changelog mode is required to detect deletes and renames. Change-Id: I0b83bc7c0e1b30b13de772b2d21fe968db4ff964 Signed-off-by: Aravinda VK <avishwan@redhat.com> BUG: 1201289 Reviewed-on: http://review.gluster.org/10320 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Kotresh HR <khiremat@redhat.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
-rw-r--r--tools/glusterfind/src/Makefile.am4
-rw-r--r--tools/glusterfind/src/brickfind.py39
-rw-r--r--tools/glusterfind/src/changelog.py40
-rw-r--r--tools/glusterfind/src/main.py278
-rw-r--r--tools/glusterfind/src/nodeagent.py123
-rw-r--r--tools/glusterfind/src/nodecleanup.py51
-rw-r--r--tools/glusterfind/src/tool.conf.in3
-rw-r--r--tools/glusterfind/src/utils.py118
8 files changed, 367 insertions, 289 deletions
diff --git a/tools/glusterfind/src/Makefile.am b/tools/glusterfind/src/Makefile.am
index 458b820fd19..7b819828d97 100644
--- a/tools/glusterfind/src/Makefile.am
+++ b/tools/glusterfind/src/Makefile.am
@@ -3,12 +3,12 @@ glusterfinddir = $(libexecdir)/glusterfs/glusterfind
glusterfind_PYTHON = conf.py utils.py __init__.py \
main.py libgfchangelog.py
-glusterfind_SCRIPTS = changelog.py nodecleanup.py \
+glusterfind_SCRIPTS = changelog.py nodeagent.py \
brickfind.py
glusterfind_DATA = tool.conf
-EXTRA_DIST = changelog.py nodecleanup.py brickfind.py \
+EXTRA_DIST = changelog.py nodeagent.py brickfind.py \
tool.conf
CLEANFILES =
diff --git a/tools/glusterfind/src/brickfind.py b/tools/glusterfind/src/brickfind.py
index 1090f408e28..9758bef56ff 100644
--- a/tools/glusterfind/src/brickfind.py
+++ b/tools/glusterfind/src/brickfind.py
@@ -12,7 +12,8 @@ import os
import sys
import logging
from argparse import ArgumentParser, RawDescriptionHelpFormatter
-from errno import ENOENT
+import urllib
+import time
from utils import mkdirp, setup_logger, create_file, output_write, find
import conf
@@ -36,36 +37,17 @@ def brickfind_crawl(brick, args):
with open(args.outfile, "a+") as fout:
brick_path_len = len(brick)
- def mtime_filter(path):
- try:
- st = os.lstat(path)
- except (OSError, IOError) as e:
- if e.errno == ENOENT:
- st = None
- else:
- raise
-
- if st and (st.st_mtime > args.start or st.st_ctime > args.start):
- return True
-
- return False
-
def output_callback(path):
path = path.strip()
path = path[brick_path_len+1:]
- output_write(fout, path, args.output_prefix)
+ output_write(fout, path, args.output_prefix, encode=True)
ignore_dirs = [os.path.join(brick, dirname)
for dirname in
conf.get_opt("brick_ignore_dirs").split(",")]
- if args.full:
- find(brick, callback_func=output_callback,
- ignore_dirs=ignore_dirs)
- else:
- find(brick, callback_func=output_callback,
- filter_func=mtime_filter,
- ignore_dirs=ignore_dirs)
+ find(brick, callback_func=output_callback,
+ ignore_dirs=ignore_dirs)
fout.flush()
os.fsync(fout.fileno())
@@ -81,7 +63,6 @@ def _get_args():
parser.add_argument("outfile", help="Output File")
parser.add_argument("start", help="Start Time", type=float)
parser.add_argument("--debug", help="Debug", action="store_true")
- parser.add_argument("--full", help="Full Find", action="store_true")
parser.add_argument("--output-prefix", help="File prefix in output",
default=".")
@@ -90,6 +71,12 @@ def _get_args():
if __name__ == "__main__":
args = _get_args()
+ session_dir = os.path.join(conf.get_opt("session_dir"), args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+ status_file_pre = status_file + ".pre"
+ mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+ logger=logger)
mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
exit_on_err=True)
log_file = os.path.join(conf.get_opt("log_dir"),
@@ -97,5 +84,9 @@ if __name__ == "__main__":
args.volume,
"brickfind.log")
setup_logger(logger, log_file, args.debug)
+
+ time_to_update = int(time.time())
brickfind_crawl(args.brick, args)
+ with open(status_file_pre, "w", buffering=0) as f:
+ f.write(str(time_to_update))
sys.exit(0)
diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py
index eb73635fb32..2c4ee9106e1 100644
--- a/tools/glusterfind/src/changelog.py
+++ b/tools/glusterfind/src/changelog.py
@@ -16,10 +16,12 @@ from errno import ENOENT
import logging
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import hashlib
+import urllib
import libgfchangelog
from utils import create_file, mkdirp, execute, symlink_gfid_to_path
from utils import fail, setup_logger, output_write, find
+from utils import get_changelog_rollover_time
import conf
@@ -190,7 +192,7 @@ def sort_unique(filename):
exit_msg="Sort failed", logger=logger)
-def get_changes(brick, hash_dir, log_file, end, args):
+def get_changes(brick, hash_dir, log_file, start, end, args):
"""
Makes use of libgfchangelog's history API to get changelogs
containing changes from start and end time. Further collects
@@ -216,7 +218,7 @@ def get_changes(brick, hash_dir, log_file, end, args):
# Fail if History fails for requested Start and End
try:
actual_end = libgfchangelog.cl_history_changelog(
- cl_path, args.start, end, CHANGELOGAPI_NUM_WORKERS)
+ cl_path, start, end, CHANGELOGAPI_NUM_WORKERS)
except libgfchangelog.ChangelogException as e:
fail("%s Historical Changelogs not available: %s" % (brick, e),
logger=logger)
@@ -235,6 +237,11 @@ def get_changes(brick, hash_dir, log_file, end, args):
if changes:
with open(gfid_list_path, 'a+') as fgfid:
for change in changes:
+ # Ignore if last processed changelog comes
+ # again in list
+ if change.endswith(".%s" % start):
+ continue
+
with open(change) as f:
for line in f:
# Space delimited list, collect GFID
@@ -259,8 +266,10 @@ def get_changes(brick, hash_dir, log_file, end, args):
args.outfile, gfid_list_failures_file)
gfid_to_path_using_batchfind(brick, gfid_list_failures_file, args.outfile)
+ return actual_end
-def changelog_crawl(brick, end, args):
+
+def changelog_crawl(brick, start, end, args):
"""
Init function, prepares working dir and calls Changelog query
"""
@@ -283,8 +292,8 @@ def changelog_crawl(brick, end, args):
"changelog.%s.log" % brickhash)
logger.info("%s Started Changelog Crawl. Start: %s, End: %s"
- % (brick, args.start, end))
- get_changes(brick, working_dir, log_file, end, args)
+ % (brick, start, end))
+ return get_changes(brick, working_dir, log_file, start, end, args)
def _get_args():
@@ -312,6 +321,23 @@ if __name__ == "__main__":
args.volume,
"changelog.log")
setup_logger(logger, log_file, args.debug)
- end = int(time.time()) - int(conf.get_opt("changelog_rollover_time"))
- changelog_crawl(args.brick, end, args)
+
+ session_dir = os.path.join(conf.get_opt("session_dir"), args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+ status_file_pre = status_file + ".pre"
+ mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+ logger=logger)
+
+ try:
+ with open(status_file) as f:
+ start = int(f.read().strip())
+ except (ValueError, OSError, IOError):
+ start = args.start
+
+ end = int(time.time()) - get_changelog_rollover_time(args.volume)
+ actual_end = changelog_crawl(args.brick, start, end, args)
+ with open(status_file_pre, "w", buffering=0) as f:
+ f.write(str(actual_end))
+
sys.exit(0)
diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py
index ceaf1173897..089a3aec3c5 100644
--- a/tools/glusterfind/src/main.py
+++ b/tools/glusterfind/src/main.py
@@ -19,7 +19,8 @@ import logging
import shutil
from utils import execute, is_host_local, mkdirp, fail
-from utils import setup_logger, human_time
+from utils import setup_logger, human_time, handle_rm_error
+from utils import get_changelog_rollover_time, cache_output
import conf
@@ -29,6 +30,7 @@ GlusterFS Incremental API
ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
logger = logging.getLogger()
+node_outfiles = []
class StoreAbsPath(Action):
@@ -46,33 +48,13 @@ def get_pem_key_path(session, volume):
"%s_%s_secret.pem" % (session, volume))
-def node_run(volume, host, path, start, outfile, args, fallback=False):
+def node_cmd(host, host_uuid, task, cmd, args, opts):
"""
- If host is local node, execute the command locally. If not local
- execute the CHANGE_DETECTOR command via ssh and copy the output file from
- remote node using scp.
+ Runs command via ssh if host is not local
"""
- localdir = is_host_local(host)
- pem_key_path = get_pem_key_path(args.session, args.volume)
+ localdir = is_host_local(host_uuid)
- # If Full backup is requested or start time is zero, use brickfind
- change_detector = conf.get_change_detector(args.change_detector)
- if ((start == 0 or args.full) and args.change_detector == "changelog") or \
- fallback:
- change_detector = conf.get_change_detector("brickfind")
-
- # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug
- # --gfidpath <TYPE>
- cmd = [change_detector,
- args.session,
- volume,
- path,
- outfile,
- str(start),
- "--output-prefix",
- args.output_prefix] + \
- (["--debug"] if args.debug else []) + \
- (["--full"] if args.full else [])
+ pem_key_path = get_pem_key_path(args.session, args.volume)
if not localdir:
# prefix with ssh command if not local node
@@ -80,128 +62,112 @@ def node_run(volume, host, path, start, outfile, args, fallback=False):
"-i", pem_key_path,
"root@%s" % host] + cmd
- rc, out, err = execute(cmd, logger=logger)
- if rc == 2:
- # Partial History Fallback
- logger.info("%s %s Fallback to brickfind" % (host, err.strip()))
- # Exit only from process, handled in main.
- sys.exit(rc)
- elif rc != 0:
- fail("%s - Change detection failed" % host, logger=logger)
+ execute(cmd, exit_msg="%s - %s failed" % (host, task), logger=logger)
- if not localdir:
+ if opts.get("copy_outfile", False):
cmd_copy = ["scp",
"-i", pem_key_path,
- "root@%s:/%s" % (host, outfile),
- os.path.dirname(outfile)]
+ "root@%s:/%s" % (host, opts.get("node_outfile")),
+ os.path.dirname(opts.get("node_outfile"))]
execute(cmd_copy, exit_msg="%s - Copy command failed" % host,
logger=logger)
-def node_cleanup(host, args):
- localdir = is_host_local(host)
-
- pem_key_path = get_pem_key_path(args.session, args.volume)
-
- # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug
- # --gfidpath <TYPE>
- cmd = [conf.get_opt("nodecleanup"),
- args.session,
- args.volume] + (["--debug"] if args.debug else [])
-
- if not localdir:
- # prefix with ssh command if not local node
- cmd = ["ssh",
- "-i", pem_key_path,
- "root@%s" % host] + cmd
-
- execute(cmd, exit_msg="%s - Cleanup failed" % host, logger=logger)
-
-
-def cleanup(nodes, args):
+def run_cmd_nodes(task, args, **kwargs):
+ global node_outfiles
+ nodes = get_nodes(args.volume)
pool = []
for num, node in enumerate(nodes):
host, brick = node[1].split(":")
- # temp output file
+ host_uuid = node[0]
+ cmd = []
+ opts = {}
node_outfile = os.path.join(conf.get_opt("working_dir"),
- args.session,
- args.volume,
- "tmp_output_%s.txt" % num)
-
- try:
- os.remove(node_outfile)
- except (OSError, IOError):
- # TODO: Cleanup Failure, Handle
- pass
-
- p = Process(target=node_cleanup,
- args=(host, args))
- p.start()
- pool.append(p)
-
- exit_codes = 0
- for p in pool:
- p.join()
- exit_codes += (0 if p.exitcode == 0 else 1)
-
- if exit_codes != 0:
- sys.exit(1)
-
-
-def failback_node_run(brick_details, idx, volume, start, outfile, args):
- host, brick = brick_details.split(":")
- p = Process(target=node_run,
- args=(volume, host, brick, start, outfile, args, True))
- p.start()
- p.join()
- return p.exitcode
-
-
-def run_in_nodes(volume, start, args):
- """
- Get nodes of volume using gluster volume info, spawn a process
- each for a Node. Merge the output files once all the process
- complete their tasks.
- """
- nodes = get_nodes(volume)
- pool = []
- node_outfiles = []
- for num, node in enumerate(nodes):
- host, brick = node[1].split(":")
- # temp output file
- node_outfile = os.path.join(conf.get_opt("working_dir"),
- args.session,
- volume,
- "tmp_output_%s.txt" % num)
- node_outfiles.append(node_outfile)
- p = Process(target=node_run, args=(volume, host, brick, start,
- node_outfile, args))
- p.start()
- pool.append(p)
-
- exit_codes = 0
- for idx, p in enumerate(pool):
+ args.session, args.volume,
+ "tmp_output_%s" % num)
+
+ if task == "pre":
+ # If Full backup is requested or start time is zero, use brickfind
+ change_detector = conf.get_change_detector("changelog")
+ if args.full:
+ change_detector = conf.get_change_detector("brickfind")
+
+ node_outfiles.append(node_outfile)
+
+ cmd = [change_detector,
+ args.session,
+ args.volume,
+ brick,
+ node_outfile,
+ str(kwargs.get("start")),
+ "--output-prefix",
+ args.output_prefix] + \
+ (["--debug"] if args.debug else []) + \
+ (["--only-namespace-changes"] if args.only_namespace_changes
+ else [])
+
+ opts["node_outfile"] = node_outfile
+ opts["copy_outfile"] = True
+ elif task == "cleanup":
+ # After pre run, cleanup the working directory and other temp files
+ # Remove the copied node_outfile in main node
+ try:
+ os.remove(node_outfile)
+ except (OSError, IOError):
+ logger.warn("Failed to cleanup temporary file %s" %
+ node_outfile)
+ pass
+
+ cmd = [conf.get_opt("nodeagent"),
+ "cleanup",
+ args.session,
+ args.volume] + (["--debug"] if args.debug else [])
+ elif task == "create":
+ # When glusterfind create, create session directory in
+ # each brick nodes
+ cmd = [conf.get_opt("nodeagent"),
+ "create",
+ args.session,
+ args.volume,
+ brick,
+ kwargs.get("time_to_update")] + \
+ (["--debug"] if args.debug else []) + \
+ (["--reset-session-time"] if args.reset_session_time
+ else [])
+ elif task == "post":
+ # Rename pre status file to actual status file in each node
+ cmd = [conf.get_opt("nodeagent"),
+ "post",
+ args.session,
+ args.volume,
+ brick] + \
+ (["--debug"] if args.debug else [])
+ elif task == "delete":
+ # When glusterfind delete, cleanup all the session files/dirs
+ # from each node.
+ cmd = [conf.get_opt("nodeagent"),
+ "delete",
+ args.session,
+ args.volume] + \
+ (["--debug"] if args.debug else [])
+
+ if cmd:
+ p = Process(target=node_cmd,
+ args=(host, host_uuid, task, cmd, args, opts))
+ p.start()
+ pool.append(p)
+
+ for num, p in enumerate(pool):
p.join()
- # Handle the Changelog failure, fallback to Brickfind
- if p.exitcode == 2:
- rc = failback_node_run(nodes[idx][1], idx, volume, start,
- node_outfiles[idx], args)
- exit_codes += (0 if rc == 0 else 1)
- elif p.exitcode != 0:
- exit_codes += (0 if p.exitcode == 0 else 1)
-
- if exit_codes != 0:
- sys.exit(1)
-
- # Merge all output files
- cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
- execute(cmd,
- exit_msg="Failed to merge output files "
- "collected from nodes", logger=logger)
-
- cleanup(nodes, args)
+ if p.exitcode != 0:
+ logger.warn("Command %s failed in %s" % (task, nodes[num][1]))
+ if task in ["create", "delete"]:
+ fail("Command %s failed in %s" % (task, nodes[num][1]))
+ elif task == "pre" and args.disable_partial:
+ sys.exit(1)
+@cache_output
def get_nodes(volume):
"""
Get the gluster volume info xml output and parse to get
@@ -237,6 +203,9 @@ def _get_args():
parser_create.add_argument("--debug", help="Debug", action="store_true")
parser_create.add_argument("--force", help="Force option to recreate "
"the session", action="store_true")
+ parser_create.add_argument("--reset-session-time",
+ help="Reset Session Time to Current Time",
+ action="store_true")
# delete <SESSION> <VOLUME> [--debug]
parser_delete = subparsers.add_parser('delete')
@@ -250,7 +219,7 @@ def _get_args():
parser_list.add_argument("--volume", help="Volume Name", default="")
parser_list.add_argument("--debug", help="Debug", action="store_true")
- # pre <SESSION> <VOLUME> <OUTFILE> [--change-detector <CHANGE_DETECTOR>]
+ # pre <SESSION> <VOLUME> <OUTFILE>
# [--output-prefix <OUTPUT_PREFIX>] [--full]
parser_pre = subparsers.add_parser('pre')
parser_pre.add_argument("session", help="Session Name")
@@ -258,10 +227,8 @@ def _get_args():
parser_pre.add_argument("outfile", help="Output File", action=StoreAbsPath)
parser_pre.add_argument("--debug", help="Debug", action="store_true")
parser_pre.add_argument("--full", help="Full find", action="store_true")
- parser_pre.add_argument("--change-detector", dest="change_detector",
- help="Change detection",
- choices=conf.list_change_detectors(),
- type=str, default='changelog')
+ parser_pre.add_argument("--disable-partial", help="Disable Partial find, "
+ "Fail when one node fails", action="store_true")
parser_pre.add_argument("--output-prefix", help="File prefix in output",
default=".")
parser_pre.add_argument("--regenerate-outfile",
@@ -363,12 +330,15 @@ def mode_create(session_dir, args):
logger.info("Volume option set %s, changelog.changelog on"
% args.volume)
- if not os.path.exists(status_file):
+ # Add Rollover time to current time to make sure changelogs
+ # will be available if we use this time as start time
+ time_to_update = int(time.time()) + get_changelog_rollover_time(
+ args.volume)
+
+ run_cmd_nodes("create", args, time_to_update=str(time_to_update))
+
+ if not os.path.exists(status_file) or args.reset_session_time:
with open(status_file, "w", buffering=0) as f:
- # Add Rollover time to current time to make sure changelogs
- # will be available if we use this time as start time
- time_to_update = int(time.time()) + int(
- conf.get_opt("changelog_rollover_time"))
f.write(str(time_to_update))
sys.exit(0)
@@ -378,8 +348,8 @@ def mode_pre(session_dir, args):
"""
Read from Session file and write to session.pre file
"""
- endtime_to_update = int(time.time()) - int(
- conf.get_opt("changelog_rollover_time"))
+ endtime_to_update = int(time.time()) - get_changelog_rollover_time(
+ args.volume)
status_file = os.path.join(session_dir, args.volume, "status")
status_file_pre = status_file + ".pre"
@@ -404,7 +374,15 @@ def mode_pre(session_dir, args):
"Start time: %s, End time: %s"
% (args.session, args.volume, start, endtime_to_update))
- run_in_nodes(args.volume, start, args)
+ run_cmd_nodes("pre", args, start=start)
+
+ # Merger
+ cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
+ execute(cmd,
+ exit_msg="Failed to merge output files "
+ "collected from nodes", logger=logger)
+
+ run_cmd_nodes("cleanup", args)
with open(status_file_pre, "w", buffering=0) as f:
f.write(str(endtime_to_update))
@@ -423,6 +401,7 @@ def mode_post(session_dir, args):
status_file_pre = status_file + ".pre"
if os.path.exists(status_file_pre):
+ run_cmd_nodes("post", args)
os.rename(status_file_pre, status_file)
sys.exit(0)
else:
@@ -430,12 +409,7 @@ def mode_post(session_dir, args):
def mode_delete(session_dir, args):
- def handle_rm_error(func, path, exc_info):
- if exc_info[1].errno == ENOENT:
- return
-
- raise exc_info[1]
-
+ run_cmd_nodes("delete", args)
shutil.rmtree(os.path.join(session_dir, args.volume),
onerror=handle_rm_error)
diff --git a/tools/glusterfind/src/nodeagent.py b/tools/glusterfind/src/nodeagent.py
new file mode 100644
index 00000000000..2e8c2fc9759
--- /dev/null
+++ b/tools/glusterfind/src/nodeagent.py
@@ -0,0 +1,123 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import shutil
+import sys
+import os
+import logging
+from argparse import ArgumentParser, RawDescriptionHelpFormatter
+import urllib
+
+from utils import setup_logger, mkdirp, handle_rm_error
+import conf
+
+logger = logging.getLogger()
+
+
+def mode_cleanup(args):
+ working_dir = os.path.join(conf.get_opt("working_dir"),
+ args.session,
+ args.volume)
+
+ mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
+ exit_on_err=True)
+ log_file = os.path.join(conf.get_opt("log_dir"),
+ args.session,
+ args.volume,
+ "changelog.log")
+
+ setup_logger(logger, log_file)
+
+ try:
+ shutil.rmtree(working_dir, onerror=handle_rm_error)
+ except (OSError, IOError) as e:
+ logger.error("Failed to delete working directory: %s" % e)
+ sys.exit(1)
+
+
+def mode_create(args):
+ session_dir = os.path.join(conf.get_opt("session_dir"),
+ args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+
+ mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+ logger=logger)
+
+ if not os.path.exists(status_file) or args.reset_session_time:
+ with open(status_file, "w", buffering=0) as f:
+ f.write(args.time_to_update)
+
+ sys.exit(0)
+
+
+def mode_post(args):
+ session_dir = os.path.join(conf.get_opt("session_dir"), args.session)
+ status_file = os.path.join(session_dir, args.volume,
+ "%s.status" % urllib.quote_plus(args.brick))
+
+ mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+ logger=logger)
+ status_file_pre = status_file + ".pre"
+
+ if os.path.exists(status_file_pre):
+ os.rename(status_file_pre, status_file)
+ sys.exit(0)
+
+
+def mode_delete(args):
+ session_dir = os.path.join(conf.get_opt("session_dir"),
+ args.session)
+ shutil.rmtree(os.path.join(session_dir, args.volume),
+ onerror=handle_rm_error)
+
+
+def _get_args():
+ parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
+ description="Node Agent")
+ subparsers = parser.add_subparsers(dest="mode")
+
+ parser_cleanup = subparsers.add_parser('cleanup')
+ parser_cleanup.add_argument("session", help="Session Name")
+ parser_cleanup.add_argument("volume", help="Volume Name")
+ parser_cleanup.add_argument("--debug", help="Debug", action="store_true")
+
+ parser_session_create = subparsers.add_parser('create')
+ parser_session_create.add_argument("session", help="Session Name")
+ parser_session_create.add_argument("volume", help="Volume Name")
+ parser_session_create.add_argument("brick", help="Brick Path")
+ parser_session_create.add_argument("time_to_update", help="Time to Update")
+ parser_session_create.add_argument("--reset-session-time",
+ help="Reset Session Time",
+ action="store_true")
+ parser_session_create.add_argument("--debug", help="Debug",
+ action="store_true")
+
+ parser_post = subparsers.add_parser('post')
+ parser_post.add_argument("session", help="Session Name")
+ parser_post.add_argument("volume", help="Volume Name")
+ parser_post.add_argument("brick", help="Brick Path")
+ parser_post.add_argument("--debug", help="Debug",
+ action="store_true")
+
+ parser_delete = subparsers.add_parser('delete')
+ parser_delete.add_argument("session", help="Session Name")
+ parser_delete.add_argument("volume", help="Volume Name")
+ parser_delete.add_argument("--debug", help="Debug",
+ action="store_true")
+ return parser.parse_args()
+
+
+if __name__ == "__main__":
+ args = _get_args()
+
+ # globals() will have all the functions already defined.
+ # mode_<args.mode> will be the function name to be called
+ globals()["mode_" + args.mode](args)
diff --git a/tools/glusterfind/src/nodecleanup.py b/tools/glusterfind/src/nodecleanup.py
deleted file mode 100644
index a31d4d83acd..00000000000
--- a/tools/glusterfind/src/nodecleanup.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
-# This file is part of GlusterFS.
-#
-# This file is licensed to you under your choice of the GNU Lesser
-# General Public License, version 3 or any later version (LGPLv3 or
-# later), or the GNU General Public License, version 2 (GPLv2), in all
-# cases as published by the Free Software Foundation.
-
-import shutil
-import sys
-import os
-import logging
-from errno import ENOENT
-
-from utils import setup_logger, mkdirp
-import conf
-
-logger = logging.getLogger()
-
-
-if __name__ == "__main__":
- # Args: <SESSION> <VOLUME>
- session = sys.argv[1]
- volume = sys.argv[2]
-
- working_dir = os.path.join(conf.get_opt("working_dir"),
- session,
- volume)
-
- mkdirp(os.path.join(conf.get_opt("log_dir"), session, volume),
- exit_on_err=True)
- log_file = os.path.join(conf.get_opt("log_dir"),
- session,
- volume,
- "changelog.log")
-
- setup_logger(logger, log_file)
-
- try:
- def handle_rm_error(func, path, exc_info):
- if exc_info[1].errno == ENOENT:
- return
-
- raise exc_info[1]
-
- shutil.rmtree(working_dir, onerror=handle_rm_error)
- except (OSError, IOError) as e:
- logger.error("Failed to delete working directory: %s" % e)
- sys.exit(1)
diff --git a/tools/glusterfind/src/tool.conf.in b/tools/glusterfind/src/tool.conf.in
index 54230cb4dca..a80f4a784c0 100644
--- a/tools/glusterfind/src/tool.conf.in
+++ b/tools/glusterfind/src/tool.conf.in
@@ -2,8 +2,7 @@
session_dir=@GLUSTERD_WORKDIR@/glusterfind/
working_dir=@GLUSTERFSD_MISCDIR@/glusterfind/
log_dir=/var/log/glusterfs/glusterfind/
-nodecleanup=@GLUSTERFS_LIBEXECDIR@/glusterfind/nodecleanup.py
-changelog_rollover_time=15
+nodeagent=@GLUSTERFS_LIBEXECDIR@/glusterfind/nodeagent.py
brick_ignore_dirs=.glusterfs,.trashcan
[change_detectors]
diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py
index de9c027e299..aea9a9dc82d 100644
--- a/tools/glusterfind/src/utils.py
+++ b/tools/glusterfind/src/utils.py
@@ -9,14 +9,36 @@
# cases as published by the Free Software Foundation.
import sys
-import socket
from subprocess import PIPE, Popen
-from errno import EPERM, EEXIST
+from errno import EEXIST, ENOENT
+import xml.etree.cElementTree as etree
import logging
import os
from datetime import datetime
+import urllib
ROOT_GFID = "00000000-0000-0000-0000-000000000001"
+DEFAULT_CHANGELOG_INTERVAL = 15
+
+ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
+cache_data = {}
+
+
+def cache_output(func):
+ def wrapper(*args, **kwargs):
+ global cache_data
+ if cache_data.get(func.func_name, None) is None:
+ cache_data[func.func_name] = func(*args, **kwargs)
+
+ return cache_data[func.func_name]
+ return wrapper
+
+
+def handle_rm_error(func, path, exc_info):
+ if exc_info[1].errno == ENOENT:
+ return
+
+ raise exc_info[1]
def find(path, callback_func=lambda x: True, filter_func=lambda x: True,
@@ -41,12 +63,16 @@ def find(path, callback_func=lambda x: True, filter_func=lambda x: True,
callback_func(full_path)
-def output_write(f, path, prefix="."):
+def output_write(f, path, prefix=".", encode=False):
if path == "":
return
if prefix != ".":
path = os.path.join(prefix, path)
+
+ if encode:
+ path = urllib.quote_plus(path)
+
f.write("%s\n" % path)
@@ -153,51 +179,41 @@ def symlink_gfid_to_path(brick, gfid):
return out_path
-def is_host_local(host):
- """
- Find if a host is local or not.
- Code copied from $GLUSTERFS/geo-replication/syncdaemon/syncdutils.py
- """
- locaddr = False
- for ai in socket.getaddrinfo(host, None):
- # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators
- # /mgmt/glusterd/src/glusterd-utils.c#L125
- if ai[0] == socket.AF_INET:
- if ai[-1][0].split(".")[0] == "127":
- locaddr = True
- break
- elif ai[0] == socket.AF_INET6:
- if ai[-1][0] == "::1":
- locaddr = True
- break
- else:
- continue
- try:
- # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue,
- # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587
- s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP)
- except socket.error:
- ex = sys.exc_info()[1]
- if ex.errno != EPERM:
- raise
- f = None
- try:
- f = open("/proc/sys/net/ipv4/ip_nonlocal_bind")
- if int(f.read()) != 0:
- logger.warning("non-local bind is set and not "
- "allowed to create "
- "raw sockets, cannot determine "
- "if %s is local" % host)
- return False
- s = socket.socket(ai[0], socket.SOCK_DGRAM)
- finally:
- if f:
- f.close()
- try:
- s.bind(ai[-1])
- locaddr = True
- break
- except:
- pass
- s.close()
- return locaddr
+@cache_output
+def get_my_uuid():
+ cmd = ["gluster", "system::", "uuid", "get", "--xml"]
+ rc, out, err = execute(cmd)
+
+ if rc != 0:
+ return None
+
+ tree = etree.fromstring(out)
+ uuid_el = tree.find("uuidGenerate/uuid")
+ return uuid_el.text
+
+
+def is_host_local(host_uuid):
+ # Get UUID only if it is not done previously
+ # else Cache the UUID value
+ my_uuid = get_my_uuid()
+ if my_uuid == host_uuid:
+ return True
+
+ return False
+
+
+def get_changelog_rollover_time(volumename):
+ cmd = ["gluster", "volume", "get", volumename,
+ "changelog.rollover-time", "--xml"]
+ rc, out, err = execute(cmd)
+
+ if rc != 0:
+ return DEFAULT_CHANGELOG_INTERVAL
+
+ try:
+ tree = etree.fromstring(out)
+ return int(tree.find('volGetopts/Value').text)
+ except ParseError:
+ return DEFAULT_CHANGELOG_INTERVAL
+
+