diff options
-rw-r--r-- | events/eventskeygen.py | 4 | ||||
-rw-r--r-- | events/src/eventsapiconf.py.in | 8 | ||||
-rw-r--r-- | events/src/eventsconfig.json | 3 | ||||
-rw-r--r-- | events/src/gf_event.py | 20 | ||||
-rw-r--r-- | events/src/glustereventsd.py | 95 | ||||
-rw-r--r-- | events/src/peer_eventsapi.py | 4 | ||||
-rw-r--r-- | events/src/utils.py | 28 | ||||
-rw-r--r-- | extras/firewalld/glusterfs.xml | 1 | ||||
-rw-r--r-- | libglusterfs/src/events.c | 90 |
9 files changed, 139 insertions, 114 deletions
diff --git a/events/eventskeygen.py b/events/eventskeygen.py index 30b518dc18a..9d8a97c3030 100644 --- a/events/eventskeygen.py +++ b/events/eventskeygen.py @@ -102,7 +102,9 @@ ERRORS = ( "EVENT_ERROR_INVALID_INPUTS", "EVENT_ERROR_SOCKET", "EVENT_ERROR_CONNECT", - "EVENT_ERROR_SEND" + "EVENT_ERROR_SEND", + "EVENT_ERROR_RESOLVE", + "EVENT_ERROR_MSG_FORMAT", ) if gen_header_type == "C_HEADER": diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in index 03dd0e8d5d7..fad96ca2cc7 100644 --- a/events/src/eventsapiconf.py.in +++ b/events/src/eventsapiconf.py.in @@ -9,7 +9,7 @@ # cases as published by the Free Software Foundation. # -SERVER_ADDRESS = "@localstatedir@/run/gluster/events.sock" +SERVER_ADDRESS = "0.0.0.0" DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json" CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json" CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC @@ -17,7 +17,9 @@ WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json" WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC LOG_FILE = "@localstatedir@/log/glusterfs/events.log" EVENTSD = "glustereventsd" -CONFIG_KEYS = ["log_level"] +CONFIG_KEYS = ["log_level", "port"] BOOL_CONFIGS = [] -RESTART_CONFIGS = [] +INT_CONFIGS = ["port"] +RESTART_CONFIGS = ["port"] EVENTS_ENABLED = @EVENTS_ENABLED@ +UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info" diff --git a/events/src/eventsconfig.json b/events/src/eventsconfig.json index ce2c775f0bd..45730f9bb83 100644 --- a/events/src/eventsconfig.json +++ b/events/src/eventsconfig.json @@ -1,3 +1,4 @@ { - "log_level": "INFO" + "log_level": "INFO", + "port": 24009 } diff --git a/events/src/gf_event.py b/events/src/gf_event.py index 20dfc8a4f17..f9ece6adc28 100644 --- a/events/src/gf_event.py +++ b/events/src/gf_event.py @@ -16,7 +16,7 @@ import time from eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED from eventtypes import all_events -from utils import logger, setup_logger +from utils import logger, setup_logger, get_config # Run this when this lib loads setup_logger() @@ -31,10 +31,9 @@ def gf_event(event_type, **kwargs): return try: - client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - client.connect(SERVER_ADDRESS) + client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) except socket.error as e: - logger.error("Unable to connect to events.sock: {0}".format(e)) + logger.error("Unable to connect to events Server: {0}".format(e)) return # Convert key value args into KEY1=VALUE1;KEY2=VALUE2;.. @@ -45,7 +44,18 @@ def gf_event(event_type, **kwargs): # <TIMESTAMP> <EVENT_TYPE> <MSG> msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")) + port = get_config("port") + if port is None: + logger.error("Unable to get eventsd port details") + return + try: - client.sendall(msg) + sent = client.sendto(msg, (SERVER_ADDRESS, port)) + assert sent == len(msg) except socket.error as e: logger.error("Unable to Send message: {0}".format(e)) + except AssertionError: + logger.error("Unable to send message. Sent: {0}, Actual: {1}".format( + sent, len(msg))) + finally: + client.close() diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py index 3fa57686a8b..91a0743ff22 100644 --- a/events/src/glustereventsd.py +++ b/events/src/glustereventsd.py @@ -11,12 +11,10 @@ # from __future__ import print_function -import asyncore -import socket -import os -from multiprocessing import Process, Queue import sys import signal +import SocketServer +import socket from eventtypes import all_events import handlers @@ -24,26 +22,19 @@ import utils from eventsapiconf import SERVER_ADDRESS from utils import logger -# Global Queue, EventsHandler will add items to the queue -# and process_event will gets each item and handles it -events_queue = Queue() -events_server_pid = None +class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler): -def process_event(): - """ - Seperate process which handles all the incoming events from Gluster - processes. - """ - while True: - data = events_queue.get() - logger.debug("EVENT: {0}".format(repr(data))) + def handle(self): + data = self.request[0].strip() + logger.debug("EVENT: {0} from {1}".format(repr(data), + self.client_address[0])) try: # Event Format <TIMESTAMP> <TYPE> <DETAIL> ts, key, value = data.split(" ", 2) except ValueError: logger.warn("Invalid Event Format {0}".format(data)) - continue + return data_dict = {} try: @@ -51,7 +42,7 @@ def process_event(): data_dict = dict(x.split('=') for x in value.split(';')) except ValueError: logger.warn("Unable to parse Event {0}".format(data)) - continue + return try: # Event Type to Function Map, Recieved event data will be in @@ -75,68 +66,28 @@ def process_event(): handlers.generic_handler(ts, int(key), data_dict) -def process_event_wrapper(): - try: - process_event() - except KeyboardInterrupt: - return - - -class GlusterEventsHandler(asyncore.dispatcher_with_send): - - def handle_read(self): - data = self.recv(8192) - if data: - events_queue.put(data) - self.send(data) - - -class GlusterEventsServer(asyncore.dispatcher): - - def __init__(self): - global events_server_pid - asyncore.dispatcher.__init__(self) - # Start the Events listener process which listens to - # the global queue - p = Process(target=process_event_wrapper) - p.start() - events_server_pid = p.pid - - # Create UNIX Domain Socket, bind to path - self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.bind(SERVER_ADDRESS) - self.listen(5) - - def handle_accept(self): - pair = self.accept() - if pair is not None: - sock, addr = pair - GlusterEventsHandler(sock) - - def signal_handler_sigusr2(sig, frame): - if events_server_pid is not None: - os.kill(events_server_pid, signal.SIGUSR2) utils.load_all() def init_event_server(): utils.setup_logger() - - # Delete Socket file if Exists - try: - os.unlink(SERVER_ADDRESS) - except OSError: - if os.path.exists(SERVER_ADDRESS): - print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS), - file=sys.stderr) - sys.exit(1) - utils.load_all() - # Start the Eventing Server, UNIX DOMAIN SOCKET Server - GlusterEventsServer() - asyncore.loop() + port = utils.get_config("port") + if port is None: + sys.stderr.write("Unable to get Port details from Config\n") + sys.exit(1) + + # Start the Eventing Server, UDP Server + try: + server = SocketServer.ThreadingUDPServer( + (SERVER_ADDRESS, port), + GlusterEventsRequestHandler) + except socket.error as e: + sys.stderr.write("Failed to start Eventsd: {0}\n".format(e)) + sys.exit(1) + server.serve_forever() def main(): diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py index 7887d77351c..f4447784f90 100644 --- a/events/src/peer_eventsapi.py +++ b/events/src/peer_eventsapi.py @@ -31,6 +31,7 @@ from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, EVENTSD, CONFIG_KEYS, BOOL_CONFIGS, + INT_CONFIGS, RESTART_CONFIGS) @@ -462,6 +463,9 @@ class ConfigSetCmd(Cmd): if args.name in BOOL_CONFIGS: v = boolify(args.value) + if args.name in INT_CONFIGS: + v = int(args.value) + new_data[args.name] = v file_content_overwrite(CUSTOM_CONFIG_FILE, new_data) diff --git a/events/src/utils.py b/events/src/utils.py index 772221a1e25..386e8f28449 100644 --- a/events/src/utils.py +++ b/events/src/utils.py @@ -17,11 +17,10 @@ import requests from eventsapiconf import (LOG_FILE, WEBHOOKS_FILE, DEFAULT_CONFIG_FILE, - CUSTOM_CONFIG_FILE) + CUSTOM_CONFIG_FILE, + UUID_FILE) import eventtypes -from gluster.cliutils import get_node_uuid - # Webhooks list _webhooks = {} @@ -32,6 +31,23 @@ _config = {} # Init Logger instance logger = logging.getLogger(__name__) +NodeID = None + + +def get_node_uuid(): + val = None + with open(UUID_FILE) as f: + for line in f: + if line.startswith("UUID="): + val = line.strip().split("=")[-1] + break + return val + + +def get_config(key): + if not _config: + load_config() + return _config.get(key, None) def get_event_type_name(idx): @@ -109,8 +125,12 @@ def load_all(): def publish(ts, event_key, data): + global NodeID + if NodeID is None: + NodeID = get_node_uuid() + message = { - "nodeid": get_node_uuid(), + "nodeid": NodeID, "ts": int(ts), "event": get_event_type_name(event_key), "message": data diff --git a/extras/firewalld/glusterfs.xml b/extras/firewalld/glusterfs.xml index f8efd90c3b5..7e176442f5b 100644 --- a/extras/firewalld/glusterfs.xml +++ b/extras/firewalld/glusterfs.xml @@ -4,6 +4,7 @@ <description>Default ports for gluster-distributed storage</description> <port protocol="tcp" port="24007"/> <!--For glusterd --> <port protocol="tcp" port="24008"/> <!--For glusterd RDMA port management --> +<port protocol="tcp" port="24009"/> <!--For glustereventsd --> <port protocol="tcp" port="38465"/> <!--Gluster NFS service --> <port protocol="tcp" port="38466"/> <!--Gluster NFS service --> <port protocol="tcp" port="38467"/> <!--Gluster NFS service --> diff --git a/libglusterfs/src/events.c b/libglusterfs/src/events.c index f93934de0fb..b7b513eb39a 100644 --- a/libglusterfs/src/events.c +++ b/libglusterfs/src/events.c @@ -16,73 +16,107 @@ #include <time.h> #include <stdarg.h> #include <string.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netdb.h> + #include "syscall.h" #include "mem-pool.h" +#include "glusterfs.h" +#include "globals.h" #include "events.h" -#define EVENT_PATH DATADIR "/run/gluster/events.sock" -#define EVENTS_MSG_MAX 2048 +#define EVENT_HOST "127.0.0.1" +#define EVENT_PORT 24009 int gf_event (eventtypes_t event, char *fmt, ...) { - int ret = 0; - int sock = -1; - char eventstr[EVENTS_MSG_MAX] = ""; - struct sockaddr_un server; - va_list arguments; - char *msg = NULL; - size_t eventstr_size = 0; + int ret = 0; + int sock = -1; + char *eventstr = NULL; + struct sockaddr_in server; + va_list arguments; + char *msg = NULL; + glusterfs_ctx_t *ctx = NULL; + struct hostent *host_data; + char *host = NULL; + + /* Global context */ + ctx = THIS->ctx; if (event < 0 || event >= EVENT_LAST) { ret = EVENT_ERROR_INVALID_INPUTS; goto out; } - sock = socket(AF_UNIX, SOCK_STREAM, 0); + /* Initialize UDP socket */ + sock = socket (AF_INET, SOCK_DGRAM, 0); if (sock < 0) { ret = EVENT_ERROR_SOCKET; goto out; } - server.sun_family = AF_UNIX; - strcpy(server.sun_path, EVENT_PATH); - if (connect(sock, - (struct sockaddr *) &server, - sizeof(struct sockaddr_un)) < 0) { - ret = EVENT_ERROR_CONNECT; - goto out; + /* Get Host name to send message */ + if (ctx && ctx->cmd_args.volfile_server) { + /* If it is client code then volfile_server is set + use that information to push the events. */ + host_data = gethostbyname (ctx->cmd_args.volfile_server); + if (host_data == NULL) { + ret = EVENT_ERROR_RESOLVE; + goto out; + } + host = inet_ntoa (*(struct in_addr *)(host_data->h_addr)); + } else { + /* Localhost, Use the defined IP for localhost */ + host = EVENT_HOST; } + /* Socket Configurations */ + server.sin_family = AF_INET; + server.sin_port = htons (EVENT_PORT); + server.sin_addr.s_addr = inet_addr (host); + memset (&server.sin_zero, '\0', sizeof (server.sin_zero)); + va_start (arguments, fmt); ret = gf_vasprintf (&msg, fmt, arguments); va_end (arguments); + if (ret < 0) { ret = EVENT_ERROR_INVALID_INPUTS; goto out; } - eventstr_size = snprintf(NULL, 0, "%u %d %s", (unsigned)time(NULL), - event, msg); + ret = gf_asprintf (&eventstr, "%u %d %s", + (unsigned)time(NULL), event, msg); - if (eventstr_size + 1 > EVENTS_MSG_MAX) { - eventstr_size = EVENTS_MSG_MAX - 1; + if (ret <= 0) { + ret = EVENT_ERROR_MSG_FORMAT; + goto out; } - snprintf(eventstr, eventstr_size+1, "%u %d %s", - (unsigned)time(NULL), event, msg); - - if (sys_write(sock, eventstr, strlen(eventstr)) <= 0) { + /* Send Message */ + if (sendto (sock, eventstr, strlen (eventstr), + 0, (struct sockaddr *)&server, sizeof (server)) <= 0) { ret = EVENT_ERROR_SEND; - goto out; } ret = EVENT_SEND_OK; out: - sys_close(sock); - GF_FREE(msg); + if (sock >= 0) { + sys_close (sock); + } + + /* Allocated by gf_vasprintf */ + if (msg) + GF_FREE (msg); + + /* Allocated by gf_asprintf */ + if (eventstr) + GF_FREE (eventstr); + return ret; } |