diff options
-rw-r--r-- | cli/src/cli-rl.c | 8 | ||||
-rw-r--r-- | glusterfsd/src/glusterfsd-mgmt.c | 122 | ||||
-rw-r--r-- | libglusterfs/src/event-epoll.c | 81 | ||||
-rw-r--r-- | libglusterfs/src/event-poll.c | 4 | ||||
-rw-r--r-- | libglusterfs/src/event.c | 59 | ||||
-rw-r--r-- | libglusterfs/src/event.h | 11 | ||||
-rw-r--r-- | libglusterfs/src/glusterfs.h | 2 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 96 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 6 |
9 files changed, 268 insertions, 121 deletions
diff --git a/cli/src/cli-rl.c b/cli/src/cli-rl.c index bca37d9c509..4745cf49369 100644 --- a/cli/src/cli-rl.c +++ b/cli/src/cli-rl.c @@ -108,11 +108,17 @@ cli_rl_process_line (char *line) int -cli_rl_stdin (int fd, int idx, void *data, +cli_rl_stdin (int fd, int idx, int gen, void *data, int poll_out, int poll_in, int poll_err) { + struct cli_state *state = NULL; + + state = data; + rl_callback_read_char (); + event_handled (state->ctx->event_pool, fd, idx, gen); + return 0; } diff --git a/glusterfsd/src/glusterfsd-mgmt.c b/glusterfsd/src/glusterfsd-mgmt.c index 6db2a0fca3f..9b76c7576d4 100644 --- a/glusterfsd/src/glusterfsd-mgmt.c +++ b/glusterfsd/src/glusterfsd-mgmt.c @@ -1732,8 +1732,7 @@ out: /* XXX: move these into @ctx */ static char *oldvolfile = NULL; -static int oldvollen = 0; - +static int oldvollen; int @@ -1743,7 +1742,7 @@ mgmt_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count, gf_getspec_rsp rsp = {0,}; call_frame_t *frame = NULL; glusterfs_ctx_t *ctx = NULL; - int ret = 0; + int ret = 0, locked = 0; ssize_t size = 0; FILE *tmpfp = NULL; char *volfilebuf = NULL; @@ -1773,74 +1772,85 @@ mgmt_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count, ret = 0; size = rsp.op_ret; - if (size == oldvollen && (memcmp (oldvolfile, rsp.spec, size) == 0)) { - gf_log (frame->this->name, GF_LOG_INFO, - "No change in volfile, continuing"); - goto out; - } + LOCK (&ctx->volfile_lock); + { + locked = 1; - tmpfp = tmpfile (); - if (!tmpfp) { - ret = -1; - goto out; - } + if (size == oldvollen && (memcmp (oldvolfile, rsp.spec, size) == 0)) { + gf_log (frame->this->name, GF_LOG_INFO, + "No change in volfile, continuing"); + goto out; + } - fwrite (rsp.spec, size, 1, tmpfp); - fflush (tmpfp); - if (ferror (tmpfp)) { - ret = -1; - goto out; - } + tmpfp = tmpfile (); + if (!tmpfp) { + ret = -1; + goto out; + } + + fwrite (rsp.spec, size, 1, tmpfp); + fflush (tmpfp); + if (ferror (tmpfp)) { + ret = -1; + goto out; + } + + /* Check if only options have changed. No need to reload the + * volfile if topology hasn't changed. + * glusterfs_volfile_reconfigure returns 3 possible return states + * return 0 =======> reconfiguration of options has succeeded + * return 1 =======> the graph has to be reconstructed and all the xlators should be inited + * return -1(or -ve) =======> Some Internal Error occurred during the operation + */ + + ret = glusterfs_volfile_reconfigure (oldvollen, tmpfp, ctx, oldvolfile); + if (ret == 0) { + gf_log ("glusterfsd-mgmt", GF_LOG_DEBUG, + "No need to re-load volfile, reconfigure done"); + if (oldvolfile) + volfilebuf = GF_REALLOC (oldvolfile, size); + else + volfilebuf = GF_CALLOC (1, size, gf_common_mt_char); + if (!volfilebuf) { + ret = -1; + goto out; + } + oldvolfile = volfilebuf; + oldvollen = size; + memcpy (oldvolfile, rsp.spec, size); + goto out; + } - /* Check if only options have changed. No need to reload the - * volfile if topology hasn't changed. - * glusterfs_volfile_reconfigure returns 3 possible return states - * return 0 =======> reconfiguration of options has succeeded - * return 1 =======> the graph has to be reconstructed and all the xlators should be inited - * return -1(or -ve) =======> Some Internal Error occurred during the operation - */ + if (ret < 0) { + gf_log ("glusterfsd-mgmt", + GF_LOG_DEBUG, "Reconfigure failed !!"); + goto out; + } + + ret = glusterfs_process_volfp (ctx, tmpfp); + /* tmpfp closed */ + tmpfp = NULL; + if (ret) + goto out; - ret = glusterfs_volfile_reconfigure (oldvollen, tmpfp, ctx, oldvolfile); - if (ret == 0) { - gf_log ("glusterfsd-mgmt", GF_LOG_DEBUG, - "No need to re-load volfile, reconfigure done"); if (oldvolfile) volfilebuf = GF_REALLOC (oldvolfile, size); else volfilebuf = GF_CALLOC (1, size, gf_common_mt_char); + if (!volfilebuf) { ret = -1; goto out; } + oldvolfile = volfilebuf; oldvollen = size; memcpy (oldvolfile, rsp.spec, size); - goto out; } + UNLOCK (&ctx->volfile_lock); - if (ret < 0) { - gf_log ("glusterfsd-mgmt", GF_LOG_DEBUG, "Reconfigure failed !!"); - goto out; - } - - ret = glusterfs_process_volfp (ctx, tmpfp); - /* tmpfp closed */ - tmpfp = NULL; - if (ret) - goto out; - - if (oldvolfile) - volfilebuf = GF_REALLOC (oldvolfile, size); - else - volfilebuf = GF_CALLOC (1, size, gf_common_mt_char); + locked = 0; - if (!volfilebuf) { - ret = -1; - goto out; - } - oldvolfile = volfilebuf; - oldvollen = size; - memcpy (oldvolfile, rsp.spec, size); if (!is_mgmt_rpc_reconnect) { need_emancipate = 1; glusterfs_mgmt_pmap_signin (ctx); @@ -1848,6 +1858,10 @@ mgmt_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count, } out: + + if (locked) + UNLOCK (&ctx->volfile_lock); + STACK_DESTROY (frame->root); free (rsp.spec); @@ -2345,6 +2359,8 @@ glusterfs_mgmt_init (glusterfs_ctx_t *ctx) if (ctx->mgmt) return 0; + LOCK_INIT (&ctx->volfile_lock); + if (cmd_args->volfile_server_port) port = cmd_args->volfile_server_port; diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c index e2b40602e7a..4b76cc96fd3 100644 --- a/libglusterfs/src/event-epoll.c +++ b/libglusterfs/src/event-epoll.c @@ -569,38 +569,11 @@ pre_unlock: if (!handler) goto out; - ret = handler (fd, idx, data, + ret = handler (fd, idx, gen, data, (event->events & (EPOLLIN|EPOLLPRI)), (event->events & (EPOLLOUT)), (event->events & (EPOLLERR|EPOLLHUP))); - LOCK (&slot->lock); - { - slot->in_handler--; - - if (gen != slot->gen) { - /* event_unregister() happened while we were - in handler() - */ - gf_msg_debug ("epoll", 0, "generation bumped on idx=%d" - " from gen=%d to slot->gen=%d, fd=%d, " - "slot->fd=%d", idx, gen, slot->gen, fd, - slot->fd); - goto post_unlock; - } - - /* This call also picks up the changes made by another - thread calling event_select_on_epoll() while this - thread was busy in handler() - */ - if (slot->in_handler == 0) { - event->events = slot->events; - ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, - fd, event); - } - } -post_unlock: - UNLOCK (&slot->lock); out: event_slot_unref (event_pool, slot, idx); @@ -891,6 +864,55 @@ event_pool_destroy_epoll (struct event_pool *event_pool) return ret; } +static int +event_handled_epoll (struct event_pool *event_pool, int fd, int idx, int gen) +{ + struct event_slot_epoll *slot = NULL; + struct epoll_event epoll_event = {0, }; + struct event_data *ev_data = (void *)&epoll_event.data; + int ret = 0; + + slot = event_slot_get (event_pool, idx); + + assert (slot->fd == fd); + + LOCK (&slot->lock); + { + slot->in_handler--; + + if (gen != slot->gen) { + /* event_unregister() happened while we were + in handler() + */ + gf_msg_debug ("epoll", 0, "generation bumped on idx=%d" + " from gen=%d to slot->gen=%d, fd=%d, " + "slot->fd=%d", idx, gen, slot->gen, fd, + slot->fd); + goto post_unlock; + } + + /* This call also picks up the changes made by another + thread calling event_select_on_epoll() while this + thread was busy in handler() + */ + if (slot->in_handler == 0) { + epoll_event.events = slot->events; + ev_data->idx = idx; + ev_data->gen = gen; + + ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD, + fd, &epoll_event); + } + } +post_unlock: + UNLOCK (&slot->lock); + + event_slot_unref (event_pool, slot, idx); + + return ret; +} + + struct event_ops event_ops_epoll = { .new = event_pool_new_epoll, .event_register = event_register_epoll, @@ -899,7 +921,8 @@ struct event_ops event_ops_epoll = { .event_unregister_close = event_unregister_close_epoll, .event_dispatch = event_dispatch_epoll, .event_reconfigure_threads = event_reconfigure_threads_epoll, - .event_pool_destroy = event_pool_destroy_epoll + .event_pool_destroy = event_pool_destroy_epoll, + .event_handled = event_handled_epoll, }; #endif diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c index 2006e33d33b..3bffc4784d7 100644 --- a/libglusterfs/src/event-poll.c +++ b/libglusterfs/src/event-poll.c @@ -40,7 +40,7 @@ event_register_poll (struct event_pool *event_pool, int fd, static int -__flush_fd (int fd, int idx, void *data, +__flush_fd (int fd, int idx, int gen, void *data, int poll_in, int poll_out, int poll_err) { char buf[64]; @@ -386,7 +386,7 @@ unlock: pthread_mutex_unlock (&event_pool->mutex); if (handler) - ret = handler (ufds[i].fd, idx, data, + ret = handler (ufds[i].fd, idx, 0, data, (ufds[i].revents & (POLLIN|POLLPRI)), (ufds[i].revents & (POLLOUT)), (ufds[i].revents & (POLLERR|POLLHUP|POLLNVAL))); diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c index 6aaa53499df..bba6f8429a1 100644 --- a/libglusterfs/src/event.c +++ b/libglusterfs/src/event.c @@ -159,8 +159,9 @@ event_pool_destroy (struct event_pool *event_pool) } pthread_mutex_unlock (&event_pool->mutex); - if (!destroy || (activethreadcount > 0)) + if (!destroy || (activethreadcount > 0)) { goto out; + } ret = event_pool->ops->event_pool_destroy (event_pool); out: @@ -168,19 +169,27 @@ out: } int -poller_destroy_handler (int fd, int idx, void *data, +poller_destroy_handler (int fd, int idx, int gen, void *data, int poll_out, int poll_in, int poll_err) { - int readfd = -1; - char buf = '\0'; + struct event_destroy_data *destroy = NULL; + int readfd = -1, ret = -1; + char buf = '\0'; - readfd = *(int *)data; - if (readfd < 0) - return -1; + destroy = data; + readfd = destroy->readfd; + if (readfd < 0) { + goto out; + } while (sys_read (readfd, &buf, 1) > 0) { } - return 0; + + ret = 0; +out: + event_handled (destroy->pool, fd, idx, gen); + + return ret; } /* This function destroys all the poller threads. @@ -197,11 +206,12 @@ poller_destroy_handler (int fd, int idx, void *data, int event_dispatch_destroy (struct event_pool *event_pool) { - int ret = -1; - int fd[2] = {-1}; - int idx = -1; - int flags = 0; - struct timespec sleep_till = {0, }; + int ret = -1, threadcount = 0; + int fd[2] = {-1}; + int idx = -1; + int flags = 0; + struct timespec sleep_till = {0, }; + struct event_destroy_data data = {0, }; GF_VALIDATE_OR_GOTO ("event", event_pool, out); @@ -223,10 +233,13 @@ event_dispatch_destroy (struct event_pool *event_pool) if (ret < 0) goto out; + data.pool = event_pool; + data.readfd = fd[1]; + /* From the main thread register an event on the pipe fd[0], */ idx = event_register (event_pool, fd[0], poller_destroy_handler, - &fd[1], 1, 0); + &data, 1, 0); if (idx < 0) goto out; @@ -235,6 +248,7 @@ event_dispatch_destroy (struct event_pool *event_pool) */ pthread_mutex_lock (&event_pool->mutex); { + threadcount = event_pool->eventthreadcount; event_pool->destroy = 1; } pthread_mutex_unlock (&event_pool->mutex); @@ -254,9 +268,11 @@ event_dispatch_destroy (struct event_pool *event_pool) */ int retry = 0; - while (event_pool->activethreadcount > 0 && retry++ < 10) { - if (sys_write (fd[1], "dummy", 6) == -1) + while (event_pool->activethreadcount > 0 + && (retry++ < (threadcount + 10))) { + if (sys_write (fd[1], "dummy", 6) == -1) { break; + } sleep_till.tv_sec = time (NULL) + 1; ret = pthread_cond_timedwait (&event_pool->cond, &event_pool->mutex, @@ -275,3 +291,14 @@ event_dispatch_destroy (struct event_pool *event_pool) return ret; } + +int +event_handled (struct event_pool *event_pool, int fd, int idx, int gen) +{ + int ret = 0; + + if (event_pool->ops->event_handled) + ret = event_pool->ops->event_handled (event_pool, fd, idx, gen); + + return ret; +} diff --git a/libglusterfs/src/event.h b/libglusterfs/src/event.h index 1348f5d05c0..c60b14ad04b 100644 --- a/libglusterfs/src/event.h +++ b/libglusterfs/src/event.h @@ -23,7 +23,7 @@ struct event_data { } __attribute__ ((__packed__, __may_alias__)); -typedef int (*event_handler_t) (int fd, int idx, void *data, +typedef int (*event_handler_t) (int fd, int idx, int gen, void *data, int poll_in, int poll_out, int poll_err); #define EVENT_EPOLL_TABLES 1024 @@ -73,6 +73,11 @@ struct event_pool { }; +struct event_destroy_data { + int readfd; + struct event_pool *pool; +}; + struct event_ops { struct event_pool * (*new) (int count, int eventthreadcount); @@ -93,6 +98,8 @@ struct event_ops { int (*event_reconfigure_threads) (struct event_pool *event_pool, int newcount); int (*event_pool_destroy) (struct event_pool *event_pool); + int (*event_handled) (struct event_pool *event_pool, int fd, int idx, + int gen); }; struct event_pool *event_pool_new (int count, int eventthreadcount); @@ -107,4 +114,6 @@ int event_dispatch (struct event_pool *event_pool); int event_reconfigure_threads (struct event_pool *event_pool, int value); int event_pool_destroy (struct event_pool *event_pool); int event_dispatch_destroy (struct event_pool *event_pool); +int event_handled (struct event_pool *event_pool, int fd, int idx, int gen); + #endif /* _EVENT_H_ */ diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h index 839a1d47d2a..2e709b9d703 100644 --- a/libglusterfs/src/glusterfs.h +++ b/libglusterfs/src/glusterfs.h @@ -520,6 +520,8 @@ struct _glusterfs_ctx { int notifying; struct gf_ctx_tw *tw; /* refcounted timer_wheel */ + + gf_lock_t volfile_lock; }; typedef struct _glusterfs_ctx glusterfs_ctx_t; diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 8ba2692cdc6..e14152c5822 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -1172,11 +1172,11 @@ out: } -static int -socket_event_poll_err (rpc_transport_t *this) +static gf_boolean_t +socket_event_poll_err (rpc_transport_t *this, int gen, int idx) { - socket_private_t *priv = NULL; - int ret = -1; + socket_private_t *priv = NULL; + gf_boolean_t socket_closed = _gf_false; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -1185,15 +1185,29 @@ socket_event_poll_err (rpc_transport_t *this) pthread_mutex_lock (&priv->lock); { - __socket_ioq_flush (this); - __socket_reset (this); + if ((priv->gen == gen) && (priv->idx == idx) + && (priv->sock != -1)) { + __socket_ioq_flush (this); + __socket_reset (this); + socket_closed = _gf_true; + } } pthread_mutex_unlock (&priv->lock); - rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); + if (socket_closed) { + pthread_mutex_lock (&priv->notify.lock); + { + while (priv->notify.in_progress) + pthread_cond_wait (&priv->notify.cond, + &priv->notify.lock); + } + pthread_mutex_unlock (&priv->notify.lock); + + rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); + } out: - return ret; + return socket_closed; } @@ -2271,22 +2285,50 @@ out: static int -socket_event_poll_in (rpc_transport_t *this) +socket_event_poll_in (rpc_transport_t *this, gf_boolean_t notify_handled) { int ret = -1; rpc_transport_pollin_t *pollin = NULL; socket_private_t *priv = this->private; + glusterfs_ctx_t *ctx = NULL; + + ctx = this->ctx; ret = socket_proto_state_machine (this, &pollin); + if (pollin) { + pthread_mutex_lock (&priv->notify.lock); + { + priv->notify.in_progress++; + } + pthread_mutex_unlock (&priv->notify.lock); + } + + + if (notify_handled && (ret != -1)) + event_handled (ctx->event_pool, priv->sock, priv->idx, + priv->gen); + if (pollin) { priv->ot_state = OT_CALLBACK; + ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED, pollin); + if (priv->ot_state == OT_CALLBACK) { priv->ot_state = OT_RUNNING; } + rpc_transport_pollin_destroy (pollin); + + pthread_mutex_lock (&priv->notify.lock); + { + --priv->notify.in_progress; + + if (!priv->notify.in_progress) + pthread_cond_signal (&priv->notify.cond); + } + pthread_mutex_unlock (&priv->notify.lock); } return ret; @@ -2369,24 +2411,29 @@ static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait); /* reads rpc_requests during pollin */ static int -socket_event_handler (int fd, int idx, void *data, +socket_event_handler (int fd, int idx, int gen, void *data, int poll_in, int poll_out, int poll_err) { - rpc_transport_t *this = NULL; - socket_private_t *priv = NULL; - int ret = -1; + rpc_transport_t *this = NULL; + socket_private_t *priv = NULL; + int ret = -1; + glusterfs_ctx_t *ctx = NULL; + gf_boolean_t socket_closed = _gf_false, notify_handled = _gf_false; this = data; + GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); GF_VALIDATE_OR_GOTO ("socket", this->xl, out); THIS = this->xl; priv = this->private; + ctx = this->ctx; pthread_mutex_lock (&priv->lock); { priv->idx = idx; + priv->gen = gen; } pthread_mutex_unlock (&priv->lock); @@ -2417,16 +2464,23 @@ socket_event_handler (int fd, int idx, void *data, } if (!ret && poll_in) { - ret = socket_event_poll_in (this); + ret = socket_event_poll_in (this, !poll_err); + notify_handled = _gf_true; } if ((ret < 0) || poll_err) { /* Logging has happened already in earlier cases */ gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG), "EPOLLERR - disconnecting now"); - socket_event_poll_err (this); - rpc_transport_unref (this); - } + + socket_closed = socket_event_poll_err (this, gen, idx); + + if (socket_closed) + rpc_transport_unref (this); + + } else if (!notify_handled) { + event_handled (ctx->event_pool, fd, idx, gen); + } out: return ret; @@ -2533,7 +2587,7 @@ socket_poller (void *ctx) } if (pfd[1].revents & POLL_MASK_INPUT) { - ret = socket_event_poll_in(this); + ret = socket_event_poll_in(this, 0); if (ret >= 0) { /* Suppress errors while making progress. */ pfd[1].revents &= ~POLL_MASK_ERROR; @@ -2657,7 +2711,7 @@ socket_spawn (rpc_transport_t *this) } static int -socket_server_event_handler (int fd, int idx, void *data, +socket_server_event_handler (int fd, int idx, int gen, void *data, int poll_in, int poll_out, int poll_err) { rpc_transport_t *this = NULL; @@ -2913,6 +2967,8 @@ socket_server_event_handler (int fd, int idx, void *data, } } out: + event_handled (ctx->event_pool, fd, idx, gen); + if (cname && (cname != this->ssl_name)) { GF_FREE(cname); } @@ -4024,6 +4080,8 @@ socket_init (rpc_transport_t *this) priv->bio = 0; priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; INIT_LIST_HEAD (&priv->ioq); + pthread_mutex_init (&priv->notify.lock, NULL); + pthread_cond_init (&priv->notify.cond, NULL); /* All the below section needs 'this->options' to be present */ if (!this->options) diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index 6c8875f7fb7..e299a3d7bd5 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -203,6 +203,7 @@ typedef enum { typedef struct { int32_t sock; int32_t idx; + int32_t gen; /* -1 = not connected. 0 = in progress. 1 = connected */ char connected; /* 1 = connect failed for reasons other than EINPROGRESS/ENOENT @@ -254,6 +255,11 @@ typedef struct { int log_ctr; GF_REF_DECL; /* refcount to keep track of socket_poller threads */ + struct { + pthread_mutex_t lock; + pthread_cond_t cond; + uint64_t in_progress; + } notify; } socket_private_t; |