summaryrefslogtreecommitdiffstats
path: root/libglusterfs/src/syncop-utils.c
diff options
context:
space:
mode:
Diffstat (limited to 'libglusterfs/src/syncop-utils.c')
-rw-r--r--libglusterfs/src/syncop-utils.c238
1 files changed, 238 insertions, 0 deletions
diff --git a/libglusterfs/src/syncop-utils.c b/libglusterfs/src/syncop-utils.c
index 16ae1f7d74d..5e6b9fa5bfe 100644
--- a/libglusterfs/src/syncop-utils.c
+++ b/libglusterfs/src/syncop-utils.c
@@ -14,9 +14,24 @@
#endif
#include "syncop.h"
+#include "syncop-utils.h"
#include "common-utils.h"
#include "libglusterfs-messages.h"
+struct syncop_dir_scan_data {
+ xlator_t *subvol;
+ loc_t *parent;
+ void *data;
+ gf_dirent_t *q;
+ gf_dirent_t *entry;
+ pthread_cond_t *cond;
+ pthread_mutex_t *mut;
+ syncop_dir_scan_fn_t fn;
+ uint32_t *jobs_running;
+ uint32_t *qlen;
+ int32_t *retval;
+};
+
int
syncop_dirfd (xlator_t *subvol, loc_t *loc, fd_t **fd, int pid)
{
@@ -224,6 +239,229 @@ out:
return ret;
}
+static void
+_scan_data_destroy (struct syncop_dir_scan_data *data)
+{
+ GF_FREE (data);
+}
+
+static int
+_dir_scan_job_fn_cbk (int ret, call_frame_t *frame, void *opaque)
+{
+ struct syncop_dir_scan_data *scan_data = opaque;
+
+ _scan_data_destroy (scan_data);
+ return 0;
+}
+
+static int
+_dir_scan_job_fn (void *data)
+{
+ struct syncop_dir_scan_data *scan_data = data;
+ gf_dirent_t *entry = NULL;
+ int ret = 0;
+
+ entry = scan_data->entry;
+ scan_data->entry = NULL;
+ do {
+ ret = scan_data->fn (scan_data->subvol, entry,
+ scan_data->parent,
+ scan_data->data);
+ gf_dirent_entry_free (entry);
+ entry = NULL;
+ pthread_mutex_lock (scan_data->mut);
+ {
+ if (ret || list_empty (&scan_data->q->list)) {
+ (*scan_data->jobs_running)--;
+ *scan_data->retval |= ret;
+ pthread_cond_broadcast (scan_data->cond);
+ } else {
+ entry = list_first_entry (&scan_data->q->list,
+ typeof (*scan_data->q), list);
+ list_del_init (&entry->list);
+ (*scan_data->qlen)--;
+ }
+ }
+ pthread_mutex_unlock (scan_data->mut);
+ } while (entry);
+
+ return ret;
+}
+
+static int
+_run_dir_scan_task (xlator_t *subvol, loc_t *parent, gf_dirent_t *q,
+ gf_dirent_t *entry, int *retval, pthread_mutex_t *mut,
+ pthread_cond_t *cond, uint32_t *jobs_running,
+ uint32_t *qlen, syncop_dir_scan_fn_t fn, void *data)
+{
+ int ret = 0;
+ struct syncop_dir_scan_data *scan_data = NULL;
+
+
+ scan_data = GF_CALLOC (1, sizeof (struct syncop_dir_scan_data),
+ gf_common_mt_scan_data);
+ if (!scan_data) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ scan_data->subvol = subvol;
+ scan_data->parent = parent;
+ scan_data->data = data;
+ scan_data->mut = mut;
+ scan_data->cond = cond;
+ scan_data->fn = fn;
+ scan_data->jobs_running = jobs_running;
+ scan_data->entry = entry;
+ scan_data->q = q;
+ scan_data->qlen = qlen;
+ scan_data->retval = retval;
+
+ ret = synctask_new (subvol->ctx->env, _dir_scan_job_fn,
+ _dir_scan_job_fn_cbk, NULL, scan_data);
+out:
+ if (ret < 0) {
+ gf_dirent_entry_free (entry);
+ _scan_data_destroy (scan_data);
+ pthread_mutex_lock (mut);
+ {
+ *jobs_running = *jobs_running - 1;
+ }
+ pthread_mutex_unlock (mut);
+ /*No need to cond-broadcast*/
+ }
+ return ret;
+}
+
+int
+syncop_mt_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data,
+ syncop_dir_scan_fn_t fn, dict_t *xdata, uint32_t max_jobs,
+ uint32_t max_qlen)
+{
+ fd_t *fd = NULL;
+ uint64_t offset = 0;
+ gf_dirent_t *last = NULL;
+ int ret = 0;
+ int retval = 0;
+ gf_dirent_t q;
+ gf_dirent_t *entry = NULL;
+ gf_dirent_t *tmp = NULL;
+ uint32_t jobs_running = 0;
+ uint32_t qlen = 0;
+ pthread_cond_t cond;
+ pthread_mutex_t mut;
+ gf_boolean_t cond_init = _gf_false;
+ gf_boolean_t mut_init = _gf_false;
+ gf_dirent_t entries;
+
+ /*For this functionality to be implemented in general, we need
+ * synccond_t infra which doesn't block the executing thread. Until then
+ * return failures inside synctask if they use this.*/
+ if (synctask_get())
+ return -ENOTSUP;
+
+ if (max_jobs == 0)
+ return -EINVAL;
+
+ /*Code becomes simpler this way. cond_wait just on qlength.
+ * Little bit of cheating*/
+ if (max_qlen == 0)
+ max_qlen = 1;
+
+ ret = syncop_dirfd (subvol, loc, &fd, pid);
+ if (ret)
+ goto out;
+
+ INIT_LIST_HEAD (&entries.list);
+ INIT_LIST_HEAD (&q.list);
+ ret = pthread_mutex_init (&mut, NULL);
+ if (ret)
+ goto out;
+ mut_init = _gf_true;
+
+ ret = pthread_cond_init (&cond, NULL);
+ if (ret)
+ goto out;
+ cond_init = _gf_true;
+
+ while ((ret = syncop_readdir (subvol, fd, 131072, offset, &entries,
+ xdata, NULL))) {
+ if (ret < 0)
+ break;
+
+ if (ret > 0) {
+ /* If the entries are only '.', and '..' then ret
+ * value will be non-zero. so set it to zero here. */
+ ret = 0;
+ }
+
+ last = list_last_entry (&entries.list, typeof (*last), list);
+ offset = last->d_off;
+
+ list_for_each_entry_safe (entry, tmp, &entries.list, list) {
+ list_del_init (&entry->list);
+ if (!strcmp (entry->d_name, ".") ||
+ !strcmp (entry->d_name, "..")) {
+ gf_dirent_entry_free (entry);
+ continue;
+ }
+
+ if (entry->d_type == IA_IFDIR) {
+ ret = fn (subvol, entry, loc, data);
+ gf_dirent_entry_free (entry);
+ if (ret)
+ break;
+ continue;
+ }
+
+ pthread_mutex_lock (&mut);
+ {
+ while (qlen == max_qlen)
+ pthread_cond_wait (&cond, &mut);
+ if (max_jobs == jobs_running) {
+ list_add_tail (&entry->list, &q.list);
+ qlen++;
+ entry = NULL;
+ } else {
+ jobs_running++;
+ }
+ }
+ pthread_mutex_unlock (&mut);
+ if (retval) /*Any jobs failed?*/
+ break;
+
+ if (!entry)
+ continue;
+
+ ret = _run_dir_scan_task (subvol, loc, &q, entry,
+ &retval, &mut, &cond,
+ &jobs_running, &qlen, fn, data);
+ if (ret)
+ break;
+ }
+ }
+
+out:
+ if (fd)
+ fd_unref (fd);
+ if (mut_init && cond_init) {
+ pthread_mutex_lock (&mut);
+ {
+ while (jobs_running)
+ pthread_cond_wait (&cond, &mut);
+ }
+ pthread_mutex_unlock (&mut);
+ gf_dirent_free (&q);
+ gf_dirent_free (&entries);
+ }
+
+ if (mut_init)
+ pthread_mutex_destroy (&mut);
+ if (cond_init)
+ pthread_cond_destroy (&cond);
+ return ret|retval;
+}
+
int
syncop_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data,
int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent,