diff options
Diffstat (limited to 'events/src/glustereventsd.py')
-rw-r--r-- | events/src/glustereventsd.py | 95 |
1 files changed, 23 insertions, 72 deletions
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py index 3fa57686a8b..91a0743ff22 100644 --- a/events/src/glustereventsd.py +++ b/events/src/glustereventsd.py @@ -11,12 +11,10 @@ # from __future__ import print_function -import asyncore -import socket -import os -from multiprocessing import Process, Queue import sys import signal +import SocketServer +import socket from eventtypes import all_events import handlers @@ -24,26 +22,19 @@ import utils from eventsapiconf import SERVER_ADDRESS from utils import logger -# Global Queue, EventsHandler will add items to the queue -# and process_event will gets each item and handles it -events_queue = Queue() -events_server_pid = None +class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler): -def process_event(): - """ - Seperate process which handles all the incoming events from Gluster - processes. - """ - while True: - data = events_queue.get() - logger.debug("EVENT: {0}".format(repr(data))) + def handle(self): + data = self.request[0].strip() + logger.debug("EVENT: {0} from {1}".format(repr(data), + self.client_address[0])) try: # Event Format <TIMESTAMP> <TYPE> <DETAIL> ts, key, value = data.split(" ", 2) except ValueError: logger.warn("Invalid Event Format {0}".format(data)) - continue + return data_dict = {} try: @@ -51,7 +42,7 @@ def process_event(): data_dict = dict(x.split('=') for x in value.split(';')) except ValueError: logger.warn("Unable to parse Event {0}".format(data)) - continue + return try: # Event Type to Function Map, Recieved event data will be in @@ -75,68 +66,28 @@ def process_event(): handlers.generic_handler(ts, int(key), data_dict) -def process_event_wrapper(): - try: - process_event() - except KeyboardInterrupt: - return - - -class GlusterEventsHandler(asyncore.dispatcher_with_send): - - def handle_read(self): - data = self.recv(8192) - if data: - events_queue.put(data) - self.send(data) - - -class GlusterEventsServer(asyncore.dispatcher): - - def __init__(self): - global events_server_pid - asyncore.dispatcher.__init__(self) - # Start the Events listener process which listens to - # the global queue - p = Process(target=process_event_wrapper) - p.start() - events_server_pid = p.pid - - # Create UNIX Domain Socket, bind to path - self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.bind(SERVER_ADDRESS) - self.listen(5) - - def handle_accept(self): - pair = self.accept() - if pair is not None: - sock, addr = pair - GlusterEventsHandler(sock) - - def signal_handler_sigusr2(sig, frame): - if events_server_pid is not None: - os.kill(events_server_pid, signal.SIGUSR2) utils.load_all() def init_event_server(): utils.setup_logger() - - # Delete Socket file if Exists - try: - os.unlink(SERVER_ADDRESS) - except OSError: - if os.path.exists(SERVER_ADDRESS): - print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS), - file=sys.stderr) - sys.exit(1) - utils.load_all() - # Start the Eventing Server, UNIX DOMAIN SOCKET Server - GlusterEventsServer() - asyncore.loop() + port = utils.get_config("port") + if port is None: + sys.stderr.write("Unable to get Port details from Config\n") + sys.exit(1) + + # Start the Eventing Server, UDP Server + try: + server = SocketServer.ThreadingUDPServer( + (SERVER_ADDRESS, port), + GlusterEventsRequestHandler) + except socket.error as e: + sys.stderr.write("Failed to start Eventsd: {0}\n".format(e)) + sys.exit(1) + server.serve_forever() def main(): |