diff options
author | Pranith Kumar K <pkarampu@redhat.com> | 2016-03-17 09:32:02 +0530 |
---|---|---|
committer | Pranith Kumar Karampuri <pkarampu@redhat.com> | 2016-04-16 18:56:13 -0700 |
commit | 02a235b5a5fcfffd17debfbf3fceeddffe171682 (patch) | |
tree | aa7270cf71157eb90b792a84c41035292dc3902b /libglusterfs | |
parent | 07ea879362348d6c355b7ea7c197e3e80ec25485 (diff) |
syncop: Add parallel dir scan functionality
Most of this functionality's ideas are contributed
by Richard Wareing, in his patch:
https://bugzilla.redhat.com/show_bug.cgi?id=1221737#c1
VERY BIG thanks to him :-).
After starting porting/testing the patch above, I found a few things we can
improve in this patch based on the results we got in testing.
1) We are reading all the indices before we launch self-heals. In some customer
cases I worked on there were almost 5million files/directories that needed
heal. With such a big number self-heal daemon will be OOM killed if we go
this route. So I modified this to launch heals based on a queue length
limit.
2) We found that for directory hierarchies, multi-threaded self-heal
patch was not giving better results compared to single-threaded
self-heal because of the order problems. We improved index xlator to
give gfid type to make sure that all directories in the indices are
healed before the files that follow in that iteration of readdir
output(http://review.gluster.org/13553). In our testing this lead to
zero errors of self-heals as we were only doing self-heals in parallel
for files and not directories. I think we can further improve self-heal
speed for directories by doing name heals in parallel based on similar
techniques Richard's patch showed. I think the best thing there would be to
introduce synccond_t infra (pthread_cond_t kind of infra for syncops)
which I am planning to implement for future releases.
3) Based on 1), 2) and the fact that afr already does retries of the
indices in a loop I removed retries again in the threads.
4) After the refactor, the changes required to bring in multi-threaded
self-heal for ec would just be ~10 lines, most of it will be about
options initialization.
Our tests found that we are able to easily saturate network :-).
High level description of the final feature:
Traditionally self-heal daemon reads the indices (gfids) that need to be healed
from the brick and initiates heal one gfid at a time. Goal of this feature is
to add parallelization to the way we do self-heals in a way we do not regress
in any case but increase parallelization wherever we can. As part of this following
knobs are introduced to improve parallelization:
1) We can launch 'max-jobs' number of heals in parallel.
2) We can keep reading indices as long as the wait-q for heals doesn't go over
'max-qlen' passed as arguments to multi-threaded dir_scan.
As a first cut, we always do healing of directories in serial order one at a time
but for files we launch heals in parallel. In future we can do name-heals of dir
in parallel, but this is not implemented as of now. Reason for this is mentioned
already in '2)' above.
AFR/EC can introduce options like max-shd-threads/wait-qlength which can be set
by users to increase the rate of heals when they want. Please note that the
options will take effect only for the next crawl.
>BUG: 1221737
>Change-Id: I8fc0afc334def87797f6d41e309cefc722a317d2
>Signed-off-by: Pranith Kumar K <pkarampu@redhat.com>
>Reviewed-on: http://review.gluster.org/13569
>NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
>CentOS-regression: Gluster Build System <jenkins@build.gluster.com>
>Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
>Smoke: Gluster Build System <jenkins@build.gluster.com>
BUG: 1325857
Change-Id: I23235bbb923208eee6a8be711bbfb14350edb11b
Signed-off-by: Pranith Kumar K <pkarampu@redhat.com>
Reviewed-on: http://review.gluster.org/13967
Smoke: Gluster Build System <jenkins@build.gluster.com>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
CentOS-regression: Gluster Build System <jenkins@build.gluster.com>
Diffstat (limited to 'libglusterfs')
-rw-r--r-- | libglusterfs/src/mem-types.h | 1 | ||||
-rw-r--r-- | libglusterfs/src/syncop-utils.c | 238 | ||||
-rw-r--r-- | libglusterfs/src/syncop-utils.h | 7 |
3 files changed, 246 insertions, 0 deletions
diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h index 84949c61487..dd96cc63545 100644 --- a/libglusterfs/src/mem-types.h +++ b/libglusterfs/src/mem-types.h @@ -155,6 +155,7 @@ enum gf_common_mem_types_ { gf_common_mt_synctask, gf_common_mt_syncstack, gf_common_mt_syncenv, + gf_common_mt_scan_data, gf_common_mt_end }; #endif diff --git a/libglusterfs/src/syncop-utils.c b/libglusterfs/src/syncop-utils.c index 16ae1f7d74d..5e6b9fa5bfe 100644 --- a/libglusterfs/src/syncop-utils.c +++ b/libglusterfs/src/syncop-utils.c @@ -14,9 +14,24 @@ #endif #include "syncop.h" +#include "syncop-utils.h" #include "common-utils.h" #include "libglusterfs-messages.h" +struct syncop_dir_scan_data { + xlator_t *subvol; + loc_t *parent; + void *data; + gf_dirent_t *q; + gf_dirent_t *entry; + pthread_cond_t *cond; + pthread_mutex_t *mut; + syncop_dir_scan_fn_t fn; + uint32_t *jobs_running; + uint32_t *qlen; + int32_t *retval; +}; + int syncop_dirfd (xlator_t *subvol, loc_t *loc, fd_t **fd, int pid) { @@ -224,6 +239,229 @@ out: return ret; } +static void +_scan_data_destroy (struct syncop_dir_scan_data *data) +{ + GF_FREE (data); +} + +static int +_dir_scan_job_fn_cbk (int ret, call_frame_t *frame, void *opaque) +{ + struct syncop_dir_scan_data *scan_data = opaque; + + _scan_data_destroy (scan_data); + return 0; +} + +static int +_dir_scan_job_fn (void *data) +{ + struct syncop_dir_scan_data *scan_data = data; + gf_dirent_t *entry = NULL; + int ret = 0; + + entry = scan_data->entry; + scan_data->entry = NULL; + do { + ret = scan_data->fn (scan_data->subvol, entry, + scan_data->parent, + scan_data->data); + gf_dirent_entry_free (entry); + entry = NULL; + pthread_mutex_lock (scan_data->mut); + { + if (ret || list_empty (&scan_data->q->list)) { + (*scan_data->jobs_running)--; + *scan_data->retval |= ret; + pthread_cond_broadcast (scan_data->cond); + } else { + entry = list_first_entry (&scan_data->q->list, + typeof (*scan_data->q), list); + list_del_init (&entry->list); + (*scan_data->qlen)--; + } + } + pthread_mutex_unlock (scan_data->mut); + } while (entry); + + return ret; +} + +static int +_run_dir_scan_task (xlator_t *subvol, loc_t *parent, gf_dirent_t *q, + gf_dirent_t *entry, int *retval, pthread_mutex_t *mut, + pthread_cond_t *cond, uint32_t *jobs_running, + uint32_t *qlen, syncop_dir_scan_fn_t fn, void *data) +{ + int ret = 0; + struct syncop_dir_scan_data *scan_data = NULL; + + + scan_data = GF_CALLOC (1, sizeof (struct syncop_dir_scan_data), + gf_common_mt_scan_data); + if (!scan_data) { + ret = -ENOMEM; + goto out; + } + + scan_data->subvol = subvol; + scan_data->parent = parent; + scan_data->data = data; + scan_data->mut = mut; + scan_data->cond = cond; + scan_data->fn = fn; + scan_data->jobs_running = jobs_running; + scan_data->entry = entry; + scan_data->q = q; + scan_data->qlen = qlen; + scan_data->retval = retval; + + ret = synctask_new (subvol->ctx->env, _dir_scan_job_fn, + _dir_scan_job_fn_cbk, NULL, scan_data); +out: + if (ret < 0) { + gf_dirent_entry_free (entry); + _scan_data_destroy (scan_data); + pthread_mutex_lock (mut); + { + *jobs_running = *jobs_running - 1; + } + pthread_mutex_unlock (mut); + /*No need to cond-broadcast*/ + } + return ret; +} + +int +syncop_mt_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data, + syncop_dir_scan_fn_t fn, dict_t *xdata, uint32_t max_jobs, + uint32_t max_qlen) +{ + fd_t *fd = NULL; + uint64_t offset = 0; + gf_dirent_t *last = NULL; + int ret = 0; + int retval = 0; + gf_dirent_t q; + gf_dirent_t *entry = NULL; + gf_dirent_t *tmp = NULL; + uint32_t jobs_running = 0; + uint32_t qlen = 0; + pthread_cond_t cond; + pthread_mutex_t mut; + gf_boolean_t cond_init = _gf_false; + gf_boolean_t mut_init = _gf_false; + gf_dirent_t entries; + + /*For this functionality to be implemented in general, we need + * synccond_t infra which doesn't block the executing thread. Until then + * return failures inside synctask if they use this.*/ + if (synctask_get()) + return -ENOTSUP; + + if (max_jobs == 0) + return -EINVAL; + + /*Code becomes simpler this way. cond_wait just on qlength. + * Little bit of cheating*/ + if (max_qlen == 0) + max_qlen = 1; + + ret = syncop_dirfd (subvol, loc, &fd, pid); + if (ret) + goto out; + + INIT_LIST_HEAD (&entries.list); + INIT_LIST_HEAD (&q.list); + ret = pthread_mutex_init (&mut, NULL); + if (ret) + goto out; + mut_init = _gf_true; + + ret = pthread_cond_init (&cond, NULL); + if (ret) + goto out; + cond_init = _gf_true; + + while ((ret = syncop_readdir (subvol, fd, 131072, offset, &entries, + xdata, NULL))) { + if (ret < 0) + break; + + if (ret > 0) { + /* If the entries are only '.', and '..' then ret + * value will be non-zero. so set it to zero here. */ + ret = 0; + } + + last = list_last_entry (&entries.list, typeof (*last), list); + offset = last->d_off; + + list_for_each_entry_safe (entry, tmp, &entries.list, list) { + list_del_init (&entry->list); + if (!strcmp (entry->d_name, ".") || + !strcmp (entry->d_name, "..")) { + gf_dirent_entry_free (entry); + continue; + } + + if (entry->d_type == IA_IFDIR) { + ret = fn (subvol, entry, loc, data); + gf_dirent_entry_free (entry); + if (ret) + break; + continue; + } + + pthread_mutex_lock (&mut); + { + while (qlen == max_qlen) + pthread_cond_wait (&cond, &mut); + if (max_jobs == jobs_running) { + list_add_tail (&entry->list, &q.list); + qlen++; + entry = NULL; + } else { + jobs_running++; + } + } + pthread_mutex_unlock (&mut); + if (retval) /*Any jobs failed?*/ + break; + + if (!entry) + continue; + + ret = _run_dir_scan_task (subvol, loc, &q, entry, + &retval, &mut, &cond, + &jobs_running, &qlen, fn, data); + if (ret) + break; + } + } + +out: + if (fd) + fd_unref (fd); + if (mut_init && cond_init) { + pthread_mutex_lock (&mut); + { + while (jobs_running) + pthread_cond_wait (&cond, &mut); + } + pthread_mutex_unlock (&mut); + gf_dirent_free (&q); + gf_dirent_free (&entries); + } + + if (mut_init) + pthread_mutex_destroy (&mut); + if (cond_init) + pthread_cond_destroy (&cond); + return ret|retval; +} + int syncop_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data, int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent, diff --git a/libglusterfs/src/syncop-utils.h b/libglusterfs/src/syncop-utils.h index 7a9ccacb285..52bcfd99429 100644 --- a/libglusterfs/src/syncop-utils.h +++ b/libglusterfs/src/syncop-utils.h @@ -11,12 +11,19 @@ #ifndef _SYNCOP_UTILS_H #define _SYNCOP_UTILS_H +typedef int (*syncop_dir_scan_fn_t) (xlator_t *subvol, gf_dirent_t *entry, + loc_t *parent, void *data); int syncop_ftw (xlator_t *subvol, loc_t *loc, int pid, void *data, int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent, void *data)); int +syncop_mt_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data, + syncop_dir_scan_fn_t fn, dict_t *xdata, uint32_t max_jobs, + uint32_t max_qlen); + +int syncop_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data, int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent, void *data)); |