Diffstat (limited to 'events/src/utils.py')
-rw-r--r--	events/src/utils.py	270
1 file changed, 233 insertions(+), 37 deletions(-)
diff --git a/events/src/utils.py b/events/src/utils.py
index dadd9ae3332..6d4e0791a2b 100644
--- a/events/src/utils.py
+++ b/events/src/utils.py
@@ -9,22 +9,39 @@
 # cases as published by the Free Software Foundation.
 #
 
+import sys
 import json
 import os
 import logging
+import logging.handlers
 import fcntl
-from errno import ESRCH, EBADF
-
-from eventsapiconf import (LOG_FILE,
-                           WEBHOOKS_FILE,
-                           DEFAULT_CONFIG_FILE,
-                           CUSTOM_CONFIG_FILE,
-                           UUID_FILE)
-import eventtypes
+from errno import EBADF
+from threading import Thread
+import multiprocessing
+try:
+    from queue import Queue
+except ImportError:
+    from Queue import Queue
+from datetime import datetime, timedelta
+import base64
+import hmac
+from hashlib import sha256
+from calendar import timegm
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from gfevents.eventsapiconf import (LOG_FILE,
+                                    WEBHOOKS_FILE,
+                                    DEFAULT_CONFIG_FILE,
+                                    CUSTOM_CONFIG_FILE,
+                                    UUID_FILE,
+                                    CERTS_DIR)
+from gfevents import eventtypes
 
 
 # Webhooks list
 _webhooks = {}
+_webhooks_file_mtime = 0
 # Default Log Level
 _log_level = "INFO"
 # Config Object
@@ -33,6 +50,21 @@ _config = {}
 # Init Logger instance
 logger = logging.getLogger(__name__)
 NodeID = None
+webhooks_pool = None
+
+
+def boolify(value):
+    value = str(value)
+    if value.lower() in ["1", "on", "true", "yes"]:
+        return True
+    else:
+        return False
+
+
+def log_event(data):
+    # Log all published events unless it is disabled
+    if not _config.get("disable-events-log", False):
+        logger.info(repr(data))
 
 
 def get_node_uuid():
@@ -45,10 +77,10 @@ def get_node_uuid():
     return val
 
 
-def get_config(key):
+def get_config(key, default_value=None):
     if not _config:
         load_config()
-    return _config.get(key, None)
+    return _config.get(key, default_value)
 
 
 def get_event_type_name(idx):
@@ -67,7 +99,7 @@ def setup_logger():
     logger.setLevel(logging.INFO)
 
     # create the logging file handler
-    fh = logging.FileHandler(LOG_FILE)
+    fh = logging.handlers.WatchedFileHandler(LOG_FILE)
 
     formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
                                   "[%(module)s - %(lineno)s:%(funcName)s] "
@@ -98,7 +130,7 @@ def load_log_level():
     be triggered during init and when SIGUSR2.
     """
     global logger, _log_level
-    new_log_level = _config.get("log_level", "INFO")
+    new_log_level = _config.get("log-level", "INFO")
     if _log_level != new_log_level:
         logger.setLevel(getattr(logging, new_log_level.upper()))
         _log_level = new_log_level.upper()
@@ -109,10 +141,12 @@ def load_webhooks():
     Load/Reload the webhooks list. This function will
     be triggered during init and when SIGUSR2.
     """
-    global _webhooks
+    global _webhooks, _webhooks_file_mtime
    _webhooks = {}
     if os.path.exists(WEBHOOKS_FILE):
         _webhooks = json.load(open(WEBHOOKS_FILE))
+        st = os.lstat(WEBHOOKS_FILE)
+        _webhooks_file_mtime = st.st_mtime
 
 
 def load_all():
@@ -130,12 +164,17 @@ def publish(ts, event_key, data):
     if NodeID is None:
         NodeID = get_node_uuid()
 
+    autoload_webhooks()
+
     message = {
         "nodeid": NodeID,
         "ts": int(ts),
         "event": get_event_type_name(event_key),
         "message": data
     }
+
+    log_event(message)
+
     if _webhooks:
         plugin_webhook(message)
     else:
@@ -143,35 +182,135 @@ def publish(ts, event_key, data):
         pass
 
 
-def plugin_webhook(message):
+def autoload_webhooks():
+    global _webhooks_file_mtime
+    try:
+        st = os.lstat(WEBHOOKS_FILE)
+    except OSError:
+        st = None
+
+    if st is not None:
+        # If Stat is available and mtime is not matching with
+        # previously recorded mtime, reload the webhooks file
+        if st.st_mtime != _webhooks_file_mtime:
+            load_webhooks()
+
+
+def base64_urlencode(inp):
+    return base64.urlsafe_b64encode(inp).replace("=", "").strip()
+
+
+def get_jwt_token(secret, event_type, event_ts, jwt_expiry_time_seconds=60):
+    exp = datetime.utcnow() + timedelta(seconds=jwt_expiry_time_seconds)
+    payload = {
+        "exp": timegm(exp.utctimetuple()),
+        "iss": "gluster",
+        "sub": event_type,
+        "iat": event_ts
+    }
+    header = '{"alg":"HS256","typ":"JWT"}'
+    payload = json.dumps(payload, separators=(',', ':'), sort_keys=True)
+    msg = base64_urlencode(header) + "." + base64_urlencode(payload)
+    return "%s.%s" % (
+        msg,
+        base64_urlencode(hmac.HMAC(str(secret), msg, sha256).digest())
+    )
+
+
+def save_https_cert(domain, port, cert_path):
+    import ssl
+
+    # Cert file already available for this URL
+    if os.path.exists(cert_path):
+        return
+
+    cert_data = ssl.get_server_certificate((domain, port))
+    with open(cert_path, "w") as f:
+        f.write(cert_data)
+
+
+def publish_to_webhook(url, token, secret, message_queue):
     # Import requests here since not used in any other place
     import requests
 
-    message_json = json.dumps(message, sort_keys=True)
-    logger.debug("EVENT: {0}".format(message_json))
-    for url, token in _webhooks.items():
-        http_headers = {"Content-Type": "application/json"}
+    http_headers = {"Content-Type": "application/json"}
+    urldata = requests.utils.urlparse(url)
+    parts = urldata.netloc.split(":")
+    domain = parts[0]
+    # Default https port if not specified
+    port = 443
+    if len(parts) == 2:
+        port = int(parts[1])
+
+    cert_path = os.path.join(CERTS_DIR, url.replace("/", "_").strip())
+
+    while True:
+        hashval = ""
+        event_type, event_ts, message_json = message_queue.get()
         if token != "" and token is not None:
-            http_headers["Authorization"] = "Bearer " + token
+            hashval = token
 
-        try:
-            resp = requests.post(url, headers=http_headers, data=message_json)
-        except requests.ConnectionError as e:
-            logger.warn("Event push failed to URL: {url}, "
-                        "Event: {event}, "
-                        "Status: {error}".format(
-                            url=url,
-                            event=message_json,
-                            error=e))
-            continue
-
-        if resp.status_code != 200:
-            logger.warn("Event push failed to URL: {url}, "
-                        "Event: {event}, "
-                        "Status Code: {status_code}".format(
-                            url=url,
-                            event=message_json,
-                            status_code=resp.status_code))
+        if secret != "" and secret is not None:
+            hashval = get_jwt_token(secret, event_type, event_ts)
+
+        if hashval:
+            http_headers["Authorization"] = "Bearer " + hashval
+
+        verify = True
+        while True:
+            try:
+                resp = requests.post(url, headers=http_headers,
+                                     data=message_json,
+                                     verify=verify)
+                # Successful webhook push
+                message_queue.task_done()
+                if resp.status_code != 200:
+                    logger.warn("Event push failed to URL: {url}, "
+                                "Event: {event}, "
+                                "Status Code: {status_code}".format(
+                                    url=url,
+                                    event=message_json,
+                                    status_code=resp.status_code))
+                break
+            except requests.exceptions.SSLError as e:
+                # If verify is equal to cert path, but still failed with
+                # SSLError, Looks like some issue with custom downloaded
+                # certificate, Try with verify = false
+                if verify == cert_path:
+                    logger.warn("Event push failed with certificate, "
+                                "ignoring verification url={0} "
+                                "Error={1}".format(url, e))
+                    verify = False
+                    continue
+
+                # If verify is instance of bool and True, then custom cert
+                # is required, download the cert and retry
+                try:
+                    save_https_cert(domain, port, cert_path)
+                    verify = cert_path
+                except Exception as ex:
+                    verify = False
+                    logger.warn("Unable to get Server certificate, "
+                                "ignoring verification url={0} "
+                                "Error={1}".format(url, ex))
+
+                # Done with collecting cert, continue
+                continue
+            except Exception as e:
+                logger.warn("Event push failed to URL: {url}, "
+                            "Event: {event}, "
+                            "Status: {error}".format(
                                url=url,
                                event=message_json,
                                error=e))
+                message_queue.task_done()
+                break
+
+
+def plugin_webhook(message):
+    message_json = json.dumps(message, sort_keys=True)
+    logger.debug("EVENT: {0}".format(message_json))
+    webhooks_pool.send(message["event"], message["ts"], message_json)
 
 
 class LockedOpen(object):
@@ -247,3 +386,60 @@ class PidFile(object):
 
     def __exit__(self, _exc_type, _exc_value, _traceback):
         self.cleanup()
+
+
+def webhook_monitor(proc_queue, webhooks):
+    queues = {}
+    for url, data in webhooks.items():
+        if isinstance(data, str):
+            token = data
+            secret = None
+        else:
+            token = data["token"]
+            secret = data["secret"]
+
+        queues[url] = Queue()
+        t = Thread(target=publish_to_webhook, args=(url, token, secret,
+                                                    queues[url]))
+        t.start()
+
+    # Get the message sent to Process queue and distribute to all thread queues
+    while True:
+        message = proc_queue.get()
+        for _, q in queues.items():
+            q.put(message)
+
+
+class WebhookThreadPool(object):
+    def start(self):
+        # Separate process to emit messages to webhooks
+        # which maintains one thread per webhook. Separate
+        # process is required since on reload we need to stop
+        # and start the thread pool. In Python Threads can't be stopped
+        # so terminate the process and start again. Note: In transit
+        # events will be missed during reload
+        self.queue = multiprocessing.Queue()
+        self.proc = multiprocessing.Process(target=webhook_monitor,
+                                            args=(self.queue, _webhooks))
+        self.proc.start()
+
+    def restart(self):
+        # In transit messages are skipped, since webhooks monitor process
+        # is terminated.
+        self.proc.terminate()
+        self.start()
+
+    def send(self, event_type, event_ts, message):
+        self.queue.put((event_type, event_ts, message))
+
+
+def init_webhook_pool():
+    global webhooks_pool
+    webhooks_pool = WebhookThreadPool()
+    webhooks_pool.start()
+
+
+def restart_webhook_pool():
+    global webhooks_pool
+    if webhooks_pool is not None:
+        webhooks_pool.restart()
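Note on the new webhook authentication (a minimal sketch, not part of the patch above): webhook_monitor() accepts each entry in the webhooks file either as a plain token string or as an object with "token" and "secret" keys, and when a secret is configured publish_to_webhook() sends an HS256 JWT built by get_jwt_token() as the Bearer token. Assuming Python 3 on the receiving side, a webhook endpoint could verify that token roughly as follows; the helper names (verify_event_token, b64url_decode) are illustrative and not defined anywhere in this change.

```python
import base64
import hmac
import json
import time
from hashlib import sha256


def b64url_decode(part):
    # get_jwt_token() strips "=" padding, so restore it before decoding
    return base64.urlsafe_b64decode(part + "=" * (-len(part) % 4))


def verify_event_token(auth_header, secret):
    # auth_header is the "Bearer <jwt>" value sent by publish_to_webhook(),
    # secret is the shared secret configured for this webhook
    token = auth_header.split("Bearer ", 1)[-1]
    header_b64, payload_b64, sig_b64 = token.split(".")

    # Recompute the HMAC-SHA256 signature over "header.payload",
    # mirroring get_jwt_token() above
    signing_input = (header_b64 + "." + payload_b64).encode("utf-8")
    expected = base64.urlsafe_b64encode(
        hmac.new(secret.encode("utf-8"), signing_input, sha256).digest()
    ).decode("utf-8").rstrip("=")

    if not hmac.compare_digest(expected, sig_b64):
        return None

    payload = json.loads(b64url_decode(payload_b64))
    if payload.get("exp", 0) < time.time():
        return None  # expired; get_jwt_token() defaults to 60 seconds
    # payload carries iss ("gluster"), sub (event type) and iat (event ts)
    return payload
```

Since the token is a standard JWS compact serialization with an HS256 header, any JWT library that supports HS256 should be able to verify it as well.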
