diff options
Diffstat (limited to 'events')
-rw-r--r-- | events/Makefile.am | 6 | ||||
-rw-r--r-- | events/eventskeygen.py | 65 | ||||
-rw-r--r-- | events/src/Makefile.am | 24 | ||||
-rw-r--r-- | events/src/__init__.py | 10 | ||||
-rw-r--r-- | events/src/eventsapiconf.py.in | 22 | ||||
-rw-r--r-- | events/src/eventsconfig.json | 3 | ||||
-rw-r--r-- | events/src/eventtypes.py | 9 | ||||
-rw-r--r-- | events/src/glustereventsd.py | 151 | ||||
-rw-r--r-- | events/src/handlers.py | 21 | ||||
-rw-r--r-- | events/src/peer_eventsapi.py | 521 | ||||
-rw-r--r-- | events/src/utils.py | 150 | ||||
-rw-r--r-- | events/tools/Makefile.am | 3 | ||||
-rw-r--r-- | events/tools/eventsdash.py | 74 |
13 files changed, 1059 insertions, 0 deletions
diff --git a/events/Makefile.am b/events/Makefile.am new file mode 100644 index 00000000000..04a74efc228 --- /dev/null +++ b/events/Makefile.am @@ -0,0 +1,6 @@ +SUBDIRS = src tools + +noinst_PYTHON = eventskeygen.py + +install-data-hook: + $(INSTALL) -d -m 755 $(DESTDIR)@GLUSTERD_WORKDIR@/events diff --git a/events/eventskeygen.py b/events/eventskeygen.py new file mode 100644 index 00000000000..656a7dce9f1 --- /dev/null +++ b/events/eventskeygen.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import os + +GLUSTER_SRC_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +eventtypes_h = os.path.join(GLUSTER_SRC_ROOT, "libglusterfs/src/eventtypes.h") +eventtypes_py = os.path.join(GLUSTER_SRC_ROOT, "events/src/eventtypes.py") + +# When adding new keys add it to the END +keys = ( + "EVENT_PEER_ATTACH", + "EVENT_PEER_DETACH", + + "EVENT_VOLUME_CREATE", + "EVENT_VOLUME_START", + "EVENT_VOLUME_STOP", + "EVENT_VOLUME_DELETE", +) + +LAST_EVENT = "EVENT_LAST" + +ERRORS = ( + "EVENT_SEND_OK", + "EVENT_ERROR_INVALID_INPUTS", + "EVENT_ERROR_SOCKET", + "EVENT_ERROR_CONNECT", + "EVENT_ERROR_SEND" +) + +# Generate eventtypes.h +with open(eventtypes_h, "w") as f: + f.write("#ifndef __EVENTTYPES_H__\n") + f.write("#define __EVENTTYPES_H__\n\n") + f.write("typedef enum {\n") + for k in ERRORS: + f.write(" {0},\n".format(k)) + f.write("} event_errors_t;\n") + + f.write("\n") + + f.write("typedef enum {\n") + for k in keys: + f.write(" {0},\n".format(k)) + + f.write(" {0}\n".format(LAST_EVENT)) + f.write("} eventtypes_t;\n") + f.write("\n#endif /* __EVENTTYPES_H__ */\n") + +# Generate eventtypes.py +with open(eventtypes_py, "w") as f: + f.write("# -*- coding: utf-8 -*-\n") + f.write("all_events = [\n") + for ev in keys: + f.write(' "{0}",\n'.format(ev)) + f.write("]\n") diff --git a/events/src/Makefile.am b/events/src/Makefile.am new file mode 100644 index 00000000000..528f0208fe2 --- /dev/null +++ b/events/src/Makefile.am @@ -0,0 +1,24 @@ +EXTRA_DIST = glustereventsd.py __init__.py eventsapiconf.py.in eventtypes.py \ + handlers.py utils.py peer_eventsapi.py eventsconfig.json + +eventsdir = $(libexecdir)/glusterfs/events +eventspeerscriptdir = $(libexecdir)/glusterfs +eventsconfdir = $(sysconfdir)/glusterfs +eventsconf_DATA = eventsconfig.json + +events_PYTHON = __init__.py eventsapiconf.py eventtypes.py handlers.py utils.py +events_SCRIPTS = glustereventsd.py +eventspeerscript_SCRIPTS = peer_eventsapi.py + +install-exec-hook: + $(mkdir_p) $(DESTDIR)$(sbindir) + rm -f $(DESTDIR)$(sbindir)/glustereventsd + ln -s $(libexecdir)/glusterfs/events/glustereventsd.py \ + $(DESTDIR)$(sbindir)/glustereventsd + rm -f $(DESTDIR)$(sbindir)/gluster-eventing + ln -s $(libexecdir)/glusterfs/peer_eventsapi.py \ + $(DESTDIR)$(sbindir)/gluster-eventsapi + +uninstall-hook: + rm -f $(DESTDIR)$(sbindir)/glustereventsd + rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi diff --git a/events/src/__init__.py b/events/src/__init__.py new file mode 100644 index 00000000000..f27c53a4df4 --- /dev/null +++ b/events/src/__init__.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in new file mode 100644 index 00000000000..702e1d21820 --- /dev/null +++ b/events/src/eventsapiconf.py.in @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +SERVER_ADDRESS = "@localstatedir@/run/gluster/events.sock" +DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json" +CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json" +CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC +WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json" +WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC +LOG_FILE = "@localstatedir@/log/glusterfs/events.log" +EVENTSD = "glustereventsd" +CONFIG_KEYS = ["log_level"] +BOOL_CONFIGS = [] +RESTART_CONFIGS = [] diff --git a/events/src/eventsconfig.json b/events/src/eventsconfig.json new file mode 100644 index 00000000000..ce2c775f0bd --- /dev/null +++ b/events/src/eventsconfig.json @@ -0,0 +1,3 @@ +{ + "log_level": "INFO" +} diff --git a/events/src/eventtypes.py b/events/src/eventtypes.py new file mode 100644 index 00000000000..4812e659de3 --- /dev/null +++ b/events/src/eventtypes.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- +all_events = [ + "EVENT_PEER_ATTACH", + "EVENT_PEER_DETACH", + "EVENT_VOLUME_CREATE", + "EVENT_VOLUME_START", + "EVENT_VOLUME_STOP", + "EVENT_VOLUME_DELETE", +] diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py new file mode 100644 index 00000000000..3fa57686a8b --- /dev/null +++ b/events/src/glustereventsd.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +from __future__ import print_function +import asyncore +import socket +import os +from multiprocessing import Process, Queue +import sys +import signal + +from eventtypes import all_events +import handlers +import utils +from eventsapiconf import SERVER_ADDRESS +from utils import logger + +# Global Queue, EventsHandler will add items to the queue +# and process_event will gets each item and handles it +events_queue = Queue() +events_server_pid = None + + +def process_event(): + """ + Seperate process which handles all the incoming events from Gluster + processes. + """ + while True: + data = events_queue.get() + logger.debug("EVENT: {0}".format(repr(data))) + try: + # Event Format <TIMESTAMP> <TYPE> <DETAIL> + ts, key, value = data.split(" ", 2) + except ValueError: + logger.warn("Invalid Event Format {0}".format(data)) + continue + + data_dict = {} + try: + # Format key=value;key=value + data_dict = dict(x.split('=') for x in value.split(';')) + except ValueError: + logger.warn("Unable to parse Event {0}".format(data)) + continue + + try: + # Event Type to Function Map, Recieved event data will be in + # the form <TIMESTAMP> <TYPE> <DETAIL>, Get Event name for the + # recieved Type/Key and construct a function name starting with + # handle_ For example: handle_event_volume_create + func_name = "handle_" + all_events[int(key)].lower() + except IndexError: + # This type of Event is not handled? + logger.warn("Unhandled Event: {0}".format(key)) + func_name = None + + if func_name is not None: + # Get function from handlers module + func = getattr(handlers, func_name, None) + # If func is None, then handler unimplemented for that event. + if func is not None: + func(ts, int(key), data_dict) + else: + # Generic handler, broadcast whatever received + handlers.generic_handler(ts, int(key), data_dict) + + +def process_event_wrapper(): + try: + process_event() + except KeyboardInterrupt: + return + + +class GlusterEventsHandler(asyncore.dispatcher_with_send): + + def handle_read(self): + data = self.recv(8192) + if data: + events_queue.put(data) + self.send(data) + + +class GlusterEventsServer(asyncore.dispatcher): + + def __init__(self): + global events_server_pid + asyncore.dispatcher.__init__(self) + # Start the Events listener process which listens to + # the global queue + p = Process(target=process_event_wrapper) + p.start() + events_server_pid = p.pid + + # Create UNIX Domain Socket, bind to path + self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.bind(SERVER_ADDRESS) + self.listen(5) + + def handle_accept(self): + pair = self.accept() + if pair is not None: + sock, addr = pair + GlusterEventsHandler(sock) + + +def signal_handler_sigusr2(sig, frame): + if events_server_pid is not None: + os.kill(events_server_pid, signal.SIGUSR2) + utils.load_all() + + +def init_event_server(): + utils.setup_logger() + + # Delete Socket file if Exists + try: + os.unlink(SERVER_ADDRESS) + except OSError: + if os.path.exists(SERVER_ADDRESS): + print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS), + file=sys.stderr) + sys.exit(1) + + utils.load_all() + + # Start the Eventing Server, UNIX DOMAIN SOCKET Server + GlusterEventsServer() + asyncore.loop() + + +def main(): + try: + init_event_server() + except KeyboardInterrupt: + sys.exit(1) + + +if __name__ == "__main__": + signal.signal(signal.SIGUSR2, signal_handler_sigusr2) + main() diff --git a/events/src/handlers.py b/events/src/handlers.py new file mode 100644 index 00000000000..9b756a91d51 --- /dev/null +++ b/events/src/handlers.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import utils + + +def generic_handler(ts, key, data): + """ + Generic handler to broadcast message to all peers, custom handlers + can be created by func name handler_<event_name> + Ex: handle_event_volume_create(ts, key, data) + """ + utils.publish(ts, key, data) diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py new file mode 100644 index 00000000000..7887d77351c --- /dev/null +++ b/events/src/peer_eventsapi.py @@ -0,0 +1,521 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +from __future__ import print_function +import os +import json +from errno import EEXIST + +import requests +import fasteners +from prettytable import PrettyTable + +from gluster.cliutils import (Cmd, execute, node_output_ok, node_output_notok, + sync_file_to_peers, GlusterCmdException, + output_error, execute_in_peers, runcli) + +from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, + WEBHOOKS_FILE, + DEFAULT_CONFIG_FILE, + CUSTOM_CONFIG_FILE, + CUSTOM_CONFIG_FILE_TO_SYNC, + EVENTSD, + CONFIG_KEYS, + BOOL_CONFIGS, + RESTART_CONFIGS) + + +def file_content_overwrite(fname, data): + with open(fname + ".tmp", "w") as f: + f.write(json.dumps(data)) + + os.rename(fname + ".tmp", fname) + + +def create_custom_config_file_if_not_exists(): + mkdirp(os.path.dirname(CUSTOM_CONFIG_FILE)) + if not os.path.exists(CUSTOM_CONFIG_FILE): + with open(CUSTOM_CONFIG_FILE, "w") as f: + f.write("{}") + + +def create_webhooks_file_if_not_exists(): + mkdirp(os.path.dirname(WEBHOOKS_FILE)) + if not os.path.exists(WEBHOOKS_FILE): + with open(WEBHOOKS_FILE, "w") as f: + f.write("{}") + + +def boolify(value): + val = False + if value.lower() in ["enabled", "true", "on", "yes"]: + val = True + return val + + +def mkdirp(path, exit_on_err=False, logger=None): + """ + Try creating required directory structure + ignore EEXIST and raise exception for rest of the errors. + Print error in stderr and exit + """ + try: + os.makedirs(path) + except (OSError, IOError) as e: + if e.errno == EEXIST and os.path.isdir(path): + pass + else: + output_error("Fail to create dir %s: %s" % (path, e)) + + +def is_enabled(service): + rc, out, err = execute(["systemctl", "is-enabled", service]) + return rc == 0 + + +def is_active(service): + rc, out, err = execute(["systemctl", "is-active", service]) + return rc == 0 + + +def enable_service(service): + if not is_enabled(service): + cmd = ["systemctl", "enable", service] + return execute(cmd) + + return (0, "", "") + + +def disable_service(service): + if is_enabled(service): + cmd = ["systemctl", "disable", service] + return execute(cmd) + + return (0, "", "") + + +def start_service(service): + rc, out, err = enable_service(service) + if rc != 0: + return (rc, out, err) + + cmd = ["systemctl", "start", service] + return execute(cmd) + + +def stop_service(service): + rc, out, err = disable_service(service) + if rc != 0: + return (rc, out, err) + + cmd = ["systemctl", "stop", service] + return execute(cmd) + + +def restart_service(service): + rc, out, err = stop_service(service) + if rc != 0: + return (rc, out, err) + + return start_service(service) + + +def reload_service(service): + if is_active(service): + cmd = ["systemctl", "reload", service] + return execute(cmd) + + return (0, "", "") + + +def sync_to_peers(restart=False): + if os.path.exists(WEBHOOKS_FILE): + try: + sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC) + except GlusterCmdException as e: + output_error("Failed to sync Webhooks file: [Error: {0}]" + "{1}".format(e[0], e[2])) + + if os.path.exists(CUSTOM_CONFIG_FILE): + try: + sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC) + except GlusterCmdException as e: + output_error("Failed to sync Config file: [Error: {0}]" + "{1}".format(e[0], e[2])) + + action = "node-reload" + if restart: + action = "node-restart" + + out = execute_in_peers(action) + table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"]) + table.align["NODE STATUS"] = "r" + table.align["SYNC STATUS"] = "r" + + for p in out: + table.add_row([p.hostname, + "UP" if p.node_up else "DOWN", + "OK" if p.ok else "NOT OK: {0}".format( + p.error)]) + + print (table) + + +def node_output_handle(resp): + rc, out, err = resp + if rc == 0: + node_output_ok(out) + else: + node_output_notok(err) + + +def action_handle(action): + out = execute_in_peers("node-" + action) + column_name = action.upper() + if action == "status": + column_name = EVENTSD.upper() + + table = PrettyTable(["NODE", "NODE STATUS", column_name + " STATUS"]) + table.align["NODE STATUS"] = "r" + table.align[column_name + " STATUS"] = "r" + + for p in out: + status_col_val = "OK" if p.ok else "NOT OK: {0}".format( + p.error) + if action == "status": + status_col_val = "DOWN" + if p.ok: + status_col_val = p.output + + table.add_row([p.hostname, + "UP" if p.node_up else "DOWN", + status_col_val]) + + print (table) + + +class NodeStart(Cmd): + name = "node-start" + + def run(self, args): + node_output_handle(start_service(EVENTSD)) + + +class StartCmd(Cmd): + name = "start" + + def run(self, args): + action_handle("start") + + +class NodeStop(Cmd): + name = "node-stop" + + def run(self, args): + node_output_handle(stop_service(EVENTSD)) + + +class StopCmd(Cmd): + name = "stop" + + def run(self, args): + action_handle("stop") + + +class NodeRestart(Cmd): + name = "node-restart" + + def run(self, args): + node_output_handle(restart_service(EVENTSD)) + + +class RestartCmd(Cmd): + name = "restart" + + def run(self, args): + action_handle("restart") + + +class NodeReload(Cmd): + name = "node-reload" + + def run(self, args): + node_output_handle(reload_service(EVENTSD)) + + +class ReloadCmd(Cmd): + name = "reload" + + def run(self, args): + action_handle("reload") + + +class NodeStatus(Cmd): + name = "node-status" + + def run(self, args): + node_output_ok("UP" if is_active(EVENTSD) else "DOWN") + + +class StatusCmd(Cmd): + name = "status" + + def run(self, args): + webhooks = {} + if os.path.exists(WEBHOOKS_FILE): + webhooks = json.load(open(WEBHOOKS_FILE)) + + print ("Webhooks: " + ("" if webhooks else "None")) + for w in webhooks: + print (w) + + print () + action_handle("status") + + +class WebhookAddCmd(Cmd): + name = "webhook-add" + + def args(self, parser): + parser.add_argument("url", help="URL of Webhook") + parser.add_argument("--bearer_token", "-t", help="Bearer Token", + default="") + + def run(self, args): + create_webhooks_file_if_not_exists() + + with fasteners.InterProcessLock(WEBHOOKS_FILE): + data = json.load(open(WEBHOOKS_FILE)) + if data.get(args.url, None) is not None: + output_error("Webhook already exists") + + data[args.url] = args.bearer_token + file_content_overwrite(WEBHOOKS_FILE, data) + + sync_to_peers() + + +class WebhookModCmd(Cmd): + name = "webhook-mod" + + def args(self, parser): + parser.add_argument("url", help="URL of Webhook") + parser.add_argument("--bearer_token", "-t", help="Bearer Token", + default="") + + def run(self, args): + create_webhooks_file_if_not_exists() + + with fasteners.InterProcessLock(WEBHOOKS_FILE): + data = json.load(open(WEBHOOKS_FILE)) + if data.get(args.url, None) is None: + output_error("Webhook does not exists") + + data[args.url] = args.bearer_token + file_content_overwrite(WEBHOOKS_FILE, data) + + sync_to_peers() + + +class WebhookDelCmd(Cmd): + name = "webhook-del" + + def args(self, parser): + parser.add_argument("url", help="URL of Webhook") + + def run(self, args): + create_webhooks_file_if_not_exists() + + with fasteners.InterProcessLock(WEBHOOKS_FILE): + data = json.load(open(WEBHOOKS_FILE)) + if data.get(args.url, None) is None: + output_error("Webhook does not exists") + + del data[args.url] + file_content_overwrite(WEBHOOKS_FILE, data) + + sync_to_peers() + + +class NodeWebhookTestCmd(Cmd): + name = "node-webhook-test" + + def args(self, parser): + parser.add_argument("url") + parser.add_argument("bearer_token") + + def run(self, args): + http_headers = {} + if args.bearer_token != ".": + http_headers["Authorization"] = "Bearer " + args.bearer_token + + try: + resp = requests.post(args.url, headers=http_headers) + except requests.ConnectionError as e: + node_output_notok("{0}".format(e)) + + if resp.status_code != 200: + node_output_notok("{0}".format(resp.status_code)) + + node_output_ok() + + +class WebhookTestCmd(Cmd): + name = "webhook-test" + + def args(self, parser): + parser.add_argument("url", help="URL of Webhook") + parser.add_argument("--bearer_token", "-t", help="Bearer Token") + + def run(self, args): + url = args.url + bearer_token = args.bearer_token + if not args.url: + url = "." + if not args.bearer_token: + bearer_token = "." + + out = execute_in_peers("node-webhook-test", [url, bearer_token]) + + table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"]) + table.align["NODE STATUS"] = "r" + table.align["WEBHOOK STATUS"] = "r" + + for p in out: + table.add_row([p.hostname, + "UP" if p.node_up else "DOWN", + "OK" if p.ok else "NOT OK: {0}".format( + p.error)]) + + print (table) + + +class ConfigGetCmd(Cmd): + name = "config-get" + + def args(self, parser): + parser.add_argument("--name", help="Config Name") + + def run(self, args): + data = json.load(open(DEFAULT_CONFIG_FILE)) + if os.path.exists(CUSTOM_CONFIG_FILE): + data.update(json.load(open(CUSTOM_CONFIG_FILE))) + + if args.name is not None and args.name not in CONFIG_KEYS: + output_error("Invalid Config item") + + table = PrettyTable(["NAME", "VALUE"]) + if args.name is None: + for k, v in data.items(): + table.add_row([k, v]) + else: + table.add_row([args.name, data[args.name]]) + + print (table) + + +def read_file_content_json(fname): + content = "{}" + with open(fname) as f: + content = f.read() + if content.strip() == "": + content = "{}" + + return json.loads(content) + + +class ConfigSetCmd(Cmd): + name = "config-set" + + def args(self, parser): + parser.add_argument("name", help="Config Name") + parser.add_argument("value", help="Config Value") + + def run(self, args): + if args.name not in CONFIG_KEYS: + output_error("Invalid Config item") + + with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE): + data = json.load(open(DEFAULT_CONFIG_FILE)) + if os.path.exists(CUSTOM_CONFIG_FILE): + config_json = read_file_content_json(CUSTOM_CONFIG_FILE) + data.update(config_json) + + # Do Nothing if same as previous value + if data[args.name] == args.value: + return + + # TODO: Validate Value + create_custom_config_file_if_not_exists() + new_data = read_file_content_json(CUSTOM_CONFIG_FILE) + + v = args.value + if args.name in BOOL_CONFIGS: + v = boolify(args.value) + + new_data[args.name] = v + file_content_overwrite(CUSTOM_CONFIG_FILE, new_data) + + # If any value changed which requires restart of REST server + restart = False + if args.name in RESTART_CONFIGS: + restart = True + + sync_to_peers(restart=restart) + + +class ConfigResetCmd(Cmd): + name = "config-reset" + + def args(self, parser): + parser.add_argument("name", help="Config Name or all") + + def run(self, args): + with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE): + changed_keys = [] + data = {} + if os.path.exists(CUSTOM_CONFIG_FILE): + data = read_file_content_json(CUSTOM_CONFIG_FILE) + + if not data: + return + + if args.name.lower() == "all": + for k, v in data.items(): + changed_keys.append(k) + + # Reset all keys + file_content_overwrite(CUSTOM_CONFIG_FILE, {}) + else: + changed_keys.append(args.name) + del data[args.name] + file_content_overwrite(CUSTOM_CONFIG_FILE, data) + + # If any value changed which requires restart of REST server + restart = False + for key in changed_keys: + if key in RESTART_CONFIGS: + restart = True + break + + sync_to_peers(restart=restart) + + +class SyncCmd(Cmd): + name = "sync" + + def run(self, args): + sync_to_peers() + + +if __name__ == "__main__": + runcli() diff --git a/events/src/utils.py b/events/src/utils.py new file mode 100644 index 00000000000..772221a1e25 --- /dev/null +++ b/events/src/utils.py @@ -0,0 +1,150 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import json +import os +import logging + +import requests +from eventsapiconf import (LOG_FILE, + WEBHOOKS_FILE, + DEFAULT_CONFIG_FILE, + CUSTOM_CONFIG_FILE) +import eventtypes + +from gluster.cliutils import get_node_uuid + + +# Webhooks list +_webhooks = {} +# Default Log Level +_log_level = "INFO" +# Config Object +_config = {} + +# Init Logger instance +logger = logging.getLogger(__name__) + + +def get_event_type_name(idx): + """ + Returns Event Type text from the index. For example, VOLUME_CREATE + """ + return eventtypes.all_events[idx].replace("EVENT_", "") + + +def setup_logger(): + """ + Logging initialization, Log level by default will be INFO, once config + file is read, respective log_level will be set. + """ + global logger + logger.setLevel(logging.INFO) + + # create the logging file handler + fh = logging.FileHandler(LOG_FILE) + + formatter = logging.Formatter("[%(asctime)s] %(levelname)s " + "[%(module)s - %(lineno)s:%(funcName)s] " + "- %(message)s") + + fh.setFormatter(formatter) + + # add handler to logger object + logger.addHandler(fh) + + +def load_config(): + """ + Load/Reload the config from REST Config files. This function will + be triggered during init and when SIGUSR2. + """ + global _config + _config = {} + if os.path.exists(DEFAULT_CONFIG_FILE): + _config = json.load(open(DEFAULT_CONFIG_FILE)) + if os.path.exists(CUSTOM_CONFIG_FILE): + _config.update(json.load(open(CUSTOM_CONFIG_FILE))) + + +def load_log_level(): + """ + Reads log_level from Config file and sets accordingly. This function will + be triggered during init and when SIGUSR2. + """ + global logger, _log_level + new_log_level = _config.get("log_level", "INFO") + if _log_level != new_log_level: + logger.setLevel(getattr(logging, new_log_level.upper())) + _log_level = new_log_level.upper() + + +def load_webhooks(): + """ + Load/Reload the webhooks list. This function will + be triggered during init and when SIGUSR2. + """ + global _webhooks + _webhooks = {} + if os.path.exists(WEBHOOKS_FILE): + _webhooks = json.load(open(WEBHOOKS_FILE)) + + +def load_all(): + """ + Wrapper function to call all load/reload functions. This function will + be triggered during init and when SIGUSR2. + """ + load_config() + load_webhooks() + load_log_level() + + +def publish(ts, event_key, data): + message = { + "nodeid": get_node_uuid(), + "ts": int(ts), + "event": get_event_type_name(event_key), + "message": data + } + if _webhooks: + plugin_webhook(message) + else: + # TODO: Default action? + pass + + +def plugin_webhook(message): + message_json = json.dumps(message, sort_keys=True) + logger.debug("EVENT: {0}".format(message_json)) + for url, token in _webhooks.items(): + http_headers = {"Content-Type": "application/json"} + if token != "" and token is not None: + http_headers["Authorization"] = "Bearer " + token + + try: + resp = requests.post(url, headers=http_headers, data=message_json) + except requests.ConnectionError as e: + logger.warn("Event push failed to URL: {url}, " + "Event: {event}, " + "Status: {error}".format( + url=url, + event=message_json, + error=e)) + continue + + if resp.status_code != 200: + logger.warn("Event push failed to URL: {url}, " + "Event: {event}, " + "Status Code: {status_code}".format( + url=url, + event=message_json, + status_code=resp.status_code)) diff --git a/events/tools/Makefile.am b/events/tools/Makefile.am new file mode 100644 index 00000000000..7d5e331e4e1 --- /dev/null +++ b/events/tools/Makefile.am @@ -0,0 +1,3 @@ +scriptsdir = $(datadir)/glusterfs/scripts +scripts_SCRIPTS = eventsdash.py +EXTRA_DIST = eventsdash.py diff --git a/events/tools/eventsdash.py b/events/tools/eventsdash.py new file mode 100644 index 00000000000..47fc56dda6e --- /dev/null +++ b/events/tools/eventsdash.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +from argparse import ArgumentParser, RawDescriptionHelpFormatter +import logging +from datetime import datetime + +from flask import Flask, request + +app = Flask(__name__) +app.logger.disabled = True +log = logging.getLogger('werkzeug') +log.disabled = True + + +def human_time(ts): + return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S") + + +@app.route("/") +def home(): + return "OK" + + +@app.route("/listen", methods=["POST"]) +def listen(): + data = request.json + if data is None: + return "OK" + + message = [] + for k, v in data.get("message", {}).items(): + message.append("{0}={1}".format(k, v)) + + print ("{0:20s} {1:20s} {2:36} {3}".format( + human_time(data.get("ts")), + data.get("event"), + data.get("nodeid"), + " ".join(message))) + + return "OK" + + +def main(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description=__doc__) + parser.add_argument("--port", type=int, help="Port", default=9000) + parser.add_argument("--debug", help="Run Server in debug mode", + action="store_true") + args = parser.parse_args() + + print ("{0:20s} {1:20s} {2:36} {3}".format( + "TIMESTAMP", "EVENT", "NODE ID", "MESSAGE" + )) + print ("{0:20s} {1:20s} {2:36} {3}".format( + "-"*20, "-"*20, "-"*36, "-"*20 + )) + if args.debug: + app.debug = True + + app.run(host="0.0.0.0", port=args.port) + + +if __name__ == "__main__": + main() |