diff options
Diffstat (limited to 'libglusterfs/src/syncop-utils.c')
-rw-r--r-- | libglusterfs/src/syncop-utils.c | 238 |
1 files changed, 238 insertions, 0 deletions
diff --git a/libglusterfs/src/syncop-utils.c b/libglusterfs/src/syncop-utils.c index 4e8849f06f8..7421f81f46c 100644 --- a/libglusterfs/src/syncop-utils.c +++ b/libglusterfs/src/syncop-utils.c @@ -9,9 +9,24 @@ */ #include "syncop.h" +#include "syncop-utils.h" #include "common-utils.h" #include "libglusterfs-messages.h" +struct syncop_dir_scan_data { + xlator_t *subvol; + loc_t *parent; + void *data; + gf_dirent_t *q; + gf_dirent_t *entry; + pthread_cond_t *cond; + pthread_mutex_t *mut; + syncop_dir_scan_fn_t fn; + uint32_t *jobs_running; + uint32_t *qlen; + int32_t *retval; +}; + int syncop_dirfd (xlator_t *subvol, loc_t *loc, fd_t **fd, int pid) { @@ -219,6 +234,229 @@ out: return ret; } +static void +_scan_data_destroy (struct syncop_dir_scan_data *data) +{ + GF_FREE (data); +} + +static int +_dir_scan_job_fn_cbk (int ret, call_frame_t *frame, void *opaque) +{ + struct syncop_dir_scan_data *scan_data = opaque; + + _scan_data_destroy (scan_data); + return 0; +} + +static int +_dir_scan_job_fn (void *data) +{ + struct syncop_dir_scan_data *scan_data = data; + gf_dirent_t *entry = NULL; + int ret = 0; + + entry = scan_data->entry; + scan_data->entry = NULL; + do { + ret = scan_data->fn (scan_data->subvol, entry, + scan_data->parent, + scan_data->data); + gf_dirent_entry_free (entry); + entry = NULL; + pthread_mutex_lock (scan_data->mut); + { + if (ret || list_empty (&scan_data->q->list)) { + (*scan_data->jobs_running)--; + *scan_data->retval |= ret; + pthread_cond_broadcast (scan_data->cond); + } else { + entry = list_first_entry (&scan_data->q->list, + typeof (*scan_data->q), list); + list_del_init (&entry->list); + (*scan_data->qlen)--; + } + } + pthread_mutex_unlock (scan_data->mut); + } while (entry); + + return ret; +} + +static int +_run_dir_scan_task (xlator_t *subvol, loc_t *parent, gf_dirent_t *q, + gf_dirent_t *entry, int *retval, pthread_mutex_t *mut, + pthread_cond_t *cond, uint32_t *jobs_running, + uint32_t *qlen, syncop_dir_scan_fn_t fn, void *data) +{ + int ret = 0; + struct syncop_dir_scan_data *scan_data = NULL; + + + scan_data = GF_CALLOC (1, sizeof (struct syncop_dir_scan_data), + gf_common_mt_scan_data); + if (!scan_data) { + ret = -ENOMEM; + goto out; + } + + scan_data->subvol = subvol; + scan_data->parent = parent; + scan_data->data = data; + scan_data->mut = mut; + scan_data->cond = cond; + scan_data->fn = fn; + scan_data->jobs_running = jobs_running; + scan_data->entry = entry; + scan_data->q = q; + scan_data->qlen = qlen; + scan_data->retval = retval; + + ret = synctask_new (subvol->ctx->env, _dir_scan_job_fn, + _dir_scan_job_fn_cbk, NULL, scan_data); +out: + if (ret < 0) { + gf_dirent_entry_free (entry); + _scan_data_destroy (scan_data); + pthread_mutex_lock (mut); + { + *jobs_running = *jobs_running - 1; + } + pthread_mutex_unlock (mut); + /*No need to cond-broadcast*/ + } + return ret; +} + +int +syncop_mt_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data, + syncop_dir_scan_fn_t fn, dict_t *xdata, uint32_t max_jobs, + uint32_t max_qlen) +{ + fd_t *fd = NULL; + uint64_t offset = 0; + gf_dirent_t *last = NULL; + int ret = 0; + int retval = 0; + gf_dirent_t q; + gf_dirent_t *entry = NULL; + gf_dirent_t *tmp = NULL; + uint32_t jobs_running = 0; + uint32_t qlen = 0; + pthread_cond_t cond; + pthread_mutex_t mut; + gf_boolean_t cond_init = _gf_false; + gf_boolean_t mut_init = _gf_false; + gf_dirent_t entries; + + /*For this functionality to be implemented in general, we need + * synccond_t infra which doesn't block the executing thread. Until then + * return failures inside synctask if they use this.*/ + if (synctask_get()) + return -ENOTSUP; + + if (max_jobs == 0) + return -EINVAL; + + /*Code becomes simpler this way. cond_wait just on qlength. + * Little bit of cheating*/ + if (max_qlen == 0) + max_qlen = 1; + + ret = syncop_dirfd (subvol, loc, &fd, pid); + if (ret) + goto out; + + INIT_LIST_HEAD (&entries.list); + INIT_LIST_HEAD (&q.list); + ret = pthread_mutex_init (&mut, NULL); + if (ret) + goto out; + mut_init = _gf_true; + + ret = pthread_cond_init (&cond, NULL); + if (ret) + goto out; + cond_init = _gf_true; + + while ((ret = syncop_readdir (subvol, fd, 131072, offset, &entries, + xdata, NULL))) { + if (ret < 0) + break; + + if (ret > 0) { + /* If the entries are only '.', and '..' then ret + * value will be non-zero. so set it to zero here. */ + ret = 0; + } + + last = list_last_entry (&entries.list, typeof (*last), list); + offset = last->d_off; + + list_for_each_entry_safe (entry, tmp, &entries.list, list) { + list_del_init (&entry->list); + if (!strcmp (entry->d_name, ".") || + !strcmp (entry->d_name, "..")) { + gf_dirent_entry_free (entry); + continue; + } + + if (entry->d_type == IA_IFDIR) { + ret = fn (subvol, entry, loc, data); + gf_dirent_entry_free (entry); + if (ret) + break; + continue; + } + + pthread_mutex_lock (&mut); + { + while (qlen == max_qlen) + pthread_cond_wait (&cond, &mut); + if (max_jobs == jobs_running) { + list_add_tail (&entry->list, &q.list); + qlen++; + entry = NULL; + } else { + jobs_running++; + } + } + pthread_mutex_unlock (&mut); + if (retval) /*Any jobs failed?*/ + break; + + if (!entry) + continue; + + ret = _run_dir_scan_task (subvol, loc, &q, entry, + &retval, &mut, &cond, + &jobs_running, &qlen, fn, data); + if (ret) + break; + } + } + +out: + if (fd) + fd_unref (fd); + if (mut_init && cond_init) { + pthread_mutex_lock (&mut); + { + while (jobs_running) + pthread_cond_wait (&cond, &mut); + } + pthread_mutex_unlock (&mut); + gf_dirent_free (&q); + gf_dirent_free (&entries); + } + + if (mut_init) + pthread_mutex_destroy (&mut); + if (cond_init) + pthread_cond_destroy (&cond); + return ret|retval; +} + int syncop_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data, int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent, |