Diffstat (limited to 'tools')
26 files changed, 2564 insertions, 961 deletions
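A note on the main compiled change below: gcrawler.c drops the deprecated readdir_r(3) in favor of a readdir(3)-style loop (through the libglusterfs sys_opendir/sys_readdir/sys_closedir wrappers, where sys_readdir also takes a scratch buffer for platform compatibility). With readdir(3), end-of-directory and a read error both return NULL, so the two are told apart by clearing errno before each call and checking it afterwards. The following is a minimal standalone sketch of that idiom only, using plain readdir(3) rather than the glusterfs syscall wrappers; walk_dir() is a hypothetical helper, not a function from this diff.

    /* Sketch: the errno-based readdir(3) loop that replaces readdir_r(3). */
    #include <dirent.h>
    #include <errno.h>
    #include <stdio.h>
    #include <string.h>

    int
    walk_dir(const char *dirname)
    {
        DIR *dirp = opendir(dirname);
        struct dirent *entry;

        if (!dirp) {
            fprintf(stderr, "opendir(%s): %s\n", dirname, strerror(errno));
            return -1;
        }

        for (;;) {
            errno = 0;            /* distinguish EOF from failure below */
            entry = readdir(dirp);
            if (!entry) {
                if (errno != 0) { /* NULL + errno set => readdir failed */
                    fprintf(stderr, "readdir(%s): %s\n", dirname,
                            strerror(errno));
                    closedir(dirp);
                    return -1;
                }
                break;            /* NULL + errno == 0 => end of directory */
            }

            if (strcmp(entry->d_name, ".") == 0 ||
                strcmp(entry->d_name, "..") == 0)
                continue;

            printf("%s/%s\n", dirname, entry->d_name);
        }

        closedir(dirp);
        return 0;
    }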
diff --git a/tools/Makefile.am b/tools/Makefile.am index d689f60fa52..5808a3728cd 100644 --- a/tools/Makefile.am +++ b/tools/Makefile.am @@ -1,3 +1,3 @@ -SUBDIRS = gfind_missing_files glusterfind +SUBDIRS = gfind_missing_files glusterfind setgfid2path CLEANFILES = diff --git a/tools/gfind_missing_files/Makefile.am b/tools/gfind_missing_files/Makefile.am index 456aad836b6..181fe7091f3 100644 --- a/tools/gfind_missing_files/Makefile.am +++ b/tools/gfind_missing_files/Makefile.am @@ -1,24 +1,32 @@ -gfindmissingfilesdir = $(libexecdir)/glusterfs/gfind_missing_files +gfindmissingfilesdir = $(GLUSTERFS_LIBEXECDIR)/gfind_missing_files +if WITH_SERVER gfindmissingfiles_SCRIPTS = gfind_missing_files.sh gfid_to_path.sh \ gfid_to_path.py +endif EXTRA_DIST = gfind_missing_files.sh gfid_to_path.sh \ gfid_to_path.py +if WITH_SERVER gfindmissingfiles_PROGRAMS = gcrawler +endif gcrawler_SOURCES = gcrawler.c +gcrawler_LDADD = $(top_builddir)/libglusterfs/src/libglusterfs.la +gcrawler_LDFLAGS = $(GF_LDFLAGS) AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src AM_CFLAGS = -Wall $(GF_CFLAGS) +if WITH_SERVER uninstall-local: rm -f $(DESTDIR)$(sbindir)/gfind_missing_files install-data-local: rm -f $(DESTDIR)$(sbindir)/gfind_missing_files - ln -s $(libexecdir)/glusterfs/gfind_missing_files/gfind_missing_files.sh $(DESTDIR)$(sbindir)/gfind_missing_files + ln -s $(GLUSTERFS_LIBEXECDIR)/gfind_missing_files/gfind_missing_files.sh $(DESTDIR)$(sbindir)/gfind_missing_files +endif CLEANFILES = diff --git a/tools/gfind_missing_files/gcrawler.c b/tools/gfind_missing_files/gcrawler.c index 517e773cb7c..4acbe92bc8f 100644 --- a/tools/gfind_missing_files/gcrawler.c +++ b/tools/gfind_missing_files/gcrawler.c @@ -12,43 +12,54 @@ #include <errno.h> #include <sys/stat.h> #include <unistd.h> -#include <pthread.h> #include <stdlib.h> #include <string.h> #include <dirent.h> #include <assert.h> +#include <glusterfs/locking.h> -#ifndef __FreeBSD__ -#ifdef __NetBSD__ -#include <sys/xattr.h> -#else -#include <attr/xattr.h> -#endif /* __NetBSD__ */ -#endif /* __FreeBSD__ */ - -#include "list.h" +#include <glusterfs/compat.h> +#include <glusterfs/list.h> +#include <glusterfs/syscall.h> #define THREAD_MAX 32 #define BUMP(name) INC(name, 1) #define DEFAULT_WORKERS 4 -#define NEW(x) { \ - x = calloc (1, sizeof (typeof (*x))); \ - } - -#define err(x ...) fprintf(stderr, x) -#define out(x ...) fprintf(stdout, x) -#define dbg(x ...) do { if (debug) fprintf(stdout, x); } while (0) -#define tout(x ...) do { out("[%ld] ", pthread_self()); out(x); } while (0) -#define terr(x ...) do { err("[%ld] ", pthread_self()); err(x); } while (0) -#define tdbg(x ...) do { dbg("[%ld] ", pthread_self()); dbg(x); } while (0) +#define NEW(x) \ + { \ + x = calloc(1, sizeof(typeof(*x))); \ + } + +#define err(x...) fprintf(stderr, x) +#define out(x...) fprintf(stdout, x) +#define dbg(x...) \ + do { \ + if (debug) \ + fprintf(stdout, x); \ + } while (0) +#define tout(x...) \ + do { \ + out("[%ld] ", pthread_self()); \ + out(x); \ + } while (0) +#define terr(x...) \ + do { \ + err("[%ld] ", pthread_self()); \ + err(x); \ + } while (0) +#define tdbg(x...) 
\ + do { \ + dbg("[%ld] ", pthread_self()); \ + dbg(x); \ + } while (0) int debug = 0; const char *slavemnt = NULL; int workers = 0; struct stats { - unsigned long long int cnt_skipped_gfids; + unsigned long long int cnt_skipped_gfids; }; pthread_spinlock_t stats_lock; @@ -56,517 +67,515 @@ pthread_spinlock_t stats_lock; struct stats stats_total; int stats = 0; -#define INC(name, val) do { \ - if (!stats) \ - break; \ - pthread_spin_lock(&stats_lock); \ - { \ - stats_total.cnt_##name += val; \ - } \ - pthread_spin_unlock(&stats_lock); \ - } while (0) +#define INC(name, val) \ + do { \ + if (!stats) \ + break; \ + pthread_spin_lock(&stats_lock); \ + { \ + stats_total.cnt_##name += val; \ + } \ + pthread_spin_unlock(&stats_lock); \ + } while (0) void stats_dump() { - if (!stats) - return; + if (!stats) + return; - out("-------------------------------------------\n"); - out("Skipped_Files : %10lld\n", stats_total.cnt_skipped_gfids); - out("-------------------------------------------\n"); + out("-------------------------------------------\n"); + out("Skipped_Files : %10lld\n", stats_total.cnt_skipped_gfids); + out("-------------------------------------------\n"); } struct dirjob { - struct list_head list; + struct list_head list; - char *dirname; + char *dirname; - struct dirjob *parent; - int ret; /* final status of this subtree */ - int refcnt; /* how many dirjobs have this as parent */ + struct dirjob *parent; + int ret; /* final status of this subtree */ + int refcnt; /* how many dirjobs have this as parent */ - pthread_spinlock_t lock; + pthread_spinlock_t lock; }; - struct xwork { - pthread_t cthreads[THREAD_MAX]; /* crawler threads */ - int count; - int idle; - int stop; + pthread_t cthreads[THREAD_MAX]; /* crawler threads */ + int count; + int idle; + int stop; - struct dirjob crawl; + struct dirjob crawl; - struct dirjob *rootjob; /* to verify completion in xwork_fini() */ + struct dirjob *rootjob; /* to verify completion in xwork_fini() */ - pthread_mutex_t mutex; - pthread_cond_t cond; + pthread_mutex_t mutex; + pthread_cond_t cond; }; - struct dirjob * -dirjob_ref (struct dirjob *job) +dirjob_ref(struct dirjob *job) { - pthread_spin_lock (&job->lock); - { - job->refcnt++; - } - pthread_spin_unlock (&job->lock); + pthread_spin_lock(&job->lock); + { + job->refcnt++; + } + pthread_spin_unlock(&job->lock); - return job; + return job; } - void -dirjob_free (struct dirjob *job) +dirjob_free(struct dirjob *job) { - assert (list_empty (&job->list)); + assert(list_empty(&job->list)); - pthread_spin_destroy (&job->lock); - free (job->dirname); - free (job); + pthread_spin_destroy(&job->lock); + free(job->dirname); + free(job); } void -dirjob_ret (struct dirjob *job, int err) +dirjob_ret(struct dirjob *job, int err) { - int ret = 0; - int refcnt = 0; - struct dirjob *parent = NULL; - - pthread_spin_lock (&job->lock); - { - refcnt = --job->refcnt; - job->ret = (job->ret || err); - } - pthread_spin_unlock (&job->lock); - - if (refcnt == 0) { - ret = job->ret; - - if (ret) - terr ("Failed: %s (%d)\n", job->dirname, ret); - else - tdbg ("Finished: %s\n", job->dirname); - - parent = job->parent; - if (parent) - dirjob_ret (parent, ret); + int ret = 0; + int refcnt = 0; + struct dirjob *parent = NULL; + + pthread_spin_lock(&job->lock); + { + refcnt = --job->refcnt; + job->ret = (job->ret || err); + } + pthread_spin_unlock(&job->lock); + + if (refcnt == 0) { + ret = job->ret; + + if (ret) + terr("Failed: %s (%d)\n", job->dirname, ret); + else + tdbg("Finished: %s\n", job->dirname); + + parent = 
job->parent; + if (parent) + dirjob_ret(parent, ret); - dirjob_free (job); - job = NULL; - } + dirjob_free(job); + job = NULL; + } } - struct dirjob * -dirjob_new (const char *dir, struct dirjob *parent) +dirjob_new(const char *dir, struct dirjob *parent) { - struct dirjob *job = NULL; + struct dirjob *job = NULL; - NEW(job); - if (!job) - return NULL; + NEW(job); + if (!job) + return NULL; - job->dirname = strdup (dir); - if (!job->dirname) { - free (job); - return NULL; - } + job->dirname = strdup(dir); + if (!job->dirname) { + free(job); + return NULL; + } - INIT_LIST_HEAD(&job->list); - pthread_spin_init (&job->lock, PTHREAD_PROCESS_PRIVATE); - job->ret = 0; + INIT_LIST_HEAD(&job->list); + pthread_spin_init(&job->lock, PTHREAD_PROCESS_PRIVATE); + job->ret = 0; - if (parent) - job->parent = dirjob_ref (parent); + if (parent) + job->parent = dirjob_ref(parent); - job->refcnt = 1; + job->refcnt = 1; - return job; + return job; } void -xwork_addcrawl (struct xwork *xwork, struct dirjob *job) +xwork_addcrawl(struct xwork *xwork, struct dirjob *job) { - pthread_mutex_lock (&xwork->mutex); - { - list_add_tail (&job->list, &xwork->crawl.list); - pthread_cond_broadcast (&xwork->cond); - } - pthread_mutex_unlock (&xwork->mutex); + pthread_mutex_lock(&xwork->mutex); + { + list_add_tail(&job->list, &xwork->crawl.list); + pthread_cond_broadcast(&xwork->cond); + } + pthread_mutex_unlock(&xwork->mutex); } int -xwork_add (struct xwork *xwork, const char *dir, struct dirjob *parent) +xwork_add(struct xwork *xwork, const char *dir, struct dirjob *parent) { - struct dirjob *job = NULL; + struct dirjob *job = NULL; - job = dirjob_new (dir, parent); - if (!job) - return -1; + job = dirjob_new(dir, parent); + if (!job) + return -1; - xwork_addcrawl (xwork, job); + xwork_addcrawl(xwork, job); - return 0; + return 0; } - struct dirjob * -xwork_pick (struct xwork *xwork, int block) +xwork_pick(struct xwork *xwork, int block) { - struct dirjob *job = NULL; - struct list_head *head = NULL; + struct dirjob *job = NULL; + struct list_head *head = NULL; - head = &xwork->crawl.list; + head = &xwork->crawl.list; - pthread_mutex_lock (&xwork->mutex); - { - for (;;) { - if (xwork->stop) - break; - - if (!list_empty (head)) { - job = list_entry (head->next, typeof(*job), - list); - list_del_init (&job->list); - break; - } - - if (((xwork->count * 2) == xwork->idle) && - list_empty (&xwork->crawl.list)) { - /* no outstanding jobs, and no - active workers - */ - tdbg ("Jobless. Terminating\n"); - xwork->stop = 1; - pthread_cond_broadcast (&xwork->cond); - break; - } - - if (!block) - break; - - xwork->idle++; - pthread_cond_wait (&xwork->cond, &xwork->mutex); - xwork->idle--; - } + pthread_mutex_lock(&xwork->mutex); + { + for (;;) { + if (xwork->stop) + break; + + if (!list_empty(head)) { + job = list_entry(head->next, typeof(*job), list); + list_del_init(&job->list); + break; + } + + if (((xwork->count * 2) == xwork->idle) && + list_empty(&xwork->crawl.list)) { + /* no outstanding jobs, and no + active workers + */ + tdbg("Jobless. 
Terminating\n"); + xwork->stop = 1; + pthread_cond_broadcast(&xwork->cond); + break; + } + + if (!block) + break; + + xwork->idle++; + pthread_cond_wait(&xwork->cond, &xwork->mutex); + xwork->idle--; } - pthread_mutex_unlock (&xwork->mutex); + } + pthread_mutex_unlock(&xwork->mutex); - return job; + return job; } int -skip_name (const char *dirname, const char *name) +skip_name(const char *dirname, const char *name) { - if (strcmp (name, ".") == 0) - return 1; + if (strcmp(name, ".") == 0) + return 1; - if (strcmp (name, "..") == 0) - return 1; + if (strcmp(name, "..") == 0) + return 1; - if (strcmp (name, "changelogs") == 0) - return 1; + if (strcmp(name, "changelogs") == 0) + return 1; - if (strcmp (name, "health_check") == 0) - return 1; + if (strcmp(name, "health_check") == 0) + return 1; - if (strcmp (name, "indices") == 0) - return 1; + if (strcmp(name, "indices") == 0) + return 1; - if (strcmp (name, "landfill") == 0) - return 1; + if (strcmp(name, "landfill") == 0) + return 1; - return 0; + return 0; } int -skip_stat (struct dirjob *job, const char *name) +skip_stat(struct dirjob *job, const char *name) { - if (job == NULL) - return 0; - - if (strcmp (job->dirname, ".glusterfs") == 0) { - tdbg ("Directly adding directories under .glusterfs " - "to global list: %s\n", name); - return 1; - } + if (job == NULL) + return 0; - if (job->parent != NULL) { - if (strcmp (job->parent->dirname, ".glusterfs") == 0) { - tdbg ("Directly adding directories under .glusterfs/XX " - "to global list: %s\n", name); - return 1; - } + if (strcmp(job->dirname, ".glusterfs") == 0) { + tdbg( + "Directly adding directories under .glusterfs " + "to global list: %s\n", + name); + return 1; + } + + if (job->parent != NULL) { + if (strcmp(job->parent->dirname, ".glusterfs") == 0) { + tdbg( + "Directly adding directories under .glusterfs/XX " + "to global list: %s\n", + name); + return 1; } + } - return 0; + return 0; } int -xworker_do_crawl (struct xwork *xwork, struct dirjob *job) +xworker_do_crawl(struct xwork *xwork, struct dirjob *job) { - DIR *dirp = NULL; - int ret = -1; - int boff; - int plen; - struct dirent *result; - char dbuf[512]; - char *path = NULL; - struct dirjob *cjob = NULL; - struct stat statbuf = {0,}; - char gfid_path[4096] = {0,}; - - - plen = strlen (job->dirname) + 256 + 2; - path = alloca (plen); - - tdbg ("Entering: %s\n", job->dirname); - - dirp = opendir (job->dirname); - if (!dirp) { - terr ("opendir failed on %s (%s)\n", job->dirname, - strerror (errno)); + DIR *dirp = NULL; + int ret = -1; + int boff; + int plen; + char *path = NULL; + struct dirjob *cjob = NULL; + struct stat statbuf = { + 0, + }; + struct dirent *entry; + struct dirent scratch[2] = { + { + 0, + }, + }; + char gfid_path[PATH_MAX] = { + 0, + }; + + plen = strlen(job->dirname) + 256 + 2; + path = alloca(plen); + + tdbg("Entering: %s\n", job->dirname); + + dirp = sys_opendir(job->dirname); + if (!dirp) { + terr("opendir failed on %s (%s)\n", job->dirname, strerror(errno)); + goto out; + } + + boff = sprintf(path, "%s/", job->dirname); + + for (;;) { + errno = 0; + entry = sys_readdir(dirp, scratch); + if (!entry || errno != 0) { + if (errno != 0) { + err("readdir(%s): %s\n", job->dirname, strerror(errno)); + ret = errno; goto out; + } + break; } - boff = sprintf (path, "%s/", job->dirname); + if (entry->d_ino == 0) + continue; + + if (skip_name(job->dirname, entry->d_name)) + continue; + + /* It is sure that, children and grandchildren of .glusterfs + * are directories, just add them to global queue. 
+ */ + if (skip_stat(job, entry->d_name)) { + strncpy(path + boff, entry->d_name, (plen - boff)); + cjob = dirjob_new(path, job); + if (!cjob) { + err("dirjob_new(%s): %s\n", path, strerror(errno)); + ret = -1; + goto out; + } + xwork_addcrawl(xwork, cjob); + continue; + } - for (;;) { - ret = readdir_r (dirp, (struct dirent *)dbuf, &result); - if (ret) { - err ("readdir_r(%s): %s\n", job->dirname, - strerror (errno)); - goto out; - } - - if (!result) /* EOF */ - break; - - if (result->d_ino == 0) - continue; - - if (skip_name (job->dirname, result->d_name)) - continue; - - /* It is sure that, children and grandchildren of .glusterfs - * are directories, just add them to global queue. - */ - if (skip_stat (job, result->d_name)) { - strncpy (path + boff, result->d_name, (plen-boff)); - cjob = dirjob_new (path, job); - if (!cjob) { - err ("dirjob_new(%s): %s\n", - path, strerror (errno)); - ret = -1; - goto out; - } - xwork_addcrawl (xwork, cjob); - continue; - } - - strcpy (gfid_path, slavemnt); - strcat (gfid_path, "/.gfid/"); - strcat (gfid_path, result->d_name); - ret = lstat (gfid_path, &statbuf); - - if (ret && errno == ENOENT) { - out ("%s\n", result->d_name); - BUMP (skipped_gfids); - } - - if (ret && errno != ENOENT) { - err ("stat on slave failed(%s): %s\n", - gfid_path, strerror (errno)); - goto out; - } + (void)snprintf(gfid_path, sizeof(gfid_path), "%s/.gfid/%s", slavemnt, + entry->d_name); + ret = sys_lstat(gfid_path, &statbuf); + + if (ret && errno == ENOENT) { + out("%s\n", entry->d_name); + BUMP(skipped_gfids); } - ret = 0; + if (ret && errno != ENOENT) { + err("stat on slave failed(%s): %s\n", gfid_path, strerror(errno)); + goto out; + } + } + + ret = 0; out: - if (dirp) - closedir (dirp); + if (dirp) + (void)sys_closedir(dirp); - return ret; + return ret; } - void * -xworker_crawl (void *data) +xworker_crawl(void *data) { - struct xwork *xwork = data; - struct dirjob *job = NULL; - int ret = -1; + struct xwork *xwork = data; + struct dirjob *job = NULL; + int ret = -1; - while ((job = xwork_pick (xwork, 0))) { - ret = xworker_do_crawl (xwork, job); - dirjob_ret (job, ret); - } + while ((job = xwork_pick(xwork, 0))) { + ret = xworker_do_crawl(xwork, job); + dirjob_ret(job, ret); + } - return NULL; + return NULL; } int -xwork_fini (struct xwork *xwork, int stop) +xwork_fini(struct xwork *xwork, int stop) { - int i = 0; - int ret = 0; - void *tret = 0; - - pthread_mutex_lock (&xwork->mutex); - { - xwork->stop = (xwork->stop || stop); - pthread_cond_broadcast (&xwork->cond); - } - pthread_mutex_unlock (&xwork->mutex); - - for (i = 0; i < xwork->count; i++) { - pthread_join (xwork->cthreads[i], &tret); - tdbg ("CThread id %ld returned %p\n", - xwork->cthreads[i], tret); - } - - if (debug) { - assert (xwork->rootjob->refcnt == 1); - dirjob_ret (xwork->rootjob, 0); - } - - if (stats) - pthread_spin_destroy(&stats_lock); - - return ret; + int i = 0; + int ret = 0; + void *tret = 0; + + pthread_mutex_lock(&xwork->mutex); + { + xwork->stop = (xwork->stop || stop); + pthread_cond_broadcast(&xwork->cond); + } + pthread_mutex_unlock(&xwork->mutex); + + for (i = 0; i < xwork->count; i++) { + pthread_join(xwork->cthreads[i], &tret); + tdbg("CThread id %ld returned %p\n", xwork->cthreads[i], tret); + } + + if (debug) { + assert(xwork->rootjob->refcnt == 1); + dirjob_ret(xwork->rootjob, 0); + } + + if (stats) + pthread_spin_destroy(&stats_lock); + + return ret; } - int -xwork_init (struct xwork *xwork, int count) +xwork_init(struct xwork *xwork, int count) { - int i = 0; - int ret = 0; - 
struct dirjob *rootjob = NULL; + int i = 0; + int ret = 0; + struct dirjob *rootjob = NULL; - if (stats) - pthread_spin_init (&stats_lock, PTHREAD_PROCESS_PRIVATE); + if (stats) + pthread_spin_init(&stats_lock, PTHREAD_PROCESS_PRIVATE); - pthread_mutex_init (&xwork->mutex, NULL); - pthread_cond_init (&xwork->cond, NULL); + pthread_mutex_init(&xwork->mutex, NULL); + pthread_cond_init(&xwork->cond, NULL); - INIT_LIST_HEAD (&xwork->crawl.list); + INIT_LIST_HEAD(&xwork->crawl.list); - rootjob = dirjob_new (".glusterfs", NULL); - if (debug) - xwork->rootjob = dirjob_ref (rootjob); + rootjob = dirjob_new(".glusterfs", NULL); + if (debug) + xwork->rootjob = dirjob_ref(rootjob); - xwork_addcrawl (xwork, rootjob); + xwork_addcrawl(xwork, rootjob); - xwork->count = count; - for (i = 0; i < count; i++) { - ret = pthread_create (&xwork->cthreads[i], NULL, - xworker_crawl, xwork); - if (ret) - break; - tdbg ("Spawned crawler %d thread %ld\n", i, - xwork->cthreads[i]); - } + xwork->count = count; + for (i = 0; i < count; i++) { + ret = pthread_create(&xwork->cthreads[i], NULL, xworker_crawl, xwork); + if (ret) + break; + tdbg("Spawned crawler %d thread %ld\n", i, xwork->cthreads[i]); + } - return ret; + return ret; } - int -xfind (const char *basedir) +xfind(const char *basedir) { - struct xwork xwork; - int ret = 0; - char *cwd = NULL; - - ret = chdir (basedir); - if (ret) { - err ("%s: %s\n", basedir, strerror (errno)); - return ret; - } + struct xwork xwork; + int ret = 0; + char *cwd = NULL; - cwd = getcwd (0, 0); - if (!cwd) { - err ("getcwd(): %s\n", strerror (errno)); - return -1; - } + ret = chdir(basedir); + if (ret) { + err("%s: %s\n", basedir, strerror(errno)); + return ret; + } - tdbg ("Working directory: %s\n", cwd); - free (cwd); + cwd = getcwd(0, 0); + if (!cwd) { + err("getcwd(): %s\n", strerror(errno)); + return -1; + } - memset (&xwork, 0, sizeof (xwork)); + tdbg("Working directory: %s\n", cwd); + free(cwd); - ret = xwork_init (&xwork, workers); - if (ret == 0) - xworker_crawl (&xwork); + memset(&xwork, 0, sizeof(xwork)); - ret = xwork_fini (&xwork, ret); - stats_dump (); + ret = xwork_init(&xwork, workers); + if (ret == 0) + xworker_crawl(&xwork); - return ret; + ret = xwork_fini(&xwork, ret); + stats_dump(); + + return ret; } static char * -parse_and_validate_args (int argc, char *argv[]) +parse_and_validate_args(int argc, char *argv[]) { - char *basedir = NULL; - struct stat d = {0, }; - int ret = -1; + char *basedir = NULL; + struct stat d = { + 0, + }; + int ret = -1; #ifndef __FreeBSD__ - unsigned char volume_id[16]; + unsigned char volume_id[16]; #endif /* __FreeBSD__ */ - char *slv_mnt = NULL; + char *slv_mnt = NULL; - if (argc != 4) { - err ("Usage: %s <DIR> <SLAVE-VOL-MOUNT> <CRAWL-THREAD-COUNT>\n", - argv[0]); - return NULL; - } + if (argc != 4) { + err("Usage: %s <DIR> <SLAVE-VOL-MOUNT> <CRAWL-THREAD-COUNT>\n", + argv[0]); + return NULL; + } - basedir = argv[1]; - ret = lstat (basedir, &d); - if (ret) { - err ("%s: %s\n", basedir, strerror (errno)); - return NULL; - } + basedir = argv[1]; + ret = sys_lstat(basedir, &d); + if (ret) { + err("%s: %s\n", basedir, strerror(errno)); + return NULL; + } #ifndef __FreeBSD__ - ret = lgetxattr (basedir, "trusted.glusterfs.volume-id", - volume_id, 16); - if (ret != 16) { - err ("%s:Not a valid brick path.\n", basedir); - return NULL; - } + ret = sys_lgetxattr(basedir, "trusted.glusterfs.volume-id", volume_id, 16); + if (ret != 16) { + err("%s:Not a valid brick path.\n", basedir); + return NULL; + } #endif /* __FreeBSD__ */ - slv_mnt = 
argv[2]; - ret = lstat (slv_mnt, &d); - if (ret) { - err ("%s: %s\n", slv_mnt, strerror (errno)); - return NULL; - } - slavemnt = argv[2]; + slv_mnt = argv[2]; + ret = sys_lstat(slv_mnt, &d); + if (ret) { + err("%s: %s\n", slv_mnt, strerror(errno)); + return NULL; + } + slavemnt = argv[2]; - workers = atoi(argv[3]); - if (workers <= 0) - workers = DEFAULT_WORKERS; + workers = atoi(argv[3]); + if (workers <= 0) + workers = DEFAULT_WORKERS; - return basedir; + return basedir; } int -main (int argc, char *argv[]) +main(int argc, char *argv[]) { - char *basedir = NULL; + char *basedir = NULL; - basedir = parse_and_validate_args (argc, argv); - if (!basedir) - return 1; + basedir = parse_and_validate_args(argc, argv); + if (!basedir) + return 1; - xfind (basedir); + xfind(basedir); - return 0; + return 0; } diff --git a/tools/gfind_missing_files/gfid_to_path.py b/tools/gfind_missing_files/gfid_to_path.py index 8362f68b955..01e08a9494a 100644 --- a/tools/gfind_missing_files/gfid_to_path.py +++ b/tools/gfind_missing_files/gfid_to_path.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> # This file is part of GlusterFS. diff --git a/tools/gfind_missing_files/gfid_to_path.sh b/tools/gfind_missing_files/gfid_to_path.sh index 20ac6a94fd2..ebe817ac2f3 100644 --- a/tools/gfind_missing_files/gfid_to_path.sh +++ b/tools/gfind_missing_files/gfid_to_path.sh @@ -11,7 +11,7 @@ E_BADARGS=65 -function gfid_to_path() +gfid_to_path () { brick_dir=$1; gfid_file=$(readlink -e $2); @@ -29,7 +29,8 @@ function gfid_to_path() } -function main(){ +main () +{ if [ $# -ne 2 ] then echo "Usage: `basename $0` BRICK_DIR GFID_FILE"; diff --git a/tools/gfind_missing_files/gfind_missing_files.sh b/tools/gfind_missing_files/gfind_missing_files.sh index 07d6befc958..e7aaa0b5dd4 100644 --- a/tools/gfind_missing_files/gfind_missing_files.sh +++ b/tools/gfind_missing_files/gfind_missing_files.sh @@ -14,18 +14,18 @@ SLAVEVOL= #Slave volume SLAVEMNT= #Slave gluster volume mount point WORKERS=4 #Default number of worker threads -function out() +out() { echo "$@"; } -function fatal() +fatal() { out FATAL "$@"; exit 1 } -function ping_host () +ping_host () { ### Use bash internal socket support { @@ -39,7 +39,7 @@ function ping_host () } 1>&2 2>/dev/null } -function mount_slave() +mount_slave() { local i; # inode number SSH_PORT=22 @@ -59,9 +59,9 @@ function mount_slave() [ "x$i" = "x1" ] || fatal "Could not mount volume $2 on $SLAVEMNT Please check host and volume exists"; } -function parse_cli() +parse_cli() { - if [[ $# -ne 4 ]]; then + if [ "$#" -ne 4 ]; then echo "Usage: gfind_missing_files <brick-path> <slave-host> <slave-vol> <OUTFILE>" exit 1 else @@ -76,7 +76,7 @@ function parse_cli() fi } -function main() +main() { parse_cli "$@"; diff --git a/tools/glusterfind/Makefile.am b/tools/glusterfind/Makefile.am index c99a3ddcb37..f17dbdb228e 100644 --- a/tools/glusterfind/Makefile.am +++ b/tools/glusterfind/Makefile.am @@ -1,7 +1,24 @@ SUBDIRS = src -EXTRA_DIST = +EXTRA_DIST = S57glusterfind-delete-post.py glusterfind +if WITH_SERVER bin_SCRIPTS = glusterfind +endif CLEANFILES = $(bin_SCRIPTS) + +if WITH_SERVER +deletehookscriptsdir = $(GLUSTERFS_LIBEXECDIR)/glusterfind/ +deletehookscripts_SCRIPTS = S57glusterfind-delete-post.py + +uninstall-local: + rm -f $(DESTDIR)$(GLUSTERD_WORKDIR)/hooks/1/delete/post/S57glusterfind-delete-post + +install-data-local: + $(mkdir_p) $(DESTDIR)$(GLUSTERD_WORKDIR)/glusterfind/.keys + $(mkdir_p) 
$(DESTDIR)$(GLUSTERD_WORKDIR)/hooks/1/delete/post/ + rm -f $(DESTDIR)$(GLUSTERD_WORKDIR)/hooks/1/delete/post/S57glusterfind-delete-post + ln -s $(GLUSTERFS_LIBEXECDIR)/glusterfind/S57glusterfind-delete-post.py \ + $(DESTDIR)$(GLUSTERD_WORKDIR)/hooks/1/delete/post/S57glusterfind-delete-post +endif diff --git a/tools/glusterfind/S57glusterfind-delete-post.py b/tools/glusterfind/S57glusterfind-delete-post.py new file mode 100755 index 00000000000..5beece220f0 --- /dev/null +++ b/tools/glusterfind/S57glusterfind-delete-post.py @@ -0,0 +1,69 @@ +#!/usr/bin/python3 +import os +import shutil +from errno import ENOENT +from subprocess import Popen, PIPE +from argparse import ArgumentParser + + +DEFAULT_GLUSTERD_WORKDIR = "/var/lib/glusterd" + + +def handle_rm_error(func, path, exc_info): + if exc_info[1].errno == ENOENT: + return + + raise exc_info[1] + + +def get_glusterd_workdir(): + p = Popen(["gluster", "system::", "getwd"], + stdout=PIPE, stderr=PIPE, universal_newlines=True) + + out, _ = p.communicate() + + if p.returncode == 0: + return out.strip() + else: + return DEFAULT_GLUSTERD_WORKDIR + + +def get_args(): + parser = ArgumentParser(description="Volume delete post hook script") + parser.add_argument("--volname") + return parser.parse_args() + + +def main(): + args = get_args() + glusterfind_dir = os.path.join(get_glusterd_workdir(), "glusterfind") + + # Check all session directories, if any directory found for + # the deleted volume, cleanup all the session directories + try: + ls_glusterfind_dir = os.listdir(glusterfind_dir) + except OSError: + ls_glusterfind_dir = [] + + for session in ls_glusterfind_dir: + # don't blow away the keys directory + if session == ".keys": + continue + + # Possible session directory + volume_session_path = os.path.join(glusterfind_dir, + session, + args.volname) + if os.path.exists(volume_session_path): + shutil.rmtree(volume_session_path, onerror=handle_rm_error) + + # Try to Remove directory, if any other dir exists for different + # volume, then rmdir will fail with ENOTEMPTY which is fine + try: + os.rmdir(os.path.join(glusterfind_dir, session)) + except (OSError, IOError): + pass + + +if __name__ == "__main__": + main() diff --git a/tools/glusterfind/glusterfind.in b/tools/glusterfind/glusterfind.in index cff8973980a..ca154b625dd 100644 --- a/tools/glusterfind/glusterfind.in +++ b/tools/glusterfind/glusterfind.in @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> # This file is part of GlusterFS. 
@@ -10,6 +10,7 @@ import sys sys.path.insert(1, '@GLUSTERFS_LIBEXECDIR@/') +sys.path.insert(1, '@GLUSTERFS_LIBEXECDIR@/glusterfind') from glusterfind.main import main diff --git a/tools/glusterfind/src/Makefile.am b/tools/glusterfind/src/Makefile.am index 458b820fd19..43b6141b01c 100644 --- a/tools/glusterfind/src/Makefile.am +++ b/tools/glusterfind/src/Makefile.am @@ -1,14 +1,16 @@ -glusterfinddir = $(libexecdir)/glusterfs/glusterfind +glusterfinddir = $(GLUSTERFS_LIBEXECDIR)/glusterfind +if WITH_SERVER glusterfind_PYTHON = conf.py utils.py __init__.py \ - main.py libgfchangelog.py + main.py libgfchangelog.py changelogdata.py gfind_py2py3.py -glusterfind_SCRIPTS = changelog.py nodecleanup.py \ +glusterfind_SCRIPTS = changelog.py nodeagent.py \ brickfind.py glusterfind_DATA = tool.conf +endif -EXTRA_DIST = changelog.py nodecleanup.py brickfind.py \ - tool.conf +EXTRA_DIST = changelog.py nodeagent.py brickfind.py \ + tool.conf changelogdata.py CLEANFILES = diff --git a/tools/glusterfind/src/__init__.py b/tools/glusterfind/src/__init__.py index eb941c6d67c..1753698b5fa 100644 --- a/tools/glusterfind/src/__init__.py +++ b/tools/glusterfind/src/__init__.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +# -*- coding: utf-8 -*- # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> # This file is part of GlusterFS. diff --git a/tools/glusterfind/src/brickfind.py b/tools/glusterfind/src/brickfind.py index 1090f408e28..73b6350188d 100644 --- a/tools/glusterfind/src/brickfind.py +++ b/tools/glusterfind/src/brickfind.py @@ -1,4 +1,5 @@ -#!/usr/bin/env python +#!/usr/bin/python3 +# -*- coding: utf-8 -*- # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> # This file is part of GlusterFS. @@ -12,7 +13,11 @@ import os import sys import logging from argparse import ArgumentParser, RawDescriptionHelpFormatter -from errno import ENOENT +try: + import urllib.parse as urllib +except ImportError: + import urllib +import time from utils import mkdirp, setup_logger, create_file, output_write, find import conf @@ -36,36 +41,27 @@ def brickfind_crawl(brick, args): with open(args.outfile, "a+") as fout: brick_path_len = len(brick) - def mtime_filter(path): - try: - st = os.lstat(path) - except (OSError, IOError) as e: - if e.errno == ENOENT: - st = None - else: - raise - - if st and (st.st_mtime > args.start or st.st_ctime > args.start): - return True - - return False - - def output_callback(path): + def output_callback(path, filter_result, is_dir): path = path.strip() path = path[brick_path_len+1:] - output_write(fout, path, args.output_prefix) + + if args.type == "both": + output_write(fout, path, args.output_prefix, + encode=(not args.no_encode), tag=args.tag, + field_separator=args.field_separator) + else: + if (is_dir and args.type == "d") or ( + (not is_dir) and args.type == "f"): + output_write(fout, path, args.output_prefix, + encode=(not args.no_encode), tag=args.tag, + field_separator=args.field_separator) ignore_dirs = [os.path.join(brick, dirname) for dirname in conf.get_opt("brick_ignore_dirs").split(",")] - if args.full: - find(brick, callback_func=output_callback, - ignore_dirs=ignore_dirs) - else: - find(brick, callback_func=output_callback, - filter_func=mtime_filter, - ignore_dirs=ignore_dirs) + find(brick, callback_func=output_callback, + ignore_dirs=ignore_dirs) fout.flush() os.fsync(fout.fileno()) @@ -77,19 +73,35 @@ def _get_args(): parser.add_argument("session", help="Session Name") parser.add_argument("volume", help="Volume Name") + parser.add_argument("node", help="Node Name") 
parser.add_argument("brick", help="Brick Name") parser.add_argument("outfile", help="Output File") - parser.add_argument("start", help="Start Time", type=float) + parser.add_argument("tag", help="Tag to prefix file name with") + parser.add_argument("--only-query", help="Only query, No session update", + action="store_true") parser.add_argument("--debug", help="Debug", action="store_true") - parser.add_argument("--full", help="Full Find", action="store_true") + parser.add_argument("--no-encode", + help="Do not encode path in outfile", + action="store_true") parser.add_argument("--output-prefix", help="File prefix in output", default=".") + parser.add_argument('--type', help="type: f, f-files only" + " d, d-directories only, by default = both", + default='both') + parser.add_argument("--field-separator", help="Field separator", + default=" ") return parser.parse_args() if __name__ == "__main__": args = _get_args() + session_dir = os.path.join(conf.get_opt("session_dir"), args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + status_file_pre = status_file + ".pre" + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), exit_on_err=True) log_file = os.path.join(conf.get_opt("log_dir"), @@ -97,5 +109,10 @@ if __name__ == "__main__": args.volume, "brickfind.log") setup_logger(logger, log_file, args.debug) + + time_to_update = int(time.time()) brickfind_crawl(args.brick, args) + if not args.only_query: + with open(status_file_pre, "w") as f: + f.write(str(time_to_update)) sys.exit(0) diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py index eb73635fb32..a5e9ea4288f 100644 --- a/tools/glusterfind/src/changelog.py +++ b/tools/glusterfind/src/changelog.py @@ -1,4 +1,5 @@ -#!/usr/bin/env python +#!/usr/bin/python3 +# -*- coding: utf-8 -*- # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> # This file is part of GlusterFS. @@ -12,14 +13,22 @@ import os import sys import time import xattr -from errno import ENOENT import logging +from gfind_py2py3 import bytearray_to_str from argparse import ArgumentParser, RawDescriptionHelpFormatter import hashlib +try: + import urllib.parse as urllib +except ImportError: + import urllib +import codecs import libgfchangelog -from utils import create_file, mkdirp, execute, symlink_gfid_to_path -from utils import fail, setup_logger, output_write, find +from utils import mkdirp, symlink_gfid_to_path +from utils import fail, setup_logger, find +from utils import get_changelog_rollover_time +from utils import output_path_prepare +from changelogdata import ChangelogData import conf @@ -35,168 +44,258 @@ history_turn_time = 0 logger = logging.getLogger() -def gfid_to_path_using_batchfind(brick, gfids_file, output_file): +def pgfid_to_path(brick, changelog_data): """ - find -samefile gets the inode number and crawls entire namespace - to get the list of files/dirs having same inode number. - Do find without any option, except the ignore directory option, - print the output in <INODE_NUM> <PATH> format, use this output - to look into in-memory dictionary of inode numbers got from the - list of GFIDs + For all the pgfids in table, converts into path using recursive + readlink. 
""" - with open(output_file, "a+") as fout: - inode_dict = {} - with open(gfids_file) as f: - for gfid in f: - gfid = gfid.strip() - backend_path = os.path.join(brick, ".glusterfs", - gfid[0:2], gfid[2:4], gfid) - - try: - inode_dict[str(os.stat(backend_path).st_ino)] = 1 - except (IOError, OSError) as e: - if e.errno == ENOENT: - continue - else: - fail("%s Failed to convert to path from " - "GFID %s: %s" % (brick, gfid, e), logger=logger) - - if not inode_dict: - return - - def inode_filter(path): + # pgfid1 to path1 in case of CREATE/MKNOD/MKDIR/LINK/SYMLINK + for row in changelog_data.gfidpath_get_distinct("pgfid1", {"path1": ""}): + # In case of Data/Metadata only, pgfid1 will not be there + if row[0] == "": + continue + + try: + path = symlink_gfid_to_path(brick, row[0]) + path = output_path_prepare(path, args) + changelog_data.gfidpath_set_path1(path, row[0]) + except (IOError, OSError) as e: + logger.warn("Error converting to path: %s" % e) + continue + + # pgfid2 to path2 in case of RENAME + for row in changelog_data.gfidpath_get_distinct("pgfid2", + {"type": "RENAME", + "path2": ""}): + # Only in case of Rename pgfid2 exists + if row[0] == "": + continue + + try: + path = symlink_gfid_to_path(brick, row[0]) + path = output_path_prepare(path, args) + changelog_data.gfidpath_set_path2(path, row[0]) + except (IOError, OSError) as e: + logger.warn("Error converting to path: %s" % e) + continue + + +def populate_pgfid_and_inodegfid(brick, changelog_data): + """ + For all the DATA/METADATA modifications GFID, + If symlink, directly convert to Path using Readlink. + If not symlink, try to get PGFIDs via xattr query and populate it + to pgfid table, collect inodes in inodegfid table + """ + for row in changelog_data.gfidpath_get({"path1": "", "type": "MODIFY"}): + gfid = row[3].strip() + p = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid) + if os.path.islink(p): + # It is a Directory if GFID backend path is symlink try: - st = os.lstat(path) - except (OSError, IOError) as e: - if e.errno == ENOENT: - st = None - else: - raise - - if st and inode_dict.get(str(st.st_ino), None): - return True - - return False + path = symlink_gfid_to_path(brick, gfid) + path = output_path_prepare(path, args) + changelog_data.gfidpath_update({"path1": path}, + {"gfid": gfid}) + except (IOError, OSError) as e: + logger.warn("Error converting to path: %s" % e) + continue + else: + try: + # INODE and GFID to inodegfid table + changelog_data.inodegfid_add(os.stat(p).st_ino, gfid) + file_xattrs = xattr.list(p) + for x in file_xattrs: + x_str = bytearray_to_str(x) + if x_str.startswith("trusted.pgfid."): + # PGFID in pgfid table + changelog_data.pgfid_add(x_str.split(".")[-1]) + except (IOError, OSError): + # All OS Errors ignored, since failures will be logged + # in End. All GFIDs present in gfidpath table + continue + + +def enum_hard_links_using_gfid2path(brick, gfid, args): + hardlinks = [] + p = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid) + if not os.path.isdir(p): + # we have a symlink or a normal file + try: + file_xattrs = xattr.list(p) + for x in file_xattrs: + x_str = bytearray_to_str(x) + if x_str.startswith("trusted.gfid2path."): + # get the value for the xattr i.e. 
<PGFID>/<BN> + v = xattr.getxattr(p, x_str) + v_str = bytearray_to_str(v) + pgfid, bn = v_str.split(os.sep) + try: + path = symlink_gfid_to_path(brick, pgfid) + fullpath = os.path.join(path, bn) + fullpath = output_path_prepare(fullpath, args) + hardlinks.append(fullpath) + except (IOError, OSError) as e: + logger.warn("Error converting to path: %s" % e) + continue + except (IOError, OSError): + pass + return hardlinks - brick_path_len = len(brick) - def output_callback(path): - path = path.strip() - path = path[brick_path_len+1:] - output_write(fout, path, args.output_prefix) +def gfid_to_all_paths_using_gfid2path(brick, changelog_data, args): + path = "" + for row in changelog_data.gfidpath_get({"path1": "", "type": "MODIFY"}): + gfid = row[3].strip() + logger.debug("Processing gfid %s" % gfid) + hardlinks = enum_hard_links_using_gfid2path(brick, gfid, args) - ignore_dirs = [os.path.join(brick, dirname) - for dirname in - conf.get_opt("brick_ignore_dirs").split(",")] - # Length of brick path, to remove from output path - find(brick, callback_func=output_callback, - filter_func=inode_filter, - ignore_dirs=ignore_dirs) + path = ",".join(hardlinks) - fout.flush() - os.fsync(fout.fileno()) + changelog_data.gfidpath_update({"path1": path}, {"gfid": gfid}) -def gfid_to_path_using_pgfid(brick, gfids_file, output_file, outfile_failures): +def gfid_to_path_using_pgfid(brick, changelog_data, args): """ - Parent GFID is saved as xattr, collect Parent GFIDs from all - the files from gfids_file. Convert parent GFID to path and Crawl - each directories to get the list of files/dirs having same inode number. - Do find with maxdepth as 1 and print the output in <INODE_NUM> <PATH> - format, use this output to look into in memory dictionary of inode - numbers got from the list of GFIDs + For all the pgfids collected, Converts to Path and + does readdir on those directories and looks up inodegfid + table for matching inode number. 
""" - with open(output_file, "a+") as fout: - pgfids = set() - inode_dict = {} - with open(gfids_file) as f: - for gfid in f: - gfid = gfid.strip() - p = os.path.join(brick, - ".glusterfs", - gfid[0:2], - gfid[2:4], - gfid) - if os.path.islink(p): - path = symlink_gfid_to_path(brick, gfid) - output_write(fout, path, args.output_prefix) - else: - try: - inode_dict[str(os.stat(p).st_ino)] = 1 - file_xattrs = xattr.list(p) - num_parent_gfid = 0 - for x in file_xattrs: - if x.startswith("trusted.pgfid."): - num_parent_gfid += 1 - pgfids.add(x.split(".")[-1]) - - if num_parent_gfid == 0: - with open(outfile_failures, "a") as f: - f.write("%s\n" % gfid) - f.flush() - os.fsync(f.fileno()) + populate_pgfid_and_inodegfid(brick, changelog_data) - except (IOError, OSError) as e: - if e.errno == ENOENT: - continue - else: - fail("%s Failed to convert to path from " - "GFID %s: %s" % (brick, gfid, e), - logger=logger) + # If no GFIDs needs conversion to Path + if not changelog_data.inodegfid_exists({"converted": 0}): + return - if not inode_dict: - return + def inode_filter(path): + # Looks in inodegfid table, if exists returns + # inode number else None + try: + st = os.lstat(path) + except (OSError, IOError): + st = None - def inode_filter(path): - try: - st = os.lstat(path) - except (OSError, IOError) as e: - if e.errno == ENOENT: - st = None - else: - raise + if st and changelog_data.inodegfid_exists({"inode": st.st_ino}): + return st.st_ino + + return None - if st and inode_dict.get(str(st.st_ino), None): - return True + # Length of brick path, to remove from output path + brick_path_len = len(brick) - return False + def output_callback(path, inode): + # For each path found, encodes it and updates path1 + # Also updates converted flag in inodegfid table as 1 + path = path.strip() + path = path[brick_path_len+1:] - # Length of brick path, to remove from output path - brick_path_len = len(brick) + path = output_path_prepare(path, args) - def output_callback(path): - path = path.strip() - path = path[brick_path_len+1:] - output_write(fout, path, args.output_prefix) + changelog_data.append_path1(path, inode) + changelog_data.inodegfid_update({"converted": 1}, {"inode": inode}) - ignore_dirs = [os.path.join(brick, dirname) - for dirname in - conf.get_opt("brick_ignore_dirs").split(",")] + ignore_dirs = [os.path.join(brick, dirname) + for dirname in + conf.get_opt("brick_ignore_dirs").split(",")] - for pgfid in pgfids: - path = symlink_gfid_to_path(brick, pgfid) + for row in changelog_data.pgfid_get(): + try: + path = symlink_gfid_to_path(brick, row[0]) find(os.path.join(brick, path), callback_func=output_callback, filter_func=inode_filter, ignore_dirs=ignore_dirs, subdirs_crawl=False) + except (IOError, OSError) as e: + logger.warn("Error converting to path: %s" % e) + continue + + +def gfid_to_path_using_batchfind(brick, changelog_data): + # If all the GFIDs converted using gfid_to_path_using_pgfid + if not changelog_data.inodegfid_exists({"converted": 0}): + return - fout.flush() - os.fsync(fout.fileno()) + def inode_filter(path): + # Looks in inodegfid table, if exists returns + # inode number else None + try: + st = os.lstat(path) + except (OSError, IOError): + st = None + if st and changelog_data.inodegfid_exists({"inode": st.st_ino}): + return st.st_ino -def sort_unique(filename): - execute(["sort", "-u", "-o", filename, filename], - exit_msg="Sort failed", logger=logger) + return None + # Length of brick path, to remove from output path + brick_path_len = len(brick) -def get_changes(brick, 
hash_dir, log_file, end, args): + def output_callback(path, inode): + # For each path found, encodes it and updates path1 + # Also updates converted flag in inodegfid table as 1 + path = path.strip() + path = path[brick_path_len+1:] + path = output_path_prepare(path, args) + + changelog_data.append_path1(path, inode) + + ignore_dirs = [os.path.join(brick, dirname) + for dirname in + conf.get_opt("brick_ignore_dirs").split(",")] + + # Full Namespace Crawl + find(brick, callback_func=output_callback, + filter_func=inode_filter, + ignore_dirs=ignore_dirs) + + +def parse_changelog_to_db(changelog_data, filename, args): + """ + Parses a Changelog file and populates data in gfidpath table + """ + with codecs.open(filename, encoding="utf-8") as f: + changelogfile = os.path.basename(filename) + for line in f: + data = line.strip().split(" ") + if data[0] == "E" and data[2] in ["CREATE", "MKNOD", "MKDIR"]: + # CREATE/MKDIR/MKNOD + changelog_data.when_create_mknod_mkdir(changelogfile, data) + elif data[0] in ["D", "M"]: + # DATA/META + if not args.only_namespace_changes: + changelog_data.when_data_meta(changelogfile, data) + elif data[0] == "E" and data[2] in ["LINK", "SYMLINK"]: + # LINK/SYMLINK + changelog_data.when_link_symlink(changelogfile, data) + elif data[0] == "E" and data[2] == "RENAME": + # RENAME + changelog_data.when_rename(changelogfile, data) + elif data[0] == "E" and data[2] in ["UNLINK", "RMDIR"]: + # UNLINK/RMDIR + changelog_data.when_unlink_rmdir(changelogfile, data) + + +def get_changes(brick, hash_dir, log_file, start, end, args): """ Makes use of libgfchangelog's history API to get changelogs containing changes from start and end time. Further collects the modified gfids from the changelogs and writes the list of gfid to 'gfid_list' file. """ + session_dir = os.path.join(conf.get_opt("session_dir"), + args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + + # Get previous session + try: + with open(status_file) as f: + start = int(f.read().strip()) + except (ValueError, OSError, IOError): + start = args.start + try: libgfchangelog.cl_init() libgfchangelog.cl_register(brick, hash_dir, log_file, @@ -205,10 +304,7 @@ def get_changes(brick, hash_dir, log_file, end, args): fail("%s Changelog register failed: %s" % (brick, e), logger=logger) # Output files to record GFIDs and GFID to Path failure GFIDs - gfid_list_path = args.outfile + ".gfids" - gfid_list_failures_file = gfid_list_path + ".failures" - create_file(gfid_list_path, exit_on_err=True, logger=logger) - create_file(gfid_list_failures_file, exit_on_err=True, logger=logger) + changelog_data = ChangelogData(args.outfile, args) # Changelogs path(Hard coded to BRICK/.glusterfs/changelogs cl_path = os.path.join(brick, ".glusterfs/changelogs") @@ -216,11 +312,12 @@ def get_changes(brick, hash_dir, log_file, end, args): # Fail if History fails for requested Start and End try: actual_end = libgfchangelog.cl_history_changelog( - cl_path, args.start, end, CHANGELOGAPI_NUM_WORKERS) + cl_path, start, end, CHANGELOGAPI_NUM_WORKERS) except libgfchangelog.ChangelogException as e: - fail("%s Historical Changelogs not available: %s" % (brick, e), - logger=logger) + fail("%s: %s Historical Changelogs not available: %s" % + (args.node, brick, e), logger=logger) + logger.info("[1/4] Starting changelog parsing ...") try: # scan followed by getchanges till scan returns zero. 
# history_scan() is blocking call, till it gets the number @@ -230,37 +327,51 @@ def get_changes(brick, hash_dir, log_file, end, args): # history_getchanges() changes = [] while libgfchangelog.cl_history_scan() > 0: - changes += libgfchangelog.cl_history_getchanges() - - if changes: - with open(gfid_list_path, 'a+') as fgfid: - for change in changes: - with open(change) as f: - for line in f: - # Space delimited list, collect GFID - details = line.split() - fgfid.write("%s\n" % details[1]) - - libgfchangelog.cl_history_done(change) - fgfid.flush() - os.fsync(fgfid.fileno()) + changes = libgfchangelog.cl_history_getchanges() + + for change in changes: + # Ignore if last processed changelog comes + # again in list + if change.endswith(".%s" % start): + continue + try: + parse_changelog_to_db(changelog_data, change, args) + libgfchangelog.cl_history_done(change) + except IOError as e: + logger.warn("Error parsing changelog file %s: %s" % + (change, e)) + + changelog_data.commit() except libgfchangelog.ChangelogException as e: fail("%s Error during Changelog Crawl: %s" % (brick, e), logger=logger) - # If TS returned from history_changelog is < end time - # then FS crawl may be required, since history is only available - # till TS returned from history_changelog - if actual_end < end: - fail("Partial History available with Changelog", 2, logger=logger) + logger.info("[1/4] Finished changelog parsing.") - sort_unique(gfid_list_path) - gfid_to_path_using_pgfid(brick, gfid_list_path, - args.outfile, gfid_list_failures_file) - gfid_to_path_using_batchfind(brick, gfid_list_failures_file, args.outfile) + # Convert all pgfid available from Changelogs + logger.info("[2/4] Starting 'pgfid to path' conversions ...") + pgfid_to_path(brick, changelog_data) + changelog_data.commit() + logger.info("[2/4] Finished 'pgfid to path' conversions.") + # Convert all gfids recorded for data and metadata to all hardlink paths + logger.info("[3/4] Starting 'gfid2path' conversions ...") + gfid_to_all_paths_using_gfid2path(brick, changelog_data, args) + changelog_data.commit() + logger.info("[3/4] Finished 'gfid2path' conversions.") -def changelog_crawl(brick, end, args): + # If some GFIDs fail to get converted from previous step, + # convert using find + logger.info("[4/4] Starting 'gfid to path using batchfind' " + "conversions ...") + gfid_to_path_using_batchfind(brick, changelog_data) + changelog_data.commit() + logger.info("[4/4] Finished 'gfid to path using batchfind' conversions.") + + return actual_end + + +def changelog_crawl(brick, start, end, args): """ Init function, prepares working dir and calls Changelog query """ @@ -269,13 +380,11 @@ def changelog_crawl(brick, end, args): # WORKING_DIR/BRICKHASH/OUTFILE working_dir = os.path.dirname(args.outfile) - brickhash = hashlib.sha1(brick) + brickhash = hashlib.sha1(brick.encode()) brickhash = str(brickhash.hexdigest()) working_dir = os.path.join(working_dir, brickhash) mkdirp(working_dir, exit_on_err=True, logger=logger) - create_file(args.outfile, exit_on_err=True, logger=logger) - create_file(args.outfile + ".gfids", exit_on_err=True, logger=logger) log_file = os.path.join(conf.get_opt("log_dir"), args.session, @@ -283,8 +392,8 @@ def changelog_crawl(brick, end, args): "changelog.%s.log" % brickhash) logger.info("%s Started Changelog Crawl. 
Start: %s, End: %s" - % (brick, args.start, end)) - get_changes(brick, working_dir, log_file, end, args) + % (brick, start, end)) + return get_changes(brick, working_dir, log_file, start, end, args) def _get_args(): @@ -293,12 +402,23 @@ def _get_args(): parser.add_argument("session", help="Session Name") parser.add_argument("volume", help="Volume Name") + parser.add_argument("node", help="Node Name") parser.add_argument("brick", help="Brick Name") parser.add_argument("outfile", help="Output File") parser.add_argument("start", help="Start Time", type=int) + parser.add_argument("end", help="End Time", type=int) + parser.add_argument("--only-query", help="Query mode only (no session)", + action="store_true") parser.add_argument("--debug", help="Debug", action="store_true") + parser.add_argument("--no-encode", + help="Do not encode path in outfile", + action="store_true") parser.add_argument("--output-prefix", help="File prefix in output", default=".") + parser.add_argument("--type",default="both") + parser.add_argument("-N", "--only-namespace-changes", + help="List only namespace changes", + action="store_true") return parser.parse_args() @@ -312,6 +432,38 @@ if __name__ == "__main__": args.volume, "changelog.log") setup_logger(logger, log_file, args.debug) - end = int(time.time()) - int(conf.get_opt("changelog_rollover_time")) - changelog_crawl(args.brick, end, args) + + session_dir = os.path.join(conf.get_opt("session_dir"), args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + status_file_pre = status_file + ".pre" + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) + + end = -1 + if args.only_query: + start = args.start + end = args.end + else: + try: + with open(status_file) as f: + start = int(f.read().strip()) + except (ValueError, OSError, IOError): + start = args.start + + # end time is optional; so a -1 may be sent to use the default method of + # identifying the end time + if end == -1: + end = int(time.time()) - get_changelog_rollover_time(args.volume) + + logger.info("%s Started Changelog Crawl - Start: %s End: %s" % (args.brick, + start, + end)) + actual_end = changelog_crawl(args.brick, start, end, args) + if not args.only_query: + with open(status_file_pre, "w") as f: + f.write(str(actual_end)) + + logger.info("%s Finished Changelog Crawl - End: %s" % (args.brick, + actual_end)) sys.exit(0) diff --git a/tools/glusterfind/src/changelogdata.py b/tools/glusterfind/src/changelogdata.py new file mode 100644 index 00000000000..641593cf4b1 --- /dev/null +++ b/tools/glusterfind/src/changelogdata.py @@ -0,0 +1,440 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+ +import sqlite3 +import os + +from utils import RecordType, unquote_plus_space_newline +from utils import output_path_prepare + + +class OutputMerger(object): + """ + Class to merge the output files collected from + different nodes + """ + def __init__(self, db_path, all_dbs): + self.conn = sqlite3.connect(db_path) + self.cursor = self.conn.cursor() + self.cursor_reader = self.conn.cursor() + query = "DROP TABLE IF EXISTS finallist" + self.cursor.execute(query) + + query = """ + CREATE TABLE finallist( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts VARCHAR, + type VARCHAR, + gfid VARCHAR, + path1 VARCHAR, + path2 VARCHAR, + UNIQUE (type, path1, path2) ON CONFLICT IGNORE + ) + """ + self.cursor.execute(query) + + # If node database exists, read each db and insert into + # final table. Ignore if combination of TYPE PATH1 PATH2 + # already exists + for node_db in all_dbs: + if os.path.exists(node_db): + conn = sqlite3.connect(node_db) + cursor = conn.cursor() + query = """ + SELECT ts, type, gfid, path1, path2 + FROM gfidpath + WHERE path1 != '' + ORDER BY id ASC + """ + for row in cursor.execute(query): + self.add_if_not_exists(row[0], row[1], row[2], + row[3], row[4]) + + self.conn.commit() + + def add_if_not_exists(self, ts, ty, gfid, path1, path2=""): + # Adds record to finallist only if not exists + query = """ + INSERT INTO finallist(ts, type, gfid, path1, path2) + VALUES(?, ?, ?, ?, ?) + """ + self.cursor.execute(query, (ts, ty, gfid, path1, path2)) + + def get(self): + query = """SELECT type, path1, path2 FROM finallist + ORDER BY ts ASC, id ASC""" + return self.cursor_reader.execute(query) + + def get_failures(self): + query = """ + SELECT gfid + FROM finallist + WHERE path1 = '' OR (path2 = '' AND type = 'RENAME') + """ + return self.cursor_reader.execute(query) + + +class ChangelogData(object): + def __init__(self, dbpath, args): + self.conn = sqlite3.connect(dbpath) + self.cursor = self.conn.cursor() + self.cursor_reader = self.conn.cursor() + self._create_table_gfidpath() + self._create_table_pgfid() + self._create_table_inodegfid() + self.args = args + self.path_sep = "/" + + def _create_table_gfidpath(self): + drop_table = "DROP TABLE IF EXISTS gfidpath" + self.cursor.execute(drop_table) + + create_table = """ + CREATE TABLE gfidpath( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts VARCHAR, + type VARCHAR, + gfid VARCHAR(40), + pgfid1 VARCHAR(40) DEFAULT '', + bn1 VARCHAR(500) DEFAULT '', + pgfid2 VARCHAR(40) DEFAULT '', + bn2 VARCHAR(500) DEFAULT '', + path1 VARCHAR DEFAULT '', + path2 VARCHAR DEFAULT '' + ) + """ + self.cursor.execute(create_table) + + create_index = """ + CREATE INDEX gfid_index ON gfidpath(gfid); + """ + self.cursor.execute(create_index) + + def _create_table_inodegfid(self): + drop_table = "DROP TABLE IF EXISTS inodegfid" + self.cursor.execute(drop_table) + + create_table = """ + CREATE TABLE inodegfid( + inode INTEGER PRIMARY KEY, + gfid VARCHAR(40), + converted INTEGER DEFAULT 0, + UNIQUE (inode, gfid) ON CONFLICT IGNORE + ) + """ + self.cursor.execute(create_table) + + def _create_table_pgfid(self): + drop_table = "DROP TABLE IF EXISTS pgfid" + self.cursor.execute(drop_table) + + create_table = """ + CREATE TABLE pgfid( + pgfid VARCHAR(40) PRIMARY KEY, + UNIQUE (pgfid) ON CONFLICT IGNORE + ) + """ + self.cursor.execute(create_table) + + def _get(self, tablename, filters): + # SELECT * FROM <TABLENAME> WHERE <CONDITION> + params = [] + query = "SELECT * FROM %s WHERE 1=1" % tablename + + for key, value in filters.items(): + query += " AND %s = ?" 
% key + params.append(value) + + return self.cursor_reader.execute(query, params) + + def _get_distinct(self, tablename, distinct_field, filters): + # SELECT DISTINCT <COL> FROM <TABLENAME> WHERE <CONDITION> + params = [] + query = "SELECT DISTINCT %s FROM %s WHERE 1=1" % (distinct_field, + tablename) + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + return self.cursor_reader.execute(query, params) + + def _delete(self, tablename, filters): + # DELETE FROM <TABLENAME> WHERE <CONDITIONS> + query = "DELETE FROM %s WHERE 1=1" % tablename + params = [] + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + self.cursor.execute(query, params) + + def _add(self, tablename, data): + # INSERT INTO <TABLENAME>(<col1>, <col2>..) VALUES(?,?..) + query = "INSERT INTO %s(" % tablename + fields = [] + params = [] + for key, value in data.items(): + fields.append(key) + params.append(value) + + values_substitute = len(fields)*["?"] + query += "%s) VALUES(%s)" % (",".join(fields), + ",".join(values_substitute)) + self.cursor.execute(query, params) + + def _update(self, tablename, data, filters): + # UPDATE <TABLENAME> SET col1 = ?,.. WHERE col1=? AND .. + params = [] + update_fields = [] + for key, value in data.items(): + update_fields.append("%s = ?" % key) + params.append(value) + + query = "UPDATE %s SET %s WHERE 1 = 1" % (tablename, + ", ".join(update_fields)) + + for key, value in filters.items(): + query += " AND %s = ?" % key + params.append(value) + + self.cursor.execute(query, params) + + def _exists(self, tablename, filters): + if not filters: + return False + + query = "SELECT COUNT(1) FROM %s WHERE 1=1" % tablename + params = [] + + for key, value in filters.items(): + query += " AND %s = ?" 
% key + params.append(value) + + self.cursor.execute(query, params) + row = self.cursor.fetchone() + return True if row[0] > 0 else False + + def gfidpath_add(self, changelogfile, ty, gfid, pgfid1="", bn1="", + pgfid2="", bn2="", path1="", path2=""): + self._add("gfidpath", { + "ts": changelogfile.split(".")[-1], + "type": ty, + "gfid": gfid, + "pgfid1": pgfid1, + "bn1": bn1, + "pgfid2": pgfid2, + "bn2": bn2, + "path1": path1, + "path2": path2 + }) + + def gfidpath_update(self, data, filters): + self._update("gfidpath", data, filters) + + def gfidpath_delete(self, filters): + self._delete("gfidpath", filters) + + def gfidpath_exists(self, filters): + return self._exists("gfidpath", filters) + + def gfidpath_get(self, filters={}): + return self._get("gfidpath", filters) + + def gfidpath_get_distinct(self, distinct_field, filters={}): + return self._get_distinct("gfidpath", distinct_field, filters) + + def pgfid_add(self, pgfid): + self._add("pgfid", { + "pgfid": pgfid + }) + + def pgfid_update(self, data, filters): + self._update("pgfid", data, filters) + + def pgfid_get(self, filters={}): + return self._get("pgfid", filters) + + def pgfid_get_distinct(self, distinct_field, filters={}): + return self._get_distinct("pgfid", distinct_field, filters) + + def pgfid_exists(self, filters): + return self._exists("pgfid", filters) + + def inodegfid_add(self, inode, gfid, converted=0): + self._add("inodegfid", { + "inode": inode, + "gfid": gfid, + "converted": converted + }) + + def inodegfid_update(self, data, filters): + self._update("inodegfid", data, filters) + + def inodegfid_get(self, filters={}): + return self._get("inodegfid", filters) + + def inodegfid_get_distinct(self, distinct_field, filters={}): + return self._get_distinct("inodegfid", distinct_field, filters) + + def inodegfid_exists(self, filters): + return self._exists("inodegfid", filters) + + def append_path1(self, path, inode): + # || is for concatenate in SQL + query = """UPDATE gfidpath SET path1 = path1 || ',' || ? + WHERE gfid IN (SELECT gfid FROM inodegfid WHERE inode = ?)""" + self.cursor.execute(query, (path, inode)) + + def gfidpath_set_path1(self, path1, pgfid1): + # || is for concatenate in SQL + if path1 == "": + update_str1 = "? || bn1" + update_str2 = "? || bn2" + else: + update_str1 = "? || '{0}' || bn1".format(self.path_sep) + update_str2 = "? || '{0}' || bn2".format(self.path_sep) + + query = """UPDATE gfidpath SET path1 = %s + WHERE pgfid1 = ?""" % update_str1 + self.cursor.execute(query, (path1, pgfid1)) + + # Set Path2 if pgfid1 and pgfid2 are same + query = """UPDATE gfidpath SET path2 = %s + WHERE pgfid2 = ?""" % update_str2 + self.cursor.execute(query, (path1, pgfid1)) + + def gfidpath_set_path2(self, path2, pgfid2): + # || is for concatenate in SQL + if path2 == "": + update_str = "? || bn2" + else: + update_str = "? 
|| '{0}' || bn2".format(self.path_sep) + + query = """UPDATE gfidpath SET path2 = %s + WHERE pgfid2 = ?""" % update_str + self.cursor.execute(query, (path2, pgfid2)) + + def when_create_mknod_mkdir(self, changelogfile, data): + # E <GFID> <MKNOD|CREATE|MKDIR> <MODE> <USER> <GRP> <PGFID>/<BNAME> + # Add the Entry to DB + pgfid1, bn1 = data[6].split("/", 1) + + if self.args.no_encode: + bn1 = unquote_plus_space_newline(bn1).strip() + + self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1) + + def when_rename(self, changelogfile, data): + # E <GFID> RENAME <OLD_PGFID>/<BNAME> <PGFID>/<BNAME> + pgfid1, bn1 = data[3].split("/", 1) + pgfid2, bn2 = data[4].split("/", 1) + + if self.args.no_encode: + bn1 = unquote_plus_space_newline(bn1).strip() + bn2 = unquote_plus_space_newline(bn2).strip() + + if self.gfidpath_exists({"gfid": data[1], "type": "NEW", + "pgfid1": pgfid1, "bn1": bn1}): + # If <OLD_PGFID>/<BNAME> is same as CREATE, Update + # <NEW_PGFID>/<BNAME> in NEW. + self.gfidpath_update({"pgfid1": pgfid2, "bn1": bn2}, + {"gfid": data[1], "type": "NEW", + "pgfid1": pgfid1, "bn1": bn1}) + elif self.gfidpath_exists({"gfid": data[1], "type": "RENAME", + "pgfid2": pgfid1, "bn2": bn1}): + # If we are renaming file back to original name then just + # delete the entry since it will effectively be a no-op + if self.gfidpath_exists({"gfid": data[1], "type": "RENAME", + "pgfid2": pgfid1, "bn2": bn1, + "pgfid1": pgfid2, "bn1": bn2}): + self.gfidpath_delete({"gfid": data[1], "type": "RENAME", + "pgfid2": pgfid1, "bn2": bn1}) + else: + # If <OLD_PGFID>/<BNAME> is same as <PGFID2>/<BN2> + # (may be previous RENAME) + # then UPDATE <NEW_PGFID>/<BNAME> as <PGFID2>/<BN2> + self.gfidpath_update({"pgfid2": pgfid2, "bn2": bn2}, + {"gfid": data[1], "type": "RENAME", + "pgfid2": pgfid1, "bn2": bn1}) + else: + # Else insert as RENAME + self.gfidpath_add(changelogfile, RecordType.RENAME, data[1], + pgfid1, bn1, pgfid2, bn2) + + if self.gfidpath_exists({"gfid": data[1], "type": "MODIFY"}): + # If MODIFY exists already for that GFID, remove it and insert + # again so that MODIFY entry comes after RENAME entry + # Output will have MODIFY <NEWNAME> + self.gfidpath_delete({"gfid": data[1], "type": "MODIFY"}) + self.gfidpath_add(changelogfile, RecordType.MODIFY, data[1]) + + def when_link_symlink(self, changelogfile, data): + # E <GFID> <LINK|SYMLINK> <PGFID>/<BASENAME> + # Add as New record in Db as Type NEW + pgfid1, bn1 = data[3].split("/", 1) + if self.args.no_encode: + bn1 = unquote_plus_space_newline(bn1).strip() + + self.gfidpath_add(changelogfile, RecordType.NEW, data[1], pgfid1, bn1) + + def when_data_meta(self, changelogfile, data): + # If GFID row exists, Ignore else Add to Db + if not self.gfidpath_exists({"gfid": data[1], "type": "NEW"}) and \ + not self.gfidpath_exists({"gfid": data[1], "type": "MODIFY"}): + self.gfidpath_add(changelogfile, RecordType.MODIFY, data[1]) + + def when_unlink_rmdir(self, changelogfile, data): + # E <GFID> <UNLINK|RMDIR> <PGFID>/<BASENAME> + pgfid1, bn1 = data[3].split("/", 1) + + if self.args.no_encode: + bn1 = unquote_plus_space_newline(bn1).strip() + + deleted_path = data[4] if len(data) == 5 else "" + if deleted_path != "": + deleted_path = unquote_plus_space_newline(deleted_path) + deleted_path = output_path_prepare(deleted_path, self.args) + + if self.gfidpath_exists({"gfid": data[1], "type": "NEW", + "pgfid1": pgfid1, "bn1": bn1}): + # If path exists in table as NEW with same GFID + # Delete that row + self.gfidpath_delete({"gfid": data[1], "type": "NEW", + 
"pgfid1": pgfid1, "bn1": bn1}) + else: + # Else Record as DELETE + self.gfidpath_add(changelogfile, RecordType.DELETE, data[1], + pgfid1, bn1, path1=deleted_path) + + # Update path1 as deleted_path if pgfid1 and bn1 is same as deleted + self.gfidpath_update({"path1": deleted_path}, {"gfid": data[1], + "pgfid1": pgfid1, + "bn1": bn1}) + + # Update path2 as deleted_path if pgfid2 and bn2 is same as deleted + self.gfidpath_update({"path2": deleted_path}, { + "type": RecordType.RENAME, + "gfid": data[1], + "pgfid2": pgfid1, + "bn2": bn1}) + + # If deleted directory is parent for somebody + query1 = """UPDATE gfidpath SET path1 = ? || '{0}' || bn1 + WHERE pgfid1 = ? AND path1 != ''""".format(self.path_sep) + self.cursor.execute(query1, (deleted_path, data[1])) + + query1 = """UPDATE gfidpath SET path2 = ? || '{0}' || bn1 + WHERE pgfid2 = ? AND path2 != ''""".format(self.path_sep) + self.cursor.execute(query1, (deleted_path, data[1])) + + def commit(self): + self.conn.commit() diff --git a/tools/glusterfind/src/conf.py b/tools/glusterfind/src/conf.py index 2c6eac2bb14..3849ba5dd1f 100644 --- a/tools/glusterfind/src/conf.py +++ b/tools/glusterfind/src/conf.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +# -*- coding: utf-8 -*- # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> # This file is part of GlusterFS. @@ -9,9 +9,12 @@ # cases as published by the Free Software Foundation. import os -import ConfigParser +try: + from ConfigParser import ConfigParser +except ImportError: + from configparser import ConfigParser -config = ConfigParser.ConfigParser() +config = ConfigParser() config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)), "tool.conf")) diff --git a/tools/glusterfind/src/gfind_py2py3.py b/tools/glusterfind/src/gfind_py2py3.py new file mode 100644 index 00000000000..87324fbf350 --- /dev/null +++ b/tools/glusterfind/src/gfind_py2py3.py @@ -0,0 +1,88 @@ +# +# Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +# All python2/python3 compatibility routines + +import os +import sys +from ctypes import create_string_buffer + +if sys.version_info >= (3,): + + # Raw conversion of bytearray to string. Used in the cases where + # buffer is created by create_string_buffer which is a 8-bit char + # array and passed to syscalls to fetch results. Using encode/decode + # doesn't work as it converts to string altering the size. 
+ # def bytearray_to_str(byte_arr): + def bytearray_to_str(byte_arr): + return ''.join([chr(b) for b in byte_arr]) + + def gf_create_string_buffer(size): + return create_string_buffer(b'\0', size) + + def gfind_history_changelog(libgfc, changelog_path, start, end, num_parallel, + actual_end): + return libgfc.gf_history_changelog(changelog_path.encode(), start, end, num_parallel, + actual_end) + + def gfind_changelog_register(libgfc, brick, path, log_file, log_level, + retries): + return libgfc.gf_changelog_register(brick.encode(), path.encode(), log_file.encode(), + log_level, retries) + + def gfind_history_changelog_done(libgfc, clfile): + return libgfc.gf_history_changelog_done(clfile.encode()) + + def gfind_write_row(f, row, field_separator, p_rep, row_2_rep): + f.write(u"{0}{1}{2}{3}{4}\n".format(row, + field_separator, + p_rep, + field_separator, + row_2_rep)) + + def gfind_write(f, row, field_separator, p_rep): + f.write(u"{0}{1}{2}\n".format(row, + field_separator, + p_rep)) + + +else: + + # Raw conversion of bytearray to string + def bytearray_to_str(byte_arr): + return byte_arr + + def gf_create_string_buffer(size): + return create_string_buffer('\0', size) + + def gfind_history_changelog(libgfc, changelog_path, start, end, num_parallel, + actual_end): + return libgfc.gf_history_changelog(changelog_path, start, end, + num_parallel, actual_end) + + def gfind_changelog_register(libgfc, brick, path, log_file, log_level, + retries): + return libgfc.gf_changelog_register(brick, path, log_file, + log_level, retries) + + def gfind_history_changelog_done(libgfc, clfile): + return libgfc.gf_history_changelog_done(clfile) + + def gfind_write_row(f, row, field_separator, p_rep, row_2_rep): + f.write(u"{0}{1}{2}{3}{4}\n".format(row, + field_separator, + p_rep, + field_separator, + row_2_rep).encode()) + + def gfind_write(f, row, field_separator, p_rep): + f.write(u"{0}{1}{2}\n".format(row, + field_separator, + p_rep).encode()) diff --git a/tools/glusterfind/src/libgfchangelog.py b/tools/glusterfind/src/libgfchangelog.py index 44e8fd5a61a..513bb101e93 100644 --- a/tools/glusterfind/src/libgfchangelog.py +++ b/tools/glusterfind/src/libgfchangelog.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +# -*- coding: utf-8 -*- # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> # This file is part of GlusterFS. @@ -9,51 +9,52 @@ # cases as published by the Free Software Foundation. 
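The bytearray_to_str() shim above exists because a ctypes buffer filled by a
C call may hold bytes that are not valid UTF-8, so a decode() round-trip can
fail or change the length. A minimal self-contained sketch of the behaviour
it relies on (the sample bytes here are hypothetical, not from the patch):

    from ctypes import create_string_buffer

    def bytearray_to_str(byte_arr):
        # Map each byte to the character with the same ordinal;
        # the length is preserved even for non-UTF-8 bytes.
        return ''.join([chr(b) for b in byte_arr])

    buf = create_string_buffer(b'\0', 8)
    buf.raw = b'ab\xff\x00cdef'      # pretend a C call wrote this back
    assert len(bytearray_to_str(buf.raw)) == 8
    # buf.raw.decode('utf-8') would raise UnicodeDecodeError here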
import os -from ctypes import CDLL, get_errno, create_string_buffer, c_ulong, byref -from ctypes import RTLD_GLOBAL +from ctypes import CDLL, RTLD_GLOBAL, get_errno, create_string_buffer, c_ulong, byref from ctypes.util import find_library +from gfind_py2py3 import bytearray_to_str, gf_create_string_buffer +from gfind_py2py3 import gfind_history_changelog, gfind_changelog_register +from gfind_py2py3 import gfind_history_changelog_done class ChangelogException(OSError): pass +libgfc = CDLL(find_library("gfchangelog"), mode=RTLD_GLOBAL, use_errno=True) -libgfc = CDLL(find_library("gfchangelog"), use_errno=True, mode=RTLD_GLOBAL) - -def raise_oserr(): +def raise_oserr(prefix=None): errn = get_errno() - raise ChangelogException(errn, os.strerror(errn)) + prefix_or_empty = prefix + ": " if prefix else "" + raise ChangelogException(errn, prefix_or_empty + os.strerror(errn)) def cl_init(): ret = libgfc.gf_changelog_init(None) if ret == -1: - raise_oserr() + raise_oserr(prefix="gf_changelog_init") def cl_register(brick, path, log_file, log_level, retries=0): - ret = libgfc.gf_changelog_register(brick, path, log_file, - log_level, retries) + ret = gfind_changelog_register(libgfc, brick, path, log_file,log_level, retries) if ret == -1: - raise_oserr() + raise_oserr(prefix="gf_changelog_register") def cl_history_scan(): ret = libgfc.gf_history_changelog_scan() if ret == -1: - raise_oserr() + raise_oserr(prefix="gf_history_changelog_scan") return ret def cl_history_changelog(changelog_path, start, end, num_parallel): actual_end = c_ulong() - ret = libgfc.gf_history_changelog(changelog_path, start, end, + ret = gfind_history_changelog(libgfc,changelog_path, start, end, num_parallel, byref(actual_end)) if ret == -1: - raise_oserr() + raise_oserr(prefix="gf_history_changelog") return actual_end.value @@ -61,7 +62,7 @@ def cl_history_changelog(changelog_path, start, end, num_parallel): def cl_history_startfresh(): ret = libgfc.gf_history_changelog_start_fresh() if ret == -1: - raise_oserr() + raise_oserr(prefix="gf_history_changelog_start_fresh") def cl_history_getchanges(): @@ -70,20 +71,22 @@ def cl_history_getchanges(): return f.split('.')[-1] changes = [] - buf = create_string_buffer('\0', 4096) + buf = gf_create_string_buffer(4096) while True: ret = libgfc.gf_history_changelog_next_change(buf, 4096) if ret in (0, -1): break - changes.append(buf.raw[:ret - 1]) + # py2 and py3 compatibility + result = bytearray_to_str(buf.raw[:ret - 1]) + changes.append(result) if ret == -1: - raise_oserr() + raise_oserr(prefix="gf_history_changelog_next_change") return sorted(changes, key=clsort) def cl_history_done(clfile): - ret = libgfc.gf_history_changelog_done(clfile) + ret = gfind_history_changelog_done(libgfc, clfile) if ret == -1: - raise_oserr() + raise_oserr(prefix="gf_history_changelog_done") diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py index 17043dca213..4b5466d0114 100644 --- a/tools/glusterfind/src/main.py +++ b/tools/glusterfind/src/main.py @@ -1,4 +1,5 @@ -#!/usr/bin/env python +#!/usr/bin/python3 +# -*- coding: utf-8 -*- # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> # This file is part of GlusterFS. @@ -9,19 +10,26 @@ # cases as published by the Free Software Foundation. 
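With the prefix argument added to raise_oserr() above, every failing
libgfchangelog call now reports which C API hit the error. A small
standalone sketch of the resulting message format (the errno is hard-coded
here purely for illustration; the real module reads ctypes.get_errno()):

    import os
    from errno import ENOENT

    class ChangelogException(OSError):
        pass

    def raise_oserr(prefix=None):
        errn = ENOENT  # stand-in for ctypes.get_errno()
        prefix_or_empty = prefix + ": " if prefix else ""
        raise ChangelogException(errn, prefix_or_empty + os.strerror(errn))

    try:
        raise_oserr(prefix="gf_history_changelog")
    except ChangelogException as e:
        print(e)  # [Errno 2] gf_history_changelog: No such file or directory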
import sys -from errno import ENOENT +from errno import ENOENT, ENOTEMPTY import time from multiprocessing import Process import os import xml.etree.cElementTree as etree from argparse import ArgumentParser, RawDescriptionHelpFormatter, Action +from gfind_py2py3 import gfind_write_row, gfind_write import logging import shutil +import tempfile +import signal +from datetime import datetime +import codecs +import re from utils import execute, is_host_local, mkdirp, fail -from utils import setup_logger, human_time +from utils import setup_logger, human_time, handle_rm_error +from utils import get_changelog_rollover_time, cache_output, create_file import conf - +from changelogdata import OutputMerger PROG_DESCRIPTION = """ GlusterFS Incremental API @@ -29,6 +37,9 @@ GlusterFS Incremental API ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError logger = logging.getLogger() +vol_statusStr = "" +gtmpfilename = None +g_pid_nodefile_map = {} class StoreAbsPath(Action): @@ -39,176 +50,272 @@ class StoreAbsPath(Action): setattr(namespace, self.dest, os.path.abspath(values)) -def node_run(volume, host, path, start, outfile, args, fallback=False): - """ - If host is local node, execute the command locally. If not local - execute the CHANGE_DETECTOR command via ssh and copy the output file from - remote node using scp. - """ - localdir = is_host_local(host) - - # If Full backup is requested or start time is zero, use brickfind - change_detector = conf.get_change_detector(args.change_detector) - if ((start == 0 or args.full) and args.change_detector == "changelog") or \ - fallback: - change_detector = conf.get_change_detector("brickfind") - - # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug - # --gfidpath <TYPE> - cmd = [change_detector, - args.session, - volume, - path, - outfile, - str(start), - "--output-prefix", - args.output_prefix] + \ - (["--debug"] if args.debug else []) + \ - (["--full"] if args.full else []) - - if not localdir: - # prefix with ssh command if not local node - cmd = ["ssh", - "-i", conf.get_opt("secret_pem"), - "root@%s" % host] + cmd - - rc, out, err = execute(cmd, logger=logger) - if rc == 2: - # Partial History Fallback - logger.info("%s %s Fallback to brickfind" % (host, err.strip())) - # Exit only from process, handled in main. 
- sys.exit(rc) - elif rc != 0: - fail("%s - Change detection failed" % host, logger=logger) - - if not localdir: - cmd_copy = ["scp", - "-i", conf.get_opt("secret_pem"), - "root@%s:/%s" % (host, outfile), - os.path.dirname(outfile)] - execute(cmd_copy, exit_msg="%s - Copy command failed" % host, - logger=logger) - - -def node_cleanup(host, args): - localdir = is_host_local(host) - - # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug - # --gfidpath <TYPE> - cmd = [conf.get_opt("nodecleanup"), - args.session, - args.volume] + (["--debug"] if args.debug else []) +def get_pem_key_path(session, volume): + return os.path.join(conf.get_opt("session_dir"), + session, + volume, + "%s_%s_secret.pem" % (session, volume)) - if not localdir: - # prefix with ssh command if not local node - cmd = ["ssh", - "-i", conf.get_opt("secret_pem"), - "root@%s" % host] + cmd - execute(cmd, exit_msg="%s - Cleanup failed" % host, logger=logger) +def node_cmd(host, host_uuid, task, cmd, args, opts): + """ + Runs command via ssh if host is not local + """ + try: + localdir = is_host_local(host_uuid) + # this is so to avoid deleting the ssh keys on local node which + # otherwise cause ssh password prompts on the console (race conditions) + # mode_delete() should be cleaning up the session tree + if localdir and task == "delete": + return -def cleanup(nodes, args): + pem_key_path = get_pem_key_path(args.session, args.volume) + + if not localdir: + # prefix with ssh command if not local node + cmd = ["ssh", + "-oNumberOfPasswordPrompts=0", + "-oStrictHostKeyChecking=no", + # We force TTY allocation (-t -t) so that Ctrl+C is handed + # through; see: + # https://bugzilla.redhat.com/show_bug.cgi?id=1382236 + # Note that this turns stderr of the remote `cmd` + # into stdout locally. + "-t", + "-t", + "-i", pem_key_path, + "root@%s" % host] + cmd + + (returncode, err, out) = execute(cmd, logger=logger) + if returncode != 0: + # Because the `-t -t` above turns the remote stderr into + # local stdout, we need to log both stderr and stdout + # here to print all error messages. 
+ fail("%s - %s failed; stdout (including remote stderr):\n" + "%s\n" + "stderr:\n" + "%s" % (host, task, out, err), + returncode, + logger=logger) + + if opts.get("copy_outfile", False) and not localdir: + cmd_copy = ["scp", + "-oNumberOfPasswordPrompts=0", + "-oStrictHostKeyChecking=no", + "-i", pem_key_path, + "root@%s:/%s" % (host, opts.get("node_outfile")), + os.path.dirname(opts.get("node_outfile"))] + execute(cmd_copy, exit_msg="%s - Copy command failed" % host, + logger=logger) + except KeyboardInterrupt: + sys.exit(2) + + +def run_cmd_nodes(task, args, **kwargs): + global g_pid_nodefile_map + nodes = get_nodes(args.volume) pool = [] for num, node in enumerate(nodes): host, brick = node[1].split(":") - # temp output file - node_outfile = os.path.join(conf.get_opt("working_dir"), - args.session, - args.volume, - "tmp_output_%s.txt" % num) - - try: - os.remove(node_outfile) - except (OSError, IOError): - # TODO: Cleanup Failure, Handle - pass - - p = Process(target=node_cleanup, - args=(host, args)) - p.start() - pool.append(p) - - exit_codes = 0 - for p in pool: - p.join() - exit_codes += (0 if p.exitcode == 0 else 1) + host_uuid = node[0] + cmd = [] + opts = {} - if exit_codes != 0: - sys.exit(1) + # tmpfilename is valid only for tasks: pre, query and cleanup + tmpfilename = kwargs.get("tmpfilename", "BADNAME") - -def failback_node_run(brick_details, idx, volume, start, outfile, args): - host, brick = brick_details.split(":") - p = Process(target=node_run, - args=(volume, host, brick, start, outfile, args, True)) - p.start() - p.join() - return p.exitcode - - -def run_in_nodes(volume, start, args): - """ - Get nodes of volume using gluster volume info, spawn a process - each for a Node. Merge the output files once all the process - complete their tasks. 
- """ - nodes = get_nodes(volume) - pool = [] - node_outfiles = [] - for num, node in enumerate(nodes): - host, brick = node[1].split(":") - # temp output file node_outfile = os.path.join(conf.get_opt("working_dir"), - args.session, - volume, - "tmp_output_%s.txt" % num) - node_outfiles.append(node_outfile) - p = Process(target=node_run, args=(volume, host, brick, start, - node_outfile, args)) - p.start() - pool.append(p) - - exit_codes = 0 - for idx, p in enumerate(pool): + args.session, args.volume, + tmpfilename, + "tmp_output_%s" % num) + + if task == "pre": + if vol_statusStr != "Started": + fail("Volume %s is not online" % args.volume, + logger=logger) + + # If Full backup is requested or start time is zero, use brickfind + change_detector = conf.get_change_detector("changelog") + tag = None + if args.full: + change_detector = conf.get_change_detector("brickfind") + tag = args.tag_for_full_find.strip() + if tag == "": + tag = '""' if not is_host_local(host_uuid) else "" + + # remote file will be copied into this directory + mkdirp(os.path.dirname(node_outfile), + exit_on_err=True, logger=logger) + + FS = args.field_separator + if not is_host_local(host_uuid): + FS = "'" + FS + "'" + + cmd = [change_detector, + args.session, + args.volume, + host, + brick, + node_outfile] + \ + ([str(kwargs.get("start")), str(kwargs.get("end"))] + if not args.full else []) + \ + ([tag] if tag is not None else []) + \ + ["--output-prefix", args.output_prefix] + \ + (["--debug"] if args.debug else []) + \ + (["--no-encode"] if args.no_encode else []) + \ + (["--only-namespace-changes"] if args.only_namespace_changes + else []) + \ + (["--type", args.type]) + \ + (["--field-separator", FS] if args.full else []) + + opts["node_outfile"] = node_outfile + opts["copy_outfile"] = True + elif task == "query": + # If Full backup is requested or start time is zero, use brickfind + tag = None + change_detector = conf.get_change_detector("changelog") + if args.full: + change_detector = conf.get_change_detector("brickfind") + tag = args.tag_for_full_find.strip() + if tag == "": + tag = '""' if not is_host_local(host_uuid) else "" + + # remote file will be copied into this directory + mkdirp(os.path.dirname(node_outfile), + exit_on_err=True, logger=logger) + + FS = args.field_separator + if not is_host_local(host_uuid): + FS = "'" + FS + "'" + + cmd = [change_detector, + args.session, + args.volume, + host, + brick, + node_outfile] + \ + ([str(kwargs.get("start")), str(kwargs.get("end"))] + if not args.full else []) + \ + ([tag] if tag is not None else []) + \ + ["--only-query"] + \ + ["--output-prefix", args.output_prefix] + \ + (["--debug"] if args.debug else []) + \ + (["--no-encode"] if args.no_encode else []) + \ + (["--only-namespace-changes"] + if args.only_namespace_changes else []) + \ + (["--type", args.type]) + \ + (["--field-separator", FS] if args.full else []) + + opts["node_outfile"] = node_outfile + opts["copy_outfile"] = True + elif task == "cleanup": + # After pre/query run, cleanup the working directory and other + # temp files. 
Remove the directory to which node_outfile has + # been copied in main node + try: + os.remove(node_outfile) + except (OSError, IOError): + logger.warn("Failed to cleanup temporary file %s" % + node_outfile) + pass + + cmd = [conf.get_opt("nodeagent"), + "cleanup", + args.session, + args.volume, + os.path.dirname(node_outfile)] + \ + (["--debug"] if args.debug else []) + elif task == "create": + if vol_statusStr != "Started": + fail("Volume %s is not online" % args.volume, + logger=logger) + + # When glusterfind create, create session directory in + # each brick nodes + cmd = [conf.get_opt("nodeagent"), + "create", + args.session, + args.volume, + brick, + kwargs.get("time_to_update")] + \ + (["--debug"] if args.debug else []) + \ + (["--reset-session-time"] if args.reset_session_time + else []) + elif task == "post": + # Rename pre status file to actual status file in each node + cmd = [conf.get_opt("nodeagent"), + "post", + args.session, + args.volume, + brick] + \ + (["--debug"] if args.debug else []) + elif task == "delete": + # When glusterfind delete, cleanup all the session files/dirs + # from each node. + cmd = [conf.get_opt("nodeagent"), + "delete", + args.session, + args.volume] + \ + (["--debug"] if args.debug else []) + + if cmd: + p = Process(target=node_cmd, + args=(host, host_uuid, task, cmd, args, opts)) + p.start() + pool.append(p) + g_pid_nodefile_map[p.pid] = node_outfile + + for num, p in enumerate(pool): p.join() - # Handle the Changelog failure, fallback to Brickfind - if p.exitcode == 2: - rc = failback_node_run(nodes[idx][1], idx, volume, start, - node_outfiles[idx], args) - exit_codes += (0 if rc == 0 else 1) - elif p.exitcode != 0: - exit_codes += (0 if p.exitcode == 0 else 1) - - if exit_codes != 0: - sys.exit(1) - - # Merge all output files - cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile] - execute(cmd, - exit_msg="Failed to merge output files " - "collected from nodes", logger=logger) - - cleanup(nodes, args) + if p.exitcode != 0: + logger.warn("Command %s failed in %s" % (task, nodes[num][1])) + if task in ["create", "delete"]: + fail("Command %s failed in %s" % (task, nodes[num][1])) + elif task == "pre" or task == "query": + if args.disable_partial: + sys.exit(1) + else: + del g_pid_nodefile_map[p.pid] +@cache_output def get_nodes(volume): """ Get the gluster volume info xml output and parse to get the brick details. 
""" + global vol_statusStr + cmd = ["gluster", 'volume', 'info', volume, "--xml"] _, data, _ = execute(cmd, exit_msg="Failed to Run Gluster Volume Info", logger=logger) tree = etree.fromstring(data) + # Test to check if volume has been deleted after session creation + count_el = tree.find('volInfo/volumes/count') + if int(count_el.text) == 0: + fail("Unable to get volume details", logger=logger) + + # this status is used in caller: run_cmd_nodes + vol_statusStr = tree.find('volInfo/volumes/volume/statusStr').text + vol_typeStr = tree.find('volInfo/volumes/volume/typeStr').text + nodes = [] volume_el = tree.find('volInfo/volumes/volume') try: - for b in volume_el.findall('bricks/brick'): - nodes.append((b.find('hostUuid').text, - b.find('name').text)) + brick_elems = [] + if vol_typeStr == "Tier": + brick_elems.append('bricks/hotBricks/brick') + brick_elems.append('bricks/coldBricks/brick') + else: + brick_elems.append('bricks/brick') + + for elem in brick_elems: + for b in volume_el.findall(elem): + nodes.append((b.find('hostUuid').text, + b.find('name').text)) except (ParseError, AttributeError, ValueError) as e: fail("Failed to parse Volume Info: %s" % e, logger=logger) @@ -219,6 +326,7 @@ def _get_args(): parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, description=PROG_DESCRIPTION) subparsers = parser.add_subparsers(dest="mode") + subparsers.required = True # create <SESSION> <VOLUME> [--debug] [--force] parser_create = subparsers.add_parser('create') @@ -227,6 +335,9 @@ def _get_args(): parser_create.add_argument("--debug", help="Debug", action="store_true") parser_create.add_argument("--force", help="Force option to recreate " "the session", action="store_true") + parser_create.add_argument("--reset-session-time", + help="Reset Session Time to Current Time", + action="store_true") # delete <SESSION> <VOLUME> [--debug] parser_delete = subparsers.add_parser('delete') @@ -240,20 +351,70 @@ def _get_args(): parser_list.add_argument("--volume", help="Volume Name", default="") parser_list.add_argument("--debug", help="Debug", action="store_true") - # pre <SESSION> <VOLUME> <OUTFILE> [--change-detector <CHANGE_DETECTOR>] + # pre <SESSION> <VOLUME> <OUTFILE> # [--output-prefix <OUTPUT_PREFIX>] [--full] parser_pre = subparsers.add_parser('pre') parser_pre.add_argument("session", help="Session Name") parser_pre.add_argument("volume", help="Volume Name") parser_pre.add_argument("outfile", help="Output File", action=StoreAbsPath) parser_pre.add_argument("--debug", help="Debug", action="store_true") + parser_pre.add_argument("--no-encode", + help="Do not encode path in output file", + action="store_true") parser_pre.add_argument("--full", help="Full find", action="store_true") - parser_pre.add_argument("--change-detector", dest="change_detector", - help="Change detection", - choices=conf.list_change_detectors(), - type=str, default='changelog') + parser_pre.add_argument("--disable-partial", help="Disable Partial find, " + "Fail when one node fails", action="store_true") parser_pre.add_argument("--output-prefix", help="File prefix in output", default=".") + parser_pre.add_argument("--regenerate-outfile", + help="Regenerate outfile, discard the outfile " + "generated from last pre command", + action="store_true") + parser_pre.add_argument("-N", "--only-namespace-changes", + help="List only namespace changes", + action="store_true") + parser_pre.add_argument("--tag-for-full-find", + help="Tag prefix for file names emitted during" + " a full find operation; default: \"NEW\"", + 
default="NEW") + parser_pre.add_argument('--type', help="type: f, f-files only" + " d, d-directories only, by default = both", + default='both', choices=["f", "d", "both"]) + parser_pre.add_argument("--field-separator", help="Field separator string", + default=" ") + + # query <VOLUME> <OUTFILE> --since-time <SINCE_TIME> + # [--output-prefix <OUTPUT_PREFIX>] [--full] + parser_query = subparsers.add_parser('query') + parser_query.add_argument("volume", help="Volume Name") + parser_query.add_argument("outfile", help="Output File", + action=StoreAbsPath) + parser_query.add_argument("--since-time", help="UNIX epoch time since " + "which listing is required", type=int) + parser_query.add_argument("--end-time", help="UNIX epoch time up to " + "which listing is required", type=int) + parser_query.add_argument("--no-encode", + help="Do not encode path in output file", + action="store_true") + parser_query.add_argument("--full", help="Full find", action="store_true") + parser_query.add_argument("--debug", help="Debug", action="store_true") + parser_query.add_argument("--disable-partial", help="Disable Partial find," + " Fail when one node fails", action="store_true") + parser_query.add_argument("--output-prefix", help="File prefix in output", + default=".") + parser_query.add_argument("-N", "--only-namespace-changes", + help="List only namespace changes", + action="store_true") + parser_query.add_argument("--tag-for-full-find", + help="Tag prefix for file names emitted during" + " a full find operation; default: \"NEW\"", + default="NEW") + parser_query.add_argument('--type', help="type: f, f-files only" + " d, d-directories only, by default = both", + default='both', choices=["f", "d", "both"]) + parser_query.add_argument("--field-separator", + help="Field separator string", + default=" ") # post <SESSION> <VOLUME> parser_post = subparsers.add_parser('post') @@ -264,27 +425,41 @@ def _get_args(): return parser.parse_args() -def ssh_setup(): - if not os.path.exists(conf.get_opt("secret_pem")): +def ssh_setup(args): + pem_key_path = get_pem_key_path(args.session, args.volume) + + if not os.path.exists(pem_key_path): # Generate ssh-key cmd = ["ssh-keygen", "-N", "", "-f", - conf.get_opt("secret_pem")] + pem_key_path] execute(cmd, exit_msg="Unable to generate ssh key %s" - % conf.get_opt("secret_pem"), + % pem_key_path, logger=logger) - logger.info("Ssh key generated %s" % conf.get_opt("secret_pem")) + logger.info("Ssh key generated %s" % pem_key_path) + + try: + shutil.copyfile(pem_key_path + ".pub", + os.path.join(conf.get_opt("session_dir"), + ".keys", + "%s_%s_secret.pem.pub" % (args.session, + args.volume))) + except (IOError, OSError) as e: + fail("Failed to copy public key to %s: %s" + % (os.path.join(conf.get_opt("session_dir"), ".keys"), e), + logger=logger) # Copy pub file to all nodes cmd = ["gluster", "system::", "copy", "file", - "/" + os.path.basename(conf.get_opt("secret_pem")) + ".pub"] + "/glusterfind/.keys/%s.pub" % os.path.basename(pem_key_path)] + execute(cmd, exit_msg="Failed to distribute ssh keys", logger=logger) logger.info("Distributed ssh key to all nodes of Volume") @@ -295,7 +470,7 @@ def ssh_setup(): "execute", "add_secret_pub", "root", - os.path.basename(conf.get_opt("secret_pem")) + ".pub"] + "/glusterfind/.keys/%s.pub" % os.path.basename(pem_key_path)] execute(cmd, exit_msg="Failed to add ssh keys to authorized_keys file", logger=logger) @@ -303,14 +478,94 @@ def ssh_setup(): logger.info("Ssh key added to authorized_keys of Volume nodes") +def 
enable_volume_options(args): + execute(["gluster", "volume", "set", + args.volume, "build-pgfid", "on"], + exit_msg="Failed to set volume option build-pgfid on", + logger=logger) + logger.info("Volume option set %s, build-pgfid on" % args.volume) + + execute(["gluster", "volume", "set", + args.volume, "changelog.changelog", "on"], + exit_msg="Failed to set volume option " + "changelog.changelog on", logger=logger) + logger.info("Volume option set %s, changelog.changelog on" + % args.volume) + + execute(["gluster", "volume", "set", + args.volume, "changelog.capture-del-path", "on"], + exit_msg="Failed to set volume option " + "changelog.capture-del-path on", logger=logger) + logger.info("Volume option set %s, changelog.capture-del-path on" + % args.volume) + + +def write_output(outfile, outfilemerger, field_separator): + with codecs.open(outfile, "a", encoding="utf-8") as f: + for row in outfilemerger.get(): + # Multiple paths in case of Hardlinks + paths = row[1].split(",") + row_2_rep = None + for p in paths: + if p == "": + continue + p_rep = p.replace("//", "/") + if not row_2_rep: + row_2_rep = row[2].replace("//", "/") + if p_rep == row_2_rep: + continue + + if row_2_rep and row_2_rep != "": + gfind_write_row(f, row[0], field_separator, p_rep, row_2_rep) + + else: + gfind_write(f, row[0], field_separator, p_rep) + +def validate_volume(volume): + cmd = ["gluster", 'volume', 'info', volume, "--xml"] + _, data, _ = execute(cmd, + exit_msg="Failed to Run Gluster Volume Info", + logger=logger) + try: + tree = etree.fromstring(data) + statusStr = tree.find('volInfo/volumes/volume/statusStr').text + except (ParseError, AttributeError) as e: + fail("Invalid Volume: Check the Volume name! %s" % e) + if statusStr != "Started": + fail("Volume %s is not online" % volume) + +# The rules for a valid session name. +SESSION_NAME_RULES = { + 'min_length': 2, + 'max_length': 256, # same as maximum volume length + # Specifies all alphanumeric characters, underscore, hyphen. 
+    'valid_chars': r'0-9a-zA-Z_-',
+}
+
+
+# Validate the session name; fail otherwise
+def validate_session_name(session):
+    # Check for minimum length
+    if len(session) < SESSION_NAME_RULES['min_length']:
+        fail('session_name must be at least ' +
+             str(SESSION_NAME_RULES['min_length']) + ' characters long.')
+    # Check for maximum length
+    if len(session) > SESSION_NAME_RULES['max_length']:
+        fail('session_name must not exceed ' +
+             str(SESSION_NAME_RULES['max_length']) + ' characters in length.')
+
+    # Accept only strings composed entirely of the characters listed above
+    if not re.match(r'^[' + SESSION_NAME_RULES['valid_chars'] +
+                    ']+$', session):
+        fail('Session name can only contain these characters: ' +
+             SESSION_NAME_RULES['valid_chars'])
+
+
 def mode_create(session_dir, args):
+    validate_session_name(args.session)
+
     logger.debug("Init is called - Session: %s, Volume: %s"
                  % (args.session, args.volume))
-
-    execute(["gluster", "volume", "info", args.volume],
-            exit_msg="Unable to get volume details",
-            logger=logger)
-
     mkdirp(session_dir, exit_on_err=True, logger=logger)
     mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
            logger=logger)
@@ -320,43 +575,140 @@ def mode_create(session_dir, args):
         fail("Session %s already created" % args.session, logger=logger)

     if not os.path.exists(status_file) or args.force:
-        ssh_setup()
+        ssh_setup(args)
+        enable_volume_options(args)

-        execute(["gluster", "volume", "set",
-                 args.volume, "build-pgfid", "on"],
-                exit_msg="Failed to set volume option build-pgfid on",
-                logger=logger)
-        logger.info("Volume option set %s, build-pgfid on" % args.volume)
-
-        execute(["gluster", "volume", "set",
-                 args.volume, "changelog.changelog", "on"],
-                exit_msg="Failed to set volume option "
-                "changelog.changelog on", logger=logger)
-        logger.info("Volume option set %s, changelog.changelog on"
-                    % args.volume)
-
-    if not os.path.exists(status_file):
-        with open(status_file, "w", buffering=0) as f:
-            # Add Rollover time to current time to make sure changelogs
-            # will be available if we use this time as start time
-            time_to_update = int(time.time()) + int(
-                conf.get_opt("changelog_rollover_time"))
+    # Add the rollover time to the current time to make sure changelogs
+    # will be available if we use this time as the start time
+    time_to_update = int(time.time()) + get_changelog_rollover_time(
+        args.volume)
+
+    run_cmd_nodes("create", args, time_to_update=str(time_to_update))
+
+    if not os.path.exists(status_file) or args.reset_session_time:
+        with open(status_file, "w") as f:
             f.write(str(time_to_update))

+    sys.stdout.write("Session %s created with volume %s\n" %
+                     (args.session, args.volume))
+
     sys.exit(0)


+def mode_query(session_dir, args):
+    global gtmpfilename
+    global g_pid_nodefile_map
+
+    # Verify volume status
+    cmd = ["gluster", 'volume', 'info', args.volume, "--xml"]
+    _, data, _ = execute(cmd,
+                         exit_msg="Failed to Run Gluster Volume Info",
+                         logger=logger)
+    try:
+        tree = etree.fromstring(data)
+        statusStr = tree.find('volInfo/volumes/volume/statusStr').text
+    except (ParseError, AttributeError) as e:
+        fail("Invalid Volume: %s" % e, logger=logger)
+
+    if statusStr != "Started":
+        fail("Volume %s is not online" % args.volume, logger=logger)
+
+    mkdirp(session_dir, exit_on_err=True, logger=logger)
+    mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+           logger=logger)
+    mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger)
+
+    # Configure cluster for password-less SSH
+    ssh_setup(args)
+
+    # Enable volume options for changelog capture
+    
enable_volume_options(args) + + # Test options + if not args.full and args.type in ["f", "d"]: + fail("--type can only be used with --full") + if not args.since_time and not args.end_time and not args.full: + fail("Please specify either {--since-time and optionally --end-time} " + "or --full", logger=logger) + + if args.since_time and args.end_time and args.full: + fail("Please specify either {--since-time and optionally --end-time} " + "or --full, but not both", + logger=logger) + + if args.end_time and not args.since_time: + fail("Please specify --since-time as well", logger=logger) + + # Start query command processing + start = -1 + end = -1 + if args.since_time: + start = args.since_time + if args.end_time: + end = args.end_time + else: + start = 0 # --full option is handled separately + + logger.debug("Query is called - Session: %s, Volume: %s, " + "Start time: %s, End time: %s" + % ("default", args.volume, start, end)) + + prefix = datetime.now().strftime("%Y%m%d-%H%M%S-%f-") + gtmpfilename = prefix + next(tempfile._get_candidate_names()) + + run_cmd_nodes("query", args, start=start, end=end, + tmpfilename=gtmpfilename) + + # Merger + if args.full: + if len(g_pid_nodefile_map) > 0: + cmd = ["sort", "-u"] + list(g_pid_nodefile_map.values()) + \ + ["-o", args.outfile] + execute(cmd, + exit_msg="Failed to merge output files " + "collected from nodes", logger=logger) + else: + fail("Failed to collect any output files from peers. " + "Looks like all bricks are offline.", logger=logger) + else: + # Read each Changelogs db and generate finaldb + create_file(args.outfile, exit_on_err=True, logger=logger) + outfilemerger = OutputMerger(args.outfile + ".db", + list(g_pid_nodefile_map.values())) + write_output(args.outfile, outfilemerger, args.field_separator) + + try: + os.remove(args.outfile + ".db") + except (IOError, OSError): + pass + + run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename) + + sys.stdout.write("Generated output file %s\n" % args.outfile) + + def mode_pre(session_dir, args): + global gtmpfilename + global g_pid_nodefile_map + """ Read from Session file and write to session.pre file """ - endtime_to_update = int(time.time()) - int( - conf.get_opt("changelog_rollover_time")) + endtime_to_update = int(time.time()) - get_changelog_rollover_time( + args.volume) status_file = os.path.join(session_dir, args.volume, "status") status_file_pre = status_file + ".pre" mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger) + if not args.full and args.type in ["f", "d"]: + fail("--type can only be used with --full") + + # If Pre status file exists and running pre command again + if os.path.exists(status_file_pre) and not args.regenerate_outfile: + fail("Post command is not run after last pre, " + "use --regenerate-outfile") + start = 0 try: with open(status_file) as f: @@ -371,9 +723,37 @@ def mode_pre(session_dir, args): "Start time: %s, End time: %s" % (args.session, args.volume, start, endtime_to_update)) - run_in_nodes(args.volume, start, args) + prefix = datetime.now().strftime("%Y%m%d-%H%M%S-%f-") + gtmpfilename = prefix + next(tempfile._get_candidate_names()) + + run_cmd_nodes("pre", args, start=start, end=-1, tmpfilename=gtmpfilename) + + # Merger + if args.full: + if len(g_pid_nodefile_map) > 0: + cmd = ["sort", "-u"] + list(g_pid_nodefile_map.values()) + \ + ["-o", args.outfile] + execute(cmd, + exit_msg="Failed to merge output files " + "collected from nodes", logger=logger) + else: + fail("Failed to collect any output files from peers. 
" + "Looks like all bricks are offline.", logger=logger) + else: + # Read each Changelogs db and generate finaldb + create_file(args.outfile, exit_on_err=True, logger=logger) + outfilemerger = OutputMerger(args.outfile + ".db", + list(g_pid_nodefile_map.values())) + write_output(args.outfile, outfilemerger, args.field_separator) + + try: + os.remove(args.outfile + ".db") + except (IOError, OSError): + pass - with open(status_file_pre, "w", buffering=0) as f: + run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename) + + with open(status_file_pre, "w") as f: f.write(str(endtime_to_update)) sys.stdout.write("Generated output file %s\n" % args.outfile) @@ -390,21 +770,30 @@ def mode_post(session_dir, args): status_file_pre = status_file + ".pre" if os.path.exists(status_file_pre): + run_cmd_nodes("post", args) os.rename(status_file_pre, status_file) + sys.stdout.write("Session %s with volume %s updated\n" % + (args.session, args.volume)) sys.exit(0) else: fail("Pre script is not run", logger=logger) def mode_delete(session_dir, args): - def handle_rm_error(func, path, exc_info): - if exc_info[1].errno == ENOENT: - return - - raise exc_info[1] - + run_cmd_nodes("delete", args) shutil.rmtree(os.path.join(session_dir, args.volume), onerror=handle_rm_error) + sys.stdout.write("Session %s with volume %s deleted\n" % + (args.session, args.volume)) + + # If the session contains only this volume, then cleanup the + # session directory. If a session contains multiple volumes + # then os.rmdir will fail with ENOTEMPTY + try: + os.rmdir(session_dir) + except OSError as e: + if not e.errno == ENOTEMPTY: + logger.warn("Failed to delete session directory: %s" % e) def mode_list(session_dir, args): @@ -419,7 +808,8 @@ def mode_list(session_dir, args): else: sessions = [] for d in os.listdir(session_dir): - sessions.append(d) + if d != ".keys": + sessions.append(d) output = [] for session in sessions: @@ -437,7 +827,7 @@ def mode_list(session_dir, args): last_processed = f.read().strip() except (OSError, IOError) as e: if e.errno == ENOENT: - pass + continue else: raise output.append((session, volname, last_processed)) @@ -449,35 +839,83 @@ def mode_list(session_dir, args): sys.stdout.write("-"*75) sys.stdout.write("\n") for session, volname, last_processed in output: + sess_time = 'Session Corrupted' + if last_processed: + try: + sess_time = human_time(last_processed) + except TypeError: + sess_time = 'Session Corrupted' sys.stdout.write("%s %s %s\n" % (session.ljust(25), volname.ljust(25), - human_time(last_processed).ljust(25))) + sess_time.ljust(25))) - if not output and (args.session or args.volume): - fail("Invalid Session", logger=logger) + if not output: + if args.session or args.volume: + fail("Invalid Session", logger=logger) + else: + sys.stdout.write("No sessions found.\n") def main(): - args = _get_args() - mkdirp(conf.get_opt("session_dir"), exit_on_err=True) + global gtmpfilename - if args.mode == "list": - session_dir = conf.get_opt("session_dir") - else: - session_dir = os.path.join(conf.get_opt("session_dir"), - args.session) + args = None - if not os.path.exists(session_dir) and args.mode not in ["create", "list"]: - fail("Invalid session %s" % args.session) + try: + args = _get_args() + mkdirp(conf.get_opt("session_dir"), exit_on_err=True) - mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), - exit_on_err=True) - log_file = os.path.join(conf.get_opt("log_dir"), - args.session, - args.volume, - "cli.log") - setup_logger(logger, log_file, args.debug) + # force the 
default session name if mode is "query" + if args.mode == "query": + args.session = "default" + + if args.mode == "list": + session_dir = conf.get_opt("session_dir") + else: + session_dir = os.path.join(conf.get_opt("session_dir"), + args.session) - # globals() will have all the functions already defined. - # mode_<args.mode> will be the function name to be called - globals()["mode_" + args.mode](session_dir, args) + if not os.path.exists(session_dir) and \ + args.mode not in ["create", "list", "query"]: + fail("Invalid session %s" % args.session) + + # volume involved, validate the volume first + if args.mode not in ["list"]: + validate_volume(args.volume) + + + # "default" is a system defined session name + if args.mode in ["create", "post", "pre", "delete"] and \ + args.session == "default": + fail("Invalid session %s" % args.session) + + vol_dir = os.path.join(session_dir, args.volume) + if not os.path.exists(vol_dir) and args.mode not in \ + ["create", "list", "query"]: + fail("Session %s not created with volume %s" % + (args.session, args.volume)) + + mkdirp(os.path.join(conf.get_opt("log_dir"), + args.session, + args.volume), + exit_on_err=True) + log_file = os.path.join(conf.get_opt("log_dir"), + args.session, + args.volume, + "cli.log") + setup_logger(logger, log_file, args.debug) + + # globals() will have all the functions already defined. + # mode_<args.mode> will be the function name to be called + globals()["mode_" + args.mode](session_dir, args) + except KeyboardInterrupt: + if args is not None: + if args.mode == "pre" or args.mode == "query": + # cleanup session + if gtmpfilename is not None: + # no more interrupts until we clean up + signal.signal(signal.SIGINT, signal.SIG_IGN) + run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename) + + # Interrupted, exit with non zero error code + sys.exit(2) diff --git a/tools/glusterfind/src/nodeagent.py b/tools/glusterfind/src/nodeagent.py new file mode 100644 index 00000000000..679daa6fa76 --- /dev/null +++ b/tools/glusterfind/src/nodeagent.py @@ -0,0 +1,139 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+ +import shutil +import sys +import os +import logging +from argparse import ArgumentParser, RawDescriptionHelpFormatter +try: + import urllib.parse as urllib +except ImportError: + import urllib +from errno import ENOTEMPTY + +from utils import setup_logger, mkdirp, handle_rm_error +import conf + +logger = logging.getLogger() + + +def mode_cleanup(args): + working_dir = os.path.join(conf.get_opt("working_dir"), + args.session, + args.volume, + args.tmpfilename) + + mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume), + exit_on_err=True) + log_file = os.path.join(conf.get_opt("log_dir"), + args.session, + args.volume, + "changelog.log") + + setup_logger(logger, log_file) + + try: + shutil.rmtree(working_dir, onerror=handle_rm_error) + except (OSError, IOError) as e: + logger.error("Failed to delete working directory: %s" % e) + sys.exit(1) + + +def mode_create(args): + session_dir = os.path.join(conf.get_opt("session_dir"), + args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) + + if not os.path.exists(status_file) or args.reset_session_time: + with open(status_file, "w") as f: + f.write(args.time_to_update) + + sys.exit(0) + + +def mode_post(args): + session_dir = os.path.join(conf.get_opt("session_dir"), args.session) + status_file = os.path.join(session_dir, args.volume, + "%s.status" % urllib.quote_plus(args.brick)) + + mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True, + logger=logger) + status_file_pre = status_file + ".pre" + + if os.path.exists(status_file_pre): + os.rename(status_file_pre, status_file) + sys.exit(0) + + +def mode_delete(args): + session_dir = os.path.join(conf.get_opt("session_dir"), + args.session) + shutil.rmtree(os.path.join(session_dir, args.volume), + onerror=handle_rm_error) + + # If the session contains only this volume, then cleanup the + # session directory. 
If a session contains multiple volumes + # then os.rmdir will fail with ENOTEMPTY + try: + os.rmdir(session_dir) + except OSError as e: + if not e.errno == ENOTEMPTY: + logger.warn("Failed to delete session directory: %s" % e) + + +def _get_args(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description="Node Agent") + subparsers = parser.add_subparsers(dest="mode") + + parser_cleanup = subparsers.add_parser('cleanup') + parser_cleanup.add_argument("session", help="Session Name") + parser_cleanup.add_argument("volume", help="Volume Name") + parser_cleanup.add_argument("tmpfilename", help="Temporary File Name") + parser_cleanup.add_argument("--debug", help="Debug", action="store_true") + + parser_session_create = subparsers.add_parser('create') + parser_session_create.add_argument("session", help="Session Name") + parser_session_create.add_argument("volume", help="Volume Name") + parser_session_create.add_argument("brick", help="Brick Path") + parser_session_create.add_argument("time_to_update", help="Time to Update") + parser_session_create.add_argument("--reset-session-time", + help="Reset Session Time", + action="store_true") + parser_session_create.add_argument("--debug", help="Debug", + action="store_true") + + parser_post = subparsers.add_parser('post') + parser_post.add_argument("session", help="Session Name") + parser_post.add_argument("volume", help="Volume Name") + parser_post.add_argument("brick", help="Brick Path") + parser_post.add_argument("--debug", help="Debug", + action="store_true") + + parser_delete = subparsers.add_parser('delete') + parser_delete.add_argument("session", help="Session Name") + parser_delete.add_argument("volume", help="Volume Name") + parser_delete.add_argument("--debug", help="Debug", + action="store_true") + return parser.parse_args() + + +if __name__ == "__main__": + args = _get_args() + + # globals() will have all the functions already defined. + # mode_<args.mode> will be the function name to be called + globals()["mode_" + args.mode](args) diff --git a/tools/glusterfind/src/nodecleanup.py b/tools/glusterfind/src/nodecleanup.py deleted file mode 100644 index a31d4d83acd..00000000000 --- a/tools/glusterfind/src/nodecleanup.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> -# This file is part of GlusterFS. -# -# This file is licensed to you under your choice of the GNU Lesser -# General Public License, version 3 or any later version (LGPLv3 or -# later), or the GNU General Public License, version 2 (GPLv2), in all -# cases as published by the Free Software Foundation. 
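mode_delete() above tolerates two races: files already removed (via the
handle_rm_error hook passed to shutil.rmtree) and a session directory still
shared by other volumes (ENOTEMPTY from os.rmdir). A compact sketch of both
idioms, using a made-up directory name:

    import os
    import shutil
    from errno import ENOENT, ENOTEMPTY

    def handle_rm_error(func, path, exc_info):
        if exc_info[1].errno == ENOENT:
            return          # already gone; ignore the race
        raise exc_info[1]   # anything else is a real failure

    os.makedirs("/tmp/glusterfind-demo/vol1", exist_ok=True)
    shutil.rmtree("/tmp/glusterfind-demo/vol1", onerror=handle_rm_error)
    try:
        os.rmdir("/tmp/glusterfind-demo")  # fails only if volumes remain
    except OSError as e:
        if e.errno != ENOTEMPTY:
            raise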
- -import shutil -import sys -import os -import logging -from errno import ENOENT - -from utils import setup_logger, mkdirp -import conf - -logger = logging.getLogger() - - -if __name__ == "__main__": - # Args: <SESSION> <VOLUME> - session = sys.argv[1] - volume = sys.argv[2] - - working_dir = os.path.join(conf.get_opt("working_dir"), - session, - volume) - - mkdirp(os.path.join(conf.get_opt("log_dir"), session, volume), - exit_on_err=True) - log_file = os.path.join(conf.get_opt("log_dir"), - session, - volume, - "changelog.log") - - setup_logger(logger, log_file) - - try: - def handle_rm_error(func, path, exc_info): - if exc_info[1].errno == ENOENT: - return - - raise exc_info[1] - - shutil.rmtree(working_dir, onerror=handle_rm_error) - except (OSError, IOError) as e: - logger.error("Failed to delete working directory: %s" % e) - sys.exit(1) diff --git a/tools/glusterfind/src/tool.conf.in b/tools/glusterfind/src/tool.conf.in index 48ecdda06cc..a80f4a784c0 100644 --- a/tools/glusterfind/src/tool.conf.in +++ b/tools/glusterfind/src/tool.conf.in @@ -1,10 +1,8 @@ [vars] session_dir=@GLUSTERD_WORKDIR@/glusterfind/ -secret_pem=@GLUSTERD_WORKDIR@/glusterfind.secret.pem working_dir=@GLUSTERFSD_MISCDIR@/glusterfind/ log_dir=/var/log/glusterfs/glusterfind/ -nodecleanup=@GLUSTERFS_LIBEXECDIR@/glusterfind/nodecleanup.py -changelog_rollover_time=15 +nodeagent=@GLUSTERFS_LIBEXECDIR@/glusterfind/nodeagent.py brick_ignore_dirs=.glusterfs,.trashcan [change_detectors] diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py index de9c027e299..906ebd8f252 100644 --- a/tools/glusterfind/src/utils.py +++ b/tools/glusterfind/src/utils.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +# -*- coding: utf-8 -*- # Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/> # This file is part of GlusterFS. @@ -9,14 +9,45 @@ # cases as published by the Free Software Foundation. 
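The per-brick status files in nodeagent.py above are named by
percent-encoding the brick path with urllib's quote_plus, which flattens a
path like /bricks/b1 into a single filesystem-safe file name. A quick
sketch (the brick path is hypothetical):

    try:
        import urllib.parse as urllib  # Python 3
    except ImportError:
        import urllib                  # Python 2

    brick = "/bricks/b1"
    status_file = "%s.status" % urllib.quote_plus(brick)
    print(status_file)  # %2Fbricks%2Fb1.status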
import sys -import socket from subprocess import PIPE, Popen -from errno import EPERM, EEXIST +from errno import EEXIST, ENOENT +import xml.etree.cElementTree as etree import logging import os from datetime import datetime ROOT_GFID = "00000000-0000-0000-0000-000000000001" +DEFAULT_CHANGELOG_INTERVAL = 15 +SPACE_ESCAPE_CHAR = "%20" +NEWLINE_ESCAPE_CHAR = "%0A" +PERCENTAGE_ESCAPE_CHAR = "%25" + +ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError +cache_data = {} + + +class RecordType(object): + NEW = "NEW" + MODIFY = "MODIFY" + RENAME = "RENAME" + DELETE = "DELETE" + + +def cache_output(func): + def wrapper(*args, **kwargs): + global cache_data + if cache_data.get(func.__name__, None) is None: + cache_data[func.__name__] = func(*args, **kwargs) + + return cache_data[func.__name__] + return wrapper + + +def handle_rm_error(func, path, exc_info): + if exc_info[1].errno == ENOENT: + return + + raise exc_info[1] def find(path, callback_func=lambda x: True, filter_func=lambda x: True, @@ -24,30 +55,43 @@ def find(path, callback_func=lambda x: True, filter_func=lambda x: True, if path in ignore_dirs: return - if filter_func(path): - callback_func(path) + # Capture filter_func output and pass it to callback function + filter_result = filter_func(path) + if filter_result is not None: + callback_func(path, filter_result, os.path.isdir(path)) for p in os.listdir(path): full_path = os.path.join(path, p) - if os.path.isdir(full_path): + is_dir = os.path.isdir(full_path) + if is_dir: if subdirs_crawl: find(full_path, callback_func, filter_func, ignore_dirs) else: - if filter_func(full_path): - callback_func(full_path) + filter_result = filter_func(full_path) + if filter_result is not None: + callback_func(full_path, filter_result) else: - if filter_func(full_path): - callback_func(full_path) + filter_result = filter_func(full_path) + if filter_result is not None: + callback_func(full_path, filter_result, is_dir) -def output_write(f, path, prefix="."): +def output_write(f, path, prefix=".", encode=False, tag="", + field_separator=" "): if path == "": return if prefix != ".": path = os.path.join(prefix, path) - f.write("%s\n" % path) + + if encode: + path = quote_plus_space_newline(path) + + # set the field separator + FS = "" if tag == "" else field_separator + + f.write("%s%s%s\n" % (tag.strip(), FS, path)) def human_time(ts): @@ -153,51 +197,71 @@ def symlink_gfid_to_path(brick, gfid): return out_path -def is_host_local(host): +@cache_output +def get_my_uuid(): + cmd = ["gluster", "system::", "uuid", "get", "--xml"] + rc, out, err = execute(cmd) + + if rc != 0: + return None + + tree = etree.fromstring(out) + uuid_el = tree.find("uuidGenerate/uuid") + return uuid_el.text + + +def is_host_local(host_uuid): + # Get UUID only if it is not done previously + # else Cache the UUID value + my_uuid = get_my_uuid() + if my_uuid == host_uuid: + return True + + return False + + +def get_changelog_rollover_time(volumename): + cmd = ["gluster", "volume", "get", volumename, + "changelog.rollover-time", "--xml"] + rc, out, err = execute(cmd) + + if rc != 0: + return DEFAULT_CHANGELOG_INTERVAL + + try: + tree = etree.fromstring(out) + val = tree.find('volGetopts/Opt/Value').text + if val is not None: + # Filter the value by split, as it may be 'X (DEFAULT)' + # and we only need 'X' + return int(val.split(' ', 1)[0]) + except ParseError: + return DEFAULT_CHANGELOG_INTERVAL + + +def output_path_prepare(path, args): """ - Find if a host is local or not. 
-    Code copied from $GLUSTERFS/geo-replication/syncdaemon/syncdutils.py
+    If Prefix is set, joins to Path, removes ending slash
+    and encodes it.
     """
-    locaddr = False
-    for ai in socket.getaddrinfo(host, None):
-        # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators
-        # /mgmt/glusterd/src/glusterd-utils.c#L125
-        if ai[0] == socket.AF_INET:
-            if ai[-1][0].split(".")[0] == "127":
-                locaddr = True
-                break
-        elif ai[0] == socket.AF_INET6:
-            if ai[-1][0] == "::1":
-                locaddr = True
-                break
-        else:
-            continue
-        try:
-            # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue,
-            # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587
-            s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP)
-        except socket.error:
-            ex = sys.exc_info()[1]
-            if ex.errno != EPERM:
-                raise
-            f = None
-            try:
-                f = open("/proc/sys/net/ipv4/ip_nonlocal_bind")
-                if int(f.read()) != 0:
-                    logger.warning("non-local bind is set and not "
-                                   "allowed to create "
-                                   "raw sockets, cannot determine "
-                                   "if %s is local" % host)
-                    return False
-                s = socket.socket(ai[0], socket.SOCK_DGRAM)
-            finally:
-                if f:
-                    f.close()
-        try:
-            s.bind(ai[-1])
-            locaddr = True
-            break
-        except:
-            pass
-        s.close()
-    return locaddr
+    if args.output_prefix != ".":
+        path = os.path.join(args.output_prefix, path)
+        if path.endswith("/"):
+            path = path[0:len(path)-1]
+
+    if args.no_encode:
+        return path
+    else:
+        return quote_plus_space_newline(path)
+
+
+def unquote_plus_space_newline(s):
+    return s.replace(SPACE_ESCAPE_CHAR, " ")\
+            .replace(NEWLINE_ESCAPE_CHAR, "\n")\
+            .replace(PERCENTAGE_ESCAPE_CHAR, "%")
+
+
+def quote_plus_space_newline(s):
+    return s.replace("%", PERCENTAGE_ESCAPE_CHAR)\
+            .replace(" ", SPACE_ESCAPE_CHAR)\
+            .replace("\n", NEWLINE_ESCAPE_CHAR)
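The escaping added above (quote_plus_space_newline / unquote_plus_space_newline) is order sensitive: "%" must be escaped first on the way in and restored last on the way out, or already-escaped sequences would be rewritten. A self-contained round trip, with the constants copied from the patch (names shortened here for brevity):

    # Round trip of the path escaping introduced in utils.py above.
    SPACE_ESCAPE_CHAR = "%20"
    NEWLINE_ESCAPE_CHAR = "%0A"
    PERCENTAGE_ESCAPE_CHAR = "%25"

    def quote(s):
        # escape '%' first so the escapes below are not themselves re-escaped
        return (s.replace("%", PERCENTAGE_ESCAPE_CHAR)
                 .replace(" ", SPACE_ESCAPE_CHAR)
                 .replace("\n", NEWLINE_ESCAPE_CHAR))

    def unquote(s):
        # mirror image: restore '%' last
        return (s.replace(SPACE_ESCAPE_CHAR, " ")
                 .replace(NEWLINE_ESCAPE_CHAR, "\n")
                 .replace(PERCENTAGE_ESCAPE_CHAR, "%"))

    path = "dir name/100% done\nreport.txt"
    assert unquote(quote(path)) == path
    print(quote(path))  # dir%20name/100%25%20done%0Areport.txt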
diff --git a/tools/setgfid2path/Makefile.am b/tools/setgfid2path/Makefile.am
new file mode 100644
index 00000000000..c14787a80ce
--- /dev/null
+++ b/tools/setgfid2path/Makefile.am
@@ -0,0 +1,5 @@
+SUBDIRS = src
+
+EXTRA_DIST = gluster-setgfid2path.8
+
+man8_MANS = gluster-setgfid2path.8
diff --git a/tools/setgfid2path/gluster-setgfid2path.8 b/tools/setgfid2path/gluster-setgfid2path.8
new file mode 100644
index 00000000000..2e228ca8514
--- /dev/null
+++ b/tools/setgfid2path/gluster-setgfid2path.8
@@ -0,0 +1,54 @@
+
+.\" Copyright (c) 2017 Red Hat, Inc. <http://www.redhat.com>
+.\" This file is part of GlusterFS.
+.\"
+.\" This file is licensed to you under your choice of the GNU Lesser
+.\" General Public License, version 3 or any later version (LGPLv3 or
+.\" later), or the GNU General Public License, version 2 (GPLv2), in all
+.\" cases as published by the Free Software Foundation.
+.\"
+.\"
+.TH gluster-setgfid2path 8 "Command line utility to set GFID to Path Xattrs"
+.SH NAME
+gluster-setgfid2path - Gluster tool to set GFID to Path xattrs
+.SH SYNOPSIS
+.B gluster-setgfid2path
+.IR file
+.SH DESCRIPTION
+The \fBgfid2path\fR feature, introduced with Gluster release 3.12, makes it
+possible to find the full path of a file from its GFID. The feature can be
+enabled using the volume set command \fBgluster volume set <VOLUME>
+storage.gfid2path enable\fR.
+.PP
+Once the \fBgfid2path\fR feature is enabled, the xattrs required by the
+feature are recorded for newly created files, but not for files that already
+exist. This tool updates the gfid2path xattrs for a given file path.
+
+.SH EXAMPLES
+To add the xattrs for a single file:
+.PP
+.nf
+.RS
+gluster-setgfid2path /bricks/b1/hello.txt
+.RE
+.fi
+.PP
+To set the xattr for all existing files, run the following script on each
+brick.
+.PP
+.nf
+.RS
+BRICK=/bricks/b1
+find $BRICK -type d \\( -path "${BRICK}/.trashcan" -o -path \\
+    "${BRICK}/.glusterfs" \\) -prune -o -type f \\
+    -exec gluster-setgfid2path {} \\;
+.RE
+.fi
+.PP
+.SH SEE ALSO
+.nf
+\fBgluster\fR(8)
+\fR
+.fi
+.SH COPYRIGHT
+.nf
+Copyright (c) 2017 Red Hat, Inc. <http://www.redhat.com>
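The man page's find(1) loop can also be scripted; a rough Python equivalent of that example, assuming gluster-setgfid2path is on PATH (illustrative, not part of the patch):

    # Walk a brick, skip .glusterfs and .trashcan, and run
    # gluster-setgfid2path on every regular file, mirroring the
    # -prune clauses of the find(1) example above.
    import os
    import subprocess

    BRICK = "/bricks/b1"

    for root, dirs, files in os.walk(BRICK):
        # prune internal directories at the brick root
        dirs[:] = [d for d in dirs
                   if os.path.join(root, d) not in
                   (os.path.join(BRICK, ".glusterfs"),
                    os.path.join(BRICK, ".trashcan"))]
        for name in files:
            subprocess.call(["gluster-setgfid2path",
                             os.path.join(root, name)])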
" + "dir=%s GFID=%s\n", + dname, pgfid_raw); + ret = -1; + goto out; + } + + /* Find xxhash for PGFID/BaseName */ + snprintf(pgfid_bname, sizeof(pgfid_bname), "%s/%s", pgfid, bname); + gf_xxh64_wrapper((unsigned char *)pgfid_bname, strlen(pgfid_bname), + GF_XXHSUM64_DEFAULT_SEED, xxh64); + + key_size = SLEN(GFID2PATH_XATTR_KEY_PREFIX) + GF_XXH64_DIGEST_LENGTH * 2 + + 1; + key = alloca(key_size); + snprintf(key, key_size, GFID2PATH_XATTR_KEY_PREFIX "%s", xxh64); + + val_size = UUID_CANONICAL_FORM_LEN + NAME_MAX + 2; + val = alloca(val_size); + snprintf(val, val_size, "%s/%s", pgfid, bname); + + /* Set the Xattr, ignore if same key xattr already exists */ + ret = sys_lsetxattr(file_path, key, val, strlen(val), XATTR_CREATE); + if (ret == -1) { + if (errno == EEXIST) { + printf("Xattr already exists, ignoring..\n"); + ret = 0; + goto out; + } + + fprintf(stderr, "Failed to set gfid2path xattr. errno=%d\n error=%s", + errno, strerror(errno)); + ret = -1; + goto out; + } + + printf("Success. file=%s key=%s value=%s\n", file_path, key, val); + +out: + if (file_path1 != NULL) + free(file_path1); + + if (file_path2 != NULL) + free(file_path2); + + return ret; +} |
