diff options
author | Krutika Dhananjay <kdhananj@redhat.com> | 2018-01-09 15:11:00 +0530 |
---|---|---|
committer | Raghavendra G <rgowdapp@redhat.com> | 2018-04-02 06:10:30 +0000 |
commit | 08fadcc2a706342e4eee79dc7d9b48ba01fcb312 (patch) | |
tree | 64759c0d2701a80ca0aa5270a44018389306f2bb | |
parent | 25690197a6af67669346892c36cca471805b9305 (diff) |
mount/fuse: Add support for multi-threaded fuse readers
Usage: Use 'reader-thread-count=<NUM>' as command line option to
set the thread count at the time of mounting the volume.
Next task is to make these threads auto-scale based on the load,
instead of having the user remount the volume everytime to change
the thread count.
Updates #412
Change-Id: I94aa1505e5ae6a133683d473e0e4e0edd139b76b
Signed-off-by: Krutika Dhananjay <kdhananj@redhat.com>
-rw-r--r-- | glusterfsd/src/glusterfsd.c | 26 | ||||
-rw-r--r-- | glusterfsd/src/glusterfsd.h | 1 | ||||
-rw-r--r-- | libglusterfs/src/glusterfs.h | 1 | ||||
-rw-r--r-- | xlators/mount/fuse/src/fuse-bridge.c | 231 | ||||
-rw-r--r-- | xlators/mount/fuse/src/fuse-bridge.h | 9 | ||||
-rw-r--r-- | xlators/mount/fuse/src/fuse-helpers.c | 3 | ||||
-rw-r--r-- | xlators/mount/fuse/src/fuse-mem-types.h | 1 | ||||
-rwxr-xr-x | xlators/mount/fuse/utils/mount.glusterfs.in | 7 |
8 files changed, 196 insertions, 83 deletions
diff --git a/glusterfsd/src/glusterfsd.c b/glusterfsd/src/glusterfsd.c index a113e3c479f..cb744f04fdf 100644 --- a/glusterfsd/src/glusterfsd.c +++ b/glusterfsd/src/glusterfsd.c @@ -247,6 +247,8 @@ static struct argp_option gf_options[] = { "option to specify the process type" }, {"event-history", ARGP_FUSE_EVENT_HISTORY_KEY, "BOOL", OPTION_ARG_OPTIONAL, "disable/enable fuse event-history"}, + {"reader-thread-count", ARGP_READER_THREAD_COUNT_KEY, "INTEGER", + OPTION_ARG_OPTIONAL, "set fuse reader thread count"}, {0, 0, 0, 0, "Miscellaneous Options:"}, {0, } }; @@ -598,6 +600,16 @@ set_fuse_mount_options (glusterfs_ctx_t *ctx, dict_t *options) goto err; } } + if (cmd_args->reader_thread_count) { + ret = dict_set_uint32 (options, "reader-thread-count", + cmd_args->reader_thread_count); + if (ret < 0) { + gf_msg ("glusterfsd", GF_LOG_ERROR, 0, glusterfsd_msg_4, + "failed to set dict value for key " + "reader-thread-count"); + goto err; + } + } ret = 0; err: @@ -1340,6 +1352,20 @@ no_oom_api: argp_failure (state, -1, 0, "unknown event-history setting \"%s\"", arg); break; + case ARGP_READER_THREAD_COUNT_KEY: + if (gf_string2uint32 (arg, &cmd_args->reader_thread_count)) { + argp_failure (state, -1, 0, + "unknown reader thread count option %s", + arg); + } else if ((cmd_args->reader_thread_count < 1) || + (cmd_args->reader_thread_count > 64)) { + argp_failure (state, -1, 0, + "Invalid reader thread count %s. " + "Valid range: [\"1, 64\"]", arg); + } + + break; + } return 0; } diff --git a/glusterfsd/src/glusterfsd.h b/glusterfsd/src/glusterfsd.h index 4cbad534000..38785441817 100644 --- a/glusterfsd/src/glusterfsd.h +++ b/glusterfsd/src/glusterfsd.h @@ -101,6 +101,7 @@ enum argp_option_keys { ARGP_PROCESS_NAME_KEY = 179, ARGP_FUSE_EVENT_HISTORY_KEY = 180, ARGP_THIN_CLIENT_KEY = 181, + ARGP_READER_THREAD_COUNT_KEY = 182, }; struct _gfd_vol_top_priv { diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h index 84e33449dad..43e6f48d905 100644 --- a/libglusterfs/src/glusterfs.h +++ b/libglusterfs/src/glusterfs.h @@ -512,6 +512,7 @@ struct _cmd_args { char *process_name; char *event_history; int thin_client; + uint32_t reader_thread_count; }; typedef struct _cmd_args cmd_args_t; diff --git a/xlators/mount/fuse/src/fuse-bridge.c b/xlators/mount/fuse/src/fuse-bridge.c index dee9b16abf3..9e9fb815080 100644 --- a/xlators/mount/fuse/src/fuse-bridge.c +++ b/xlators/mount/fuse/src/fuse-bridge.c @@ -690,7 +690,8 @@ fuse_lookup_resume (fuse_state_t *state) } static void -fuse_lookup (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_lookup (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { char *name = msg; fuse_state_t *state = NULL; @@ -718,7 +719,8 @@ do_forget(xlator_t *this, uint64_t unique, uint64_t nodeid, uint64_t nlookup) } static void -fuse_forget (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_forget (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_forget_in *ffi = msg; @@ -739,7 +741,8 @@ fuse_forget (xlator_t *this, fuse_in_header_t *finh, void *msg) #if FUSE_KERNEL_MINOR_VERSION >= 16 static void -fuse_batch_forget(xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_batch_forget(xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_batch_forget_in *fbfi = msg; struct fuse_forget_one *ffo = (struct fuse_forget_one *) (fbfi + 1); @@ -957,7 +960,8 @@ fuse_getattr_resume (fuse_state_t *state) } static void -fuse_getattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_getattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { #if FUSE_KERNEL_MINOR_VERSION >= 9 struct fuse_getattr_in *fgi = msg; @@ -1287,7 +1291,8 @@ fuse_setattr_resume (fuse_state_t *state) } static void -fuse_setattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_setattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_setattr_in *fsi = msg; @@ -1514,7 +1519,8 @@ fuse_access_resume (fuse_state_t *state) } static void -fuse_access (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_access (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_access_in *fai = msg; fuse_state_t *state = NULL; @@ -1588,7 +1594,8 @@ fuse_readlink_resume (fuse_state_t *state) } static void -fuse_readlink (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readlink (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { fuse_state_t *state = NULL; @@ -1638,7 +1645,8 @@ fuse_mknod_resume (fuse_state_t *state) } static void -fuse_mknod (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_mknod (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_mknod_in *fmi = msg; char *name = (char *)(fmi + 1); @@ -1708,7 +1716,8 @@ fuse_mkdir_resume (fuse_state_t *state) } static void -fuse_mkdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_mkdir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_mkdir_in *fmi = msg; char *name = (char *)(fmi + 1); @@ -1760,7 +1769,8 @@ fuse_unlink_resume (fuse_state_t *state) } static void -fuse_unlink (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_unlink (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { char *name = msg; fuse_state_t *state = NULL; @@ -1797,7 +1807,8 @@ fuse_rmdir_resume (fuse_state_t *state) } static void -fuse_rmdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_rmdir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { char *name = msg; fuse_state_t *state = NULL; @@ -1847,7 +1858,8 @@ fuse_symlink_resume (fuse_state_t *state) } static void -fuse_symlink (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_symlink (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { char *name = msg; char *linkname = name + strlen (name) + 1; @@ -1969,7 +1981,8 @@ fuse_rename_resume (fuse_state_t *state) } static void -fuse_rename (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_rename (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_rename_in *fri = msg; char *oldname = (char *)(fri + 1); @@ -2019,7 +2032,8 @@ fuse_link_resume (fuse_state_t *state) } static void -fuse_link (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_link (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_link_in *fli = msg; char *name = (char *)(fli + 1); @@ -2208,7 +2222,8 @@ fuse_create_resume (fuse_state_t *state) } static void -fuse_create (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_create (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { #if FUSE_KERNEL_MINOR_VERSION >= 12 struct fuse_create_in *fci = msg; @@ -2298,7 +2313,8 @@ fuse_open_resume (fuse_state_t *state) } static void -fuse_open (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_open (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_open_in *foi = msg; fuse_state_t *state = NULL; @@ -2375,7 +2391,8 @@ fuse_readv_resume (fuse_state_t *state) } static void -fuse_readv (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readv (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_read_in *fri = msg; @@ -2451,8 +2468,6 @@ void fuse_write_resume (fuse_state_t *state) { struct iobref *iobref = NULL; - struct iobuf *iobuf = NULL; - iobref = iobref_new (); if (!iobref) { @@ -2465,8 +2480,7 @@ fuse_write_resume (fuse_state_t *state) return; } - iobuf = ((fuse_private_t *) (state->this->private))->iobuf; - iobref_add (iobref, iobuf); + iobref_add (iobref, state->iobuf); gf_log ("glusterfs-fuse", GF_LOG_TRACE, "%"PRIu64": WRITE (%p, size=%"GF_PRI_SIZET", offset=%"PRId64")", @@ -2480,7 +2494,8 @@ fuse_write_resume (fuse_state_t *state) } static void -fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { /* WRITE is special, metadata is attached to in_header, * and msg is the payload as-is. @@ -2523,6 +2538,7 @@ fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg) state->vector.iov_base = msg; state->vector.iov_len = fwi->size; + state->iobuf = iobuf; fuse_resolve_and_resume (state, fuse_write_resume); @@ -2561,7 +2577,8 @@ fuse_lseek_resume (fuse_state_t *state) } static void -fuse_lseek (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_lseek (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_lseek_in *ffi = msg; fuse_state_t *state = NULL; @@ -2597,7 +2614,8 @@ fuse_flush_resume (fuse_state_t *state) } static void -fuse_flush (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_flush (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_flush_in *ffi = msg; @@ -2633,7 +2651,8 @@ fuse_internal_release (xlator_t *this, fd_t *fd) } static void -fuse_release (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_release (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_release_in *fri = msg; fd_t *fd = NULL; @@ -2678,7 +2697,8 @@ fuse_fsync_resume (fuse_state_t *state) } static void -fuse_fsync (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_fsync (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_fsync_in *fsi = msg; @@ -2748,7 +2768,8 @@ fuse_opendir_resume (fuse_state_t *state) } static void -fuse_opendir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_opendir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { /* struct fuse_open_in *foi = msg; @@ -2890,7 +2911,8 @@ fuse_readdir_resume (fuse_state_t *state) } static void -fuse_readdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readdir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_read_in *fri = msg; @@ -3041,7 +3063,8 @@ fuse_readdirp_resume (fuse_state_t *state) static void -fuse_readdirp (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readdirp (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_read_in *fri = msg; @@ -3088,7 +3111,8 @@ fuse_fallocate_resume(fuse_state_t *state) } static void -fuse_fallocate(xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_fallocate(xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_fallocate_in *ffi = msg; fuse_state_t *state = NULL; @@ -3106,7 +3130,8 @@ fuse_fallocate(xlator_t *this, fuse_in_header_t *finh, void *msg) #endif /* FUSE minor version >= 19 */ static void -fuse_releasedir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_releasedir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_release_in *fri = msg; fuse_state_t *state = NULL; @@ -3147,7 +3172,8 @@ fuse_fsyncdir_resume (fuse_state_t *state) } static void -fuse_fsyncdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_fsyncdir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_fsync_in *fsi = msg; @@ -3247,7 +3273,8 @@ fuse_statfs_resume (fuse_state_t *state) static void -fuse_statfs (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_statfs (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { fuse_state_t *state = NULL; @@ -3299,7 +3326,8 @@ fuse_setxattr_resume (fuse_state_t *state) static void -fuse_setxattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_setxattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_setxattr_in *fsi = msg; char *name = (char *)(fsi + 1); @@ -3630,7 +3658,8 @@ fuse_getxattr_resume (fuse_state_t *state) static void -fuse_getxattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_getxattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_getxattr_in *fgxi = msg; char *name = (char *)(fgxi + 1); @@ -3736,7 +3765,8 @@ fuse_listxattr_resume (fuse_state_t *state) static void -fuse_listxattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_listxattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_getxattr_in *fgxi = msg; fuse_state_t *state = NULL; @@ -3792,7 +3822,8 @@ fuse_removexattr_resume (fuse_state_t *state) static void -fuse_removexattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_removexattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { char *name = msg; @@ -3891,7 +3922,8 @@ fuse_getlk_resume (fuse_state_t *state) static void -fuse_getlk (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_getlk (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_lk_in *fli = msg; @@ -3983,7 +4015,8 @@ fuse_setlk_resume (fuse_state_t *state) static void -fuse_setlk (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_setlk (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_lk_in *fli = msg; @@ -4086,7 +4119,8 @@ notify_kernel_loop (void *data) #endif static void -fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_init_in *fini = msg; struct fuse_init_out fino = {0,}; @@ -4257,7 +4291,8 @@ fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg) static void -fuse_enosys (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_enosys (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { send_fuse_err (this, finh, ENOSYS); @@ -4266,7 +4301,8 @@ fuse_enosys (xlator_t *this, fuse_in_header_t *finh, void *msg) static void -fuse_destroy (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_destroy (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { send_fuse_err (this, finh, 0); @@ -4857,6 +4893,7 @@ fuse_graph_sync (xlator_t *this) new_graph_id = priv->next_graph->id; priv->next_graph = NULL; need_first_lookup = 1; + priv->handle_graph_switch = _gf_true; while (!priv->event_recvd) { ret = pthread_cond_wait (&priv->sync_cond, @@ -4885,6 +4922,8 @@ unlock: { old_subvol->switched = 1; winds_on_old_subvol = old_subvol->winds; + priv->handle_graph_switch = _gf_false; + pthread_cond_broadcast (&priv->migrate_cond); } pthread_mutex_unlock (&priv->sync_mutex); @@ -4892,6 +4931,13 @@ unlock: xlator_notify (old_subvol, GF_EVENT_PARENT_DOWN, old_subvol, NULL); } + } else { + pthread_mutex_lock (&priv->sync_mutex); + { + priv->handle_graph_switch = _gf_false; + pthread_cond_broadcast (&priv->migrate_cond); + } + pthread_mutex_unlock (&priv->sync_mutex); } return 0; @@ -4928,7 +4974,6 @@ fuse_thread_proc (void *data) const size_t msg0_size = sizeof (*finh) + 128; fuse_handler_t **fuse_ops = NULL; struct pollfd pfd[2] = {{0,}}; - gf_boolean_t mount_finished = _gf_false; this = data; priv = this->private; @@ -4945,32 +4990,40 @@ fuse_thread_proc (void *data) /* THIS has to be reset here */ THIS = this; - if (!mount_finished) { - memset(pfd,0,sizeof(pfd)); - pfd[0].fd = priv->status_pipe[0]; - pfd[0].events = POLLIN | POLLHUP | POLLERR; - pfd[1].fd = priv->fd; - pfd[1].events = POLLIN | POLLHUP | POLLERR; - if (poll(pfd,2,-1) < 0) { - gf_log (this->name, GF_LOG_ERROR, - "poll error %s", strerror(errno)); - break; - } - if (pfd[0].revents & POLLIN) { - if (fuse_get_mount_status(this) != 0) { + pthread_mutex_lock (&priv->sync_mutex); + { + if (!priv->mount_finished) { + memset(pfd, 0, sizeof(pfd)); + pfd[0].fd = priv->status_pipe[0]; + pfd[0].events = POLLIN | POLLHUP | POLLERR; + pfd[1].fd = priv->fd; + pfd[1].events = POLLIN | POLLHUP | POLLERR; + if (poll(pfd, 2, -1) < 0) { + gf_log (this->name, GF_LOG_ERROR, + "poll error %s", + strerror(errno)); + pthread_mutex_unlock (&priv->sync_mutex); break; } - mount_finished = _gf_true; - } - else if (pfd[0].revents) { - gf_log (this->name, GF_LOG_ERROR, - "mount pipe closed without status"); - break; - } - if (!pfd[1].revents) { - continue; + if (pfd[0].revents & POLLIN) { + if (fuse_get_mount_status(this) != 0) { + pthread_mutex_unlock (&priv->sync_mutex); + break; + } + priv->mount_finished = _gf_true; + } else if (pfd[0].revents) { + gf_log (this->name, GF_LOG_ERROR, + "mount pipe closed without status"); + pthread_mutex_unlock (&priv->sync_mutex); + break; + } + if (!pfd[1].revents) { + pthread_mutex_unlock (&priv->sync_mutex); + continue; + } } } + pthread_mutex_unlock (&priv->sync_mutex); /* * We don't want to block on readv while we're still waiting @@ -5065,8 +5118,6 @@ fuse_thread_proc (void *data) break; } - priv->iobuf = iobuf; - /* * This can be moved around a bit, but it's important to do it * *after* the readv. Otherwise, a graph switch could occur @@ -5109,9 +5160,9 @@ fuse_thread_proc (void *data) if (finh->opcode >= FUSE_OP_HIGH) /* turn down MacFUSE specific messages */ - fuse_enosys (this, finh, msg); + fuse_enosys (this, finh, msg, NULL); else - fuse_ops[finh->opcode] (this, finh, msg); + fuse_ops[finh->opcode] (this, finh, msg, iobuf); iobuf_unref (iobuf); continue; @@ -5183,8 +5234,6 @@ fuse_priv_dump (xlator_t *this) private->volfile_size); gf_proc_dump_write("mount_point", "%s", private->mount_point); - gf_proc_dump_write("iobuf", "%u", - private->iobuf); gf_proc_dump_write("fuse_thread_started", "%d", (int)private->fuse_thread_started); gf_proc_dump_write("direct_io_mode", "%d", @@ -5310,6 +5359,7 @@ unlock: int notify (xlator_t *this, int32_t event, void *data, ...) { + int i = 0; int32_t ret = 0; fuse_private_t *private = NULL; gf_boolean_t start_thread = _gf_false; @@ -5358,14 +5408,21 @@ notify (xlator_t *this, int32_t event, void *data, ...) pthread_mutex_unlock (&private->sync_mutex); if (start_thread) { - ret = gf_thread_create (&private->fuse_thread, NULL, - fuse_thread_proc, this, - "fuseproc"); - if (ret != 0) { - gf_log (this->name, GF_LOG_DEBUG, - "pthread_create() failed (%s)", - strerror (errno)); - break; + private->fuse_thread = GF_CALLOC (private->reader_thread_count, + sizeof (pthread_t), + gf_fuse_mt_pthread_t); + for (i = 0; i < private->reader_thread_count; i++) { + + ret = gf_thread_create (&private->fuse_thread[i], + NULL, + fuse_thread_proc, this, + "fuseproc"); + if (ret != 0) { + gf_log (this->name, GF_LOG_DEBUG, + "pthread_create() failed (%s)", + strerror (errno)); + break; + } } } @@ -5472,7 +5529,8 @@ static fuse_handler_t *fuse_dump_ops[FUSE_OP_HIGH]; static void -fuse_dumper (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_dumper (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { fuse_private_t *priv = NULL; struct iovec diov[6] = {{0,},}; @@ -5504,7 +5562,7 @@ fuse_dumper (xlator_t *this, fuse_in_header_t *finh, void *msg) "failed to dump fuse message (R): %s", strerror (errno)); - priv->fuse_ops0[finh->opcode] (this, finh, msg); + priv->fuse_ops0[finh->opcode] (this, finh, msg, NULL); } @@ -5609,6 +5667,9 @@ init (xlator_t *this_xl) GF_OPTION_INIT (ZR_ATTR_TIMEOUT_OPT, priv->attribute_timeout, double, cleanup_exit); + GF_OPTION_INIT ("reader-thread-count", priv->reader_thread_count, uint32, + cleanup_exit); + GF_OPTION_INIT (ZR_ENTRY_TIMEOUT_OPT, priv->entry_timeout, double, cleanup_exit); @@ -5830,6 +5891,7 @@ init (xlator_t *this_xl) pthread_mutex_init (&priv->fuse_dump_mutex, NULL); pthread_cond_init (&priv->sync_cond, NULL); + pthread_cond_init (&priv->migrate_cond, NULL); pthread_mutex_init (&priv->sync_mutex, NULL); priv->event_recvd = 0; @@ -6034,5 +6096,12 @@ struct volume_options options[] = { .default_value = "false", .description = "Enables thin mount and connects via gfproxyd daemon.", }, + { .key = {"reader-thread-count"}, + .type = GF_OPTION_TYPE_INT, + .default_value = "4", + .min = 1, + .max = 64, + .description = "Sets fuse reader thread count.", + }, { .key = {NULL} }, }; diff --git a/xlators/mount/fuse/src/fuse-bridge.h b/xlators/mount/fuse/src/fuse-bridge.h index 52718161c24..6cf9d2f7cf8 100644 --- a/xlators/mount/fuse/src/fuse-bridge.h +++ b/xlators/mount/fuse/src/fuse-bridge.h @@ -52,7 +52,7 @@ typedef struct fuse_in_header fuse_in_header_t; typedef void (fuse_handler_t) (xlator_t *this, fuse_in_header_t *finh, - void *msg); + void *msg, struct iobuf *iobuf); struct fuse_private { int fd; @@ -62,7 +62,8 @@ struct fuse_private { char *mount_point; struct iobuf *iobuf; - pthread_t fuse_thread; + pthread_t *fuse_thread; + uint32_t reader_thread_count; char fuse_thread_started; uint32_t direct_io_mode; @@ -143,6 +144,9 @@ struct fuse_private { /* Load the thin volfile, and connect to gfproxyd*/ gf_boolean_t thin_client; + gf_boolean_t mount_finished; + gf_boolean_t handle_graph_switch; + pthread_cond_t migrate_cond; }; typedef struct fuse_private fuse_private_t; @@ -394,6 +398,7 @@ typedef struct { int32_t fd_no; gf_seek_what_t whence; + struct iobuf *iobuf; } fuse_state_t; typedef struct { diff --git a/xlators/mount/fuse/src/fuse-helpers.c b/xlators/mount/fuse/src/fuse-helpers.c index c59ff772cb8..c2d4d0cc9d8 100644 --- a/xlators/mount/fuse/src/fuse-helpers.c +++ b/xlators/mount/fuse/src/fuse-helpers.c @@ -123,6 +123,9 @@ get_fuse_state (xlator_t *this, fuse_in_header_t *finh) pthread_mutex_lock (&priv->sync_mutex); { + while (priv->handle_graph_switch) + pthread_cond_wait (&priv->migrate_cond, + &priv->sync_mutex); active_subvol = fuse_active_subvol (state->this); active_subvol->winds++; } diff --git a/xlators/mount/fuse/src/fuse-mem-types.h b/xlators/mount/fuse/src/fuse-mem-types.h index 2b4b473813d..721b9a347cf 100644 --- a/xlators/mount/fuse/src/fuse-mem-types.h +++ b/xlators/mount/fuse/src/fuse-mem-types.h @@ -23,6 +23,7 @@ enum gf_fuse_mem_types_ { gf_fuse_mt_graph_switch_args_t, gf_fuse_mt_gids_t, gf_fuse_mt_invalidate_node_t, + gf_fuse_mt_pthread_t, gf_fuse_mt_end }; #endif diff --git a/xlators/mount/fuse/utils/mount.glusterfs.in b/xlators/mount/fuse/utils/mount.glusterfs.in index fd616844d65..6890ff00121 100755 --- a/xlators/mount/fuse/utils/mount.glusterfs.in +++ b/xlators/mount/fuse/utils/mount.glusterfs.in @@ -225,6 +225,10 @@ start_glusterfs () cmd_line=$(echo "$cmd_line --event-history=$event_history"); fi + if [ -n "$reader_thread_count" ]; then + cmd_line=$(echo "$cmd_line --reader-thread-count=$reader_thread_count"); + fi + if [ -n "$volume_name" ]; then cmd_line=$(echo "$cmd_line --volume-name=$volume_name"); fi @@ -499,6 +503,9 @@ with_options() "event-history") event_history=$value ;; + "reader-thread-count") + reader_thread_count=$value + ;; "no-root-squash") if [ $value = "yes" ] || [ $value = "on" ] || |