Diffstat (limited to 'events')
-rw-r--r--   events/Makefile.am               |   4
-rw-r--r--   events/eventskeygen.py           | 164
-rw-r--r--   events/src/Makefile.am           |  27
-rw-r--r--   events/src/eventsapiconf.py.in   |  49
-rw-r--r--   events/src/eventsconfig.json     |   4
-rw-r--r--   events/src/gf_event.py           |  30
-rw-r--r--   events/src/glustereventsd.py     | 166
-rw-r--r--   events/src/handlers.py           |   2
-rw-r--r--   events/src/peer_eventsapi.py     | 562
-rw-r--r--   events/src/utils.py              | 359
-rw-r--r--   events/tools/Makefile.am         |   5
-rw-r--r--   events/tools/eventsdash.py       |  15

12 files changed, 1031 insertions, 356 deletions
diff --git a/events/Makefile.am b/events/Makefile.am
index 04a74efc228..264bb742a80 100644
--- a/events/Makefile.am
+++ b/events/Makefile.am
@@ -1,6 +1,8 @@
 SUBDIRS = src tools
-
+EXTRA_DIST = eventskeygen.py
 
 noinst_PYTHON = eventskeygen.py
 
+if BUILD_EVENTS
 install-data-hook:
 	$(INSTALL) -d -m 755 $(DESTDIR)@GLUSTERD_WORKDIR@/events
+endif
diff --git a/events/eventskeygen.py b/events/eventskeygen.py
index 7357d8e84a3..e28ebe9b7e6 100644
--- a/events/eventskeygen.py
+++ b/events/eventskeygen.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
 # -*- coding: utf-8 -*-
 #
 # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -21,16 +21,21 @@ gen_header_type = sys.argv[1]
 
 # When adding new keys add it to the END
 keys = (
+    # user driven events
+    #peer and volume management events
     "EVENT_PEER_ATTACH",
     "EVENT_PEER_DETACH",
-
     "EVENT_VOLUME_CREATE",
     "EVENT_VOLUME_START",
     "EVENT_VOLUME_STOP",
     "EVENT_VOLUME_DELETE",
     "EVENT_VOLUME_SET",
     "EVENT_VOLUME_RESET",
+    "EVENT_BRICK_RESET_START",
+    "EVENT_BRICK_RESET_COMMIT",
+    "EVENT_BRICK_REPLACE",
+
+    #geo-rep events
     "EVENT_GEOREP_CREATE",
     "EVENT_GEOREP_START",
     "EVENT_GEOREP_STOP",
@@ -39,6 +44,157 @@ keys = (
     "EVENT_GEOREP_DELETE",
     "EVENT_GEOREP_CONFIG_SET",
     "EVENT_GEOREP_CONFIG_RESET",
+
+    #bitrot events
+    "EVENT_BITROT_ENABLE",
+    "EVENT_BITROT_DISABLE",
+    "EVENT_BITROT_SCRUB_THROTTLE",
+    "EVENT_BITROT_SCRUB_FREQ",
+    "EVENT_BITROT_SCRUB_OPTION",
+    "EVENT_BITROT_SCRUB_ONDEMAND",
+
+    #quota events
+    "EVENT_QUOTA_ENABLE",
+    "EVENT_QUOTA_DISABLE",
+    "EVENT_QUOTA_SET_USAGE_LIMIT",
+    "EVENT_QUOTA_SET_OBJECTS_LIMIT",
+    "EVENT_QUOTA_REMOVE_USAGE_LIMIT",
+    "EVENT_QUOTA_REMOVE_OBJECTS_LIMIT",
+    "EVENT_QUOTA_ALERT_TIME",
+    "EVENT_QUOTA_SOFT_TIMEOUT",
+    "EVENT_QUOTA_HARD_TIMEOUT",
+    "EVENT_QUOTA_DEFAULT_SOFT_LIMIT",
+
+    #snapshot events
+    "EVENT_SNAPSHOT_CREATED",
+    "EVENT_SNAPSHOT_CREATE_FAILED",
+    "EVENT_SNAPSHOT_ACTIVATED",
+    "EVENT_SNAPSHOT_ACTIVATE_FAILED",
+    "EVENT_SNAPSHOT_DEACTIVATED",
+    "EVENT_SNAPSHOT_DEACTIVATE_FAILED",
+    "EVENT_SNAPSHOT_SOFT_LIMIT_REACHED",
+    "EVENT_SNAPSHOT_HARD_LIMIT_REACHED",
+    "EVENT_SNAPSHOT_RESTORED",
+    "EVENT_SNAPSHOT_RESTORE_FAILED",
+    "EVENT_SNAPSHOT_DELETED",
+    "EVENT_SNAPSHOT_DELETE_FAILED",
+    "EVENT_SNAPSHOT_CLONED",
+    "EVENT_SNAPSHOT_CLONE_FAILED",
+    "EVENT_SNAPSHOT_CONFIG_UPDATED",
+    "EVENT_SNAPSHOT_CONFIG_UPDATE_FAILED",
+    "EVENT_SNAPSHOT_SCHEDULER_INITIALISED",
+    "EVENT_SNAPSHOT_SCHEDULER_INIT_FAILED",
+    "EVENT_SNAPSHOT_SCHEDULER_ENABLED",
+    "EVENT_SNAPSHOT_SCHEDULER_ENABLE_FAILED",
+    "EVENT_SNAPSHOT_SCHEDULER_DISABLED",
+    "EVENT_SNAPSHOT_SCHEDULER_DISABLE_FAILED",
+    "EVENT_SNAPSHOT_SCHEDULER_SCHEDULE_ADDED",
+    "EVENT_SNAPSHOT_SCHEDULER_SCHEDULE_ADD_FAILED",
+    "EVENT_SNAPSHOT_SCHEDULER_SCHEDULE_EDITED",
+    "EVENT_SNAPSHOT_SCHEDULER_SCHEDULE_EDIT_FAILED",
+    "EVENT_SNAPSHOT_SCHEDULER_SCHEDULE_DELETED",
+    "EVENT_SNAPSHOT_SCHEDULER_SCHEDULE_DELETE_FAILED",
+
+    #async events
+    #glusterd events
+    "EVENT_SVC_MANAGER_FAILED",
+    "EVENT_SVC_RECONFIGURE_FAILED",
+    "EVENT_SVC_CONNECTED",
+    "EVENT_SVC_DISCONNECTED",
+    "EVENT_PEER_STORE_FAILURE",
+    "EVENT_PEER_RPC_CREATE_FAILED",
+    "EVENT_PEER_REJECT",
+    "EVENT_PEER_CONNECT",
+    "EVENT_PEER_DISCONNECT",
+    "EVENT_PEER_NOT_FOUND",
+    "EVENT_UNKNOWN_PEER",
+    "EVENT_BRICK_START_FAILED",
+    "EVENT_BRICK_STOP_FAILED",
+    "EVENT_BRICK_DISCONNECTED",
+    "EVENT_BRICK_CONNECTED",
+    "EVENT_BRICKPATH_RESOLVE_FAILED",
+    "EVENT_NOTIFY_UNKNOWN_OP",
+    "EVENT_QUORUM_LOST",
+    "EVENT_QUORUM_REGAINED",
+    "EVENT_REBALANCE_START_FAILED",
+    "EVENT_REBALANCE_STATUS_UPDATE_FAILED",
+    "EVENT_IMPORT_QUOTA_CONF_FAILED",
+    "EVENT_IMPORT_VOLUME_FAILED",
+    "EVENT_IMPORT_BRICK_FAILED",
+    "EVENT_COMPARE_FRIEND_VOLUME_FAILED",
+    "EVENT_NFS_GANESHA_EXPORT_FAILED",
+    #ec events
+    "EVENT_EC_MIN_BRICKS_NOT_UP",
+    "EVENT_EC_MIN_BRICKS_UP",
+    #georep async events
+    "EVENT_GEOREP_FAULTY",
+    "EVENT_GEOREP_CHECKPOINT_COMPLETED",
+    "EVENT_GEOREP_ACTIVE",
+    "EVENT_GEOREP_PASSIVE",
+
+    #quota async events
+    "EVENT_QUOTA_CROSSED_SOFT_LIMIT",
+    #bitrot async events
+    "EVENT_BITROT_BAD_FILE",
+    #protocol-server events
+    "EVENT_CLIENT_CONNECT",
+    "EVENT_CLIENT_AUTH_REJECT",
+    "EVENT_CLIENT_DISCONNECT",
+    #posix events
+    "EVENT_POSIX_SAME_GFID",
+    "EVENT_POSIX_ALREADY_PART_OF_VOLUME",
+    "EVENT_POSIX_BRICK_NOT_IN_VOLUME",
+    "EVENT_POSIX_BRICK_VERIFICATION_FAILED",
+    "EVENT_POSIX_ACL_NOT_SUPPORTED",
+    "EVENT_POSIX_HEALTH_CHECK_FAILED",
+    #afr events
+    "EVENT_AFR_QUORUM_MET",
+    "EVENT_AFR_QUORUM_FAIL",
+    "EVENT_AFR_SUBVOL_UP",
+    "EVENT_AFR_SUBVOLS_DOWN",
+    "EVENT_AFR_SPLIT_BRAIN",
+
+    #tier events
+    "EVENT_TIER_ATTACH",
+    "EVENT_TIER_ATTACH_FORCE",
+    "EVENT_TIER_DETACH_START",
+    "EVENT_TIER_DETACH_STOP",
+    "EVENT_TIER_DETACH_COMMIT",
+    "EVENT_TIER_DETACH_FORCE",
+    "EVENT_TIER_PAUSE",
+    "EVENT_TIER_RESUME",
+    "EVENT_TIER_WATERMARK_HI",
+    "EVENT_TIER_WATERMARK_DROPPED_TO_MID",
+    "EVENT_TIER_WATERMARK_RAISED_TO_MID",
+    "EVENT_TIER_WATERMARK_DROPPED_TO_LOW",
+
+    #dht events
+    #add/remove brick events
+    "EVENT_VOLUME_ADD_BRICK",
+    "EVENT_VOLUME_ADD_BRICK_FAILED",
+    "EVENT_VOLUME_REMOVE_BRICK_START",
+    "EVENT_VOLUME_REMOVE_BRICK_START_FAILED",
+    "EVENT_VOLUME_REMOVE_BRICK_COMMIT",
+    "EVENT_VOLUME_REMOVE_BRICK_COMMIT_FAILED",
+    "EVENT_VOLUME_REMOVE_BRICK_STOP",
+    "EVENT_VOLUME_REMOVE_BRICK_STOP_FAILED",
+    "EVENT_VOLUME_REMOVE_BRICK_FORCE",
+    "EVENT_VOLUME_REMOVE_BRICK_FORCE_FAILED",
+    "EVENT_VOLUME_REMOVE_BRICK_FAILED",
+
+    #rebalance events
+    "EVENT_VOLUME_REBALANCE_START",
+    "EVENT_VOLUME_REBALANCE_STOP",
+    "EVENT_VOLUME_REBALANCE_FAILED",
+    "EVENT_VOLUME_REBALANCE_COMPLETE",
+
+    #tier events
+    "EVENT_TIER_START",
+    "EVENT_TIER_START_FORCE",
+
+    #brick/inodes events
+    "EVENT_DHT_DISK_USAGE",
+    "EVENT_DHT_INODES_USAGE",
 )
 
 LAST_EVENT = "EVENT_LAST"
@@ -48,7 +204,9 @@ ERRORS = (
     "EVENT_ERROR_INVALID_INPUTS",
     "EVENT_ERROR_SOCKET",
     "EVENT_ERROR_CONNECT",
-    "EVENT_ERROR_SEND"
+    "EVENT_ERROR_SEND",
+    "EVENT_ERROR_RESOLVE",
+    "EVENT_ERROR_MSG_FORMAT",
 )
 
 if gen_header_type == "C_HEADER":
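The numeric ID of an event is its position in the keys tuple above, which is why the comment insists new keys go at the end: inserting in the middle would renumber every later event and break consumers built against the old header. A rough sketch of what the PY_HEADER output amounts to (only all_events is a name confirmed elsewhere in this patch; the rest is assumed for illustration):

    # eventtypes.py is produced by "eventskeygen.py PY_HEADER"; each key
    # becomes a constant whose value is its index in the keys tuple.
    keys = ("EVENT_PEER_ATTACH", "EVENT_PEER_DETACH")  # abbreviated

    for idx, name in enumerate(keys):
        print("{0} = {1}".format(name, idx))  # EVENT_PEER_ATTACH = 0, ...

    # all_events keeps declaration order, so a daemon can translate the
    # integer it receives on the wire back to a name: all_events[int(key)]
    print("all_events = {0}".format(list(keys)))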
"EVENT_IMPORT_QUOTA_CONF_FAILED", + "EVENT_IMPORT_VOLUME_FAILED", + "EVENT_IMPORT_BRICK_FAILED", + "EVENT_COMPARE_FRIEND_VOLUME_FAILED", + "EVENT_NFS_GANESHA_EXPORT_FAILED", + #ec events + "EVENT_EC_MIN_BRICKS_NOT_UP", + "EVENT_EC_MIN_BRICKS_UP", + #georep async events + "EVENT_GEOREP_FAULTY", + "EVENT_GEOREP_CHECKPOINT_COMPLETED", + "EVENT_GEOREP_ACTIVE", + "EVENT_GEOREP_PASSIVE", + + #quota async events + "EVENT_QUOTA_CROSSED_SOFT_LIMIT", + #bitrot async events + "EVENT_BITROT_BAD_FILE", + #protocol-server events + "EVENT_CLIENT_CONNECT", + "EVENT_CLIENT_AUTH_REJECT", + "EVENT_CLIENT_DISCONNECT", + #posix events + "EVENT_POSIX_SAME_GFID", + "EVENT_POSIX_ALREADY_PART_OF_VOLUME", + "EVENT_POSIX_BRICK_NOT_IN_VOLUME", + "EVENT_POSIX_BRICK_VERIFICATION_FAILED", + "EVENT_POSIX_ACL_NOT_SUPPORTED", + "EVENT_POSIX_HEALTH_CHECK_FAILED", + #afr events + "EVENT_AFR_QUORUM_MET", + "EVENT_AFR_QUORUM_FAIL", + "EVENT_AFR_SUBVOL_UP", + "EVENT_AFR_SUBVOLS_DOWN", + "EVENT_AFR_SPLIT_BRAIN", + + #tier events + "EVENT_TIER_ATTACH", + "EVENT_TIER_ATTACH_FORCE", + "EVENT_TIER_DETACH_START", + "EVENT_TIER_DETACH_STOP", + "EVENT_TIER_DETACH_COMMIT", + "EVENT_TIER_DETACH_FORCE", + "EVENT_TIER_PAUSE", + "EVENT_TIER_RESUME", + "EVENT_TIER_WATERMARK_HI", + "EVENT_TIER_WATERMARK_DROPPED_TO_MID", + "EVENT_TIER_WATERMARK_RAISED_TO_MID", + "EVENT_TIER_WATERMARK_DROPPED_TO_LOW", + + #dht events + #add/remove brick events + "EVENT_VOLUME_ADD_BRICK", + "EVENT_VOLUME_ADD_BRICK_FAILED", + "EVENT_VOLUME_REMOVE_BRICK_START", + "EVENT_VOLUME_REMOVE_BRICK_START_FAILED", + "EVENT_VOLUME_REMOVE_BRICK_COMMIT", + "EVENT_VOLUME_REMOVE_BRICK_COMMIT_FAILED", + "EVENT_VOLUME_REMOVE_BRICK_STOP", + "EVENT_VOLUME_REMOVE_BRICK_STOP_FAILED", + "EVENT_VOLUME_REMOVE_BRICK_FORCE", + "EVENT_VOLUME_REMOVE_BRICK_FORCE_FAILED", + "EVENT_VOLUME_REMOVE_BRICK_FAILED", + + #rebalance events + "EVENT_VOLUME_REBALANCE_START", + "EVENT_VOLUME_REBALANCE_STOP", + "EVENT_VOLUME_REBALANCE_FAILED", + "EVENT_VOLUME_REBALANCE_COMPLETE", + + #tier events + "EVENT_TIER_START", + "EVENT_TIER_START_FORCE", + + #brick/inodes events + "EVENT_DHT_DISK_USAGE", + "EVENT_DHT_INODES_USAGE", ) LAST_EVENT = "EVENT_LAST" @@ -48,7 +204,9 @@ ERRORS = ( "EVENT_ERROR_INVALID_INPUTS", "EVENT_ERROR_SOCKET", "EVENT_ERROR_CONNECT", - "EVENT_ERROR_SEND" + "EVENT_ERROR_SEND", + "EVENT_ERROR_RESOLVE", + "EVENT_ERROR_MSG_FORMAT", ) if gen_header_type == "C_HEADER": diff --git a/events/src/Makefile.am b/events/src/Makefile.am index 423bc8891fb..3b229691897 100644 --- a/events/src/Makefile.am +++ b/events/src/Makefile.am @@ -5,28 +5,37 @@ EXTRA_DIST = glustereventsd.py __init__.py eventsapiconf.py.in \ BUILT_SOURCES = eventtypes.py CLEANFILES = eventtypes.py -eventsdir = $(libexecdir)/glusterfs/events -eventspeerscriptdir = $(libexecdir)/glusterfs +eventsdir = $(GLUSTERFS_LIBEXECDIR)/gfevents +if BUILD_EVENTS +events_PYTHON = __init__.py gf_event.py eventsapiconf.py eventtypes.py \ + utils.py +endif +# this does not work, see the Makefile.am in the root for a workaround +#nodist_events_PYTHON = eventtypes.py + +eventtypes.py: $(top_srcdir)/events/eventskeygen.py + $(PYTHON) $(top_srcdir)/events/eventskeygen.py PY_HEADER + +if BUILD_EVENTS +eventspeerscriptdir = $(GLUSTERFS_LIBEXECDIR) eventsconfdir = $(sysconfdir)/glusterfs eventsconf_DATA = eventsconfig.json -events_PYTHON = __init__.py eventsapiconf.py eventtypes.py handlers.py \ - utils.py gf_event.py +events_PYTHON += handlers.py events_SCRIPTS = glustereventsd.py eventspeerscript_SCRIPTS = peer_eventsapi.py -eventtypes.py: 
diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in
index 702e1d21820..700093bee60 100644
--- a/events/src/eventsapiconf.py.in
+++ b/events/src/eventsapiconf.py.in
@@ -9,14 +9,51 @@
 #  cases as published by the Free Software Foundation.
 #
 
-SERVER_ADDRESS = "@localstatedir@/run/gluster/events.sock"
+import subprocess
+glusterd_workdir = None
+
+
+# Methods
+def get_glusterd_workdir():
+    global glusterd_workdir
+    if glusterd_workdir is not None:
+        return glusterd_workdir
+    proc = subprocess.Popen(["gluster", "system::", "getwd"],
+                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+                            universal_newlines = True)
+    out, err = proc.communicate()
+    if proc.returncode == 0:
+        glusterd_workdir = out.strip()
+    else:
+        glusterd_workdir = "@GLUSTERD_WORKDIR@"
+    return glusterd_workdir
+
+SERVER_ADDRESS = "0.0.0.0"
+SERVER_ADDRESSv4 = "0.0.0.0"
+SERVER_ADDRESSv6 = "::1"
 DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json"
 CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json"
-CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC
+CUSTOM_CONFIG_FILE = get_glusterd_workdir() + CUSTOM_CONFIG_FILE_TO_SYNC
 WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json"
-WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC
+WEBHOOKS_FILE = get_glusterd_workdir() + WEBHOOKS_FILE_TO_SYNC
 LOG_FILE = "@localstatedir@/log/glusterfs/events.log"
 EVENTSD = "glustereventsd"
-CONFIG_KEYS = ["log_level"]
-BOOL_CONFIGS = []
-RESTART_CONFIGS = []
+CONFIG_KEYS = ["log-level", "port", "disable-events-log"]
+BOOL_CONFIGS = ["disable-events-log"]
+INT_CONFIGS = ["port"]
+RESTART_CONFIGS = ["port"]
+EVENTS_ENABLED = @EVENTS_ENABLED@
+UUID_FILE = get_glusterd_workdir() + "/glusterd.info"
+PID_FILE = "@localstatedir@/run/glustereventsd.pid"
+AUTO_BOOL_ATTRIBUTES = ["force", "push-pem", "no-verify"]
+AUTO_INT_ATTRIBUTES = ["ssh-port"]
+CERTS_DIR = get_glusterd_workdir() + "/events"
+
+# Errors
+ERROR_SAME_CONFIG = 2
+ERROR_ALL_NODES_STATUS_NOT_OK = 3
+ERROR_PARTIAL_SUCCESS = 4
+ERROR_WEBHOOK_ALREADY_EXISTS = 5
+ERROR_WEBHOOK_NOT_EXISTS = 6
+ERROR_INVALID_CONFIG = 7
+ERROR_WEBHOOK_SYNC_FAILED = 8
+ERROR_CONFIG_SYNC_FAILED = 9
diff --git a/events/src/eventsconfig.json b/events/src/eventsconfig.json
index ce2c775f0bd..89e5b9c1d68 100644
--- a/events/src/eventsconfig.json
+++ b/events/src/eventsconfig.json
@@ -1,3 +1,5 @@
 {
-    "log_level": "INFO"
+    "log-level": "INFO",
+    "port": 24009,
+    "disable-events-log": false
 }
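Configuration is now layered: eventsconfig.json ships the defaults and the synced custom config overrides them, the same merge the CLI performs later in this patch (json.load the default file, then dict.update with the custom file). A minimal sketch of that lookup, with both paths assumed rather than taken from the build substitutions:

    import json
    import os

    DEFAULT_CONFIG_FILE = "/etc/glusterfs/eventsconfig.json"     # assumed path
    CUSTOM_CONFIG_FILE = "/var/lib/glusterd/events/config.json"  # assumed path

    def load_config():
        # Defaults first, then node-local overrides win
        config = json.load(open(DEFAULT_CONFIG_FILE))
        if os.path.exists(CUSTOM_CONFIG_FILE):
            config.update(json.load(open(CUSTOM_CONFIG_FILE)))
        return config

    print(load_config().get("port"))  # 24009 unless overridden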
<http://www.redhat.com> @@ -13,25 +12,32 @@ import socket import time -from eventsapiconf import SERVER_ADDRESS -from eventtypes import all_events +from gfevents.eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED +from gfevents.eventtypes import all_events -from utils import logger, setup_logger +from gfevents.utils import logger, setup_logger, get_config # Run this when this lib loads setup_logger() def gf_event(event_type, **kwargs): + if EVENTS_ENABLED == 0: + return + if not isinstance(event_type, int) or event_type >= len(all_events): logger.error("Invalid Event Type: {0}".format(event_type)) return try: - client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - client.connect(SERVER_ADDRESS) + client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) except socket.error as e: - logger.error("Unable to connect to events.sock: {0}".format(e)) + logger.error("Unable to connect to events Server: {0}".format(e)) + return + + port = get_config("port") + if port is None: + logger.error("Unable to get eventsd port details") return # Convert key value args into KEY1=VALUE1;KEY2=VALUE2;.. @@ -40,9 +46,15 @@ def gf_event(event_type, **kwargs): msg += "{0}={1};".format(k, v) # <TIMESTAMP> <EVENT_TYPE> <MSG> - msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")) + msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")).encode() try: - client.sendall(msg) + sent = client.sendto(msg, (SERVER_ADDRESS, port)) + assert sent == len(msg) except socket.error as e: logger.error("Unable to Send message: {0}".format(e)) + except AssertionError: + logger.error("Unable to send message. Sent: {0}, Actual: {1}".format( + sent, len(msg))) + finally: + client.close() diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py index 3fa57686a8b..341a3b60947 100644 --- a/events/src/glustereventsd.py +++ b/events/src/glustereventsd.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> @@ -11,39 +11,46 @@ # from __future__ import print_function -import asyncore -import socket -import os -from multiprocessing import Process, Queue import sys import signal +import threading +try: + import socketserver +except ImportError: + import SocketServer as socketserver +import socket +from argparse import ArgumentParser, RawDescriptionHelpFormatter from eventtypes import all_events import handlers import utils -from eventsapiconf import SERVER_ADDRESS -from utils import logger - -# Global Queue, EventsHandler will add items to the queue -# and process_event will gets each item and handles it -events_queue = Queue() -events_server_pid = None - - -def process_event(): - """ - Seperate process which handles all the incoming events from Gluster - processes. 
- """ - while True: - data = events_queue.get() - logger.debug("EVENT: {0}".format(repr(data))) +from eventsapiconf import SERVER_ADDRESSv4, SERVER_ADDRESSv6, PID_FILE +from eventsapiconf import AUTO_BOOL_ATTRIBUTES, AUTO_INT_ATTRIBUTES +from utils import logger, PidFile, PidFileLockFailed, boolify + +# Subclass so that specifically IPv4 packets are captured +class UDPServerv4(socketserver.ThreadingUDPServer): + address_family = socket.AF_INET + +# Subclass so that specifically IPv6 packets are captured +class UDPServerv6(socketserver.ThreadingUDPServer): + address_family = socket.AF_INET6 + +class GlusterEventsRequestHandler(socketserver.BaseRequestHandler): + + def handle(self): + data = self.request[0].strip() + if sys.version_info >= (3,): + data = self.request[0].strip().decode("utf-8") + + logger.debug("EVENT: {0} from {1}".format(repr(data), + self.client_address[0])) try: # Event Format <TIMESTAMP> <TYPE> <DETAIL> ts, key, value = data.split(" ", 2) except ValueError: logger.warn("Invalid Event Format {0}".format(data)) - continue + return data_dict = {} try: @@ -51,12 +58,22 @@ def process_event(): data_dict = dict(x.split('=') for x in value.split(';')) except ValueError: logger.warn("Unable to parse Event {0}".format(data)) - continue + return + + for k, v in data_dict.items(): + try: + if k in AUTO_BOOL_ATTRIBUTES: + data_dict[k] = boolify(v) + if k in AUTO_INT_ATTRIBUTES: + data_dict[k] = int(v) + except ValueError: + # Auto Conversion failed, Retain the old value + continue try: - # Event Type to Function Map, Recieved event data will be in + # Event Type to Function Map, Received event data will be in # the form <TIMESTAMP> <TYPE> <DETAIL>, Get Event name for the - # recieved Type/Key and construct a function name starting with + # received Type/Key and construct a function name starting with # handle_ For example: handle_event_volume_create func_name = "handle_" + all_events[int(key)].lower() except IndexError: @@ -75,73 +92,64 @@ def process_event(): handlers.generic_handler(ts, int(key), data_dict) -def process_event_wrapper(): - try: - process_event() - except KeyboardInterrupt: - return - - -class GlusterEventsHandler(asyncore.dispatcher_with_send): - - def handle_read(self): - data = self.recv(8192) - if data: - events_queue.put(data) - self.send(data) - - -class GlusterEventsServer(asyncore.dispatcher): - - def __init__(self): - global events_server_pid - asyncore.dispatcher.__init__(self) - # Start the Events listener process which listens to - # the global queue - p = Process(target=process_event_wrapper) - p.start() - events_server_pid = p.pid - - # Create UNIX Domain Socket, bind to path - self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.bind(SERVER_ADDRESS) - self.listen(5) - - def handle_accept(self): - pair = self.accept() - if pair is not None: - sock, addr = pair - GlusterEventsHandler(sock) - - def signal_handler_sigusr2(sig, frame): - if events_server_pid is not None: - os.kill(events_server_pid, signal.SIGUSR2) utils.load_all() + utils.restart_webhook_pool() + + +def UDP_server_thread(sock): + sock.serve_forever() def init_event_server(): utils.setup_logger() + utils.load_all() + utils.init_webhook_pool() - # Delete Socket file if Exists + port = utils.get_config("port") + if port is None: + sys.stderr.write("Unable to get Port details from Config\n") + sys.exit(1) + + # Creating the Eventing Server, UDP Server for IPv4 packets + try: + serverv4 = UDPServerv4((SERVER_ADDRESSv4, port), + GlusterEventsRequestHandler) + except socket.error as e: 
+ sys.stderr.write("Failed to start Eventsd for IPv4: {0}\n".format(e)) + sys.exit(1) + # Creating the Eventing Server, UDP Server for IPv6 packets try: - os.unlink(SERVER_ADDRESS) - except OSError: - if os.path.exists(SERVER_ADDRESS): - print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS), - file=sys.stderr) - sys.exit(1) + serverv6 = UDPServerv6((SERVER_ADDRESSv6, port), + GlusterEventsRequestHandler) + except socket.error as e: + sys.stderr.write("Failed to start Eventsd for IPv6: {0}\n".format(e)) + sys.exit(1) + server_thread1 = threading.Thread(target=UDP_server_thread, + args=(serverv4,)) + server_thread2 = threading.Thread(target=UDP_server_thread, + args=(serverv6,)) + server_thread1.start() + server_thread2.start() - utils.load_all() - # Start the Eventing Server, UNIX DOMAIN SOCKET Server - GlusterEventsServer() - asyncore.loop() +def get_args(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description=__doc__) + parser.add_argument("-p", "--pid-file", help="PID File", + default=PID_FILE) + + return parser.parse_args() def main(): + args = get_args() try: - init_event_server() + with PidFile(args.pid_file): + init_event_server() + except PidFileLockFailed as e: + sys.stderr.write("Failed to get lock for pid file({0}): {1}".format( + args.pid_file, e)) except KeyboardInterrupt: sys.exit(1) diff --git a/events/src/handlers.py b/events/src/handlers.py index 21d3e83de54..7746d488bf3 100644 --- a/events/src/handlers.py +++ b/events/src/handlers.py @@ -23,7 +23,7 @@ def generic_handler(ts, key, data): def handle_event_volume_set(ts, key, data): """ - Recieved data will have all the options as one string, split into + Received data will have all the options as one string, split into list of options. "key1,value1,key2,value2" into [[key1, value1], [key2, value2]] """ diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py index 7887d77351c..4d2e5f35b1c 100644 --- a/events/src/peer_eventsapi.py +++ b/events/src/peer_eventsapi.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. 
<http://www.redhat.com> @@ -14,24 +14,52 @@ from __future__ import print_function import os import json from errno import EEXIST +import fcntl +from errno import EACCES, EAGAIN +import signal +import sys +import time import requests -import fasteners from prettytable import PrettyTable -from gluster.cliutils import (Cmd, execute, node_output_ok, node_output_notok, +from gluster.cliutils import (Cmd, node_output_ok, node_output_notok, sync_file_to_peers, GlusterCmdException, - output_error, execute_in_peers, runcli) - -from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, - WEBHOOKS_FILE, - DEFAULT_CONFIG_FILE, - CUSTOM_CONFIG_FILE, - CUSTOM_CONFIG_FILE_TO_SYNC, - EVENTSD, - CONFIG_KEYS, - BOOL_CONFIGS, - RESTART_CONFIGS) + output_error, execute_in_peers, runcli, + set_common_args_func) +from gfevents.utils import LockedOpen, get_jwt_token, save_https_cert + +from gfevents.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, + WEBHOOKS_FILE, + DEFAULT_CONFIG_FILE, + CUSTOM_CONFIG_FILE, + CUSTOM_CONFIG_FILE_TO_SYNC, + EVENTSD, + CONFIG_KEYS, + BOOL_CONFIGS, + INT_CONFIGS, + PID_FILE, + RESTART_CONFIGS, + ERROR_INVALID_CONFIG, + ERROR_WEBHOOK_NOT_EXISTS, + ERROR_CONFIG_SYNC_FAILED, + ERROR_WEBHOOK_ALREADY_EXISTS, + ERROR_PARTIAL_SUCCESS, + ERROR_ALL_NODES_STATUS_NOT_OK, + ERROR_SAME_CONFIG, + ERROR_WEBHOOK_SYNC_FAILED, + CERTS_DIR) + + +def handle_output_error(err, errcode=1, json_output=False): + if json_output: + print (json.dumps({ + "output": "", + "error": err + })) + sys.exit(errcode) + else: + output_error(err, errcode) def file_content_overwrite(fname, data): @@ -41,15 +69,27 @@ def file_content_overwrite(fname, data): os.rename(fname + ".tmp", fname) -def create_custom_config_file_if_not_exists(): - mkdirp(os.path.dirname(CUSTOM_CONFIG_FILE)) +def create_custom_config_file_if_not_exists(args): + try: + config_dir = os.path.dirname(CUSTOM_CONFIG_FILE) + mkdirp(config_dir) + except OSError as e: + handle_output_error("Failed to create dir %s: %s" % (config_dir, e), + json_output=args.json) + if not os.path.exists(CUSTOM_CONFIG_FILE): with open(CUSTOM_CONFIG_FILE, "w") as f: f.write("{}") -def create_webhooks_file_if_not_exists(): - mkdirp(os.path.dirname(WEBHOOKS_FILE)) +def create_webhooks_file_if_not_exists(args): + try: + webhooks_dir = os.path.dirname(WEBHOOKS_FILE) + mkdirp(webhooks_dir) + except OSError as e: + handle_output_error("Failed to create dir %s: %s" % (webhooks_dir, e), + json_output=args.json) + if not os.path.exists(WEBHOOKS_FILE): with open(WEBHOOKS_FILE, "w") as f: f.write("{}") @@ -70,104 +110,115 @@ def mkdirp(path, exit_on_err=False, logger=None): """ try: os.makedirs(path) - except (OSError, IOError) as e: - if e.errno == EEXIST and os.path.isdir(path): - pass - else: - output_error("Fail to create dir %s: %s" % (path, e)) - - -def is_enabled(service): - rc, out, err = execute(["systemctl", "is-enabled", service]) - return rc == 0 - - -def is_active(service): - rc, out, err = execute(["systemctl", "is-active", service]) - return rc == 0 + except OSError as e: + if e.errno != EEXIST or not os.path.isdir(path): + raise -def enable_service(service): - if not is_enabled(service): - cmd = ["systemctl", "enable", service] - return execute(cmd) - - return (0, "", "") +def is_active(): + state = False + try: + with open(PID_FILE, "a+") as f: + fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + state = False + except (IOError, OSError) as e: + if e.errno in (EACCES, EAGAIN): + # cannot grab. 
+            state = True
+        else:
+            state = False
+    return state
 
 
-def disable_service(service):
-    if is_enabled(service):
-        cmd = ["systemctl", "disable", service]
-        return execute(cmd)
+def reload_service():
+    pid = None
+    if is_active():
+        with open(PID_FILE) as f:
+            try:
+                pid = int(f.read().strip())
+            except ValueError:
+                pid = None
+        if pid is not None:
+            os.kill(pid, signal.SIGUSR2)
 
     return (0, "", "")
 
 
-def start_service(service):
-    rc, out, err = enable_service(service)
-    if rc != 0:
-        return (rc, out, err)
-
-    cmd = ["systemctl", "start", service]
-    return execute(cmd)
-
-
-def stop_service(service):
-    rc, out, err = disable_service(service)
-    if rc != 0:
-        return (rc, out, err)
-
-    cmd = ["systemctl", "stop", service]
-    return execute(cmd)
-
-
-def restart_service(service):
-    rc, out, err = stop_service(service)
-    if rc != 0:
-        return (rc, out, err)
+def rows_to_json(json_out, column_name, rows):
+    num_ok_rows = 0
+    for row in rows:
+        num_ok_rows += 1 if row.ok else 0
+        json_out.append({
+            "node": row.hostname,
+            "node_status": "UP" if row.node_up else "DOWN",
+            column_name: "OK" if row.ok else "NOT OK",
+            "error": row.error
+        })
+    return num_ok_rows
 
-    return start_service(service)
 
+def rows_to_table(table, rows):
+    num_ok_rows = 0
+    for row in rows:
+        num_ok_rows += 1 if row.ok else 0
+        table.add_row([row.hostname,
+                       "UP" if row.node_up else "DOWN",
+                       "OK" if row.ok else "NOT OK: {0}".format(
+                           row.error)])
+    return num_ok_rows
 
-def reload_service(service):
-    if is_active(service):
-        cmd = ["systemctl", "reload", service]
-        return execute(cmd)
 
-    return (0, "", "")
-
-
-def sync_to_peers(restart=False):
+def sync_to_peers(args):
     if os.path.exists(WEBHOOKS_FILE):
         try:
             sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC)
         except GlusterCmdException as e:
-            output_error("Failed to sync Webhooks file: [Error: {0}]"
-                         "{1}".format(e[0], e[2]))
+            # Print stdout if stderr is empty
+            errmsg = e.message[2] if e.message[2] else e.message[1]
+            handle_output_error("Failed to sync Webhooks file: [Error: {0}]"
+                                "{1}".format(e.message[0], errmsg),
+                                errcode=ERROR_WEBHOOK_SYNC_FAILED,
+                                json_output=args.json)
 
     if os.path.exists(CUSTOM_CONFIG_FILE):
         try:
             sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC)
         except GlusterCmdException as e:
-            output_error("Failed to sync Config file: [Error: {0}]"
-                         "{1}".format(e[0], e[2]))
+            # Print stdout if stderr is empty
+            errmsg = e.message[2] if e.message[2] else e.message[1]
+            handle_output_error("Failed to sync Config file: [Error: {0}]"
+                                "{1}".format(e.message[0], errmsg),
+                                errcode=ERROR_CONFIG_SYNC_FAILED,
+                                json_output=args.json)
 
-    action = "node-reload"
-    if restart:
-        action = "node-restart"
-
-    out = execute_in_peers(action)
-    table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"])
-    table.align["NODE STATUS"] = "r"
-    table.align["SYNC STATUS"] = "r"
+    out = execute_in_peers("node-reload")
+    if not args.json:
+        table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"])
+        table.align["NODE STATUS"] = "r"
+        table.align["SYNC STATUS"] = "r"
 
-    for p in out:
-        table.add_row([p.hostname,
-                       "UP" if p.node_up else "DOWN",
-                       "OK" if p.ok else "NOT OK: {0}".format(
-                           p.error)])
+    json_out = []
+    if args.json:
+        num_ok_rows = rows_to_json(json_out, "sync_status", out)
+    else:
+        num_ok_rows = rows_to_table(table, out)
+
+    ret = 0
+    if num_ok_rows == 0:
+        ret = ERROR_ALL_NODES_STATUS_NOT_OK
+    elif num_ok_rows != len(out):
+        ret = ERROR_PARTIAL_SUCCESS
+
+    if args.json:
+        print (json.dumps({
+            "output": json_out,
+            "error": ""
+        }))
+    else:
+        print (table)
 
-    print (table)
+    # If sync status is not ok for any node set error code as partial success
+    sys.exit(ret)
 
 
 def node_output_handle(resp):
@@ -178,92 +229,52 @@ def node_output_handle(resp):
         node_output_notok(err)
 
 
-def action_handle(action):
+def action_handle(action, json_output=False):
     out = execute_in_peers("node-" + action)
     column_name = action.upper()
     if action == "status":
         column_name = EVENTSD.upper()
 
-    table = PrettyTable(["NODE", "NODE STATUS", column_name + " STATUS"])
-    table.align["NODE STATUS"] = "r"
-    table.align[column_name + " STATUS"] = "r"
-
-    for p in out:
-        status_col_val = "OK" if p.ok else "NOT OK: {0}".format(
-            p.error)
-        if action == "status":
-            status_col_val = "DOWN"
-            if p.ok:
-                status_col_val = p.output
-
-        table.add_row([p.hostname,
-                       "UP" if p.node_up else "DOWN",
-                       status_col_val])
-
-    print (table)
-
-
-class NodeStart(Cmd):
-    name = "node-start"
-
-    def run(self, args):
-        node_output_handle(start_service(EVENTSD))
-
-
-class StartCmd(Cmd):
-    name = "start"
-
-    def run(self, args):
-        action_handle("start")
-
-
-class NodeStop(Cmd):
-    name = "node-stop"
-
-    def run(self, args):
-        node_output_handle(stop_service(EVENTSD))
-
-
-class StopCmd(Cmd):
-    name = "stop"
-
-    def run(self, args):
-        action_handle("stop")
-
-
-class NodeRestart(Cmd):
-    name = "node-restart"
-
-    def run(self, args):
-        node_output_handle(restart_service(EVENTSD))
-
+    if not json_output:
+        table = PrettyTable(["NODE", "NODE STATUS", column_name + " STATUS"])
+        table.align["NODE STATUS"] = "r"
+        table.align[column_name + " STATUS"] = "r"
 
-class RestartCmd(Cmd):
-    name = "restart"
+    json_out = []
+    if json_output:
+        rows_to_json(json_out, column_name.lower() + "_status", out)
+    else:
+        rows_to_table(table, out)
 
-    def run(self, args):
-        action_handle("restart")
+    return json_out if json_output else table
 
 
 class NodeReload(Cmd):
     name = "node-reload"
 
     def run(self, args):
-        node_output_handle(reload_service(EVENTSD))
+        node_output_handle(reload_service())
 
 
 class ReloadCmd(Cmd):
     name = "reload"
 
     def run(self, args):
-        action_handle("reload")
+        out = action_handle("reload", args.json)
+        if args.json:
+            print (json.dumps({
+                "output": out,
+                "error": ""
+            }))
+        else:
+            print (out)
 
 
 class NodeStatus(Cmd):
     name = "node-status"
 
     def run(self, args):
-        node_output_ok("UP" if is_active(EVENTSD) else "DOWN")
+        node_output_ok("UP" if is_active() else "DOWN")
 
 
 class StatusCmd(Cmd):
@@ -274,12 +285,25 @@ class StatusCmd(Cmd):
         if os.path.exists(WEBHOOKS_FILE):
             webhooks = json.load(open(WEBHOOKS_FILE))
 
-        print ("Webhooks: " + ("" if webhooks else "None"))
-        for w in webhooks:
-            print (w)
-
-        print ()
-        action_handle("status")
+        json_out = {"webhooks": [], "data": []}
+        if args.json:
+            json_out["webhooks"] = webhooks.keys()
+        else:
+            print ("Webhooks: " + ("" if webhooks else "None"))
+            for w in webhooks:
+                print (w)
+
+            print ()
+
+        out = action_handle("status", args.json)
+        if args.json:
+            json_out["data"] = out
+            print (json.dumps({
+                "output": json_out,
+                "error": ""
+            }))
+        else:
+            print (out)
 
 
 class WebhookAddCmd(Cmd):
@@ -289,19 +313,24 @@ class WebhookAddCmd(Cmd):
         parser.add_argument("url", help="URL of Webhook")
         parser.add_argument("--bearer_token", "-t", help="Bearer Token",
                             default="")
+        parser.add_argument("--secret", "-s",
+                            help="Secret to add JWT Bearer Token", default="")
 
     def run(self, args):
-        create_webhooks_file_if_not_exists()
+        create_webhooks_file_if_not_exists(args)
 
-        with fasteners.InterProcessLock(WEBHOOKS_FILE):
+        with LockedOpen(WEBHOOKS_FILE, 'r+'):
             data = json.load(open(WEBHOOKS_FILE))
             if data.get(args.url, None) is not None:
-                output_error("Webhook already exists")
+                handle_output_error("Webhook already exists",
+                                    errcode=ERROR_WEBHOOK_ALREADY_EXISTS,
+                                    json_output=args.json)
 
-            data[args.url] = args.bearer_token
+            data[args.url] = {"token": args.bearer_token,
+                              "secret": args.secret}
             file_content_overwrite(WEBHOOKS_FILE, data)
 
-        sync_to_peers()
+        sync_to_peers(args)
 
 
 class WebhookModCmd(Cmd):
@@ -311,19 +340,31 @@ class WebhookModCmd(Cmd):
         parser.add_argument("url", help="URL of Webhook")
         parser.add_argument("--bearer_token", "-t", help="Bearer Token",
                             default="")
+        parser.add_argument("--secret", "-s",
+                            help="Secret to add JWT Bearer Token", default="")
 
     def run(self, args):
-        create_webhooks_file_if_not_exists()
+        create_webhooks_file_if_not_exists(args)
 
-        with fasteners.InterProcessLock(WEBHOOKS_FILE):
+        with LockedOpen(WEBHOOKS_FILE, 'r+'):
             data = json.load(open(WEBHOOKS_FILE))
             if data.get(args.url, None) is None:
-                output_error("Webhook does not exists")
+                handle_output_error("Webhook does not exists",
+                                    errcode=ERROR_WEBHOOK_NOT_EXISTS,
+                                    json_output=args.json)
+
+            if isinstance(data[args.url], str):
+                data[args.url]["token"] = data[args.url]
+
+            if args.bearer_token != "":
+                data[args.url]["token"] = args.bearer_token
+
+            if args.secret != "":
+                data[args.url]["secret"] = args.secret
 
-            data[args.url] = args.bearer_token
             file_content_overwrite(WEBHOOKS_FILE, data)
 
-        sync_to_peers()
+        sync_to_peers(args)
 
 
 class WebhookDelCmd(Cmd):
@@ -333,17 +374,19 @@ class WebhookDelCmd(Cmd):
         parser.add_argument("url", help="URL of Webhook")
 
     def run(self, args):
-        create_webhooks_file_if_not_exists()
+        create_webhooks_file_if_not_exists(args)
 
-        with fasteners.InterProcessLock(WEBHOOKS_FILE):
+        with LockedOpen(WEBHOOKS_FILE, 'r+'):
             data = json.load(open(WEBHOOKS_FILE))
             if data.get(args.url, None) is None:
-                output_error("Webhook does not exists")
+                handle_output_error("Webhook does not exists",
+                                    errcode=ERROR_WEBHOOK_NOT_EXISTS,
+                                    json_output=args.json)
 
             del data[args.url]
             file_content_overwrite(WEBHOOKS_FILE, data)
 
-        sync_to_peers()
+        sync_to_peers(args)
 
 
 class NodeWebhookTestCmd(Cmd):
@@ -352,16 +395,57 @@ class NodeWebhookTestCmd(Cmd):
     def args(self, parser):
         parser.add_argument("url")
         parser.add_argument("bearer_token")
+        parser.add_argument("secret")
 
     def run(self, args):
         http_headers = {}
+        hashval = ""
         if args.bearer_token != ".":
-            http_headers["Authorization"] = "Bearer " + args.bearer_token
-
-        try:
-            resp = requests.post(args.url, headers=http_headers)
-        except requests.ConnectionError as e:
-            node_output_notok("{0}".format(e))
+            hashval = args.bearer_token
+
+        if args.secret != ".":
+            hashval = get_jwt_token(args.secret, "TEST", int(time.time()))
+
+        if hashval:
+            http_headers["Authorization"] = "Bearer " + hashval
+
+        urldata = requests.utils.urlparse(args.url)
+        parts = urldata.netloc.split(":")
+        domain = parts[0]
+        # Default https port if not specified
+        port = 443
+        if len(parts) == 2:
+            port = int(parts[1])
+
+        cert_path = os.path.join(CERTS_DIR, args.url.replace("/", "_").strip())
+        verify = True
+        while True:
+            try:
+                resp = requests.post(args.url, headers=http_headers,
+                                     verify=verify)
+                # Successful webhook push
+                break
+            except requests.exceptions.SSLError as e:
+                # If verify is equal to cert path, but still failed with
+                # SSLError, Looks like some issue with custom downloaded
+                # certificate, Try with verify = false
+                if verify == cert_path:
+                    verify = False
+                    continue
+
+                # If verify is instance of bool and True, then custom cert
+                # is required, download the cert and retry
+                try:
+                    save_https_cert(domain, port, cert_path)
+                    verify = cert_path
+                except Exception:
+                    verify = False
+
+                # Done with collecting cert, continue
+                continue
+            except Exception as e:
+                node_output_notok("{0}".format(e))
+                break
 
         if resp.status_code != 200:
             node_output_notok("{0}".format(resp.status_code))
@@ -375,28 +459,51 @@ class WebhookTestCmd(Cmd):
     def args(self, parser):
         parser.add_argument("url", help="URL of Webhook")
         parser.add_argument("--bearer_token", "-t", help="Bearer Token")
+        parser.add_argument("--secret", "-s",
+                            help="Secret to generate Bearer Token")
 
     def run(self, args):
         url = args.url
         bearer_token = args.bearer_token
+        secret = args.secret
+
         if not args.url:
             url = "."
         if not args.bearer_token:
             bearer_token = "."
+        if not args.secret:
+            secret = "."
 
-        out = execute_in_peers("node-webhook-test", [url, bearer_token])
+        out = execute_in_peers("node-webhook-test", [url, bearer_token,
+                                                     secret])
 
-        table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"])
-        table.align["NODE STATUS"] = "r"
-        table.align["WEBHOOK STATUS"] = "r"
+        if not args.json:
+            table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"])
+            table.align["NODE STATUS"] = "r"
+            table.align["WEBHOOK STATUS"] = "r"
 
-        for p in out:
-            table.add_row([p.hostname,
-                           "UP" if p.node_up else "DOWN",
-                           "OK" if p.ok else "NOT OK: {0}".format(
-                               p.error)])
+        num_ok_rows = 0
+        json_out = []
+        if args.json:
+            num_ok_rows = rows_to_json(json_out, "webhook_status", out)
+        else:
+            num_ok_rows = rows_to_table(table, out)
+
+        ret = 0
+        if num_ok_rows == 0:
+            ret = ERROR_ALL_NODES_STATUS_NOT_OK
+        elif num_ok_rows != len(out):
+            ret = ERROR_PARTIAL_SUCCESS
+
+        if args.json:
+            print (json.dumps({
+                "output": json_out,
+                "error": ""
+            }))
+        else:
+            print (table)
 
-        print (table)
+        sys.exit(ret)
 
 
 class ConfigGetCmd(Cmd):
@@ -411,16 +518,30 @@ class ConfigGetCmd(Cmd):
             data.update(json.load(open(CUSTOM_CONFIG_FILE)))
 
         if args.name is not None and args.name not in CONFIG_KEYS:
-            output_error("Invalid Config item")
+            handle_output_error("Invalid Config item",
+                                errcode=ERROR_INVALID_CONFIG,
+                                json_output=args.json)
+
+        if args.json:
+            json_out = {}
+            if args.name is None:
+                json_out = data
+            else:
+                json_out[args.name] = data[args.name]
 
-        table = PrettyTable(["NAME", "VALUE"])
-        if args.name is None:
-            for k, v in data.items():
-                table.add_row([k, v])
+            print (json.dumps({
+                "output": json_out,
+                "error": ""
+            }))
         else:
-            table.add_row([args.name, data[args.name]])
+            table = PrettyTable(["NAME", "VALUE"])
+            if args.name is None:
+                for k, v in data.items():
+                    table.add_row([k, v])
+            else:
+                table.add_row([args.name, data[args.name]])
 
-        print (table)
+            print (table)
 
 
 def read_file_content_json(fname):
@@ -442,9 +563,13 @@ class ConfigSetCmd(Cmd):
 
     def run(self, args):
         if args.name not in CONFIG_KEYS:
-            output_error("Invalid Config item")
+            handle_output_error("Invalid Config item",
+                                errcode=ERROR_INVALID_CONFIG,
+                                json_output=args.json)
 
-        with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE):
+        create_custom_config_file_if_not_exists(args)
+
+        with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'):
             data = json.load(open(DEFAULT_CONFIG_FILE))
             if os.path.exists(CUSTOM_CONFIG_FILE):
                 config_json = read_file_content_json(CUSTOM_CONFIG_FILE)
@@ -452,16 +577,20 @@ class ConfigSetCmd(Cmd):
 
             # Do Nothing if same as previous value
             if data[args.name] == args.value:
-                return
+                handle_output_error("Config value not changed. Same config",
Same config", + errcode=ERROR_SAME_CONFIG, + json_output=args.json) # TODO: Validate Value - create_custom_config_file_if_not_exists() new_data = read_file_content_json(CUSTOM_CONFIG_FILE) v = args.value if args.name in BOOL_CONFIGS: v = boolify(args.value) + if args.name in INT_CONFIGS: + v = int(args.value) + new_data[args.name] = v file_content_overwrite(CUSTOM_CONFIG_FILE, new_data) @@ -470,7 +599,10 @@ class ConfigSetCmd(Cmd): if args.name in RESTART_CONFIGS: restart = True - sync_to_peers(restart=restart) + if restart: + print ("\nRestart glustereventsd in all nodes") + + sync_to_peers(args) class ConfigResetCmd(Cmd): @@ -480,14 +612,22 @@ class ConfigResetCmd(Cmd): parser.add_argument("name", help="Config Name or all") def run(self, args): - with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE): + create_custom_config_file_if_not_exists(args) + + with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'): changed_keys = [] data = {} if os.path.exists(CUSTOM_CONFIG_FILE): data = read_file_content_json(CUSTOM_CONFIG_FILE) - if not data: - return + # If No data available in custom config or, the specific config + # item is not available in custom config + if not data or \ + (args.name != "all" and data.get(args.name, None) is None): + handle_output_error("Config value not reset. Already " + "set to default value", + errcode=ERROR_SAME_CONFIG, + json_output=args.json) if args.name.lower() == "all": for k, v in data.items(): @@ -507,15 +647,23 @@ class ConfigResetCmd(Cmd): restart = True break - sync_to_peers(restart=restart) + if restart: + print ("\nRestart glustereventsd in all nodes") + + sync_to_peers(args) class SyncCmd(Cmd): name = "sync" def run(self, args): - sync_to_peers() + sync_to_peers(args) + + +def common_args(parser): + parser.add_argument("--json", help="JSON Output", action="store_true") if __name__ == "__main__": + set_common_args_func(common_args) runcli() diff --git a/events/src/utils.py b/events/src/utils.py index 772221a1e25..6d4e0791a2b 100644 --- a/events/src/utils.py +++ b/events/src/utils.py @@ -9,22 +9,39 @@ # cases as published by the Free Software Foundation. 
diff --git a/events/src/utils.py b/events/src/utils.py
index 772221a1e25..6d4e0791a2b 100644
--- a/events/src/utils.py
+++ b/events/src/utils.py
@@ -9,22 +9,39 @@
 #  cases as published by the Free Software Foundation.
 #
 
+import sys
 import json
 import os
 import logging
+import logging.handlers
+import fcntl
+from errno import EBADF
+from threading import Thread
+import multiprocessing
+try:
+    from queue import Queue
+except ImportError:
+    from Queue import Queue
+from datetime import datetime, timedelta
+import base64
+import hmac
+from hashlib import sha256
+from calendar import timegm
 
-import requests
-from eventsapiconf import (LOG_FILE,
-                           WEBHOOKS_FILE,
-                           DEFAULT_CONFIG_FILE,
-                           CUSTOM_CONFIG_FILE)
-import eventtypes
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
-from gluster.cliutils import get_node_uuid
+from gfevents.eventsapiconf import (LOG_FILE,
+                                    WEBHOOKS_FILE,
+                                    DEFAULT_CONFIG_FILE,
+                                    CUSTOM_CONFIG_FILE,
+                                    UUID_FILE,
+                                    CERTS_DIR)
+from gfevents import eventtypes
 
 # Webhooks list
 _webhooks = {}
+_webhooks_file_mtime = 0
 # Default Log Level
 _log_level = "INFO"
 # Config Object
@@ -32,6 +49,38 @@
 _config = {}
 # Init Logger instance
 logger = logging.getLogger(__name__)
+NodeID = None
+webhooks_pool = None
+
+
+def boolify(value):
+    value = str(value)
+    if value.lower() in ["1", "on", "true", "yes"]:
+        return True
+    else:
+        return False
+
+
+def log_event(data):
+    # Log all published events unless it is disabled
+    if not _config.get("disable-events-log", False):
+        logger.info(repr(data))
+
+
+def get_node_uuid():
+    val = None
+    with open(UUID_FILE) as f:
+        for line in f:
+            if line.startswith("UUID="):
+                val = line.strip().split("=")[-1]
+                break
+    return val
+
+
+def get_config(key, default_value=None):
+    if not _config:
+        load_config()
+    return _config.get(key, default_value)
 
 
 def get_event_type_name(idx):
@@ -50,7 +99,7 @@ def setup_logger():
     logger.setLevel(logging.INFO)
 
     # create the logging file handler
-    fh = logging.FileHandler(LOG_FILE)
+    fh = logging.handlers.WatchedFileHandler(LOG_FILE)
 
     formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
                                   "[%(module)s - %(lineno)s:%(funcName)s] "
@@ -81,7 +130,7 @@ def load_log_level():
     be triggered during init and when SIGUSR2.
     """
     global logger, _log_level
-    new_log_level = _config.get("log_level", "INFO")
+    new_log_level = _config.get("log-level", "INFO")
     if _log_level != new_log_level:
         logger.setLevel(getattr(logging, new_log_level.upper()))
         _log_level = new_log_level.upper()
""" - global _webhooks + global _webhooks, _webhooks_file_mtime _webhooks = {} if os.path.exists(WEBHOOKS_FILE): _webhooks = json.load(open(WEBHOOKS_FILE)) + st = os.lstat(WEBHOOKS_FILE) + _webhooks_file_mtime = st.st_mtime def load_all(): @@ -109,12 +160,21 @@ def load_all(): def publish(ts, event_key, data): + global NodeID + if NodeID is None: + NodeID = get_node_uuid() + + autoload_webhooks() + message = { - "nodeid": get_node_uuid(), + "nodeid": NodeID, "ts": int(ts), "event": get_event_type_name(event_key), "message": data } + + log_event(message) + if _webhooks: plugin_webhook(message) else: @@ -122,29 +182,264 @@ def publish(ts, event_key, data): pass +def autoload_webhooks(): + global _webhooks_file_mtime + try: + st = os.lstat(WEBHOOKS_FILE) + except OSError: + st = None + + if st is not None: + # If Stat is available and mtime is not matching with + # previously recorded mtime, reload the webhooks file + if st.st_mtime != _webhooks_file_mtime: + load_webhooks() + + +def base64_urlencode(inp): + return base64.urlsafe_b64encode(inp).replace("=", "").strip() + + +def get_jwt_token(secret, event_type, event_ts, jwt_expiry_time_seconds=60): + exp = datetime.utcnow() + timedelta(seconds=jwt_expiry_time_seconds) + payload = { + "exp": timegm(exp.utctimetuple()), + "iss": "gluster", + "sub": event_type, + "iat": event_ts + } + header = '{"alg":"HS256","typ":"JWT"}' + payload = json.dumps(payload, separators=(',', ':'), sort_keys=True) + msg = base64_urlencode(header) + "." + base64_urlencode(payload) + return "%s.%s" % ( + msg, + base64_urlencode(hmac.HMAC(str(secret), msg, sha256).digest()) + ) + + +def save_https_cert(domain, port, cert_path): + import ssl + + # Cert file already available for this URL + if os.path.exists(cert_path): + return + + cert_data = ssl.get_server_certificate((domain, port)) + with open(cert_path, "w") as f: + f.write(cert_data) + + +def publish_to_webhook(url, token, secret, message_queue): + # Import requests here since not used in any other place + import requests + + http_headers = {"Content-Type": "application/json"} + urldata = requests.utils.urlparse(url) + parts = urldata.netloc.split(":") + domain = parts[0] + # Default https port if not specified + port = 443 + if len(parts) == 2: + port = int(parts[1]) + + cert_path = os.path.join(CERTS_DIR, url.replace("/", "_").strip()) + + while True: + hashval = "" + event_type, event_ts, message_json = message_queue.get() + if token != "" and token is not None: + hashval = token + + if secret != "" and secret is not None: + hashval = get_jwt_token(secret, event_type, event_ts) + + if hashval: + http_headers["Authorization"] = "Bearer " + hashval + + verify = True + while True: + try: + resp = requests.post(url, headers=http_headers, + data=message_json, + verify=verify) + # Successful webhook push + message_queue.task_done() + if resp.status_code != 200: + logger.warn("Event push failed to URL: {url}, " + "Event: {event}, " + "Status Code: {status_code}".format( + url=url, + event=message_json, + status_code=resp.status_code)) + break + except requests.exceptions.SSLError as e: + # If verify is equal to cert path, but still failed with + # SSLError, Looks like some issue with custom downloaded + # certificate, Try with verify = false + if verify == cert_path: + logger.warn("Event push failed with certificate, " + "ignoring verification url={0} " + "Error={1}".format(url, e)) + verify = False + continue + + # If verify is instance of bool and True, then custom cert + # is required, download the cert and retry + 
+                try:
+                    save_https_cert(domain, port, cert_path)
+                    verify = cert_path
+                except Exception as ex:
+                    verify = False
+                    logger.warn("Unable to get Server certificate, "
+                                "ignoring verification url={0} "
+                                "Error={1}".format(url, ex))
+
+                # Done with collecting cert, continue
+                continue
+            except Exception as e:
+                logger.warn("Event push failed to URL: {url}, "
+                            "Event: {event}, "
+                            "Status: {error}".format(
+                                url=url,
+                                event=message_json,
+                                error=e))
+                message_queue.task_done()
+                break
+
+
 def plugin_webhook(message):
     message_json = json.dumps(message, sort_keys=True)
     logger.debug("EVENT: {0}".format(message_json))
-    for url, token in _webhooks.items():
-        http_headers = {"Content-Type": "application/json"}
-        if token != "" and token is not None:
-            http_headers["Authorization"] = "Bearer " + token
-
-        try:
-            resp = requests.post(url, headers=http_headers, data=message_json)
-        except requests.ConnectionError as e:
-            logger.warn("Event push failed to URL: {url}, "
-                        "Event: {event}, "
-                        "Status: {error}".format(
-                            url=url,
-                            event=message_json,
-                            error=e))
-            continue
-
-        if resp.status_code != 200:
-            logger.warn("Event push failed to URL: {url}, "
-                        "Event: {event}, "
-                        "Status Code: {status_code}".format(
-                            url=url,
-                            event=message_json,
-                            status_code=resp.status_code))
+    webhooks_pool.send(message["event"], message["ts"], message_json)
+
+
+class LockedOpen(object):
+    def __init__(self, filename, *args, **kwargs):
+        self.filename = filename
+        self.open_args = args
+        self.open_kwargs = kwargs
+        self.fileobj = None
+
+    def __enter__(self):
+        """
+        If two processes compete to update a file, The first process
+        gets the lock and the second process is blocked in the fcntl.flock()
+        call. When first process replaces the file and releases the lock,
+        the already open file descriptor in the second process now points
+        to a "ghost" file(not reachable by any path name) with old contents.
+        To avoid that conflict, check the fd already opened is same or
+        not. Open new one if not same
+        """
+        f = open(self.filename, *self.open_args, **self.open_kwargs)
+        while True:
+            fcntl.flock(f, fcntl.LOCK_EX)
+            fnew = open(self.filename, *self.open_args, **self.open_kwargs)
+            if os.path.sameopenfile(f.fileno(), fnew.fileno()):
+                fnew.close()
+                break
+            else:
+                f.close()
+                f = fnew
+        self.fileobj = f
+        return f
+
+    def __exit__(self, _exc_type, _exc_value, _traceback):
+        self.fileobj.close()
+
+
+class PidFileLockFailed(Exception):
+    pass
+
+
+class PidFile(object):
+    def __init__(self, filename):
+        self.filename = filename
+        self.pid = os.getpid()
+        self.fh = None
+
+    def cleanup(self, remove_file=True):
+        try:
+            if self.fh is not None:
+                self.fh.close()
+        except IOError as exc:
+            if exc.errno != EBADF:
+                raise
+        finally:
+            if os.path.isfile(self.filename) and remove_file:
+                os.remove(self.filename)
+
+    def __enter__(self):
+        self.fh = open(self.filename, 'a+')
+        try:
+            fcntl.flock(self.fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+        except IOError as exc:
+            self.cleanup(remove_file=False)
+            raise PidFileLockFailed(exc)
+
+        self.fh.seek(0)
+        self.fh.truncate()
+        self.fh.write("%d\n" % self.pid)
+        self.fh.flush()
+        self.fh.seek(0)
+        return self
+
+    def __exit__(self, _exc_type, _exc_value, _traceback):
+        self.cleanup()
+
+
+def webhook_monitor(proc_queue, webhooks):
+    queues = {}
+    for url, data in webhooks.items():
+        if isinstance(data, str):
+            token = data
+            secret = None
+        else:
+            token = data["token"]
+            secret = data["secret"]
+
+        queues[url] = Queue()
+        t = Thread(target=publish_to_webhook, args=(url, token, secret,
+                                                    queues[url]))
+        t.start()
+
+    # Get the message sent to Process queue and distribute to all thread queues
+    while True:
+        message = proc_queue.get()
+        for _, q in queues.items():
+            q.put(message)
+
+
+class WebhookThreadPool(object):
+    def start(self):
+        # Separate process to emit messages to webhooks
+        # which maintains one thread per webhook. Separate
+        # process is required since on reload we need to stop
+        # and start the thread pool. In Python Threads can't be stopped
+        # so terminate the process and start again. Note: In transit
+        # events will be missed during reload
+        self.queue = multiprocessing.Queue()
+        self.proc = multiprocessing.Process(target=webhook_monitor,
+                                            args=(self.queue, _webhooks))
+        self.proc.start()
+
+    def restart(self):
+        # In transit messages are skipped, since webhooks monitor process
+        # is terminated.
+        self.proc.terminate()
+        self.start()
+
+    def send(self, event_type, event_ts, message):
+        self.queue.put((event_type, event_ts, message))
+
+
+def init_webhook_pool():
+    global webhooks_pool
+    webhooks_pool = WebhookThreadPool()
+    webhooks_pool.start()
+
+
+def restart_webhook_pool():
+    global webhooks_pool
+    if webhooks_pool is not None:
+        webhooks_pool.restart()
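A webhook receiver can authenticate pushes by recomputing the JWT signature that get_jwt_token() produces: the Bearer token is base64url(header).base64url(payload).base64url(HMAC-SHA256), signed with the shared secret over the first two segments. A hedged Python 3 verification sketch (the daemon code above also targets Python 2, so its bytes handling differs slightly):

    import base64
    import hmac
    from hashlib import sha256

    def b64url(data):
        return base64.urlsafe_b64encode(data).replace(b"=", b"").strip()

    def verify_token(token, secret):
        # Signature covers "header.payload"; recompute and compare
        header_payload, _, signature = token.rpartition(".")
        expected = b64url(hmac.new(secret.encode(), header_payload.encode(),
                                   sha256).digest())
        return hmac.compare_digest(expected, signature.encode())

The exp and iat claims in the payload should also be checked against the clock, since tokens expire 60 seconds after issue by default.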
diff --git a/events/tools/Makefile.am b/events/tools/Makefile.am
index 7d5e331e4e1..fb6770cf070 100644
--- a/events/tools/Makefile.am
+++ b/events/tools/Makefile.am
@@ -1,3 +1,6 @@
+EXTRA_DIST = eventsdash.py
+
+if BUILD_EVENTS
 scriptsdir = $(datadir)/glusterfs/scripts
 scripts_SCRIPTS = eventsdash.py
-EXTRA_DIST = eventsdash.py
+endif
diff --git a/events/tools/eventsdash.py b/events/tools/eventsdash.py
index 47fc56dda6e..6479ea59da6 100644
--- a/events/tools/eventsdash.py
+++ b/events/tools/eventsdash.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
 # -*- coding: utf-8 -*-
 #
 # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -10,6 +10,7 @@
 #  cases as published by the Free Software Foundation.
 #
 
+from __future__ import print_function
 from argparse import ArgumentParser, RawDescriptionHelpFormatter
 import logging
 from datetime import datetime
@@ -41,11 +42,11 @@ def listen():
         for k, v in data.get("message", {}).items():
             message.append("{0}={1}".format(k, v))
 
-    print ("{0:20s} {1:20s} {2:36} {3}".format(
+    print(("{0:20s} {1:20s} {2:36} {3}".format(
         human_time(data.get("ts")),
         data.get("event"),
         data.get("nodeid"),
-        " ".join(message)))
+        " ".join(message))))
 
     return "OK"
 
@@ -58,12 +59,12 @@ def main():
                         action="store_true")
     args = parser.parse_args()
 
-    print ("{0:20s} {1:20s} {2:36} {3}".format(
+    print(("{0:20s} {1:20s} {2:36} {3}".format(
         "TIMESTAMP", "EVENT", "NODE ID", "MESSAGE"
-    ))
+    )))
-    print ("{0:20s} {1:20s} {2:36} {3}".format(
+    print(("{0:20s} {1:20s} {2:36} {3}".format(
         "-"*20, "-"*20, "-"*36, "-"*20
-    ))
+    )))
 
     if args.debug:
         app.debug = True
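For quick manual testing without Flask, a dependency-free receiver along the same lines as eventsdash.py can be built from the standard library alone; the address, port, and field handling here are illustrative choices, not part of the patch:

    import json
    from http.server import BaseHTTPRequestHandler, HTTPServer

    class EventReceiver(BaseHTTPRequestHandler):
        def do_POST(self):
            length = int(self.headers.get("Content-Length", 0))
            event = json.loads(self.rfile.read(length).decode("utf-8"))
            print(event.get("ts"), event.get("event"),
                  event.get("nodeid"), event.get("message"))
            # Anything other than 200 is counted as a failed push upstream
            self.send_response(200)
            self.end_headers()

    HTTPServer(("0.0.0.0", 9000), EventReceiver).serve_forever()

Pointing gluster-eventsapi webhook-test http://<host>:9000/ at it exercises the full push path from every peer.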
