summaryrefslogtreecommitdiffstats
path: root/events/src/glustereventsd.py
diff options
context:
space:
mode:
Diffstat (limited to 'events/src/glustereventsd.py')
-rw-r--r--events/src/glustereventsd.py95
1 files changed, 23 insertions, 72 deletions
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
index 3fa57686a8b..91a0743ff22 100644
--- a/events/src/glustereventsd.py
+++ b/events/src/glustereventsd.py
@@ -11,12 +11,10 @@
#
from __future__ import print_function
-import asyncore
-import socket
-import os
-from multiprocessing import Process, Queue
import sys
import signal
+import SocketServer
+import socket
from eventtypes import all_events
import handlers
@@ -24,26 +22,19 @@ import utils
from eventsapiconf import SERVER_ADDRESS
from utils import logger
-# Global Queue, EventsHandler will add items to the queue
-# and process_event will gets each item and handles it
-events_queue = Queue()
-events_server_pid = None
+class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
-def process_event():
- """
- Seperate process which handles all the incoming events from Gluster
- processes.
- """
- while True:
- data = events_queue.get()
- logger.debug("EVENT: {0}".format(repr(data)))
+ def handle(self):
+ data = self.request[0].strip()
+ logger.debug("EVENT: {0} from {1}".format(repr(data),
+ self.client_address[0]))
try:
# Event Format <TIMESTAMP> <TYPE> <DETAIL>
ts, key, value = data.split(" ", 2)
except ValueError:
logger.warn("Invalid Event Format {0}".format(data))
- continue
+ return
data_dict = {}
try:
@@ -51,7 +42,7 @@ def process_event():
data_dict = dict(x.split('=') for x in value.split(';'))
except ValueError:
logger.warn("Unable to parse Event {0}".format(data))
- continue
+ return
try:
# Event Type to Function Map, Recieved event data will be in
@@ -75,68 +66,28 @@ def process_event():
handlers.generic_handler(ts, int(key), data_dict)
-def process_event_wrapper():
- try:
- process_event()
- except KeyboardInterrupt:
- return
-
-
-class GlusterEventsHandler(asyncore.dispatcher_with_send):
-
- def handle_read(self):
- data = self.recv(8192)
- if data:
- events_queue.put(data)
- self.send(data)
-
-
-class GlusterEventsServer(asyncore.dispatcher):
-
- def __init__(self):
- global events_server_pid
- asyncore.dispatcher.__init__(self)
- # Start the Events listener process which listens to
- # the global queue
- p = Process(target=process_event_wrapper)
- p.start()
- events_server_pid = p.pid
-
- # Create UNIX Domain Socket, bind to path
- self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self.bind(SERVER_ADDRESS)
- self.listen(5)
-
- def handle_accept(self):
- pair = self.accept()
- if pair is not None:
- sock, addr = pair
- GlusterEventsHandler(sock)
-
-
def signal_handler_sigusr2(sig, frame):
- if events_server_pid is not None:
- os.kill(events_server_pid, signal.SIGUSR2)
utils.load_all()
def init_event_server():
utils.setup_logger()
-
- # Delete Socket file if Exists
- try:
- os.unlink(SERVER_ADDRESS)
- except OSError:
- if os.path.exists(SERVER_ADDRESS):
- print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS),
- file=sys.stderr)
- sys.exit(1)
-
utils.load_all()
- # Start the Eventing Server, UNIX DOMAIN SOCKET Server
- GlusterEventsServer()
- asyncore.loop()
+ port = utils.get_config("port")
+ if port is None:
+ sys.stderr.write("Unable to get Port details from Config\n")
+ sys.exit(1)
+
+ # Start the Eventing Server, UDP Server
+ try:
+ server = SocketServer.ThreadingUDPServer(
+ (SERVER_ADDRESS, port),
+ GlusterEventsRequestHandler)
+ except socket.error as e:
+ sys.stderr.write("Failed to start Eventsd: {0}\n".format(e))
+ sys.exit(1)
+ server.serve_forever()
def main():