diff options
author | Csaba Henk <csaba@redhat.com> | 2018-08-09 11:46:33 +0200 |
---|---|---|
committer | Amar Tumballi <amarts@redhat.com> | 2018-11-06 04:21:57 +0000 |
commit | bceb9f25671e65cb2f0987a84370055e7c36900f (patch) | |
tree | 3ed0a55bae57f2d4aae643a70d8a44c41bf77d6c | |
parent | 258db7178663305c26aa0068d4f72159ff0bc3ba (diff) |
fuse: interrupt handling framework
- add sub-framework to send timed responses to kernel
- add interrupt handler queue
- implement INTERRUPT
fuse_interrupt looks up handlers for interrupted messages
in the queue. If found, it invokes the handler function.
Else responds with EAGAIN with a delay.
See spec at
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/Documentation/filesystems/fuse.txt?h=v4.17#n148
and explanation in comments.
Change-Id: I1a79d3679b31f36e14b4ac8f60b7f2c1ea2badfb
updates: #465
Signed-off-by: Csaba Henk <csaba@redhat.com>
-rw-r--r-- | libglusterfs/src/libglusterfs.sym | 2 | ||||
-rw-r--r-- | libglusterfs/src/timespec.c | 16 | ||||
-rw-r--r-- | libglusterfs/src/timespec.h | 2 | ||||
-rw-r--r-- | xlators/mount/fuse/src/fuse-bridge.c | 472 | ||||
-rw-r--r-- | xlators/mount/fuse/src/fuse-bridge.h | 39 | ||||
-rw-r--r-- | xlators/mount/fuse/src/fuse-mem-types.h | 2 |
6 files changed, 532 insertions, 1 deletions
diff --git a/libglusterfs/src/libglusterfs.sym b/libglusterfs/src/libglusterfs.sym index 27eb1175b56..c8c42311c80 100644 --- a/libglusterfs/src/libglusterfs.sym +++ b/libglusterfs/src/libglusterfs.sym @@ -1057,6 +1057,8 @@ tbf_init tbf_throttle timespec_now timespec_sub +timespec_adjust_delta +timespec_cmp token_iter_init trap trie_add diff --git a/libglusterfs/src/timespec.c b/libglusterfs/src/timespec.c index d17506662ac..c0d4ab9315b 100644 --- a/libglusterfs/src/timespec.c +++ b/libglusterfs/src/timespec.c @@ -90,3 +90,19 @@ timespec_sub(const struct timespec *begin, const struct timespec *end, res->tv_nsec = end->tv_nsec - begin->tv_nsec; } } + +int +timespec_cmp(const struct timespec *lhs_ts, const struct timespec *rhs_ts) +{ + if (lhs_ts->tv_sec < rhs_ts->tv_sec) { + return -1; + } else if (lhs_ts->tv_sec > rhs_ts->tv_sec) { + return 1; + } else if (lhs_ts->tv_nsec < rhs_ts->tv_nsec) { + return -1; + } else if (lhs_ts->tv_nsec > rhs_ts->tv_nsec) { + return 1; + } + + return 0; +} diff --git a/libglusterfs/src/timespec.h b/libglusterfs/src/timespec.h index 73db2d16abe..871871d538c 100644 --- a/libglusterfs/src/timespec.h +++ b/libglusterfs/src/timespec.h @@ -25,5 +25,7 @@ timespec_adjust_delta(struct timespec *ts, struct timespec delta); void timespec_sub(const struct timespec *begin, const struct timespec *end, struct timespec *res); +int +timespec_cmp(const struct timespec *lhs_ts, const struct timespec *rhs_ts); #endif /* __INCLUDE_TIMESPEC_H__ */ diff --git a/xlators/mount/fuse/src/fuse-bridge.c b/xlators/mount/fuse/src/fuse-bridge.c index 75b91a43fb1..e4e894e6c9b 100644 --- a/xlators/mount/fuse/src/fuse-bridge.c +++ b/xlators/mount/fuse/src/fuse-bridge.c @@ -15,6 +15,7 @@ #include "compat-errno.h" #include "glusterfs-acl.h" #include "syscall.h" +#include "timespec.h" #ifdef __NetBSD__ #undef open /* in perfuse.h, pulled from mount-gluster-compat.h */ @@ -446,6 +447,362 @@ fuse_invalidate_inode(xlator_t *this, uint64_t fuse_ino) #endif } +static fuse_timed_message_t * +fuse_timed_message_new(void) +{ + fuse_timed_message_t *dmsg = NULL; + + dmsg = GF_MALLOC(sizeof(*dmsg), gf_fuse_mt_timed_message_t); + if (!dmsg) { + return NULL; + } + + /* should be NULL if not set */ + dmsg->fuse_message_body = NULL; + INIT_LIST_HEAD(&dmsg->next); + + return dmsg; +} + +static void +fuse_timed_message_free(fuse_timed_message_t *dmsg) +{ + GF_FREE(dmsg->fuse_message_body); + GF_FREE(dmsg); +} + +static void +send_fuse_timed(xlator_t *this, fuse_timed_message_t *dmsg) +{ + fuse_private_t *priv = NULL; + + priv = this->private; + + if (!priv->timed_response_fuse_thread_started) { + return; + } + + pthread_mutex_lock(&priv->timed_mutex); + { + list_add_tail(&dmsg->next, &priv->timed_list); + pthread_cond_signal(&priv->timed_cond); + } + pthread_mutex_unlock(&priv->timed_mutex); +} + +fuse_interrupt_record_t * +fuse_interrupt_record_new(fuse_in_header_t *finh, + fuse_interrupt_handler_t handler) +{ + fuse_interrupt_record_t *fir = NULL; + + fir = GF_MALLOC(sizeof(*fir), gf_fuse_mt_interrupt_record_t); + if (!fir) { + return NULL; + } + + fir->hit = _gf_false; + fir->interrupt_state = INTERRUPT_NONE; + fir->data = NULL; + + fir->interrupt_handler = handler; + memcpy(&fir->fuse_in_header, finh, sizeof(*finh)); + pthread_cond_init(&fir->handler_cond, NULL); + pthread_mutex_init(&fir->handler_mutex, NULL); + INIT_LIST_HEAD(&fir->next); + + return fir; +} + +static void +fuse_interrupt_record_free(fuse_interrupt_record_t *fir, void **datap) +{ + /* + * If caller wishes, we give back the private data to let them deal with it + * however they want; otherwise we take care of freeing it. + */ + if (datap) { + *datap = fir->data; + } else { + GF_FREE(fir->data); + } + + GF_FREE(fir); +} + +void +fuse_interrupt_record_insert(xlator_t *this, fuse_interrupt_record_t *fir) +{ + fuse_private_t *priv = NULL; + + priv = this->private; + pthread_mutex_lock(&priv->interrupt_mutex); + { + list_add_tail(&fir->next, &priv->interrupt_list); + } + pthread_mutex_unlock(&priv->interrupt_mutex); +} + +static fuse_interrupt_record_t * +fuse_interrupt_record_fetch(xlator_t *this, uint64_t unique, gf_boolean_t reap) +{ + fuse_interrupt_record_t *fir = NULL; + gf_boolean_t found = _gf_false; + fuse_private_t *priv = NULL; + + priv = this->private; + pthread_mutex_lock(&priv->interrupt_mutex); + { + list_for_each_entry(fir, &priv->interrupt_list, next) + { + if (fir->fuse_in_header.unique == unique) { + /* + * If we are to reap, we do it regardless the + * hit flag; otherwise we take the record only + * hasn't yet flagged hit. + */ + if (reap || !fir->hit) { + found = _gf_true; + } + /* + * If we are not reaping (coming from handler + * context), we set the hit flag. + */ + if (!reap) { + fir->hit = _gf_true; + } + break; + } + } + if (found && reap) { + list_del(&fir->next); + } + } + pthread_mutex_unlock(&priv->interrupt_mutex); + + if (found) { + return fir; + } + return NULL; +} + +static fuse_interrupt_record_t * +fuse_interrupt_record_get(xlator_t *this, uint64_t unique) +{ + return fuse_interrupt_record_fetch(this, unique, _gf_false); +} + +static fuse_interrupt_record_t * +fuse_interrupt_record_reap(xlator_t *this, uint64_t unique) +{ + return fuse_interrupt_record_fetch(this, unique, _gf_true); +} + +static void +fuse_interrupt(xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) +{ + struct fuse_interrupt_in *fii = msg; + fuse_interrupt_record_t *fir = NULL; + + gf_log("glusterfs-fuse", GF_LOG_TRACE, + "unique %" PRIu64 " INTERRUPT for %" PRIu64, finh->unique, + fii->unique); + + fir = fuse_interrupt_record_get(this, fii->unique); + if (fir) { + gf_log("glusterfs-fuse", GF_LOG_DEBUG, + "unique %" PRIu64 " INTERRUPT for %" PRIu64 + ": handler triggered", + finh->unique, fii->unique); + + fir->interrupt_handler(this, fir); + } else { + fuse_timed_message_t *dmsg = NULL; + + /* + * No record found for this interrupt request. + * + * It's either because the handler for the interrupted message + * does not want to handle interrupt, or this interrupt + * message beat the interrupted which hasn't yet added a record + * to the interrupt queue. Either case we reply with error + * EAGAIN with some (0.01 sec) delay. That will have this + * interrupt request resent, unless the interrupted message + * has been already answered. + * + * So effectively we are looping in between kernel and + * userspace, which will be exited either when the interrupted + * message handler has added an interrupt record, or has + * replied to kernel. See + * + * https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/ + * linux.git/tree/Documentation/filesystems/fuse.txt?h=v4.18#n148 + */ + + gf_log("glusterfs-fuse", GF_LOG_DEBUG, + "unique %" PRIu64 " INTERRUPT for %" PRIu64 ": no handler found", + finh->unique, fii->unique); + + dmsg = fuse_timed_message_new(); + if (!dmsg) { + gf_log("glusterfs-fuse", GF_LOG_ERROR, + "unique %" PRIu64 " INTERRUPT for %" PRIu64 + ":" + " failed to allocate timed message", + finh->unique, fii->unique); + + return; + } + + dmsg->fuse_out_header.unique = finh->unique; + dmsg->fuse_out_header.len = sizeof(dmsg->fuse_out_header); + dmsg->fuse_out_header.error = -EAGAIN; + timespec_now(&dmsg->scheduled_ts); + timespec_adjust_delta(&dmsg->scheduled_ts, + (struct timespec){0, 10000000}); + + send_fuse_timed(this, dmsg); + } +} + +/* + * Function to be called in fop cbk context (if the fop engages + * with interrupt handling). + */ +gf_boolean_t +fuse_interrupt_finish_fop(call_frame_t *frame, xlator_t *this, + gf_boolean_t sync, void **datap) +{ + fuse_interrupt_record_t *fir = NULL; + fuse_state_t *state = frame->root->state; + fuse_in_header_t *finh = state->finh; + gf_boolean_t hit = _gf_false; + gf_boolean_t handled = _gf_false; + fuse_interrupt_state_t intstat_orig = INTERRUPT_NONE; + + fir = fuse_interrupt_record_reap(this, finh->unique); + if (!fir) { + /* + * No interrupt record was inserted (however, caller would usually know + * about that and there is no point then in calling this function). + */ + return _gf_false; + } + + /* + * The interrupt handler (if finds the record) modifies fir->hit; however, + * that could have occurred only before fuse_interrupt_record_reap(), so + * we are safe here with a lock-free access. + */ + hit = fir->hit; + if (hit) { + pthread_mutex_lock(&fir->handler_mutex); + { + intstat_orig = fir->interrupt_state; + if (fir->interrupt_state == INTERRUPT_NONE) { + fir->interrupt_state = INTERRUPT_SQUELCHED; + if (sync) { + while (fir->interrupt_state == INTERRUPT_NONE) { + pthread_cond_wait(&fir->handler_cond, + &fir->handler_mutex); + } + } + } + } + pthread_mutex_unlock(&fir->handler_mutex); + } + + gf_log("glusterfs-fuse", GF_LOG_DEBUG, "intstat_orig=%d", intstat_orig); + + /* + * From this on fir can only be referred under the conditions that imply + * we are to free it (otherwise interrupt handler might have already freed + * it). + */ + + if (/* there was no interrupt */ + !hit || + /* lost the race against interrupt handler */ + intstat_orig != INTERRUPT_NONE || + /* we took cleaning up on us */ + sync) { + /* cleaning up */ + fuse_interrupt_record_free(fir, datap); + } else if (datap) { + *datap = NULL; + } + + handled = (intstat_orig == INTERRUPT_HANDLED); + if (handled) { + /* + * Fuse request was answered already from interrupt context, we can do + * away with the stack. + */ + free_fuse_state(state); + STACK_DESTROY(frame->root); + } + + /* + * Let caller know if they have to answer the fuse request. + */ + return handled; +} + +/* + * Function to be called in interrupt handler context. + */ +void +fuse_interrupt_finish_interrupt(xlator_t *this, fuse_interrupt_record_t *fir, + fuse_interrupt_state_t intstat, + gf_boolean_t sync, void **datap) +{ + fuse_in_header_t finh = { + 0, + }; + fuse_interrupt_state_t intstat_orig = INTERRUPT_NONE; + + pthread_mutex_lock(&fir->handler_mutex); + { + intstat_orig = fir->interrupt_state; + if (fir->interrupt_state == INTERRUPT_NONE) { + fir->interrupt_state = intstat; + if (sync) { + pthread_cond_signal(&fir->handler_cond); + } + } + finh = fir->fuse_in_header; + } + pthread_mutex_unlock(&fir->handler_mutex); + + gf_log("glusterfs-fuse", GF_LOG_DEBUG, "intstat_orig=%d", intstat_orig); + + /* + * From this on fir can only be referred under the conditions that imply + * we are to free it (otherwise fop handler might have already freed it). + */ + + if (/* we won the race, response is up to us */ + intstat_orig == INTERRUPT_NONE && + /* interrupt handling was successful, let the kernel know */ + intstat == INTERRUPT_HANDLED) { + send_fuse_err(this, &finh, EINTR); + } + + if (/* lost the race ... */ + intstat_orig != INTERRUPT_NONE && + /* + * ... and there is no contract with fop handler that it does the + * cleanup ... + */ + !sync) { + /* ... so we do! */ + fuse_interrupt_record_free(fir, datap); + } else if (datap) { + *datap = NULL; + } +} + int send_fuse_err(xlator_t *this, fuse_in_header_t *finh, int error) { @@ -4196,6 +4553,97 @@ notify_kernel_loop(void *data) } #endif +static void * +timed_response_loop(void *data) +{ + ssize_t rv = 0; + size_t len = 0; + xlator_t *this = NULL; + fuse_private_t *priv = NULL; + fuse_timed_message_t *dmsg = NULL; + fuse_timed_message_t *tmp = NULL; + struct timespec now = { + 0, + }; + struct timespec delta = { + 0, + }; + struct iovec iovs[2] = { + { + 0, + }, + }; + + this = data; + priv = this->private; + + for (;;) { + pthread_mutex_lock(&priv->timed_mutex); + { + while (list_empty(&priv->timed_list)) { + pthread_cond_wait(&priv->timed_cond, &priv->timed_mutex); + } + + dmsg = list_entry(priv->timed_list.next, fuse_timed_message_t, + next); + list_for_each_entry(tmp, &priv->timed_list, next) + { + if (timespec_cmp(&tmp->scheduled_ts, &dmsg->scheduled_ts) < 0) { + dmsg = tmp; + } + } + + list_del_init(&dmsg->next); + } + pthread_mutex_unlock(&priv->timed_mutex); + + timespec_now(&now); + if (timespec_cmp(&now, &dmsg->scheduled_ts) < 0) { + timespec_sub(&now, &dmsg->scheduled_ts, &delta); + nanosleep(&delta, NULL); + } + + gf_log("glusterfs-fuse", GF_LOG_TRACE, + "sending timed " + "message of unique %" PRIu64, + dmsg->fuse_out_header.unique); + + len = dmsg->fuse_out_header.len; + iovs[0] = (struct iovec){&dmsg->fuse_out_header, + sizeof(struct fuse_out_header)}; + iovs[1] = (struct iovec){dmsg->fuse_message_body, + len - sizeof(struct fuse_out_header)}; + rv = sys_writev(priv->fd, iovs, 2); + check_and_dump_fuse_W(priv, iovs, 2, rv); + + fuse_timed_message_free(dmsg); + + if (rv == -1 && errno == EBADF) { + break; + } + + if (rv != len && !(rv == -1 && errno == ENOENT)) { + gf_log("glusterfs-fuse", GF_LOG_INFO, + "len: %zu, rv: %zd, errno: %d", len, rv, errno); + } + } + + gf_log("glusterfs-fuse", GF_LOG_ERROR, "timed response loop terminated"); + + pthread_mutex_lock(&priv->timed_mutex); + { + priv->timed_response_fuse_thread_started = _gf_false; + list_for_each_entry_safe(dmsg, tmp, &priv->timed_list, next) + { + list_del_init(&dmsg->next); + fuse_timed_message_free(dmsg); + } + } + pthread_mutex_unlock(&priv->timed_mutex); + + return NULL; +} + static void fuse_init(xlator_t *this, fuse_in_header_t *finh, void *msg, struct iobuf *iobuf) @@ -4210,6 +4658,7 @@ fuse_init(xlator_t *this, fuse_in_header_t *finh, void *msg, #if FUSE_KERNEL_MINOR_VERSION >= 9 pthread_t messenger; #endif + pthread_t delayer; priv = this->private; @@ -4257,6 +4706,18 @@ fuse_init(xlator_t *this, fuse_in_header_t *finh, void *msg, fino.flags |= FUSE_BIG_WRITES; } + /* Start the thread processing timed responses */ + ret = gf_thread_create(&delayer, NULL, timed_response_loop, this, + "fusedlyd"); + if (ret != 0) { + gf_log("glusterfs-fuse", GF_LOG_ERROR, + "failed to start timed response thread (%s)", strerror(errno)); + + sys_close(priv->fd); + goto out; + } + priv->timed_response_fuse_thread_started = _gf_true; + /* Used for 'reverse invalidation of inode' */ if (fini->minor >= 12) { ret = gf_thread_create(&messenger, NULL, notify_kernel_loop, this, @@ -5300,6 +5761,8 @@ fuse_priv_dump(xlator_t *this) gf_proc_dump_write("init_recvd", "%d", (int)private->init_recvd); gf_proc_dump_write("strict_volfile_check", "%d", (int)private->strict_volfile_check); + gf_proc_dump_write("timed_response_thread_started", "%d", + (int)private->timed_response_fuse_thread_started); gf_proc_dump_write("reverse_thread_started", "%d", (int)private->reverse_fuse_thread_started); gf_proc_dump_write("use_readdirp", "%d", private->use_readdirp); @@ -5552,7 +6015,7 @@ static fuse_handler_t *fuse_std_ops[FUSE_OP_HIGH] = { [FUSE_SETLKW] = fuse_setlk, [FUSE_ACCESS] = fuse_access, [FUSE_CREATE] = fuse_create, - /* [FUSE_INTERRUPT] */ + [FUSE_INTERRUPT] = fuse_interrupt, /* [FUSE_BMAP] */ [FUSE_DESTROY] = fuse_destroy, /* [FUSE_IOCTL] */ @@ -5679,6 +6142,13 @@ init(xlator_t *this_xl) pthread_cond_init(&priv->invalidate_cond, NULL); pthread_mutex_init(&priv->invalidate_mutex, NULL); + INIT_LIST_HEAD(&priv->timed_list); + pthread_cond_init(&priv->timed_cond, NULL); + pthread_mutex_init(&priv->timed_mutex, NULL); + + INIT_LIST_HEAD(&priv->interrupt_list); + pthread_mutex_init(&priv->interrupt_mutex, NULL); + /* get options from option dictionary */ ret = dict_get_str(options, ZR_MOUNTPOINT_OPT, &value_string); if (ret == -1 || value_string == NULL) { diff --git a/xlators/mount/fuse/src/fuse-bridge.h b/xlators/mount/fuse/src/fuse-bridge.h index 318f33b5d61..b391af76bac 100644 --- a/xlators/mount/fuse/src/fuse-bridge.h +++ b/xlators/mount/fuse/src/fuse-bridge.h @@ -151,6 +151,16 @@ struct fuse_private { /* Writeback cache support */ gf_boolean_t kernel_writeback_cache; int attr_times_granularity; + + /* Delayed fuse response */ + struct list_head timed_list; + pthread_cond_t timed_cond; + pthread_mutex_t timed_mutex; + gf_boolean_t timed_response_fuse_thread_started; + + /* Interrupt subscription */ + struct list_head interrupt_list; + pthread_mutex_t interrupt_mutex; }; typedef struct fuse_private fuse_private_t; @@ -165,6 +175,35 @@ struct fuse_invalidate_node { }; typedef struct fuse_invalidate_node fuse_invalidate_node_t; +struct fuse_timed_message { + struct fuse_out_header fuse_out_header; + void *fuse_message_body; + struct timespec scheduled_ts; + struct list_head next; +}; +typedef struct fuse_timed_message fuse_timed_message_t; + +enum fuse_interrupt_state { + INTERRUPT_NONE, + INTERRUPT_SQUELCHED, + INTERRUPT_HANDLED, +}; +typedef enum fuse_interrupt_state fuse_interrupt_state_t; +struct fuse_interrupt_record; +typedef struct fuse_interrupt_record fuse_interrupt_record_t; +typedef void (*fuse_interrupt_handler_t)(xlator_t *this, + fuse_interrupt_record_t *); +struct fuse_interrupt_record { + struct fuse_in_header fuse_in_header; + void *data; + gf_boolean_t hit; + fuse_interrupt_state_t interrupt_state; + fuse_interrupt_handler_t interrupt_handler; + pthread_cond_t handler_cond; + pthread_mutex_t handler_mutex; + struct list_head next; +}; + struct fuse_graph_switch_args { xlator_t *this; xlator_t *old_subvol; diff --git a/xlators/mount/fuse/src/fuse-mem-types.h b/xlators/mount/fuse/src/fuse-mem-types.h index 5e6ab9308e0..c3c0028473a 100644 --- a/xlators/mount/fuse/src/fuse-mem-types.h +++ b/xlators/mount/fuse/src/fuse-mem-types.h @@ -24,6 +24,8 @@ enum gf_fuse_mem_types_ { gf_fuse_mt_gids_t, gf_fuse_mt_invalidate_node_t, gf_fuse_mt_pthread_t, + gf_fuse_mt_timed_message_t, + gf_fuse_mt_interrupt_record_t, gf_fuse_mt_end }; #endif |