diff options
| author | Aravinda VK <avishwan@redhat.com> | 2016-08-17 13:46:00 +0530 | 
|---|---|---|
| committer | Atin Mukherjee <amukherj@redhat.com> | 2016-08-30 18:34:59 -0700 | 
| commit | b71ae7d77d7ab1581d266f6435d134958844d0db (patch) | |
| tree | 1e2044ee1c7d7ec4dc5a620693484fb091ffb2c9 | |
| parent | c1f5cf0bda47fc34725084ee3988b0efe2dcfc8a (diff) | |
eventsapi: Add support for Client side Events
Client side gf_event uses ctx->cmd_args.volfile_server to push
notifications to the eventsd.
Socket server changed from Unix domain socket to UDP to support
external events.
Following to be addressed in different patch
- Port used for eventsd is 24009. Make it configurable
  Already configurable in Server side. Configurable in gf_event API
  is required.
- Auth Token yet to be added as discussed in
  https://www.gluster.org/pipermail/gluster-devel/2016-August/050324.html
Change-Id: I159acf80b681d10b82d52cfb3ffdf85cb896542d
BUG: 1367774
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/15189
Smoke: Gluster Build System <jenkins@build.gluster.org>
Reviewed-by: Prashanth Pai <ppai@redhat.com>
Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
| -rw-r--r-- | events/eventskeygen.py | 4 | ||||
| -rw-r--r-- | events/src/eventsapiconf.py.in | 8 | ||||
| -rw-r--r-- | events/src/eventsconfig.json | 3 | ||||
| -rw-r--r-- | events/src/gf_event.py | 20 | ||||
| -rw-r--r-- | events/src/glustereventsd.py | 95 | ||||
| -rw-r--r-- | events/src/peer_eventsapi.py | 4 | ||||
| -rw-r--r-- | events/src/utils.py | 28 | ||||
| -rw-r--r-- | extras/firewalld/glusterfs.xml | 1 | ||||
| -rw-r--r-- | libglusterfs/src/events.c | 90 | 
9 files changed, 139 insertions, 114 deletions
diff --git a/events/eventskeygen.py b/events/eventskeygen.py index 30b518dc18a..9d8a97c3030 100644 --- a/events/eventskeygen.py +++ b/events/eventskeygen.py @@ -102,7 +102,9 @@ ERRORS = (      "EVENT_ERROR_INVALID_INPUTS",      "EVENT_ERROR_SOCKET",      "EVENT_ERROR_CONNECT", -    "EVENT_ERROR_SEND" +    "EVENT_ERROR_SEND", +    "EVENT_ERROR_RESOLVE", +    "EVENT_ERROR_MSG_FORMAT",  )  if gen_header_type == "C_HEADER": diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in index 03dd0e8d5d7..fad96ca2cc7 100644 --- a/events/src/eventsapiconf.py.in +++ b/events/src/eventsapiconf.py.in @@ -9,7 +9,7 @@  #  cases as published by the Free Software Foundation.  # -SERVER_ADDRESS = "@localstatedir@/run/gluster/events.sock" +SERVER_ADDRESS = "0.0.0.0"  DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json"  CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json"  CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC @@ -17,7 +17,9 @@ WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json"  WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC  LOG_FILE = "@localstatedir@/log/glusterfs/events.log"  EVENTSD = "glustereventsd" -CONFIG_KEYS = ["log_level"] +CONFIG_KEYS = ["log_level", "port"]  BOOL_CONFIGS = [] -RESTART_CONFIGS = [] +INT_CONFIGS = ["port"] +RESTART_CONFIGS = ["port"]  EVENTS_ENABLED = @EVENTS_ENABLED@ +UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info" diff --git a/events/src/eventsconfig.json b/events/src/eventsconfig.json index ce2c775f0bd..45730f9bb83 100644 --- a/events/src/eventsconfig.json +++ b/events/src/eventsconfig.json @@ -1,3 +1,4 @@  { -    "log_level": "INFO" +    "log_level": "INFO", +    "port": 24009  } diff --git a/events/src/gf_event.py b/events/src/gf_event.py index 20dfc8a4f17..f9ece6adc28 100644 --- a/events/src/gf_event.py +++ b/events/src/gf_event.py @@ -16,7 +16,7 @@ import time  from eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED  from eventtypes import all_events -from utils import logger, setup_logger +from utils import logger, setup_logger, get_config  # Run this when this lib loads  setup_logger() @@ -31,10 +31,9 @@ def gf_event(event_type, **kwargs):          return      try: -        client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) -        client.connect(SERVER_ADDRESS) +        client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)      except socket.error as e: -        logger.error("Unable to connect to events.sock: {0}".format(e)) +        logger.error("Unable to connect to events Server: {0}".format(e))          return      # Convert key value args into KEY1=VALUE1;KEY2=VALUE2;.. @@ -45,7 +44,18 @@ def gf_event(event_type, **kwargs):      # <TIMESTAMP> <EVENT_TYPE> <MSG>      msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")) +    port = get_config("port") +    if port is None: +        logger.error("Unable to get eventsd port details") +        return +      try: -        client.sendall(msg) +        sent = client.sendto(msg, (SERVER_ADDRESS, port)) +        assert sent == len(msg)      except socket.error as e:          logger.error("Unable to Send message: {0}".format(e)) +    except AssertionError: +        logger.error("Unable to send message. Sent: {0}, Actual: {1}".format( +            sent, len(msg))) +    finally: +        client.close() diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py index 3fa57686a8b..91a0743ff22 100644 --- a/events/src/glustereventsd.py +++ b/events/src/glustereventsd.py @@ -11,12 +11,10 @@  #  from __future__ import print_function -import asyncore -import socket -import os -from multiprocessing import Process, Queue  import sys  import signal +import SocketServer +import socket  from eventtypes import all_events  import handlers @@ -24,26 +22,19 @@ import utils  from eventsapiconf import SERVER_ADDRESS  from utils import logger -# Global Queue, EventsHandler will add items to the queue -# and process_event will gets each item and handles it -events_queue = Queue() -events_server_pid = None +class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler): -def process_event(): -    """ -    Seperate process which handles all the incoming events from Gluster -    processes. -    """ -    while True: -        data = events_queue.get() -        logger.debug("EVENT: {0}".format(repr(data))) +    def handle(self): +        data = self.request[0].strip() +        logger.debug("EVENT: {0} from {1}".format(repr(data), +                                                  self.client_address[0]))          try:              # Event Format <TIMESTAMP> <TYPE> <DETAIL>              ts, key, value = data.split(" ", 2)          except ValueError:              logger.warn("Invalid Event Format {0}".format(data)) -            continue +            return          data_dict = {}          try: @@ -51,7 +42,7 @@ def process_event():              data_dict = dict(x.split('=') for x in value.split(';'))          except ValueError:              logger.warn("Unable to parse Event {0}".format(data)) -            continue +            return          try:              # Event Type to Function Map, Recieved event data will be in @@ -75,68 +66,28 @@ def process_event():                  handlers.generic_handler(ts, int(key), data_dict) -def process_event_wrapper(): -    try: -        process_event() -    except KeyboardInterrupt: -        return - - -class GlusterEventsHandler(asyncore.dispatcher_with_send): - -    def handle_read(self): -        data = self.recv(8192) -        if data: -            events_queue.put(data) -            self.send(data) - - -class GlusterEventsServer(asyncore.dispatcher): - -    def __init__(self): -        global events_server_pid -        asyncore.dispatcher.__init__(self) -        # Start the Events listener process which listens to -        # the global queue -        p = Process(target=process_event_wrapper) -        p.start() -        events_server_pid = p.pid - -        # Create UNIX Domain Socket, bind to path -        self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM) -        self.bind(SERVER_ADDRESS) -        self.listen(5) - -    def handle_accept(self): -        pair = self.accept() -        if pair is not None: -            sock, addr = pair -            GlusterEventsHandler(sock) - -  def signal_handler_sigusr2(sig, frame): -    if events_server_pid is not None: -        os.kill(events_server_pid, signal.SIGUSR2)      utils.load_all()  def init_event_server():      utils.setup_logger() - -    # Delete Socket file if Exists -    try: -        os.unlink(SERVER_ADDRESS) -    except OSError: -        if os.path.exists(SERVER_ADDRESS): -            print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS), -                   file=sys.stderr) -            sys.exit(1) -      utils.load_all() -    # Start the Eventing Server, UNIX DOMAIN SOCKET Server -    GlusterEventsServer() -    asyncore.loop() +    port = utils.get_config("port") +    if port is None: +        sys.stderr.write("Unable to get Port details from Config\n") +        sys.exit(1) + +    # Start the Eventing Server, UDP Server +    try: +        server = SocketServer.ThreadingUDPServer( +            (SERVER_ADDRESS, port), +            GlusterEventsRequestHandler) +    except socket.error as e: +        sys.stderr.write("Failed to start Eventsd: {0}\n".format(e)) +        sys.exit(1) +    server.serve_forever()  def main(): diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py index 7887d77351c..f4447784f90 100644 --- a/events/src/peer_eventsapi.py +++ b/events/src/peer_eventsapi.py @@ -31,6 +31,7 @@ from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,                                    EVENTSD,                                    CONFIG_KEYS,                                    BOOL_CONFIGS, +                                  INT_CONFIGS,                                    RESTART_CONFIGS) @@ -462,6 +463,9 @@ class ConfigSetCmd(Cmd):              if args.name in BOOL_CONFIGS:                  v = boolify(args.value) +            if args.name in INT_CONFIGS: +                v = int(args.value) +              new_data[args.name] = v              file_content_overwrite(CUSTOM_CONFIG_FILE, new_data) diff --git a/events/src/utils.py b/events/src/utils.py index 772221a1e25..386e8f28449 100644 --- a/events/src/utils.py +++ b/events/src/utils.py @@ -17,11 +17,10 @@ import requests  from eventsapiconf import (LOG_FILE,                             WEBHOOKS_FILE,                             DEFAULT_CONFIG_FILE, -                           CUSTOM_CONFIG_FILE) +                           CUSTOM_CONFIG_FILE, +                           UUID_FILE)  import eventtypes -from gluster.cliutils import get_node_uuid -  # Webhooks list  _webhooks = {} @@ -32,6 +31,23 @@ _config = {}  # Init Logger instance  logger = logging.getLogger(__name__) +NodeID = None + + +def get_node_uuid(): +    val = None +    with open(UUID_FILE) as f: +        for line in f: +            if line.startswith("UUID="): +                val = line.strip().split("=")[-1] +                break +    return val + + +def get_config(key): +    if not _config: +        load_config() +    return _config.get(key, None)  def get_event_type_name(idx): @@ -109,8 +125,12 @@ def load_all():  def publish(ts, event_key, data): +    global NodeID +    if NodeID is None: +        NodeID = get_node_uuid() +      message = { -        "nodeid": get_node_uuid(), +        "nodeid": NodeID,          "ts": int(ts),          "event": get_event_type_name(event_key),          "message": data diff --git a/extras/firewalld/glusterfs.xml b/extras/firewalld/glusterfs.xml index f8efd90c3b5..7e176442f5b 100644 --- a/extras/firewalld/glusterfs.xml +++ b/extras/firewalld/glusterfs.xml @@ -4,6 +4,7 @@  <description>Default ports for gluster-distributed storage</description>  <port protocol="tcp" port="24007"/>    <!--For glusterd -->  <port protocol="tcp" port="24008"/>    <!--For glusterd RDMA port management --> +<port protocol="tcp" port="24009"/>    <!--For glustereventsd -->  <port protocol="tcp" port="38465"/>    <!--Gluster NFS service -->  <port protocol="tcp" port="38466"/>    <!--Gluster NFS service -->  <port protocol="tcp" port="38467"/>    <!--Gluster NFS service --> diff --git a/libglusterfs/src/events.c b/libglusterfs/src/events.c index f93934de0fb..b7b513eb39a 100644 --- a/libglusterfs/src/events.c +++ b/libglusterfs/src/events.c @@ -16,73 +16,107 @@  #include <time.h>  #include <stdarg.h>  #include <string.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netdb.h> +  #include "syscall.h"  #include "mem-pool.h" +#include "glusterfs.h" +#include "globals.h"  #include "events.h" -#define EVENT_PATH DATADIR "/run/gluster/events.sock" -#define EVENTS_MSG_MAX 2048 +#define EVENT_HOST "127.0.0.1" +#define EVENT_PORT 24009  int  gf_event (eventtypes_t event, char *fmt, ...)  { -        int      ret                      = 0; -        int      sock                     = -1; -        char     eventstr[EVENTS_MSG_MAX] = ""; -        struct   sockaddr_un server; -        va_list  arguments; -        char     *msg                     = NULL; -        size_t   eventstr_size            = 0; +        int                ret                   = 0; +        int                sock                  = -1; +        char              *eventstr              = NULL; +        struct             sockaddr_in server; +        va_list            arguments; +        char              *msg                   = NULL; +        glusterfs_ctx_t   *ctx                   = NULL; +        struct hostent    *host_data; +        char              *host                  = NULL; + +        /* Global context */ +        ctx = THIS->ctx;          if (event < 0 || event >= EVENT_LAST) {                  ret = EVENT_ERROR_INVALID_INPUTS;                  goto out;          } -        sock = socket(AF_UNIX, SOCK_STREAM, 0); +        /* Initialize UDP socket */ +        sock = socket (AF_INET, SOCK_DGRAM, 0);          if (sock < 0) {                  ret = EVENT_ERROR_SOCKET;                  goto out;          } -        server.sun_family = AF_UNIX; -        strcpy(server.sun_path, EVENT_PATH); -        if (connect(sock, -                    (struct sockaddr *) &server, -                    sizeof(struct sockaddr_un)) < 0) { -                ret = EVENT_ERROR_CONNECT; -                goto out; +        /* Get Host name to send message */ +        if (ctx && ctx->cmd_args.volfile_server) { +                /* If it is client code then volfile_server is set +                   use that information to push the events. */ +                host_data = gethostbyname (ctx->cmd_args.volfile_server); +                if (host_data == NULL) { +                        ret = EVENT_ERROR_RESOLVE; +                        goto out; +                } +                host = inet_ntoa (*(struct in_addr *)(host_data->h_addr)); +        } else { +                /* Localhost, Use the defined IP for localhost */ +                host = EVENT_HOST;          } +        /* Socket Configurations */ +        server.sin_family = AF_INET; +        server.sin_port = htons (EVENT_PORT); +        server.sin_addr.s_addr = inet_addr (host); +        memset (&server.sin_zero, '\0', sizeof (server.sin_zero)); +          va_start (arguments, fmt);          ret = gf_vasprintf (&msg, fmt, arguments);          va_end (arguments); +          if (ret < 0) {                  ret = EVENT_ERROR_INVALID_INPUTS;                  goto out;          } -        eventstr_size = snprintf(NULL, 0, "%u %d %s", (unsigned)time(NULL), -                                 event, msg); +        ret = gf_asprintf (&eventstr, "%u %d %s", +                            (unsigned)time(NULL), event, msg); -        if (eventstr_size + 1 > EVENTS_MSG_MAX) { -                eventstr_size = EVENTS_MSG_MAX - 1; +        if (ret <= 0) { +                ret = EVENT_ERROR_MSG_FORMAT; +                goto out;          } -        snprintf(eventstr, eventstr_size+1, "%u %d %s", -                 (unsigned)time(NULL), event, msg); - -        if (sys_write(sock, eventstr, strlen(eventstr)) <= 0) { +        /* Send Message */ +        if (sendto (sock, eventstr, strlen (eventstr), +                    0, (struct sockaddr *)&server, sizeof (server)) <= 0) {                  ret = EVENT_ERROR_SEND; -                goto out;          }          ret = EVENT_SEND_OK;   out: -        sys_close(sock); -        GF_FREE(msg); +        if (sock >= 0) { +                sys_close (sock); +        } + +        /* Allocated by gf_vasprintf */ +        if (msg) +                GF_FREE (msg); + +        /* Allocated by gf_asprintf */ +        if (eventstr) +                GF_FREE (eventstr); +          return ret;  }  | 
