diff options
| -rw-r--r-- | tools/glusterfind/src/changelog.py | 2 | ||||
| -rw-r--r-- | tools/glusterfind/src/main.py | 68 | ||||
| -rw-r--r-- | tools/glusterfind/src/nodeagent.py | 4 | ||||
| -rw-r--r-- | tools/glusterfind/src/utils.py | 2 | 
4 files changed, 60 insertions, 16 deletions
| diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py index 283a035fe0e..721b8d0ca3a 100644 --- a/tools/glusterfind/src/changelog.py +++ b/tools/glusterfind/src/changelog.py @@ -284,7 +284,7 @@ def get_changes(brick, hash_dir, log_file, start, end, args):          # history_getchanges()          changes = []          while libgfchangelog.cl_history_scan() > 0: -            changes += libgfchangelog.cl_history_getchanges() +            changes = libgfchangelog.cl_history_getchanges()              for change in changes:                  # Ignore if last processed changelog comes diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py index 37d6c38cc49..0c993f50db3 100644 --- a/tools/glusterfind/src/main.py +++ b/tools/glusterfind/src/main.py @@ -18,6 +18,9 @@ import xml.etree.cElementTree as etree  from argparse import ArgumentParser, RawDescriptionHelpFormatter, Action  import logging  import shutil +import tempfile +import signal +from datetime import datetime  from utils import execute, is_host_local, mkdirp, fail  from utils import setup_logger, human_time, handle_rm_error @@ -34,6 +37,7 @@ ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError  logger = logging.getLogger()  node_outfiles = []  vol_statusStr = "" +gtmpfilename = None  class StoreAbsPath(Action): @@ -71,6 +75,8 @@ def node_cmd(host, host_uuid, task, cmd, args, opts):              cmd = ["ssh",                     "-oNumberOfPasswordPrompts=0",                     "-oStrictHostKeyChecking=no", +                   "-t", +                   "-t",                     "-i", pem_key_path,                     "root@%s" % host] + cmd @@ -98,8 +104,13 @@ def run_cmd_nodes(task, args, **kwargs):          host_uuid = node[0]          cmd = []          opts = {} + +        # tmpfilename is valid only for tasks: pre, query and cleanup +        tmpfilename = kwargs.get("tmpfilename", "BADNAME") +          node_outfile = os.path.join(conf.get_opt("working_dir"),                                      args.session, args.volume, +                                    tmpfilename,                                      "tmp_output_%s" % num)          if task == "pre": @@ -117,6 +128,9 @@ def run_cmd_nodes(task, args, **kwargs):                      tag = '""' if not is_host_local(host_uuid) else ""              node_outfiles.append(node_outfile) +            # remote file will be copied into this directory +            mkdirp(os.path.dirname(node_outfile), +                   exit_on_err=True, logger=logger)              cmd = [change_detector,                     args.session, @@ -144,6 +158,9 @@ def run_cmd_nodes(task, args, **kwargs):                      tag = '""' if not is_host_local(host_uuid) else ""              node_outfiles.append(node_outfile) +            # remote file will be copied into this directory +            mkdirp(os.path.dirname(node_outfile), +                   exit_on_err=True, logger=logger)              cmd = [change_detector,                     args.session, @@ -162,8 +179,9 @@ def run_cmd_nodes(task, args, **kwargs):              opts["node_outfile"] = node_outfile              opts["copy_outfile"] = True          elif task == "cleanup": -            # After pre run, cleanup the working directory and other temp files -            # Remove the copied node_outfile in main node +            # After pre/query run, cleanup the working directory and other +            # temp files. Remove the directory to which node_outfile has +            # been copied in main node              try:                  os.remove(node_outfile)              except (OSError, IOError): @@ -174,7 +192,9 @@ def run_cmd_nodes(task, args, **kwargs):              cmd = [conf.get_opt("nodeagent"),                     "cleanup",                     args.session, -                   args.volume] + (["--debug"] if args.debug else []) +                   args.volume, +                   os.path.dirname(node_outfile)] + \ +                (["--debug"] if args.debug else [])          elif task == "create":              if vol_statusStr != "Started":                  fail("Volume %s is not online" % args.volume, @@ -422,8 +442,8 @@ def enable_volume_options(args):                  % args.volume) -def write_output(args, outfilemerger): -    with codecs.open(args.outfile, "a", encoding="utf-8") as f: +def write_output(outfile, outfilemerger): +    with codecs.open(outfile, "a", encoding="utf-8") as f:          for row in outfilemerger.get():              # Multiple paths in case of Hardlinks              paths = row[1].split(",") @@ -438,9 +458,10 @@ def write_output(args, outfilemerger):                  if p_rep == row_2_rep:                      continue -                f.write(u"{0} {1} {2}\n".format(row[0], -                                                p_rep, -                                                row_2_rep)) +                if row_2_rep and row_2_rep != "": +                    f.write(u"{0} {1} {2}\n".format(row[0], p_rep, row_2_rep)) +                else: +                    f.write(u"{0} {1}\n".format(row[0], p_rep))  def mode_create(session_dir, args): @@ -490,6 +511,8 @@ def mode_create(session_dir, args):  def mode_query(session_dir, args): +    global gtmpfilename +      # Verify volume status      cmd = ["gluster", 'volume', 'info', args.volume, "--xml"]      _, data, _ = execute(cmd, @@ -533,7 +556,10 @@ def mode_query(session_dir, args):                   "Start time: %s"                   % ("default", args.volume, start)) -    run_cmd_nodes("query", args, start=start) +    prefix = datetime.now().strftime("%Y%m%d-%H%M%S-%f-") +    gtmpfilename = prefix + next(tempfile._get_candidate_names()) + +    run_cmd_nodes("query", args, start=start, tmpfilename=gtmpfilename)      # Merger      if args.full: @@ -545,7 +571,7 @@ def mode_query(session_dir, args):          # Read each Changelogs db and generate finaldb          create_file(args.outfile, exit_on_err=True, logger=logger)          outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles) -        write_output(args, outfilemerger) +        write_output(args.outfile, outfilemerger)      try:          os.remove(args.outfile + ".db") @@ -558,6 +584,8 @@ def mode_query(session_dir, args):  def mode_pre(session_dir, args): +    global gtmpfilename +      """      Read from Session file and write to session.pre file      """ @@ -587,7 +615,10 @@ def mode_pre(session_dir, args):                   "Start time: %s, End time: %s"                   % (args.session, args.volume, start, endtime_to_update)) -    run_cmd_nodes("pre", args, start=start) +    prefix = datetime.now().strftime("%Y%m%d-%H%M%S-%f-") +    gtmpfilename = prefix + next(tempfile._get_candidate_names()) + +    run_cmd_nodes("pre", args, start=start, tmpfilename=gtmpfilename)      # Merger      if args.full: @@ -599,8 +630,7 @@ def mode_pre(session_dir, args):          # Read each Changelogs db and generate finaldb          create_file(args.outfile, exit_on_err=True, logger=logger)          outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles) - -        write_output(args, outfilemerger) +        write_output(args.outfile, outfilemerger)      try:          os.remove(args.outfile + ".db") @@ -713,6 +743,10 @@ def mode_list(session_dir, args):  def main(): +    global gtmpfilename + +    args = None +      try:          args = _get_args()          mkdirp(conf.get_opt("session_dir"), exit_on_err=True) @@ -756,5 +790,13 @@ def main():          # mode_<args.mode> will be the function name to be called          globals()["mode_" + args.mode](session_dir, args)      except KeyboardInterrupt: +        if args is not None: +            if args.mode == "pre" or args.mode == "query": +                # cleanup session +                if gtmpfilename is not None: +                    # no more interrupts until we clean up +                    signal.signal(signal.SIGINT, signal.SIG_IGN) +                    run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename) +          # Interrupted, exit with non zero error code          sys.exit(2) diff --git a/tools/glusterfind/src/nodeagent.py b/tools/glusterfind/src/nodeagent.py index f70744927eb..07d82826e0d 100644 --- a/tools/glusterfind/src/nodeagent.py +++ b/tools/glusterfind/src/nodeagent.py @@ -26,7 +26,8 @@ logger = logging.getLogger()  def mode_cleanup(args):      working_dir = os.path.join(conf.get_opt("working_dir"),                                 args.session, -                               args.volume) +                               args.volume, +                               args.tmpfilename)      mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),             exit_on_err=True) @@ -98,6 +99,7 @@ def _get_args():      parser_cleanup = subparsers.add_parser('cleanup')      parser_cleanup.add_argument("session", help="Session Name")      parser_cleanup.add_argument("volume", help="Volume Name") +    parser_cleanup.add_argument("tmpfilename", help="Temporary File Name")      parser_cleanup.add_argument("--debug", help="Debug", action="store_true")      parser_session_create = subparsers.add_parser('create') diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py index 598cc9e7f46..70737be760a 100644 --- a/tools/glusterfind/src/utils.py +++ b/tools/glusterfind/src/utils.py @@ -227,7 +227,7 @@ def get_changelog_rollover_time(volumename):      try:          tree = etree.fromstring(out) -        return int(tree.find('volGetopts/Value').text) +        return int(tree.find('volGetopts/Opt/Value').text)      except ParseError:          return DEFAULT_CHANGELOG_INTERVAL | 
