summaryrefslogtreecommitdiffstats
path: root/events/src
diff options
context:
space:
mode:
Diffstat (limited to 'events/src')
-rw-r--r--events/src/glustereventsd.py2
-rw-r--r--events/src/utils.py70
2 files changed, 67 insertions, 5 deletions
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
index 86e64b01ad5..4b56eee9131 100644
--- a/events/src/glustereventsd.py
+++ b/events/src/glustereventsd.py
@@ -80,11 +80,13 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
def signal_handler_sigusr2(sig, frame):
utils.load_all()
+ utils.restart_webhook_pool()
def init_event_server():
utils.setup_logger()
utils.load_all()
+ utils.init_webhook_pool()
port = utils.get_config("port")
if port is None:
diff --git a/events/src/utils.py b/events/src/utils.py
index e69d6577ff0..256cfca0fc2 100644
--- a/events/src/utils.py
+++ b/events/src/utils.py
@@ -14,6 +14,9 @@ import os
import logging
import fcntl
from errno import ESRCH, EBADF
+from threading import Thread
+import multiprocessing
+from Queue import Queue
from eventsapiconf import (LOG_FILE,
WEBHOOKS_FILE,
@@ -34,6 +37,7 @@ _config = {}
# Init Logger instance
logger = logging.getLogger(__name__)
NodeID = None
+webhooks_pool = None
def boolify(value):
@@ -170,14 +174,13 @@ def autoload_webhooks():
load_webhooks()
-def plugin_webhook(message):
+def publish_to_webhook(url, token, message_queue):
# Import requests here since not used in any other place
import requests
- message_json = json.dumps(message, sort_keys=True)
- logger.debug("EVENT: {0}".format(message_json))
- for url, token in _webhooks.items():
- http_headers = {"Content-Type": "application/json"}
+ http_headers = {"Content-Type": "application/json"}
+ while True:
+ message_json = message_queue.get()
if token != "" and token is not None:
http_headers["Authorization"] = "Bearer " + token
@@ -191,6 +194,8 @@ def plugin_webhook(message):
event=message_json,
error=e))
continue
+ finally:
+ message_queue.task_done()
if resp.status_code != 200:
logger.warn("Event push failed to URL: {url}, "
@@ -201,6 +206,12 @@ def plugin_webhook(message):
status_code=resp.status_code))
+def plugin_webhook(message):
+ message_json = json.dumps(message, sort_keys=True)
+ logger.debug("EVENT: {0}".format(message_json))
+ webhooks_pool.send(message_json)
+
+
class LockedOpen(object):
def __init__(self, filename, *args, **kwargs):
@@ -274,3 +285,52 @@ class PidFile(object):
def __exit__(self, _exc_type, _exc_value, _traceback):
self.cleanup()
+
+
+def webhook_monitor(proc_queue, webhooks):
+ queues = {}
+ for url, token in webhooks.items():
+ queues[url] = Queue()
+ t = Thread(target=publish_to_webhook, args=(url, token, queues[url]))
+ t.start()
+
+ # Get the message sent to Process queue and distribute to all thread queues
+ while True:
+ message = proc_queue.get()
+ for _, q in queues.items():
+ q.put(message)
+
+
+class WebhookThreadPool(object):
+ def start(self):
+ # Seperate process to emit messages to webhooks
+ # which maintains one thread per webhook. Seperate
+ # process is required since on reload we need to stop
+ # and start the thread pool. In Python Threads can't be stopped
+ # so terminate the process and start again. Note: In transit
+ # events will be missed during reload
+ self.queue = multiprocessing.Queue()
+ self.proc = multiprocessing.Process(target=webhook_monitor,
+ args=(self.queue, _webhooks))
+ self.proc.start()
+
+ def restart(self):
+ # In transit messages are skipped, since webhooks monitor process
+ # is terminated.
+ self.proc.terminate()
+ self.start()
+
+ def send(self, message):
+ self.queue.put(message)
+
+
+def init_webhook_pool():
+ global webhooks_pool
+ webhooks_pool = WebhookThreadPool()
+ webhooks_pool.start()
+
+
+def restart_webhook_pool():
+ global webhooks_pool
+ if webhooks_pool is not None:
+ webhooks_pool.restart()