diff options
| -rw-r--r-- | xlators/cluster/dht/src/dht-common.h | 5 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-rebalance.c | 204 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-shared.c | 3 | 
3 files changed, 188 insertions, 24 deletions
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h index 11a14905b4b..d6ca0448b09 100644 --- a/xlators/cluster/dht/src/dht-common.h +++ b/xlators/cluster/dht/src/dht-common.h @@ -508,6 +508,7 @@ struct gf_defrag_info_ {          uint64_t                     total_failures;          uint64_t                     skipped;          uint64_t                     num_dirs_processed; +        uint64_t                     size_processed;          gf_lock_t                    lock;          int                          cmd;          pthread_t                    th; @@ -553,6 +554,10 @@ struct gf_defrag_info_ {          /* backpointer to make it easier to write functions for rebalance */          xlator_t                     *this; +        pthread_cond_t               fc_wakeup_cond; +        pthread_mutex_t              fc_mutex; + +  };  typedef struct gf_defrag_info_ gf_defrag_info_t; diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c index 09bfac2dd1f..5653116a814 100644 --- a/xlators/cluster/dht/src/dht-rebalance.c +++ b/xlators/cluster/dht/src/dht-rebalance.c @@ -48,6 +48,7 @@          }                                                       \  uint64_t g_totalfiles = 0; +uint64_t g_totalsize = 0;  void @@ -2586,6 +2587,7 @@ gf_defrag_migrate_single_file (void *opaque)                          LOCK (&defrag->lock);                          {                                  defrag->skipped += 1; +                                defrag->size_processed += iatt.ia_size;                          }                          UNLOCK (&defrag->lock);                  } else if (fop_errno == ENOTSUP) { @@ -2594,6 +2596,7 @@ gf_defrag_migrate_single_file (void *opaque)                          LOCK (&defrag->lock);                          {                                  defrag->skipped += 1; +                                defrag->size_processed += iatt.ia_size;                          }                          UNLOCK (&defrag->lock);                  } else if (fop_errno != EEXIST) { @@ -2604,6 +2607,7 @@ gf_defrag_migrate_single_file (void *opaque)                          LOCK (&defrag->lock);                          {                                  defrag->total_failures += 1; +                                defrag->size_processed += iatt.ia_size;                          }                          UNLOCK (&defrag->lock); @@ -2627,6 +2631,7 @@ gf_defrag_migrate_single_file (void *opaque)          {                  defrag->total_files += 1;                  defrag->total_data += iatt.ia_size; +                defrag->size_processed += iatt.ia_size;          }          UNLOCK (&defrag->lock); @@ -2896,8 +2901,11 @@ gf_defrag_get_entry (xlator_t *this, int i, struct dht_container **container,                      !strcmp (df_entry->d_name, ".."))                          continue; -                if (IA_ISDIR (df_entry->d_stat.ia_type)) + +                if (IA_ISDIR (df_entry->d_stat.ia_type)) { +                        defrag->size_processed += df_entry->d_stat.ia_size;                          continue; +                }                  defrag->num_files_lookedup++; @@ -2905,6 +2913,7 @@ gf_defrag_get_entry (xlator_t *this, int i, struct dht_container **container,                      (gf_defrag_pattern_match (defrag, df_entry->d_name,                                                df_entry->d_stat.ia_size)                       == _gf_false)) { +                        defrag->size_processed += df_entry->d_stat.ia_size;                          continue;                  } @@ -3975,10 +3984,25 @@ gf_tier_wait_fix_lookup (gf_defrag_info_t *defrag) {  /******************Tier background Fix layout functions END********************/ +uint64_t +gf_defrag_subvol_file_size (xlator_t *this, loc_t *root_loc) +{ +        int ret = -1; +        struct statvfs buf = {0,}; +        if (!this) +                return 0; +        ret = syncop_statfs (this, root_loc, &buf, NULL, NULL); +        if (ret) { +                /* Aargh! */ +                return 0; +        } +        return ((buf.f_blocks - buf.f_bfree) * buf.f_frsize); +} -uint64_t gf_defrag_subvol_file_cnt (xlator_t *this, loc_t *root_loc) +uint64_t +gf_defrag_subvol_file_cnt (xlator_t *this, loc_t *root_loc)  {          int ret = -1;          struct statvfs buf = {0,}; @@ -3996,6 +4020,35 @@ uint64_t gf_defrag_subvol_file_cnt (xlator_t *this, loc_t *root_loc)  uint64_t +gf_defrag_total_file_size (xlator_t *this, loc_t *root_loc) +{ +        dht_conf_t    *conf  = NULL; +        int            i     = 0; +        uint64_t       size_files = 0; +        uint64_t       total_size = 0; + +        conf = this->private; +        if (!conf) { +                return 0; +        } + +        for (i = 0 ; i < conf->local_subvols_cnt; i++) { +                size_files = gf_defrag_subvol_file_size (conf->local_subvols[i], +                                                       root_loc); +                total_size += size_files; +                gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvol: %s," +                        "cnt = %"PRIu64, conf->local_subvols[i]->name, +                        size_files); +        } + +        gf_msg (this->name, GF_LOG_INFO, 0, 0, +                "Total size files = %"PRIu64, total_size); + +        return total_size; +} + + +uint64_t  gf_defrag_total_file_cnt (xlator_t *this, loc_t *root_loc)  {          dht_conf_t    *conf  = NULL; @@ -4080,8 +4133,12 @@ out:  static void*  dht_file_counter_thread (void *args)  { -        gf_defrag_info_t *defrag = NULL; -        loc_t root_loc = {0,}; +        gf_defrag_info_t *defrag      = NULL; +        loc_t root_loc                = {0,}; +        struct timespec time_to_wait  = {0,}; +        struct timeval now            = {0,}; +        uint64_t tmp_size             = 0; +          if (!args)                  return NULL; @@ -4091,18 +4148,38 @@ dht_file_counter_thread (void *args)          while (defrag->defrag_status == GF_DEFRAG_STATUS_STARTED) { -                sleep (FILE_CNT_INTERVAL); -                g_totalfiles = gf_defrag_total_file_cnt (defrag->this, +                gettimeofday (&now, NULL); +                time_to_wait.tv_sec = now.tv_sec + 600; +                time_to_wait.tv_nsec = 0; + + +                pthread_mutex_lock (&defrag->fc_mutex); +                pthread_cond_timedwait (&defrag->fc_wakeup_cond, +                                        &defrag->fc_mutex, +                                        &time_to_wait); + +                pthread_mutex_unlock (&defrag->fc_mutex); + + +                if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) +                        break; + +                tmp_size = gf_defrag_total_file_size (defrag->this,                                                           &root_loc); -                if (!g_totalfiles) { +                gf_log ("dht", GF_LOG_INFO, +                        "tmp data size =%"PRIu64, +                        tmp_size); + +                if (!tmp_size) {                          gf_msg ("dht", GF_LOG_ERROR, 0, 0, "Failed to get " -                                "the total number of files. Unable to estimate " +                                "the total data size. Unable to estimate "                                  "time to complete rebalance.");                  } else { +                        g_totalsize = tmp_size;                          gf_msg_debug ("dht", 0, -                                      "total number of files =%"PRIu64, -                                      g_totalfiles); +                                      "total data size =%"PRIu64, +                                      g_totalsize);                  }          } @@ -4136,7 +4213,9 @@ gf_defrag_start_crawl (void *data)          gf_boolean_t             is_tier_detach         = _gf_false;          call_frame_t            *statfs_frame           = NULL;          xlator_t                *old_THIS               = NULL; -        int j = 0; +        int                      j                      = 0; +        gf_boolean_t             fc_thread_started      = _gf_false; +          this = data;          if (!this) @@ -4281,6 +4360,13 @@ gf_defrag_start_crawl (void *data)                          }                  } +                g_totalsize = gf_defrag_total_file_size (this, &loc); +                if (!g_totalsize) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Failed to get " +                                "the total data size. Unable to estimate " +                                "time to complete rebalance."); +                } +                  g_totalfiles = gf_defrag_total_file_cnt (this, &loc);                  if (!g_totalfiles) {                          gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Failed to get " @@ -4288,16 +4374,19 @@ gf_defrag_start_crawl (void *data)                                  "time to complete rebalance.");                  } -                ret = gf_thread_create_detached (&filecnt_thread, -                                                 &dht_file_counter_thread, -                                                 (void *)defrag); +                ret = pthread_create (&filecnt_thread, NULL, +                                      &dht_file_counter_thread, +                                      (void *)defrag);                  if (ret) {                          gf_msg (this->name, GF_LOG_ERROR, ret, 0, "Failed to "                                  "create the file counter thread ");                          ret = 0; +                } else { +                        fc_thread_started = _gf_true;                  } +                  /* Initialize global entry queue */                  defrag->queue = GF_CALLOC (1, sizeof (struct dht_container),                                             gf_dht_mt_container_t); @@ -4416,8 +4505,6 @@ out:                  pthread_join (tid[i], NULL);          } - -          GF_FREE (tid);          if (defrag->cmd == GF_DEFRAG_CMD_START_TIER) { @@ -4443,6 +4530,16 @@ out:                  defrag->defrag_status = GF_DEFRAG_STATUS_COMPLETE;          } +        if (fc_thread_started) { +                pthread_mutex_lock (&defrag->fc_mutex); +                { +                        pthread_cond_broadcast (&defrag->fc_wakeup_cond); +                } +                pthread_mutex_unlock (&defrag->fc_mutex); + +                pthread_join (filecnt_thread, NULL); +        } +          dht_send_rebalance_event (this, defrag->cmd, defrag->defrag_status);          LOCK (&defrag->lock); @@ -4532,6 +4629,52 @@ out:  uint64_t +gf_defrag_get_estimates_based_on_size (dht_conf_t *conf) +{ +        gf_defrag_info_t *defrag = NULL; +        double            rate_processed = 0; +        uint64_t          total_processed = 0; +        uint64_t          tmp_count = 0; +        uint64_t          time_to_complete = 0; +        struct            timeval now = {0,}; +        double            elapsed = 0; + +        defrag = conf->defrag; + +        if (!g_totalsize) +                goto out; + +        gettimeofday (&now, NULL); +        elapsed = now.tv_sec - defrag->start_time.tv_sec; + +        total_processed = defrag->size_processed; + +        /* rate at which files processed */ +        rate_processed = (total_processed)/elapsed; + +        tmp_count = g_totalsize; + +        if (rate_processed) { +                time_to_complete = (tmp_count)/rate_processed; + +        } else { + +                gf_msg (THIS->name, GF_LOG_ERROR, 0, 0, +                        "Unable to calculate estimated time for rebalance"); +        } + +        gf_log (THIS->name, GF_LOG_INFO, +                "TIME: (size) total_processed=%"PRIu64" tmp_cnt = %"PRIu64"," +                "rate_processed=%f, elapsed = %f", total_processed, tmp_count, +                rate_processed, elapsed); + +out: +        return time_to_complete; +} + + + +uint64_t  gf_defrag_get_estimates (dht_conf_t *conf)  {          gf_defrag_info_t *defrag = NULL; @@ -4542,17 +4685,17 @@ gf_defrag_get_estimates (dht_conf_t *conf)          uint64_t          total_processed = 0;          uint64_t          tmp_count = 0;          uint64_t          time_to_complete = 0; -        struct            timeval end = {0,}; +        struct            timeval now = {0,};          double            elapsed = 0;          defrag = conf->defrag;          if (!g_totalfiles) -                return 0; +                goto out; -        gettimeofday (&end, NULL); -        elapsed = end.tv_sec - defrag->start_time.tv_sec; +        gettimeofday (&now, NULL); +        elapsed = now.tv_sec - defrag->start_time.tv_sec;          /* I tried locking before accessing num_files_lookedup and           * num_dirs_processed but the status function @@ -4599,7 +4742,7 @@ gf_defrag_get_estimates (dht_conf_t *conf)          }          gf_log (THIS->name, GF_LOG_INFO, -                "TIME: total_processed=%"PRIu64" tmp_cnt = %"PRIu64"," +                "TIME: (count) total_processed=%"PRIu64" tmp_cnt = %"PRIu64","                  "rate_lookedup=%f", total_processed, tmp_count,                  rate_lookedup); @@ -4646,19 +4789,32 @@ gf_defrag_status_get (dht_conf_t *conf, dict_t *dict)          elapsed = end.tv_sec - defrag->start_time.tv_sec; +        /* The rebalance is still in progress */ +          if ((defrag->cmd != GF_DEFRAG_CMD_START_TIER) -                && (defrag->defrag_status == GF_DEFRAG_STATUS_STARTED)) { +            && (defrag->defrag_status == GF_DEFRAG_STATUS_STARTED)) { +/*                  time_to_complete = gf_defrag_get_estimates (conf);                  if (time_to_complete && (time_to_complete > elapsed))                          time_left = time_to_complete - elapsed;                  gf_log (THIS->name, GF_LOG_INFO, -                        "TIME: Estimated total time to complete = %"PRIu64 -                        " seconds, seconds left = %"PRIu64"", +                        "TIME: Estimated total time to complete based on" +                        " count = %"PRIu64 " seconds, seconds left = %"PRIu64"",                          time_to_complete, time_left); +*/ +                time_to_complete = gf_defrag_get_estimates_based_on_size (conf); + +                if (time_to_complete && (time_to_complete > elapsed)) +                        time_left = time_to_complete - elapsed; + +                gf_log (THIS->name, GF_LOG_INFO, +                        "TIME: Estimated total time to complete (size)= %"PRIu64 +                        " seconds, seconds left = %"PRIu64"", +                        time_to_complete, time_left);          }          if (!dict) diff --git a/xlators/cluster/dht/src/dht-shared.c b/xlators/cluster/dht/src/dht-shared.c index a75c61190df..0373ebffe5a 100644 --- a/xlators/cluster/dht/src/dht-shared.c +++ b/xlators/cluster/dht/src/dht-shared.c @@ -742,6 +742,9 @@ dht_init (xlator_t *this)                  pthread_cond_init  (&defrag->rebalance_crawler_alarm, 0);                  pthread_cond_init  (&defrag->df_wakeup_thread, 0); +                pthread_mutex_init (&defrag->fc_mutex, 0); +                pthread_cond_init  (&defrag->fc_wakeup_cond, 0); +                  defrag->global_error = 0;          }  | 
