diff options
author | Venky Shankar <vshankar@redhat.com> | 2015-02-03 19:22:16 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2015-03-18 18:22:36 -0700 |
commit | 4737584fffcd25dbe35d17b076c95bf90a422cf2 (patch) | |
tree | 9f30e0e90c88c245787b78af3ca78d7ae05e30f2 /xlators/features/changelog/src/changelog-notifier.c | |
parent | 728fcd41eb39f66744d84b979dd8195fd47313ed (diff) |
features/changelog: RPC'fy {libgf}changelog
This patch introduces RPC based communication between the changelog
translator and libgfchangelog. It replaces the old pathetic stream
based interaction that existed earlier (due to time constraints :-/).
Changelog, upon initialization starts a RPC server (rpcsvc) allowing
clients to invoke a probe API as a bootup mechanism to request for
event notifications. During probe, clients can choose an event
filter specifying the type(s) of events they are interested in. As
of now there is no way to change the event notification set once
the probe RPC call is made, but that is easier to implement.
The actual event notifications is done on a separate RPC session.
The client (libgfchangelog) itself starts and RPC server which the
changelog translator "connects back" during probe. Notifications
are dispatched by a bunch of threads from the server (translator)
and the client optionally orders them if ordered notifications
are requried. FOPs fill in their respective event details in a
buffer (rot-buffs to be particular) and a bunch of threads
(consumers) swap the buffers out of roatation and dispatch them
via RPC. To avoid writer starvation, then number of dispatcher
threads is one less than the number of buffer list in rot-buffs.x
libgfchangelog becomes purely callback based -- upon event
notification from the server (and re-ordering them if required)
invoke a callback routine specified by consumer(s).
A major part of the patch is also aimed at providing backward
compatibility for geo-replication, which was one of the main
consumer of the stream based API. Also, this patch does not\
"turn on" event notifications for all fops, just a bunch which
is currently in requirement. Another pain point is that the
server does not filter events before dispatching it to the
clients. That load is taken up by the client itself (although
it's done at the library layer rather than making it hard on
the callback implementor). This needs improvement and care
needs to be taken to not load the server up with expensive
filtering mechanisms.
Change-Id: Ibf60a432b68f2dfa60c6f9add2bcfd37a9c41395
BUG: 1170075
Signed-off-by: Venky Shankar <vshankar@redhat.com>
Reviewed-on: http://review.gluster.org/9708
Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/features/changelog/src/changelog-notifier.c')
-rw-r--r-- | xlators/features/changelog/src/changelog-notifier.c | 314 |
1 files changed, 0 insertions, 314 deletions
diff --git a/xlators/features/changelog/src/changelog-notifier.c b/xlators/features/changelog/src/changelog-notifier.c deleted file mode 100644 index 5f3d063a8ad..00000000000 --- a/xlators/features/changelog/src/changelog-notifier.c +++ /dev/null @@ -1,314 +0,0 @@ -/* - Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> - This file is part of GlusterFS. - - This file is licensed to you under your choice of the GNU Lesser - General Public License, version 3 or any later version (LGPLv3 or - later), or the GNU General Public License, version 2 (GPLv2), in all - cases as published by the Free Software Foundation. -*/ - -#include "changelog-notifier.h" - -#include <pthread.h> - -inline static void -changelog_notify_clear_fd (changelog_notify_t *cn, int i) -{ - cn->client_fd[i] = -1; -} - -inline static void -changelog_notify_save_fd (changelog_notify_t *cn, int i, int fd) -{ - cn->client_fd[i] = fd; -} - -static int -changelog_notify_insert_fd (xlator_t *this, changelog_notify_t *cn, int fd) -{ - int i = 0; - int ret = 0; - - for (; i < CHANGELOG_MAX_CLIENTS; i++) { - if (cn->client_fd[i] == -1) - break; - } - - if (i == CHANGELOG_MAX_CLIENTS) { - /** - * this case should not be hit as listen() would limit - * the number of completely established connections. - */ - gf_log (this->name, GF_LOG_WARNING, - "hit max client limit (%d)", CHANGELOG_MAX_CLIENTS); - ret = -1; - } - else - changelog_notify_save_fd (cn, i, fd); - - return ret; -} - -static void -changelog_notify_fill_rset (changelog_notify_t *cn, fd_set *rset, int *maxfd) -{ - int i = 0; - - FD_ZERO (rset); - - FD_SET (cn->socket_fd, rset); - *maxfd = cn->socket_fd; - - FD_SET (cn->rfd, rset); - *maxfd = max (*maxfd, cn->rfd); - - for (; i < CHANGELOG_MAX_CLIENTS; i++) { - if (cn->client_fd[i] != -1) { - FD_SET (cn->client_fd[i], rset); - *maxfd = max (*maxfd, cn->client_fd[i]); - } - } - - *maxfd = *maxfd + 1; -} - -static int -changelog_notify_client (changelog_notify_t *cn, char *path, ssize_t len) -{ - int i = 0; - int ret = 0; - - for (; i < CHANGELOG_MAX_CLIENTS; i++) { - if (cn->client_fd[i] == -1) - continue; - - if (changelog_write (cn->client_fd[i], - path, len)) { - ret = -1; - - close (cn->client_fd[i]); - changelog_notify_clear_fd (cn, i); - } - } - - return ret; -} - -static void -changelog_notifier_init (changelog_notify_t *cn) -{ - int i = 0; - - cn->socket_fd = -1; - - for (; i < CHANGELOG_MAX_CLIENTS; i++) { - changelog_notify_clear_fd (cn, i); - } -} - -static void -changelog_close_client_conn (changelog_notify_t *cn) -{ - int i = 0; - - for (; i < CHANGELOG_MAX_CLIENTS; i++) { - if (cn->client_fd[i] == -1) - continue; - - close (cn->client_fd[i]); - changelog_notify_clear_fd (cn, i); - } -} - -static void -changelog_notifier_cleanup (void *arg) -{ - changelog_notify_t *cn = NULL; - - cn = (changelog_notify_t *) arg; - - changelog_close_client_conn (cn); - - if (cn->socket_fd != -1) - close (cn->socket_fd); - - if (cn->rfd) - close (cn->rfd); - - if (unlink (cn->sockpath)) - gf_log ("", GF_LOG_WARNING, - "could not unlink changelog socket file" - " %s (reason: %s", cn->sockpath, strerror (errno)); -} - -void * -changelog_notifier (void *data) -{ - int i = 0; - int fd = 0; - int max_fd = 0; - int len = 0; - ssize_t readlen = 0; - xlator_t *this = NULL; - changelog_priv_t *priv = NULL; - changelog_notify_t *cn = NULL; - struct sockaddr_un local = {0,}; - char path[PATH_MAX] = {0,}; - char abspath[PATH_MAX] = {0,}; - - char buffer; - fd_set rset; - - priv = (changelog_priv_t *) data; - - cn = &priv->cn; - this = cn->this; - - pthread_cleanup_push (changelog_notifier_cleanup, cn); - - changelog_notifier_init (cn); - - cn->socket_fd = socket (AF_UNIX, SOCK_STREAM, 0); - if (cn->socket_fd < 0) { - gf_log (this->name, GF_LOG_ERROR, - "changelog socket error (reason: %s)", - strerror (errno)); - goto out; - } - - CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick, - cn->sockpath, UNIX_PATH_MAX); - if (unlink (cn->sockpath) < 0) { - if (errno != ENOENT) { - gf_log (this->name, GF_LOG_ERROR, - "Could not unlink changelog socket file (%s)" - " (reason: %s)", - CHANGELOG_UNIX_SOCK, strerror (errno)); - goto cleanup; - } - } - - local.sun_family = AF_UNIX; - strcpy (local.sun_path, cn->sockpath); - - len = strlen (local.sun_path) + sizeof (local.sun_family); - - /* bind to the unix domain socket */ - if (bind (cn->socket_fd, (struct sockaddr *) &local, len) < 0) { - gf_log (this->name, GF_LOG_ERROR, - "Could not bind to changelog socket (reason: %s)", - strerror (errno)); - goto cleanup; - } - - /* listen for incoming connections */ - if (listen (cn->socket_fd, CHANGELOG_MAX_CLIENTS) < 0) { - gf_log (this->name, GF_LOG_ERROR, - "listen() error on changelog socket (reason: %s)", - strerror (errno)); - goto cleanup; - } - - /** - * simple select() on all to-be-read file descriptors. This method - * though old school works pretty well when you have a handfull of - * fd's to be watched (clients). - * - * Future TODO: move this to epoll based notification facility if - * number of clients increase. - */ - for (;;) { - changelog_notify_fill_rset (cn, &rset, &max_fd); - - if (select (max_fd, &rset, NULL, NULL, NULL) < 0) { - gf_log (this->name, GF_LOG_ERROR, - "select() returned -1 (reason: %s)", - strerror (errno)); - sleep (2); - continue; - } - - if (FD_ISSET (cn->socket_fd, &rset)) { - fd = accept (cn->socket_fd, NULL, NULL); - if (fd < 0) { - gf_log (this->name, GF_LOG_ERROR, - "accept error on changelog socket" - " (reason: %s)", strerror (errno)); - } else if (changelog_notify_insert_fd (this, cn, fd)) { - gf_log (this->name, GF_LOG_ERROR, - "hit max client limit"); - } - } - - if (FD_ISSET (cn->rfd, &rset)) { - /** - * read changelog filename and notify all connected - * clients. - */ - readlen = 0; - while (readlen < PATH_MAX) { - len = read (cn->rfd, &path[readlen++], 1); - if (len == -1) { - break; - } - - if (len == 0) { - gf_log (this->name, GF_LOG_ERROR, - "rollover thread sent EOF" - " on pipe - possibly a crash."); - /* be blunt and close all connections */ - pthread_exit(NULL); - } - - if (path[readlen - 1] == '\0') - break; - } - - /* should we close all client connections here too? */ - if (len < 0 || readlen == PATH_MAX) { - gf_log (this->name, GF_LOG_ERROR, - "Could not get pathname from rollover" - " thread or pathname too long"); - goto process_rest; - } - - (void) snprintf (abspath, PATH_MAX, - "%s/%s", priv->changelog_dir, path); - if (changelog_notify_client (cn, abspath, - strlen (abspath) + 1)) - gf_log (this->name, GF_LOG_ERROR, - "could not notify some clients with new" - " changelogs"); - } - - process_rest: - for (i = 0; i < CHANGELOG_MAX_CLIENTS; i++) { - if ( (fd = cn->client_fd[i]) == -1 ) - continue; - - if (FD_ISSET (fd, &rset)) { - /** - * the only data we accept from the client is a - * disconnect. Anything else is treated as bogus - * and is silently discarded (also warned!!!). - */ - if ( (readlen = read (fd, &buffer, 1)) <= 0 ) { - close (fd); - changelog_notify_clear_fd (cn, i); - } else { - /* silently discard data and log */ - gf_log (this->name, GF_LOG_WARNING, - "misbehaving changelog client"); - } - } - } - - } - - cleanup:; - pthread_cleanup_pop (1); - - out: - return NULL; -} |