diff options
| -rw-r--r-- | libglusterfs/src/glusterfs.h | 1 | ||||
| -rw-r--r-- | libglusterfs/src/mem-types.h | 2 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-common.c | 130 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-common.h | 47 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-helper.c | 23 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-mem-types.h | 3 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-rebalance.c | 1157 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-shared.c | 26 | 
8 files changed, 1137 insertions, 252 deletions
diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h index 2923aeb4aa4..f6c10e99ef9 100644 --- a/libglusterfs/src/glusterfs.h +++ b/libglusterfs/src/glusterfs.h @@ -83,6 +83,7 @@  #define GF_XATTR_CLRLK_CMD      "glusterfs.clrlk"  #define GF_XATTR_PATHINFO_KEY   "trusted.glusterfs.pathinfo"  #define GF_XATTR_NODE_UUID_KEY  "trusted.glusterfs.node-uuid" +#define GF_REBAL_FIND_LOCAL_SUBVOL "glusterfs.find-local-subvol"  #define GF_XATTR_VOL_ID_KEY   "trusted.glusterfs.volume-id"  #define GF_XATTR_LOCKINFO_KEY   "trusted.glusterfs.lockinfo"  #define GF_XATTR_GET_REAL_FILENAME_KEY "glusterfs.get_real_filename:" diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h index fc06d52239b..a77998e8a63 100644 --- a/libglusterfs/src/mem-types.h +++ b/libglusterfs/src/mem-types.h @@ -150,6 +150,8 @@ enum gf_common_mem_types_ {          gf_common_mt_nfs_exports          = 131,          gf_common_mt_gf_brick_spec_t      = 132,          gf_common_mt_gf_timer_entry_t     = 133, +        gf_common_mt_int                  = 134, +        gf_common_mt_pointer              = 135,          gf_common_mt_end  };  #endif diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c index 2cfd862de65..2122839c020 100644 --- a/xlators/cluster/dht/src/dht-common.c +++ b/xlators/cluster/dht/src/dht-common.c @@ -2534,6 +2534,87 @@ dht_vgetxattr_fill_and_set (dht_local_t *local, dict_t **dict, xlator_t *this,   out:          return ret;  } +int +dht_find_local_subvol_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                           int op_ret, int op_errno, dict_t *xattr, +                           dict_t *xdata) +{ +        dht_local_t  *local         = NULL; +        dht_conf_t   *conf          = NULL; +        call_frame_t *prev          = NULL; +        int           this_call_cnt = 0; +        int           ret           = 0; +        char         *uuid_str      = NULL; +        uuid_t        node_uuid     = {0,}; + + +        VALIDATE_OR_GOTO (frame, out); +        VALIDATE_OR_GOTO (frame->local, out); + +        local = frame->local; +        prev = cookie; +        conf = this->private; + +        LOCK (&frame->lock); +        { +                this_call_cnt = --local->call_cnt; +                if (op_ret < 0) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "getxattr err (%s) for dir", +                                strerror (op_errno)); +                        local->op_ret = -1; +                        local->op_errno = op_errno; +                        goto unlock; +                } + +                ret = dict_get_str (xattr, local->xsel, &uuid_str); + +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_ERROR, "Failed to " +                                "get %s", local->xsel); +                        local->op_ret = -1; +                        local->op_errno = EINVAL; +                        goto unlock; +                } + +                if (gf_uuid_parse (uuid_str, node_uuid)) { +                        gf_log (this->name, GF_LOG_ERROR, "Failed to parse uuid" +                                " failed for %s", prev->this->name); +                        local->op_ret = -1; +                        local->op_errno = EINVAL; +                        goto unlock; +                } + +                if (gf_uuid_compare (node_uuid, conf->defrag->node_uuid)) { +                        gf_log (this->name, GF_LOG_DEBUG, "subvol %s does not" +                                "belong to this node", prev->this->name); +                } else { +                        conf->local_subvols[(conf->local_subvols_cnt)++] +                                                           = prev->this; +                        gf_log (this->name, GF_LOG_DEBUG, "subvol %s belongs to" +                                " this node", prev->this->name); +                } +        } + +        local->op_ret = 0; + unlock: +        UNLOCK (&frame->lock); + +        if (!is_last_call (this_call_cnt)) +                goto out; + +        if (local->op_ret == -1) { +                goto unwind; +        } + +        DHT_STACK_UNWIND (getxattr, frame, 0, 0, NULL, NULL); +        goto out; + + unwind: +        DHT_STACK_UNWIND (getxattr, frame, -1, local->op_errno, NULL, NULL); + out: +        return 0; +}  int  dht_vgetxattr_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, @@ -2899,7 +2980,8 @@ dht_getxattr (call_frame_t *frame, xlator_t *this,          int           op_errno      = -1;          int           i             = 0;          int           cnt           = 0; - +        char         *node_uuid_key = NULL; +        int           ret           = -1;          VALIDATE_OR_GOTO (frame, err);          VALIDATE_OR_GOTO (this, err);          VALIDATE_OR_GOTO (loc, err); @@ -2940,6 +3022,28 @@ dht_getxattr (call_frame_t *frame, xlator_t *this,  		return 0;  	} +        if (key && DHT_IS_DIR(layout) && +           (!strcmp (key, GF_REBAL_FIND_LOCAL_SUBVOL))) { +                ret = gf_asprintf +                           (&node_uuid_key, "%s", GF_XATTR_NODE_UUID_KEY); +                if (ret == -1 || !node_uuid_key) { +                        gf_log (this->name, GF_LOG_ERROR, "Failed to copy key"); +                        op_errno = ENOMEM; +                        goto err; +                } +                (void) strncpy (local->xsel, node_uuid_key, 256); +                cnt = local->call_cnt = conf->subvolume_cnt; +                for (i = 0; i < cnt; i++) { +                        STACK_WIND (frame, dht_find_local_subvol_cbk, +                                    conf->subvolumes[i], +                                    conf->subvolumes[i]->fops->getxattr, +                                    loc, node_uuid_key, xdata); +                } +                if (node_uuid_key) +                        GF_FREE (node_uuid_key); +                return 0; +        } +          /* for file use cached subvolume (obviously!): see if {}           * below           * for directory: @@ -2949,6 +3053,7 @@ dht_getxattr (call_frame_t *frame, xlator_t *this,           * NOTE: Don't trust inode here, as that may not be valid           *       (until inode_link() happens)           */ +          if (key && DHT_IS_DIR(layout) &&              (XATTR_IS_PATHINFO (key)               || (strcmp (key, GF_XATTR_NODE_UUID_KEY) == 0))) { @@ -3838,13 +3943,24 @@ dht_opendir (call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd,                  goto err;          } -        local->call_cnt = conf->subvolume_cnt; +        if (!(conf->local_subvols_cnt) || !conf->defrag) { +                local->call_cnt = conf->subvolume_cnt; -        for (i = 0; i < conf->subvolume_cnt; i++) { -                STACK_WIND (frame, dht_fd_cbk, -                            conf->subvolumes[i], -                            conf->subvolumes[i]->fops->opendir, -                            loc, fd, xdata); +                for (i = 0; i < conf->subvolume_cnt; i++) { +                        STACK_WIND (frame, dht_fd_cbk, +                                    conf->subvolumes[i], +                                    conf->subvolumes[i]->fops->opendir, +                                    loc, fd, xdata); + +                } +        } else { +                local->call_cnt = conf->local_subvols_cnt; +                for (i = 0; i < conf->local_subvols_cnt; i++) { +                        STACK_WIND (frame, dht_fd_cbk, +                                    conf->local_subvols[i], +                                    conf->local_subvols[i]->fops->opendir, +                                    loc, fd, xdata); +                }          }          return 0; diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h index 3ca626feec8..0e290465d44 100644 --- a/xlators/cluster/dht/src/dht-common.h +++ b/xlators/cluster/dht/src/dht-common.h @@ -290,6 +290,20 @@ struct gf_defrag_pattern_list {          gf_defrag_pattern_list_t  *next;  }; +struct dht_container { +        union { +                struct list_head             list; +                struct { +                        struct _gf_dirent_t *next; +                        struct _gf_dirent_t *prev; +                }; +        }; +        gf_dirent_t     *df_entry; +        xlator_t        *this; +        loc_t           *parent_loc; +        dict_t          *migrate_data; +}; +  struct gf_defrag_info_ {          uint64_t                     total_files;          uint64_t                     total_data; @@ -317,6 +331,19 @@ struct gf_defrag_info_ {          uint64_t                     total_files_demoted;          int                          write_freq_threshold;          int                          read_freq_threshold; + +        pthread_cond_t               parallel_migration_cond; +        pthread_mutex_t              dfq_mutex; +        pthread_cond_t               rebalance_crawler_alarm; +        int32_t                      q_entry_count; +        int32_t                      global_error; +        struct  dht_container       *queue; +        int32_t                      crawl_done; +        int32_t                      abort; +        int32_t                      wakeup_crawler; + +        /* Hard link handle requirement */ +        synclock_t                   link_lock;  };  typedef struct gf_defrag_info_ gf_defrag_info_t; @@ -394,9 +421,19 @@ struct dht_conf {          dht_methods_t  *methods;          struct mem_pool *lock_pool; + +        /*local subvol storage for rebalance*/ +        xlator_t       **local_subvols; +        int32_t          local_subvols_cnt;  };  typedef struct dht_conf dht_conf_t; +struct dht_dfoffset_ctx { +        xlator_t       *this; +        off_t           offset; +        int32_t         readdir_done; +}; +typedef struct dht_dfoffset_ctx dht_dfoffset_ctx_t;  struct dht_disk_layout {          uint32_t           cnt; @@ -420,6 +457,14 @@ typedef enum {          GF_DHT_WEIGHTED_DISTRIBUTION  } dht_distribution_type_t; +struct dir_dfmeta { +        gf_dirent_t             *equeue; +        dht_dfoffset_ctx_t      *offset_var; +        struct list_head        **head; +        struct list_head        **iterator; +        int                     *fetch_entries; +}; +  #define ENTRY_MISSING(op_ret, op_errno) (op_ret == -1 && op_errno == ENOENT)  #define is_revalidate(loc) (dht_inode_ctx_layout_get (loc->inode, this, NULL) == 0) @@ -605,6 +650,8 @@ int dht_start_rebalance_task (xlator_t *this, call_frame_t *frame);  int dht_rebalance_in_progress_check (xlator_t *this, call_frame_t *frame);  int dht_rebalance_complete_check (xlator_t *this, call_frame_t *frame); +int +dht_init_local_subvolumes (xlator_t *this, dht_conf_t *conf);  /* FOPS */  int32_t dht_lookup (call_frame_t *frame, diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c index cab66017b84..b5114b620ce 100644 --- a/xlators/cluster/dht/src/dht-helper.c +++ b/xlators/cluster/dht/src/dht-helper.c @@ -731,7 +731,28 @@ err:          return -1;  } +int +dht_init_local_subvolumes (xlator_t *this, dht_conf_t *conf) +{ +        xlator_list_t *subvols = NULL; +        int            cnt = 0; +        if (!conf) +                return -1; + +        for (subvols = this->children; subvols; subvols = subvols->next) +                cnt++; + +        conf->local_subvols = GF_CALLOC (cnt, sizeof (xlator_t *), +                                        gf_dht_mt_xlator_t); +        if (!conf->local_subvols) { +                return -1; +        } + +        conf->local_subvols_cnt = 0; + +        return 0; +}  int  dht_init_subvolumes (xlator_t *this, dht_conf_t *conf) @@ -752,6 +773,8 @@ dht_init_subvolumes (xlator_t *this, dht_conf_t *conf)          }          conf->subvolume_cnt = cnt; +        conf->local_subvols_cnt = 0; +          dht_set_subvol_range(this);          cnt = 0; diff --git a/xlators/cluster/dht/src/dht-mem-types.h b/xlators/cluster/dht/src/dht-mem-types.h index e893eb48fd8..46028e6d9e0 100644 --- a/xlators/cluster/dht/src/dht-mem-types.h +++ b/xlators/cluster/dht/src/dht-mem-types.h @@ -30,6 +30,9 @@ enum gf_dht_mem_types_ {          gf_defrag_info_mt,          gf_dht_mt_inode_ctx_t,          gf_dht_mt_ctx_stat_time_t, +        gf_dht_mt_dirent_t, +        gf_dht_mt_container_t, +        gf_dht_mt_octx_t,          gf_dht_mt_end  };  #endif diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c index b0e21da8cb5..5e9f4d6e1a6 100644 --- a/xlators/cluster/dht/src/dht-rebalance.c +++ b/xlators/cluster/dht/src/dht-rebalance.c @@ -23,6 +23,36 @@  #define GF_DISK_SECTOR_SIZE             512  #define DHT_REBALANCE_PID               4242 /* Change it if required */  #define DHT_REBALANCE_BLKSIZE           (128 * 1024) +#define MAX_MIGRATOR_THREAD_COUNT       20 +#define MAX_MIGRATE_QUEUE_COUNT         500 +#define MIN_MIGRATE_QUEUE_COUNT         200 + +#define GF_CRAWL_INDEX_MOVE(idx, sv_cnt)  {     \ +                idx++;                          \ +                idx %= sv_cnt;                  \ +        } + +#define GF_FREE_DIR_DFMETA(dir_dfmeta) {                        \ +                if (dir_dfmeta) {                               \ +                        GF_FREE (dir_dfmeta->head);             \ +                        GF_FREE (dir_dfmeta->equeue);           \ +                        GF_FREE (dir_dfmeta->iterator);         \ +                        GF_FREE (dir_dfmeta->offset_var);       \ +                        GF_FREE (dir_dfmeta->fetch_entries);    \ +                        GF_FREE (dir_dfmeta);                   \ +                }                                               \ +        }                                                       \ + +void +dht_set_global_defrag_error (gf_defrag_info_t *defrag, int ret) +{ +        LOCK (&defrag->lock); +        { +                defrag->global_error = ret; +        } +        UNLOCK (&defrag->lock); +        return; +}  static int  dht_write_with_holes (xlator_t *to, fd_t *fd, struct iovec *vec, int count, @@ -178,6 +208,47 @@ gf_defrag_handle_hardlink (xlator_t *this, loc_t *loc, dict_t  *xattrs,                  goto out;          } +        /* +          Parallel migration can lead to migration of the hard link multiple +          times which can lead to data loss. Hence, adding a fresh lookup to +          decide whether migration is required or not. + +          Elaborating the scenario for let say 10 hardlinks [link{1..10}]: +              Let say the first hard link "link1"  does the setxattr of the +          new hashed subvolume info on the cached file. As there are multiple +          threads working, we might have already all the links created on the +          new hashed by the time we reach hardlink let say link5. Now the +          number of links on hashed is equal to that of cached. Hence, file +          migration will happen for link6. + +                 Cached                                 Hashed +          --------T link6                        rwxrwxrwx   link6 + +          Now post above state all the link file on the cached will be zero +          byte linkto files. Hence, if we still do migration for the following +          files link{7..10}, we will end up migrating 0 data leading to data +          loss. +                Hence, a lookup can make sure whether we need to migrate the +          file or not. +        */ + +        ret = syncop_lookup (this, loc, NULL, NULL, +                                             NULL, NULL); +        if (ret) { +                /*Ignore ENOENT and ESTALE as file might have been +                  migrated already*/ +                if (-ret == ENOENT || -ret == ESTALE) { +                        ret = -2; +                        goto out; +                } +                gf_msg (this->name, GF_LOG_ERROR, 0, +                        DHT_MSG_MIGRATE_FILE_FAILED, +                        "Migrate file failed:%s lookup failed with ret = %d", +                        loc->path, ret); +                ret = -1; +                goto out; +        } +          cached_subvol = dht_subvol_get_cached (this, loc->inode);          if (!cached_subvol) {                  gf_msg (this->name, GF_LOG_ERROR, 0, @@ -198,6 +269,11 @@ gf_defrag_handle_hardlink (xlator_t *this, loc_t *loc, dict_t  *xattrs,                  goto out;          } +        if (hashed_subvol == cached_subvol) { +                ret = -2; +                goto out; +        } +          gf_log (this->name, GF_LOG_INFO, "Attempting to migrate hardlink %s "                  "with gfid %s from %s -> %s", loc->name, uuid_utoa (loc->gfid),                  cached_subvol->name, hashed_subvol->name); @@ -288,7 +364,8 @@ out:  */  static inline int  __is_file_migratable (xlator_t *this, loc_t *loc, -                      struct iatt *stbuf, dict_t *xattrs, int flags) +                      struct iatt *stbuf, dict_t *xattrs, int flags, +                                gf_defrag_info_t *defrag)  {          int ret = -1; @@ -308,13 +385,14 @@ __is_file_migratable (xlator_t *this, loc_t *loc,          if (stbuf->ia_nlink > 1) {                  /* support for decomission */                  if (flags == GF_DHT_MIGRATE_HARDLINK) { -                        ret = gf_defrag_handle_hardlink (this, loc, -                                                         xattrs, stbuf); - -                   /* -                   Returning zero will force the file to be remigrated. -                   Checkout gf_defrag_handle_hardlink for more information. -                   */ +                        synclock_lock (&defrag->link_lock); +                        ret = gf_defrag_handle_hardlink +                                (this, loc, xattrs, stbuf); +                        synclock_unlock (&defrag->link_lock); +                        /* +                        Returning zero will force the file to be remigrated. +                        Checkout gf_defrag_handle_hardlink for more information. +                        */                          if (ret && ret != -2) {                                  gf_msg (this->name, GF_LOG_WARNING, 0,                                          DHT_MSG_MIGRATE_FILE_FAILED, @@ -610,6 +688,7 @@ __dht_rebalance_migrate_data (xlator_t *from, xlator_t *to, fd_t *src, fd_t *dst          while (total < ia_size) {                  read_size = (((ia_size - total) > DHT_REBALANCE_BLKSIZE) ?                               DHT_REBALANCE_BLKSIZE : (ia_size - total)); +                  ret = syncop_readv (from, src, read_size,                                      offset, 0, &vector, &count, &iobref, NULL,                                      NULL); @@ -904,6 +983,11 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,          loc_t           tmp_loc              = {0, };          gf_boolean_t    locked               = _gf_false;          int             lk_ret               = -1; +        gf_defrag_info_t *defrag              =  NULL; + +        defrag = conf->defrag; +        if (!defrag) +                goto out;          gf_log (this->name, GF_LOG_INFO, "%s: attempting to move from %s to %s",                  loc->path, from->name, to->name); @@ -960,7 +1044,7 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,          src_ia_prot = stbuf.ia_prot;          /* Check if file can be migrated */ -        ret = __is_file_migratable (this, loc, &stbuf, xattr_rsp, flag); +        ret = __is_file_migratable (this, loc, &stbuf, xattr_rsp, flag, defrag);          if (ret) {                  if (ret == -2)                          ret = 0; @@ -1295,6 +1379,7 @@ dht_start_rebalance_task (xlator_t *this, call_frame_t *frame)  {          int         ret     = -1; +        frame->root->pid = GF_CLIENT_PID_DEFRAG;          ret = synctask_new (this->ctx->env, rebalance_task,                              rebalance_task_completion,                              frame, frame); @@ -1406,61 +1491,341 @@ gf_defrag_pattern_match (gf_defrag_info_t *defrag, char *name, uint64_t size)          return ret;  } -/* We do a depth first traversal of directories. But before we move into - * subdirs, we complete the data migration of those directories whose layouts - * have been fixed - */ +int dht_dfreaddirp_done (dht_dfoffset_ctx_t *offset_var, int cnt) { + +        int i; +        int result = 1; + +        for (i = 0; i < cnt; i++) { +                if (offset_var[i].readdir_done == 0) { +                        result = 0; +                        break; +                } +        } +        return result; +} + +int static +gf_defrag_ctx_subvols_init (dht_dfoffset_ctx_t *offset_var, xlator_t *this) { + +        int i; +        dht_conf_t *conf = NULL; + +        conf = this->private; + +        if (!conf) +               return -1; + +        for (i = 0; i < conf->local_subvols_cnt; i++) { +               offset_var[i].this = conf->local_subvols[i]; +               offset_var[i].offset = (off_t) 0; +               offset_var[i].readdir_done = 0; +        } + +        return 0; +}  int -gf_defrag_migrate_data (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, -                        dict_t *migrate_data) +gf_defrag_migrate_single_file (void *opaque)  { -        int                      ret            = -1; -        loc_t                    entry_loc      = {0,}; -        fd_t                    *fd             = NULL; -        gf_dirent_t              entries; -        gf_dirent_t             *tmp            = NULL; +        xlator_t                *this = NULL; +        dht_conf_t              *conf = NULL; +        gf_defrag_info_t        *defrag = NULL; +        int                     ret = 0;          gf_dirent_t             *entry          = NULL; -        gf_boolean_t             free_entries   = _gf_false; -        off_t                    offset         = 0; -        dict_t                  *dict           = NULL; +        struct timeval           start          = {0,}; +        loc_t                    entry_loc      = {0,}; +        loc_t                   *loc            = NULL;          struct iatt              iatt           = {0,}; +        dict_t                  *migrate_data   = NULL;          int32_t                  op_errno       = 0; -        char                    *uuid_str       = NULL; -        uuid_t                   node_uuid      = {0,}; -        struct timeval           dir_start      = {0,};          struct timeval           end            = {0,};          double                   elapsed        = {0,}; -        struct timeval           start          = {0,}; -        int                      loglevel       = GF_LOG_TRACE; +        struct dht_container    *rebal_entry    = NULL; -        gf_log (this->name, GF_LOG_INFO, "migrate data called on %s", -                loc->path); -        gettimeofday (&dir_start, NULL); +        rebal_entry = (struct dht_container *)opaque; +        if (!rebal_entry) { +                gf_log (this->name, GF_LOG_ERROR, "rebal_entry is NULL"); +                ret = -1; +                goto out; +        } -        fd = fd_create (loc->inode, defrag->pid); -        if (!fd) { -                gf_log (this->name, GF_LOG_ERROR, "Failed to create fd"); +        this = rebal_entry->this; + +        conf = this->private; + +        defrag = conf->defrag; + +        loc = rebal_entry->parent_loc; + +        migrate_data = rebal_entry->migrate_data; + +        entry = rebal_entry->df_entry; + +        if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { +                ret = -1;                  goto out;          } -        ret = syncop_opendir (this, loc, fd, NULL, NULL); +        if (defrag->stats == _gf_true) { +                gettimeofday (&start, NULL); +        } + +        if (defrag->defrag_pattern && +            (gf_defrag_pattern_match (defrag, entry->d_name, +                                      entry->d_stat.ia_size) == _gf_false)) { +                gf_log (this->name, GF_LOG_ERROR, "pattern_match failed"); +                goto out; +        } + +        memset (&entry_loc, 0, sizeof (entry_loc)); + +        ret = dht_build_child_loc (this, &entry_loc, loc, entry->d_name); +        if (ret) { +                LOCK (&defrag->lock); +                { +                        defrag->total_failures += 1; +                } +                UNLOCK (&defrag->lock); + +                ret = 0; + +                gf_log (this->name, GF_LOG_ERROR, "Child loc build failed"); + +                goto out; +        } + +        gf_uuid_copy (entry_loc.gfid, entry->d_stat.ia_gfid); + +        gf_uuid_copy (entry_loc.pargfid, loc->gfid); + +        entry_loc.inode->ia_type = entry->d_stat.ia_type; + +        ret = syncop_lookup (this, &entry_loc, &iatt, NULL, NULL, NULL);          if (ret) {                  gf_msg (this->name, GF_LOG_ERROR, 0, -                        DHT_MSG_MIGRATE_DATA_FAILED, -                        "Migrate data failed: Failed to open dir %s", -                        loc->path); -                ret = -1; +                        DHT_MSG_MIGRATE_FILE_FAILED, +                        "Migrate file failed: %s lookup failed", +                        entry_loc.name); +                ret = 0;                  goto out;          } -        INIT_LIST_HEAD (&entries.list); +        ret = syncop_setxattr (this, &entry_loc, migrate_data, 0, NULL, NULL); +        if (ret < 0) { +                op_errno = -ret; +                /* errno is overloaded. See +                 * rebalance_task_completion () */ +                if (op_errno == ENOSPC) { +                        gf_msg_debug (this->name, 0, "migrate-data skipped for" +                                      " %s due to space constraints", +                                      entry_loc.path); +                        LOCK (&defrag->lock); +                        { +                                defrag->skipped += 1; +                        } +                        UNLOCK (&defrag->lock); +                } else if (op_errno != EEXIST) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, +                                DHT_MSG_MIGRATE_FILE_FAILED, +                                "migrate-data failed for %s", entry_loc.path); -        while ((ret = syncop_readdirp (this, fd, 131072, offset, &entries, -                                       NULL, NULL)) != 0) { +                        LOCK (&defrag->lock); +                        { +                                defrag->total_failures += 1; +                        } +                        UNLOCK (&defrag->lock); -                if (ret < 0) { +                } + +                ret = gf_defrag_handle_migrate_error (op_errno, defrag); + +                if (!ret) { +                        gf_msg(this->name, GF_LOG_ERROR, 0, +                               DHT_MSG_MIGRATE_FILE_FAILED, +                               "migrate-data on %s failed: %s", entry_loc.path, +                               strerror (op_errno)); +                } else if (ret == 1) { +                        ret = 0; +                        goto out; +                } else if (ret == -1) { +                        goto out; +                } +        } else if (ret > 0) { +                gf_msg (this->name, GF_LOG_ERROR, 0, +                        DHT_MSG_MIGRATE_FILE_FAILED, +                        "migrate-data failed for %s", entry_loc.path); +                ret = 0; +                LOCK (&defrag->lock); +                { +                        defrag->total_failures += 1; +                } +                UNLOCK (&defrag->lock); +        } + +        LOCK (&defrag->lock); +        { +                defrag->total_files += 1; +                defrag->total_data += iatt.ia_size; +        } +        UNLOCK (&defrag->lock); + +        if (defrag->stats == _gf_true) { +                gettimeofday (&end, NULL); +                elapsed = (end.tv_sec - start.tv_sec) * 1e6 + +                          (end.tv_usec - start.tv_usec); +                gf_log (this->name, GF_LOG_INFO, "Migration of " +                        "file:%s size:%"PRIu64" bytes took %.2f" +                        "secs and ret: %d", entry_loc.name, +                        iatt.ia_size, elapsed/1e6, ret); +        } + +out: +        loc_wipe (&entry_loc); + +        return ret; + +} + +void * +gf_defrag_task (void *opaque) +{ +        struct list_head        *q_head         = NULL; +        struct dht_container    *iterator       = NULL; +        gf_defrag_info_t        *defrag         = NULL; +        int                      ret            = 0; +        gf_boolean_t             true           = _gf_true; + + +        defrag = (gf_defrag_info_t *)opaque; +        if (!defrag) { +                gf_msg ("dht", GF_LOG_ERROR, 0, 0, "defrag is NULL"); +                goto out; +        } + +        q_head = &(defrag->queue[0].list); + +       /* The following while loop will dequeue one entry from the defrag->queue +          under lock. We will update the defrag->global_error only when there +          is an error which is critical to stop the rebalance process. The stop +          message will be intimated to other migrator threads by setting the +          defrag->defrag_status to GF_DEFRAG_STATUS_FAILED. + +          In defrag->queue, a low watermark (MIN_MIGRATE_QUEUE_COUNT) is +          maintained so that crawler does not starve the file migration +          workers and a high watermark (MAX_MIGRATE_QUEUE_COUNT) so that +          crawler does not go far ahead in filling up the queue. +        */ + +        while (true) { + +                if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { +                        goto out; +                } + +                pthread_mutex_lock (&defrag->dfq_mutex); +                { +                        if (defrag->q_entry_count) { +                                iterator = list_entry (q_head->next, +                                                typeof(*iterator), list); + +                                gf_log ("DHT", GF_LOG_DEBUG, "picking entry " +                                        "%s", iterator->df_entry->d_name); + +                                list_del_init (&(iterator->list)); + +                                defrag->q_entry_count--; + +                                if ((defrag->q_entry_count < +                                        MIN_MIGRATE_QUEUE_COUNT) && +                                        defrag->wakeup_crawler) { +                                        pthread_cond_broadcast ( +                                              &defrag->rebalance_crawler_alarm); +                                } +                                pthread_mutex_unlock (&defrag->dfq_mutex); +                                ret = gf_defrag_migrate_single_file +                                                        ((void *)iterator); + +                                /*Critical errors: ENOTCONN and ENOSPACE*/ +                                if (ret) { +                                        dht_set_global_defrag_error +                                                         (defrag, ret); + +                                        defrag->defrag_status = +                                                       GF_DEFRAG_STATUS_FAILED; +                                        goto out; +                                } + +                                gf_dirent_free (iterator->df_entry); +                                GF_FREE (iterator); +                                continue; +                        } else { + +                        /* defrag->crawl_done flag is set means crawling +                         file system is done and hence a list_empty when +                         the above flag is set indicates there are no more +                         entries to be added to the queue and rebalance is +                         finished */ + +                                if (!defrag->crawl_done) { +                                        pthread_cond_wait ( +                                           &defrag->parallel_migration_cond, +                                           &defrag->dfq_mutex); +                                } + +                                if (defrag->crawl_done && +                                                 !defrag->q_entry_count) { +                                        pthread_cond_broadcast ( +                                             &defrag->parallel_migration_cond); +                                        goto unlock; +                                } else { +                                        pthread_mutex_unlock +                                                 (&defrag->dfq_mutex); +                                        continue; +                                } +                        } + +                } +unlock: +                pthread_mutex_unlock (&defrag->dfq_mutex); +                break; +        } +out: +        return NULL; +} + +int static +gf_defrag_get_entry (xlator_t *this, int i, struct dht_container **container, +                     loc_t *loc, dht_conf_t *conf, gf_defrag_info_t *defrag, +                     fd_t *fd, dict_t *migrate_data, +                     struct dir_dfmeta *dir_dfmeta, dict_t *xattr_req) +{ +        int                     ret             = -1; +        char                    is_linkfile     = 0; +        gf_dirent_t            *df_entry        = NULL; +        loc_t                   entry_loc       = {0,}; +        dict_t                 *xattr_rsp       = NULL; +        struct iatt             iatt            = {0,}; +        struct dht_container   *tmp_container   = NULL; +        xlator_t               *hashed_subvol   = NULL; +        xlator_t               *cached_subvol   = NULL; + +        if (dir_dfmeta->offset_var[i].readdir_done == 1) { +                ret = 0; +                goto out; +        } +        if (dir_dfmeta->fetch_entries[i] == 1) { +                ret = syncop_readdirp (conf->local_subvols[i], fd, 131072, +                                       dir_dfmeta->offset_var[i].offset, +                                       &(dir_dfmeta->equeue[i]), +                                       NULL, NULL); +                if (ret == 0) { +                        dir_dfmeta->offset_var[i].readdir_done = 1; +                        ret = 0; +                        goto out; +                } +                if (ret < 0) {                          gf_msg (this->name, GF_LOG_ERROR, 0,                                  DHT_MSG_MIGRATE_DATA_FAILED,                                  "%s: Migrate data failed: Readdir returned" @@ -1470,213 +1835,429 @@ gf_defrag_migrate_data (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,                          goto out;                  } -                if (list_empty (&entries.list)) -                        break; +                if (list_empty (&(dir_dfmeta->equeue[i].list))) { +                        dir_dfmeta->offset_var[i].readdir_done = 1; +                        ret = 0; +                        goto out; +                } -                free_entries = _gf_true; +                dir_dfmeta->fetch_entries[i] = 0; +        } -                list_for_each_entry_safe (entry, tmp, &entries.list, list) { +        while (1) { -                        if (dict) { -                                dict_unref (dict); -                                dict = NULL; -                        } +                if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { +                        ret = -1; +                        goto out; +                } -                        if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { -                                ret = 1; -                                goto out; -                        } +                df_entry = list_entry (dir_dfmeta->iterator[i]->next, +                                       typeof (*df_entry), list); -                        offset = entry->d_off; +                if (&df_entry->list == dir_dfmeta->head[i]) { +                        gf_dirent_free (&(dir_dfmeta->equeue[i])); +                        INIT_LIST_HEAD (&(dir_dfmeta->equeue[i].list)); +                        dir_dfmeta->fetch_entries[i] = 1; +                        dir_dfmeta->iterator[i] = dir_dfmeta->head[i]; +                        ret = 0; +                        goto out; +                } -                        if (!strcmp (entry->d_name, ".") || -                            !strcmp (entry->d_name, "..")) -                                continue; +                dir_dfmeta->iterator[i] = dir_dfmeta->iterator[i]->next; -                        if (IA_ISDIR (entry->d_stat.ia_type)) -                                continue; +                dir_dfmeta->offset_var[i].offset = df_entry->d_off; +                if (!strcmp (df_entry->d_name, ".") || +                    !strcmp (df_entry->d_name, "..")) +                        continue; -                        defrag->num_files_lookedup++; -                        if (defrag->stats == _gf_true) { -                                gettimeofday (&start, NULL); -                        } +                if (IA_ISDIR (df_entry->d_stat.ia_type)) +                        continue; -                        if (defrag->defrag_pattern && -                            (gf_defrag_pattern_match (defrag, entry->d_name, -                                                      entry->d_stat.ia_size) -                             == _gf_false)) { -                                continue; -                        } +                defrag->num_files_lookedup++; -                        loc_wipe (&entry_loc); -                        ret =dht_build_child_loc (this, &entry_loc, loc, -                                                  entry->d_name); -                        if (ret) { -                                gf_log (this->name, GF_LOG_ERROR, "Child loc" -                                        " build failed"); -                                goto out; -                        } +                if (defrag->defrag_pattern && +                    (gf_defrag_pattern_match (defrag, df_entry->d_name, +                                              df_entry->d_stat.ia_size) +                     == _gf_false)) { +                        continue; +                } -                        if (gf_uuid_is_null (entry->d_stat.ia_gfid)) { -                                gf_msg (this->name, GF_LOG_ERROR, 0, -                                        DHT_MSG_GFID_NULL, -                                        "%s/%s gfid not present", loc->path, -                                         entry->d_name); -                                continue; -                        } +                loc_wipe (&entry_loc); +                ret = dht_build_child_loc (this, &entry_loc, loc, +                                          df_entry->d_name); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, "Child loc" +                                " build failed"); +                        ret = -1; +                        goto out; +                } -                        gf_uuid_copy (entry_loc.gfid, entry->d_stat.ia_gfid); +                if (gf_uuid_is_null (df_entry->d_stat.ia_gfid)) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, +                                DHT_MSG_GFID_NULL, +                                "%s/%s gfid not present", loc->path, +                                df_entry->d_name); +                        continue; +                } -                        if (gf_uuid_is_null (loc->gfid)) { -                                gf_msg (this->name, GF_LOG_ERROR, 0, -                                        DHT_MSG_GFID_NULL, -                                        "%s/%s gfid not present", loc->path, -                                         entry->d_name); -                                continue; -                        } +                gf_uuid_copy (entry_loc.gfid, df_entry->d_stat.ia_gfid); -                        gf_uuid_copy (entry_loc.pargfid, loc->gfid); +                if (gf_uuid_is_null (loc->gfid)) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, +                                DHT_MSG_GFID_NULL, +                                "%s/%s gfid not present", loc->path, +                                df_entry->d_name); +                        continue; +                } -                        entry_loc.inode->ia_type = entry->d_stat.ia_type; +                gf_uuid_copy (entry_loc.pargfid, loc->gfid); -                        ret = syncop_lookup (this, &entry_loc, &iatt, NULL, -                                             NULL, NULL); -                        if (ret) { -                                gf_msg (this->name, GF_LOG_ERROR, 0, -                                        DHT_MSG_MIGRATE_FILE_FAILED, -                                        "Migrate file failed:%s lookup failed", -                                        entry_loc.path); -                                ret = -1; -                                continue; -                        } +                entry_loc.inode->ia_type = df_entry->d_stat.ia_type; +                ret = syncop_lookup (conf->local_subvols[i], &entry_loc, +                                        &iatt, NULL, xattr_req, &xattr_rsp); +                if (ret) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, +                                DHT_MSG_MIGRATE_FILE_FAILED, +                                "Migrate file failed:%s lookup failed", +                                entry_loc.path); +                        continue; +                } -                        ret = syncop_getxattr (this, &entry_loc, &dict, -                                               GF_XATTR_NODE_UUID_KEY, NULL, -                                               NULL); -                        if(ret < 0) { -                                gf_msg (this->name, GF_LOG_ERROR, 0, -                                        DHT_MSG_MIGRATE_FILE_FAILED, -                                        "Migrate file failed:" -                                        "Failed to get node-uuid for %s", -                                        entry_loc.path); -                                ret = -1; -                                continue; -                        } -                        ret = dict_get_str (dict, GF_XATTR_NODE_UUID_KEY, -                                            &uuid_str); -                        if(ret < 0) { -                                gf_log (this->name, GF_LOG_ERROR, "Failed to " -                                        "get node-uuid from dict for %s", -                                        entry_loc.path); -                                ret = -1; -                                continue; -                        } +                is_linkfile = check_is_linkfile (NULL, &iatt, xattr_rsp, +                                                conf->link_xattr_name); -                        if (gf_uuid_parse (uuid_str, node_uuid)) { -                                gf_log (this->name, GF_LOG_ERROR, "gf_uuid_parse " -                                        "failed for %s", entry_loc.path); -                                continue; -                        } +                if (is_linkfile) { +                        /* No need to add linkto file to the queue for +                           migration. Only the actual data file need to +                           be checked for migration criteria. +                        */ +                        gf_log (this->name, GF_LOG_INFO, "linkfile." +                                " Hence skip for file: %s", entry_loc.path); +                        continue; +                } -                        /* if file belongs to different node, skip migration -                         * the other node will take responsibility of migration -                         */ -                        if (gf_uuid_compare (node_uuid, defrag->node_uuid)) { -                                gf_msg_trace (this->name, 0, "%s does not" -                                              "belong to this node", -                                              entry_loc.path); -                                continue; -                        } -                        uuid_str = NULL; +                ret = syncop_lookup (this, &entry_loc, NULL, NULL, +                                     NULL, NULL); +                if (ret) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, +                                DHT_MSG_MIGRATE_FILE_FAILED, +                                "Migrate file failed:%s lookup failed", +                                entry_loc.path); +                        continue; +                } -                        /* if distribute is present, it will honor this key. -                         * -1, ENODATA is returned if distribute is not present -                         * or file doesn't have a link-file. If file has -                         * link-file, the path of link-file will be the value, -                         * and also that guarantees that file has to be mostly -                         * migrated */ +               /* if distribute is present, it will honor this key. +                * -1, ENODATA is returned if distribute is not present +                * or file doesn't have a link-file. If file has +                * link-file, the path of link-file will be the value, +                * and also that guarantees that file has to be mostly +                * migrated */ -                        ret = syncop_getxattr (this, &entry_loc, NULL, -                                               GF_XATTR_LINKINFO_KEY, NULL, -                                               NULL); -                        if (ret < 0) { -                                if (-ret != ENODATA) { -                                        loglevel = GF_LOG_ERROR; -                                        defrag->total_failures += 1; -                                } else { -                                        loglevel = GF_LOG_TRACE; -                                } -                                gf_log (this->name, loglevel, "%s: failed to " -                                        "get "GF_XATTR_LINKINFO_KEY" key - %s", -                                        entry_loc.path, strerror (-ret)); -                                ret = -1; -                                continue; +                hashed_subvol = dht_subvol_get_hashed (this, &entry_loc); +                if (!hashed_subvol) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, +                                DHT_MSG_HASHED_SUBVOL_GET_FAILED, +                                "Failed to get hashed subvol for %s", +                                loc->path); +                        continue; +                } + +                cached_subvol = dht_subvol_get_cached (this, entry_loc.inode); +                if (!cached_subvol) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, +                                DHT_MSG_CACHED_SUBVOL_GET_FAILED, +                                "Failed to get cached subvol for %s", +                                loc->path); + +                        continue; +                } + +                if (hashed_subvol == cached_subvol) { +                        continue; +                } + +               /*Build Container Structure */ + +                tmp_container =  GF_CALLOC (1, sizeof(struct dht_container), +                                            gf_dht_mt_container_t); +                if (!tmp_container) { +                        gf_log (this->name, GF_LOG_ERROR, "Failed to allocate " +                                "memory for container"); +                        ret = -1; +                        goto out; +                } +                tmp_container->df_entry = gf_dirent_for_name (df_entry->d_name); +                if (!tmp_container->df_entry) { +                        gf_log (this->name, GF_LOG_ERROR, "Failed to allocate " +                                "memory for df_entry"); +                        ret = -1; +                        goto out; +                } + +                tmp_container->df_entry->d_stat = df_entry->d_stat; + +                tmp_container->df_entry->d_ino  = df_entry->d_ino; + +                tmp_container->df_entry->d_type = df_entry->d_type; + +                tmp_container->df_entry->d_len  = df_entry->d_len; + +                tmp_container->parent_loc = GF_CALLOC(1, sizeof(*loc), +                                                      gf_dht_mt_loc_t); +                if (!tmp_container->parent_loc) { +                        gf_log (this->name, GF_LOG_ERROR, "Failed to allocate " +                                "memory for loc"); +                        ret = -1; +                        goto out; +                } + + +                ret = loc_copy (tmp_container->parent_loc, loc); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "loc_copy failed"); +                        ret = -1; +                        goto out; +                } + +                tmp_container->migrate_data = migrate_data; + +                tmp_container->this = this; + +                if (df_entry->dict) +                        tmp_container->df_entry->dict = +                                        dict_ref (df_entry->dict); + +               /*Build Container Structue >> END*/ + +                ret = 0; +                goto out; + +        } + +out: +        if (ret == 0) { +                *container = tmp_container; +        } else { +                if (tmp_container) { +                        GF_FREE (tmp_container->df_entry); +                        GF_FREE (tmp_container->parent_loc); +                        GF_FREE (tmp_container); +                } +        } + +        if (xattr_rsp) +                dict_unref (xattr_rsp); +        return ret; +} + +int +gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, +                       dict_t *migrate_data) +{ +        int                      ret               = -1; +        fd_t                    *fd                = NULL; +        dht_conf_t              *conf              = NULL; +        gf_dirent_t              entries; +        dict_t                  *dict              = NULL; +        dict_t                  *xattr_req         = NULL; +        struct timeval           dir_start         = {0,}; +        struct timeval           end               = {0,}; +        double                   elapsed           = {0,}; +        int                      local_subvols_cnt = 0; +        int                      i                 = 0; +        struct  dht_container   *container         = NULL; +        int                      ldfq_count        = 0; +        int                      dfc_index         = 0; +        struct dir_dfmeta       *dir_dfmeta        = NULL; + +        gf_log (this->name, GF_LOG_INFO, "migrate data called on %s", +                loc->path); +        gettimeofday (&dir_start, NULL); + +        conf = this->private; +        local_subvols_cnt = conf->local_subvols_cnt; + +        if (!local_subvols_cnt) { +                ret = 0; +                goto out; +        } + +        fd = fd_create (loc->inode, defrag->pid); +        if (!fd) { +                gf_log (this->name, GF_LOG_ERROR, "Failed to create fd"); +                ret = -1; +                goto out; +        } + +        ret = syncop_opendir (this, loc, fd, NULL, NULL); +        if (ret) { +                gf_msg (this->name, GF_LOG_ERROR, 0, +                        DHT_MSG_MIGRATE_DATA_FAILED, +                        "Migrate data failed: Failed to open dir %s", +                        loc->path); +                ret = -1; +                goto out; +        } + +        dir_dfmeta = GF_CALLOC (1, sizeof (*dir_dfmeta), +                                                gf_common_mt_pointer); +        if (!dir_dfmeta) { +                gf_log (this->name, GF_LOG_ERROR, "dir_dfmeta is NULL"); +                ret = -1; +                goto out; +        } + + +        dir_dfmeta->head = GF_CALLOC (local_subvols_cnt, +                                      sizeof (*(dir_dfmeta->head)), +                                      gf_common_mt_pointer); +        if (!dir_dfmeta->head) { +                gf_log (this->name, GF_LOG_ERROR, "dir_dfmeta->head is NULL"); +                ret = -1; +                goto out; +        } + +        dir_dfmeta->iterator = GF_CALLOC (local_subvols_cnt, +                                          sizeof (*(dir_dfmeta->iterator)), +                                          gf_common_mt_pointer); +        if (!dir_dfmeta->iterator) { +                gf_log (this->name, GF_LOG_ERROR, +                        "dir_dfmeta->iterator is NULL"); +                ret = -1; +                goto out; +        } + +        dir_dfmeta->equeue = GF_CALLOC (local_subvols_cnt, sizeof (entries), +                                        gf_dht_mt_dirent_t); +        if (!dir_dfmeta->equeue) { +                gf_log (this->name, GF_LOG_ERROR, "dir_dfmeta->equeue is NULL"); +                ret = -1; +                goto out; +        } + +        dir_dfmeta->offset_var = GF_CALLOC (local_subvols_cnt, +                                            sizeof (dht_dfoffset_ctx_t), +                                            gf_dht_mt_octx_t); +        if (!dir_dfmeta->offset_var) { +                gf_log (this->name, GF_LOG_ERROR, +                        "dir_dfmeta->offset_var is NULL"); +                ret = -1; +                goto out; +        } +        ret = gf_defrag_ctx_subvols_init (dir_dfmeta->offset_var, this); +        if (ret) { +                gf_log (this->name, GF_LOG_ERROR, "dht_dfoffset_ctx_t" +                        "initialization failed"); +                ret = -1; +                goto out; +        } + +        dir_dfmeta->fetch_entries = GF_CALLOC (local_subvols_cnt, +                                               sizeof (int), gf_common_mt_int); +        if (!dir_dfmeta->fetch_entries) { +                gf_log (this->name, GF_LOG_ERROR, +                        "dir_dfmeta->fetch_entries is NULL"); +                ret = -1; +                goto out; +        } + +        for (i = 0; i < local_subvols_cnt ; i++) { +                INIT_LIST_HEAD (&(dir_dfmeta->equeue[i].list)); +                dir_dfmeta->head[i]          = &(dir_dfmeta->equeue[i].list); +                dir_dfmeta->iterator[i]      = dir_dfmeta->head[i]; +                dir_dfmeta->fetch_entries[i] = 1; +        } + +        xattr_req = dict_new (); +        if (!xattr_req) { +               gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); +               ret = -1; +               goto out; +        } + +        ret = dict_set_uint32 (xattr_req, +                               conf->link_xattr_name, 256); +        if (ret) { +               gf_log (this->name, GF_LOG_ERROR, "failed to set dict for " +                       "key: %s", conf->link_xattr_name); +               ret = -1; +               goto out; +        } + +        /* +         Job: Read entries from each local subvol and store the entries +              in equeue array of linked list. Now pick one entry from the +              equeue array in a round robin basis and add them to defrag Queue. +        */ + +        while (!dht_dfreaddirp_done(dir_dfmeta->offset_var, +                                         local_subvols_cnt)) { + +                pthread_mutex_lock (&defrag->dfq_mutex); +                { +                        while (defrag->q_entry_count > +                                        MAX_MIGRATE_QUEUE_COUNT) { +                                defrag->wakeup_crawler = 1; +                                pthread_cond_wait ( +                                        &defrag->rebalance_crawler_alarm, +                                        &defrag->dfq_mutex);                          } -                        ret = syncop_setxattr (this, &entry_loc, migrate_data, -                                               0, NULL, NULL); -                        if (ret < 0) { -                                op_errno = -ret; -                                /* errno is overloaded. See -                                 * rebalance_task_completion () */ -                                if (op_errno == ENOSPC) { -                                        gf_msg_debug (this->name, 0, -                                                      "migrate-data skipped for" -                                                      " %s due to space " -                                                      "constraints", -                                                      entry_loc.path); -                                        defrag->skipped +=1; -                                } else{ -                                        gf_msg (this->name, GF_LOG_ERROR, 0, -                                                DHT_MSG_MIGRATE_FILE_FAILED, -                                                "migrate-data failed for %s", -                                                entry_loc.path); -                                        defrag->total_failures +=1; -                                } +                       ldfq_count = defrag->q_entry_count; -                                ret = gf_defrag_handle_migrate_error (op_errno, -                                                                      defrag); +                       if (defrag->wakeup_crawler) { +                               defrag->wakeup_crawler = 0; +                       } -                                if (!ret) -                                        gf_msg_debug (this->name, 0, -                                                      "migrate-data on %s " -                                                      "failed: %s", -                                                      entry_loc.path, -                                                      strerror (op_errno)); -                                else if (ret == 1) -                                        continue; -                                else if (ret == -1) -                                        goto out; -                        } else if (ret > 0) { -                                gf_msg (this->name, GF_LOG_ERROR, 0, -                                        DHT_MSG_MIGRATE_FILE_FAILED, -                                        "migrate-data failed for %s", -                                        entry_loc.path); -                                defrag->total_failures +=1; +                } +                pthread_mutex_unlock (&defrag->dfq_mutex); + +                while (ldfq_count <= MAX_MIGRATE_QUEUE_COUNT && +                       !dht_dfreaddirp_done(dir_dfmeta->offset_var, +                                               local_subvols_cnt)) { + +                        ret = gf_defrag_get_entry (this, dfc_index, &container, +                                                   loc, conf, defrag, fd, +                                                   migrate_data, dir_dfmeta, +                                                   xattr_req); +                        if (ret) { +                                gf_log ("DHT", GF_LOG_INFO, "Found critical " +                                        "error from gf_defrag_get_entry"); +                                ret = -1; +                                goto out;                          } -                        LOCK (&defrag->lock); -                        { -                                defrag->total_files += 1; -                                defrag->total_data += iatt.ia_size; +                        /* Check if we got an entry, else we need to move the +                           index to the next subvol */ +                        if (!container) { +                                GF_CRAWL_INDEX_MOVE(dfc_index, +                                                    local_subvols_cnt); +                                continue;                          } -                        UNLOCK (&defrag->lock); -                        if (defrag->stats == _gf_true) { -                                gettimeofday (&end, NULL); -                                elapsed = (end.tv_sec - start.tv_sec) * 1e6 + -                                          (end.tv_usec - start.tv_usec); -                                gf_log (this->name, GF_LOG_INFO, "Migration of " -                                        "file:%s size:%"PRIu64" bytes took %.2f" -                                        "secs", entry_loc.path, iatt.ia_size, -                                         elapsed/1e6); + +                        /* Q this entry in the dfq */ +                        pthread_mutex_lock (&defrag->dfq_mutex); +                        { +                                list_add_tail (&container->list, +                                        &(defrag->queue[0].list)); +                                defrag->q_entry_count++; +                                ldfq_count = defrag->q_entry_count; + +                                gf_log (this->name, GF_LOG_DEBUG, "added " +                                        "file:%s parent:%s to the queue ", +                                        container->df_entry->d_name, +                                        container->parent_loc->path); + +                                pthread_cond_signal ( +                                        &defrag->parallel_migration_cond);                          } -                } +                        pthread_mutex_unlock (&defrag->dfq_mutex); -                gf_dirent_free (&entries); -                free_entries = _gf_false; -                INIT_LIST_HEAD (&entries.list); +                        GF_CRAWL_INDEX_MOVE(dfc_index, local_subvols_cnt); +                }          }          gettimeofday (&end, NULL); @@ -1686,20 +2267,20 @@ gf_defrag_migrate_data (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,                  "%.2f secs", loc->path, elapsed/1e6);          ret = 0;  out: -        if (free_entries) -                gf_dirent_free (&entries); -        loc_wipe (&entry_loc); +        GF_FREE_DIR_DFMETA (dir_dfmeta);          if (dict)                  dict_unref(dict); +        if (xattr_req) +                dict_unref(xattr_req); +          if (fd)                  fd_unref (fd);          return ret;  } -  int  gf_defrag_fix_layout (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,                    dict_t *fix_layout, dict_t *migrate_data) @@ -1725,7 +2306,7 @@ gf_defrag_fix_layout (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,          if ((defrag->cmd != GF_DEFRAG_CMD_START_TIER) &&              (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX)) { -                ret = gf_defrag_migrate_data (this, defrag, loc, migrate_data); +                ret = gf_defrag_process_dir (this, defrag, loc, migrate_data);                  if (ret)                          goto out;          } @@ -1877,34 +2458,39 @@ out:  int  gf_defrag_start_crawl (void *data)  { -        xlator_t                *this   = NULL; -        dht_conf_t              *conf   = NULL; -        gf_defrag_info_t        *defrag = NULL; -        int                      ret    = -1; -        loc_t                    loc    = {0,}; -        struct iatt              iatt   = {0,}; -        struct iatt              parent = {0,}; -        dict_t                  *fix_layout = NULL; -        dict_t                  *migrate_data = NULL; -        dict_t                  *status = NULL; -        glusterfs_ctx_t         *ctx = NULL; -        dht_methods_t           *methods = NULL; +        xlator_t                *this           = NULL; +        dht_conf_t              *conf           = NULL; +        gf_defrag_info_t        *defrag         = NULL; +        int                      ret            = -1; +        loc_t                    loc            = {0,}; +        struct iatt              iatt           = {0,}; +        struct iatt              parent         = {0,}; +        dict_t                  *fix_layout     = NULL; +        dict_t                  *migrate_data   = NULL; +        dict_t                  *status         = NULL; +        dict_t                  *dict           = NULL; +        glusterfs_ctx_t         *ctx            = NULL; +        dht_methods_t           *methods        = NULL; +        int                      i              = 0; +        int                     thread_index    = 0; +        int                     err             = 0; +        pthread_t tid[MAX_MIGRATOR_THREAD_COUNT];          this = data;          if (!this) -                goto out; +                goto exit;          ctx = this->ctx;          if (!ctx) -                goto out; +                goto exit;          conf = this->private;          if (!conf) -                goto out; +                goto exit;          defrag = conf->defrag;          if (!defrag) -                goto out; +                goto exit;          gettimeofday (&defrag->start_time, NULL);          dht_build_root_inode (this, &defrag->root_inode); @@ -1938,7 +2524,7 @@ gf_defrag_start_crawl (void *data)                          "Failed to start rebalance:"                          "Failed to set dictionary value: key = %s",                          GF_XATTR_FIX_LAYOUT_KEY); - +                ret = -1;                  goto out;          } @@ -1970,6 +2556,53 @@ gf_defrag_start_crawl (void *data)                                              "non-force");                  if (ret)                          goto out; + +                /* Find local subvolumes */ +                ret = syncop_getxattr (this, &loc, &dict, +                                       GF_REBAL_FIND_LOCAL_SUBVOL, +                                       NULL, NULL); +                if (ret) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, "local " +                                "subvolume determination failed with error: %d", +                                -ret); +                        ret = -1; +                        goto out; +                } + +                for (i = 0 ; i < conf->local_subvols_cnt; i++) { +                        gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvols " +                                "are %s", conf->local_subvols[i]->name); +                } + +                /* Initialize global entry queue */ +                defrag->queue = GF_CALLOC (1, sizeof (struct dht_container), +                                           gf_dht_mt_container_t); + +                if (!defrag->queue) { +                        gf_log (this->name, GF_LOG_INFO, "No memory for queue"); +                        ret = -1; +                        goto out; +                } + +                INIT_LIST_HEAD (&(defrag->queue[0].list)); + +                /*Spawn Threads Here*/ +                while (thread_index < MAX_MIGRATOR_THREAD_COUNT) { +                        err = pthread_create(&(tid[thread_index]), NULL, +                                     &gf_defrag_task, (void *)defrag); +                        if (err != 0) { +                                gf_log ("DHT", GF_LOG_ERROR, +                                        "Thread[%d] creation failed. " +                                        "Aborting Rebalance", +                                         thread_index); +                                ret = -1; +                                goto out; +                        } else { +                                gf_log ("DHT", GF_LOG_INFO, "Thread[%d] " +                                        "creation successful", thread_index); +                        } +                        thread_index++; +                }          }          ret = gf_defrag_fix_layout (this, defrag, &loc, fix_layout, @@ -1987,13 +2620,40 @@ gf_defrag_start_crawl (void *data)                  }                  methods->migration_other(this, defrag);          } +        gf_log ("DHT", GF_LOG_INFO, "crawling file-system completed"); +out: +        /* We are here means crawling the entire file system is done +           or something failed. Set defrag->crawl_done flag to intimate +           the migrator threads to exhaust the defrag->queue and terminate*/ + +        if (ret) { +                defrag->defrag_status = GF_DEFRAG_STATUS_FAILED; +        } + +        pthread_mutex_lock (&defrag->dfq_mutex); +        { +                defrag->crawl_done = 1; + +                pthread_cond_broadcast ( +                        &defrag->parallel_migration_cond); +        } +        pthread_mutex_unlock (&defrag->dfq_mutex); + +        /*Wait for all the threads to complete their task*/ +        for (i = 0; i < thread_index; i++) { +                pthread_join (tid[i], NULL); +        } + +        if (defrag->queue) { +                gf_dirent_free (defrag->queue[0].df_entry); +                INIT_LIST_HEAD (&(defrag->queue[0].list)); +        }          if ((defrag->defrag_status != GF_DEFRAG_STATUS_STOPPED) &&              (defrag->defrag_status != GF_DEFRAG_STATUS_FAILED)) {                  defrag->defrag_status = GF_DEFRAG_STATUS_COMPLETE;          } -out:          LOCK (&defrag->lock);          {                  status = dict_new (); @@ -2006,11 +2666,14 @@ out:          }          UNLOCK (&defrag->lock); -        if (defrag) { -                GF_FREE (defrag); -                conf->defrag = NULL; -        } +        GF_FREE (defrag->queue); +        GF_FREE (defrag); +        conf->defrag = NULL; + +        if (dict) +                dict_unref(dict); +exit:          return ret;  } @@ -2033,6 +2696,7 @@ gf_defrag_start (void *data)          dht_conf_t              *conf   = NULL;          gf_defrag_info_t        *defrag = NULL;          xlator_t                *this  = NULL; +        xlator_t                *old_THIS = NULL;          this = data;          conf = this->private; @@ -2058,6 +2722,8 @@ gf_defrag_start (void *data)          defrag->defrag_status = GF_DEFRAG_STATUS_STARTED; +        old_THIS = THIS; +        THIS = this;          ret = synctask_new (this->ctx->env, gf_defrag_start_crawl,                              gf_defrag_done, frame, this); @@ -2065,6 +2731,7 @@ gf_defrag_start (void *data)                  gf_msg (this->name, GF_LOG_ERROR, 0,                          DHT_MSG_REBALANCE_START_FAILED,                          "Could not create task for rebalance"); +        THIS = old_THIS;  out:          return NULL;  } diff --git a/xlators/cluster/dht/src/dht-shared.c b/xlators/cluster/dht/src/dht-shared.c index fc281b80287..3eccff925fb 100644 --- a/xlators/cluster/dht/src/dht-shared.c +++ b/xlators/cluster/dht/src/dht-shared.c @@ -589,6 +589,23 @@ dht_init (xlator_t *this)                  defrag->cmd = cmd;                  defrag->stats = _gf_false; + +                defrag->queue = NULL; + +                defrag->crawl_done = 0; + +                defrag->global_error = 0; + +                defrag->q_entry_count = 0; + +                defrag->wakeup_crawler = 0; + +                synclock_init (&defrag->link_lock, SYNC_LOCK_DEFAULT); +                pthread_mutex_init (&defrag->dfq_mutex, 0); +                pthread_cond_init  (&defrag->parallel_migration_cond, 0); +                pthread_cond_init  (&defrag->rebalance_crawler_alarm, 0); +                defrag->global_error = 0; +          }          conf->search_unhashed = GF_DHT_LOOKUP_UNHASHED_ON; @@ -651,6 +668,15 @@ dht_init (xlator_t *this)                  goto err;          } +        if (cmd) { +                ret = dht_init_local_subvolumes (this, conf); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "dht_init_local_subvolumes failed"); +                        goto err; +                } +        } +          if (dict_get_str (this->options, "decommissioned-bricks", &temp_str) == 0) {                  ret = dht_parse_decommissioned_bricks (this, conf, temp_str);                  if (ret == -1)  | 
