Diffstat (limited to 'tools')
25 files changed, 3856 insertions, 0 deletions
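Both tool sets added by this patch resolve files through the brick's `.glusterfs` backend tree, where a GFID `g` is stored at `.glusterfs/g[0:2]/g[2:4]/g` (directories appear there as symlinks, regular files as hardlinks). As a minimal sketch of that mapping, with a hypothetical brick path, mirroring the `os.path.join(".glusterfs", gfid[0:2], gfid[2:4], gfid)` convention used by gfid_to_path.py and gcrawler.c below:

```python
import os

def gfid_backend_path(brick, gfid):
    """Backend path of a GFID inside a brick: .glusterfs/aa/bb/<gfid>."""
    gfid = gfid.strip()
    return os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid)

# The root directory always has the fixed GFID used as ROOT_GFID below.
print(gfid_backend_path("/bricks/b1",
                        "00000000-0000-0000-0000-000000000001"))
# -> /bricks/b1/.glusterfs/00/00/00000000-0000-0000-0000-000000000001
```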
diff --git a/tools/Makefile.am b/tools/Makefile.am new file mode 100644 index 00000000000..5808a3728cd --- /dev/null +++ b/tools/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = gfind_missing_files glusterfind setgfid2path + +CLEANFILES = diff --git a/tools/gfind_missing_files/Makefile.am b/tools/gfind_missing_files/Makefile.am new file mode 100644 index 00000000000..181fe7091f3 --- /dev/null +++ b/tools/gfind_missing_files/Makefile.am @@ -0,0 +1,32 @@ +gfindmissingfilesdir = $(GLUSTERFS_LIBEXECDIR)/gfind_missing_files + +if WITH_SERVER +gfindmissingfiles_SCRIPTS = gfind_missing_files.sh gfid_to_path.sh \ + gfid_to_path.py +endif + +EXTRA_DIST = gfind_missing_files.sh gfid_to_path.sh \ + gfid_to_path.py + +if WITH_SERVER +gfindmissingfiles_PROGRAMS = gcrawler +endif + +gcrawler_SOURCES = gcrawler.c +gcrawler_LDADD = $(top_builddir)/libglusterfs/src/libglusterfs.la +gcrawler_LDFLAGS = $(GF_LDFLAGS) + +AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src + +AM_CFLAGS = -Wall $(GF_CFLAGS) + +if WITH_SERVER +uninstall-local: + rm -f $(DESTDIR)$(sbindir)/gfind_missing_files + +install-data-local: + rm -f $(DESTDIR)$(sbindir)/gfind_missing_files + ln -s $(GLUSTERFS_LIBEXECDIR)/gfind_missing_files/gfind_missing_files.sh $(DESTDIR)$(sbindir)/gfind_missing_files +endif + +CLEANFILES = diff --git a/tools/gfind_missing_files/gcrawler.c b/tools/gfind_missing_files/gcrawler.c new file mode 100644 index 00000000000..4acbe92bc8f --- /dev/null +++ b/tools/gfind_missing_files/gcrawler.c @@ -0,0 +1,581 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include <stdio.h> +#include <errno.h> +#include <sys/stat.h> +#include <unistd.h> +#include <stdlib.h> +#include <string.h> +#include <dirent.h> +#include <assert.h> +#include <glusterfs/locking.h> + +#include <glusterfs/compat.h> +#include <glusterfs/list.h> +#include <glusterfs/syscall.h> + +#define THREAD_MAX 32 +#define BUMP(name) INC(name, 1) +#define DEFAULT_WORKERS 4 + +#define NEW(x) \ + { \ + x = calloc(1, sizeof(typeof(*x))); \ + } + +#define err(x...) fprintf(stderr, x) +#define out(x...) fprintf(stdout, x) +#define dbg(x...) \ + do { \ + if (debug) \ + fprintf(stdout, x); \ + } while (0) +#define tout(x...) \ + do { \ + out("[%ld] ", pthread_self()); \ + out(x); \ + } while (0) +#define terr(x...) \ + do { \ + err("[%ld] ", pthread_self()); \ + err(x); \ + } while (0) +#define tdbg(x...) 
\ + do { \ + dbg("[%ld] ", pthread_self()); \ + dbg(x); \ + } while (0) + +int debug = 0; +const char *slavemnt = NULL; +int workers = 0; + +struct stats { + unsigned long long int cnt_skipped_gfids; +}; + +pthread_spinlock_t stats_lock; + +struct stats stats_total; +int stats = 0; + +#define INC(name, val) \ + do { \ + if (!stats) \ + break; \ + pthread_spin_lock(&stats_lock); \ + { \ + stats_total.cnt_##name += val; \ + } \ + pthread_spin_unlock(&stats_lock); \ + } while (0) + +void +stats_dump() +{ + if (!stats) + return; + + out("-------------------------------------------\n"); + out("Skipped_Files : %10lld\n", stats_total.cnt_skipped_gfids); + out("-------------------------------------------\n"); +} + +struct dirjob { + struct list_head list; + + char *dirname; + + struct dirjob *parent; + int ret; /* final status of this subtree */ + int refcnt; /* how many dirjobs have this as parent */ + + pthread_spinlock_t lock; +}; + +struct xwork { + pthread_t cthreads[THREAD_MAX]; /* crawler threads */ + int count; + int idle; + int stop; + + struct dirjob crawl; + + struct dirjob *rootjob; /* to verify completion in xwork_fini() */ + + pthread_mutex_t mutex; + pthread_cond_t cond; +}; + +struct dirjob * +dirjob_ref(struct dirjob *job) +{ + pthread_spin_lock(&job->lock); + { + job->refcnt++; + } + pthread_spin_unlock(&job->lock); + + return job; +} + +void +dirjob_free(struct dirjob *job) +{ + assert(list_empty(&job->list)); + + pthread_spin_destroy(&job->lock); + free(job->dirname); + free(job); +} + +void +dirjob_ret(struct dirjob *job, int err) +{ + int ret = 0; + int refcnt = 0; + struct dirjob *parent = NULL; + + pthread_spin_lock(&job->lock); + { + refcnt = --job->refcnt; + job->ret = (job->ret || err); + } + pthread_spin_unlock(&job->lock); + + if (refcnt == 0) { + ret = job->ret; + + if (ret) + terr("Failed: %s (%d)\n", job->dirname, ret); + else + tdbg("Finished: %s\n", job->dirname); + + parent = job->parent; + if (parent) + dirjob_ret(parent, ret); + + dirjob_free(job); + job = NULL; + } +} + +struct dirjob * +dirjob_new(const char *dir, struct dirjob *parent) +{ + struct dirjob *job = NULL; + + NEW(job); + if (!job) + return NULL; + + job->dirname = strdup(dir); + if (!job->dirname) { + free(job); + return NULL; + } + + INIT_LIST_HEAD(&job->list); + pthread_spin_init(&job->lock, PTHREAD_PROCESS_PRIVATE); + job->ret = 0; + + if (parent) + job->parent = dirjob_ref(parent); + + job->refcnt = 1; + + return job; +} + +void +xwork_addcrawl(struct xwork *xwork, struct dirjob *job) +{ + pthread_mutex_lock(&xwork->mutex); + { + list_add_tail(&job->list, &xwork->crawl.list); + pthread_cond_broadcast(&xwork->cond); + } + pthread_mutex_unlock(&xwork->mutex); +} + +int +xwork_add(struct xwork *xwork, const char *dir, struct dirjob *parent) +{ + struct dirjob *job = NULL; + + job = dirjob_new(dir, parent); + if (!job) + return -1; + + xwork_addcrawl(xwork, job); + + return 0; +} + +struct dirjob * +xwork_pick(struct xwork *xwork, int block) +{ + struct dirjob *job = NULL; + struct list_head *head = NULL; + + head = &xwork->crawl.list; + + pthread_mutex_lock(&xwork->mutex); + { + for (;;) { + if (xwork->stop) + break; + + if (!list_empty(head)) { + job = list_entry(head->next, typeof(*job), list); + list_del_init(&job->list); + break; + } + + if (((xwork->count * 2) == xwork->idle) && + list_empty(&xwork->crawl.list)) { + /* no outstanding jobs, and no + active workers + */ + tdbg("Jobless. 
Terminating\n"); + xwork->stop = 1; + pthread_cond_broadcast(&xwork->cond); + break; + } + + if (!block) + break; + + xwork->idle++; + pthread_cond_wait(&xwork->cond, &xwork->mutex); + xwork->idle--; + } + } + pthread_mutex_unlock(&xwork->mutex); + + return job; +} + +int +skip_name(const char *dirname, const char *name) +{ + if (strcmp(name, ".") == 0) + return 1; + + if (strcmp(name, "..") == 0) + return 1; + + if (strcmp(name, "changelogs") == 0) + return 1; + + if (strcmp(name, "health_check") == 0) + return 1; + + if (strcmp(name, "indices") == 0) + return 1; + + if (strcmp(name, "landfill") == 0) + return 1; + + return 0; +} + +int +skip_stat(struct dirjob *job, const char *name) +{ + if (job == NULL) + return 0; + + if (strcmp(job->dirname, ".glusterfs") == 0) { + tdbg( + "Directly adding directories under .glusterfs " + "to global list: %s\n", + name); + return 1; + } + + if (job->parent != NULL) { + if (strcmp(job->parent->dirname, ".glusterfs") == 0) { + tdbg( + "Directly adding directories under .glusterfs/XX " + "to global list: %s\n", + name); + return 1; + } + } + + return 0; +} + +int +xworker_do_crawl(struct xwork *xwork, struct dirjob *job) +{ + DIR *dirp = NULL; + int ret = -1; + int boff; + int plen; + char *path = NULL; + struct dirjob *cjob = NULL; + struct stat statbuf = { + 0, + }; + struct dirent *entry; + struct dirent scratch[2] = { + { + 0, + }, + }; + char gfid_path[PATH_MAX] = { + 0, + }; + + plen = strlen(job->dirname) + 256 + 2; + path = alloca(plen); + + tdbg("Entering: %s\n", job->dirname); + + dirp = sys_opendir(job->dirname); + if (!dirp) { + terr("opendir failed on %s (%s)\n", job->dirname, strerror(errno)); + goto out; + } + + boff = sprintf(path, "%s/", job->dirname); + + for (;;) { + errno = 0; + entry = sys_readdir(dirp, scratch); + if (!entry || errno != 0) { + if (errno != 0) { + err("readdir(%s): %s\n", job->dirname, strerror(errno)); + ret = errno; + goto out; + } + break; + } + + if (entry->d_ino == 0) + continue; + + if (skip_name(job->dirname, entry->d_name)) + continue; + + /* It is sure that, children and grandchildren of .glusterfs + * are directories, just add them to global queue. 
+ */ + if (skip_stat(job, entry->d_name)) { + strncpy(path + boff, entry->d_name, (plen - boff)); + cjob = dirjob_new(path, job); + if (!cjob) { + err("dirjob_new(%s): %s\n", path, strerror(errno)); + ret = -1; + goto out; + } + xwork_addcrawl(xwork, cjob); + continue; + } + + (void)snprintf(gfid_path, sizeof(gfid_path), "%s/.gfid/%s", slavemnt, + entry->d_name); + ret = sys_lstat(gfid_path, &statbuf); + + if (ret && errno == ENOENT) { + out("%s\n", entry->d_name); + BUMP(skipped_gfids); + } + + if (ret && errno != ENOENT) { + err("stat on slave failed(%s): %s\n", gfid_path, strerror(errno)); + goto out; + } + } + + ret = 0; +out: + if (dirp) + (void)sys_closedir(dirp); + + return ret; +} + +void * +xworker_crawl(void *data) +{ + struct xwork *xwork = data; + struct dirjob *job = NULL; + int ret = -1; + + while ((job = xwork_pick(xwork, 0))) { + ret = xworker_do_crawl(xwork, job); + dirjob_ret(job, ret); + } + + return NULL; +} + +int +xwork_fini(struct xwork *xwork, int stop) +{ + int i = 0; + int ret = 0; + void *tret = 0; + + pthread_mutex_lock(&xwork->mutex); + { + xwork->stop = (xwork->stop || stop); + pthread_cond_broadcast(&xwork->cond); + } + pthread_mutex_unlock(&xwork->mutex); + + for (i = 0; i < xwork->count; i++) { + pthread_join(xwork->cthreads[i], &tret); + tdbg("CThread id %ld returned %p\n", xwork->cthreads[i], tret); + } + + if (debug) { + assert(xwork->rootjob->refcnt == 1); + dirjob_ret(xwork->rootjob, 0); + } + + if (stats) + pthread_spin_destroy(&stats_lock); + + return ret; +} + +int +xwork_init(struct xwork *xwork, int count) +{ + int i = 0; + int ret = 0; + struct dirjob *rootjob = NULL; + + if (stats) + pthread_spin_init(&stats_lock, PTHREAD_PROCESS_PRIVATE); + + pthread_mutex_init(&xwork->mutex, NULL); + pthread_cond_init(&xwork->cond, NULL); + + INIT_LIST_HEAD(&xwork->crawl.list); + + rootjob = dirjob_new(".glusterfs", NULL); + if (debug) + xwork->rootjob = dirjob_ref(rootjob); + + xwork_addcrawl(xwork, rootjob); + + xwork->count = count; + for (i = 0; i < count; i++) { + ret = pthread_create(&xwork->cthreads[i], NULL, xworker_crawl, xwork); + if (ret) + break; + tdbg("Spawned crawler %d thread %ld\n", i, xwork->cthreads[i]); + } + + return ret; +} + +int +xfind(const char *basedir) +{ + struct xwork xwork; + int ret = 0; + char *cwd = NULL; + + ret = chdir(basedir); + if (ret) { + err("%s: %s\n", basedir, strerror(errno)); + return ret; + } + + cwd = getcwd(0, 0); + if (!cwd) { + err("getcwd(): %s\n", strerror(errno)); + return -1; + } + + tdbg("Working directory: %s\n", cwd); + free(cwd); + + memset(&xwork, 0, sizeof(xwork)); + + ret = xwork_init(&xwork, workers); + if (ret == 0) + xworker_crawl(&xwork); + + ret = xwork_fini(&xwork, ret); + stats_dump(); + + return ret; +} + +static char * +parse_and_validate_args(int argc, char *argv[]) +{ + char *basedir = NULL; + struct stat d = { + 0, + }; + int ret = -1; +#ifndef __FreeBSD__ + unsigned char volume_id[16]; +#endif /* __FreeBSD__ */ + char *slv_mnt = NULL; + + if (argc != 4) { + err("Usage: %s <DIR> <SLAVE-VOL-MOUNT> <CRAWL-THREAD-COUNT>\n", + argv[0]); + return NULL; + } + + basedir = argv[1]; + ret = sys_lstat(basedir, &d); + if (ret) { + err("%s: %s\n", basedir, strerror(errno)); + return NULL; + } + +#ifndef __FreeBSD__ + ret = sys_lgetxattr(basedir, "trusted.glusterfs.volume-id", volume_id, 16); + if (ret != 16) { + err("%s:Not a valid brick path.\n", basedir); + return NULL; + } +#endif /* __FreeBSD__ */ + + slv_mnt = argv[2]; + ret = sys_lstat(slv_mnt, &d); + if (ret) { + err("%s: %s\n", slv_mnt, 
strerror(errno)); + return NULL; + } + slavemnt = argv[2]; + + workers = atoi(argv[3]); + if (workers <= 0) + workers = DEFAULT_WORKERS; + + return basedir; +} + +int +main(int argc, char *argv[]) +{ + char *basedir = NULL; + + basedir = parse_and_validate_args(argc, argv); + if (!basedir) + return 1; + + xfind(basedir); + + return 0; +} diff --git a/tools/gfind_missing_files/gfid_to_path.py b/tools/gfind_missing_files/gfid_to_path.py new file mode 100644 index 00000000000..01e08a9494a --- /dev/null +++ b/tools/gfind_missing_files/gfid_to_path.py @@ -0,0 +1,162 @@ +#!/usr/bin/python3 + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import sys +import os +import xattr +import uuid +import re +import errno + +CHANGELOG_SEARCH_MAX_TRY = 31 +DEC_CTIME_START = 5 +ROOT_GFID = "00000000-0000-0000-0000-000000000001" +MAX_NUM_CHANGELOGS_TRY = 2 + + +def output_not_found(gfid): + # Write GFID to stderr + sys.stderr.write("%s\n" % gfid) + + +def output_success(path): + # Write converted Path to Stdout + sys.stdout.write("%s\n" % path) + + +def full_dir_path(gfid): + out_path = "" + while True: + path = os.path.join(".glusterfs", gfid[0:2], gfid[2:4], gfid) + path_readlink = os.readlink(path) + pgfid = os.path.dirname(path_readlink) + out_path = os.path.join(os.path.basename(path_readlink), out_path) + if pgfid == "../../00/00/%s" % ROOT_GFID: + out_path = os.path.join("./", out_path) + break + gfid = os.path.basename(pgfid) + return out_path + + +def find_path_from_changelog(fd, gfid): + """ + In given Changelog File, finds using following pattern + <T><GFID>\x00<TYPE>\x00<MODE>\x00<UID>\x00<GID>\x00<PARGFID>/<BASENAME> + Pattern search finds PARGFID and BASENAME, Convert PARGFID to Path + Using readlink and add basename to form Full path. + """ + content = fd.read() + + pattern = "E%s" % gfid + pattern += "\x00(3|23)\x00\d+\x00\d+\x00\d+\x00([^\x00]+)/([^\x00]+)" + pat = re.compile(pattern) + match = pat.search(content) + + if match: + pgfid = match.group(2) + basename = match.group(3) + if pgfid == ROOT_GFID: + return os.path.join("./", basename) + else: + full_path_parent = full_dir_path(pgfid) + if full_path_parent: + return os.path.join(full_path_parent, basename) + + return None + + +def gfid_to_path(gfid): + """ + Try readlink, if it is directory it succeeds. + Get ctime of the GFID file, Decrement by 5 sec + Search for Changelog filename, Since Changelog file generated + every 15 sec, Search and get immediate next Changelog after the file + Creation. Get the Path by searching in Changelog file. 
+ Get the resultant file's GFID and Compare with the input, If these + GFIDs are different then Some thing is changed(May be Rename) + """ + gfid = gfid.strip() + gpath = os.path.join(".glusterfs", gfid[0:2], gfid[2:4], gfid) + try: + output_success(full_dir_path(gfid)) + return + except OSError: + # Not an SymLink + pass + + try: + ctime = int(os.stat(gpath).st_ctime) + ctime -= DEC_CTIME_START + except (OSError, IOError): + output_not_found(gfid) + return + + path = None + found_changelog = False + changelog_parse_try = 0 + for i in range(CHANGELOG_SEARCH_MAX_TRY): + cl = os.path.join(".glusterfs/changelogs", "CHANGELOG.%s" % ctime) + + try: + with open(cl, "rb") as f: + changelog_parse_try += 1 + found_changelog = True + path = find_path_from_changelog(f, gfid) + if not path and changelog_parse_try < MAX_NUM_CHANGELOGS_TRY: + ctime += 1 + continue + break + except (IOError, OSError) as e: + if e.errno == errno.ENOENT: + ctime += 1 + else: + break + + if not found_changelog: + output_not_found(gfid) + return + + if not path: + output_not_found(gfid) + return + gfid1 = str(uuid.UUID(bytes=xattr.get(path, "trusted.gfid"))) + if gfid != gfid1: + output_not_found(gfid) + return + + output_success(path) + + +def main(): + num_arguments = 3 + if not sys.stdin.isatty(): + num_arguments = 2 + + if len(sys.argv) != num_arguments: + sys.stderr.write("Invalid arguments\nUsage: " + "%s <BRICK_PATH> <GFID_FILE>\n" % sys.argv[0]) + sys.exit(1) + + path = sys.argv[1] + + if sys.stdin.isatty(): + gfid_list = os.path.abspath(sys.argv[2]) + os.chdir(path) + with open(gfid_list) as f: + for gfid in f: + gfid_to_path(gfid) + else: + os.chdir(path) + for gfid in sys.stdin: + gfid_to_path(gfid) + + +if __name__ == "__main__": + main() diff --git a/tools/gfind_missing_files/gfid_to_path.sh b/tools/gfind_missing_files/gfid_to_path.sh new file mode 100644 index 00000000000..ebe817ac2f3 --- /dev/null +++ b/tools/gfind_missing_files/gfid_to_path.sh @@ -0,0 +1,43 @@ +#!/bin/sh + +## Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +## This file is part of GlusterFS. +## +## This file is licensed to you under your choice of the GNU Lesser +## General Public License, version 3 or any later version (LGPLv3 or +## later), or the GNU General Public License, version 2 (GPLv2), in all +## cases as published by the Free Software Foundation. + +E_BADARGS=65 + + +gfid_to_path () +{ + brick_dir=$1; + gfid_file=$(readlink -e $2); + + current_dir=$(pwd); + cd $brick_dir; + + while read gfid + do + to_search=`echo .glusterfs/${gfid:0:2}"/"${gfid:2:2}"/"$gfid`; + find . -samefile $to_search | grep -v $to_search; + done < $gfid_file; + + cd $current_dir; +} + + +main () +{ + if [ $# -ne 2 ] + then + echo "Usage: `basename $0` BRICK_DIR GFID_FILE"; + exit $E_BADARGS; + fi + + gfid_to_path $1 $2; +} + +main "$@"; diff --git a/tools/gfind_missing_files/gfind_missing_files.sh b/tools/gfind_missing_files/gfind_missing_files.sh new file mode 100644 index 00000000000..e7aaa0b5dd4 --- /dev/null +++ b/tools/gfind_missing_files/gfind_missing_files.sh @@ -0,0 +1,119 @@ +#!/bin/sh + +## Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +## This file is part of GlusterFS. +## +## This file is licensed to you under your choice of the GNU Lesser +## General Public License, version 3 or any later version (LGPLv3 or +## later), or the GNU General Public License, version 2 (GPLv2), in all +## cases as published by the Free Software Foundation. 
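The core per-entry test in gcrawler.c above is an `lstat()` of `<SLAVE-VOL-MOUNT>/.gfid/<GFID>`: on a mount created with `--aux-gfid-mount` (as the gfind_missing_files.sh driver below does), every file is addressable by its GFID, so ENOENT means the slave does not have that file. A sketch of that check in Python, with an illustrative mount path:

```python
import errno
import os

def missing_on_slave(slave_mnt, gfid):
    """True if the GFID is absent on the slave's aux-gfid mount."""
    try:
        os.lstat(os.path.join(slave_mnt, ".gfid", gfid))
        return False
    except OSError as e:
        if e.errno == errno.ENOENT:
            return True
        raise  # any other error aborts the crawl, as in gcrawler.c
```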
+ +BRICKPATH= #Brick path of gluster volume +SLAVEHOST= #Slave hostname +SLAVEVOL= #Slave volume +SLAVEMNT= #Slave gluster volume mount point +WORKERS=4 #Default number of worker threads + +out() +{ + echo "$@"; +} + +fatal() +{ + out FATAL "$@"; + exit 1 +} + +ping_host () +{ + ### Use bash internal socket support + { + exec 400<>/dev/tcp/$1/$2 + if [ $? -ne '0' ]; then + return 1; + else + exec 400>&- + return 0; + fi + } 1>&2 2>/dev/null +} + +mount_slave() +{ + local i; # inode number + SSH_PORT=22 + + SLAVEMNT=`mktemp -d` + [ "x$SLAVEMNT" = "x" ] && fatal "Could not mktemp directory"; + [ -d "$SLAVEMNT" ] || fatal "$SLAVEMNT not a directory"; + + ping_host ${SLAVEHOST} $SSH_PORT + if [ $? -ne 0 ]; then + echo "$SLAVEHOST not reachable."; + exit 1; + fi; + + glusterfs --volfile-id=$SLAVEVOL --aux-gfid-mount --volfile-server=$SLAVEHOST $SLAVEMNT; + i=$(stat -c '%i' $SLAVEMNT); + [ "x$i" = "x1" ] || fatal "Could not mount volume $2 on $SLAVEMNT Please check host and volume exists"; +} + +parse_cli() +{ + if [ "$#" -ne 4 ]; then + echo "Usage: gfind_missing_files <brick-path> <slave-host> <slave-vol> <OUTFILE>" + exit 1 + else + BRICKPATH=$1; + SLAVEHOST=$2; + SLAVEVOL=$3; + OUTFILE=$4; + + mount_slave; + echo "Slave volume is mounted at ${SLAVEMNT}" + echo + fi +} + +main() +{ + parse_cli "$@"; + + echo "Calling crawler..."; + path=$(readlink -e $0) + $(dirname $path)/gcrawler ${BRICKPATH} ${SLAVEMNT} ${WORKERS} > ${OUTFILE} + + #Clean up the mount + umount $SLAVEMNT; + rmdir $SLAVEMNT; + + echo "Crawl Complete." + num_files_missing=$(wc -l ${OUTFILE} | awk '{print $1}') + if [ $num_files_missing -eq 0 ] + then + echo "Total Missing File Count : 0" + exit 0; + fi + + echo "gfids of skipped files are available in the file ${OUTFILE}" + echo + echo "Starting gfid to path conversion" + + #Call python script to convert gfids to full pathname + INFILE=$(readlink -e ${OUTFILE}) + python $(dirname $path)/gfid_to_path.py ${BRICKPATH} ${INFILE} 1> ${OUTFILE}_pathnames 2> ${OUTFILE}_gfids + echo "Path names of skipped files are available in the file ${OUTFILE}_pathnames" + + gfid_to_path_failures=$(wc -l ${OUTFILE}_gfids | awk '{print $1}') + if [ $gfid_to_path_failures -gt 0 ] + then + echo "WARNING: Unable to convert some GFIDs to Paths, GFIDs logged to ${OUTFILE}_gfids" + echo "Use $(dirname $path)/gfid_to_path.sh <brick-path> ${OUTFILE}_gfids to convert those GFIDs to Path" + fi + + #Output + echo "Total Missing File Count : $(wc -l ${OUTFILE} | awk '{print $1}')" +} + +main "$@"; diff --git a/tools/glusterfind/Makefile.am b/tools/glusterfind/Makefile.am new file mode 100644 index 00000000000..f17dbdb228e --- /dev/null +++ b/tools/glusterfind/Makefile.am @@ -0,0 +1,24 @@ +SUBDIRS = src + +EXTRA_DIST = S57glusterfind-delete-post.py glusterfind + +if WITH_SERVER +bin_SCRIPTS = glusterfind +endif + +CLEANFILES = $(bin_SCRIPTS) + +if WITH_SERVER +deletehookscriptsdir = $(GLUSTERFS_LIBEXECDIR)/glusterfind/ +deletehookscripts_SCRIPTS = S57glusterfind-delete-post.py + +uninstall-local: + rm -f $(DESTDIR)$(GLUSTERD_WORKDIR)/hooks/1/delete/post/S57glusterfind-delete-post + +install-data-local: + $(mkdir_p) $(DESTDIR)$(GLUSTERD_WORKDIR)/glusterfind/.keys + $(mkdir_p) $(DESTDIR)$(GLUSTERD_WORKDIR)/hooks/1/delete/post/ + rm -f $(DESTDIR)$(GLUSTERD_WORKDIR)/hooks/1/delete/post/S57glusterfind-delete-post + ln -s $(GLUSTERFS_LIBEXECDIR)/glusterfind/S57glusterfind-delete-post.py \ + $(DESTDIR)$(GLUSTERD_WORKDIR)/hooks/1/delete/post/S57glusterfind-delete-post +endif diff --git 
a/tools/glusterfind/S57glusterfind-delete-post.py b/tools/glusterfind/S57glusterfind-delete-post.py new file mode 100755 index 00000000000..5beece220f0 --- /dev/null +++ b/tools/glusterfind/S57glusterfind-delete-post.py @@ -0,0 +1,69 @@ +#!/usr/bin/python3 +import os +import shutil +from errno import ENOENT +from subprocess import Popen, PIPE +from argparse import ArgumentParser + + +DEFAULT_GLUSTERD_WORKDIR = "/var/lib/glusterd" + + +def handle_rm_error(func, path, exc_info): + if exc_info[1].errno == ENOENT: + return + + raise exc_info[1] + + +def get_glusterd_workdir(): + p = Popen(["gluster", "system::", "getwd"], + stdout=PIPE, stderr=PIPE, universal_newlines=True) + + out, _ = p.communicate() + + if p.returncode == 0: + return out.strip() + else: + return DEFAULT_GLUSTERD_WORKDIR + + +def get_args(): + parser = ArgumentParser(description="Volume delete post hook script") + parser.add_argument("--volname") + return parser.parse_args() + + +def main(): + args = get_args() + glusterfind_dir = os.path.join(get_glusterd_workdir(), "glusterfind") + + # Check all session directories, if any directory found for + # the deleted volume, cleanup all the session directories + try: + ls_glusterfind_dir = os.listdir(glusterfind_dir) + except OSError: + ls_glusterfind_dir = [] + + for session in ls_glusterfind_dir: + # don't blow away the keys directory + if session == ".keys": + continue + + # Possible session directory + volume_session_path = os.path.join(glusterfind_dir, + session, + args.volname) + if os.path.exists(volume_session_path): + shutil.rmtree(volume_session_path, onerror=handle_rm_error) + + # Try to Remove directory, if any other dir exists for different + # volume, then rmdir will fail with ENOTEMPTY which is fine + try: + os.rmdir(os.path.join(glusterfind_dir, session)) + except (OSError, IOError): + pass + + +if __name__ == "__main__": + main() diff --git a/tools/glusterfind/glusterfind.in b/tools/glusterfind/glusterfind.in new file mode 100644 index 00000000000..ca154b625dd --- /dev/null +++ b/tools/glusterfind/glusterfind.in @@ -0,0 +1,18 @@ +#!/usr/bin/python3 + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import sys +sys.path.insert(1, '@GLUSTERFS_LIBEXECDIR@/') +sys.path.insert(1, '@GLUSTERFS_LIBEXECDIR@/glusterfind') + +from glusterfind.main import main + +if __name__ == "__main__": + main() diff --git a/tools/glusterfind/src/Makefile.am b/tools/glusterfind/src/Makefile.am new file mode 100644 index 00000000000..43b6141b01c --- /dev/null +++ b/tools/glusterfind/src/Makefile.am @@ -0,0 +1,16 @@ +glusterfinddir = $(GLUSTERFS_LIBEXECDIR)/glusterfind + +if WITH_SERVER +glusterfind_PYTHON = conf.py utils.py __init__.py \ + main.py libgfchangelog.py changelogdata.py gfind_py2py3.py + +glusterfind_SCRIPTS = changelog.py nodeagent.py \ + brickfind.py + +glusterfind_DATA = tool.conf +endif + +EXTRA_DIST = changelog.py nodeagent.py brickfind.py \ + tool.conf changelogdata.py + +CLEANFILES = diff --git a/tools/glusterfind/src/__init__.py b/tools/glusterfind/src/__init__.py new file mode 100644 index 00000000000..1753698b5fa --- /dev/null +++ b/tools/glusterfind/src/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2015 Red Hat, Inc. 
<http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. diff --git a/tools/glusterfind/src/brickfind.py b/tools/glusterfind/src/brickfind.py new file mode 100644 index 00000000000..73b6350188d --- /dev/null +++ b/tools/glusterfind/src/brickfind.py @@ -0,0 +1,118 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import os +import sys +import logging +from argparse import ArgumentParser, RawDescriptionHelpFormatter +try: + import urllib.parse as urllib +except ImportError: + import urllib +import time + +from utils import mkdirp, setup_logger, create_file, output_write, find +import conf + + +PROG_DESCRIPTION = """ +Changelog Crawler +""" + +logger = logging.getLogger() + + +def brickfind_crawl(brick, args): + if brick.endswith("/"): + brick = brick[0:len(brick)-1] + + working_dir = os.path.dirname(args.outfile) + mkdirp(working_dir, exit_on_err=True, logger=logger) + create_file(args.outfile, exit_on_err=True, logger=logger) + + with open(args.outfile, "a+") as fout: + brick_path_len = len(brick) + + def output_callback(path, filter_result, is_dir): + path = path.strip() + path = path[brick_path_len+1:] + + if args.type == "both": + output_write(fout, path, args.output_prefix, + encode=(not args.no_encode), tag=args.tag, + field_separator=args.field_separator) + else: + if (is_dir and args.type == "d") or ( + (not is_dir) and args.type == "f"): + output_write(fout, path, args.output_prefix, + encode=(not args.no_encode), tag=args.tag, + field_separator=args.field_separator) + + ignore_dirs = [os.path.join(brick, dirname) + for dirname in + conf.get_opt("brick_ignore_dirs").split(",")] + + find(brick, callback_func=output_callback, + ignore_dirs=ignore_dirs) + + fout.flush() + os.fsync(fout.fileno()) + + +def _get_args(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description=PROG_DESCRIPTION) + + parser.add_argument("session", help="Session Name") + parser.add_argument("volume", help="Volume Name") + parser.add_argument("node", help="Node Name") + parser.add_argument("brick", help="Brick Name") + parser.add_argument("outfile", help="Output File") + parser.add_argument("tag", help="Tag to prefix file name with") + parser.add_argument("--only-query", help="Only query, No session update", + action="store_true") + parser.add_argument("--debug", help="Debug", action="store_true") + parser.add_argument("--no-encode", + help="Do not encode path in outfile", + action="store_true") + parser.add_argument("--output-prefix", help="File prefix in output", + default=".") + parser.add_argument('--type', help="type: f, f-files only" + " d, d-directories only, by default = both", + default='both') + parser.add_argument("--field-separator", help="Field separator", + default=" ") + + return parser.parse_args() + + +if __name__ == "__main__": + args = _get_args() + session_dir = os.path.join(conf.get_opt("session_dir"), args.session) + status_file = 
os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + status_file_pre = status_file + ".pre" + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) + mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), + exit_on_err=True) + log_file = os.path.join(conf.get_opt("log_dir"), + args.session, + args.volume, + "brickfind.log") + setup_logger(logger, log_file, args.debug) + + time_to_update = int(time.time()) + brickfind_crawl(args.brick, args) + if not args.only_query: + with open(status_file_pre, "w") as f: + f.write(str(time_to_update)) + sys.exit(0) diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py new file mode 100644 index 00000000000..a5e9ea4288f --- /dev/null +++ b/tools/glusterfind/src/changelog.py @@ -0,0 +1,469 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import os +import sys +import time +import xattr +import logging +from gfind_py2py3 import bytearray_to_str +from argparse import ArgumentParser, RawDescriptionHelpFormatter +import hashlib +try: + import urllib.parse as urllib +except ImportError: + import urllib +import codecs + +import libgfchangelog +from utils import mkdirp, symlink_gfid_to_path +from utils import fail, setup_logger, find +from utils import get_changelog_rollover_time +from utils import output_path_prepare +from changelogdata import ChangelogData +import conf + + +CHANGELOG_LOG_LEVEL = 9 +CHANGELOG_CONN_RETRIES = 5 +CHANGELOGAPI_NUM_WORKERS = 3 +PROG_DESCRIPTION = """ +Changelog Crawler +""" +history_turns = 0 +history_turn_time = 0 + +logger = logging.getLogger() + + +def pgfid_to_path(brick, changelog_data): + """ + For all the pgfids in table, converts into path using recursive + readlink. + """ + # pgfid1 to path1 in case of CREATE/MKNOD/MKDIR/LINK/SYMLINK + for row in changelog_data.gfidpath_get_distinct("pgfid1", {"path1": ""}): + # In case of Data/Metadata only, pgfid1 will not be there + if row[0] == "": + continue + + try: + path = symlink_gfid_to_path(brick, row[0]) + path = output_path_prepare(path, args) + changelog_data.gfidpath_set_path1(path, row[0]) + except (IOError, OSError) as e: + logger.warn("Error converting to path: %s" % e) + continue + + # pgfid2 to path2 in case of RENAME + for row in changelog_data.gfidpath_get_distinct("pgfid2", + {"type": "RENAME", + "path2": ""}): + # Only in case of Rename pgfid2 exists + if row[0] == "": + continue + + try: + path = symlink_gfid_to_path(brick, row[0]) + path = output_path_prepare(path, args) + changelog_data.gfidpath_set_path2(path, row[0]) + except (IOError, OSError) as e: + logger.warn("Error converting to path: %s" % e) + continue + + +def populate_pgfid_and_inodegfid(brick, changelog_data): + """ + For all the DATA/METADATA modifications GFID, + If symlink, directly convert to Path using Readlink. 
+ If not symlink, try to get PGFIDs via xattr query and populate it + to pgfid table, collect inodes in inodegfid table + """ + for row in changelog_data.gfidpath_get({"path1": "", "type": "MODIFY"}): + gfid = row[3].strip() + p = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid) + if os.path.islink(p): + # It is a Directory if GFID backend path is symlink + try: + path = symlink_gfid_to_path(brick, gfid) + path = output_path_prepare(path, args) + changelog_data.gfidpath_update({"path1": path}, + {"gfid": gfid}) + except (IOError, OSError) as e: + logger.warn("Error converting to path: %s" % e) + continue + else: + try: + # INODE and GFID to inodegfid table + changelog_data.inodegfid_add(os.stat(p).st_ino, gfid) + file_xattrs = xattr.list(p) + for x in file_xattrs: + x_str = bytearray_to_str(x) + if x_str.startswith("trusted.pgfid."): + # PGFID in pgfid table + changelog_data.pgfid_add(x_str.split(".")[-1]) + except (IOError, OSError): + # All OS Errors ignored, since failures will be logged + # in End. All GFIDs present in gfidpath table + continue + + +def enum_hard_links_using_gfid2path(brick, gfid, args): + hardlinks = [] + p = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid) + if not os.path.isdir(p): + # we have a symlink or a normal file + try: + file_xattrs = xattr.list(p) + for x in file_xattrs: + x_str = bytearray_to_str(x) + if x_str.startswith("trusted.gfid2path."): + # get the value for the xattr i.e. <PGFID>/<BN> + v = xattr.getxattr(p, x_str) + v_str = bytearray_to_str(v) + pgfid, bn = v_str.split(os.sep) + try: + path = symlink_gfid_to_path(brick, pgfid) + fullpath = os.path.join(path, bn) + fullpath = output_path_prepare(fullpath, args) + hardlinks.append(fullpath) + except (IOError, OSError) as e: + logger.warn("Error converting to path: %s" % e) + continue + except (IOError, OSError): + pass + return hardlinks + + +def gfid_to_all_paths_using_gfid2path(brick, changelog_data, args): + path = "" + for row in changelog_data.gfidpath_get({"path1": "", "type": "MODIFY"}): + gfid = row[3].strip() + logger.debug("Processing gfid %s" % gfid) + hardlinks = enum_hard_links_using_gfid2path(brick, gfid, args) + + path = ",".join(hardlinks) + + changelog_data.gfidpath_update({"path1": path}, {"gfid": gfid}) + + +def gfid_to_path_using_pgfid(brick, changelog_data, args): + """ + For all the pgfids collected, Converts to Path and + does readdir on those directories and looks up inodegfid + table for matching inode number. 
+ """ + populate_pgfid_and_inodegfid(brick, changelog_data) + + # If no GFIDs needs conversion to Path + if not changelog_data.inodegfid_exists({"converted": 0}): + return + + def inode_filter(path): + # Looks in inodegfid table, if exists returns + # inode number else None + try: + st = os.lstat(path) + except (OSError, IOError): + st = None + + if st and changelog_data.inodegfid_exists({"inode": st.st_ino}): + return st.st_ino + + return None + + # Length of brick path, to remove from output path + brick_path_len = len(brick) + + def output_callback(path, inode): + # For each path found, encodes it and updates path1 + # Also updates converted flag in inodegfid table as 1 + path = path.strip() + path = path[brick_path_len+1:] + + path = output_path_prepare(path, args) + + changelog_data.append_path1(path, inode) + changelog_data.inodegfid_update({"converted": 1}, {"inode": inode}) + + ignore_dirs = [os.path.join(brick, dirname) + for dirname in + conf.get_opt("brick_ignore_dirs").split(",")] + + for row in changelog_data.pgfid_get(): + try: + path = symlink_gfid_to_path(brick, row[0]) + find(os.path.join(brick, path), + callback_func=output_callback, + filter_func=inode_filter, + ignore_dirs=ignore_dirs, + subdirs_crawl=False) + except (IOError, OSError) as e: + logger.warn("Error converting to path: %s" % e) + continue + + +def gfid_to_path_using_batchfind(brick, changelog_data): + # If all the GFIDs converted using gfid_to_path_using_pgfid + if not changelog_data.inodegfid_exists({"converted": 0}): + return + + def inode_filter(path): + # Looks in inodegfid table, if exists returns + # inode number else None + try: + st = os.lstat(path) + except (OSError, IOError): + st = None + + if st and changelog_data.inodegfid_exists({"inode": st.st_ino}): + return st.st_ino + + return None + + # Length of brick path, to remove from output path + brick_path_len = len(brick) + + def output_callback(path, inode): + # For each path found, encodes it and updates path1 + # Also updates converted flag in inodegfid table as 1 + path = path.strip() + path = path[brick_path_len+1:] + path = output_path_prepare(path, args) + + changelog_data.append_path1(path, inode) + + ignore_dirs = [os.path.join(brick, dirname) + for dirname in + conf.get_opt("brick_ignore_dirs").split(",")] + + # Full Namespace Crawl + find(brick, callback_func=output_callback, + filter_func=inode_filter, + ignore_dirs=ignore_dirs) + + +def parse_changelog_to_db(changelog_data, filename, args): + """ + Parses a Changelog file and populates data in gfidpath table + """ + with codecs.open(filename, encoding="utf-8") as f: + changelogfile = os.path.basename(filename) + for line in f: + data = line.strip().split(" ") + if data[0] == "E" and data[2] in ["CREATE", "MKNOD", "MKDIR"]: + # CREATE/MKDIR/MKNOD + changelog_data.when_create_mknod_mkdir(changelogfile, data) + elif data[0] in ["D", "M"]: + # DATA/META + if not args.only_namespace_changes: + changelog_data.when_data_meta(changelogfile, data) + elif data[0] == "E" and data[2] in ["LINK", "SYMLINK"]: + # LINK/SYMLINK + changelog_data.when_link_symlink(changelogfile, data) + elif data[0] == "E" and data[2] == "RENAME": + # RENAME + changelog_data.when_rename(changelogfile, data) + elif data[0] == "E" and data[2] in ["UNLINK", "RMDIR"]: + # UNLINK/RMDIR + changelog_data.when_unlink_rmdir(changelogfile, data) + + +def get_changes(brick, hash_dir, log_file, start, end, args): + """ + Makes use of libgfchangelog's history API to get changelogs + containing changes from start and end time. 
Further collects + the modified gfids from the changelogs and writes the list + of gfid to 'gfid_list' file. + """ + session_dir = os.path.join(conf.get_opt("session_dir"), + args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + + # Get previous session + try: + with open(status_file) as f: + start = int(f.read().strip()) + except (ValueError, OSError, IOError): + start = args.start + + try: + libgfchangelog.cl_init() + libgfchangelog.cl_register(brick, hash_dir, log_file, + CHANGELOG_LOG_LEVEL, CHANGELOG_CONN_RETRIES) + except libgfchangelog.ChangelogException as e: + fail("%s Changelog register failed: %s" % (brick, e), logger=logger) + + # Output files to record GFIDs and GFID to Path failure GFIDs + changelog_data = ChangelogData(args.outfile, args) + + # Changelogs path(Hard coded to BRICK/.glusterfs/changelogs + cl_path = os.path.join(brick, ".glusterfs/changelogs") + + # Fail if History fails for requested Start and End + try: + actual_end = libgfchangelog.cl_history_changelog( + cl_path, start, end, CHANGELOGAPI_NUM_WORKERS) + except libgfchangelog.ChangelogException as e: + fail("%s: %s Historical Changelogs not available: %s" % + (args.node, brick, e), logger=logger) + + logger.info("[1/4] Starting changelog parsing ...") + try: + # scan followed by getchanges till scan returns zero. + # history_scan() is blocking call, till it gets the number + # of changelogs to process. Returns zero when no changelogs + # to be processed. returns positive value as number of changelogs + # to be processed, which will be fetched using + # history_getchanges() + changes = [] + while libgfchangelog.cl_history_scan() > 0: + changes = libgfchangelog.cl_history_getchanges() + + for change in changes: + # Ignore if last processed changelog comes + # again in list + if change.endswith(".%s" % start): + continue + try: + parse_changelog_to_db(changelog_data, change, args) + libgfchangelog.cl_history_done(change) + except IOError as e: + logger.warn("Error parsing changelog file %s: %s" % + (change, e)) + + changelog_data.commit() + except libgfchangelog.ChangelogException as e: + fail("%s Error during Changelog Crawl: %s" % (brick, e), + logger=logger) + + logger.info("[1/4] Finished changelog parsing.") + + # Convert all pgfid available from Changelogs + logger.info("[2/4] Starting 'pgfid to path' conversions ...") + pgfid_to_path(brick, changelog_data) + changelog_data.commit() + logger.info("[2/4] Finished 'pgfid to path' conversions.") + + # Convert all gfids recorded for data and metadata to all hardlink paths + logger.info("[3/4] Starting 'gfid2path' conversions ...") + gfid_to_all_paths_using_gfid2path(brick, changelog_data, args) + changelog_data.commit() + logger.info("[3/4] Finished 'gfid2path' conversions.") + + # If some GFIDs fail to get converted from previous step, + # convert using find + logger.info("[4/4] Starting 'gfid to path using batchfind' " + "conversions ...") + gfid_to_path_using_batchfind(brick, changelog_data) + changelog_data.commit() + logger.info("[4/4] Finished 'gfid to path using batchfind' conversions.") + + return actual_end + + +def changelog_crawl(brick, start, end, args): + """ + Init function, prepares working dir and calls Changelog query + """ + if brick.endswith("/"): + brick = brick[0:len(brick)-1] + + # WORKING_DIR/BRICKHASH/OUTFILE + working_dir = os.path.dirname(args.outfile) + brickhash = hashlib.sha1(brick.encode()) + brickhash = str(brickhash.hexdigest()) + working_dir = 
os.path.join(working_dir, brickhash) + + mkdirp(working_dir, exit_on_err=True, logger=logger) + + log_file = os.path.join(conf.get_opt("log_dir"), + args.session, + args.volume, + "changelog.%s.log" % brickhash) + + logger.info("%s Started Changelog Crawl. Start: %s, End: %s" + % (brick, start, end)) + return get_changes(brick, working_dir, log_file, start, end, args) + + +def _get_args(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description=PROG_DESCRIPTION) + + parser.add_argument("session", help="Session Name") + parser.add_argument("volume", help="Volume Name") + parser.add_argument("node", help="Node Name") + parser.add_argument("brick", help="Brick Name") + parser.add_argument("outfile", help="Output File") + parser.add_argument("start", help="Start Time", type=int) + parser.add_argument("end", help="End Time", type=int) + parser.add_argument("--only-query", help="Query mode only (no session)", + action="store_true") + parser.add_argument("--debug", help="Debug", action="store_true") + parser.add_argument("--no-encode", + help="Do not encode path in outfile", + action="store_true") + parser.add_argument("--output-prefix", help="File prefix in output", + default=".") + parser.add_argument("--type",default="both") + parser.add_argument("-N", "--only-namespace-changes", + help="List only namespace changes", + action="store_true") + + return parser.parse_args() + + +if __name__ == "__main__": + args = _get_args() + mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), + exit_on_err=True) + log_file = os.path.join(conf.get_opt("log_dir"), + args.session, + args.volume, + "changelog.log") + setup_logger(logger, log_file, args.debug) + + session_dir = os.path.join(conf.get_opt("session_dir"), args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + status_file_pre = status_file + ".pre" + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) + + end = -1 + if args.only_query: + start = args.start + end = args.end + else: + try: + with open(status_file) as f: + start = int(f.read().strip()) + except (ValueError, OSError, IOError): + start = args.start + + # end time is optional; so a -1 may be sent to use the default method of + # identifying the end time + if end == -1: + end = int(time.time()) - get_changelog_rollover_time(args.volume) + + logger.info("%s Started Changelog Crawl - Start: %s End: %s" % (args.brick, + start, + end)) + actual_end = changelog_crawl(args.brick, start, end, args) + if not args.only_query: + with open(status_file_pre, "w") as f: + f.write(str(actual_end)) + + logger.info("%s Finished Changelog Crawl - End: %s" % (args.brick, + actual_end)) + sys.exit(0) diff --git a/tools/glusterfind/src/changelogdata.py b/tools/glusterfind/src/changelogdata.py new file mode 100644 index 00000000000..641593cf4b1 --- /dev/null +++ b/tools/glusterfind/src/changelogdata.py @@ -0,0 +1,440 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
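changelogdata.py below stores the records that `parse_changelog_to_db()` above reads from each changelog file; the record layouts are documented in its `when_*` methods (e.g. `E <GFID> RENAME <OLD_PGFID>/<BNAME> <PGFID>/<BNAME>`). A standalone sketch of that classification, without the sqlite bookkeeping (and ignoring the `--only-namespace-changes` filtering of D/M records):

```python
ENTRY_NEW = {"CREATE", "MKNOD", "MKDIR", "LINK", "SYMLINK"}

def classify(line):
    """Coarse record type for one changelog line, as in parse_changelog_to_db."""
    data = line.strip().split(" ")
    if data[0] in ("D", "M"):
        return "MODIFY"                    # data/metadata records
    if data[0] == "E" and data[2] in ENTRY_NEW:
        return "NEW"                       # entry creation, incl. links
    if data[0] == "E" and data[2] == "RENAME":
        return "RENAME"
    if data[0] == "E" and data[2] in ("UNLINK", "RMDIR"):
        return "DELETE"
    return "IGNORED"

print(classify("E <gfid> RENAME <old_pgfid>/a <pgfid>/b"))  # -> RENAME
```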
+ +import sqlite3 +import os + +from utils import RecordType, unquote_plus_space_newline +from utils import output_path_prepare + + +class OutputMerger(object): + """ + Class to merge the output files collected from + different nodes + """ + def __init__(self, db_path, all_dbs): + self.conn = sqlite3.connect(db_path) + self.cursor = self.conn.cursor() + self.cursor_reader = self.conn.cursor() + query = "DROP TABLE IF EXISTS finallist" + self.cursor.execute(query) + + query = """ + CREATE TABLE finallist( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts VARCHAR, + type VARCHAR, + gfid VARCHAR, + path1 VARCHAR, + path2 VARCHAR, + UNIQUE (type, path1, path2) ON CONFLICT IGNORE + ) + """ + self.cursor.execute(query) + + # If node database exists, read each db and insert into + # final table. Ignore if combination of TYPE PATH1 PATH2 + # already exists + for node_db in all_dbs: + if os.path.exists(node_db): + conn = sqlite3.connect(node_db) + cursor = conn.cursor() + query = """ + SELECT ts, type, gfid, path1, path2 + FROM gfidpath + WHERE path1 != '' + ORDER BY id ASC + """ + for row in cursor.execute(query): + self.add_if_not_exists(row[0], row[1], row[2], + row[3], row[4]) + + self.conn.commit() + + def add_if_not_exists(self, ts, ty, gfid, path1, path2=""): + # Adds record to finallist only if not exists + query = """ + INSERT INTO finallist(ts, type, gfid, path1, path2) + VALUES(?, ?, ?, ?, ?) + """ + self.cursor.execute(query, (ts, ty, gfid, path1, path2)) + + def get(self): + query = """SELECT type, path1, path2 FROM finallist + ORDER BY ts ASC, id ASC""" + return self.cursor_reader.execute(query) + + def get_failures(self): + query = """ + SELECT gfid + FROM finallist + WHERE path1 = '' OR (path2 = '' AND type = 'RENAME') + """ + return self.cursor_reader.execute(query) + + +class ChangelogData(object): + def __init__(self, dbpath, args): + self.conn = sqlite3.connect(dbpath) + self.cursor = self.conn.cursor() + self.cursor_reader = self.conn.cursor() + self._create_table_gfidpath() + self._create_table_pgfid() + self._create_table_inodegfid() + self.args = args + self.path_sep = "/" + + def _create_table_gfidpath(self): + drop_table = "DROP TABLE IF EXISTS gfidpath" + self.cursor.execute(drop_table) + + create_table = """ + CREATE TABLE gfidpath( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts VARCHAR, + type VARCHAR, + gfid VARCHAR(40), + pgfid1 VARCHAR(40) DEFAULT '', + bn1 VARCHAR(500) DEFAULT '', + pgfid2 VARCHAR(40) DEFAULT '', + bn2 VARCHAR(500) DEFAULT '', + path1 VARCHAR DEFAULT '', + path2 VARCHAR DEFAULT '' + ) + """ + self.cursor.execute(create_table) + + create_index = """ + CREATE INDEX gfid_index ON gfidpath(gfid); + """ + self.cursor.execute(create_index) + + def _create_table_inodegfid(self): + drop_table = "DROP TABLE IF EXISTS inodegfid" + self.cursor.execute(drop_table) + + create_table = """ + CREATE TABLE inodegfid( + inode INTEGER PRIMARY KEY, + gfid VARCHAR(40), + converted INTEGER DEFAULT 0, + UNIQUE (inode, gfid) ON CONFLICT IGNORE + ) + """ + self.cursor.execute(create_table) + + def _create_table_pgfid(self): + drop_table = "DROP TABLE IF EXISTS pgfid" + self.cursor.execute(drop_table) + + create_table = """ + CREATE TABLE pgfid( + pgfid VARCHAR(40) PRIMARY KEY, + UNIQUE (pgfid) ON CONFLICT IGNORE + ) + """ + self.cursor.execute(create_table) + + def _get(self, tablename, filters): + # SELECT * FROM <TABLENAME> WHERE <CONDITION> + params = [] + query = "SELECT * FROM %s WHERE 1=1" % tablename + + for key, value in filters.items(): + query += " AND %s = ?" 
% key + params.append(value) + + return self.cursor_reader.execute(query, params) + + def _get_distinct(self, tablename, distinct_field, filters): + # SELECT DISTINCT <COL> FROM <TABLENAME> WHERE <CONDITION> + params = [] + query = "SELECT DISTINCT %s FROM %s WHERE 1=1" % (distinct_field, + tablename) + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + return self.cursor_reader.execute(query, params) + + def _delete(self, tablename, filters): + # DELETE FROM <TABLENAME> WHERE <CONDITIONS> + query = "DELETE FROM %s WHERE 1=1" % tablename + params = [] + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + self.cursor.execute(query, params) + + def _add(self, tablename, data): + # INSERT INTO <TABLENAME>(<col1>, <col2>..) VALUES(?,?..) + query = "INSERT INTO %s(" % tablename + fields = [] + params = [] + for key, value in data.items(): + fields.append(key) + params.append(value) + + values_substitute = len(fields)*["?"] + query += "%s) VALUES(%s)" % (",".join(fields), + ",".join(values_substitute)) + self.cursor.execute(query, params) + + def _update(self, tablename, data, filters): + # UPDATE <TABLENAME> SET col1 = ?,.. WHERE col1=? AND .. + params = [] + update_fields = [] + for key, value in data.items(): + update_fields.append("%s = ?" % key) + params.append(value) + + query = "UPDATE %s SET %s WHERE 1 = 1" % (tablename, + ", ".join(update_fields)) + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + self.cursor.execute(query, params) + + def _exists(self, tablename, filters): + if not filters: + return False + + query = "SELECT COUNT(1) FROM %s WHERE 1=1" % tablename + params = [] + + for key, value in filters.items(): + query += " AND %s = ?" 
% key + params.append(value) + + self.cursor.execute(query, params) + row = self.cursor.fetchone() + return True if row[0] > 0 else False + + def gfidpath_add(self, changelogfile, ty, gfid, pgfid1="", bn1="", + pgfid2="", bn2="", path1="", path2=""): + self._add("gfidpath", { + "ts": changelogfile.split(".")[-1], + "type": ty, + "gfid": gfid, + "pgfid1": pgfid1, + "bn1": bn1, + "pgfid2": pgfid2, + "bn2": bn2, + "path1": path1, + "path2": path2 + }) + + def gfidpath_update(self, data, filters): + self._update("gfidpath", data, filters) + + def gfidpath_delete(self, filters): + self._delete("gfidpath", filters) + + def gfidpath_exists(self, filters): + return self._exists("gfidpath", filters) + + def gfidpath_get(self, filters={}): + return self._get("gfidpath", filters) + + def gfidpath_get_distinct(self, distinct_field, filters={}): + return self._get_distinct("gfidpath", distinct_field, filters) + + def pgfid_add(self, pgfid): + self._add("pgfid", { + "pgfid": pgfid + }) + + def pgfid_update(self, data, filters): + self._update("pgfid", data, filters) + + def pgfid_get(self, filters={}): + return self._get("pgfid", filters) + + def pgfid_get_distinct(self, distinct_field, filters={}): + return self._get_distinct("pgfid", distinct_field, filters) + + def pgfid_exists(self, filters): + return self._exists("pgfid", filters) + + def inodegfid_add(self, inode, gfid, converted=0): + self._add("inodegfid", { + "inode": inode, + "gfid": gfid, + "converted": converted + }) + + def inodegfid_update(self, data, filters): + self._update("inodegfid", data, filters) + + def inodegfid_get(self, filters={}): + return self._get("inodegfid", filters) + + def inodegfid_get_distinct(self, distinct_field, filters={}): + return self._get_distinct("inodegfid", distinct_field, filters) + + def inodegfid_exists(self, filters): + return self._exists("inodegfid", filters) + + def append_path1(self, path, inode): + # || is for concatenate in SQL + query = """UPDATE gfidpath SET path1 = path1 || ',' || ? + WHERE gfid IN (SELECT gfid FROM inodegfid WHERE inode = ?)""" + self.cursor.execute(query, (path, inode)) + + def gfidpath_set_path1(self, path1, pgfid1): + # || is for concatenate in SQL + if path1 == "": + update_str1 = "? || bn1" + update_str2 = "? || bn2" + else: + update_str1 = "? || '{0}' || bn1".format(self.path_sep) + update_str2 = "? || '{0}' || bn2".format(self.path_sep) + + query = """UPDATE gfidpath SET path1 = %s + WHERE pgfid1 = ?""" % update_str1 + self.cursor.execute(query, (path1, pgfid1)) + + # Set Path2 if pgfid1 and pgfid2 are same + query = """UPDATE gfidpath SET path2 = %s + WHERE pgfid2 = ?""" % update_str2 + self.cursor.execute(query, (path1, pgfid1)) + + def gfidpath_set_path2(self, path2, pgfid2): + # || is for concatenate in SQL + if path2 == "": + update_str = "? || bn2" + else: + update_str = "? 
|| '{0}' || bn2".format(self.path_sep) + + query = """UPDATE gfidpath SET path2 = %s + WHERE pgfid2 = ?""" % update_str + self.cursor.execute(query, (path2, pgfid2)) + + def when_create_mknod_mkdir(self, changelogfile, data): + # E <GFID> <MKNOD|CREATE|MKDIR> <MODE> <USER> <GRP> <PGFID>/<BNAME> + # Add the Entry to DB + pgfid1, bn1 = data[6].split("/", 1) + + if self.args.no_encode: + bn1 = unquote_plus_space_newline(bn1).strip() + + self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1) + + def when_rename(self, changelogfile, data): + # E <GFID> RENAME <OLD_PGFID>/<BNAME> <PGFID>/<BNAME> + pgfid1, bn1 = data[3].split("/", 1) + pgfid2, bn2 = data[4].split("/", 1) + + if self.args.no_encode: + bn1 = unquote_plus_space_newline(bn1).strip() + bn2 = unquote_plus_space_newline(bn2).strip() + + if self.gfidpath_exists({"gfid": data[1], "type": "NEW", + "pgfid1": pgfid1, "bn1": bn1}): + # If <OLD_PGFID>/<BNAME> is same as CREATE, Update + # <NEW_PGFID>/<BNAME> in NEW. + self.gfidpath_update({"pgfid1": pgfid2, "bn1": bn2}, + {"gfid": data[1], "type": "NEW", + "pgfid1": pgfid1, "bn1": bn1}) + elif self.gfidpath_exists({"gfid": data[1], "type": "RENAME", + "pgfid2": pgfid1, "bn2": bn1}): + # If we are renaming file back to original name then just + # delete the entry since it will effectively be a no-op + if self.gfidpath_exists({"gfid": data[1], "type": "RENAME", + "pgfid2": pgfid1, "bn2": bn1, + "pgfid1": pgfid2, "bn1": bn2}): + self.gfidpath_delete({"gfid": data[1], "type": "RENAME", + "pgfid2": pgfid1, "bn2": bn1}) + else: + # If <OLD_PGFID>/<BNAME> is same as <PGFID2>/<BN2> + # (may be previous RENAME) + # then UPDATE <NEW_PGFID>/<BNAME> as <PGFID2>/<BN2> + self.gfidpath_update({"pgfid2": pgfid2, "bn2": bn2}, + {"gfid": data[1], "type": "RENAME", + "pgfid2": pgfid1, "bn2": bn1}) + else: + # Else insert as RENAME + self.gfidpath_add(changelogfile, RecordType.RENAME, data[1], + pgfid1, bn1, pgfid2, bn2) + + if self.gfidpath_exists({"gfid": data[1], "type": "MODIFY"}): + # If MODIFY exists already for that GFID, remove it and insert + # again so that MODIFY entry comes after RENAME entry + # Output will have MODIFY <NEWNAME> + self.gfidpath_delete({"gfid": data[1], "type": "MODIFY"}) + self.gfidpath_add(changelogfile, RecordType.MODIFY, data[1]) + + def when_link_symlink(self, changelogfile, data): + # E <GFID> <LINK|SYMLINK> <PGFID>/<BASENAME> + # Add as New record in Db as Type NEW + pgfid1, bn1 = data[3].split("/", 1) + if self.args.no_encode: + bn1 = unquote_plus_space_newline(bn1).strip() + + self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1) + + def when_data_meta(self, changelogfile, data): + # If GFID row exists, Ignore else Add to Db + if not self.gfidpath_exists({"gfid": data[1], "type": "NEW"}) and \ + not self.gfidpath_exists({"gfid": data[1], "type": "MODIFY"}): + self.gfidpath_add(changelogfile, RecordType.MODIFY, data[1]) + + def when_unlink_rmdir(self, changelogfile, data): + # E <GFID> <UNLINK|RMDIR> <PGFID>/<BASENAME> + pgfid1, bn1 = data[3].split("/", 1) + + if self.args.no_encode: + bn1 = unquote_plus_space_newline(bn1).strip() + + deleted_path = data[4] if len(data) == 5 else "" + if deleted_path != "": + deleted_path = unquote_plus_space_newline(deleted_path) + deleted_path = output_path_prepare(deleted_path, self.args) + + if self.gfidpath_exists({"gfid": data[1], "type": "NEW", + "pgfid1": pgfid1, "bn1": bn1}): + # If path exists in table as NEW with same GFID + # Delete that row + self.gfidpath_delete({"gfid": data[1], "type": "NEW", + 
"pgfid1": pgfid1, "bn1": bn1}) + else: + # Else Record as DELETE + self.gfidpath_add(changelogfile, RecordType.DELETE, data[1], + pgfid1, bn1, path1=deleted_path) + + # Update path1 as deleted_path if pgfid1 and bn1 is same as deleted + self.gfidpath_update({"path1": deleted_path}, {"gfid": data[1], + "pgfid1": pgfid1, + "bn1": bn1}) + + # Update path2 as deleted_path if pgfid2 and bn2 is same as deleted + self.gfidpath_update({"path2": deleted_path}, { + "type": RecordType.RENAME, + "gfid": data[1], + "pgfid2": pgfid1, + "bn2": bn1}) + + # If deleted directory is parent for somebody + query1 = """UPDATE gfidpath SET path1 = ? || '{0}' || bn1 + WHERE pgfid1 = ? AND path1 != ''""".format(self.path_sep) + self.cursor.execute(query1, (deleted_path, data[1])) + + query1 = """UPDATE gfidpath SET path2 = ? || '{0}' || bn1 + WHERE pgfid2 = ? AND path2 != ''""".format(self.path_sep) + self.cursor.execute(query1, (deleted_path, data[1])) + + def commit(self): + self.conn.commit() diff --git a/tools/glusterfind/src/conf.py b/tools/glusterfind/src/conf.py new file mode 100644 index 00000000000..3849ba5dd1f --- /dev/null +++ b/tools/glusterfind/src/conf.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import os +try: + from ConfigParser import ConfigParser +except ImportError: + from configparser import ConfigParser + +config = ConfigParser() +config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)), + "tool.conf")) + + +def list_change_detectors(): + return dict(config.items("change_detectors")).keys() + + +def get_opt(opt): + return config.get("vars", opt) + + +def get_change_detector(opt): + return config.get("change_detectors", opt) diff --git a/tools/glusterfind/src/gfind_py2py3.py b/tools/glusterfind/src/gfind_py2py3.py new file mode 100644 index 00000000000..87324fbf350 --- /dev/null +++ b/tools/glusterfind/src/gfind_py2py3.py @@ -0,0 +1,88 @@ +# +# Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +# All python2/python3 compatibility routines + +import os +import sys +from ctypes import create_string_buffer + +if sys.version_info >= (3,): + + # Raw conversion of bytearray to string. Used in the cases where + # buffer is created by create_string_buffer which is a 8-bit char + # array and passed to syscalls to fetch results. Using encode/decode + # doesn't work as it converts to string altering the size. 
+    def bytearray_to_str(byte_arr):
+        return ''.join([chr(b) for b in byte_arr])
+
+    def gf_create_string_buffer(size):
+        return create_string_buffer(b'\0', size)
+
+    def gfind_history_changelog(libgfc, changelog_path, start, end, num_parallel,
+                                actual_end):
+        return libgfc.gf_history_changelog(changelog_path.encode(), start, end, num_parallel,
+                                           actual_end)
+
+    def gfind_changelog_register(libgfc, brick, path, log_file, log_level,
+                                 retries):
+        return libgfc.gf_changelog_register(brick.encode(), path.encode(), log_file.encode(),
+                                            log_level, retries)
+
+    def gfind_history_changelog_done(libgfc, clfile):
+        return libgfc.gf_history_changelog_done(clfile.encode())
+
+    def gfind_write_row(f, row, field_separator, p_rep, row_2_rep):
+        f.write(u"{0}{1}{2}{3}{4}\n".format(row,
+                                            field_separator,
+                                            p_rep,
+                                            field_separator,
+                                            row_2_rep))
+
+    def gfind_write(f, row, field_separator, p_rep):
+        f.write(u"{0}{1}{2}\n".format(row,
+                                      field_separator,
+                                      p_rep))
+
+
+else:
+
+    # Raw conversion of bytearray to string
+    def bytearray_to_str(byte_arr):
+        return byte_arr
+
+    def gf_create_string_buffer(size):
+        return create_string_buffer('\0', size)
+
+    def gfind_history_changelog(libgfc, changelog_path, start, end, num_parallel,
+                                actual_end):
+        return libgfc.gf_history_changelog(changelog_path, start, end,
+                                           num_parallel, actual_end)
+
+    def gfind_changelog_register(libgfc, brick, path, log_file, log_level,
+                                 retries):
+        return libgfc.gf_changelog_register(brick, path, log_file,
+                                            log_level, retries)
+
+    def gfind_history_changelog_done(libgfc, clfile):
+        return libgfc.gf_history_changelog_done(clfile)
+
+    def gfind_write_row(f, row, field_separator, p_rep, row_2_rep):
+        f.write(u"{0}{1}{2}{3}{4}\n".format(row,
+                                            field_separator,
+                                            p_rep,
+                                            field_separator,
+                                            row_2_rep).encode())
+
+    def gfind_write(f, row, field_separator, p_rep):
+        f.write(u"{0}{1}{2}\n".format(row,
+                                      field_separator,
+                                      p_rep).encode()) diff --git a/tools/glusterfind/src/libgfchangelog.py b/tools/glusterfind/src/libgfchangelog.py new file mode 100644 index 00000000000..513bb101e93 --- /dev/null +++ b/tools/glusterfind/src/libgfchangelog.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*-
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
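For reference, the two write helpers in gfind_py2py3.py above are what produce the rows in the final glusterfind outfile. A short sketch of the emitted format, not part of the patch, with illustrative values (the default field separator is a single space):

    import codecs

    # illustrative stand-ins for rows coming out of the changelog merger
    with codecs.open("outfile.txt", "a", encoding="utf-8") as f:
        # two-path records (e.g. RENAME): "RENAME dir1/old.txt dir1/new.txt"
        f.write(u"{0}{1}{2}{3}{4}\n".format("RENAME", " ", "dir1/old.txt",
                                            " ", "dir1/new.txt"))
        # single-path records (NEW/MODIFY/DELETE): "NEW dir1/new.txt"
        f.write(u"{0}{1}{2}\n".format("NEW", " ", "dir1/new.txt"))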
+ +import os +from ctypes import CDLL, RTLD_GLOBAL, get_errno, create_string_buffer, c_ulong, byref +from ctypes.util import find_library +from gfind_py2py3 import bytearray_to_str, gf_create_string_buffer +from gfind_py2py3 import gfind_history_changelog, gfind_changelog_register +from gfind_py2py3 import gfind_history_changelog_done + + +class ChangelogException(OSError): + pass + +libgfc = CDLL(find_library("gfchangelog"), mode=RTLD_GLOBAL, use_errno=True) + + +def raise_oserr(prefix=None): + errn = get_errno() + prefix_or_empty = prefix + ": " if prefix else "" + raise ChangelogException(errn, prefix_or_empty + os.strerror(errn)) + + +def cl_init(): + ret = libgfc.gf_changelog_init(None) + if ret == -1: + raise_oserr(prefix="gf_changelog_init") + + +def cl_register(brick, path, log_file, log_level, retries=0): + ret = gfind_changelog_register(libgfc, brick, path, log_file,log_level, retries) + if ret == -1: + raise_oserr(prefix="gf_changelog_register") + + +def cl_history_scan(): + ret = libgfc.gf_history_changelog_scan() + if ret == -1: + raise_oserr(prefix="gf_history_changelog_scan") + + return ret + + +def cl_history_changelog(changelog_path, start, end, num_parallel): + actual_end = c_ulong() + ret = gfind_history_changelog(libgfc,changelog_path, start, end, + num_parallel, + byref(actual_end)) + if ret == -1: + raise_oserr(prefix="gf_history_changelog") + + return actual_end.value + + +def cl_history_startfresh(): + ret = libgfc.gf_history_changelog_start_fresh() + if ret == -1: + raise_oserr(prefix="gf_history_changelog_start_fresh") + + +def cl_history_getchanges(): + """ remove hardcoding for path name length """ + def clsort(f): + return f.split('.')[-1] + + changes = [] + buf = gf_create_string_buffer(4096) + + while True: + ret = libgfc.gf_history_changelog_next_change(buf, 4096) + if ret in (0, -1): + break + # py2 and py3 compatibility + result = bytearray_to_str(buf.raw[:ret - 1]) + changes.append(result) + if ret == -1: + raise_oserr(prefix="gf_history_changelog_next_change") + + return sorted(changes, key=clsort) + + +def cl_history_done(clfile): + ret = gfind_history_changelog_done(libgfc, clfile) + if ret == -1: + raise_oserr(prefix="gf_history_changelog_done") diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py new file mode 100644 index 00000000000..4b5466d0114 --- /dev/null +++ b/tools/glusterfind/src/main.py @@ -0,0 +1,921 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
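The libgfchangelog.py wrapper above is consumed by the changelog-based change detector (changelog.py, not shown in this hunk) in roughly the following order. A hedged sketch with illustrative brick paths, epochs, and retry values; error handling elided and the loop semantics assumed from the cl_history_scan return value:

    import libgfchangelog as gfc

    brick = "/bricks/b1"                      # illustrative
    scratch = "/tmp/gfind-scratch/"           # illustrative
    log = "/tmp/gfind-changelog.log"          # illustrative

    gfc.cl_init()
    gfc.cl_register(brick, scratch, log, 9, 5)
    # Request history between two epochs; the library may cover less,
    # so it returns the timestamp it actually reached.
    actual_end = gfc.cl_history_changelog(brick + "/.glusterfs/changelogs",
                                          1419406740, 1419407340, 3)
    while gfc.cl_history_scan() > 0:
        for clfile in gfc.cl_history_getchanges():
            print(clfile)                     # a real consumer would parse it
            gfc.cl_history_done(clfile)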
+ +import sys +from errno import ENOENT, ENOTEMPTY +import time +from multiprocessing import Process +import os +import xml.etree.cElementTree as etree +from argparse import ArgumentParser, RawDescriptionHelpFormatter, Action +from gfind_py2py3 import gfind_write_row, gfind_write +import logging +import shutil +import tempfile +import signal +from datetime import datetime +import codecs +import re + +from utils import execute, is_host_local, mkdirp, fail +from utils import setup_logger, human_time, handle_rm_error +from utils import get_changelog_rollover_time, cache_output, create_file +import conf +from changelogdata import OutputMerger + +PROG_DESCRIPTION = """ +GlusterFS Incremental API +""" +ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError + +logger = logging.getLogger() +vol_statusStr = "" +gtmpfilename = None +g_pid_nodefile_map = {} + + +class StoreAbsPath(Action): + def __init__(self, option_strings, dest, nargs=None, **kwargs): + super(StoreAbsPath, self).__init__(option_strings, dest, **kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + setattr(namespace, self.dest, os.path.abspath(values)) + + +def get_pem_key_path(session, volume): + return os.path.join(conf.get_opt("session_dir"), + session, + volume, + "%s_%s_secret.pem" % (session, volume)) + + +def node_cmd(host, host_uuid, task, cmd, args, opts): + """ + Runs command via ssh if host is not local + """ + try: + localdir = is_host_local(host_uuid) + + # this is so to avoid deleting the ssh keys on local node which + # otherwise cause ssh password prompts on the console (race conditions) + # mode_delete() should be cleaning up the session tree + if localdir and task == "delete": + return + + pem_key_path = get_pem_key_path(args.session, args.volume) + + if not localdir: + # prefix with ssh command if not local node + cmd = ["ssh", + "-oNumberOfPasswordPrompts=0", + "-oStrictHostKeyChecking=no", + # We force TTY allocation (-t -t) so that Ctrl+C is handed + # through; see: + # https://bugzilla.redhat.com/show_bug.cgi?id=1382236 + # Note that this turns stderr of the remote `cmd` + # into stdout locally. + "-t", + "-t", + "-i", pem_key_path, + "root@%s" % host] + cmd + + (returncode, err, out) = execute(cmd, logger=logger) + if returncode != 0: + # Because the `-t -t` above turns the remote stderr into + # local stdout, we need to log both stderr and stdout + # here to print all error messages. 
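            # fail() exits only this worker: node_cmd runs in a separate
            # multiprocessing.Process, and the parent decides in
            # run_cmd_nodes() whether a failed node aborts the whole run
            # (always for create/delete, only with --disable-partial for
            # pre/query).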
+ fail("%s - %s failed; stdout (including remote stderr):\n" + "%s\n" + "stderr:\n" + "%s" % (host, task, out, err), + returncode, + logger=logger) + + if opts.get("copy_outfile", False) and not localdir: + cmd_copy = ["scp", + "-oNumberOfPasswordPrompts=0", + "-oStrictHostKeyChecking=no", + "-i", pem_key_path, + "root@%s:/%s" % (host, opts.get("node_outfile")), + os.path.dirname(opts.get("node_outfile"))] + execute(cmd_copy, exit_msg="%s - Copy command failed" % host, + logger=logger) + except KeyboardInterrupt: + sys.exit(2) + + +def run_cmd_nodes(task, args, **kwargs): + global g_pid_nodefile_map + nodes = get_nodes(args.volume) + pool = [] + for num, node in enumerate(nodes): + host, brick = node[1].split(":") + host_uuid = node[0] + cmd = [] + opts = {} + + # tmpfilename is valid only for tasks: pre, query and cleanup + tmpfilename = kwargs.get("tmpfilename", "BADNAME") + + node_outfile = os.path.join(conf.get_opt("working_dir"), + args.session, args.volume, + tmpfilename, + "tmp_output_%s" % num) + + if task == "pre": + if vol_statusStr != "Started": + fail("Volume %s is not online" % args.volume, + logger=logger) + + # If Full backup is requested or start time is zero, use brickfind + change_detector = conf.get_change_detector("changelog") + tag = None + if args.full: + change_detector = conf.get_change_detector("brickfind") + tag = args.tag_for_full_find.strip() + if tag == "": + tag = '""' if not is_host_local(host_uuid) else "" + + # remote file will be copied into this directory + mkdirp(os.path.dirname(node_outfile), + exit_on_err=True, logger=logger) + + FS = args.field_separator + if not is_host_local(host_uuid): + FS = "'" + FS + "'" + + cmd = [change_detector, + args.session, + args.volume, + host, + brick, + node_outfile] + \ + ([str(kwargs.get("start")), str(kwargs.get("end"))] + if not args.full else []) + \ + ([tag] if tag is not None else []) + \ + ["--output-prefix", args.output_prefix] + \ + (["--debug"] if args.debug else []) + \ + (["--no-encode"] if args.no_encode else []) + \ + (["--only-namespace-changes"] if args.only_namespace_changes + else []) + \ + (["--type", args.type]) + \ + (["--field-separator", FS] if args.full else []) + + opts["node_outfile"] = node_outfile + opts["copy_outfile"] = True + elif task == "query": + # If Full backup is requested or start time is zero, use brickfind + tag = None + change_detector = conf.get_change_detector("changelog") + if args.full: + change_detector = conf.get_change_detector("brickfind") + tag = args.tag_for_full_find.strip() + if tag == "": + tag = '""' if not is_host_local(host_uuid) else "" + + # remote file will be copied into this directory + mkdirp(os.path.dirname(node_outfile), + exit_on_err=True, logger=logger) + + FS = args.field_separator + if not is_host_local(host_uuid): + FS = "'" + FS + "'" + + cmd = [change_detector, + args.session, + args.volume, + host, + brick, + node_outfile] + \ + ([str(kwargs.get("start")), str(kwargs.get("end"))] + if not args.full else []) + \ + ([tag] if tag is not None else []) + \ + ["--only-query"] + \ + ["--output-prefix", args.output_prefix] + \ + (["--debug"] if args.debug else []) + \ + (["--no-encode"] if args.no_encode else []) + \ + (["--only-namespace-changes"] + if args.only_namespace_changes else []) + \ + (["--type", args.type]) + \ + (["--field-separator", FS] if args.full else []) + + opts["node_outfile"] = node_outfile + opts["copy_outfile"] = True + elif task == "cleanup": + # After pre/query run, cleanup the working directory and other + # temp files. 
Remove the directory to which node_outfile has + # been copied in main node + try: + os.remove(node_outfile) + except (OSError, IOError): + logger.warn("Failed to cleanup temporary file %s" % + node_outfile) + pass + + cmd = [conf.get_opt("nodeagent"), + "cleanup", + args.session, + args.volume, + os.path.dirname(node_outfile)] + \ + (["--debug"] if args.debug else []) + elif task == "create": + if vol_statusStr != "Started": + fail("Volume %s is not online" % args.volume, + logger=logger) + + # When glusterfind create, create session directory in + # each brick nodes + cmd = [conf.get_opt("nodeagent"), + "create", + args.session, + args.volume, + brick, + kwargs.get("time_to_update")] + \ + (["--debug"] if args.debug else []) + \ + (["--reset-session-time"] if args.reset_session_time + else []) + elif task == "post": + # Rename pre status file to actual status file in each node + cmd = [conf.get_opt("nodeagent"), + "post", + args.session, + args.volume, + brick] + \ + (["--debug"] if args.debug else []) + elif task == "delete": + # When glusterfind delete, cleanup all the session files/dirs + # from each node. + cmd = [conf.get_opt("nodeagent"), + "delete", + args.session, + args.volume] + \ + (["--debug"] if args.debug else []) + + if cmd: + p = Process(target=node_cmd, + args=(host, host_uuid, task, cmd, args, opts)) + p.start() + pool.append(p) + g_pid_nodefile_map[p.pid] = node_outfile + + for num, p in enumerate(pool): + p.join() + if p.exitcode != 0: + logger.warn("Command %s failed in %s" % (task, nodes[num][1])) + if task in ["create", "delete"]: + fail("Command %s failed in %s" % (task, nodes[num][1])) + elif task == "pre" or task == "query": + if args.disable_partial: + sys.exit(1) + else: + del g_pid_nodefile_map[p.pid] + + +@cache_output +def get_nodes(volume): + """ + Get the gluster volume info xml output and parse to get + the brick details. 
+ """ + global vol_statusStr + + cmd = ["gluster", 'volume', 'info', volume, "--xml"] + _, data, _ = execute(cmd, + exit_msg="Failed to Run Gluster Volume Info", + logger=logger) + tree = etree.fromstring(data) + + # Test to check if volume has been deleted after session creation + count_el = tree.find('volInfo/volumes/count') + if int(count_el.text) == 0: + fail("Unable to get volume details", logger=logger) + + # this status is used in caller: run_cmd_nodes + vol_statusStr = tree.find('volInfo/volumes/volume/statusStr').text + vol_typeStr = tree.find('volInfo/volumes/volume/typeStr').text + + nodes = [] + volume_el = tree.find('volInfo/volumes/volume') + try: + brick_elems = [] + if vol_typeStr == "Tier": + brick_elems.append('bricks/hotBricks/brick') + brick_elems.append('bricks/coldBricks/brick') + else: + brick_elems.append('bricks/brick') + + for elem in brick_elems: + for b in volume_el.findall(elem): + nodes.append((b.find('hostUuid').text, + b.find('name').text)) + except (ParseError, AttributeError, ValueError) as e: + fail("Failed to parse Volume Info: %s" % e, logger=logger) + + return nodes + + +def _get_args(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description=PROG_DESCRIPTION) + subparsers = parser.add_subparsers(dest="mode") + subparsers.required = True + + # create <SESSION> <VOLUME> [--debug] [--force] + parser_create = subparsers.add_parser('create') + parser_create.add_argument("session", help="Session Name") + parser_create.add_argument("volume", help="Volume Name") + parser_create.add_argument("--debug", help="Debug", action="store_true") + parser_create.add_argument("--force", help="Force option to recreate " + "the session", action="store_true") + parser_create.add_argument("--reset-session-time", + help="Reset Session Time to Current Time", + action="store_true") + + # delete <SESSION> <VOLUME> [--debug] + parser_delete = subparsers.add_parser('delete') + parser_delete.add_argument("session", help="Session Name") + parser_delete.add_argument("volume", help="Volume Name") + parser_delete.add_argument("--debug", help="Debug", action="store_true") + + # list [--session <SESSION>] [--volume <VOLUME>] + parser_list = subparsers.add_parser('list') + parser_list.add_argument("--session", help="Session Name", default="") + parser_list.add_argument("--volume", help="Volume Name", default="") + parser_list.add_argument("--debug", help="Debug", action="store_true") + + # pre <SESSION> <VOLUME> <OUTFILE> + # [--output-prefix <OUTPUT_PREFIX>] [--full] + parser_pre = subparsers.add_parser('pre') + parser_pre.add_argument("session", help="Session Name") + parser_pre.add_argument("volume", help="Volume Name") + parser_pre.add_argument("outfile", help="Output File", action=StoreAbsPath) + parser_pre.add_argument("--debug", help="Debug", action="store_true") + parser_pre.add_argument("--no-encode", + help="Do not encode path in output file", + action="store_true") + parser_pre.add_argument("--full", help="Full find", action="store_true") + parser_pre.add_argument("--disable-partial", help="Disable Partial find, " + "Fail when one node fails", action="store_true") + parser_pre.add_argument("--output-prefix", help="File prefix in output", + default=".") + parser_pre.add_argument("--regenerate-outfile", + help="Regenerate outfile, discard the outfile " + "generated from last pre command", + action="store_true") + parser_pre.add_argument("-N", "--only-namespace-changes", + help="List only namespace changes", + action="store_true") + 
parser_pre.add_argument("--tag-for-full-find", + help="Tag prefix for file names emitted during" + " a full find operation; default: \"NEW\"", + default="NEW") + parser_pre.add_argument('--type', help="type: f, f-files only" + " d, d-directories only, by default = both", + default='both', choices=["f", "d", "both"]) + parser_pre.add_argument("--field-separator", help="Field separator string", + default=" ") + + # query <VOLUME> <OUTFILE> --since-time <SINCE_TIME> + # [--output-prefix <OUTPUT_PREFIX>] [--full] + parser_query = subparsers.add_parser('query') + parser_query.add_argument("volume", help="Volume Name") + parser_query.add_argument("outfile", help="Output File", + action=StoreAbsPath) + parser_query.add_argument("--since-time", help="UNIX epoch time since " + "which listing is required", type=int) + parser_query.add_argument("--end-time", help="UNIX epoch time up to " + "which listing is required", type=int) + parser_query.add_argument("--no-encode", + help="Do not encode path in output file", + action="store_true") + parser_query.add_argument("--full", help="Full find", action="store_true") + parser_query.add_argument("--debug", help="Debug", action="store_true") + parser_query.add_argument("--disable-partial", help="Disable Partial find," + " Fail when one node fails", action="store_true") + parser_query.add_argument("--output-prefix", help="File prefix in output", + default=".") + parser_query.add_argument("-N", "--only-namespace-changes", + help="List only namespace changes", + action="store_true") + parser_query.add_argument("--tag-for-full-find", + help="Tag prefix for file names emitted during" + " a full find operation; default: \"NEW\"", + default="NEW") + parser_query.add_argument('--type', help="type: f, f-files only" + " d, d-directories only, by default = both", + default='both', choices=["f", "d", "both"]) + parser_query.add_argument("--field-separator", + help="Field separator string", + default=" ") + + # post <SESSION> <VOLUME> + parser_post = subparsers.add_parser('post') + parser_post.add_argument("session", help="Session Name") + parser_post.add_argument("volume", help="Volume Name") + parser_post.add_argument("--debug", help="Debug", action="store_true") + + return parser.parse_args() + + +def ssh_setup(args): + pem_key_path = get_pem_key_path(args.session, args.volume) + + if not os.path.exists(pem_key_path): + # Generate ssh-key + cmd = ["ssh-keygen", + "-N", + "", + "-f", + pem_key_path] + execute(cmd, + exit_msg="Unable to generate ssh key %s" + % pem_key_path, + logger=logger) + + logger.info("Ssh key generated %s" % pem_key_path) + + try: + shutil.copyfile(pem_key_path + ".pub", + os.path.join(conf.get_opt("session_dir"), + ".keys", + "%s_%s_secret.pem.pub" % (args.session, + args.volume))) + except (IOError, OSError) as e: + fail("Failed to copy public key to %s: %s" + % (os.path.join(conf.get_opt("session_dir"), ".keys"), e), + logger=logger) + + # Copy pub file to all nodes + cmd = ["gluster", + "system::", + "copy", + "file", + "/glusterfind/.keys/%s.pub" % os.path.basename(pem_key_path)] + + execute(cmd, exit_msg="Failed to distribute ssh keys", logger=logger) + + logger.info("Distributed ssh key to all nodes of Volume") + + # Add to authorized_keys file in each node + cmd = ["gluster", + "system::", + "execute", + "add_secret_pub", + "root", + "/glusterfind/.keys/%s.pub" % os.path.basename(pem_key_path)] + execute(cmd, + exit_msg="Failed to add ssh keys to authorized_keys file", + logger=logger) + + logger.info("Ssh key added to authorized_keys of 
Volume nodes") + + +def enable_volume_options(args): + execute(["gluster", "volume", "set", + args.volume, "build-pgfid", "on"], + exit_msg="Failed to set volume option build-pgfid on", + logger=logger) + logger.info("Volume option set %s, build-pgfid on" % args.volume) + + execute(["gluster", "volume", "set", + args.volume, "changelog.changelog", "on"], + exit_msg="Failed to set volume option " + "changelog.changelog on", logger=logger) + logger.info("Volume option set %s, changelog.changelog on" + % args.volume) + + execute(["gluster", "volume", "set", + args.volume, "changelog.capture-del-path", "on"], + exit_msg="Failed to set volume option " + "changelog.capture-del-path on", logger=logger) + logger.info("Volume option set %s, changelog.capture-del-path on" + % args.volume) + + +def write_output(outfile, outfilemerger, field_separator): + with codecs.open(outfile, "a", encoding="utf-8") as f: + for row in outfilemerger.get(): + # Multiple paths in case of Hardlinks + paths = row[1].split(",") + row_2_rep = None + for p in paths: + if p == "": + continue + p_rep = p.replace("//", "/") + if not row_2_rep: + row_2_rep = row[2].replace("//", "/") + if p_rep == row_2_rep: + continue + + if row_2_rep and row_2_rep != "": + gfind_write_row(f, row[0], field_separator, p_rep, row_2_rep) + + else: + gfind_write(f, row[0], field_separator, p_rep) + +def validate_volume(volume): + cmd = ["gluster", 'volume', 'info', volume, "--xml"] + _, data, _ = execute(cmd, + exit_msg="Failed to Run Gluster Volume Info", + logger=logger) + try: + tree = etree.fromstring(data) + statusStr = tree.find('volInfo/volumes/volume/statusStr').text + except (ParseError, AttributeError) as e: + fail("Invalid Volume: Check the Volume name! %s" % e) + if statusStr != "Started": + fail("Volume %s is not online" % volume) + +# The rules for a valid session name. +SESSION_NAME_RULES = { + 'min_length': 2, + 'max_length': 256, # same as maximum volume length + # Specifies all alphanumeric characters, underscore, hyphen. 
+    'valid_chars': r'0-9a-zA-Z_-',
+}
+
+
+# Check for a valid session name, fail otherwise
+def validate_session_name(session):
+    # Check for minimum length
+    if len(session) < SESSION_NAME_RULES['min_length']:
+        fail('session_name must be at least ' +
+             str(SESSION_NAME_RULES['min_length']) + ' characters long.')
+    # Check for maximum length
+    if len(session) > SESSION_NAME_RULES['max_length']:
+        fail('session_name must not exceed ' +
+             str(SESSION_NAME_RULES['max_length']) + ' characters in length.')
+
+    # Matches strings composed entirely of characters specified within
+    if not re.match(r'^[' + SESSION_NAME_RULES['valid_chars'] +
+                    ']+$', session):
+        fail('Session name can only contain these characters: ' +
+             SESSION_NAME_RULES['valid_chars'])
+
+
+def mode_create(session_dir, args):
+    validate_session_name(args.session)
+
+    logger.debug("Init is called - Session: %s, Volume: %s"
+                 % (args.session, args.volume))
+    mkdirp(session_dir, exit_on_err=True, logger=logger)
+    mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+           logger=logger)
+    status_file = os.path.join(session_dir, args.volume, "status")
+
+    if os.path.exists(status_file) and not args.force:
+        fail("Session %s already created" % args.session, logger=logger)
+
+    if not os.path.exists(status_file) or args.force:
+        ssh_setup(args)
+        enable_volume_options(args)
+
+    # Add Rollover time to current time to make sure changelogs
+    # will be available if we use this time as start time
+    time_to_update = int(time.time()) + get_changelog_rollover_time(
+        args.volume)
+
+    run_cmd_nodes("create", args, time_to_update=str(time_to_update))
+
+    if not os.path.exists(status_file) or args.reset_session_time:
+        with open(status_file, "w") as f:
+            f.write(str(time_to_update))
+
+    sys.stdout.write("Session %s created with volume %s\n" %
+                     (args.session, args.volume))
+
+    sys.exit(0)
+
+
+def mode_query(session_dir, args):
+    global gtmpfilename
+    global g_pid_nodefile_map
+
+    # Verify volume status
+    cmd = ["gluster", 'volume', 'info', args.volume, "--xml"]
+    _, data, _ = execute(cmd,
+                         exit_msg="Failed to Run Gluster Volume Info",
+                         logger=logger)
+    try:
+        tree = etree.fromstring(data)
+        statusStr = tree.find('volInfo/volumes/volume/statusStr').text
+    except (ParseError, AttributeError) as e:
+        fail("Invalid Volume: %s" % e, logger=logger)
+
+    if statusStr != "Started":
+        fail("Volume %s is not online" % args.volume, logger=logger)
+
+    mkdirp(session_dir, exit_on_err=True, logger=logger)
+    mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+           logger=logger)
+    mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger)
+
+    # Configure cluster for password-less SSH
+    ssh_setup(args)
+
+    # Enable volume options for changelog capture
+    enable_volume_options(args)
+
+    # Test options
+    if not args.full and args.type in ["f", "d"]:
+        fail("--type can only be used with --full")
+    if not args.since_time and not args.end_time and not args.full:
+        fail("Please specify either {--since-time and optionally --end-time} "
+             "or --full", logger=logger)
+
+    if args.since_time and args.end_time and args.full:
+        fail("Please specify either {--since-time and optionally --end-time} "
+             "or --full, but not both",
+             logger=logger)
+
+    if args.end_time and not args.since_time:
+        fail("Please specify --since-time as well", logger=logger)
+
+    # Start query command processing
+    start = -1
+    end = -1
+    if args.since_time:
+        start = args.since_time
+        if args.end_time:
+            end = args.end_time
+    else:
+        start = 0 # --full option is handled separately
+
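    # e.g. "--since-time 1419406740 --end-time 1419407340" restricts the
    # crawl to that window; with --full the window is not passed to the
    # nodes at all and a full brickfind crawl lists everything.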
+ logger.debug("Query is called - Session: %s, Volume: %s, " + "Start time: %s, End time: %s" + % ("default", args.volume, start, end)) + + prefix = datetime.now().strftime("%Y%m%d-%H%M%S-%f-") + gtmpfilename = prefix + next(tempfile._get_candidate_names()) + + run_cmd_nodes("query", args, start=start, end=end, + tmpfilename=gtmpfilename) + + # Merger + if args.full: + if len(g_pid_nodefile_map) > 0: + cmd = ["sort", "-u"] + list(g_pid_nodefile_map.values()) + \ + ["-o", args.outfile] + execute(cmd, + exit_msg="Failed to merge output files " + "collected from nodes", logger=logger) + else: + fail("Failed to collect any output files from peers. " + "Looks like all bricks are offline.", logger=logger) + else: + # Read each Changelogs db and generate finaldb + create_file(args.outfile, exit_on_err=True, logger=logger) + outfilemerger = OutputMerger(args.outfile + ".db", + list(g_pid_nodefile_map.values())) + write_output(args.outfile, outfilemerger, args.field_separator) + + try: + os.remove(args.outfile + ".db") + except (IOError, OSError): + pass + + run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename) + + sys.stdout.write("Generated output file %s\n" % args.outfile) + + +def mode_pre(session_dir, args): + global gtmpfilename + global g_pid_nodefile_map + + """ + Read from Session file and write to session.pre file + """ + endtime_to_update = int(time.time()) - get_changelog_rollover_time( + args.volume) + status_file = os.path.join(session_dir, args.volume, "status") + status_file_pre = status_file + ".pre" + + mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger) + + if not args.full and args.type in ["f", "d"]: + fail("--type can only be used with --full") + + # If Pre status file exists and running pre command again + if os.path.exists(status_file_pre) and not args.regenerate_outfile: + fail("Post command is not run after last pre, " + "use --regenerate-outfile") + + start = 0 + try: + with open(status_file) as f: + start = int(f.read().strip()) + except ValueError: + pass + except (OSError, IOError) as e: + fail("Error Opening Session file %s: %s" + % (status_file, e), logger=logger) + + logger.debug("Pre is called - Session: %s, Volume: %s, " + "Start time: %s, End time: %s" + % (args.session, args.volume, start, endtime_to_update)) + + prefix = datetime.now().strftime("%Y%m%d-%H%M%S-%f-") + gtmpfilename = prefix + next(tempfile._get_candidate_names()) + + run_cmd_nodes("pre", args, start=start, end=-1, tmpfilename=gtmpfilename) + + # Merger + if args.full: + if len(g_pid_nodefile_map) > 0: + cmd = ["sort", "-u"] + list(g_pid_nodefile_map.values()) + \ + ["-o", args.outfile] + execute(cmd, + exit_msg="Failed to merge output files " + "collected from nodes", logger=logger) + else: + fail("Failed to collect any output files from peers. 
" + "Looks like all bricks are offline.", logger=logger) + else: + # Read each Changelogs db and generate finaldb + create_file(args.outfile, exit_on_err=True, logger=logger) + outfilemerger = OutputMerger(args.outfile + ".db", + list(g_pid_nodefile_map.values())) + write_output(args.outfile, outfilemerger, args.field_separator) + + try: + os.remove(args.outfile + ".db") + except (IOError, OSError): + pass + + run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename) + + with open(status_file_pre, "w") as f: + f.write(str(endtime_to_update)) + + sys.stdout.write("Generated output file %s\n" % args.outfile) + + +def mode_post(session_dir, args): + """ + If pre session file exists, overwrite session file + If pre session file does not exists, return ERROR + """ + status_file = os.path.join(session_dir, args.volume, "status") + logger.debug("Post is called - Session: %s, Volume: %s" + % (args.session, args.volume)) + status_file_pre = status_file + ".pre" + + if os.path.exists(status_file_pre): + run_cmd_nodes("post", args) + os.rename(status_file_pre, status_file) + sys.stdout.write("Session %s with volume %s updated\n" % + (args.session, args.volume)) + sys.exit(0) + else: + fail("Pre script is not run", logger=logger) + + +def mode_delete(session_dir, args): + run_cmd_nodes("delete", args) + shutil.rmtree(os.path.join(session_dir, args.volume), + onerror=handle_rm_error) + sys.stdout.write("Session %s with volume %s deleted\n" % + (args.session, args.volume)) + + # If the session contains only this volume, then cleanup the + # session directory. If a session contains multiple volumes + # then os.rmdir will fail with ENOTEMPTY + try: + os.rmdir(session_dir) + except OSError as e: + if not e.errno == ENOTEMPTY: + logger.warn("Failed to delete session directory: %s" % e) + + +def mode_list(session_dir, args): + """ + List available sessions to stdout, if session name is set + only list that session. 
+ """ + if args.session: + if not os.path.exists(os.path.join(session_dir, args.session)): + fail("Invalid Session", logger=logger) + sessions = [args.session] + else: + sessions = [] + for d in os.listdir(session_dir): + if d != ".keys": + sessions.append(d) + + output = [] + for session in sessions: + # Session Volume Last Processed + volnames = os.listdir(os.path.join(session_dir, session)) + + for volname in volnames: + if args.volume and args.volume != volname: + continue + + status_file = os.path.join(session_dir, session, volname, "status") + last_processed = None + try: + with open(status_file) as f: + last_processed = f.read().strip() + except (OSError, IOError) as e: + if e.errno == ENOENT: + continue + else: + raise + output.append((session, volname, last_processed)) + + if output: + sys.stdout.write("%s %s %s\n" % ("SESSION".ljust(25), + "VOLUME".ljust(25), + "SESSION TIME".ljust(25))) + sys.stdout.write("-"*75) + sys.stdout.write("\n") + for session, volname, last_processed in output: + sess_time = 'Session Corrupted' + if last_processed: + try: + sess_time = human_time(last_processed) + except TypeError: + sess_time = 'Session Corrupted' + sys.stdout.write("%s %s %s\n" % (session.ljust(25), + volname.ljust(25), + sess_time.ljust(25))) + + if not output: + if args.session or args.volume: + fail("Invalid Session", logger=logger) + else: + sys.stdout.write("No sessions found.\n") + + +def main(): + global gtmpfilename + + args = None + + try: + args = _get_args() + mkdirp(conf.get_opt("session_dir"), exit_on_err=True) + + # force the default session name if mode is "query" + if args.mode == "query": + args.session = "default" + + if args.mode == "list": + session_dir = conf.get_opt("session_dir") + else: + session_dir = os.path.join(conf.get_opt("session_dir"), + args.session) + + if not os.path.exists(session_dir) and \ + args.mode not in ["create", "list", "query"]: + fail("Invalid session %s" % args.session) + + # volume involved, validate the volume first + if args.mode not in ["list"]: + validate_volume(args.volume) + + + # "default" is a system defined session name + if args.mode in ["create", "post", "pre", "delete"] and \ + args.session == "default": + fail("Invalid session %s" % args.session) + + vol_dir = os.path.join(session_dir, args.volume) + if not os.path.exists(vol_dir) and args.mode not in \ + ["create", "list", "query"]: + fail("Session %s not created with volume %s" % + (args.session, args.volume)) + + mkdirp(os.path.join(conf.get_opt("log_dir"), + args.session, + args.volume), + exit_on_err=True) + log_file = os.path.join(conf.get_opt("log_dir"), + args.session, + args.volume, + "cli.log") + setup_logger(logger, log_file, args.debug) + + # globals() will have all the functions already defined. + # mode_<args.mode> will be the function name to be called + globals()["mode_" + args.mode](session_dir, args) + except KeyboardInterrupt: + if args is not None: + if args.mode == "pre" or args.mode == "query": + # cleanup session + if gtmpfilename is not None: + # no more interrupts until we clean up + signal.signal(signal.SIGINT, signal.SIG_IGN) + run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename) + + # Interrupted, exit with non zero error code + sys.exit(2) diff --git a/tools/glusterfind/src/nodeagent.py b/tools/glusterfind/src/nodeagent.py new file mode 100644 index 00000000000..679daa6fa76 --- /dev/null +++ b/tools/glusterfind/src/nodeagent.py @@ -0,0 +1,139 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +# Copyright (c) 2015 Red Hat, Inc. 
<http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import shutil +import sys +import os +import logging +from argparse import ArgumentParser, RawDescriptionHelpFormatter +try: + import urllib.parse as urllib +except ImportError: + import urllib +from errno import ENOTEMPTY + +from utils import setup_logger, mkdirp, handle_rm_error +import conf + +logger = logging.getLogger() + + +def mode_cleanup(args): + working_dir = os.path.join(conf.get_opt("working_dir"), + args.session, + args.volume, + args.tmpfilename) + + mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), + exit_on_err=True) + log_file = os.path.join(conf.get_opt("log_dir"), + args.session, + args.volume, + "changelog.log") + + setup_logger(logger, log_file) + + try: + shutil.rmtree(working_dir, onerror=handle_rm_error) + except (OSError, IOError) as e: + logger.error("Failed to delete working directory: %s" % e) + sys.exit(1) + + +def mode_create(args): + session_dir = os.path.join(conf.get_opt("session_dir"), + args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) + + if not os.path.exists(status_file) or args.reset_session_time: + with open(status_file, "w") as f: + f.write(args.time_to_update) + + sys.exit(0) + + +def mode_post(args): + session_dir = os.path.join(conf.get_opt("session_dir"), args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) + status_file_pre = status_file + ".pre" + + if os.path.exists(status_file_pre): + os.rename(status_file_pre, status_file) + sys.exit(0) + + +def mode_delete(args): + session_dir = os.path.join(conf.get_opt("session_dir"), + args.session) + shutil.rmtree(os.path.join(session_dir, args.volume), + onerror=handle_rm_error) + + # If the session contains only this volume, then cleanup the + # session directory. 
If a session contains multiple volumes + # then os.rmdir will fail with ENOTEMPTY + try: + os.rmdir(session_dir) + except OSError as e: + if not e.errno == ENOTEMPTY: + logger.warn("Failed to delete session directory: %s" % e) + + +def _get_args(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description="Node Agent") + subparsers = parser.add_subparsers(dest="mode") + + parser_cleanup = subparsers.add_parser('cleanup') + parser_cleanup.add_argument("session", help="Session Name") + parser_cleanup.add_argument("volume", help="Volume Name") + parser_cleanup.add_argument("tmpfilename", help="Temporary File Name") + parser_cleanup.add_argument("--debug", help="Debug", action="store_true") + + parser_session_create = subparsers.add_parser('create') + parser_session_create.add_argument("session", help="Session Name") + parser_session_create.add_argument("volume", help="Volume Name") + parser_session_create.add_argument("brick", help="Brick Path") + parser_session_create.add_argument("time_to_update", help="Time to Update") + parser_session_create.add_argument("--reset-session-time", + help="Reset Session Time", + action="store_true") + parser_session_create.add_argument("--debug", help="Debug", + action="store_true") + + parser_post = subparsers.add_parser('post') + parser_post.add_argument("session", help="Session Name") + parser_post.add_argument("volume", help="Volume Name") + parser_post.add_argument("brick", help="Brick Path") + parser_post.add_argument("--debug", help="Debug", + action="store_true") + + parser_delete = subparsers.add_parser('delete') + parser_delete.add_argument("session", help="Session Name") + parser_delete.add_argument("volume", help="Volume Name") + parser_delete.add_argument("--debug", help="Debug", + action="store_true") + return parser.parse_args() + + +if __name__ == "__main__": + args = _get_args() + + # globals() will have all the functions already defined. + # mode_<args.mode> will be the function name to be called + globals()["mode_" + args.mode](args) diff --git a/tools/glusterfind/src/tool.conf.in b/tools/glusterfind/src/tool.conf.in new file mode 100644 index 00000000000..a80f4a784c0 --- /dev/null +++ b/tools/glusterfind/src/tool.conf.in @@ -0,0 +1,10 @@ +[vars] +session_dir=@GLUSTERD_WORKDIR@/glusterfind/ +working_dir=@GLUSTERFSD_MISCDIR@/glusterfind/ +log_dir=/var/log/glusterfs/glusterfind/ +nodeagent=@GLUSTERFS_LIBEXECDIR@/glusterfind/nodeagent.py +brick_ignore_dirs=.glusterfs,.trashcan + +[change_detectors] +changelog=@GLUSTERFS_LIBEXECDIR@/glusterfind/changelog.py +brickfind=@GLUSTERFS_LIBEXECDIR@/glusterfind/brickfind.py
\ No newline at end of file diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py new file mode 100644 index 00000000000..906ebd8f252 --- /dev/null +++ b/tools/glusterfind/src/utils.py @@ -0,0 +1,267 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. + +import sys +from subprocess import PIPE, Popen +from errno import EEXIST, ENOENT +import xml.etree.cElementTree as etree +import logging +import os +from datetime import datetime + +ROOT_GFID = "00000000-0000-0000-0000-000000000001" +DEFAULT_CHANGELOG_INTERVAL = 15 +SPACE_ESCAPE_CHAR = "%20" +NEWLINE_ESCAPE_CHAR = "%0A" +PERCENTAGE_ESCAPE_CHAR = "%25" + +ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError +cache_data = {} + + +class RecordType(object): + NEW = "NEW" + MODIFY = "MODIFY" + RENAME = "RENAME" + DELETE = "DELETE" + + +def cache_output(func): + def wrapper(*args, **kwargs): + global cache_data + if cache_data.get(func.__name__, None) is None: + cache_data[func.__name__] = func(*args, **kwargs) + + return cache_data[func.__name__] + return wrapper + + +def handle_rm_error(func, path, exc_info): + if exc_info[1].errno == ENOENT: + return + + raise exc_info[1] + + +def find(path, callback_func=lambda x: True, filter_func=lambda x: True, + ignore_dirs=[], subdirs_crawl=True): + if path in ignore_dirs: + return + + # Capture filter_func output and pass it to callback function + filter_result = filter_func(path) + if filter_result is not None: + callback_func(path, filter_result, os.path.isdir(path)) + + for p in os.listdir(path): + full_path = os.path.join(path, p) + + is_dir = os.path.isdir(full_path) + if is_dir: + if subdirs_crawl: + find(full_path, callback_func, filter_func, ignore_dirs) + else: + filter_result = filter_func(full_path) + if filter_result is not None: + callback_func(full_path, filter_result) + else: + filter_result = filter_func(full_path) + if filter_result is not None: + callback_func(full_path, filter_result, is_dir) + + +def output_write(f, path, prefix=".", encode=False, tag="", + field_separator=" "): + if path == "": + return + + if prefix != ".": + path = os.path.join(prefix, path) + + if encode: + path = quote_plus_space_newline(path) + + # set the field separator + FS = "" if tag == "" else field_separator + + f.write("%s%s%s\n" % (tag.strip(), FS, path)) + + +def human_time(ts): + return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S") + + +def setup_logger(logger, path, debug=False): + if debug: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + + # create the logging file handler + fh = logging.FileHandler(path) + + formatter = logging.Formatter("[%(asctime)s] %(levelname)s " + "[%(module)s - %(lineno)s:%(funcName)s] " + "- %(message)s") + + fh.setFormatter(formatter) + + # add handler to logger object + logger.addHandler(fh) + + +def create_file(path, exit_on_err=False, logger=None): + """ + If file exists overwrite. Print error to stderr and exit + if exit_on_err is set, else raise the exception. Consumer + should handle the exception. 
+ """ + try: + open(path, 'w').close() + except (OSError, IOError) as e: + if exit_on_err: + fail("Failed to create file %s: %s" % (path, e), logger=logger) + else: + raise + + +def mkdirp(path, exit_on_err=False, logger=None): + """ + Try creating required directory structure + ignore EEXIST and raise exception for rest of the errors. + Print error in stderr and exit if exit_on_err is set, else + raise exception. + """ + try: + os.makedirs(path) + except (OSError, IOError) as e: + if e.errno == EEXIST and os.path.isdir(path): + pass + else: + if exit_on_err: + fail("Fail to create dir %s: %s" % (path, e), logger=logger) + else: + raise + + +def fail(msg, code=1, logger=None): + """ + Write error to stderr and exit + """ + if logger: + logger.error(msg) + sys.stderr.write("%s\n" % msg) + sys.exit(code) + + +def execute(cmd, exit_msg=None, logger=None): + """ + If failure_msg is not None then return returncode, out and error. + If failure msg is set, write to stderr and exit. + """ + p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True) + + (out, err) = p.communicate() + if p.returncode != 0 and exit_msg is not None: + fail("%s: %s" % (exit_msg, err), p.returncode, logger=logger) + + return (p.returncode, out, err) + + +def symlink_gfid_to_path(brick, gfid): + """ + Each directories are symlinked to file named GFID + in .glusterfs directory of brick backend. Using readlink + we get PARGFID/basename of dir. readlink recursively till + we get PARGFID as ROOT_GFID. + """ + if gfid == ROOT_GFID: + return "" + + out_path = "" + while True: + path = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid) + path_readlink = os.readlink(path) + pgfid = os.path.dirname(path_readlink) + out_path = os.path.join(os.path.basename(path_readlink), out_path) + if pgfid == "../../00/00/%s" % ROOT_GFID: + break + gfid = os.path.basename(pgfid) + return out_path + + +@cache_output +def get_my_uuid(): + cmd = ["gluster", "system::", "uuid", "get", "--xml"] + rc, out, err = execute(cmd) + + if rc != 0: + return None + + tree = etree.fromstring(out) + uuid_el = tree.find("uuidGenerate/uuid") + return uuid_el.text + + +def is_host_local(host_uuid): + # Get UUID only if it is not done previously + # else Cache the UUID value + my_uuid = get_my_uuid() + if my_uuid == host_uuid: + return True + + return False + + +def get_changelog_rollover_time(volumename): + cmd = ["gluster", "volume", "get", volumename, + "changelog.rollover-time", "--xml"] + rc, out, err = execute(cmd) + + if rc != 0: + return DEFAULT_CHANGELOG_INTERVAL + + try: + tree = etree.fromstring(out) + val = tree.find('volGetopts/Opt/Value').text + if val is not None: + # Filter the value by split, as it may be 'X (DEFAULT)' + # and we only need 'X' + return int(val.split(' ', 1)[0]) + except ParseError: + return DEFAULT_CHANGELOG_INTERVAL + + +def output_path_prepare(path, args): + """ + If Prefix is set, joins to Path, removes ending slash + and encodes it. 
+ """ + if args.output_prefix != ".": + path = os.path.join(args.output_prefix, path) + if path.endswith("/"): + path = path[0:len(path)-1] + + if args.no_encode: + return path + else: + return quote_plus_space_newline(path) + + +def unquote_plus_space_newline(s): + return s.replace(SPACE_ESCAPE_CHAR, " ")\ + .replace(NEWLINE_ESCAPE_CHAR, "\n")\ + .replace(PERCENTAGE_ESCAPE_CHAR, "%") + + +def quote_plus_space_newline(s): + return s.replace("%", PERCENTAGE_ESCAPE_CHAR)\ + .replace(" ", SPACE_ESCAPE_CHAR)\ + .replace("\n", NEWLINE_ESCAPE_CHAR) diff --git a/tools/setgfid2path/Makefile.am b/tools/setgfid2path/Makefile.am new file mode 100644 index 00000000000..c14787a80ce --- /dev/null +++ b/tools/setgfid2path/Makefile.am @@ -0,0 +1,5 @@ +SUBDIRS = src + +EXTRA_DIST = gluster-setgfid2path.8 + +man8_MANS = gluster-setgfid2path.8 diff --git a/tools/setgfid2path/gluster-setgfid2path.8 b/tools/setgfid2path/gluster-setgfid2path.8 new file mode 100644 index 00000000000..2e228ca8514 --- /dev/null +++ b/tools/setgfid2path/gluster-setgfid2path.8 @@ -0,0 +1,54 @@ + +.\" Copyright (c) 2017 Red Hat, Inc. <http://www.redhat.com> +.\" This file is part of GlusterFS. +.\" +.\" This file is licensed to you under your choice of the GNU Lesser +.\" General Public License, version 3 or any later version (LGPLv3 or +.\" later), or the GNU General Public License, version 2 (GPLv2), in all +.\" cases as published by the Free Software Foundation. +.\" +.\" +.TH gluster-setgfid2path 8 "Command line utility to set GFID to Path Xattrs" +.SH NAME +gluster-setgfid2path - Gluster tool to set GFID to Path xattrs +.SH SYNOPSIS +.B gluster-setgfid2path +.IR file +.SH DESCRIPTION +New feature introduced with Gluster release 3.12, to find full path from GFID. +This feature can be enabled using Volume set command \fBgluster volume set +<VOLUME> storage.gfid2path enable\fR +.PP +Once \fBgfid2path\fR feature is enabled, it starts recording the necessary +xattrs required for the feature. But it will not add xattrs for the already +existing files. This tool provides facility to update the gfid2path xattrs for +the given file path. + +.SH EXAMPLES +To add xattrs of a single file, +.PP +.nf +.RS +gluster-setgfid2path /bricks/b1/hello.txt +.RE +.fi +.PP +To set xattr for all the existing files, run the below script on each bricks. +.PP +.nf +.RS +BRICK=/bricks/b1 +find $BRICK -type d \\( -path "${BRICK}/.trashcan" -o -path \\ + "${BRICK}/.glusterfs" \\) -prune -o -type f \\ + -exec gluster-setgfid2path {} \\; +.RE +.fi +.PP +.SH SEE ALSO +.nf +\fBgluster\fR(8) +\fR +.fi +.SH COPYRIGHT +.nf +Copyright(c) 2017 Red Hat, Inc. <http://www.redhat.com> diff --git a/tools/setgfid2path/src/Makefile.am b/tools/setgfid2path/src/Makefile.am new file mode 100644 index 00000000000..7316d117070 --- /dev/null +++ b/tools/setgfid2path/src/Makefile.am @@ -0,0 +1,16 @@ +gluster_setgfid2pathdir = $(sbindir) + +if WITH_SERVER +gluster_setgfid2path_PROGRAMS = gluster-setgfid2path +endif + +gluster_setgfid2path_SOURCES = main.c + +gluster_setgfid2path_LDADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +gluster_setgfid2path_LDFLAGS = $(GF_LDFLAGS) + +AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \ + -I$(top_builddir)/rpc/xdr/src + +AM_CFLAGS = -Wall $(GF_CFLAGS) diff --git a/tools/setgfid2path/src/main.c b/tools/setgfid2path/src/main.c new file mode 100644 index 00000000000..4320a7b2481 --- /dev/null +++ b/tools/setgfid2path/src/main.c @@ -0,0 +1,130 @@ +/* + Copyright (c) 2017 Red Hat, Inc. 
<http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. + */ +#include <stdio.h> +#include <libgen.h> + +#include <glusterfs/common-utils.h> +#include <glusterfs/syscall.h> + +#define MAX_GFID2PATH_LINK_SUP 500 +#define GFID_SIZE 16 +#define GFID_XATTR_KEY "trusted.gfid" + +int +main(int argc, char **argv) +{ + int ret = 0; + struct stat st; + char *dname = NULL; + char *bname = NULL; + ssize_t ret_size = 0; + uuid_t pgfid_raw = { + 0, + }; + char pgfid[36 + 1] = ""; + char xxh64[GF_XXH64_DIGEST_LENGTH * 2 + 1] = { + 0, + }; + char pgfid_bname[1024] = { + 0, + }; + char *key = NULL; + char *val = NULL; + size_t key_size = 0; + size_t val_size = 0; + const char *file_path = NULL; + char *file_path1 = NULL; + char *file_path2 = NULL; + + if (argc != 2) { + fprintf(stderr, "Usage: setgfid2path <file-path>\n"); + return -1; + } + + ret = sys_lstat(argv[1], &st); + if (ret != 0) { + fprintf(stderr, "Invalid File Path\n"); + return -1; + } + + if (st.st_nlink >= MAX_GFID2PATH_LINK_SUP) { + fprintf(stderr, + "Number of Hardlink support exceeded. " + "max=%d\n", + MAX_GFID2PATH_LINK_SUP); + return -1; + } + + file_path = argv[1]; + file_path1 = strdup(file_path); + file_path2 = strdup(file_path); + + dname = dirname(file_path1); + bname = basename(file_path2); + + /* Get GFID of Parent directory */ + ret_size = sys_lgetxattr(dname, GFID_XATTR_KEY, pgfid_raw, GFID_SIZE); + if (ret_size != GFID_SIZE) { + fprintf(stderr, "Failed to get GFID of parent directory. dir=%s\n", + dname); + ret = -1; + goto out; + } + + /* Convert to UUID format */ + if (uuid_utoa_r(pgfid_raw, pgfid) == NULL) { + fprintf(stderr, + "Failed to format GFID of parent directory. " + "dir=%s GFID=%s\n", + dname, pgfid_raw); + ret = -1; + goto out; + } + + /* Find xxhash for PGFID/BaseName */ + snprintf(pgfid_bname, sizeof(pgfid_bname), "%s/%s", pgfid, bname); + gf_xxh64_wrapper((unsigned char *)pgfid_bname, strlen(pgfid_bname), + GF_XXHSUM64_DEFAULT_SEED, xxh64); + + key_size = SLEN(GFID2PATH_XATTR_KEY_PREFIX) + GF_XXH64_DIGEST_LENGTH * 2 + + 1; + key = alloca(key_size); + snprintf(key, key_size, GFID2PATH_XATTR_KEY_PREFIX "%s", xxh64); + + val_size = UUID_CANONICAL_FORM_LEN + NAME_MAX + 2; + val = alloca(val_size); + snprintf(val, val_size, "%s/%s", pgfid, bname); + + /* Set the Xattr, ignore if same key xattr already exists */ + ret = sys_lsetxattr(file_path, key, val, strlen(val), XATTR_CREATE); + if (ret == -1) { + if (errno == EEXIST) { + printf("Xattr already exists, ignoring..\n"); + ret = 0; + goto out; + } + + fprintf(stderr, "Failed to set gfid2path xattr. errno=%d\n error=%s", + errno, strerror(errno)); + ret = -1; + goto out; + } + + printf("Success. file=%s key=%s value=%s\n", file_path, key, val); + +out: + if (file_path1 != NULL) + free(file_path1); + + if (file_path2 != NULL) + free(file_path2); + + return ret; +} |
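For comparison with the C tool above, a hedged Python sketch of the same key/value construction. The xattr key prefix ("trusted.gfid2path.") and the xxh64 seed (0) are assumptions matching the macros used above; the third-party xxhash module stands in for gf_xxh64_wrapper, and this must run as root on a brick, since trusted.* xattrs are privileged:

    import os
    import uuid
    import xxhash  # third-party: pip install xxhash

    def set_gfid2path(path, prefix="trusted.gfid2path.", seed=0):
        dname, bname = os.path.split(os.path.abspath(path))
        pgfid_raw = os.getxattr(dname, "trusted.gfid")   # 16 raw bytes
        pgfid = str(uuid.UUID(bytes=pgfid_raw))          # canonical form
        val = "%s/%s" % (pgfid, bname)
        key = prefix + xxhash.xxh64(val.encode(), seed=seed).hexdigest()
        # XATTR_CREATE mirrors the C tool: an existing key is left alone
        try:
            os.setxattr(path, key, val.encode(), os.XATTR_CREATE)
        except FileExistsError:
            pass
        return key, val

And returning to the escaping helpers at the end of utils.py above: because "%" is escaped first when quoting and restored last when unquoting, a path that already contains a literal "%20" still survives the round trip. A quick self-check, assuming utils.py is importable:

    from utils import quote_plus_space_newline, unquote_plus_space_newline

    p = "dir name/file%20with\nnewline"
    q = quote_plus_space_newline(p)   # -> 'dir%20name/file%2520with%0Anewline'
    assert unquote_plus_space_newline(q) == p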
