summaryrefslogtreecommitdiffstats
path: root/events/src/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'events/src/utils.py')
-rw-r--r--events/src/utils.py178
1 files changed, 139 insertions, 39 deletions
diff --git a/events/src/utils.py b/events/src/utils.py
index 2a77b13d502..6d4e0791a2b 100644
--- a/events/src/utils.py
+++ b/events/src/utils.py
@@ -9,21 +9,34 @@
# cases as published by the Free Software Foundation.
#
+import sys
import json
import os
import logging
+import logging.handlers
import fcntl
-from errno import ESRCH, EBADF
+from errno import EBADF
from threading import Thread
import multiprocessing
-from Queue import Queue
-
-from eventsapiconf import (LOG_FILE,
- WEBHOOKS_FILE,
- DEFAULT_CONFIG_FILE,
- CUSTOM_CONFIG_FILE,
- UUID_FILE)
-import eventtypes
+try:
+ from queue import Queue
+except ImportError:
+ from Queue import Queue
+from datetime import datetime, timedelta
+import base64
+import hmac
+from hashlib import sha256
+from calendar import timegm
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from gfevents.eventsapiconf import (LOG_FILE,
+ WEBHOOKS_FILE,
+ DEFAULT_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE,
+ UUID_FILE,
+ CERTS_DIR)
+from gfevents import eventtypes
# Webhooks list
@@ -86,7 +99,7 @@ def setup_logger():
logger.setLevel(logging.INFO)
# create the logging file handler
- fh = logging.FileHandler(LOG_FILE)
+ fh = logging.handlers.WatchedFileHandler(LOG_FILE)
formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
"[%(module)s - %(lineno)s:%(funcName)s] "
@@ -183,42 +196,121 @@ def autoload_webhooks():
load_webhooks()
-def publish_to_webhook(url, token, message_queue):
+def base64_urlencode(inp):
+ return base64.urlsafe_b64encode(inp).replace("=", "").strip()
+
+
+def get_jwt_token(secret, event_type, event_ts, jwt_expiry_time_seconds=60):
+ exp = datetime.utcnow() + timedelta(seconds=jwt_expiry_time_seconds)
+ payload = {
+ "exp": timegm(exp.utctimetuple()),
+ "iss": "gluster",
+ "sub": event_type,
+ "iat": event_ts
+ }
+ header = '{"alg":"HS256","typ":"JWT"}'
+ payload = json.dumps(payload, separators=(',', ':'), sort_keys=True)
+ msg = base64_urlencode(header) + "." + base64_urlencode(payload)
+ return "%s.%s" % (
+ msg,
+ base64_urlencode(hmac.HMAC(str(secret), msg, sha256).digest())
+ )
+
+
+def save_https_cert(domain, port, cert_path):
+ import ssl
+
+ # Cert file already available for this URL
+ if os.path.exists(cert_path):
+ return
+
+ cert_data = ssl.get_server_certificate((domain, port))
+ with open(cert_path, "w") as f:
+ f.write(cert_data)
+
+
+def publish_to_webhook(url, token, secret, message_queue):
# Import requests here since not used in any other place
import requests
http_headers = {"Content-Type": "application/json"}
+ urldata = requests.utils.urlparse(url)
+ parts = urldata.netloc.split(":")
+ domain = parts[0]
+ # Default https port if not specified
+ port = 443
+ if len(parts) == 2:
+ port = int(parts[1])
+
+ cert_path = os.path.join(CERTS_DIR, url.replace("/", "_").strip())
+
while True:
- message_json = message_queue.get()
+ hashval = ""
+ event_type, event_ts, message_json = message_queue.get()
if token != "" and token is not None:
- http_headers["Authorization"] = "Bearer " + token
+ hashval = token
- try:
- resp = requests.post(url, headers=http_headers, data=message_json)
- except requests.ConnectionError as e:
- logger.warn("Event push failed to URL: {url}, "
- "Event: {event}, "
- "Status: {error}".format(
- url=url,
- event=message_json,
- error=e))
- continue
- finally:
- message_queue.task_done()
+ if secret != "" and secret is not None:
+ hashval = get_jwt_token(secret, event_type, event_ts)
- if resp.status_code != 200:
- logger.warn("Event push failed to URL: {url}, "
- "Event: {event}, "
- "Status Code: {status_code}".format(
- url=url,
- event=message_json,
- status_code=resp.status_code))
+ if hashval:
+ http_headers["Authorization"] = "Bearer " + hashval
+
+ verify = True
+ while True:
+ try:
+ resp = requests.post(url, headers=http_headers,
+ data=message_json,
+ verify=verify)
+ # Successful webhook push
+ message_queue.task_done()
+ if resp.status_code != 200:
+ logger.warn("Event push failed to URL: {url}, "
+ "Event: {event}, "
+ "Status Code: {status_code}".format(
+ url=url,
+ event=message_json,
+ status_code=resp.status_code))
+ break
+ except requests.exceptions.SSLError as e:
+ # If verify is equal to cert path, but still failed with
+ # SSLError, Looks like some issue with custom downloaded
+ # certificate, Try with verify = false
+ if verify == cert_path:
+ logger.warn("Event push failed with certificate, "
+ "ignoring verification url={0} "
+ "Error={1}".format(url, e))
+ verify = False
+ continue
+
+ # If verify is instance of bool and True, then custom cert
+ # is required, download the cert and retry
+ try:
+ save_https_cert(domain, port, cert_path)
+ verify = cert_path
+ except Exception as ex:
+ verify = False
+ logger.warn("Unable to get Server certificate, "
+ "ignoring verification url={0} "
+ "Error={1}".format(url, ex))
+
+ # Done with collecting cert, continue
+ continue
+ except Exception as e:
+ logger.warn("Event push failed to URL: {url}, "
+ "Event: {event}, "
+ "Status: {error}".format(
+ url=url,
+ event=message_json,
+ error=e))
+ message_queue.task_done()
+ break
def plugin_webhook(message):
message_json = json.dumps(message, sort_keys=True)
logger.debug("EVENT: {0}".format(message_json))
- webhooks_pool.send(message_json)
+ webhooks_pool.send(message["event"], message["ts"], message_json)
class LockedOpen(object):
@@ -298,9 +390,17 @@ class PidFile(object):
def webhook_monitor(proc_queue, webhooks):
queues = {}
- for url, token in webhooks.items():
+ for url, data in webhooks.items():
+ if isinstance(data, str):
+ token = data
+ secret = None
+ else:
+ token = data["token"]
+ secret = data["secret"]
+
queues[url] = Queue()
- t = Thread(target=publish_to_webhook, args=(url, token, queues[url]))
+ t = Thread(target=publish_to_webhook, args=(url, token, secret,
+ queues[url]))
t.start()
# Get the message sent to Process queue and distribute to all thread queues
@@ -312,8 +412,8 @@ def webhook_monitor(proc_queue, webhooks):
class WebhookThreadPool(object):
def start(self):
- # Seperate process to emit messages to webhooks
- # which maintains one thread per webhook. Seperate
+ # Separate process to emit messages to webhooks
+ # which maintains one thread per webhook. Separate
# process is required since on reload we need to stop
# and start the thread pool. In Python Threads can't be stopped
# so terminate the process and start again. Note: In transit
@@ -329,8 +429,8 @@ class WebhookThreadPool(object):
self.proc.terminate()
self.start()
- def send(self, message):
- self.queue.put(message)
+ def send(self, event_type, event_ts, message):
+ self.queue.put((event_type, event_ts, message))
def init_webhook_pool():