diff options
Diffstat (limited to 'libglusterfs/src/syncop-utils.c')
| -rw-r--r-- | libglusterfs/src/syncop-utils.c | 238 | 
1 files changed, 238 insertions, 0 deletions
diff --git a/libglusterfs/src/syncop-utils.c b/libglusterfs/src/syncop-utils.c index 4e8849f06f8..7421f81f46c 100644 --- a/libglusterfs/src/syncop-utils.c +++ b/libglusterfs/src/syncop-utils.c @@ -9,9 +9,24 @@  */  #include "syncop.h" +#include "syncop-utils.h"  #include "common-utils.h"  #include "libglusterfs-messages.h" +struct syncop_dir_scan_data { +        xlator_t *subvol; +        loc_t *parent; +        void *data; +        gf_dirent_t *q; +        gf_dirent_t *entry; +        pthread_cond_t *cond; +        pthread_mutex_t *mut; +        syncop_dir_scan_fn_t fn; +        uint32_t *jobs_running; +        uint32_t *qlen; +        int32_t  *retval; +}; +  int  syncop_dirfd (xlator_t *subvol, loc_t *loc, fd_t **fd, int pid)  { @@ -219,6 +234,229 @@ out:          return ret;  } +static void +_scan_data_destroy (struct syncop_dir_scan_data *data) +{ +        GF_FREE (data); +} + +static int +_dir_scan_job_fn_cbk (int ret, call_frame_t *frame, void *opaque) +{ +        struct syncop_dir_scan_data *scan_data = opaque; + +        _scan_data_destroy (scan_data); +        return 0; +} + +static int +_dir_scan_job_fn (void *data) +{ +        struct syncop_dir_scan_data *scan_data = data; +        gf_dirent_t                 *entry     = NULL; +        int                         ret        = 0; + +        entry = scan_data->entry; +        scan_data->entry = NULL; +        do { +                ret = scan_data->fn (scan_data->subvol, entry, +                                     scan_data->parent, +                                     scan_data->data); +                gf_dirent_entry_free (entry); +                entry = NULL; +                pthread_mutex_lock (scan_data->mut); +                { +                        if (ret || list_empty (&scan_data->q->list)) { +                                (*scan_data->jobs_running)--; +                                *scan_data->retval |= ret; +                                pthread_cond_broadcast (scan_data->cond); +                        } else { +                                entry = list_first_entry (&scan_data->q->list, +                                                  typeof (*scan_data->q), list); +                                list_del_init (&entry->list); +                                (*scan_data->qlen)--; +                        } +                } +                pthread_mutex_unlock (scan_data->mut); +        } while (entry); + +        return ret; +} + +static int +_run_dir_scan_task (xlator_t *subvol, loc_t *parent, gf_dirent_t *q, +                    gf_dirent_t *entry, int *retval, pthread_mutex_t *mut, +                    pthread_cond_t *cond, uint32_t *jobs_running, +                    uint32_t *qlen, syncop_dir_scan_fn_t fn, void *data) +{ +        int     ret = 0; +        struct syncop_dir_scan_data *scan_data = NULL; + + +        scan_data = GF_CALLOC (1, sizeof (struct syncop_dir_scan_data), +                               gf_common_mt_scan_data); +        if (!scan_data) { +                ret = -ENOMEM; +                goto out; +        } + +        scan_data->subvol       = subvol; +        scan_data->parent       = parent; +        scan_data->data         = data; +        scan_data->mut          = mut; +        scan_data->cond         = cond; +        scan_data->fn           = fn; +        scan_data->jobs_running = jobs_running; +        scan_data->entry        = entry; +        scan_data->q            = q; +        scan_data->qlen         = qlen; +        scan_data->retval       = retval; + +        ret = synctask_new (subvol->ctx->env, _dir_scan_job_fn, +                            _dir_scan_job_fn_cbk, NULL, scan_data); +out: +        if (ret < 0) { +                gf_dirent_entry_free (entry); +                _scan_data_destroy (scan_data); +                pthread_mutex_lock (mut); +                { +                        *jobs_running = *jobs_running - 1; +                } +                pthread_mutex_unlock (mut); +                /*No need to cond-broadcast*/ +        } +        return ret; +} + +int +syncop_mt_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data, +                    syncop_dir_scan_fn_t fn, dict_t *xdata, uint32_t max_jobs, +                    uint32_t max_qlen) +{ +        fd_t        *fd    = NULL; +        uint64_t    offset = 0; +        gf_dirent_t *last = NULL; +        int         ret    = 0; +        int         retval = 0; +        gf_dirent_t q; +        gf_dirent_t *entry = NULL; +        gf_dirent_t *tmp = NULL; +        uint32_t    jobs_running = 0; +        uint32_t    qlen = 0; +        pthread_cond_t cond; +        pthread_mutex_t mut; +        gf_boolean_t cond_init = _gf_false; +        gf_boolean_t mut_init = _gf_false; +        gf_dirent_t entries; + +        /*For this functionality to be implemented in general, we need +         * synccond_t infra which doesn't block the executing thread. Until then +         * return failures inside synctask if they use this.*/ +        if (synctask_get()) +                return -ENOTSUP; + +        if (max_jobs == 0) +                return -EINVAL; + +        /*Code becomes simpler this way. cond_wait just on qlength. +         * Little bit of cheating*/ +        if (max_qlen == 0) +                max_qlen = 1; + +        ret = syncop_dirfd (subvol, loc, &fd, pid); +        if (ret) +                goto out; + +        INIT_LIST_HEAD (&entries.list); +        INIT_LIST_HEAD (&q.list); +        ret = pthread_mutex_init (&mut, NULL); +        if (ret) +                goto out; +        mut_init = _gf_true; + +        ret = pthread_cond_init (&cond, NULL); +        if (ret) +                goto out; +        cond_init = _gf_true; + +        while ((ret = syncop_readdir (subvol, fd, 131072, offset, &entries, +                                      xdata, NULL))) { +                if (ret < 0) +                        break; + +                if (ret > 0) { +                        /* If the entries are only '.', and '..' then ret +                         * value will be non-zero. so set it to zero here. */ +                        ret = 0; +                } + +                last = list_last_entry (&entries.list, typeof (*last), list); +                offset = last->d_off; + +                list_for_each_entry_safe (entry, tmp, &entries.list, list) { +                        list_del_init (&entry->list); +                        if (!strcmp (entry->d_name, ".") || +                            !strcmp (entry->d_name, "..")) { +                                gf_dirent_entry_free (entry); +                                continue; +                        } + +                        if (entry->d_type == IA_IFDIR) { +                                ret = fn (subvol, entry, loc, data); +                                gf_dirent_entry_free (entry); +                                if (ret) +                                        break; +                                continue; +                        } + +                        pthread_mutex_lock (&mut); +                        { +                                while (qlen == max_qlen) +                                        pthread_cond_wait (&cond, &mut); +                                if (max_jobs == jobs_running) { +                                        list_add_tail (&entry->list, &q.list); +                                        qlen++; +                                        entry = NULL; +                                } else { +                                        jobs_running++; +                                } +                        } +                        pthread_mutex_unlock (&mut); +                        if (retval) /*Any jobs failed?*/ +                                break; + +                        if (!entry) +                                continue; + +                        ret = _run_dir_scan_task (subvol, loc, &q, entry, +                                                  &retval, &mut, &cond, +                                                &jobs_running, &qlen, fn, data); +                        if (ret) +                                break; +                } +        } + +out: +        if (fd) +                fd_unref (fd); +        if (mut_init && cond_init) { +                pthread_mutex_lock (&mut); +                { +                        while (jobs_running) +                                pthread_cond_wait (&cond, &mut); +                } +                pthread_mutex_unlock (&mut); +                gf_dirent_free (&q); +                gf_dirent_free (&entries); +        } + +        if (mut_init) +                pthread_mutex_destroy (&mut); +        if (cond_init) +                pthread_cond_destroy (&cond); +        return ret|retval; +} +  int  syncop_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data,                   int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent,  | 
