diff options
Diffstat (limited to 'events/src/utils.py')
| -rw-r--r-- | events/src/utils.py | 178 |
1 files changed, 139 insertions, 39 deletions
diff --git a/events/src/utils.py b/events/src/utils.py index 2a77b13d502..6d4e0791a2b 100644 --- a/events/src/utils.py +++ b/events/src/utils.py @@ -9,21 +9,34 @@ # cases as published by the Free Software Foundation. # +import sys import json import os import logging +import logging.handlers import fcntl -from errno import ESRCH, EBADF +from errno import EBADF from threading import Thread import multiprocessing -from Queue import Queue - -from eventsapiconf import (LOG_FILE, - WEBHOOKS_FILE, - DEFAULT_CONFIG_FILE, - CUSTOM_CONFIG_FILE, - UUID_FILE) -import eventtypes +try: + from queue import Queue +except ImportError: + from Queue import Queue +from datetime import datetime, timedelta +import base64 +import hmac +from hashlib import sha256 +from calendar import timegm + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from gfevents.eventsapiconf import (LOG_FILE, + WEBHOOKS_FILE, + DEFAULT_CONFIG_FILE, + CUSTOM_CONFIG_FILE, + UUID_FILE, + CERTS_DIR) +from gfevents import eventtypes # Webhooks list @@ -86,7 +99,7 @@ def setup_logger(): logger.setLevel(logging.INFO) # create the logging file handler - fh = logging.FileHandler(LOG_FILE) + fh = logging.handlers.WatchedFileHandler(LOG_FILE) formatter = logging.Formatter("[%(asctime)s] %(levelname)s " "[%(module)s - %(lineno)s:%(funcName)s] " @@ -183,42 +196,121 @@ def autoload_webhooks(): load_webhooks() -def publish_to_webhook(url, token, message_queue): +def base64_urlencode(inp): + return base64.urlsafe_b64encode(inp).replace("=", "").strip() + + +def get_jwt_token(secret, event_type, event_ts, jwt_expiry_time_seconds=60): + exp = datetime.utcnow() + timedelta(seconds=jwt_expiry_time_seconds) + payload = { + "exp": timegm(exp.utctimetuple()), + "iss": "gluster", + "sub": event_type, + "iat": event_ts + } + header = '{"alg":"HS256","typ":"JWT"}' + payload = json.dumps(payload, separators=(',', ':'), sort_keys=True) + msg = base64_urlencode(header) + "." + base64_urlencode(payload) + return "%s.%s" % ( + msg, + base64_urlencode(hmac.HMAC(str(secret), msg, sha256).digest()) + ) + + +def save_https_cert(domain, port, cert_path): + import ssl + + # Cert file already available for this URL + if os.path.exists(cert_path): + return + + cert_data = ssl.get_server_certificate((domain, port)) + with open(cert_path, "w") as f: + f.write(cert_data) + + +def publish_to_webhook(url, token, secret, message_queue): # Import requests here since not used in any other place import requests http_headers = {"Content-Type": "application/json"} + urldata = requests.utils.urlparse(url) + parts = urldata.netloc.split(":") + domain = parts[0] + # Default https port if not specified + port = 443 + if len(parts) == 2: + port = int(parts[1]) + + cert_path = os.path.join(CERTS_DIR, url.replace("/", "_").strip()) + while True: - message_json = message_queue.get() + hashval = "" + event_type, event_ts, message_json = message_queue.get() if token != "" and token is not None: - http_headers["Authorization"] = "Bearer " + token + hashval = token - try: - resp = requests.post(url, headers=http_headers, data=message_json) - except requests.ConnectionError as e: - logger.warn("Event push failed to URL: {url}, " - "Event: {event}, " - "Status: {error}".format( - url=url, - event=message_json, - error=e)) - continue - finally: - message_queue.task_done() + if secret != "" and secret is not None: + hashval = get_jwt_token(secret, event_type, event_ts) - if resp.status_code != 200: - logger.warn("Event push failed to URL: {url}, " - "Event: {event}, " - "Status Code: {status_code}".format( - url=url, - event=message_json, - status_code=resp.status_code)) + if hashval: + http_headers["Authorization"] = "Bearer " + hashval + + verify = True + while True: + try: + resp = requests.post(url, headers=http_headers, + data=message_json, + verify=verify) + # Successful webhook push + message_queue.task_done() + if resp.status_code != 200: + logger.warn("Event push failed to URL: {url}, " + "Event: {event}, " + "Status Code: {status_code}".format( + url=url, + event=message_json, + status_code=resp.status_code)) + break + except requests.exceptions.SSLError as e: + # If verify is equal to cert path, but still failed with + # SSLError, Looks like some issue with custom downloaded + # certificate, Try with verify = false + if verify == cert_path: + logger.warn("Event push failed with certificate, " + "ignoring verification url={0} " + "Error={1}".format(url, e)) + verify = False + continue + + # If verify is instance of bool and True, then custom cert + # is required, download the cert and retry + try: + save_https_cert(domain, port, cert_path) + verify = cert_path + except Exception as ex: + verify = False + logger.warn("Unable to get Server certificate, " + "ignoring verification url={0} " + "Error={1}".format(url, ex)) + + # Done with collecting cert, continue + continue + except Exception as e: + logger.warn("Event push failed to URL: {url}, " + "Event: {event}, " + "Status: {error}".format( + url=url, + event=message_json, + error=e)) + message_queue.task_done() + break def plugin_webhook(message): message_json = json.dumps(message, sort_keys=True) logger.debug("EVENT: {0}".format(message_json)) - webhooks_pool.send(message_json) + webhooks_pool.send(message["event"], message["ts"], message_json) class LockedOpen(object): @@ -298,9 +390,17 @@ class PidFile(object): def webhook_monitor(proc_queue, webhooks): queues = {} - for url, token in webhooks.items(): + for url, data in webhooks.items(): + if isinstance(data, str): + token = data + secret = None + else: + token = data["token"] + secret = data["secret"] + queues[url] = Queue() - t = Thread(target=publish_to_webhook, args=(url, token, queues[url])) + t = Thread(target=publish_to_webhook, args=(url, token, secret, + queues[url])) t.start() # Get the message sent to Process queue and distribute to all thread queues @@ -312,8 +412,8 @@ def webhook_monitor(proc_queue, webhooks): class WebhookThreadPool(object): def start(self): - # Seperate process to emit messages to webhooks - # which maintains one thread per webhook. Seperate + # Separate process to emit messages to webhooks + # which maintains one thread per webhook. Separate # process is required since on reload we need to stop # and start the thread pool. In Python Threads can't be stopped # so terminate the process and start again. Note: In transit @@ -329,8 +429,8 @@ class WebhookThreadPool(object): self.proc.terminate() self.start() - def send(self, message): - self.queue.put(message) + def send(self, event_type, event_ts, message): + self.queue.put((event_type, event_ts, message)) def init_webhook_pool(): |
