diff options
Diffstat (limited to 'events')
| -rw-r--r-- | events/eventskeygen.py | 8 | ||||
| -rw-r--r-- | events/src/Makefile.am | 8 | ||||
| -rw-r--r-- | events/src/eventsapiconf.py.in | 27 | ||||
| -rw-r--r-- | events/src/gf_event.py | 19 | ||||
| -rw-r--r-- | events/src/glustereventsd.py | 51 | ||||
| -rw-r--r-- | events/src/peer_eventsapi.py | 133 | ||||
| -rw-r--r-- | events/src/utils.py | 178 | ||||
| -rw-r--r-- | events/tools/eventsdash.py | 15 |
8 files changed, 331 insertions, 108 deletions
diff --git a/events/eventskeygen.py b/events/eventskeygen.py index 23dfb478904..e28ebe9b7e6 100644 --- a/events/eventskeygen.py +++ b/events/eventskeygen.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> @@ -22,7 +22,7 @@ gen_header_type = sys.argv[1] # When adding new keys add it to the END keys = ( # user driven events - #peer and volume managment events + #peer and volume management events "EVENT_PEER_ATTACH", "EVENT_PEER_DETACH", "EVENT_VOLUME_CREATE", @@ -191,6 +191,10 @@ keys = ( #tier events "EVENT_TIER_START", "EVENT_TIER_START_FORCE", + + #brick/inodes events + "EVENT_DHT_DISK_USAGE", + "EVENT_DHT_INODES_USAGE", ) LAST_EVENT = "EVENT_LAST" diff --git a/events/src/Makefile.am b/events/src/Makefile.am index 8493abdcd82..3b229691897 100644 --- a/events/src/Makefile.am +++ b/events/src/Makefile.am @@ -5,9 +5,13 @@ EXTRA_DIST = glustereventsd.py __init__.py eventsapiconf.py.in \ BUILT_SOURCES = eventtypes.py CLEANFILES = eventtypes.py -eventsdir = $(GLUSTERFS_LIBEXECDIR)/events +eventsdir = $(GLUSTERFS_LIBEXECDIR)/gfevents +if BUILD_EVENTS events_PYTHON = __init__.py gf_event.py eventsapiconf.py eventtypes.py \ utils.py +endif +# this does not work, see the Makefile.am in the root for a workaround +#nodist_events_PYTHON = eventtypes.py eventtypes.py: $(top_srcdir)/events/eventskeygen.py $(PYTHON) $(top_srcdir)/events/eventskeygen.py PY_HEADER @@ -24,7 +28,7 @@ eventspeerscript_SCRIPTS = peer_eventsapi.py install-exec-hook: $(mkdir_p) $(DESTDIR)$(sbindir) rm -f $(DESTDIR)$(sbindir)/glustereventsd - ln -s $(GLUSTERFS_LIBEXECDIR)/events/glustereventsd.py \ + ln -s $(GLUSTERFS_LIBEXECDIR)/gfevents/glustereventsd.py \ $(DESTDIR)$(sbindir)/glustereventsd rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi ln -s $(GLUSTERFS_LIBEXECDIR)/peer_eventsapi.py \ diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in index 08a3602f567..700093bee60 100644 --- a/events/src/eventsapiconf.py.in +++ b/events/src/eventsapiconf.py.in @@ -9,12 +9,32 @@ # cases as published by the Free Software Foundation. # +import subprocess +glusterd_workdir = None + +# Methods +def get_glusterd_workdir(): + global glusterd_workdir + if glusterd_workdir is not None: + return glusterd_workdir + proc = subprocess.Popen(["gluster", "system::", "getwd"], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines = True) + out, err = proc.communicate() + if proc.returncode == 0: + glusterd_workdir = out.strip() + else: + glusterd_workdir = "@GLUSTERD_WORKDIR@" + return glusterd_workdir + SERVER_ADDRESS = "0.0.0.0" +SERVER_ADDRESSv4 = "0.0.0.0" +SERVER_ADDRESSv6 = "::1" DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json" CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json" -CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC +CUSTOM_CONFIG_FILE = get_glusterd_workdir() + CUSTOM_CONFIG_FILE_TO_SYNC WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json" -WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC +WEBHOOKS_FILE = get_glusterd_workdir() + WEBHOOKS_FILE_TO_SYNC LOG_FILE = "@localstatedir@/log/glusterfs/events.log" EVENTSD = "glustereventsd" CONFIG_KEYS = ["log-level", "port", "disable-events-log"] @@ -22,10 +42,11 @@ BOOL_CONFIGS = ["disable-events-log"] INT_CONFIGS = ["port"] RESTART_CONFIGS = ["port"] EVENTS_ENABLED = @EVENTS_ENABLED@ -UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info" +UUID_FILE = get_glusterd_workdir() + "/glusterd.info" PID_FILE = "@localstatedir@/run/glustereventsd.pid" AUTO_BOOL_ATTRIBUTES = ["force", "push-pem", "no-verify"] AUTO_INT_ATTRIBUTES = ["ssh-port"] +CERTS_DIR = get_glusterd_workdir() + "/events" # Errors ERROR_SAME_CONFIG = 2 diff --git a/events/src/gf_event.py b/events/src/gf_event.py index f9ece6adc28..260b0d9aa48 100644 --- a/events/src/gf_event.py +++ b/events/src/gf_event.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> @@ -13,10 +12,10 @@ import socket import time -from eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED -from eventtypes import all_events +from gfevents.eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED +from gfevents.eventtypes import all_events -from utils import logger, setup_logger, get_config +from gfevents.utils import logger, setup_logger, get_config # Run this when this lib loads setup_logger() @@ -36,18 +35,18 @@ def gf_event(event_type, **kwargs): logger.error("Unable to connect to events Server: {0}".format(e)) return + port = get_config("port") + if port is None: + logger.error("Unable to get eventsd port details") + return + # Convert key value args into KEY1=VALUE1;KEY2=VALUE2;.. msg = "" for k, v in kwargs.items(): msg += "{0}={1};".format(k, v) # <TIMESTAMP> <EVENT_TYPE> <MSG> - msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")) - - port = get_config("port") - if port is None: - logger.error("Unable to get eventsd port details") - return + msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")).encode() try: sent = client.sendto(msg, (SERVER_ADDRESS, port)) diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py index 606b89cbd7f..341a3b60947 100644 --- a/events/src/glustereventsd.py +++ b/events/src/glustereventsd.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> @@ -13,22 +13,36 @@ from __future__ import print_function import sys import signal -import SocketServer +import threading +try: + import socketserver +except ImportError: + import SocketServer as socketserver import socket from argparse import ArgumentParser, RawDescriptionHelpFormatter from eventtypes import all_events import handlers import utils -from eventsapiconf import SERVER_ADDRESS, PID_FILE +from eventsapiconf import SERVER_ADDRESSv4, SERVER_ADDRESSv6, PID_FILE from eventsapiconf import AUTO_BOOL_ATTRIBUTES, AUTO_INT_ATTRIBUTES from utils import logger, PidFile, PidFileLockFailed, boolify +# Subclass so that specifically IPv4 packets are captured +class UDPServerv4(socketserver.ThreadingUDPServer): + address_family = socket.AF_INET -class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler): +# Subclass so that specifically IPv6 packets are captured +class UDPServerv6(socketserver.ThreadingUDPServer): + address_family = socket.AF_INET6 + +class GlusterEventsRequestHandler(socketserver.BaseRequestHandler): def handle(self): data = self.request[0].strip() + if sys.version_info >= (3,): + data = self.request[0].strip().decode("utf-8") + logger.debug("EVENT: {0} from {1}".format(repr(data), self.client_address[0])) try: @@ -46,7 +60,7 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler): logger.warn("Unable to parse Event {0}".format(data)) return - for k, v in data_dict.iteritems(): + for k, v in data_dict.items(): try: if k in AUTO_BOOL_ATTRIBUTES: data_dict[k] = boolify(v) @@ -83,6 +97,10 @@ def signal_handler_sigusr2(sig, frame): utils.restart_webhook_pool() +def UDP_server_thread(sock): + sock.serve_forever() + + def init_event_server(): utils.setup_logger() utils.load_all() @@ -93,15 +111,26 @@ def init_event_server(): sys.stderr.write("Unable to get Port details from Config\n") sys.exit(1) - # Start the Eventing Server, UDP Server + # Creating the Eventing Server, UDP Server for IPv4 packets + try: + serverv4 = UDPServerv4((SERVER_ADDRESSv4, port), + GlusterEventsRequestHandler) + except socket.error as e: + sys.stderr.write("Failed to start Eventsd for IPv4: {0}\n".format(e)) + sys.exit(1) + # Creating the Eventing Server, UDP Server for IPv6 packets try: - server = SocketServer.ThreadingUDPServer( - (SERVER_ADDRESS, port), - GlusterEventsRequestHandler) + serverv6 = UDPServerv6((SERVER_ADDRESSv6, port), + GlusterEventsRequestHandler) except socket.error as e: - sys.stderr.write("Failed to start Eventsd: {0}\n".format(e)) + sys.stderr.write("Failed to start Eventsd for IPv6: {0}\n".format(e)) sys.exit(1) - server.serve_forever() + server_thread1 = threading.Thread(target=UDP_server_thread, + args=(serverv4,)) + server_thread2 = threading.Thread(target=UDP_server_thread, + args=(serverv6,)) + server_thread1.start() + server_thread2.start() def get_args(): diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py index 59808ada539..4d2e5f35b1c 100644 --- a/events/src/peer_eventsapi.py +++ b/events/src/peer_eventsapi.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> @@ -18,6 +18,7 @@ import fcntl from errno import EACCES, EAGAIN import signal import sys +import time import requests from prettytable import PrettyTable @@ -26,27 +27,28 @@ from gluster.cliutils import (Cmd, node_output_ok, node_output_notok, sync_file_to_peers, GlusterCmdException, output_error, execute_in_peers, runcli, set_common_args_func) -from events.utils import LockedOpen - -from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, - WEBHOOKS_FILE, - DEFAULT_CONFIG_FILE, - CUSTOM_CONFIG_FILE, - CUSTOM_CONFIG_FILE_TO_SYNC, - EVENTSD, - CONFIG_KEYS, - BOOL_CONFIGS, - INT_CONFIGS, - PID_FILE, - RESTART_CONFIGS, - ERROR_INVALID_CONFIG, - ERROR_WEBHOOK_NOT_EXISTS, - ERROR_CONFIG_SYNC_FAILED, - ERROR_WEBHOOK_ALREADY_EXISTS, - ERROR_PARTIAL_SUCCESS, - ERROR_ALL_NODES_STATUS_NOT_OK, - ERROR_SAME_CONFIG, - ERROR_WEBHOOK_SYNC_FAILED) +from gfevents.utils import LockedOpen, get_jwt_token, save_https_cert + +from gfevents.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, + WEBHOOKS_FILE, + DEFAULT_CONFIG_FILE, + CUSTOM_CONFIG_FILE, + CUSTOM_CONFIG_FILE_TO_SYNC, + EVENTSD, + CONFIG_KEYS, + BOOL_CONFIGS, + INT_CONFIGS, + PID_FILE, + RESTART_CONFIGS, + ERROR_INVALID_CONFIG, + ERROR_WEBHOOK_NOT_EXISTS, + ERROR_CONFIG_SYNC_FAILED, + ERROR_WEBHOOK_ALREADY_EXISTS, + ERROR_PARTIAL_SUCCESS, + ERROR_ALL_NODES_STATUS_NOT_OK, + ERROR_SAME_CONFIG, + ERROR_WEBHOOK_SYNC_FAILED, + CERTS_DIR) def handle_output_error(err, errcode=1, json_output=False): @@ -171,8 +173,10 @@ def sync_to_peers(args): try: sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC) except GlusterCmdException as e: + # Print stdout if stderr is empty + errmsg = e.message[2] if e.message[2] else e.message[1] handle_output_error("Failed to sync Webhooks file: [Error: {0}]" - "{1}".format(e[0], e[2]), + "{1}".format(e.message[0], errmsg), errcode=ERROR_WEBHOOK_SYNC_FAILED, json_output=args.json) @@ -180,8 +184,10 @@ def sync_to_peers(args): try: sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC) except GlusterCmdException as e: + # Print stdout if stderr is empty + errmsg = e.message[2] if e.message[2] else e.message[1] handle_output_error("Failed to sync Config file: [Error: {0}]" - "{1}".format(e[0], e[2]), + "{1}".format(e.message[0], errmsg), errcode=ERROR_CONFIG_SYNC_FAILED, json_output=args.json) @@ -307,6 +313,8 @@ class WebhookAddCmd(Cmd): parser.add_argument("url", help="URL of Webhook") parser.add_argument("--bearer_token", "-t", help="Bearer Token", default="") + parser.add_argument("--secret", "-s", + help="Secret to add JWT Bearer Token", default="") def run(self, args): create_webhooks_file_if_not_exists(args) @@ -318,7 +326,8 @@ class WebhookAddCmd(Cmd): errcode=ERROR_WEBHOOK_ALREADY_EXISTS, json_output=args.json) - data[args.url] = args.bearer_token + data[args.url] = {"token": args.bearer_token, + "secret": args.secret} file_content_overwrite(WEBHOOKS_FILE, data) sync_to_peers(args) @@ -331,6 +340,8 @@ class WebhookModCmd(Cmd): parser.add_argument("url", help="URL of Webhook") parser.add_argument("--bearer_token", "-t", help="Bearer Token", default="") + parser.add_argument("--secret", "-s", + help="Secret to add JWT Bearer Token", default="") def run(self, args): create_webhooks_file_if_not_exists(args) @@ -342,7 +353,15 @@ class WebhookModCmd(Cmd): errcode=ERROR_WEBHOOK_NOT_EXISTS, json_output=args.json) - data[args.url] = args.bearer_token + if isinstance(data[args.url], str): + data[args.url]["token"] = data[args.url] + + if args.bearer_token != "": + data[args.url]["token"] = args.bearer_token + + if args.secret != "": + data[args.url]["secret"] = args.secret + file_content_overwrite(WEBHOOKS_FILE, data) sync_to_peers(args) @@ -376,18 +395,57 @@ class NodeWebhookTestCmd(Cmd): def args(self, parser): parser.add_argument("url") parser.add_argument("bearer_token") + parser.add_argument("secret") def run(self, args): http_headers = {} + hashval = "" if args.bearer_token != ".": - http_headers["Authorization"] = "Bearer " + args.bearer_token + hashval = args.bearer_token - try: - resp = requests.post(args.url, headers=http_headers) - except requests.ConnectionError as e: - node_output_notok("{0}".format(e)) - except requests.exceptions.InvalidSchema as e: - node_output_notok("{0}".format(e)) + if args.secret != ".": + hashval = get_jwt_token(args.secret, "TEST", int(time.time())) + + if hashval: + http_headers["Authorization"] = "Bearer " + hashval + + urldata = requests.utils.urlparse(args.url) + parts = urldata.netloc.split(":") + domain = parts[0] + # Default https port if not specified + port = 443 + if len(parts) == 2: + port = int(parts[1]) + + cert_path = os.path.join(CERTS_DIR, args.url.replace("/", "_").strip()) + verify = True + while True: + try: + resp = requests.post(args.url, headers=http_headers, + verify=verify) + # Successful webhook push + break + except requests.exceptions.SSLError as e: + # If verify is equal to cert path, but still failed with + # SSLError, Looks like some issue with custom downloaded + # certificate, Try with verify = false + if verify == cert_path: + verify = False + continue + + # If verify is instance of bool and True, then custom cert + # is required, download the cert and retry + try: + save_https_cert(domain, port, cert_path) + verify = cert_path + except Exception: + verify = False + + # Done with collecting cert, continue + continue + except Exception as e: + node_output_notok("{0}".format(e)) + break if resp.status_code != 200: node_output_notok("{0}".format(resp.status_code)) @@ -401,16 +459,23 @@ class WebhookTestCmd(Cmd): def args(self, parser): parser.add_argument("url", help="URL of Webhook") parser.add_argument("--bearer_token", "-t", help="Bearer Token") + parser.add_argument("--secret", "-s", + help="Secret to generate Bearer Token") def run(self, args): url = args.url bearer_token = args.bearer_token + secret = args.secret + if not args.url: url = "." if not args.bearer_token: bearer_token = "." + if not args.secret: + secret = "." - out = execute_in_peers("node-webhook-test", [url, bearer_token]) + out = execute_in_peers("node-webhook-test", [url, bearer_token, + secret]) if not args.json: table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"]) diff --git a/events/src/utils.py b/events/src/utils.py index 2a77b13d502..6d4e0791a2b 100644 --- a/events/src/utils.py +++ b/events/src/utils.py @@ -9,21 +9,34 @@ # cases as published by the Free Software Foundation. # +import sys import json import os import logging +import logging.handlers import fcntl -from errno import ESRCH, EBADF +from errno import EBADF from threading import Thread import multiprocessing -from Queue import Queue - -from eventsapiconf import (LOG_FILE, - WEBHOOKS_FILE, - DEFAULT_CONFIG_FILE, - CUSTOM_CONFIG_FILE, - UUID_FILE) -import eventtypes +try: + from queue import Queue +except ImportError: + from Queue import Queue +from datetime import datetime, timedelta +import base64 +import hmac +from hashlib import sha256 +from calendar import timegm + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from gfevents.eventsapiconf import (LOG_FILE, + WEBHOOKS_FILE, + DEFAULT_CONFIG_FILE, + CUSTOM_CONFIG_FILE, + UUID_FILE, + CERTS_DIR) +from gfevents import eventtypes # Webhooks list @@ -86,7 +99,7 @@ def setup_logger(): logger.setLevel(logging.INFO) # create the logging file handler - fh = logging.FileHandler(LOG_FILE) + fh = logging.handlers.WatchedFileHandler(LOG_FILE) formatter = logging.Formatter("[%(asctime)s] %(levelname)s " "[%(module)s - %(lineno)s:%(funcName)s] " @@ -183,42 +196,121 @@ def autoload_webhooks(): load_webhooks() -def publish_to_webhook(url, token, message_queue): +def base64_urlencode(inp): + return base64.urlsafe_b64encode(inp).replace("=", "").strip() + + +def get_jwt_token(secret, event_type, event_ts, jwt_expiry_time_seconds=60): + exp = datetime.utcnow() + timedelta(seconds=jwt_expiry_time_seconds) + payload = { + "exp": timegm(exp.utctimetuple()), + "iss": "gluster", + "sub": event_type, + "iat": event_ts + } + header = '{"alg":"HS256","typ":"JWT"}' + payload = json.dumps(payload, separators=(',', ':'), sort_keys=True) + msg = base64_urlencode(header) + "." + base64_urlencode(payload) + return "%s.%s" % ( + msg, + base64_urlencode(hmac.HMAC(str(secret), msg, sha256).digest()) + ) + + +def save_https_cert(domain, port, cert_path): + import ssl + + # Cert file already available for this URL + if os.path.exists(cert_path): + return + + cert_data = ssl.get_server_certificate((domain, port)) + with open(cert_path, "w") as f: + f.write(cert_data) + + +def publish_to_webhook(url, token, secret, message_queue): # Import requests here since not used in any other place import requests http_headers = {"Content-Type": "application/json"} + urldata = requests.utils.urlparse(url) + parts = urldata.netloc.split(":") + domain = parts[0] + # Default https port if not specified + port = 443 + if len(parts) == 2: + port = int(parts[1]) + + cert_path = os.path.join(CERTS_DIR, url.replace("/", "_").strip()) + while True: - message_json = message_queue.get() + hashval = "" + event_type, event_ts, message_json = message_queue.get() if token != "" and token is not None: - http_headers["Authorization"] = "Bearer " + token + hashval = token - try: - resp = requests.post(url, headers=http_headers, data=message_json) - except requests.ConnectionError as e: - logger.warn("Event push failed to URL: {url}, " - "Event: {event}, " - "Status: {error}".format( - url=url, - event=message_json, - error=e)) - continue - finally: - message_queue.task_done() + if secret != "" and secret is not None: + hashval = get_jwt_token(secret, event_type, event_ts) - if resp.status_code != 200: - logger.warn("Event push failed to URL: {url}, " - "Event: {event}, " - "Status Code: {status_code}".format( - url=url, - event=message_json, - status_code=resp.status_code)) + if hashval: + http_headers["Authorization"] = "Bearer " + hashval + + verify = True + while True: + try: + resp = requests.post(url, headers=http_headers, + data=message_json, + verify=verify) + # Successful webhook push + message_queue.task_done() + if resp.status_code != 200: + logger.warn("Event push failed to URL: {url}, " + "Event: {event}, " + "Status Code: {status_code}".format( + url=url, + event=message_json, + status_code=resp.status_code)) + break + except requests.exceptions.SSLError as e: + # If verify is equal to cert path, but still failed with + # SSLError, Looks like some issue with custom downloaded + # certificate, Try with verify = false + if verify == cert_path: + logger.warn("Event push failed with certificate, " + "ignoring verification url={0} " + "Error={1}".format(url, e)) + verify = False + continue + + # If verify is instance of bool and True, then custom cert + # is required, download the cert and retry + try: + save_https_cert(domain, port, cert_path) + verify = cert_path + except Exception as ex: + verify = False + logger.warn("Unable to get Server certificate, " + "ignoring verification url={0} " + "Error={1}".format(url, ex)) + + # Done with collecting cert, continue + continue + except Exception as e: + logger.warn("Event push failed to URL: {url}, " + "Event: {event}, " + "Status: {error}".format( + url=url, + event=message_json, + error=e)) + message_queue.task_done() + break def plugin_webhook(message): message_json = json.dumps(message, sort_keys=True) logger.debug("EVENT: {0}".format(message_json)) - webhooks_pool.send(message_json) + webhooks_pool.send(message["event"], message["ts"], message_json) class LockedOpen(object): @@ -298,9 +390,17 @@ class PidFile(object): def webhook_monitor(proc_queue, webhooks): queues = {} - for url, token in webhooks.items(): + for url, data in webhooks.items(): + if isinstance(data, str): + token = data + secret = None + else: + token = data["token"] + secret = data["secret"] + queues[url] = Queue() - t = Thread(target=publish_to_webhook, args=(url, token, queues[url])) + t = Thread(target=publish_to_webhook, args=(url, token, secret, + queues[url])) t.start() # Get the message sent to Process queue and distribute to all thread queues @@ -312,8 +412,8 @@ def webhook_monitor(proc_queue, webhooks): class WebhookThreadPool(object): def start(self): - # Seperate process to emit messages to webhooks - # which maintains one thread per webhook. Seperate + # Separate process to emit messages to webhooks + # which maintains one thread per webhook. Separate # process is required since on reload we need to stop # and start the thread pool. In Python Threads can't be stopped # so terminate the process and start again. Note: In transit @@ -329,8 +429,8 @@ class WebhookThreadPool(object): self.proc.terminate() self.start() - def send(self, message): - self.queue.put(message) + def send(self, event_type, event_ts, message): + self.queue.put((event_type, event_ts, message)) def init_webhook_pool(): diff --git a/events/tools/eventsdash.py b/events/tools/eventsdash.py index 47fc56dda6e..6479ea59da6 100644 --- a/events/tools/eventsdash.py +++ b/events/tools/eventsdash.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> @@ -10,6 +10,7 @@ # cases as published by the Free Software Foundation. # +from __future__ import print_function from argparse import ArgumentParser, RawDescriptionHelpFormatter import logging from datetime import datetime @@ -41,11 +42,11 @@ def listen(): for k, v in data.get("message", {}).items(): message.append("{0}={1}".format(k, v)) - print ("{0:20s} {1:20s} {2:36} {3}".format( + print(("{0:20s} {1:20s} {2:36} {3}".format( human_time(data.get("ts")), data.get("event"), data.get("nodeid"), - " ".join(message))) + " ".join(message)))) return "OK" @@ -58,12 +59,12 @@ def main(): action="store_true") args = parser.parse_args() - print ("{0:20s} {1:20s} {2:36} {3}".format( + print(("{0:20s} {1:20s} {2:36} {3}".format( "TIMESTAMP", "EVENT", "NODE ID", "MESSAGE" - )) - print ("{0:20s} {1:20s} {2:36} {3}".format( + ))) + print(("{0:20s} {1:20s} {2:36} {3}".format( "-"*20, "-"*20, "-"*36, "-"*20 - )) + ))) if args.debug: app.debug = True |
