summaryrefslogtreecommitdiffstats
path: root/events
diff options
context:
space:
mode:
Diffstat (limited to 'events')
-rw-r--r--events/Makefile.am4
-rw-r--r--events/eventskeygen.py164
-rw-r--r--events/src/Makefile.am27
-rw-r--r--events/src/eventsapiconf.py.in49
-rw-r--r--events/src/eventsconfig.json4
-rw-r--r--events/src/gf_event.py30
-rw-r--r--events/src/glustereventsd.py166
-rw-r--r--events/src/handlers.py2
-rw-r--r--events/src/peer_eventsapi.py562
-rw-r--r--events/src/utils.py359
-rw-r--r--events/tools/Makefile.am5
-rw-r--r--events/tools/eventsdash.py15
12 files changed, 1031 insertions, 356 deletions
diff --git a/events/Makefile.am b/events/Makefile.am
index 04a74efc228..264bb742a80 100644
--- a/events/Makefile.am
+++ b/events/Makefile.am
@@ -1,6 +1,8 @@
SUBDIRS = src tools
-
+EXTRA_DIST = eventskeygen.py
noinst_PYTHON = eventskeygen.py
+if BUILD_EVENTS
install-data-hook:
$(INSTALL) -d -m 755 $(DESTDIR)@GLUSTERD_WORKDIR@/events
+endif
diff --git a/events/eventskeygen.py b/events/eventskeygen.py
index 7357d8e84a3..e28ebe9b7e6 100644
--- a/events/eventskeygen.py
+++ b/events/eventskeygen.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -21,16 +21,21 @@ gen_header_type = sys.argv[1]
# When adding new keys add it to the END
keys = (
+ # user driven events
+ #peer and volume management events
"EVENT_PEER_ATTACH",
"EVENT_PEER_DETACH",
-
"EVENT_VOLUME_CREATE",
"EVENT_VOLUME_START",
"EVENT_VOLUME_STOP",
"EVENT_VOLUME_DELETE",
"EVENT_VOLUME_SET",
"EVENT_VOLUME_RESET",
+ "EVENT_BRICK_RESET_START",
+ "EVENT_BRICK_RESET_COMMIT",
+ "EVENT_BRICK_REPLACE",
+ #geo-rep events
"EVENT_GEOREP_CREATE",
"EVENT_GEOREP_START",
"EVENT_GEOREP_STOP",
@@ -39,6 +44,157 @@ keys = (
"EVENT_GEOREP_DELETE",
"EVENT_GEOREP_CONFIG_SET",
"EVENT_GEOREP_CONFIG_RESET",
+
+ #bitrot events
+ "EVENT_BITROT_ENABLE",
+ "EVENT_BITROT_DISABLE",
+ "EVENT_BITROT_SCRUB_THROTTLE",
+ "EVENT_BITROT_SCRUB_FREQ",
+ "EVENT_BITROT_SCRUB_OPTION",
+ "EVENT_BITROT_SCRUB_ONDEMAND",
+
+ #quota events
+ "EVENT_QUOTA_ENABLE",
+ "EVENT_QUOTA_DISABLE",
+ "EVENT_QUOTA_SET_USAGE_LIMIT",
+ "EVENT_QUOTA_SET_OBJECTS_LIMIT",
+ "EVENT_QUOTA_REMOVE_USAGE_LIMIT",
+ "EVENT_QUOTA_REMOVE_OBJECTS_LIMIT",
+ "EVENT_QUOTA_ALERT_TIME",
+ "EVENT_QUOTA_SOFT_TIMEOUT",
+ "EVENT_QUOTA_HARD_TIMEOUT",
+ "EVENT_QUOTA_DEFAULT_SOFT_LIMIT",
+
+ #snapshot events
+ "EVENT_SNAPSHOT_CREATED",
+ "EVENT_SNAPSHOT_CREATE_FAILED",
+ "EVENT_SNAPSHOT_ACTIVATED",
+ "EVENT_SNAPSHOT_ACTIVATE_FAILED",
+ "EVENT_SNAPSHOT_DEACTIVATED",
+ "EVENT_SNAPSHOT_DEACTIVATE_FAILED",
+ "EVENT_SNAPSHOT_SOFT_LIMIT_REACHED",
+ "EVENT_SNAPSHOT_HARD_LIMIT_REACHED",
+ "EVENT_SNAPSHOT_RESTORED",
+ "EVENT_SNAPSHOT_RESTORE_FAILED",
+ "EVENT_SNAPSHOT_DELETED",
+ "EVENT_SNAPSHOT_DELETE_FAILED",
+ "EVENT_SNAPSHOT_CLONED",
+ "EVENT_SNAPSHOT_CLONE_FAILED",
+ "EVENT_SNAPSHOT_CONFIG_UPDATED",
+ "EVENT_SNAPSHOT_CONFIG_UPDATE_FAILED",
+ "EVENT_SNAPSHOT_SCHEDULER_INITIALISED",
+ "EVENT_SNAPSHOT_SCHEDULER_INIT_FAILED",
+ "EVENT_SNAPSHOT_SCHEDULER_ENABLED",
+ "EVENT_SNAPSHOT_SCHEDULER_ENABLE_FAILED",
+ "EVENT_SNAPSHOT_SCHEDULER_DISABLED",
+ "EVENT_SNAPSHOT_SCHEDULER_DISABLE_FAILED",
+ "EVENT_SNAPSHOT_SCHEDULER_SCHEDULE_ADDED",
+ "EVENT_SNAPSHOT_SCHEDULER_SCHEDULE_ADD_FAILED",
+ "EVENT_SNAPSHOT_SCHEDULER_SCHEDULE_EDITED",
+ "EVENT_SNAPSHOT_SCHEDULER_SCHEDULE_EDIT_FAILED",
+ "EVENT_SNAPSHOT_SCHEDULER_SCHEDULE_DELETED",
+ "EVENT_SNAPSHOT_SCHEDULER_SCHEDULE_DELETE_FAILED",
+
+ #async events
+ #glusterd events
+ "EVENT_SVC_MANAGER_FAILED",
+ "EVENT_SVC_RECONFIGURE_FAILED",
+ "EVENT_SVC_CONNECTED",
+ "EVENT_SVC_DISCONNECTED",
+ "EVENT_PEER_STORE_FAILURE",
+ "EVENT_PEER_RPC_CREATE_FAILED",
+ "EVENT_PEER_REJECT",
+ "EVENT_PEER_CONNECT",
+ "EVENT_PEER_DISCONNECT",
+ "EVENT_PEER_NOT_FOUND",
+ "EVENT_UNKNOWN_PEER",
+ "EVENT_BRICK_START_FAILED",
+ "EVENT_BRICK_STOP_FAILED",
+ "EVENT_BRICK_DISCONNECTED",
+ "EVENT_BRICK_CONNECTED",
+ "EVENT_BRICKPATH_RESOLVE_FAILED",
+ "EVENT_NOTIFY_UNKNOWN_OP",
+ "EVENT_QUORUM_LOST",
+ "EVENT_QUORUM_REGAINED",
+ "EVENT_REBALANCE_START_FAILED",
+ "EVENT_REBALANCE_STATUS_UPDATE_FAILED",
+ "EVENT_IMPORT_QUOTA_CONF_FAILED",
+ "EVENT_IMPORT_VOLUME_FAILED",
+ "EVENT_IMPORT_BRICK_FAILED",
+ "EVENT_COMPARE_FRIEND_VOLUME_FAILED",
+ "EVENT_NFS_GANESHA_EXPORT_FAILED",
+ #ec events
+ "EVENT_EC_MIN_BRICKS_NOT_UP",
+ "EVENT_EC_MIN_BRICKS_UP",
+ #georep async events
+ "EVENT_GEOREP_FAULTY",
+ "EVENT_GEOREP_CHECKPOINT_COMPLETED",
+ "EVENT_GEOREP_ACTIVE",
+ "EVENT_GEOREP_PASSIVE",
+
+ #quota async events
+ "EVENT_QUOTA_CROSSED_SOFT_LIMIT",
+ #bitrot async events
+ "EVENT_BITROT_BAD_FILE",
+ #protocol-server events
+ "EVENT_CLIENT_CONNECT",
+ "EVENT_CLIENT_AUTH_REJECT",
+ "EVENT_CLIENT_DISCONNECT",
+ #posix events
+ "EVENT_POSIX_SAME_GFID",
+ "EVENT_POSIX_ALREADY_PART_OF_VOLUME",
+ "EVENT_POSIX_BRICK_NOT_IN_VOLUME",
+ "EVENT_POSIX_BRICK_VERIFICATION_FAILED",
+ "EVENT_POSIX_ACL_NOT_SUPPORTED",
+ "EVENT_POSIX_HEALTH_CHECK_FAILED",
+ #afr events
+ "EVENT_AFR_QUORUM_MET",
+ "EVENT_AFR_QUORUM_FAIL",
+ "EVENT_AFR_SUBVOL_UP",
+ "EVENT_AFR_SUBVOLS_DOWN",
+ "EVENT_AFR_SPLIT_BRAIN",
+
+ #tier events
+ "EVENT_TIER_ATTACH",
+ "EVENT_TIER_ATTACH_FORCE",
+ "EVENT_TIER_DETACH_START",
+ "EVENT_TIER_DETACH_STOP",
+ "EVENT_TIER_DETACH_COMMIT",
+ "EVENT_TIER_DETACH_FORCE",
+ "EVENT_TIER_PAUSE",
+ "EVENT_TIER_RESUME",
+ "EVENT_TIER_WATERMARK_HI",
+ "EVENT_TIER_WATERMARK_DROPPED_TO_MID",
+ "EVENT_TIER_WATERMARK_RAISED_TO_MID",
+ "EVENT_TIER_WATERMARK_DROPPED_TO_LOW",
+
+ #dht events
+ #add/remove brick events
+ "EVENT_VOLUME_ADD_BRICK",
+ "EVENT_VOLUME_ADD_BRICK_FAILED",
+ "EVENT_VOLUME_REMOVE_BRICK_START",
+ "EVENT_VOLUME_REMOVE_BRICK_START_FAILED",
+ "EVENT_VOLUME_REMOVE_BRICK_COMMIT",
+ "EVENT_VOLUME_REMOVE_BRICK_COMMIT_FAILED",
+ "EVENT_VOLUME_REMOVE_BRICK_STOP",
+ "EVENT_VOLUME_REMOVE_BRICK_STOP_FAILED",
+ "EVENT_VOLUME_REMOVE_BRICK_FORCE",
+ "EVENT_VOLUME_REMOVE_BRICK_FORCE_FAILED",
+ "EVENT_VOLUME_REMOVE_BRICK_FAILED",
+
+ #rebalance events
+ "EVENT_VOLUME_REBALANCE_START",
+ "EVENT_VOLUME_REBALANCE_STOP",
+ "EVENT_VOLUME_REBALANCE_FAILED",
+ "EVENT_VOLUME_REBALANCE_COMPLETE",
+
+ #tier events
+ "EVENT_TIER_START",
+ "EVENT_TIER_START_FORCE",
+
+ #brick/inodes events
+ "EVENT_DHT_DISK_USAGE",
+ "EVENT_DHT_INODES_USAGE",
)
LAST_EVENT = "EVENT_LAST"
@@ -48,7 +204,9 @@ ERRORS = (
"EVENT_ERROR_INVALID_INPUTS",
"EVENT_ERROR_SOCKET",
"EVENT_ERROR_CONNECT",
- "EVENT_ERROR_SEND"
+ "EVENT_ERROR_SEND",
+ "EVENT_ERROR_RESOLVE",
+ "EVENT_ERROR_MSG_FORMAT",
)
if gen_header_type == "C_HEADER":
diff --git a/events/src/Makefile.am b/events/src/Makefile.am
index 423bc8891fb..3b229691897 100644
--- a/events/src/Makefile.am
+++ b/events/src/Makefile.am
@@ -5,28 +5,37 @@ EXTRA_DIST = glustereventsd.py __init__.py eventsapiconf.py.in \
BUILT_SOURCES = eventtypes.py
CLEANFILES = eventtypes.py
-eventsdir = $(libexecdir)/glusterfs/events
-eventspeerscriptdir = $(libexecdir)/glusterfs
+eventsdir = $(GLUSTERFS_LIBEXECDIR)/gfevents
+if BUILD_EVENTS
+events_PYTHON = __init__.py gf_event.py eventsapiconf.py eventtypes.py \
+ utils.py
+endif
+# this does not work, see the Makefile.am in the root for a workaround
+#nodist_events_PYTHON = eventtypes.py
+
+eventtypes.py: $(top_srcdir)/events/eventskeygen.py
+ $(PYTHON) $(top_srcdir)/events/eventskeygen.py PY_HEADER
+
+if BUILD_EVENTS
+eventspeerscriptdir = $(GLUSTERFS_LIBEXECDIR)
eventsconfdir = $(sysconfdir)/glusterfs
eventsconf_DATA = eventsconfig.json
-events_PYTHON = __init__.py eventsapiconf.py eventtypes.py handlers.py \
- utils.py gf_event.py
+events_PYTHON += handlers.py
events_SCRIPTS = glustereventsd.py
eventspeerscript_SCRIPTS = peer_eventsapi.py
-eventtypes.py: $(top_srcdir)/events/eventskeygen.py
- $(PYTHON) $(top_srcdir)/events/eventskeygen.py PY_HEADER
-
install-exec-hook:
$(mkdir_p) $(DESTDIR)$(sbindir)
rm -f $(DESTDIR)$(sbindir)/glustereventsd
- ln -s $(libexecdir)/glusterfs/events/glustereventsd.py \
+ ln -s $(GLUSTERFS_LIBEXECDIR)/gfevents/glustereventsd.py \
$(DESTDIR)$(sbindir)/glustereventsd
rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi
- ln -s $(libexecdir)/glusterfs/peer_eventsapi.py \
+ ln -s $(GLUSTERFS_LIBEXECDIR)/peer_eventsapi.py \
$(DESTDIR)$(sbindir)/gluster-eventsapi
uninstall-hook:
rm -f $(DESTDIR)$(sbindir)/glustereventsd
rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi
+
+endif
diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in
index 702e1d21820..700093bee60 100644
--- a/events/src/eventsapiconf.py.in
+++ b/events/src/eventsapiconf.py.in
@@ -9,14 +9,51 @@
# cases as published by the Free Software Foundation.
#
-SERVER_ADDRESS = "@localstatedir@/run/gluster/events.sock"
+import subprocess
+glusterd_workdir = None
+
+# Methods
+def get_glusterd_workdir():
+ global glusterd_workdir
+ if glusterd_workdir is not None:
+ return glusterd_workdir
+ proc = subprocess.Popen(["gluster", "system::", "getwd"],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines = True)
+ out, err = proc.communicate()
+ if proc.returncode == 0:
+ glusterd_workdir = out.strip()
+ else:
+ glusterd_workdir = "@GLUSTERD_WORKDIR@"
+ return glusterd_workdir
+
+SERVER_ADDRESS = "0.0.0.0"
+SERVER_ADDRESSv4 = "0.0.0.0"
+SERVER_ADDRESSv6 = "::1"
DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json"
CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json"
-CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC
+CUSTOM_CONFIG_FILE = get_glusterd_workdir() + CUSTOM_CONFIG_FILE_TO_SYNC
WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json"
-WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC
+WEBHOOKS_FILE = get_glusterd_workdir() + WEBHOOKS_FILE_TO_SYNC
LOG_FILE = "@localstatedir@/log/glusterfs/events.log"
EVENTSD = "glustereventsd"
-CONFIG_KEYS = ["log_level"]
-BOOL_CONFIGS = []
-RESTART_CONFIGS = []
+CONFIG_KEYS = ["log-level", "port", "disable-events-log"]
+BOOL_CONFIGS = ["disable-events-log"]
+INT_CONFIGS = ["port"]
+RESTART_CONFIGS = ["port"]
+EVENTS_ENABLED = @EVENTS_ENABLED@
+UUID_FILE = get_glusterd_workdir() + "/glusterd.info"
+PID_FILE = "@localstatedir@/run/glustereventsd.pid"
+AUTO_BOOL_ATTRIBUTES = ["force", "push-pem", "no-verify"]
+AUTO_INT_ATTRIBUTES = ["ssh-port"]
+CERTS_DIR = get_glusterd_workdir() + "/events"
+
+# Errors
+ERROR_SAME_CONFIG = 2
+ERROR_ALL_NODES_STATUS_NOT_OK = 3
+ERROR_PARTIAL_SUCCESS = 4
+ERROR_WEBHOOK_ALREADY_EXISTS = 5
+ERROR_WEBHOOK_NOT_EXISTS = 6
+ERROR_INVALID_CONFIG = 7
+ERROR_WEBHOOK_SYNC_FAILED = 8
+ERROR_CONFIG_SYNC_FAILED = 9
diff --git a/events/src/eventsconfig.json b/events/src/eventsconfig.json
index ce2c775f0bd..89e5b9c1d68 100644
--- a/events/src/eventsconfig.json
+++ b/events/src/eventsconfig.json
@@ -1,3 +1,5 @@
{
- "log_level": "INFO"
+ "log-level": "INFO",
+ "port": 24009,
+ "disable-events-log": false
}
diff --git a/events/src/gf_event.py b/events/src/gf_event.py
index 0924a65dddb..260b0d9aa48 100644
--- a/events/src/gf_event.py
+++ b/events/src/gf_event.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -13,25 +12,32 @@
import socket
import time
-from eventsapiconf import SERVER_ADDRESS
-from eventtypes import all_events
+from gfevents.eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED
+from gfevents.eventtypes import all_events
-from utils import logger, setup_logger
+from gfevents.utils import logger, setup_logger, get_config
# Run this when this lib loads
setup_logger()
def gf_event(event_type, **kwargs):
+ if EVENTS_ENABLED == 0:
+ return
+
if not isinstance(event_type, int) or event_type >= len(all_events):
logger.error("Invalid Event Type: {0}".format(event_type))
return
try:
- client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- client.connect(SERVER_ADDRESS)
+ client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
except socket.error as e:
- logger.error("Unable to connect to events.sock: {0}".format(e))
+ logger.error("Unable to connect to events Server: {0}".format(e))
+ return
+
+ port = get_config("port")
+ if port is None:
+ logger.error("Unable to get eventsd port details")
return
# Convert key value args into KEY1=VALUE1;KEY2=VALUE2;..
@@ -40,9 +46,15 @@ def gf_event(event_type, **kwargs):
msg += "{0}={1};".format(k, v)
# <TIMESTAMP> <EVENT_TYPE> <MSG>
- msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";"))
+ msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")).encode()
try:
- client.sendall(msg)
+ sent = client.sendto(msg, (SERVER_ADDRESS, port))
+ assert sent == len(msg)
except socket.error as e:
logger.error("Unable to Send message: {0}".format(e))
+ except AssertionError:
+ logger.error("Unable to send message. Sent: {0}, Actual: {1}".format(
+ sent, len(msg)))
+ finally:
+ client.close()
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
index 3fa57686a8b..341a3b60947 100644
--- a/events/src/glustereventsd.py
+++ b/events/src/glustereventsd.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -11,39 +11,46 @@
#
from __future__ import print_function
-import asyncore
-import socket
-import os
-from multiprocessing import Process, Queue
import sys
import signal
+import threading
+try:
+ import socketserver
+except ImportError:
+ import SocketServer as socketserver
+import socket
+from argparse import ArgumentParser, RawDescriptionHelpFormatter
from eventtypes import all_events
import handlers
import utils
-from eventsapiconf import SERVER_ADDRESS
-from utils import logger
-
-# Global Queue, EventsHandler will add items to the queue
-# and process_event will gets each item and handles it
-events_queue = Queue()
-events_server_pid = None
-
-
-def process_event():
- """
- Seperate process which handles all the incoming events from Gluster
- processes.
- """
- while True:
- data = events_queue.get()
- logger.debug("EVENT: {0}".format(repr(data)))
+from eventsapiconf import SERVER_ADDRESSv4, SERVER_ADDRESSv6, PID_FILE
+from eventsapiconf import AUTO_BOOL_ATTRIBUTES, AUTO_INT_ATTRIBUTES
+from utils import logger, PidFile, PidFileLockFailed, boolify
+
+# Subclass so that specifically IPv4 packets are captured
+class UDPServerv4(socketserver.ThreadingUDPServer):
+ address_family = socket.AF_INET
+
+# Subclass so that specifically IPv6 packets are captured
+class UDPServerv6(socketserver.ThreadingUDPServer):
+ address_family = socket.AF_INET6
+
+class GlusterEventsRequestHandler(socketserver.BaseRequestHandler):
+
+ def handle(self):
+ data = self.request[0].strip()
+ if sys.version_info >= (3,):
+ data = self.request[0].strip().decode("utf-8")
+
+ logger.debug("EVENT: {0} from {1}".format(repr(data),
+ self.client_address[0]))
try:
# Event Format <TIMESTAMP> <TYPE> <DETAIL>
ts, key, value = data.split(" ", 2)
except ValueError:
logger.warn("Invalid Event Format {0}".format(data))
- continue
+ return
data_dict = {}
try:
@@ -51,12 +58,22 @@ def process_event():
data_dict = dict(x.split('=') for x in value.split(';'))
except ValueError:
logger.warn("Unable to parse Event {0}".format(data))
- continue
+ return
+
+ for k, v in data_dict.items():
+ try:
+ if k in AUTO_BOOL_ATTRIBUTES:
+ data_dict[k] = boolify(v)
+ if k in AUTO_INT_ATTRIBUTES:
+ data_dict[k] = int(v)
+ except ValueError:
+ # Auto Conversion failed, Retain the old value
+ continue
try:
- # Event Type to Function Map, Recieved event data will be in
+ # Event Type to Function Map, Received event data will be in
# the form <TIMESTAMP> <TYPE> <DETAIL>, Get Event name for the
- # recieved Type/Key and construct a function name starting with
+ # received Type/Key and construct a function name starting with
# handle_ For example: handle_event_volume_create
func_name = "handle_" + all_events[int(key)].lower()
except IndexError:
@@ -75,73 +92,64 @@ def process_event():
handlers.generic_handler(ts, int(key), data_dict)
-def process_event_wrapper():
- try:
- process_event()
- except KeyboardInterrupt:
- return
-
-
-class GlusterEventsHandler(asyncore.dispatcher_with_send):
-
- def handle_read(self):
- data = self.recv(8192)
- if data:
- events_queue.put(data)
- self.send(data)
-
-
-class GlusterEventsServer(asyncore.dispatcher):
-
- def __init__(self):
- global events_server_pid
- asyncore.dispatcher.__init__(self)
- # Start the Events listener process which listens to
- # the global queue
- p = Process(target=process_event_wrapper)
- p.start()
- events_server_pid = p.pid
-
- # Create UNIX Domain Socket, bind to path
- self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self.bind(SERVER_ADDRESS)
- self.listen(5)
-
- def handle_accept(self):
- pair = self.accept()
- if pair is not None:
- sock, addr = pair
- GlusterEventsHandler(sock)
-
-
def signal_handler_sigusr2(sig, frame):
- if events_server_pid is not None:
- os.kill(events_server_pid, signal.SIGUSR2)
utils.load_all()
+ utils.restart_webhook_pool()
+
+
+def UDP_server_thread(sock):
+ sock.serve_forever()
def init_event_server():
utils.setup_logger()
+ utils.load_all()
+ utils.init_webhook_pool()
- # Delete Socket file if Exists
+ port = utils.get_config("port")
+ if port is None:
+ sys.stderr.write("Unable to get Port details from Config\n")
+ sys.exit(1)
+
+ # Creating the Eventing Server, UDP Server for IPv4 packets
+ try:
+ serverv4 = UDPServerv4((SERVER_ADDRESSv4, port),
+ GlusterEventsRequestHandler)
+ except socket.error as e:
+ sys.stderr.write("Failed to start Eventsd for IPv4: {0}\n".format(e))
+ sys.exit(1)
+ # Creating the Eventing Server, UDP Server for IPv6 packets
try:
- os.unlink(SERVER_ADDRESS)
- except OSError:
- if os.path.exists(SERVER_ADDRESS):
- print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS),
- file=sys.stderr)
- sys.exit(1)
+ serverv6 = UDPServerv6((SERVER_ADDRESSv6, port),
+ GlusterEventsRequestHandler)
+ except socket.error as e:
+ sys.stderr.write("Failed to start Eventsd for IPv6: {0}\n".format(e))
+ sys.exit(1)
+ server_thread1 = threading.Thread(target=UDP_server_thread,
+ args=(serverv4,))
+ server_thread2 = threading.Thread(target=UDP_server_thread,
+ args=(serverv6,))
+ server_thread1.start()
+ server_thread2.start()
- utils.load_all()
- # Start the Eventing Server, UNIX DOMAIN SOCKET Server
- GlusterEventsServer()
- asyncore.loop()
+def get_args():
+ parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
+ description=__doc__)
+ parser.add_argument("-p", "--pid-file", help="PID File",
+ default=PID_FILE)
+
+ return parser.parse_args()
def main():
+ args = get_args()
try:
- init_event_server()
+ with PidFile(args.pid_file):
+ init_event_server()
+ except PidFileLockFailed as e:
+ sys.stderr.write("Failed to get lock for pid file({0}): {1}".format(
+ args.pid_file, e))
except KeyboardInterrupt:
sys.exit(1)
diff --git a/events/src/handlers.py b/events/src/handlers.py
index 21d3e83de54..7746d488bf3 100644
--- a/events/src/handlers.py
+++ b/events/src/handlers.py
@@ -23,7 +23,7 @@ def generic_handler(ts, key, data):
def handle_event_volume_set(ts, key, data):
"""
- Recieved data will have all the options as one string, split into
+ Received data will have all the options as one string, split into
list of options. "key1,value1,key2,value2" into
[[key1, value1], [key2, value2]]
"""
diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py
index 7887d77351c..4d2e5f35b1c 100644
--- a/events/src/peer_eventsapi.py
+++ b/events/src/peer_eventsapi.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -14,24 +14,52 @@ from __future__ import print_function
import os
import json
from errno import EEXIST
+import fcntl
+from errno import EACCES, EAGAIN
+import signal
+import sys
+import time
import requests
-import fasteners
from prettytable import PrettyTable
-from gluster.cliutils import (Cmd, execute, node_output_ok, node_output_notok,
+from gluster.cliutils import (Cmd, node_output_ok, node_output_notok,
sync_file_to_peers, GlusterCmdException,
- output_error, execute_in_peers, runcli)
-
-from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
- WEBHOOKS_FILE,
- DEFAULT_CONFIG_FILE,
- CUSTOM_CONFIG_FILE,
- CUSTOM_CONFIG_FILE_TO_SYNC,
- EVENTSD,
- CONFIG_KEYS,
- BOOL_CONFIGS,
- RESTART_CONFIGS)
+ output_error, execute_in_peers, runcli,
+ set_common_args_func)
+from gfevents.utils import LockedOpen, get_jwt_token, save_https_cert
+
+from gfevents.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
+ WEBHOOKS_FILE,
+ DEFAULT_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE_TO_SYNC,
+ EVENTSD,
+ CONFIG_KEYS,
+ BOOL_CONFIGS,
+ INT_CONFIGS,
+ PID_FILE,
+ RESTART_CONFIGS,
+ ERROR_INVALID_CONFIG,
+ ERROR_WEBHOOK_NOT_EXISTS,
+ ERROR_CONFIG_SYNC_FAILED,
+ ERROR_WEBHOOK_ALREADY_EXISTS,
+ ERROR_PARTIAL_SUCCESS,
+ ERROR_ALL_NODES_STATUS_NOT_OK,
+ ERROR_SAME_CONFIG,
+ ERROR_WEBHOOK_SYNC_FAILED,
+ CERTS_DIR)
+
+
+def handle_output_error(err, errcode=1, json_output=False):
+ if json_output:
+ print (json.dumps({
+ "output": "",
+ "error": err
+ }))
+ sys.exit(errcode)
+ else:
+ output_error(err, errcode)
def file_content_overwrite(fname, data):
@@ -41,15 +69,27 @@ def file_content_overwrite(fname, data):
os.rename(fname + ".tmp", fname)
-def create_custom_config_file_if_not_exists():
- mkdirp(os.path.dirname(CUSTOM_CONFIG_FILE))
+def create_custom_config_file_if_not_exists(args):
+ try:
+ config_dir = os.path.dirname(CUSTOM_CONFIG_FILE)
+ mkdirp(config_dir)
+ except OSError as e:
+ handle_output_error("Failed to create dir %s: %s" % (config_dir, e),
+ json_output=args.json)
+
if not os.path.exists(CUSTOM_CONFIG_FILE):
with open(CUSTOM_CONFIG_FILE, "w") as f:
f.write("{}")
-def create_webhooks_file_if_not_exists():
- mkdirp(os.path.dirname(WEBHOOKS_FILE))
+def create_webhooks_file_if_not_exists(args):
+ try:
+ webhooks_dir = os.path.dirname(WEBHOOKS_FILE)
+ mkdirp(webhooks_dir)
+ except OSError as e:
+ handle_output_error("Failed to create dir %s: %s" % (webhooks_dir, e),
+ json_output=args.json)
+
if not os.path.exists(WEBHOOKS_FILE):
with open(WEBHOOKS_FILE, "w") as f:
f.write("{}")
@@ -70,104 +110,115 @@ def mkdirp(path, exit_on_err=False, logger=None):
"""
try:
os.makedirs(path)
- except (OSError, IOError) as e:
- if e.errno == EEXIST and os.path.isdir(path):
- pass
- else:
- output_error("Fail to create dir %s: %s" % (path, e))
-
-
-def is_enabled(service):
- rc, out, err = execute(["systemctl", "is-enabled", service])
- return rc == 0
-
-
-def is_active(service):
- rc, out, err = execute(["systemctl", "is-active", service])
- return rc == 0
+ except OSError as e:
+ if e.errno != EEXIST or not os.path.isdir(path):
+ raise
-def enable_service(service):
- if not is_enabled(service):
- cmd = ["systemctl", "enable", service]
- return execute(cmd)
-
- return (0, "", "")
+def is_active():
+ state = False
+ try:
+ with open(PID_FILE, "a+") as f:
+ fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+ state = False
+ except (IOError, OSError) as e:
+ if e.errno in (EACCES, EAGAIN):
+ # cannot grab. so, process still running..move on
+ state = True
+ else:
+ state = False
+ return state
-def disable_service(service):
- if is_enabled(service):
- cmd = ["systemctl", "disable", service]
- return execute(cmd)
+def reload_service():
+ pid = None
+ if is_active():
+ with open(PID_FILE) as f:
+ try:
+ pid = int(f.read().strip())
+ except ValueError:
+ pid = None
+ if pid is not None:
+ os.kill(pid, signal.SIGUSR2)
return (0, "", "")
-def start_service(service):
- rc, out, err = enable_service(service)
- if rc != 0:
- return (rc, out, err)
-
- cmd = ["systemctl", "start", service]
- return execute(cmd)
-
-
-def stop_service(service):
- rc, out, err = disable_service(service)
- if rc != 0:
- return (rc, out, err)
-
- cmd = ["systemctl", "stop", service]
- return execute(cmd)
-
-
-def restart_service(service):
- rc, out, err = stop_service(service)
- if rc != 0:
- return (rc, out, err)
+def rows_to_json(json_out, column_name, rows):
+ num_ok_rows = 0
+ for row in rows:
+ num_ok_rows += 1 if row.ok else 0
+ json_out.append({
+ "node": row.hostname,
+ "node_status": "UP" if row.node_up else "DOWN",
+ column_name: "OK" if row.ok else "NOT OK",
+ "error": row.error
+ })
+ return num_ok_rows
- return start_service(service)
+def rows_to_table(table, rows):
+ num_ok_rows = 0
+ for row in rows:
+ num_ok_rows += 1 if row.ok else 0
+ table.add_row([row.hostname,
+ "UP" if row.node_up else "DOWN",
+ "OK" if row.ok else "NOT OK: {0}".format(
+ row.error)])
+ return num_ok_rows
-def reload_service(service):
- if is_active(service):
- cmd = ["systemctl", "reload", service]
- return execute(cmd)
- return (0, "", "")
-
-
-def sync_to_peers(restart=False):
+def sync_to_peers(args):
if os.path.exists(WEBHOOKS_FILE):
try:
sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC)
except GlusterCmdException as e:
- output_error("Failed to sync Webhooks file: [Error: {0}]"
- "{1}".format(e[0], e[2]))
+ # Print stdout if stderr is empty
+ errmsg = e.message[2] if e.message[2] else e.message[1]
+ handle_output_error("Failed to sync Webhooks file: [Error: {0}]"
+ "{1}".format(e.message[0], errmsg),
+ errcode=ERROR_WEBHOOK_SYNC_FAILED,
+ json_output=args.json)
if os.path.exists(CUSTOM_CONFIG_FILE):
try:
sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC)
except GlusterCmdException as e:
- output_error("Failed to sync Config file: [Error: {0}]"
- "{1}".format(e[0], e[2]))
-
- action = "node-reload"
- if restart:
- action = "node-restart"
-
- out = execute_in_peers(action)
- table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"])
- table.align["NODE STATUS"] = "r"
- table.align["SYNC STATUS"] = "r"
+ # Print stdout if stderr is empty
+ errmsg = e.message[2] if e.message[2] else e.message[1]
+ handle_output_error("Failed to sync Config file: [Error: {0}]"
+ "{1}".format(e.message[0], errmsg),
+ errcode=ERROR_CONFIG_SYNC_FAILED,
+ json_output=args.json)
+
+ out = execute_in_peers("node-reload")
+ if not args.json:
+ table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"])
+ table.align["NODE STATUS"] = "r"
+ table.align["SYNC STATUS"] = "r"
- for p in out:
- table.add_row([p.hostname,
- "UP" if p.node_up else "DOWN",
- "OK" if p.ok else "NOT OK: {0}".format(
- p.error)])
+ json_out = []
+ if args.json:
+ num_ok_rows = rows_to_json(json_out, "sync_status", out)
+ else:
+ num_ok_rows = rows_to_table(table, out)
+
+ ret = 0
+ if num_ok_rows == 0:
+ ret = ERROR_ALL_NODES_STATUS_NOT_OK
+ elif num_ok_rows != len(out):
+ ret = ERROR_PARTIAL_SUCCESS
+
+ if args.json:
+ print (json.dumps({
+ "output": json_out,
+ "error": ""
+ }))
+ else:
+ print (table)
- print (table)
+ # If sync status is not ok for any node set error code as partial success
+ sys.exit(ret)
def node_output_handle(resp):
@@ -178,92 +229,52 @@ def node_output_handle(resp):
node_output_notok(err)
-def action_handle(action):
+def action_handle(action, json_output=False):
out = execute_in_peers("node-" + action)
column_name = action.upper()
if action == "status":
column_name = EVENTSD.upper()
- table = PrettyTable(["NODE", "NODE STATUS", column_name + " STATUS"])
- table.align["NODE STATUS"] = "r"
- table.align[column_name + " STATUS"] = "r"
-
- for p in out:
- status_col_val = "OK" if p.ok else "NOT OK: {0}".format(
- p.error)
- if action == "status":
- status_col_val = "DOWN"
- if p.ok:
- status_col_val = p.output
-
- table.add_row([p.hostname,
- "UP" if p.node_up else "DOWN",
- status_col_val])
-
- print (table)
-
-
-class NodeStart(Cmd):
- name = "node-start"
-
- def run(self, args):
- node_output_handle(start_service(EVENTSD))
-
-
-class StartCmd(Cmd):
- name = "start"
-
- def run(self, args):
- action_handle("start")
-
-
-class NodeStop(Cmd):
- name = "node-stop"
-
- def run(self, args):
- node_output_handle(stop_service(EVENTSD))
-
-
-class StopCmd(Cmd):
- name = "stop"
-
- def run(self, args):
- action_handle("stop")
-
-
-class NodeRestart(Cmd):
- name = "node-restart"
-
- def run(self, args):
- node_output_handle(restart_service(EVENTSD))
-
+ if not json_output:
+ table = PrettyTable(["NODE", "NODE STATUS", column_name + " STATUS"])
+ table.align["NODE STATUS"] = "r"
+ table.align[column_name + " STATUS"] = "r"
-class RestartCmd(Cmd):
- name = "restart"
+ json_out = []
+ if json_output:
+ rows_to_json(json_out, column_name.lower() + "_status", out)
+ else:
+ rows_to_table(table, out)
- def run(self, args):
- action_handle("restart")
+ return json_out if json_output else table
class NodeReload(Cmd):
name = "node-reload"
def run(self, args):
- node_output_handle(reload_service(EVENTSD))
+ node_output_handle(reload_service())
class ReloadCmd(Cmd):
name = "reload"
def run(self, args):
- action_handle("reload")
+ out = action_handle("reload", args.json)
+ if args.json:
+ print (json.dumps({
+ "output": out,
+ "error": ""
+ }))
+ else:
+ print (out)
class NodeStatus(Cmd):
name = "node-status"
def run(self, args):
- node_output_ok("UP" if is_active(EVENTSD) else "DOWN")
+ node_output_ok("UP" if is_active() else "DOWN")
class StatusCmd(Cmd):
@@ -274,12 +285,25 @@ class StatusCmd(Cmd):
if os.path.exists(WEBHOOKS_FILE):
webhooks = json.load(open(WEBHOOKS_FILE))
- print ("Webhooks: " + ("" if webhooks else "None"))
- for w in webhooks:
- print (w)
-
- print ()
- action_handle("status")
+ json_out = {"webhooks": [], "data": []}
+ if args.json:
+ json_out["webhooks"] = webhooks.keys()
+ else:
+ print ("Webhooks: " + ("" if webhooks else "None"))
+ for w in webhooks:
+ print (w)
+
+ print ()
+
+ out = action_handle("status", args.json)
+ if args.json:
+ json_out["data"] = out
+ print (json.dumps({
+ "output": json_out,
+ "error": ""
+ }))
+ else:
+ print (out)
class WebhookAddCmd(Cmd):
@@ -289,19 +313,24 @@ class WebhookAddCmd(Cmd):
parser.add_argument("url", help="URL of Webhook")
parser.add_argument("--bearer_token", "-t", help="Bearer Token",
default="")
+ parser.add_argument("--secret", "-s",
+ help="Secret to add JWT Bearer Token", default="")
def run(self, args):
- create_webhooks_file_if_not_exists()
+ create_webhooks_file_if_not_exists(args)
- with fasteners.InterProcessLock(WEBHOOKS_FILE):
+ with LockedOpen(WEBHOOKS_FILE, 'r+'):
data = json.load(open(WEBHOOKS_FILE))
if data.get(args.url, None) is not None:
- output_error("Webhook already exists")
+ handle_output_error("Webhook already exists",
+ errcode=ERROR_WEBHOOK_ALREADY_EXISTS,
+ json_output=args.json)
- data[args.url] = args.bearer_token
+ data[args.url] = {"token": args.bearer_token,
+ "secret": args.secret}
file_content_overwrite(WEBHOOKS_FILE, data)
- sync_to_peers()
+ sync_to_peers(args)
class WebhookModCmd(Cmd):
@@ -311,19 +340,31 @@ class WebhookModCmd(Cmd):
parser.add_argument("url", help="URL of Webhook")
parser.add_argument("--bearer_token", "-t", help="Bearer Token",
default="")
+ parser.add_argument("--secret", "-s",
+ help="Secret to add JWT Bearer Token", default="")
def run(self, args):
- create_webhooks_file_if_not_exists()
+ create_webhooks_file_if_not_exists(args)
- with fasteners.InterProcessLock(WEBHOOKS_FILE):
+ with LockedOpen(WEBHOOKS_FILE, 'r+'):
data = json.load(open(WEBHOOKS_FILE))
if data.get(args.url, None) is None:
- output_error("Webhook does not exists")
+ handle_output_error("Webhook does not exists",
+ errcode=ERROR_WEBHOOK_NOT_EXISTS,
+ json_output=args.json)
+
+ if isinstance(data[args.url], str):
+ data[args.url]["token"] = data[args.url]
+
+ if args.bearer_token != "":
+ data[args.url]["token"] = args.bearer_token
+
+ if args.secret != "":
+ data[args.url]["secret"] = args.secret
- data[args.url] = args.bearer_token
file_content_overwrite(WEBHOOKS_FILE, data)
- sync_to_peers()
+ sync_to_peers(args)
class WebhookDelCmd(Cmd):
@@ -333,17 +374,19 @@ class WebhookDelCmd(Cmd):
parser.add_argument("url", help="URL of Webhook")
def run(self, args):
- create_webhooks_file_if_not_exists()
+ create_webhooks_file_if_not_exists(args)
- with fasteners.InterProcessLock(WEBHOOKS_FILE):
+ with LockedOpen(WEBHOOKS_FILE, 'r+'):
data = json.load(open(WEBHOOKS_FILE))
if data.get(args.url, None) is None:
- output_error("Webhook does not exists")
+ handle_output_error("Webhook does not exists",
+ errcode=ERROR_WEBHOOK_NOT_EXISTS,
+ json_output=args.json)
del data[args.url]
file_content_overwrite(WEBHOOKS_FILE, data)
- sync_to_peers()
+ sync_to_peers(args)
class NodeWebhookTestCmd(Cmd):
@@ -352,16 +395,57 @@ class NodeWebhookTestCmd(Cmd):
def args(self, parser):
parser.add_argument("url")
parser.add_argument("bearer_token")
+ parser.add_argument("secret")
def run(self, args):
http_headers = {}
+ hashval = ""
if args.bearer_token != ".":
- http_headers["Authorization"] = "Bearer " + args.bearer_token
-
- try:
- resp = requests.post(args.url, headers=http_headers)
- except requests.ConnectionError as e:
- node_output_notok("{0}".format(e))
+ hashval = args.bearer_token
+
+ if args.secret != ".":
+ hashval = get_jwt_token(args.secret, "TEST", int(time.time()))
+
+ if hashval:
+ http_headers["Authorization"] = "Bearer " + hashval
+
+ urldata = requests.utils.urlparse(args.url)
+ parts = urldata.netloc.split(":")
+ domain = parts[0]
+ # Default https port if not specified
+ port = 443
+ if len(parts) == 2:
+ port = int(parts[1])
+
+ cert_path = os.path.join(CERTS_DIR, args.url.replace("/", "_").strip())
+ verify = True
+ while True:
+ try:
+ resp = requests.post(args.url, headers=http_headers,
+ verify=verify)
+ # Successful webhook push
+ break
+ except requests.exceptions.SSLError as e:
+ # If verify is equal to cert path, but still failed with
+ # SSLError, Looks like some issue with custom downloaded
+ # certificate, Try with verify = false
+ if verify == cert_path:
+ verify = False
+ continue
+
+ # If verify is instance of bool and True, then custom cert
+ # is required, download the cert and retry
+ try:
+ save_https_cert(domain, port, cert_path)
+ verify = cert_path
+ except Exception:
+ verify = False
+
+ # Done with collecting cert, continue
+ continue
+ except Exception as e:
+ node_output_notok("{0}".format(e))
+ break
if resp.status_code != 200:
node_output_notok("{0}".format(resp.status_code))
@@ -375,28 +459,51 @@ class WebhookTestCmd(Cmd):
def args(self, parser):
parser.add_argument("url", help="URL of Webhook")
parser.add_argument("--bearer_token", "-t", help="Bearer Token")
+ parser.add_argument("--secret", "-s",
+ help="Secret to generate Bearer Token")
def run(self, args):
url = args.url
bearer_token = args.bearer_token
+ secret = args.secret
+
if not args.url:
url = "."
if not args.bearer_token:
bearer_token = "."
+ if not args.secret:
+ secret = "."
- out = execute_in_peers("node-webhook-test", [url, bearer_token])
+ out = execute_in_peers("node-webhook-test", [url, bearer_token,
+ secret])
- table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"])
- table.align["NODE STATUS"] = "r"
- table.align["WEBHOOK STATUS"] = "r"
+ if not args.json:
+ table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"])
+ table.align["NODE STATUS"] = "r"
+ table.align["WEBHOOK STATUS"] = "r"
- for p in out:
- table.add_row([p.hostname,
- "UP" if p.node_up else "DOWN",
- "OK" if p.ok else "NOT OK: {0}".format(
- p.error)])
+ num_ok_rows = 0
+ json_out = []
+ if args.json:
+ num_ok_rows = rows_to_json(json_out, "webhook_status", out)
+ else:
+ num_ok_rows = rows_to_table(table, out)
+
+ ret = 0
+ if num_ok_rows == 0:
+ ret = ERROR_ALL_NODES_STATUS_NOT_OK
+ elif num_ok_rows != len(out):
+ ret = ERROR_PARTIAL_SUCCESS
+
+ if args.json:
+ print (json.dumps({
+ "output": json_out,
+ "error": ""
+ }))
+ else:
+ print (table)
- print (table)
+ sys.exit(ret)
class ConfigGetCmd(Cmd):
@@ -411,16 +518,30 @@ class ConfigGetCmd(Cmd):
data.update(json.load(open(CUSTOM_CONFIG_FILE)))
if args.name is not None and args.name not in CONFIG_KEYS:
- output_error("Invalid Config item")
+ handle_output_error("Invalid Config item",
+ errcode=ERROR_INVALID_CONFIG,
+ json_output=args.json)
+
+ if args.json:
+ json_out = {}
+ if args.name is None:
+ json_out = data
+ else:
+ json_out[args.name] = data[args.name]
- table = PrettyTable(["NAME", "VALUE"])
- if args.name is None:
- for k, v in data.items():
- table.add_row([k, v])
+ print (json.dumps({
+ "output": json_out,
+ "error": ""
+ }))
else:
- table.add_row([args.name, data[args.name]])
+ table = PrettyTable(["NAME", "VALUE"])
+ if args.name is None:
+ for k, v in data.items():
+ table.add_row([k, v])
+ else:
+ table.add_row([args.name, data[args.name]])
- print (table)
+ print (table)
def read_file_content_json(fname):
@@ -442,9 +563,13 @@ class ConfigSetCmd(Cmd):
def run(self, args):
if args.name not in CONFIG_KEYS:
- output_error("Invalid Config item")
+ handle_output_error("Invalid Config item",
+ errcode=ERROR_INVALID_CONFIG,
+ json_output=args.json)
- with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE):
+ create_custom_config_file_if_not_exists(args)
+
+ with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'):
data = json.load(open(DEFAULT_CONFIG_FILE))
if os.path.exists(CUSTOM_CONFIG_FILE):
config_json = read_file_content_json(CUSTOM_CONFIG_FILE)
@@ -452,16 +577,20 @@ class ConfigSetCmd(Cmd):
# Do Nothing if same as previous value
if data[args.name] == args.value:
- return
+ handle_output_error("Config value not changed. Same config",
+ errcode=ERROR_SAME_CONFIG,
+ json_output=args.json)
# TODO: Validate Value
- create_custom_config_file_if_not_exists()
new_data = read_file_content_json(CUSTOM_CONFIG_FILE)
v = args.value
if args.name in BOOL_CONFIGS:
v = boolify(args.value)
+ if args.name in INT_CONFIGS:
+ v = int(args.value)
+
new_data[args.name] = v
file_content_overwrite(CUSTOM_CONFIG_FILE, new_data)
@@ -470,7 +599,10 @@ class ConfigSetCmd(Cmd):
if args.name in RESTART_CONFIGS:
restart = True
- sync_to_peers(restart=restart)
+ if restart:
+ print ("\nRestart glustereventsd in all nodes")
+
+ sync_to_peers(args)
class ConfigResetCmd(Cmd):
@@ -480,14 +612,22 @@ class ConfigResetCmd(Cmd):
parser.add_argument("name", help="Config Name or all")
def run(self, args):
- with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE):
+ create_custom_config_file_if_not_exists(args)
+
+ with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'):
changed_keys = []
data = {}
if os.path.exists(CUSTOM_CONFIG_FILE):
data = read_file_content_json(CUSTOM_CONFIG_FILE)
- if not data:
- return
+ # If No data available in custom config or, the specific config
+ # item is not available in custom config
+ if not data or \
+ (args.name != "all" and data.get(args.name, None) is None):
+ handle_output_error("Config value not reset. Already "
+ "set to default value",
+ errcode=ERROR_SAME_CONFIG,
+ json_output=args.json)
if args.name.lower() == "all":
for k, v in data.items():
@@ -507,15 +647,23 @@ class ConfigResetCmd(Cmd):
restart = True
break
- sync_to_peers(restart=restart)
+ if restart:
+ print ("\nRestart glustereventsd in all nodes")
+
+ sync_to_peers(args)
class SyncCmd(Cmd):
name = "sync"
def run(self, args):
- sync_to_peers()
+ sync_to_peers(args)
+
+
+def common_args(parser):
+ parser.add_argument("--json", help="JSON Output", action="store_true")
if __name__ == "__main__":
+ set_common_args_func(common_args)
runcli()
diff --git a/events/src/utils.py b/events/src/utils.py
index 772221a1e25..6d4e0791a2b 100644
--- a/events/src/utils.py
+++ b/events/src/utils.py
@@ -9,22 +9,39 @@
# cases as published by the Free Software Foundation.
#
+import sys
import json
import os
import logging
+import logging.handlers
+import fcntl
+from errno import EBADF
+from threading import Thread
+import multiprocessing
+try:
+ from queue import Queue
+except ImportError:
+ from Queue import Queue
+from datetime import datetime, timedelta
+import base64
+import hmac
+from hashlib import sha256
+from calendar import timegm
-import requests
-from eventsapiconf import (LOG_FILE,
- WEBHOOKS_FILE,
- DEFAULT_CONFIG_FILE,
- CUSTOM_CONFIG_FILE)
-import eventtypes
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-from gluster.cliutils import get_node_uuid
+from gfevents.eventsapiconf import (LOG_FILE,
+ WEBHOOKS_FILE,
+ DEFAULT_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE,
+ UUID_FILE,
+ CERTS_DIR)
+from gfevents import eventtypes
# Webhooks list
_webhooks = {}
+_webhooks_file_mtime = 0
# Default Log Level
_log_level = "INFO"
# Config Object
@@ -32,6 +49,38 @@ _config = {}
# Init Logger instance
logger = logging.getLogger(__name__)
+NodeID = None
+webhooks_pool = None
+
+
+def boolify(value):
+ value = str(value)
+ if value.lower() in ["1", "on", "true", "yes"]:
+ return True
+ else:
+ return False
+
+
+def log_event(data):
+ # Log all published events unless it is disabled
+ if not _config.get("disable-events-log", False):
+ logger.info(repr(data))
+
+
+def get_node_uuid():
+ val = None
+ with open(UUID_FILE) as f:
+ for line in f:
+ if line.startswith("UUID="):
+ val = line.strip().split("=")[-1]
+ break
+ return val
+
+
+def get_config(key, default_value=None):
+ if not _config:
+ load_config()
+ return _config.get(key, default_value)
def get_event_type_name(idx):
@@ -50,7 +99,7 @@ def setup_logger():
logger.setLevel(logging.INFO)
# create the logging file handler
- fh = logging.FileHandler(LOG_FILE)
+ fh = logging.handlers.WatchedFileHandler(LOG_FILE)
formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
"[%(module)s - %(lineno)s:%(funcName)s] "
@@ -81,7 +130,7 @@ def load_log_level():
be triggered during init and when SIGUSR2.
"""
global logger, _log_level
- new_log_level = _config.get("log_level", "INFO")
+ new_log_level = _config.get("log-level", "INFO")
if _log_level != new_log_level:
logger.setLevel(getattr(logging, new_log_level.upper()))
_log_level = new_log_level.upper()
@@ -92,10 +141,12 @@ def load_webhooks():
Load/Reload the webhooks list. This function will
be triggered during init and when SIGUSR2.
"""
- global _webhooks
+ global _webhooks, _webhooks_file_mtime
_webhooks = {}
if os.path.exists(WEBHOOKS_FILE):
_webhooks = json.load(open(WEBHOOKS_FILE))
+ st = os.lstat(WEBHOOKS_FILE)
+ _webhooks_file_mtime = st.st_mtime
def load_all():
@@ -109,12 +160,21 @@ def load_all():
def publish(ts, event_key, data):
+ global NodeID
+ if NodeID is None:
+ NodeID = get_node_uuid()
+
+ autoload_webhooks()
+
message = {
- "nodeid": get_node_uuid(),
+ "nodeid": NodeID,
"ts": int(ts),
"event": get_event_type_name(event_key),
"message": data
}
+
+ log_event(message)
+
if _webhooks:
plugin_webhook(message)
else:
@@ -122,29 +182,264 @@ def publish(ts, event_key, data):
pass
+def autoload_webhooks():
+ global _webhooks_file_mtime
+ try:
+ st = os.lstat(WEBHOOKS_FILE)
+ except OSError:
+ st = None
+
+ if st is not None:
+ # If Stat is available and mtime is not matching with
+ # previously recorded mtime, reload the webhooks file
+ if st.st_mtime != _webhooks_file_mtime:
+ load_webhooks()
+
+
+def base64_urlencode(inp):
+ return base64.urlsafe_b64encode(inp).replace("=", "").strip()
+
+
+def get_jwt_token(secret, event_type, event_ts, jwt_expiry_time_seconds=60):
+ exp = datetime.utcnow() + timedelta(seconds=jwt_expiry_time_seconds)
+ payload = {
+ "exp": timegm(exp.utctimetuple()),
+ "iss": "gluster",
+ "sub": event_type,
+ "iat": event_ts
+ }
+ header = '{"alg":"HS256","typ":"JWT"}'
+ payload = json.dumps(payload, separators=(',', ':'), sort_keys=True)
+ msg = base64_urlencode(header) + "." + base64_urlencode(payload)
+ return "%s.%s" % (
+ msg,
+ base64_urlencode(hmac.HMAC(str(secret), msg, sha256).digest())
+ )
+
+
+def save_https_cert(domain, port, cert_path):
+ import ssl
+
+ # Cert file already available for this URL
+ if os.path.exists(cert_path):
+ return
+
+ cert_data = ssl.get_server_certificate((domain, port))
+ with open(cert_path, "w") as f:
+ f.write(cert_data)
+
+
+def publish_to_webhook(url, token, secret, message_queue):
+ # Import requests here since not used in any other place
+ import requests
+
+ http_headers = {"Content-Type": "application/json"}
+ urldata = requests.utils.urlparse(url)
+ parts = urldata.netloc.split(":")
+ domain = parts[0]
+ # Default https port if not specified
+ port = 443
+ if len(parts) == 2:
+ port = int(parts[1])
+
+ cert_path = os.path.join(CERTS_DIR, url.replace("/", "_").strip())
+
+ while True:
+ hashval = ""
+ event_type, event_ts, message_json = message_queue.get()
+ if token != "" and token is not None:
+ hashval = token
+
+ if secret != "" and secret is not None:
+ hashval = get_jwt_token(secret, event_type, event_ts)
+
+ if hashval:
+ http_headers["Authorization"] = "Bearer " + hashval
+
+ verify = True
+ while True:
+ try:
+ resp = requests.post(url, headers=http_headers,
+ data=message_json,
+ verify=verify)
+ # Successful webhook push
+ message_queue.task_done()
+ if resp.status_code != 200:
+ logger.warn("Event push failed to URL: {url}, "
+ "Event: {event}, "
+ "Status Code: {status_code}".format(
+ url=url,
+ event=message_json,
+ status_code=resp.status_code))
+ break
+ except requests.exceptions.SSLError as e:
+ # If verify is equal to cert path, but still failed with
+ # SSLError, Looks like some issue with custom downloaded
+ # certificate, Try with verify = false
+ if verify == cert_path:
+ logger.warn("Event push failed with certificate, "
+ "ignoring verification url={0} "
+ "Error={1}".format(url, e))
+ verify = False
+ continue
+
+ # If verify is instance of bool and True, then custom cert
+ # is required, download the cert and retry
+ try:
+ save_https_cert(domain, port, cert_path)
+ verify = cert_path
+ except Exception as ex:
+ verify = False
+ logger.warn("Unable to get Server certificate, "
+ "ignoring verification url={0} "
+ "Error={1}".format(url, ex))
+
+ # Done with collecting cert, continue
+ continue
+ except Exception as e:
+ logger.warn("Event push failed to URL: {url}, "
+ "Event: {event}, "
+ "Status: {error}".format(
+ url=url,
+ event=message_json,
+ error=e))
+ message_queue.task_done()
+ break
+
+
def plugin_webhook(message):
message_json = json.dumps(message, sort_keys=True)
logger.debug("EVENT: {0}".format(message_json))
- for url, token in _webhooks.items():
- http_headers = {"Content-Type": "application/json"}
- if token != "" and token is not None:
- http_headers["Authorization"] = "Bearer " + token
+ webhooks_pool.send(message["event"], message["ts"], message_json)
+
+
+class LockedOpen(object):
+ def __init__(self, filename, *args, **kwargs):
+ self.filename = filename
+ self.open_args = args
+ self.open_kwargs = kwargs
+ self.fileobj = None
+
+ def __enter__(self):
+ """
+ If two processes compete to update a file, The first process
+ gets the lock and the second process is blocked in the fcntl.flock()
+ call. When first process replaces the file and releases the lock,
+ the already open file descriptor in the second process now points
+ to a "ghost" file(not reachable by any path name) with old contents.
+ To avoid that conflict, check the fd already opened is same or
+ not. Open new one if not same
+ """
+ f = open(self.filename, *self.open_args, **self.open_kwargs)
+ while True:
+ fcntl.flock(f, fcntl.LOCK_EX)
+ fnew = open(self.filename, *self.open_args, **self.open_kwargs)
+ if os.path.sameopenfile(f.fileno(), fnew.fileno()):
+ fnew.close()
+ break
+ else:
+ f.close()
+ f = fnew
+ self.fileobj = f
+ return f
+
+ def __exit__(self, _exc_type, _exc_value, _traceback):
+ self.fileobj.close()
+
+
+class PidFileLockFailed(Exception):
+ pass
+
+
+class PidFile(object):
+ def __init__(self, filename):
+ self.filename = filename
+ self.pid = os.getpid()
+ self.fh = None
+
+ def cleanup(self, remove_file=True):
try:
- resp = requests.post(url, headers=http_headers, data=message_json)
- except requests.ConnectionError as e:
- logger.warn("Event push failed to URL: {url}, "
- "Event: {event}, "
- "Status: {error}".format(
- url=url,
- event=message_json,
- error=e))
- continue
-
- if resp.status_code != 200:
- logger.warn("Event push failed to URL: {url}, "
- "Event: {event}, "
- "Status Code: {status_code}".format(
- url=url,
- event=message_json,
- status_code=resp.status_code))
+ if self.fh is not None:
+ self.fh.close()
+ except IOError as exc:
+ if exc.errno != EBADF:
+ raise
+ finally:
+ if os.path.isfile(self.filename) and remove_file:
+ os.remove(self.filename)
+
+ def __enter__(self):
+ self.fh = open(self.filename, 'a+')
+ try:
+ fcntl.flock(self.fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError as exc:
+ self.cleanup(remove_file=False)
+ raise PidFileLockFailed(exc)
+
+ self.fh.seek(0)
+ self.fh.truncate()
+ self.fh.write("%d\n" % self.pid)
+ self.fh.flush()
+ self.fh.seek(0)
+ return self
+
+ def __exit__(self, _exc_type, _exc_value, _traceback):
+ self.cleanup()
+
+
+def webhook_monitor(proc_queue, webhooks):
+ queues = {}
+ for url, data in webhooks.items():
+ if isinstance(data, str):
+ token = data
+ secret = None
+ else:
+ token = data["token"]
+ secret = data["secret"]
+
+ queues[url] = Queue()
+ t = Thread(target=publish_to_webhook, args=(url, token, secret,
+ queues[url]))
+ t.start()
+
+ # Get the message sent to Process queue and distribute to all thread queues
+ while True:
+ message = proc_queue.get()
+ for _, q in queues.items():
+ q.put(message)
+
+
+class WebhookThreadPool(object):
+ def start(self):
+ # Separate process to emit messages to webhooks
+ # which maintains one thread per webhook. Separate
+ # process is required since on reload we need to stop
+ # and start the thread pool. In Python Threads can't be stopped
+ # so terminate the process and start again. Note: In transit
+ # events will be missed during reload
+ self.queue = multiprocessing.Queue()
+ self.proc = multiprocessing.Process(target=webhook_monitor,
+ args=(self.queue, _webhooks))
+ self.proc.start()
+
+ def restart(self):
+ # In transit messages are skipped, since webhooks monitor process
+ # is terminated.
+ self.proc.terminate()
+ self.start()
+
+ def send(self, event_type, event_ts, message):
+ self.queue.put((event_type, event_ts, message))
+
+
+def init_webhook_pool():
+ global webhooks_pool
+ webhooks_pool = WebhookThreadPool()
+ webhooks_pool.start()
+
+
+def restart_webhook_pool():
+ global webhooks_pool
+ if webhooks_pool is not None:
+ webhooks_pool.restart()
diff --git a/events/tools/Makefile.am b/events/tools/Makefile.am
index 7d5e331e4e1..fb6770cf070 100644
--- a/events/tools/Makefile.am
+++ b/events/tools/Makefile.am
@@ -1,3 +1,6 @@
+EXTRA_DIST = eventsdash.py
+
+if BUILD_EVENTS
scriptsdir = $(datadir)/glusterfs/scripts
scripts_SCRIPTS = eventsdash.py
-EXTRA_DIST = eventsdash.py
+endif
diff --git a/events/tools/eventsdash.py b/events/tools/eventsdash.py
index 47fc56dda6e..6479ea59da6 100644
--- a/events/tools/eventsdash.py
+++ b/events/tools/eventsdash.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -10,6 +10,7 @@
# cases as published by the Free Software Foundation.
#
+from __future__ import print_function
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import logging
from datetime import datetime
@@ -41,11 +42,11 @@ def listen():
for k, v in data.get("message", {}).items():
message.append("{0}={1}".format(k, v))
- print ("{0:20s} {1:20s} {2:36} {3}".format(
+ print(("{0:20s} {1:20s} {2:36} {3}".format(
human_time(data.get("ts")),
data.get("event"),
data.get("nodeid"),
- " ".join(message)))
+ " ".join(message))))
return "OK"
@@ -58,12 +59,12 @@ def main():
action="store_true")
args = parser.parse_args()
- print ("{0:20s} {1:20s} {2:36} {3}".format(
+ print(("{0:20s} {1:20s} {2:36} {3}".format(
"TIMESTAMP", "EVENT", "NODE ID", "MESSAGE"
- ))
- print ("{0:20s} {1:20s} {2:36} {3}".format(
+ )))
+ print(("{0:20s} {1:20s} {2:36} {3}".format(
"-"*20, "-"*20, "-"*36, "-"*20
- ))
+ )))
if args.debug:
app.debug = True