From 99458974b7d12bd12d78d4b9a19adfb62d771b5c Mon Sep 17 00:00:00 2001 From: Aravinda VK Date: Tue, 29 Nov 2016 16:32:54 +0530 Subject: eventsapi: Push Messages to Webhooks in parallel With this patch, glustereventsd will maintain one thread per webhook. If a webhook is slow, then all the events to that worker will be delayed but it will not affect the other webhooks. Note: Webhook in transit may get missed if glustereventsd reloads due to new Webhook addition or due configuration changes. BUG: 1357754 Change-Id: I2d11e01c7ac434355bc356ff75396252f51b339b Signed-off-by: Aravinda VK Reviewed-on: http://review.gluster.org/15966 Smoke: Gluster Build System Reviewed-by: Prashanth Pai NetBSD-regression: NetBSD Build System CentOS-regression: Gluster Build System --- events/src/glustereventsd.py | 2 ++ events/src/utils.py | 70 ++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 67 insertions(+), 5 deletions(-) (limited to 'events') diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py index 86e64b01ad5..4b56eee9131 100644 --- a/events/src/glustereventsd.py +++ b/events/src/glustereventsd.py @@ -80,11 +80,13 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler): def signal_handler_sigusr2(sig, frame): utils.load_all() + utils.restart_webhook_pool() def init_event_server(): utils.setup_logger() utils.load_all() + utils.init_webhook_pool() port = utils.get_config("port") if port is None: diff --git a/events/src/utils.py b/events/src/utils.py index e69d6577ff0..256cfca0fc2 100644 --- a/events/src/utils.py +++ b/events/src/utils.py @@ -14,6 +14,9 @@ import os import logging import fcntl from errno import ESRCH, EBADF +from threading import Thread +import multiprocessing +from Queue import Queue from eventsapiconf import (LOG_FILE, WEBHOOKS_FILE, @@ -34,6 +37,7 @@ _config = {} # Init Logger instance logger = logging.getLogger(__name__) NodeID = None +webhooks_pool = None def boolify(value): @@ -170,14 +174,13 @@ def autoload_webhooks(): load_webhooks() -def plugin_webhook(message): +def publish_to_webhook(url, token, message_queue): # Import requests here since not used in any other place import requests - message_json = json.dumps(message, sort_keys=True) - logger.debug("EVENT: {0}".format(message_json)) - for url, token in _webhooks.items(): - http_headers = {"Content-Type": "application/json"} + http_headers = {"Content-Type": "application/json"} + while True: + message_json = message_queue.get() if token != "" and token is not None: http_headers["Authorization"] = "Bearer " + token @@ -191,6 +194,8 @@ def plugin_webhook(message): event=message_json, error=e)) continue + finally: + message_queue.task_done() if resp.status_code != 200: logger.warn("Event push failed to URL: {url}, " @@ -201,6 +206,12 @@ def plugin_webhook(message): status_code=resp.status_code)) +def plugin_webhook(message): + message_json = json.dumps(message, sort_keys=True) + logger.debug("EVENT: {0}".format(message_json)) + webhooks_pool.send(message_json) + + class LockedOpen(object): def __init__(self, filename, *args, **kwargs): @@ -274,3 +285,52 @@ class PidFile(object): def __exit__(self, _exc_type, _exc_value, _traceback): self.cleanup() + + +def webhook_monitor(proc_queue, webhooks): + queues = {} + for url, token in webhooks.items(): + queues[url] = Queue() + t = Thread(target=publish_to_webhook, args=(url, token, queues[url])) + t.start() + + # Get the message sent to Process queue and distribute to all thread queues + while True: + message = proc_queue.get() + for _, q in queues.items(): + q.put(message) + + +class WebhookThreadPool(object): + def start(self): + # Seperate process to emit messages to webhooks + # which maintains one thread per webhook. Seperate + # process is required since on reload we need to stop + # and start the thread pool. In Python Threads can't be stopped + # so terminate the process and start again. Note: In transit + # events will be missed during reload + self.queue = multiprocessing.Queue() + self.proc = multiprocessing.Process(target=webhook_monitor, + args=(self.queue, _webhooks)) + self.proc.start() + + def restart(self): + # In transit messages are skipped, since webhooks monitor process + # is terminated. + self.proc.terminate() + self.start() + + def send(self, message): + self.queue.put(message) + + +def init_webhook_pool(): + global webhooks_pool + webhooks_pool = WebhookThreadPool() + webhooks_pool.start() + + +def restart_webhook_pool(): + global webhooks_pool + if webhooks_pool is not None: + webhooks_pool.restart() -- cgit