summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--events/eventskeygen.py4
-rw-r--r--events/src/eventsapiconf.py.in8
-rw-r--r--events/src/eventsconfig.json3
-rw-r--r--events/src/gf_event.py20
-rw-r--r--events/src/glustereventsd.py95
-rw-r--r--events/src/peer_eventsapi.py4
-rw-r--r--events/src/utils.py28
-rw-r--r--extras/firewalld/glusterfs.xml1
-rw-r--r--libglusterfs/src/events.c90
9 files changed, 139 insertions, 114 deletions
diff --git a/events/eventskeygen.py b/events/eventskeygen.py
index 30b518dc18a..9d8a97c3030 100644
--- a/events/eventskeygen.py
+++ b/events/eventskeygen.py
@@ -102,7 +102,9 @@ ERRORS = (
"EVENT_ERROR_INVALID_INPUTS",
"EVENT_ERROR_SOCKET",
"EVENT_ERROR_CONNECT",
- "EVENT_ERROR_SEND"
+ "EVENT_ERROR_SEND",
+ "EVENT_ERROR_RESOLVE",
+ "EVENT_ERROR_MSG_FORMAT",
)
if gen_header_type == "C_HEADER":
diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in
index 03dd0e8d5d7..fad96ca2cc7 100644
--- a/events/src/eventsapiconf.py.in
+++ b/events/src/eventsapiconf.py.in
@@ -9,7 +9,7 @@
# cases as published by the Free Software Foundation.
#
-SERVER_ADDRESS = "@localstatedir@/run/gluster/events.sock"
+SERVER_ADDRESS = "0.0.0.0"
DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json"
CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json"
CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC
@@ -17,7 +17,9 @@ WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json"
WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC
LOG_FILE = "@localstatedir@/log/glusterfs/events.log"
EVENTSD = "glustereventsd"
-CONFIG_KEYS = ["log_level"]
+CONFIG_KEYS = ["log_level", "port"]
BOOL_CONFIGS = []
-RESTART_CONFIGS = []
+INT_CONFIGS = ["port"]
+RESTART_CONFIGS = ["port"]
EVENTS_ENABLED = @EVENTS_ENABLED@
+UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info"
diff --git a/events/src/eventsconfig.json b/events/src/eventsconfig.json
index ce2c775f0bd..45730f9bb83 100644
--- a/events/src/eventsconfig.json
+++ b/events/src/eventsconfig.json
@@ -1,3 +1,4 @@
{
- "log_level": "INFO"
+ "log_level": "INFO",
+ "port": 24009
}
diff --git a/events/src/gf_event.py b/events/src/gf_event.py
index 20dfc8a4f17..f9ece6adc28 100644
--- a/events/src/gf_event.py
+++ b/events/src/gf_event.py
@@ -16,7 +16,7 @@ import time
from eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED
from eventtypes import all_events
-from utils import logger, setup_logger
+from utils import logger, setup_logger, get_config
# Run this when this lib loads
setup_logger()
@@ -31,10 +31,9 @@ def gf_event(event_type, **kwargs):
return
try:
- client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- client.connect(SERVER_ADDRESS)
+ client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
except socket.error as e:
- logger.error("Unable to connect to events.sock: {0}".format(e))
+ logger.error("Unable to connect to events Server: {0}".format(e))
return
# Convert key value args into KEY1=VALUE1;KEY2=VALUE2;..
@@ -45,7 +44,18 @@ def gf_event(event_type, **kwargs):
# <TIMESTAMP> <EVENT_TYPE> <MSG>
msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";"))
+ port = get_config("port")
+ if port is None:
+ logger.error("Unable to get eventsd port details")
+ return
+
try:
- client.sendall(msg)
+ sent = client.sendto(msg, (SERVER_ADDRESS, port))
+ assert sent == len(msg)
except socket.error as e:
logger.error("Unable to Send message: {0}".format(e))
+ except AssertionError:
+ logger.error("Unable to send message. Sent: {0}, Actual: {1}".format(
+ sent, len(msg)))
+ finally:
+ client.close()
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
index 3fa57686a8b..91a0743ff22 100644
--- a/events/src/glustereventsd.py
+++ b/events/src/glustereventsd.py
@@ -11,12 +11,10 @@
#
from __future__ import print_function
-import asyncore
-import socket
-import os
-from multiprocessing import Process, Queue
import sys
import signal
+import SocketServer
+import socket
from eventtypes import all_events
import handlers
@@ -24,26 +22,19 @@ import utils
from eventsapiconf import SERVER_ADDRESS
from utils import logger
-# Global Queue, EventsHandler will add items to the queue
-# and process_event will gets each item and handles it
-events_queue = Queue()
-events_server_pid = None
+class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
-def process_event():
- """
- Seperate process which handles all the incoming events from Gluster
- processes.
- """
- while True:
- data = events_queue.get()
- logger.debug("EVENT: {0}".format(repr(data)))
+ def handle(self):
+ data = self.request[0].strip()
+ logger.debug("EVENT: {0} from {1}".format(repr(data),
+ self.client_address[0]))
try:
# Event Format <TIMESTAMP> <TYPE> <DETAIL>
ts, key, value = data.split(" ", 2)
except ValueError:
logger.warn("Invalid Event Format {0}".format(data))
- continue
+ return
data_dict = {}
try:
@@ -51,7 +42,7 @@ def process_event():
data_dict = dict(x.split('=') for x in value.split(';'))
except ValueError:
logger.warn("Unable to parse Event {0}".format(data))
- continue
+ return
try:
# Event Type to Function Map, Recieved event data will be in
@@ -75,68 +66,28 @@ def process_event():
handlers.generic_handler(ts, int(key), data_dict)
-def process_event_wrapper():
- try:
- process_event()
- except KeyboardInterrupt:
- return
-
-
-class GlusterEventsHandler(asyncore.dispatcher_with_send):
-
- def handle_read(self):
- data = self.recv(8192)
- if data:
- events_queue.put(data)
- self.send(data)
-
-
-class GlusterEventsServer(asyncore.dispatcher):
-
- def __init__(self):
- global events_server_pid
- asyncore.dispatcher.__init__(self)
- # Start the Events listener process which listens to
- # the global queue
- p = Process(target=process_event_wrapper)
- p.start()
- events_server_pid = p.pid
-
- # Create UNIX Domain Socket, bind to path
- self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self.bind(SERVER_ADDRESS)
- self.listen(5)
-
- def handle_accept(self):
- pair = self.accept()
- if pair is not None:
- sock, addr = pair
- GlusterEventsHandler(sock)
-
-
def signal_handler_sigusr2(sig, frame):
- if events_server_pid is not None:
- os.kill(events_server_pid, signal.SIGUSR2)
utils.load_all()
def init_event_server():
utils.setup_logger()
-
- # Delete Socket file if Exists
- try:
- os.unlink(SERVER_ADDRESS)
- except OSError:
- if os.path.exists(SERVER_ADDRESS):
- print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS),
- file=sys.stderr)
- sys.exit(1)
-
utils.load_all()
- # Start the Eventing Server, UNIX DOMAIN SOCKET Server
- GlusterEventsServer()
- asyncore.loop()
+ port = utils.get_config("port")
+ if port is None:
+ sys.stderr.write("Unable to get Port details from Config\n")
+ sys.exit(1)
+
+ # Start the Eventing Server, UDP Server
+ try:
+ server = SocketServer.ThreadingUDPServer(
+ (SERVER_ADDRESS, port),
+ GlusterEventsRequestHandler)
+ except socket.error as e:
+ sys.stderr.write("Failed to start Eventsd: {0}\n".format(e))
+ sys.exit(1)
+ server.serve_forever()
def main():
diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py
index 7887d77351c..f4447784f90 100644
--- a/events/src/peer_eventsapi.py
+++ b/events/src/peer_eventsapi.py
@@ -31,6 +31,7 @@ from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
EVENTSD,
CONFIG_KEYS,
BOOL_CONFIGS,
+ INT_CONFIGS,
RESTART_CONFIGS)
@@ -462,6 +463,9 @@ class ConfigSetCmd(Cmd):
if args.name in BOOL_CONFIGS:
v = boolify(args.value)
+ if args.name in INT_CONFIGS:
+ v = int(args.value)
+
new_data[args.name] = v
file_content_overwrite(CUSTOM_CONFIG_FILE, new_data)
diff --git a/events/src/utils.py b/events/src/utils.py
index 772221a1e25..386e8f28449 100644
--- a/events/src/utils.py
+++ b/events/src/utils.py
@@ -17,11 +17,10 @@ import requests
from eventsapiconf import (LOG_FILE,
WEBHOOKS_FILE,
DEFAULT_CONFIG_FILE,
- CUSTOM_CONFIG_FILE)
+ CUSTOM_CONFIG_FILE,
+ UUID_FILE)
import eventtypes
-from gluster.cliutils import get_node_uuid
-
# Webhooks list
_webhooks = {}
@@ -32,6 +31,23 @@ _config = {}
# Init Logger instance
logger = logging.getLogger(__name__)
+NodeID = None
+
+
+def get_node_uuid():
+ val = None
+ with open(UUID_FILE) as f:
+ for line in f:
+ if line.startswith("UUID="):
+ val = line.strip().split("=")[-1]
+ break
+ return val
+
+
+def get_config(key):
+ if not _config:
+ load_config()
+ return _config.get(key, None)
def get_event_type_name(idx):
@@ -109,8 +125,12 @@ def load_all():
def publish(ts, event_key, data):
+ global NodeID
+ if NodeID is None:
+ NodeID = get_node_uuid()
+
message = {
- "nodeid": get_node_uuid(),
+ "nodeid": NodeID,
"ts": int(ts),
"event": get_event_type_name(event_key),
"message": data
diff --git a/extras/firewalld/glusterfs.xml b/extras/firewalld/glusterfs.xml
index f8efd90c3b5..7e176442f5b 100644
--- a/extras/firewalld/glusterfs.xml
+++ b/extras/firewalld/glusterfs.xml
@@ -4,6 +4,7 @@
<description>Default ports for gluster-distributed storage</description>
<port protocol="tcp" port="24007"/> <!--For glusterd -->
<port protocol="tcp" port="24008"/> <!--For glusterd RDMA port management -->
+<port protocol="tcp" port="24009"/> <!--For glustereventsd -->
<port protocol="tcp" port="38465"/> <!--Gluster NFS service -->
<port protocol="tcp" port="38466"/> <!--Gluster NFS service -->
<port protocol="tcp" port="38467"/> <!--Gluster NFS service -->
diff --git a/libglusterfs/src/events.c b/libglusterfs/src/events.c
index f93934de0fb..b7b513eb39a 100644
--- a/libglusterfs/src/events.c
+++ b/libglusterfs/src/events.c
@@ -16,73 +16,107 @@
#include <time.h>
#include <stdarg.h>
#include <string.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+
#include "syscall.h"
#include "mem-pool.h"
+#include "glusterfs.h"
+#include "globals.h"
#include "events.h"
-#define EVENT_PATH DATADIR "/run/gluster/events.sock"
-#define EVENTS_MSG_MAX 2048
+#define EVENT_HOST "127.0.0.1"
+#define EVENT_PORT 24009
int
gf_event (eventtypes_t event, char *fmt, ...)
{
- int ret = 0;
- int sock = -1;
- char eventstr[EVENTS_MSG_MAX] = "";
- struct sockaddr_un server;
- va_list arguments;
- char *msg = NULL;
- size_t eventstr_size = 0;
+ int ret = 0;
+ int sock = -1;
+ char *eventstr = NULL;
+ struct sockaddr_in server;
+ va_list arguments;
+ char *msg = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ struct hostent *host_data;
+ char *host = NULL;
+
+ /* Global context */
+ ctx = THIS->ctx;
if (event < 0 || event >= EVENT_LAST) {
ret = EVENT_ERROR_INVALID_INPUTS;
goto out;
}
- sock = socket(AF_UNIX, SOCK_STREAM, 0);
+ /* Initialize UDP socket */
+ sock = socket (AF_INET, SOCK_DGRAM, 0);
if (sock < 0) {
ret = EVENT_ERROR_SOCKET;
goto out;
}
- server.sun_family = AF_UNIX;
- strcpy(server.sun_path, EVENT_PATH);
- if (connect(sock,
- (struct sockaddr *) &server,
- sizeof(struct sockaddr_un)) < 0) {
- ret = EVENT_ERROR_CONNECT;
- goto out;
+ /* Get Host name to send message */
+ if (ctx && ctx->cmd_args.volfile_server) {
+ /* If it is client code then volfile_server is set
+ use that information to push the events. */
+ host_data = gethostbyname (ctx->cmd_args.volfile_server);
+ if (host_data == NULL) {
+ ret = EVENT_ERROR_RESOLVE;
+ goto out;
+ }
+ host = inet_ntoa (*(struct in_addr *)(host_data->h_addr));
+ } else {
+ /* Localhost, Use the defined IP for localhost */
+ host = EVENT_HOST;
}
+ /* Socket Configurations */
+ server.sin_family = AF_INET;
+ server.sin_port = htons (EVENT_PORT);
+ server.sin_addr.s_addr = inet_addr (host);
+ memset (&server.sin_zero, '\0', sizeof (server.sin_zero));
+
va_start (arguments, fmt);
ret = gf_vasprintf (&msg, fmt, arguments);
va_end (arguments);
+
if (ret < 0) {
ret = EVENT_ERROR_INVALID_INPUTS;
goto out;
}
- eventstr_size = snprintf(NULL, 0, "%u %d %s", (unsigned)time(NULL),
- event, msg);
+ ret = gf_asprintf (&eventstr, "%u %d %s",
+ (unsigned)time(NULL), event, msg);
- if (eventstr_size + 1 > EVENTS_MSG_MAX) {
- eventstr_size = EVENTS_MSG_MAX - 1;
+ if (ret <= 0) {
+ ret = EVENT_ERROR_MSG_FORMAT;
+ goto out;
}
- snprintf(eventstr, eventstr_size+1, "%u %d %s",
- (unsigned)time(NULL), event, msg);
-
- if (sys_write(sock, eventstr, strlen(eventstr)) <= 0) {
+ /* Send Message */
+ if (sendto (sock, eventstr, strlen (eventstr),
+ 0, (struct sockaddr *)&server, sizeof (server)) <= 0) {
ret = EVENT_ERROR_SEND;
- goto out;
}
ret = EVENT_SEND_OK;
out:
- sys_close(sock);
- GF_FREE(msg);
+ if (sock >= 0) {
+ sys_close (sock);
+ }
+
+ /* Allocated by gf_vasprintf */
+ if (msg)
+ GF_FREE (msg);
+
+ /* Allocated by gf_asprintf */
+ if (eventstr)
+ GF_FREE (eventstr);
+
return ret;
}