summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKrutika Dhananjay <kdhananj@redhat.com>2018-01-09 15:11:00 +0530
committerRaghavendra G <rgowdapp@redhat.com>2018-04-02 06:10:30 +0000
commit08fadcc2a706342e4eee79dc7d9b48ba01fcb312 (patch)
tree64759c0d2701a80ca0aa5270a44018389306f2bb
parent25690197a6af67669346892c36cca471805b9305 (diff)
mount/fuse: Add support for multi-threaded fuse readers
Usage: Use 'reader-thread-count=<NUM>' as command line option to set the thread count at the time of mounting the volume. Next task is to make these threads auto-scale based on the load, instead of having the user remount the volume everytime to change the thread count. Updates #412 Change-Id: I94aa1505e5ae6a133683d473e0e4e0edd139b76b Signed-off-by: Krutika Dhananjay <kdhananj@redhat.com>
-rw-r--r--glusterfsd/src/glusterfsd.c26
-rw-r--r--glusterfsd/src/glusterfsd.h1
-rw-r--r--libglusterfs/src/glusterfs.h1
-rw-r--r--xlators/mount/fuse/src/fuse-bridge.c231
-rw-r--r--xlators/mount/fuse/src/fuse-bridge.h9
-rw-r--r--xlators/mount/fuse/src/fuse-helpers.c3
-rw-r--r--xlators/mount/fuse/src/fuse-mem-types.h1
-rwxr-xr-xxlators/mount/fuse/utils/mount.glusterfs.in7
8 files changed, 196 insertions, 83 deletions
diff --git a/glusterfsd/src/glusterfsd.c b/glusterfsd/src/glusterfsd.c
index a113e3c479f..cb744f04fdf 100644
--- a/glusterfsd/src/glusterfsd.c
+++ b/glusterfsd/src/glusterfsd.c
@@ -247,6 +247,8 @@ static struct argp_option gf_options[] = {
"option to specify the process type" },
{"event-history", ARGP_FUSE_EVENT_HISTORY_KEY, "BOOL",
OPTION_ARG_OPTIONAL, "disable/enable fuse event-history"},
+ {"reader-thread-count", ARGP_READER_THREAD_COUNT_KEY, "INTEGER",
+ OPTION_ARG_OPTIONAL, "set fuse reader thread count"},
{0, 0, 0, 0, "Miscellaneous Options:"},
{0, }
};
@@ -598,6 +600,16 @@ set_fuse_mount_options (glusterfs_ctx_t *ctx, dict_t *options)
goto err;
}
}
+ if (cmd_args->reader_thread_count) {
+ ret = dict_set_uint32 (options, "reader-thread-count",
+ cmd_args->reader_thread_count);
+ if (ret < 0) {
+ gf_msg ("glusterfsd", GF_LOG_ERROR, 0, glusterfsd_msg_4,
+ "failed to set dict value for key "
+ "reader-thread-count");
+ goto err;
+ }
+ }
ret = 0;
err:
@@ -1340,6 +1352,20 @@ no_oom_api:
argp_failure (state, -1, 0,
"unknown event-history setting \"%s\"", arg);
break;
+ case ARGP_READER_THREAD_COUNT_KEY:
+ if (gf_string2uint32 (arg, &cmd_args->reader_thread_count)) {
+ argp_failure (state, -1, 0,
+ "unknown reader thread count option %s",
+ arg);
+ } else if ((cmd_args->reader_thread_count < 1) ||
+ (cmd_args->reader_thread_count > 64)) {
+ argp_failure (state, -1, 0,
+ "Invalid reader thread count %s. "
+ "Valid range: [\"1, 64\"]", arg);
+ }
+
+ break;
+
}
return 0;
}
diff --git a/glusterfsd/src/glusterfsd.h b/glusterfsd/src/glusterfsd.h
index 4cbad534000..38785441817 100644
--- a/glusterfsd/src/glusterfsd.h
+++ b/glusterfsd/src/glusterfsd.h
@@ -101,6 +101,7 @@ enum argp_option_keys {
ARGP_PROCESS_NAME_KEY = 179,
ARGP_FUSE_EVENT_HISTORY_KEY = 180,
ARGP_THIN_CLIENT_KEY = 181,
+ ARGP_READER_THREAD_COUNT_KEY = 182,
};
struct _gfd_vol_top_priv {
diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h
index 84e33449dad..43e6f48d905 100644
--- a/libglusterfs/src/glusterfs.h
+++ b/libglusterfs/src/glusterfs.h
@@ -512,6 +512,7 @@ struct _cmd_args {
char *process_name;
char *event_history;
int thin_client;
+ uint32_t reader_thread_count;
};
typedef struct _cmd_args cmd_args_t;
diff --git a/xlators/mount/fuse/src/fuse-bridge.c b/xlators/mount/fuse/src/fuse-bridge.c
index dee9b16abf3..9e9fb815080 100644
--- a/xlators/mount/fuse/src/fuse-bridge.c
+++ b/xlators/mount/fuse/src/fuse-bridge.c
@@ -690,7 +690,8 @@ fuse_lookup_resume (fuse_state_t *state)
}
static void
-fuse_lookup (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_lookup (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
char *name = msg;
fuse_state_t *state = NULL;
@@ -718,7 +719,8 @@ do_forget(xlator_t *this, uint64_t unique, uint64_t nodeid, uint64_t nlookup)
}
static void
-fuse_forget (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_forget (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_forget_in *ffi = msg;
@@ -739,7 +741,8 @@ fuse_forget (xlator_t *this, fuse_in_header_t *finh, void *msg)
#if FUSE_KERNEL_MINOR_VERSION >= 16
static void
-fuse_batch_forget(xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_batch_forget(xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_batch_forget_in *fbfi = msg;
struct fuse_forget_one *ffo = (struct fuse_forget_one *) (fbfi + 1);
@@ -957,7 +960,8 @@ fuse_getattr_resume (fuse_state_t *state)
}
static void
-fuse_getattr (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_getattr (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
#if FUSE_KERNEL_MINOR_VERSION >= 9
struct fuse_getattr_in *fgi = msg;
@@ -1287,7 +1291,8 @@ fuse_setattr_resume (fuse_state_t *state)
}
static void
-fuse_setattr (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_setattr (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_setattr_in *fsi = msg;
@@ -1514,7 +1519,8 @@ fuse_access_resume (fuse_state_t *state)
}
static void
-fuse_access (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_access (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_access_in *fai = msg;
fuse_state_t *state = NULL;
@@ -1588,7 +1594,8 @@ fuse_readlink_resume (fuse_state_t *state)
}
static void
-fuse_readlink (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_readlink (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
fuse_state_t *state = NULL;
@@ -1638,7 +1645,8 @@ fuse_mknod_resume (fuse_state_t *state)
}
static void
-fuse_mknod (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_mknod (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_mknod_in *fmi = msg;
char *name = (char *)(fmi + 1);
@@ -1708,7 +1716,8 @@ fuse_mkdir_resume (fuse_state_t *state)
}
static void
-fuse_mkdir (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_mkdir (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_mkdir_in *fmi = msg;
char *name = (char *)(fmi + 1);
@@ -1760,7 +1769,8 @@ fuse_unlink_resume (fuse_state_t *state)
}
static void
-fuse_unlink (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_unlink (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
char *name = msg;
fuse_state_t *state = NULL;
@@ -1797,7 +1807,8 @@ fuse_rmdir_resume (fuse_state_t *state)
}
static void
-fuse_rmdir (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_rmdir (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
char *name = msg;
fuse_state_t *state = NULL;
@@ -1847,7 +1858,8 @@ fuse_symlink_resume (fuse_state_t *state)
}
static void
-fuse_symlink (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_symlink (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
char *name = msg;
char *linkname = name + strlen (name) + 1;
@@ -1969,7 +1981,8 @@ fuse_rename_resume (fuse_state_t *state)
}
static void
-fuse_rename (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_rename (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_rename_in *fri = msg;
char *oldname = (char *)(fri + 1);
@@ -2019,7 +2032,8 @@ fuse_link_resume (fuse_state_t *state)
}
static void
-fuse_link (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_link (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_link_in *fli = msg;
char *name = (char *)(fli + 1);
@@ -2208,7 +2222,8 @@ fuse_create_resume (fuse_state_t *state)
}
static void
-fuse_create (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_create (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
#if FUSE_KERNEL_MINOR_VERSION >= 12
struct fuse_create_in *fci = msg;
@@ -2298,7 +2313,8 @@ fuse_open_resume (fuse_state_t *state)
}
static void
-fuse_open (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_open (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_open_in *foi = msg;
fuse_state_t *state = NULL;
@@ -2375,7 +2391,8 @@ fuse_readv_resume (fuse_state_t *state)
}
static void
-fuse_readv (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_readv (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_read_in *fri = msg;
@@ -2451,8 +2468,6 @@ void
fuse_write_resume (fuse_state_t *state)
{
struct iobref *iobref = NULL;
- struct iobuf *iobuf = NULL;
-
iobref = iobref_new ();
if (!iobref) {
@@ -2465,8 +2480,7 @@ fuse_write_resume (fuse_state_t *state)
return;
}
- iobuf = ((fuse_private_t *) (state->this->private))->iobuf;
- iobref_add (iobref, iobuf);
+ iobref_add (iobref, state->iobuf);
gf_log ("glusterfs-fuse", GF_LOG_TRACE,
"%"PRIu64": WRITE (%p, size=%"GF_PRI_SIZET", offset=%"PRId64")",
@@ -2480,7 +2494,8 @@ fuse_write_resume (fuse_state_t *state)
}
static void
-fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
/* WRITE is special, metadata is attached to in_header,
* and msg is the payload as-is.
@@ -2523,6 +2538,7 @@ fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg)
state->vector.iov_base = msg;
state->vector.iov_len = fwi->size;
+ state->iobuf = iobuf;
fuse_resolve_and_resume (state, fuse_write_resume);
@@ -2561,7 +2577,8 @@ fuse_lseek_resume (fuse_state_t *state)
}
static void
-fuse_lseek (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_lseek (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_lseek_in *ffi = msg;
fuse_state_t *state = NULL;
@@ -2597,7 +2614,8 @@ fuse_flush_resume (fuse_state_t *state)
}
static void
-fuse_flush (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_flush (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_flush_in *ffi = msg;
@@ -2633,7 +2651,8 @@ fuse_internal_release (xlator_t *this, fd_t *fd)
}
static void
-fuse_release (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_release (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_release_in *fri = msg;
fd_t *fd = NULL;
@@ -2678,7 +2697,8 @@ fuse_fsync_resume (fuse_state_t *state)
}
static void
-fuse_fsync (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_fsync (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_fsync_in *fsi = msg;
@@ -2748,7 +2768,8 @@ fuse_opendir_resume (fuse_state_t *state)
}
static void
-fuse_opendir (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_opendir (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
/*
struct fuse_open_in *foi = msg;
@@ -2890,7 +2911,8 @@ fuse_readdir_resume (fuse_state_t *state)
}
static void
-fuse_readdir (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_readdir (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_read_in *fri = msg;
@@ -3041,7 +3063,8 @@ fuse_readdirp_resume (fuse_state_t *state)
static void
-fuse_readdirp (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_readdirp (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_read_in *fri = msg;
@@ -3088,7 +3111,8 @@ fuse_fallocate_resume(fuse_state_t *state)
}
static void
-fuse_fallocate(xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_fallocate(xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_fallocate_in *ffi = msg;
fuse_state_t *state = NULL;
@@ -3106,7 +3130,8 @@ fuse_fallocate(xlator_t *this, fuse_in_header_t *finh, void *msg)
#endif /* FUSE minor version >= 19 */
static void
-fuse_releasedir (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_releasedir (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_release_in *fri = msg;
fuse_state_t *state = NULL;
@@ -3147,7 +3172,8 @@ fuse_fsyncdir_resume (fuse_state_t *state)
}
static void
-fuse_fsyncdir (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_fsyncdir (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_fsync_in *fsi = msg;
@@ -3247,7 +3273,8 @@ fuse_statfs_resume (fuse_state_t *state)
static void
-fuse_statfs (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_statfs (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
fuse_state_t *state = NULL;
@@ -3299,7 +3326,8 @@ fuse_setxattr_resume (fuse_state_t *state)
static void
-fuse_setxattr (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_setxattr (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_setxattr_in *fsi = msg;
char *name = (char *)(fsi + 1);
@@ -3630,7 +3658,8 @@ fuse_getxattr_resume (fuse_state_t *state)
static void
-fuse_getxattr (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_getxattr (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_getxattr_in *fgxi = msg;
char *name = (char *)(fgxi + 1);
@@ -3736,7 +3765,8 @@ fuse_listxattr_resume (fuse_state_t *state)
static void
-fuse_listxattr (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_listxattr (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_getxattr_in *fgxi = msg;
fuse_state_t *state = NULL;
@@ -3792,7 +3822,8 @@ fuse_removexattr_resume (fuse_state_t *state)
static void
-fuse_removexattr (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_removexattr (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
char *name = msg;
@@ -3891,7 +3922,8 @@ fuse_getlk_resume (fuse_state_t *state)
static void
-fuse_getlk (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_getlk (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_lk_in *fli = msg;
@@ -3983,7 +4015,8 @@ fuse_setlk_resume (fuse_state_t *state)
static void
-fuse_setlk (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_setlk (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_lk_in *fli = msg;
@@ -4086,7 +4119,8 @@ notify_kernel_loop (void *data)
#endif
static void
-fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
struct fuse_init_in *fini = msg;
struct fuse_init_out fino = {0,};
@@ -4257,7 +4291,8 @@ fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg)
static void
-fuse_enosys (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_enosys (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
send_fuse_err (this, finh, ENOSYS);
@@ -4266,7 +4301,8 @@ fuse_enosys (xlator_t *this, fuse_in_header_t *finh, void *msg)
static void
-fuse_destroy (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_destroy (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
send_fuse_err (this, finh, 0);
@@ -4857,6 +4893,7 @@ fuse_graph_sync (xlator_t *this)
new_graph_id = priv->next_graph->id;
priv->next_graph = NULL;
need_first_lookup = 1;
+ priv->handle_graph_switch = _gf_true;
while (!priv->event_recvd) {
ret = pthread_cond_wait (&priv->sync_cond,
@@ -4885,6 +4922,8 @@ unlock:
{
old_subvol->switched = 1;
winds_on_old_subvol = old_subvol->winds;
+ priv->handle_graph_switch = _gf_false;
+ pthread_cond_broadcast (&priv->migrate_cond);
}
pthread_mutex_unlock (&priv->sync_mutex);
@@ -4892,6 +4931,13 @@ unlock:
xlator_notify (old_subvol, GF_EVENT_PARENT_DOWN,
old_subvol, NULL);
}
+ } else {
+ pthread_mutex_lock (&priv->sync_mutex);
+ {
+ priv->handle_graph_switch = _gf_false;
+ pthread_cond_broadcast (&priv->migrate_cond);
+ }
+ pthread_mutex_unlock (&priv->sync_mutex);
}
return 0;
@@ -4928,7 +4974,6 @@ fuse_thread_proc (void *data)
const size_t msg0_size = sizeof (*finh) + 128;
fuse_handler_t **fuse_ops = NULL;
struct pollfd pfd[2] = {{0,}};
- gf_boolean_t mount_finished = _gf_false;
this = data;
priv = this->private;
@@ -4945,32 +4990,40 @@ fuse_thread_proc (void *data)
/* THIS has to be reset here */
THIS = this;
- if (!mount_finished) {
- memset(pfd,0,sizeof(pfd));
- pfd[0].fd = priv->status_pipe[0];
- pfd[0].events = POLLIN | POLLHUP | POLLERR;
- pfd[1].fd = priv->fd;
- pfd[1].events = POLLIN | POLLHUP | POLLERR;
- if (poll(pfd,2,-1) < 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "poll error %s", strerror(errno));
- break;
- }
- if (pfd[0].revents & POLLIN) {
- if (fuse_get_mount_status(this) != 0) {
+ pthread_mutex_lock (&priv->sync_mutex);
+ {
+ if (!priv->mount_finished) {
+ memset(pfd, 0, sizeof(pfd));
+ pfd[0].fd = priv->status_pipe[0];
+ pfd[0].events = POLLIN | POLLHUP | POLLERR;
+ pfd[1].fd = priv->fd;
+ pfd[1].events = POLLIN | POLLHUP | POLLERR;
+ if (poll(pfd, 2, -1) < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "poll error %s",
+ strerror(errno));
+ pthread_mutex_unlock (&priv->sync_mutex);
break;
}
- mount_finished = _gf_true;
- }
- else if (pfd[0].revents) {
- gf_log (this->name, GF_LOG_ERROR,
- "mount pipe closed without status");
- break;
- }
- if (!pfd[1].revents) {
- continue;
+ if (pfd[0].revents & POLLIN) {
+ if (fuse_get_mount_status(this) != 0) {
+ pthread_mutex_unlock (&priv->sync_mutex);
+ break;
+ }
+ priv->mount_finished = _gf_true;
+ } else if (pfd[0].revents) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "mount pipe closed without status");
+ pthread_mutex_unlock (&priv->sync_mutex);
+ break;
+ }
+ if (!pfd[1].revents) {
+ pthread_mutex_unlock (&priv->sync_mutex);
+ continue;
+ }
}
}
+ pthread_mutex_unlock (&priv->sync_mutex);
/*
* We don't want to block on readv while we're still waiting
@@ -5065,8 +5118,6 @@ fuse_thread_proc (void *data)
break;
}
- priv->iobuf = iobuf;
-
/*
* This can be moved around a bit, but it's important to do it
* *after* the readv. Otherwise, a graph switch could occur
@@ -5109,9 +5160,9 @@ fuse_thread_proc (void *data)
if (finh->opcode >= FUSE_OP_HIGH)
/* turn down MacFUSE specific messages */
- fuse_enosys (this, finh, msg);
+ fuse_enosys (this, finh, msg, NULL);
else
- fuse_ops[finh->opcode] (this, finh, msg);
+ fuse_ops[finh->opcode] (this, finh, msg, iobuf);
iobuf_unref (iobuf);
continue;
@@ -5183,8 +5234,6 @@ fuse_priv_dump (xlator_t *this)
private->volfile_size);
gf_proc_dump_write("mount_point", "%s",
private->mount_point);
- gf_proc_dump_write("iobuf", "%u",
- private->iobuf);
gf_proc_dump_write("fuse_thread_started", "%d",
(int)private->fuse_thread_started);
gf_proc_dump_write("direct_io_mode", "%d",
@@ -5310,6 +5359,7 @@ unlock:
int
notify (xlator_t *this, int32_t event, void *data, ...)
{
+ int i = 0;
int32_t ret = 0;
fuse_private_t *private = NULL;
gf_boolean_t start_thread = _gf_false;
@@ -5358,14 +5408,21 @@ notify (xlator_t *this, int32_t event, void *data, ...)
pthread_mutex_unlock (&private->sync_mutex);
if (start_thread) {
- ret = gf_thread_create (&private->fuse_thread, NULL,
- fuse_thread_proc, this,
- "fuseproc");
- if (ret != 0) {
- gf_log (this->name, GF_LOG_DEBUG,
- "pthread_create() failed (%s)",
- strerror (errno));
- break;
+ private->fuse_thread = GF_CALLOC (private->reader_thread_count,
+ sizeof (pthread_t),
+ gf_fuse_mt_pthread_t);
+ for (i = 0; i < private->reader_thread_count; i++) {
+
+ ret = gf_thread_create (&private->fuse_thread[i],
+ NULL,
+ fuse_thread_proc, this,
+ "fuseproc");
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "pthread_create() failed (%s)",
+ strerror (errno));
+ break;
+ }
}
}
@@ -5472,7 +5529,8 @@ static fuse_handler_t *fuse_dump_ops[FUSE_OP_HIGH];
static void
-fuse_dumper (xlator_t *this, fuse_in_header_t *finh, void *msg)
+fuse_dumper (xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
{
fuse_private_t *priv = NULL;
struct iovec diov[6] = {{0,},};
@@ -5504,7 +5562,7 @@ fuse_dumper (xlator_t *this, fuse_in_header_t *finh, void *msg)
"failed to dump fuse message (R): %s",
strerror (errno));
- priv->fuse_ops0[finh->opcode] (this, finh, msg);
+ priv->fuse_ops0[finh->opcode] (this, finh, msg, NULL);
}
@@ -5609,6 +5667,9 @@ init (xlator_t *this_xl)
GF_OPTION_INIT (ZR_ATTR_TIMEOUT_OPT, priv->attribute_timeout, double,
cleanup_exit);
+ GF_OPTION_INIT ("reader-thread-count", priv->reader_thread_count, uint32,
+ cleanup_exit);
+
GF_OPTION_INIT (ZR_ENTRY_TIMEOUT_OPT, priv->entry_timeout, double,
cleanup_exit);
@@ -5830,6 +5891,7 @@ init (xlator_t *this_xl)
pthread_mutex_init (&priv->fuse_dump_mutex, NULL);
pthread_cond_init (&priv->sync_cond, NULL);
+ pthread_cond_init (&priv->migrate_cond, NULL);
pthread_mutex_init (&priv->sync_mutex, NULL);
priv->event_recvd = 0;
@@ -6034,5 +6096,12 @@ struct volume_options options[] = {
.default_value = "false",
.description = "Enables thin mount and connects via gfproxyd daemon.",
},
+ { .key = {"reader-thread-count"},
+ .type = GF_OPTION_TYPE_INT,
+ .default_value = "4",
+ .min = 1,
+ .max = 64,
+ .description = "Sets fuse reader thread count.",
+ },
{ .key = {NULL} },
};
diff --git a/xlators/mount/fuse/src/fuse-bridge.h b/xlators/mount/fuse/src/fuse-bridge.h
index 52718161c24..6cf9d2f7cf8 100644
--- a/xlators/mount/fuse/src/fuse-bridge.h
+++ b/xlators/mount/fuse/src/fuse-bridge.h
@@ -52,7 +52,7 @@
typedef struct fuse_in_header fuse_in_header_t;
typedef void (fuse_handler_t) (xlator_t *this, fuse_in_header_t *finh,
- void *msg);
+ void *msg, struct iobuf *iobuf);
struct fuse_private {
int fd;
@@ -62,7 +62,8 @@ struct fuse_private {
char *mount_point;
struct iobuf *iobuf;
- pthread_t fuse_thread;
+ pthread_t *fuse_thread;
+ uint32_t reader_thread_count;
char fuse_thread_started;
uint32_t direct_io_mode;
@@ -143,6 +144,9 @@ struct fuse_private {
/* Load the thin volfile, and connect to gfproxyd*/
gf_boolean_t thin_client;
+ gf_boolean_t mount_finished;
+ gf_boolean_t handle_graph_switch;
+ pthread_cond_t migrate_cond;
};
typedef struct fuse_private fuse_private_t;
@@ -394,6 +398,7 @@ typedef struct {
int32_t fd_no;
gf_seek_what_t whence;
+ struct iobuf *iobuf;
} fuse_state_t;
typedef struct {
diff --git a/xlators/mount/fuse/src/fuse-helpers.c b/xlators/mount/fuse/src/fuse-helpers.c
index c59ff772cb8..c2d4d0cc9d8 100644
--- a/xlators/mount/fuse/src/fuse-helpers.c
+++ b/xlators/mount/fuse/src/fuse-helpers.c
@@ -123,6 +123,9 @@ get_fuse_state (xlator_t *this, fuse_in_header_t *finh)
pthread_mutex_lock (&priv->sync_mutex);
{
+ while (priv->handle_graph_switch)
+ pthread_cond_wait (&priv->migrate_cond,
+ &priv->sync_mutex);
active_subvol = fuse_active_subvol (state->this);
active_subvol->winds++;
}
diff --git a/xlators/mount/fuse/src/fuse-mem-types.h b/xlators/mount/fuse/src/fuse-mem-types.h
index 2b4b473813d..721b9a347cf 100644
--- a/xlators/mount/fuse/src/fuse-mem-types.h
+++ b/xlators/mount/fuse/src/fuse-mem-types.h
@@ -23,6 +23,7 @@ enum gf_fuse_mem_types_ {
gf_fuse_mt_graph_switch_args_t,
gf_fuse_mt_gids_t,
gf_fuse_mt_invalidate_node_t,
+ gf_fuse_mt_pthread_t,
gf_fuse_mt_end
};
#endif
diff --git a/xlators/mount/fuse/utils/mount.glusterfs.in b/xlators/mount/fuse/utils/mount.glusterfs.in
index fd616844d65..6890ff00121 100755
--- a/xlators/mount/fuse/utils/mount.glusterfs.in
+++ b/xlators/mount/fuse/utils/mount.glusterfs.in
@@ -225,6 +225,10 @@ start_glusterfs ()
cmd_line=$(echo "$cmd_line --event-history=$event_history");
fi
+ if [ -n "$reader_thread_count" ]; then
+ cmd_line=$(echo "$cmd_line --reader-thread-count=$reader_thread_count");
+ fi
+
if [ -n "$volume_name" ]; then
cmd_line=$(echo "$cmd_line --volume-name=$volume_name");
fi
@@ -499,6 +503,9 @@ with_options()
"event-history")
event_history=$value
;;
+ "reader-thread-count")
+ reader_thread_count=$value
+ ;;
"no-root-squash")
if [ $value = "yes" ] ||
[ $value = "on" ] ||