summaryrefslogtreecommitdiffstats
path: root/xlators/mount
diff options
context:
space:
mode:
authorCsaba Henk <csaba@redhat.com>2018-08-09 11:46:33 +0200
committerAmar Tumballi <amarts@redhat.com>2018-11-06 04:21:57 +0000
commitbceb9f25671e65cb2f0987a84370055e7c36900f (patch)
tree3ed0a55bae57f2d4aae643a70d8a44c41bf77d6c /xlators/mount
parent258db7178663305c26aa0068d4f72159ff0bc3ba (diff)
fuse: interrupt handling framework
- add sub-framework to send timed responses to kernel - add interrupt handler queue - implement INTERRUPT fuse_interrupt looks up handlers for interrupted messages in the queue. If found, it invokes the handler function. Else responds with EAGAIN with a delay. See spec at https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/Documentation/filesystems/fuse.txt?h=v4.17#n148 and explanation in comments. Change-Id: I1a79d3679b31f36e14b4ac8f60b7f2c1ea2badfb updates: #465 Signed-off-by: Csaba Henk <csaba@redhat.com>
Diffstat (limited to 'xlators/mount')
-rw-r--r--xlators/mount/fuse/src/fuse-bridge.c472
-rw-r--r--xlators/mount/fuse/src/fuse-bridge.h39
-rw-r--r--xlators/mount/fuse/src/fuse-mem-types.h2
3 files changed, 512 insertions, 1 deletions
diff --git a/xlators/mount/fuse/src/fuse-bridge.c b/xlators/mount/fuse/src/fuse-bridge.c
index 75b91a43fb1..e4e894e6c9b 100644
--- a/xlators/mount/fuse/src/fuse-bridge.c
+++ b/xlators/mount/fuse/src/fuse-bridge.c
@@ -15,6 +15,7 @@
#include "compat-errno.h"
#include "glusterfs-acl.h"
#include "syscall.h"
+#include "timespec.h"
#ifdef __NetBSD__
#undef open /* in perfuse.h, pulled from mount-gluster-compat.h */
@@ -446,6 +447,362 @@ fuse_invalidate_inode(xlator_t *this, uint64_t fuse_ino)
#endif
}
+static fuse_timed_message_t *
+fuse_timed_message_new(void)
+{
+ fuse_timed_message_t *dmsg = NULL;
+
+ dmsg = GF_MALLOC(sizeof(*dmsg), gf_fuse_mt_timed_message_t);
+ if (!dmsg) {
+ return NULL;
+ }
+
+ /* should be NULL if not set */
+ dmsg->fuse_message_body = NULL;
+ INIT_LIST_HEAD(&dmsg->next);
+
+ return dmsg;
+}
+
+static void
+fuse_timed_message_free(fuse_timed_message_t *dmsg)
+{
+ GF_FREE(dmsg->fuse_message_body);
+ GF_FREE(dmsg);
+}
+
+static void
+send_fuse_timed(xlator_t *this, fuse_timed_message_t *dmsg)
+{
+ fuse_private_t *priv = NULL;
+
+ priv = this->private;
+
+ if (!priv->timed_response_fuse_thread_started) {
+ return;
+ }
+
+ pthread_mutex_lock(&priv->timed_mutex);
+ {
+ list_add_tail(&dmsg->next, &priv->timed_list);
+ pthread_cond_signal(&priv->timed_cond);
+ }
+ pthread_mutex_unlock(&priv->timed_mutex);
+}
+
+fuse_interrupt_record_t *
+fuse_interrupt_record_new(fuse_in_header_t *finh,
+ fuse_interrupt_handler_t handler)
+{
+ fuse_interrupt_record_t *fir = NULL;
+
+ fir = GF_MALLOC(sizeof(*fir), gf_fuse_mt_interrupt_record_t);
+ if (!fir) {
+ return NULL;
+ }
+
+ fir->hit = _gf_false;
+ fir->interrupt_state = INTERRUPT_NONE;
+ fir->data = NULL;
+
+ fir->interrupt_handler = handler;
+ memcpy(&fir->fuse_in_header, finh, sizeof(*finh));
+ pthread_cond_init(&fir->handler_cond, NULL);
+ pthread_mutex_init(&fir->handler_mutex, NULL);
+ INIT_LIST_HEAD(&fir->next);
+
+ return fir;
+}
+
+static void
+fuse_interrupt_record_free(fuse_interrupt_record_t *fir, void **datap)
+{
+ /*
+ * If caller wishes, we give back the private data to let them deal with it
+ * however they want; otherwise we take care of freeing it.
+ */
+ if (datap) {
+ *datap = fir->data;
+ } else {
+ GF_FREE(fir->data);
+ }
+
+ GF_FREE(fir);
+}
+
+void
+fuse_interrupt_record_insert(xlator_t *this, fuse_interrupt_record_t *fir)
+{
+ fuse_private_t *priv = NULL;
+
+ priv = this->private;
+ pthread_mutex_lock(&priv->interrupt_mutex);
+ {
+ list_add_tail(&fir->next, &priv->interrupt_list);
+ }
+ pthread_mutex_unlock(&priv->interrupt_mutex);
+}
+
+static fuse_interrupt_record_t *
+fuse_interrupt_record_fetch(xlator_t *this, uint64_t unique, gf_boolean_t reap)
+{
+ fuse_interrupt_record_t *fir = NULL;
+ gf_boolean_t found = _gf_false;
+ fuse_private_t *priv = NULL;
+
+ priv = this->private;
+ pthread_mutex_lock(&priv->interrupt_mutex);
+ {
+ list_for_each_entry(fir, &priv->interrupt_list, next)
+ {
+ if (fir->fuse_in_header.unique == unique) {
+ /*
+ * If we are to reap, we do it regardless the
+ * hit flag; otherwise we take the record only
+ * hasn't yet flagged hit.
+ */
+ if (reap || !fir->hit) {
+ found = _gf_true;
+ }
+ /*
+ * If we are not reaping (coming from handler
+ * context), we set the hit flag.
+ */
+ if (!reap) {
+ fir->hit = _gf_true;
+ }
+ break;
+ }
+ }
+ if (found && reap) {
+ list_del(&fir->next);
+ }
+ }
+ pthread_mutex_unlock(&priv->interrupt_mutex);
+
+ if (found) {
+ return fir;
+ }
+ return NULL;
+}
+
+static fuse_interrupt_record_t *
+fuse_interrupt_record_get(xlator_t *this, uint64_t unique)
+{
+ return fuse_interrupt_record_fetch(this, unique, _gf_false);
+}
+
+static fuse_interrupt_record_t *
+fuse_interrupt_record_reap(xlator_t *this, uint64_t unique)
+{
+ return fuse_interrupt_record_fetch(this, unique, _gf_true);
+}
+
+static void
+fuse_interrupt(xlator_t *this, fuse_in_header_t *finh, void *msg,
+ struct iobuf *iobuf)
+{
+ struct fuse_interrupt_in *fii = msg;
+ fuse_interrupt_record_t *fir = NULL;
+
+ gf_log("glusterfs-fuse", GF_LOG_TRACE,
+ "unique %" PRIu64 " INTERRUPT for %" PRIu64, finh->unique,
+ fii->unique);
+
+ fir = fuse_interrupt_record_get(this, fii->unique);
+ if (fir) {
+ gf_log("glusterfs-fuse", GF_LOG_DEBUG,
+ "unique %" PRIu64 " INTERRUPT for %" PRIu64
+ ": handler triggered",
+ finh->unique, fii->unique);
+
+ fir->interrupt_handler(this, fir);
+ } else {
+ fuse_timed_message_t *dmsg = NULL;
+
+ /*
+ * No record found for this interrupt request.
+ *
+ * It's either because the handler for the interrupted message
+ * does not want to handle interrupt, or this interrupt
+ * message beat the interrupted which hasn't yet added a record
+ * to the interrupt queue. Either case we reply with error
+ * EAGAIN with some (0.01 sec) delay. That will have this
+ * interrupt request resent, unless the interrupted message
+ * has been already answered.
+ *
+ * So effectively we are looping in between kernel and
+ * userspace, which will be exited either when the interrupted
+ * message handler has added an interrupt record, or has
+ * replied to kernel. See
+ *
+ * https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/
+ * linux.git/tree/Documentation/filesystems/fuse.txt?h=v4.18#n148
+ */
+
+ gf_log("glusterfs-fuse", GF_LOG_DEBUG,
+ "unique %" PRIu64 " INTERRUPT for %" PRIu64 ": no handler found",
+ finh->unique, fii->unique);
+
+ dmsg = fuse_timed_message_new();
+ if (!dmsg) {
+ gf_log("glusterfs-fuse", GF_LOG_ERROR,
+ "unique %" PRIu64 " INTERRUPT for %" PRIu64
+ ":"
+ " failed to allocate timed message",
+ finh->unique, fii->unique);
+
+ return;
+ }
+
+ dmsg->fuse_out_header.unique = finh->unique;
+ dmsg->fuse_out_header.len = sizeof(dmsg->fuse_out_header);
+ dmsg->fuse_out_header.error = -EAGAIN;
+ timespec_now(&dmsg->scheduled_ts);
+ timespec_adjust_delta(&dmsg->scheduled_ts,
+ (struct timespec){0, 10000000});
+
+ send_fuse_timed(this, dmsg);
+ }
+}
+
+/*
+ * Function to be called in fop cbk context (if the fop engages
+ * with interrupt handling).
+ */
+gf_boolean_t
+fuse_interrupt_finish_fop(call_frame_t *frame, xlator_t *this,
+ gf_boolean_t sync, void **datap)
+{
+ fuse_interrupt_record_t *fir = NULL;
+ fuse_state_t *state = frame->root->state;
+ fuse_in_header_t *finh = state->finh;
+ gf_boolean_t hit = _gf_false;
+ gf_boolean_t handled = _gf_false;
+ fuse_interrupt_state_t intstat_orig = INTERRUPT_NONE;
+
+ fir = fuse_interrupt_record_reap(this, finh->unique);
+ if (!fir) {
+ /*
+ * No interrupt record was inserted (however, caller would usually know
+ * about that and there is no point then in calling this function).
+ */
+ return _gf_false;
+ }
+
+ /*
+ * The interrupt handler (if finds the record) modifies fir->hit; however,
+ * that could have occurred only before fuse_interrupt_record_reap(), so
+ * we are safe here with a lock-free access.
+ */
+ hit = fir->hit;
+ if (hit) {
+ pthread_mutex_lock(&fir->handler_mutex);
+ {
+ intstat_orig = fir->interrupt_state;
+ if (fir->interrupt_state == INTERRUPT_NONE) {
+ fir->interrupt_state = INTERRUPT_SQUELCHED;
+ if (sync) {
+ while (fir->interrupt_state == INTERRUPT_NONE) {
+ pthread_cond_wait(&fir->handler_cond,
+ &fir->handler_mutex);
+ }
+ }
+ }
+ }
+ pthread_mutex_unlock(&fir->handler_mutex);
+ }
+
+ gf_log("glusterfs-fuse", GF_LOG_DEBUG, "intstat_orig=%d", intstat_orig);
+
+ /*
+ * From this on fir can only be referred under the conditions that imply
+ * we are to free it (otherwise interrupt handler might have already freed
+ * it).
+ */
+
+ if (/* there was no interrupt */
+ !hit ||
+ /* lost the race against interrupt handler */
+ intstat_orig != INTERRUPT_NONE ||
+ /* we took cleaning up on us */
+ sync) {
+ /* cleaning up */
+ fuse_interrupt_record_free(fir, datap);
+ } else if (datap) {
+ *datap = NULL;
+ }
+
+ handled = (intstat_orig == INTERRUPT_HANDLED);
+ if (handled) {
+ /*
+ * Fuse request was answered already from interrupt context, we can do
+ * away with the stack.
+ */
+ free_fuse_state(state);
+ STACK_DESTROY(frame->root);
+ }
+
+ /*
+ * Let caller know if they have to answer the fuse request.
+ */
+ return handled;
+}
+
+/*
+ * Function to be called in interrupt handler context.
+ */
+void
+fuse_interrupt_finish_interrupt(xlator_t *this, fuse_interrupt_record_t *fir,
+ fuse_interrupt_state_t intstat,
+ gf_boolean_t sync, void **datap)
+{
+ fuse_in_header_t finh = {
+ 0,
+ };
+ fuse_interrupt_state_t intstat_orig = INTERRUPT_NONE;
+
+ pthread_mutex_lock(&fir->handler_mutex);
+ {
+ intstat_orig = fir->interrupt_state;
+ if (fir->interrupt_state == INTERRUPT_NONE) {
+ fir->interrupt_state = intstat;
+ if (sync) {
+ pthread_cond_signal(&fir->handler_cond);
+ }
+ }
+ finh = fir->fuse_in_header;
+ }
+ pthread_mutex_unlock(&fir->handler_mutex);
+
+ gf_log("glusterfs-fuse", GF_LOG_DEBUG, "intstat_orig=%d", intstat_orig);
+
+ /*
+ * From this on fir can only be referred under the conditions that imply
+ * we are to free it (otherwise fop handler might have already freed it).
+ */
+
+ if (/* we won the race, response is up to us */
+ intstat_orig == INTERRUPT_NONE &&
+ /* interrupt handling was successful, let the kernel know */
+ intstat == INTERRUPT_HANDLED) {
+ send_fuse_err(this, &finh, EINTR);
+ }
+
+ if (/* lost the race ... */
+ intstat_orig != INTERRUPT_NONE &&
+ /*
+ * ... and there is no contract with fop handler that it does the
+ * cleanup ...
+ */
+ !sync) {
+ /* ... so we do! */
+ fuse_interrupt_record_free(fir, datap);
+ } else if (datap) {
+ *datap = NULL;
+ }
+}
+
int
send_fuse_err(xlator_t *this, fuse_in_header_t *finh, int error)
{
@@ -4196,6 +4553,97 @@ notify_kernel_loop(void *data)
}
#endif
+static void *
+timed_response_loop(void *data)
+{
+ ssize_t rv = 0;
+ size_t len = 0;
+ xlator_t *this = NULL;
+ fuse_private_t *priv = NULL;
+ fuse_timed_message_t *dmsg = NULL;
+ fuse_timed_message_t *tmp = NULL;
+ struct timespec now = {
+ 0,
+ };
+ struct timespec delta = {
+ 0,
+ };
+ struct iovec iovs[2] = {
+ {
+ 0,
+ },
+ };
+
+ this = data;
+ priv = this->private;
+
+ for (;;) {
+ pthread_mutex_lock(&priv->timed_mutex);
+ {
+ while (list_empty(&priv->timed_list)) {
+ pthread_cond_wait(&priv->timed_cond, &priv->timed_mutex);
+ }
+
+ dmsg = list_entry(priv->timed_list.next, fuse_timed_message_t,
+ next);
+ list_for_each_entry(tmp, &priv->timed_list, next)
+ {
+ if (timespec_cmp(&tmp->scheduled_ts, &dmsg->scheduled_ts) < 0) {
+ dmsg = tmp;
+ }
+ }
+
+ list_del_init(&dmsg->next);
+ }
+ pthread_mutex_unlock(&priv->timed_mutex);
+
+ timespec_now(&now);
+ if (timespec_cmp(&now, &dmsg->scheduled_ts) < 0) {
+ timespec_sub(&now, &dmsg->scheduled_ts, &delta);
+ nanosleep(&delta, NULL);
+ }
+
+ gf_log("glusterfs-fuse", GF_LOG_TRACE,
+ "sending timed "
+ "message of unique %" PRIu64,
+ dmsg->fuse_out_header.unique);
+
+ len = dmsg->fuse_out_header.len;
+ iovs[0] = (struct iovec){&dmsg->fuse_out_header,
+ sizeof(struct fuse_out_header)};
+ iovs[1] = (struct iovec){dmsg->fuse_message_body,
+ len - sizeof(struct fuse_out_header)};
+ rv = sys_writev(priv->fd, iovs, 2);
+ check_and_dump_fuse_W(priv, iovs, 2, rv);
+
+ fuse_timed_message_free(dmsg);
+
+ if (rv == -1 && errno == EBADF) {
+ break;
+ }
+
+ if (rv != len && !(rv == -1 && errno == ENOENT)) {
+ gf_log("glusterfs-fuse", GF_LOG_INFO,
+ "len: %zu, rv: %zd, errno: %d", len, rv, errno);
+ }
+ }
+
+ gf_log("glusterfs-fuse", GF_LOG_ERROR, "timed response loop terminated");
+
+ pthread_mutex_lock(&priv->timed_mutex);
+ {
+ priv->timed_response_fuse_thread_started = _gf_false;
+ list_for_each_entry_safe(dmsg, tmp, &priv->timed_list, next)
+ {
+ list_del_init(&dmsg->next);
+ fuse_timed_message_free(dmsg);
+ }
+ }
+ pthread_mutex_unlock(&priv->timed_mutex);
+
+ return NULL;
+}
+
static void
fuse_init(xlator_t *this, fuse_in_header_t *finh, void *msg,
struct iobuf *iobuf)
@@ -4210,6 +4658,7 @@ fuse_init(xlator_t *this, fuse_in_header_t *finh, void *msg,
#if FUSE_KERNEL_MINOR_VERSION >= 9
pthread_t messenger;
#endif
+ pthread_t delayer;
priv = this->private;
@@ -4257,6 +4706,18 @@ fuse_init(xlator_t *this, fuse_in_header_t *finh, void *msg,
fino.flags |= FUSE_BIG_WRITES;
}
+ /* Start the thread processing timed responses */
+ ret = gf_thread_create(&delayer, NULL, timed_response_loop, this,
+ "fusedlyd");
+ if (ret != 0) {
+ gf_log("glusterfs-fuse", GF_LOG_ERROR,
+ "failed to start timed response thread (%s)", strerror(errno));
+
+ sys_close(priv->fd);
+ goto out;
+ }
+ priv->timed_response_fuse_thread_started = _gf_true;
+
/* Used for 'reverse invalidation of inode' */
if (fini->minor >= 12) {
ret = gf_thread_create(&messenger, NULL, notify_kernel_loop, this,
@@ -5300,6 +5761,8 @@ fuse_priv_dump(xlator_t *this)
gf_proc_dump_write("init_recvd", "%d", (int)private->init_recvd);
gf_proc_dump_write("strict_volfile_check", "%d",
(int)private->strict_volfile_check);
+ gf_proc_dump_write("timed_response_thread_started", "%d",
+ (int)private->timed_response_fuse_thread_started);
gf_proc_dump_write("reverse_thread_started", "%d",
(int)private->reverse_fuse_thread_started);
gf_proc_dump_write("use_readdirp", "%d", private->use_readdirp);
@@ -5552,7 +6015,7 @@ static fuse_handler_t *fuse_std_ops[FUSE_OP_HIGH] = {
[FUSE_SETLKW] = fuse_setlk,
[FUSE_ACCESS] = fuse_access,
[FUSE_CREATE] = fuse_create,
- /* [FUSE_INTERRUPT] */
+ [FUSE_INTERRUPT] = fuse_interrupt,
/* [FUSE_BMAP] */
[FUSE_DESTROY] = fuse_destroy,
/* [FUSE_IOCTL] */
@@ -5679,6 +6142,13 @@ init(xlator_t *this_xl)
pthread_cond_init(&priv->invalidate_cond, NULL);
pthread_mutex_init(&priv->invalidate_mutex, NULL);
+ INIT_LIST_HEAD(&priv->timed_list);
+ pthread_cond_init(&priv->timed_cond, NULL);
+ pthread_mutex_init(&priv->timed_mutex, NULL);
+
+ INIT_LIST_HEAD(&priv->interrupt_list);
+ pthread_mutex_init(&priv->interrupt_mutex, NULL);
+
/* get options from option dictionary */
ret = dict_get_str(options, ZR_MOUNTPOINT_OPT, &value_string);
if (ret == -1 || value_string == NULL) {
diff --git a/xlators/mount/fuse/src/fuse-bridge.h b/xlators/mount/fuse/src/fuse-bridge.h
index 318f33b5d61..b391af76bac 100644
--- a/xlators/mount/fuse/src/fuse-bridge.h
+++ b/xlators/mount/fuse/src/fuse-bridge.h
@@ -151,6 +151,16 @@ struct fuse_private {
/* Writeback cache support */
gf_boolean_t kernel_writeback_cache;
int attr_times_granularity;
+
+ /* Delayed fuse response */
+ struct list_head timed_list;
+ pthread_cond_t timed_cond;
+ pthread_mutex_t timed_mutex;
+ gf_boolean_t timed_response_fuse_thread_started;
+
+ /* Interrupt subscription */
+ struct list_head interrupt_list;
+ pthread_mutex_t interrupt_mutex;
};
typedef struct fuse_private fuse_private_t;
@@ -165,6 +175,35 @@ struct fuse_invalidate_node {
};
typedef struct fuse_invalidate_node fuse_invalidate_node_t;
+struct fuse_timed_message {
+ struct fuse_out_header fuse_out_header;
+ void *fuse_message_body;
+ struct timespec scheduled_ts;
+ struct list_head next;
+};
+typedef struct fuse_timed_message fuse_timed_message_t;
+
+enum fuse_interrupt_state {
+ INTERRUPT_NONE,
+ INTERRUPT_SQUELCHED,
+ INTERRUPT_HANDLED,
+};
+typedef enum fuse_interrupt_state fuse_interrupt_state_t;
+struct fuse_interrupt_record;
+typedef struct fuse_interrupt_record fuse_interrupt_record_t;
+typedef void (*fuse_interrupt_handler_t)(xlator_t *this,
+ fuse_interrupt_record_t *);
+struct fuse_interrupt_record {
+ struct fuse_in_header fuse_in_header;
+ void *data;
+ gf_boolean_t hit;
+ fuse_interrupt_state_t interrupt_state;
+ fuse_interrupt_handler_t interrupt_handler;
+ pthread_cond_t handler_cond;
+ pthread_mutex_t handler_mutex;
+ struct list_head next;
+};
+
struct fuse_graph_switch_args {
xlator_t *this;
xlator_t *old_subvol;
diff --git a/xlators/mount/fuse/src/fuse-mem-types.h b/xlators/mount/fuse/src/fuse-mem-types.h
index 5e6ab9308e0..c3c0028473a 100644
--- a/xlators/mount/fuse/src/fuse-mem-types.h
+++ b/xlators/mount/fuse/src/fuse-mem-types.h
@@ -24,6 +24,8 @@ enum gf_fuse_mem_types_ {
gf_fuse_mt_gids_t,
gf_fuse_mt_invalidate_node_t,
gf_fuse_mt_pthread_t,
+ gf_fuse_mt_timed_message_t,
+ gf_fuse_mt_interrupt_record_t,
gf_fuse_mt_end
};
#endif