diff options
| author | Krutika Dhananjay <kdhananj@redhat.com> | 2018-01-09 15:11:00 +0530 | 
|---|---|---|
| committer | Raghavendra G <rgowdapp@redhat.com> | 2018-04-02 06:10:30 +0000 | 
| commit | 08fadcc2a706342e4eee79dc7d9b48ba01fcb312 (patch) | |
| tree | 64759c0d2701a80ca0aa5270a44018389306f2bb | |
| parent | 25690197a6af67669346892c36cca471805b9305 (diff) | |
mount/fuse: Add support for multi-threaded fuse readers
Usage: Use 'reader-thread-count=<NUM>' as command line option to
set the thread count at the time of mounting the volume.
Next task is to make these threads auto-scale based on the load,
instead of having the user remount the volume everytime to change
the thread count.
Updates #412
Change-Id: I94aa1505e5ae6a133683d473e0e4e0edd139b76b
Signed-off-by: Krutika Dhananjay <kdhananj@redhat.com>
| -rw-r--r-- | glusterfsd/src/glusterfsd.c | 26 | ||||
| -rw-r--r-- | glusterfsd/src/glusterfsd.h | 1 | ||||
| -rw-r--r-- | libglusterfs/src/glusterfs.h | 1 | ||||
| -rw-r--r-- | xlators/mount/fuse/src/fuse-bridge.c | 231 | ||||
| -rw-r--r-- | xlators/mount/fuse/src/fuse-bridge.h | 9 | ||||
| -rw-r--r-- | xlators/mount/fuse/src/fuse-helpers.c | 3 | ||||
| -rw-r--r-- | xlators/mount/fuse/src/fuse-mem-types.h | 1 | ||||
| -rwxr-xr-x | xlators/mount/fuse/utils/mount.glusterfs.in | 7 | 
8 files changed, 196 insertions, 83 deletions
diff --git a/glusterfsd/src/glusterfsd.c b/glusterfsd/src/glusterfsd.c index a113e3c479f..cb744f04fdf 100644 --- a/glusterfsd/src/glusterfsd.c +++ b/glusterfsd/src/glusterfsd.c @@ -247,6 +247,8 @@ static struct argp_option gf_options[] = {           "option to specify the process type" },          {"event-history", ARGP_FUSE_EVENT_HISTORY_KEY, "BOOL",           OPTION_ARG_OPTIONAL, "disable/enable fuse event-history"}, +        {"reader-thread-count", ARGP_READER_THREAD_COUNT_KEY, "INTEGER", +         OPTION_ARG_OPTIONAL, "set fuse reader thread count"},          {0, 0, 0, 0, "Miscellaneous Options:"},          {0, }  }; @@ -598,6 +600,16 @@ set_fuse_mount_options (glusterfs_ctx_t *ctx, dict_t *options)                          goto err;                  }          } +        if (cmd_args->reader_thread_count) { +                ret = dict_set_uint32 (options, "reader-thread-count", +                                       cmd_args->reader_thread_count); +                if (ret < 0) { +                        gf_msg ("glusterfsd", GF_LOG_ERROR, 0, glusterfsd_msg_4, +                                "failed to set dict value for key " +                                "reader-thread-count"); +                        goto err; +                } +        }          ret = 0;  err: @@ -1340,6 +1352,20 @@ no_oom_api:                  argp_failure (state, -1, 0,                                "unknown event-history setting \"%s\"", arg);                  break; +        case ARGP_READER_THREAD_COUNT_KEY: +                if (gf_string2uint32 (arg, &cmd_args->reader_thread_count)) { +                        argp_failure (state, -1, 0, +                                      "unknown reader thread count option %s", +                                      arg); +                } else if ((cmd_args->reader_thread_count < 1) || +                           (cmd_args->reader_thread_count > 64)) { +                        argp_failure (state, -1, 0, +                                      "Invalid reader thread count %s. " +                                      "Valid range: [\"1, 64\"]", arg); +                } + +                break; +  	}          return 0;  } diff --git a/glusterfsd/src/glusterfsd.h b/glusterfsd/src/glusterfsd.h index 4cbad534000..38785441817 100644 --- a/glusterfsd/src/glusterfsd.h +++ b/glusterfsd/src/glusterfsd.h @@ -101,6 +101,7 @@ enum argp_option_keys {          ARGP_PROCESS_NAME_KEY             = 179,          ARGP_FUSE_EVENT_HISTORY_KEY       = 180,          ARGP_THIN_CLIENT_KEY              = 181, +        ARGP_READER_THREAD_COUNT_KEY      = 182,  };  struct _gfd_vol_top_priv { diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h index 84e33449dad..43e6f48d905 100644 --- a/libglusterfs/src/glusterfs.h +++ b/libglusterfs/src/glusterfs.h @@ -512,6 +512,7 @@ struct _cmd_args {          char              *process_name;          char              *event_history;          int                thin_client; +        uint32_t           reader_thread_count;  };  typedef struct _cmd_args cmd_args_t; diff --git a/xlators/mount/fuse/src/fuse-bridge.c b/xlators/mount/fuse/src/fuse-bridge.c index dee9b16abf3..9e9fb815080 100644 --- a/xlators/mount/fuse/src/fuse-bridge.c +++ b/xlators/mount/fuse/src/fuse-bridge.c @@ -690,7 +690,8 @@ fuse_lookup_resume (fuse_state_t *state)  }  static void -fuse_lookup (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_lookup (xlator_t *this, fuse_in_header_t *finh, void *msg, +             struct iobuf *iobuf)  {          char           *name     = msg;          fuse_state_t   *state    = NULL; @@ -718,7 +719,8 @@ do_forget(xlator_t *this, uint64_t unique, uint64_t nodeid, uint64_t nlookup)  }  static void -fuse_forget (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_forget (xlator_t *this, fuse_in_header_t *finh, void *msg, +             struct iobuf *iobuf)  {          struct fuse_forget_in *ffi        = msg; @@ -739,7 +741,8 @@ fuse_forget (xlator_t *this, fuse_in_header_t *finh, void *msg)  #if FUSE_KERNEL_MINOR_VERSION >= 16  static void -fuse_batch_forget(xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_batch_forget(xlator_t *this, fuse_in_header_t *finh, void *msg, +                  struct iobuf *iobuf)  {  	struct fuse_batch_forget_in *fbfi = msg;  	struct fuse_forget_one *ffo = (struct fuse_forget_one *) (fbfi + 1); @@ -957,7 +960,8 @@ fuse_getattr_resume (fuse_state_t *state)  }  static void -fuse_getattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_getattr (xlator_t *this, fuse_in_header_t *finh, void *msg, +              struct iobuf *iobuf)  {  #if FUSE_KERNEL_MINOR_VERSION >= 9          struct fuse_getattr_in *fgi  = msg; @@ -1287,7 +1291,8 @@ fuse_setattr_resume (fuse_state_t *state)  }  static void -fuse_setattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_setattr (xlator_t *this, fuse_in_header_t *finh, void *msg, +              struct iobuf *iobuf)  {          struct fuse_setattr_in *fsi = msg; @@ -1514,7 +1519,8 @@ fuse_access_resume (fuse_state_t *state)  }  static void -fuse_access (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_access (xlator_t *this, fuse_in_header_t *finh, void *msg, +             struct iobuf *iobuf)  {          struct fuse_access_in *fai = msg;          fuse_state_t *state = NULL; @@ -1588,7 +1594,8 @@ fuse_readlink_resume (fuse_state_t *state)  }  static void -fuse_readlink (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readlink (xlator_t *this, fuse_in_header_t *finh, void *msg, +               struct iobuf *iobuf)  {          fuse_state_t *state = NULL; @@ -1638,7 +1645,8 @@ fuse_mknod_resume (fuse_state_t *state)  }  static void -fuse_mknod (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_mknod (xlator_t *this, fuse_in_header_t *finh, void *msg, +            struct iobuf *iobuf)  {          struct fuse_mknod_in *fmi = msg;          char         *name = (char *)(fmi + 1); @@ -1708,7 +1716,8 @@ fuse_mkdir_resume (fuse_state_t *state)  }  static void -fuse_mkdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_mkdir (xlator_t *this, fuse_in_header_t *finh, void *msg, +            struct iobuf *iobuf)  {          struct fuse_mkdir_in *fmi = msg;          char *name = (char *)(fmi + 1); @@ -1760,7 +1769,8 @@ fuse_unlink_resume (fuse_state_t *state)  }  static void -fuse_unlink (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_unlink (xlator_t *this, fuse_in_header_t *finh, void *msg, +             struct iobuf *iobuf)  {          char         *name = msg;          fuse_state_t *state = NULL; @@ -1797,7 +1807,8 @@ fuse_rmdir_resume (fuse_state_t *state)  }  static void -fuse_rmdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_rmdir (xlator_t *this, fuse_in_header_t *finh, void *msg, +            struct iobuf *iobuf)  {          char         *name = msg;          fuse_state_t *state = NULL; @@ -1847,7 +1858,8 @@ fuse_symlink_resume (fuse_state_t *state)  }  static void -fuse_symlink (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_symlink (xlator_t *this, fuse_in_header_t *finh, void *msg, +              struct iobuf *iobuf)  {          char         *name = msg;          char         *linkname = name + strlen (name) + 1; @@ -1969,7 +1981,8 @@ fuse_rename_resume (fuse_state_t *state)  }  static void -fuse_rename (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_rename (xlator_t *this, fuse_in_header_t *finh, void *msg, +             struct iobuf *iobuf)  {          struct fuse_rename_in  *fri = msg;          char *oldname = (char *)(fri + 1); @@ -2019,7 +2032,8 @@ fuse_link_resume (fuse_state_t *state)  }  static void -fuse_link (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_link (xlator_t *this, fuse_in_header_t *finh, void *msg, +           struct iobuf *iobuf)  {          struct fuse_link_in *fli = msg;          char         *name = (char *)(fli + 1); @@ -2208,7 +2222,8 @@ fuse_create_resume (fuse_state_t *state)  }  static void -fuse_create (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_create (xlator_t *this, fuse_in_header_t *finh, void *msg, +             struct iobuf *iobuf)  {  #if FUSE_KERNEL_MINOR_VERSION >= 12          struct fuse_create_in *fci = msg; @@ -2298,7 +2313,8 @@ fuse_open_resume (fuse_state_t *state)  }  static void -fuse_open (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_open (xlator_t *this, fuse_in_header_t *finh, void *msg, +           struct iobuf *iobuf)  {          struct fuse_open_in *foi = msg;          fuse_state_t *state = NULL; @@ -2375,7 +2391,8 @@ fuse_readv_resume (fuse_state_t *state)  }  static void -fuse_readv (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readv (xlator_t *this, fuse_in_header_t *finh, void *msg, +            struct iobuf *iobuf)  {          struct fuse_read_in *fri = msg; @@ -2451,8 +2468,6 @@ void  fuse_write_resume (fuse_state_t *state)  {          struct iobref *iobref = NULL; -        struct iobuf  *iobuf = NULL; -          iobref = iobref_new ();          if (!iobref) { @@ -2465,8 +2480,7 @@ fuse_write_resume (fuse_state_t *state)                  return;          } -        iobuf = ((fuse_private_t *) (state->this->private))->iobuf; -        iobref_add (iobref, iobuf); +        iobref_add (iobref, state->iobuf);          gf_log ("glusterfs-fuse", GF_LOG_TRACE,                  "%"PRIu64": WRITE (%p, size=%"GF_PRI_SIZET", offset=%"PRId64")", @@ -2480,7 +2494,8 @@ fuse_write_resume (fuse_state_t *state)  }  static void -fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg, +            struct iobuf *iobuf)  {          /* WRITE is special, metadata is attached to in_header,           * and msg is the payload as-is. @@ -2523,6 +2538,7 @@ fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg)          state->vector.iov_base = msg;          state->vector.iov_len  = fwi->size; +        state->iobuf = iobuf;          fuse_resolve_and_resume (state, fuse_write_resume); @@ -2561,7 +2577,8 @@ fuse_lseek_resume (fuse_state_t *state)  }  static void -fuse_lseek (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_lseek (xlator_t *this, fuse_in_header_t *finh, void *msg, +            struct iobuf *iobuf)  {          struct fuse_lseek_in *ffi   = msg;          fuse_state_t         *state = NULL; @@ -2597,7 +2614,8 @@ fuse_flush_resume (fuse_state_t *state)  }  static void -fuse_flush (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_flush (xlator_t *this, fuse_in_header_t *finh, void *msg, +            struct iobuf *iobuf)  {          struct fuse_flush_in *ffi = msg; @@ -2633,7 +2651,8 @@ fuse_internal_release (xlator_t *this, fd_t *fd)  }  static void -fuse_release (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_release (xlator_t *this, fuse_in_header_t *finh, void *msg, +              struct iobuf *iobuf)  {          struct fuse_release_in *fri       = msg;          fd_t                   *fd        = NULL; @@ -2678,7 +2697,8 @@ fuse_fsync_resume (fuse_state_t *state)  }  static void -fuse_fsync (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_fsync (xlator_t *this, fuse_in_header_t *finh, void *msg, +            struct iobuf *iobuf)  {          struct fuse_fsync_in *fsi = msg; @@ -2748,7 +2768,8 @@ fuse_opendir_resume (fuse_state_t *state)  }  static void -fuse_opendir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_opendir (xlator_t *this, fuse_in_header_t *finh, void *msg, +              struct iobuf *iobuf)  {          /*          struct fuse_open_in *foi = msg; @@ -2890,7 +2911,8 @@ fuse_readdir_resume (fuse_state_t *state)  }  static void -fuse_readdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readdir (xlator_t *this, fuse_in_header_t *finh, void *msg, +              struct iobuf *iobuf)  {          struct fuse_read_in *fri = msg; @@ -3041,7 +3063,8 @@ fuse_readdirp_resume (fuse_state_t *state)  static void -fuse_readdirp (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readdirp (xlator_t *this, fuse_in_header_t *finh, void *msg, +               struct iobuf *iobuf)  {  	struct fuse_read_in *fri = msg; @@ -3088,7 +3111,8 @@ fuse_fallocate_resume(fuse_state_t *state)  }  static void -fuse_fallocate(xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_fallocate(xlator_t *this, fuse_in_header_t *finh, void *msg, +               struct iobuf *iobuf)  {  	struct fuse_fallocate_in *ffi = msg;  	fuse_state_t *state = NULL; @@ -3106,7 +3130,8 @@ fuse_fallocate(xlator_t *this, fuse_in_header_t *finh, void *msg)  #endif /* FUSE minor version >= 19 */  static void -fuse_releasedir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_releasedir (xlator_t *this, fuse_in_header_t *finh, void *msg, +                 struct iobuf *iobuf)  {          struct fuse_release_in *fri       = msg;          fuse_state_t           *state     = NULL; @@ -3147,7 +3172,8 @@ fuse_fsyncdir_resume (fuse_state_t *state)  }  static void -fuse_fsyncdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_fsyncdir (xlator_t *this, fuse_in_header_t *finh, void *msg, +               struct iobuf *iobuf)  {          struct fuse_fsync_in *fsi = msg; @@ -3247,7 +3273,8 @@ fuse_statfs_resume (fuse_state_t *state)  static void -fuse_statfs (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_statfs (xlator_t *this, fuse_in_header_t *finh, void *msg, +             struct iobuf *iobuf)  {          fuse_state_t *state = NULL; @@ -3299,7 +3326,8 @@ fuse_setxattr_resume (fuse_state_t *state)  static void -fuse_setxattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_setxattr (xlator_t *this, fuse_in_header_t *finh, void *msg, +               struct iobuf *iobuf)  {          struct fuse_setxattr_in *fsi = msg;          char         *name = (char *)(fsi + 1); @@ -3630,7 +3658,8 @@ fuse_getxattr_resume (fuse_state_t *state)  static void -fuse_getxattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_getxattr (xlator_t *this, fuse_in_header_t *finh, void *msg, +               struct iobuf *iobuf)  {          struct fuse_getxattr_in *fgxi     = msg;          char                    *name     = (char *)(fgxi + 1); @@ -3736,7 +3765,8 @@ fuse_listxattr_resume (fuse_state_t *state)  static void -fuse_listxattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_listxattr (xlator_t *this, fuse_in_header_t *finh, void *msg, +                struct iobuf *iobuf)  {          struct fuse_getxattr_in *fgxi = msg;          fuse_state_t *state = NULL; @@ -3792,7 +3822,8 @@ fuse_removexattr_resume (fuse_state_t *state)  static void -fuse_removexattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_removexattr (xlator_t *this, fuse_in_header_t *finh, void *msg, +                  struct iobuf *iobuf)  {          char *name = msg; @@ -3891,7 +3922,8 @@ fuse_getlk_resume (fuse_state_t *state)  static void -fuse_getlk (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_getlk (xlator_t *this, fuse_in_header_t *finh, void *msg, +            struct iobuf *iobuf)  {          struct fuse_lk_in *fli = msg; @@ -3983,7 +4015,8 @@ fuse_setlk_resume (fuse_state_t *state)  static void -fuse_setlk (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_setlk (xlator_t *this, fuse_in_header_t *finh, void *msg, +            struct iobuf *iobuf)  {          struct fuse_lk_in *fli = msg; @@ -4086,7 +4119,8 @@ notify_kernel_loop (void *data)  #endif  static void -fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg, +           struct iobuf *iobuf)  {          struct fuse_init_in  *fini      = msg;          struct fuse_init_out  fino      = {0,}; @@ -4257,7 +4291,8 @@ fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg)  static void -fuse_enosys (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_enosys (xlator_t *this, fuse_in_header_t *finh, void *msg, +             struct iobuf *iobuf)  {          send_fuse_err (this, finh, ENOSYS); @@ -4266,7 +4301,8 @@ fuse_enosys (xlator_t *this, fuse_in_header_t *finh, void *msg)  static void -fuse_destroy (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_destroy (xlator_t *this, fuse_in_header_t *finh, void *msg, +              struct iobuf *iobuf)  {          send_fuse_err (this, finh, 0); @@ -4857,6 +4893,7 @@ fuse_graph_sync (xlator_t *this)                  new_graph_id = priv->next_graph->id;                  priv->next_graph = NULL;                  need_first_lookup = 1; +                priv->handle_graph_switch = _gf_true;                  while (!priv->event_recvd) {                          ret = pthread_cond_wait (&priv->sync_cond, @@ -4885,6 +4922,8 @@ unlock:                  {                          old_subvol->switched = 1;                          winds_on_old_subvol = old_subvol->winds; +                        priv->handle_graph_switch = _gf_false; +                        pthread_cond_broadcast (&priv->migrate_cond);                  }                  pthread_mutex_unlock (&priv->sync_mutex); @@ -4892,6 +4931,13 @@ unlock:                          xlator_notify (old_subvol, GF_EVENT_PARENT_DOWN,                                         old_subvol, NULL);                  } +        } else { +                pthread_mutex_lock (&priv->sync_mutex); +                { +                        priv->handle_graph_switch = _gf_false; +                        pthread_cond_broadcast (&priv->migrate_cond); +                } +                pthread_mutex_unlock (&priv->sync_mutex);          }          return 0; @@ -4928,7 +4974,6 @@ fuse_thread_proc (void *data)          const size_t              msg0_size = sizeof (*finh) + 128;          fuse_handler_t          **fuse_ops = NULL;          struct pollfd             pfd[2] = {{0,}}; -        gf_boolean_t              mount_finished = _gf_false;          this = data;          priv = this->private; @@ -4945,32 +4990,40 @@ fuse_thread_proc (void *data)                  /* THIS has to be reset here */                  THIS = this; -                if (!mount_finished) { -                        memset(pfd,0,sizeof(pfd)); -                        pfd[0].fd = priv->status_pipe[0]; -                        pfd[0].events = POLLIN | POLLHUP | POLLERR; -                        pfd[1].fd = priv->fd; -                        pfd[1].events = POLLIN | POLLHUP | POLLERR; -                        if (poll(pfd,2,-1) < 0) { -                                gf_log (this->name, GF_LOG_ERROR, -                                        "poll error %s", strerror(errno)); -                                break; -                        } -                        if (pfd[0].revents & POLLIN) { -                                if (fuse_get_mount_status(this) != 0) { +                pthread_mutex_lock (&priv->sync_mutex); +                { +                        if (!priv->mount_finished) { +                                memset(pfd, 0, sizeof(pfd)); +                                pfd[0].fd = priv->status_pipe[0]; +                                pfd[0].events = POLLIN | POLLHUP | POLLERR; +                                pfd[1].fd = priv->fd; +                                pfd[1].events = POLLIN | POLLHUP | POLLERR; +                                if (poll(pfd, 2, -1) < 0) { +                                        gf_log (this->name, GF_LOG_ERROR, +                                                "poll error %s", +                                                strerror(errno)); +                                        pthread_mutex_unlock (&priv->sync_mutex);                                          break;                                  } -                                mount_finished = _gf_true; -                        } -                        else if (pfd[0].revents) { -                                gf_log (this->name, GF_LOG_ERROR, -                                        "mount pipe closed without status"); -                                break; -                        } -                        if (!pfd[1].revents) { -                                continue; +                                if (pfd[0].revents & POLLIN) { +                                        if (fuse_get_mount_status(this) != 0) { +                                                pthread_mutex_unlock (&priv->sync_mutex); +                                                break; +                                        } +                                        priv->mount_finished = _gf_true; +                                } else if (pfd[0].revents) { +                                        gf_log (this->name, GF_LOG_ERROR, +                                                "mount pipe closed without status"); +                                        pthread_mutex_unlock (&priv->sync_mutex); +                                        break; +                                } +                                if (!pfd[1].revents) { +                                        pthread_mutex_unlock (&priv->sync_mutex); +                                        continue; +                                }                          }                  } +                pthread_mutex_unlock (&priv->sync_mutex);                  /*                   * We don't want to block on readv while we're still waiting @@ -5065,8 +5118,6 @@ fuse_thread_proc (void *data)                          break;                  } -                priv->iobuf = iobuf; -                  /*                   * This can be moved around a bit, but it's important to do it                   * *after* the readv.  Otherwise, a graph switch could occur @@ -5109,9 +5160,9 @@ fuse_thread_proc (void *data)                  if (finh->opcode >= FUSE_OP_HIGH)                          /* turn down MacFUSE specific messages */ -                        fuse_enosys (this, finh, msg); +                        fuse_enosys (this, finh, msg, NULL);                  else -                        fuse_ops[finh->opcode] (this, finh, msg); +                        fuse_ops[finh->opcode] (this, finh, msg, iobuf);                  iobuf_unref (iobuf);                  continue; @@ -5183,8 +5234,6 @@ fuse_priv_dump (xlator_t  *this)                              private->volfile_size);          gf_proc_dump_write("mount_point", "%s",                              private->mount_point); -        gf_proc_dump_write("iobuf", "%u", -                            private->iobuf);          gf_proc_dump_write("fuse_thread_started", "%d",                              (int)private->fuse_thread_started);          gf_proc_dump_write("direct_io_mode", "%d", @@ -5310,6 +5359,7 @@ unlock:  int  notify (xlator_t *this, int32_t event, void *data, ...)  { +        int                 i       = 0;          int32_t             ret     = 0;          fuse_private_t     *private = NULL;          gf_boolean_t        start_thread = _gf_false; @@ -5358,14 +5408,21 @@ notify (xlator_t *this, int32_t event, void *data, ...)                  pthread_mutex_unlock (&private->sync_mutex);                  if (start_thread) { -                        ret = gf_thread_create (&private->fuse_thread, NULL, -                                                fuse_thread_proc, this, -                                                "fuseproc"); -                        if (ret != 0) { -                                gf_log (this->name, GF_LOG_DEBUG, -                                        "pthread_create() failed (%s)", -                                        strerror (errno)); -                                break; +                        private->fuse_thread = GF_CALLOC (private->reader_thread_count, +                                                          sizeof (pthread_t), +                                                          gf_fuse_mt_pthread_t); +                        for (i = 0; i < private->reader_thread_count; i++) { + +                                ret = gf_thread_create (&private->fuse_thread[i], +                                                        NULL, +                                                        fuse_thread_proc, this, +                                                        "fuseproc"); +                                if (ret != 0) { +                                        gf_log (this->name, GF_LOG_DEBUG, +                                                "pthread_create() failed (%s)", +                                                strerror (errno)); +                                        break; +                                }                          }                  } @@ -5472,7 +5529,8 @@ static fuse_handler_t *fuse_dump_ops[FUSE_OP_HIGH];  static void -fuse_dumper (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_dumper (xlator_t *this, fuse_in_header_t *finh, void *msg, +             struct iobuf *iobuf)  {          fuse_private_t *priv = NULL;          struct iovec diov[6]           = {{0,},}; @@ -5504,7 +5562,7 @@ fuse_dumper (xlator_t *this, fuse_in_header_t *finh, void *msg)                          "failed to dump fuse message (R): %s",                          strerror (errno)); -        priv->fuse_ops0[finh->opcode] (this, finh, msg); +        priv->fuse_ops0[finh->opcode] (this, finh, msg, NULL);  } @@ -5609,6 +5667,9 @@ init (xlator_t *this_xl)          GF_OPTION_INIT (ZR_ATTR_TIMEOUT_OPT, priv->attribute_timeout, double,                          cleanup_exit); +        GF_OPTION_INIT ("reader-thread-count", priv->reader_thread_count, uint32, +                        cleanup_exit); +          GF_OPTION_INIT (ZR_ENTRY_TIMEOUT_OPT, priv->entry_timeout, double,                          cleanup_exit); @@ -5830,6 +5891,7 @@ init (xlator_t *this_xl)          pthread_mutex_init (&priv->fuse_dump_mutex, NULL);          pthread_cond_init (&priv->sync_cond, NULL); +        pthread_cond_init (&priv->migrate_cond, NULL);          pthread_mutex_init (&priv->sync_mutex, NULL);          priv->event_recvd = 0; @@ -6034,5 +6096,12 @@ struct volume_options options[] = {            .default_value = "false",            .description = "Enables thin mount and connects via gfproxyd daemon.",          }, +        { .key = {"reader-thread-count"}, +          .type = GF_OPTION_TYPE_INT, +          .default_value = "4", +          .min = 1, +          .max = 64, +          .description = "Sets fuse reader thread count.", +        },          { .key = {NULL} },  }; diff --git a/xlators/mount/fuse/src/fuse-bridge.h b/xlators/mount/fuse/src/fuse-bridge.h index 52718161c24..6cf9d2f7cf8 100644 --- a/xlators/mount/fuse/src/fuse-bridge.h +++ b/xlators/mount/fuse/src/fuse-bridge.h @@ -52,7 +52,7 @@  typedef struct fuse_in_header fuse_in_header_t;  typedef void (fuse_handler_t) (xlator_t *this, fuse_in_header_t *finh, -                               void *msg); +                               void *msg, struct iobuf *iobuf);  struct fuse_private {          int                  fd; @@ -62,7 +62,8 @@ struct fuse_private {          char                *mount_point;          struct iobuf        *iobuf; -        pthread_t            fuse_thread; +        pthread_t            *fuse_thread; +        uint32_t              reader_thread_count;          char                 fuse_thread_started;          uint32_t             direct_io_mode; @@ -143,6 +144,9 @@ struct fuse_private {          /* Load the thin volfile, and connect to gfproxyd*/          gf_boolean_t         thin_client; +        gf_boolean_t         mount_finished; +        gf_boolean_t         handle_graph_switch; +        pthread_cond_t       migrate_cond;  };  typedef struct fuse_private fuse_private_t; @@ -394,6 +398,7 @@ typedef struct {          int32_t        fd_no;          gf_seek_what_t whence; +        struct iobuf *iobuf;  } fuse_state_t;  typedef struct { diff --git a/xlators/mount/fuse/src/fuse-helpers.c b/xlators/mount/fuse/src/fuse-helpers.c index c59ff772cb8..c2d4d0cc9d8 100644 --- a/xlators/mount/fuse/src/fuse-helpers.c +++ b/xlators/mount/fuse/src/fuse-helpers.c @@ -123,6 +123,9 @@ get_fuse_state (xlator_t *this, fuse_in_header_t *finh)          pthread_mutex_lock (&priv->sync_mutex);          { +                while (priv->handle_graph_switch) +                        pthread_cond_wait (&priv->migrate_cond, +                                           &priv->sync_mutex);                  active_subvol = fuse_active_subvol (state->this);                  active_subvol->winds++;          } diff --git a/xlators/mount/fuse/src/fuse-mem-types.h b/xlators/mount/fuse/src/fuse-mem-types.h index 2b4b473813d..721b9a347cf 100644 --- a/xlators/mount/fuse/src/fuse-mem-types.h +++ b/xlators/mount/fuse/src/fuse-mem-types.h @@ -23,6 +23,7 @@ enum gf_fuse_mem_types_ {          gf_fuse_mt_graph_switch_args_t,  	gf_fuse_mt_gids_t,          gf_fuse_mt_invalidate_node_t, +        gf_fuse_mt_pthread_t,          gf_fuse_mt_end  };  #endif diff --git a/xlators/mount/fuse/utils/mount.glusterfs.in b/xlators/mount/fuse/utils/mount.glusterfs.in index fd616844d65..6890ff00121 100755 --- a/xlators/mount/fuse/utils/mount.glusterfs.in +++ b/xlators/mount/fuse/utils/mount.glusterfs.in @@ -225,6 +225,10 @@ start_glusterfs ()          cmd_line=$(echo "$cmd_line --event-history=$event_history");      fi +    if [ -n "$reader_thread_count" ]; then +        cmd_line=$(echo "$cmd_line --reader-thread-count=$reader_thread_count"); +    fi +      if [ -n "$volume_name" ]; then          cmd_line=$(echo "$cmd_line --volume-name=$volume_name");      fi @@ -499,6 +503,9 @@ with_options()          "event-history")              event_history=$value              ;; +        "reader-thread-count") +            reader_thread_count=$value +            ;;          "no-root-squash")              if [ $value = "yes" ] ||                  [ $value = "on" ] ||  | 
