summaryrefslogtreecommitdiffstats
path: root/events/src
diff options
context:
space:
mode:
Diffstat (limited to 'events/src')
-rw-r--r--events/src/Makefile.am12
-rw-r--r--events/src/eventsapiconf.py.in41
-rw-r--r--events/src/eventsconfig.json5
-rw-r--r--events/src/gf_event.py19
-rw-r--r--events/src/glustereventsd.py57
-rw-r--r--events/src/handlers.py2
-rw-r--r--events/src/peer_eventsapi.py418
-rw-r--r--events/src/utils.py241
8 files changed, 620 insertions, 175 deletions
diff --git a/events/src/Makefile.am b/events/src/Makefile.am
index 87282c6c6f7..3b229691897 100644
--- a/events/src/Makefile.am
+++ b/events/src/Makefile.am
@@ -5,15 +5,19 @@ EXTRA_DIST = glustereventsd.py __init__.py eventsapiconf.py.in \
BUILT_SOURCES = eventtypes.py
CLEANFILES = eventtypes.py
-eventsdir = $(libexecdir)/glusterfs/events
+eventsdir = $(GLUSTERFS_LIBEXECDIR)/gfevents
+if BUILD_EVENTS
events_PYTHON = __init__.py gf_event.py eventsapiconf.py eventtypes.py \
utils.py
+endif
+# this does not work, see the Makefile.am in the root for a workaround
+#nodist_events_PYTHON = eventtypes.py
eventtypes.py: $(top_srcdir)/events/eventskeygen.py
$(PYTHON) $(top_srcdir)/events/eventskeygen.py PY_HEADER
if BUILD_EVENTS
-eventspeerscriptdir = $(libexecdir)/glusterfs
+eventspeerscriptdir = $(GLUSTERFS_LIBEXECDIR)
eventsconfdir = $(sysconfdir)/glusterfs
eventsconf_DATA = eventsconfig.json
@@ -24,10 +28,10 @@ eventspeerscript_SCRIPTS = peer_eventsapi.py
install-exec-hook:
$(mkdir_p) $(DESTDIR)$(sbindir)
rm -f $(DESTDIR)$(sbindir)/glustereventsd
- ln -s $(libexecdir)/glusterfs/events/glustereventsd.py \
+ ln -s $(GLUSTERFS_LIBEXECDIR)/gfevents/glustereventsd.py \
$(DESTDIR)$(sbindir)/glustereventsd
rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi
- ln -s $(libexecdir)/glusterfs/peer_eventsapi.py \
+ ln -s $(GLUSTERFS_LIBEXECDIR)/peer_eventsapi.py \
$(DESTDIR)$(sbindir)/gluster-eventsapi
uninstall-hook:
diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in
index e0028761bcc..700093bee60 100644
--- a/events/src/eventsapiconf.py.in
+++ b/events/src/eventsapiconf.py.in
@@ -9,20 +9,51 @@
# cases as published by the Free Software Foundation.
#
+import subprocess
+glusterd_workdir = None
+
+# Methods
+def get_glusterd_workdir():
+ global glusterd_workdir
+ if glusterd_workdir is not None:
+ return glusterd_workdir
+ proc = subprocess.Popen(["gluster", "system::", "getwd"],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines = True)
+ out, err = proc.communicate()
+ if proc.returncode == 0:
+ glusterd_workdir = out.strip()
+ else:
+ glusterd_workdir = "@GLUSTERD_WORKDIR@"
+ return glusterd_workdir
+
SERVER_ADDRESS = "0.0.0.0"
+SERVER_ADDRESSv4 = "0.0.0.0"
+SERVER_ADDRESSv6 = "::1"
DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json"
CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json"
-CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC
+CUSTOM_CONFIG_FILE = get_glusterd_workdir() + CUSTOM_CONFIG_FILE_TO_SYNC
WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json"
-WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC
+WEBHOOKS_FILE = get_glusterd_workdir() + WEBHOOKS_FILE_TO_SYNC
LOG_FILE = "@localstatedir@/log/glusterfs/events.log"
EVENTSD = "glustereventsd"
-CONFIG_KEYS = ["log_level", "port"]
-BOOL_CONFIGS = []
+CONFIG_KEYS = ["log-level", "port", "disable-events-log"]
+BOOL_CONFIGS = ["disable-events-log"]
INT_CONFIGS = ["port"]
RESTART_CONFIGS = ["port"]
EVENTS_ENABLED = @EVENTS_ENABLED@
-UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info"
+UUID_FILE = get_glusterd_workdir() + "/glusterd.info"
PID_FILE = "@localstatedir@/run/glustereventsd.pid"
AUTO_BOOL_ATTRIBUTES = ["force", "push-pem", "no-verify"]
AUTO_INT_ATTRIBUTES = ["ssh-port"]
+CERTS_DIR = get_glusterd_workdir() + "/events"
+
+# Errors
+ERROR_SAME_CONFIG = 2
+ERROR_ALL_NODES_STATUS_NOT_OK = 3
+ERROR_PARTIAL_SUCCESS = 4
+ERROR_WEBHOOK_ALREADY_EXISTS = 5
+ERROR_WEBHOOK_NOT_EXISTS = 6
+ERROR_INVALID_CONFIG = 7
+ERROR_WEBHOOK_SYNC_FAILED = 8
+ERROR_CONFIG_SYNC_FAILED = 9
diff --git a/events/src/eventsconfig.json b/events/src/eventsconfig.json
index 45730f9bb83..89e5b9c1d68 100644
--- a/events/src/eventsconfig.json
+++ b/events/src/eventsconfig.json
@@ -1,4 +1,5 @@
{
- "log_level": "INFO",
- "port": 24009
+ "log-level": "INFO",
+ "port": 24009,
+ "disable-events-log": false
}
diff --git a/events/src/gf_event.py b/events/src/gf_event.py
index f9ece6adc28..260b0d9aa48 100644
--- a/events/src/gf_event.py
+++ b/events/src/gf_event.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -13,10 +12,10 @@
import socket
import time
-from eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED
-from eventtypes import all_events
+from gfevents.eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED
+from gfevents.eventtypes import all_events
-from utils import logger, setup_logger, get_config
+from gfevents.utils import logger, setup_logger, get_config
# Run this when this lib loads
setup_logger()
@@ -36,18 +35,18 @@ def gf_event(event_type, **kwargs):
logger.error("Unable to connect to events Server: {0}".format(e))
return
+ port = get_config("port")
+ if port is None:
+ logger.error("Unable to get eventsd port details")
+ return
+
# Convert key value args into KEY1=VALUE1;KEY2=VALUE2;..
msg = ""
for k, v in kwargs.items():
msg += "{0}={1};".format(k, v)
# <TIMESTAMP> <EVENT_TYPE> <MSG>
- msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";"))
-
- port = get_config("port")
- if port is None:
- logger.error("Unable to get eventsd port details")
- return
+ msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";")).encode()
try:
sent = client.sendto(msg, (SERVER_ADDRESS, port))
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
index 86e64b01ad5..341a3b60947 100644
--- a/events/src/glustereventsd.py
+++ b/events/src/glustereventsd.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -13,22 +13,36 @@
from __future__ import print_function
import sys
import signal
-import SocketServer
+import threading
+try:
+ import socketserver
+except ImportError:
+ import SocketServer as socketserver
import socket
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from eventtypes import all_events
import handlers
import utils
-from eventsapiconf import SERVER_ADDRESS, PID_FILE
+from eventsapiconf import SERVER_ADDRESSv4, SERVER_ADDRESSv6, PID_FILE
from eventsapiconf import AUTO_BOOL_ATTRIBUTES, AUTO_INT_ATTRIBUTES
from utils import logger, PidFile, PidFileLockFailed, boolify
+# Subclass so that specifically IPv4 packets are captured
+class UDPServerv4(socketserver.ThreadingUDPServer):
+ address_family = socket.AF_INET
-class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
+# Subclass so that specifically IPv6 packets are captured
+class UDPServerv6(socketserver.ThreadingUDPServer):
+ address_family = socket.AF_INET6
+
+class GlusterEventsRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
+ if sys.version_info >= (3,):
+ data = self.request[0].strip().decode("utf-8")
+
logger.debug("EVENT: {0} from {1}".format(repr(data),
self.client_address[0]))
try:
@@ -46,7 +60,7 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
logger.warn("Unable to parse Event {0}".format(data))
return
- for k, v in data_dict.iteritems():
+ for k, v in data_dict.items():
try:
if k in AUTO_BOOL_ATTRIBUTES:
data_dict[k] = boolify(v)
@@ -57,9 +71,9 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
continue
try:
- # Event Type to Function Map, Recieved event data will be in
+ # Event Type to Function Map, Received event data will be in
# the form <TIMESTAMP> <TYPE> <DETAIL>, Get Event name for the
- # recieved Type/Key and construct a function name starting with
+ # received Type/Key and construct a function name starting with
# handle_ For example: handle_event_volume_create
func_name = "handle_" + all_events[int(key)].lower()
except IndexError:
@@ -80,26 +94,43 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
def signal_handler_sigusr2(sig, frame):
utils.load_all()
+ utils.restart_webhook_pool()
+
+
+def UDP_server_thread(sock):
+ sock.serve_forever()
def init_event_server():
utils.setup_logger()
utils.load_all()
+ utils.init_webhook_pool()
port = utils.get_config("port")
if port is None:
sys.stderr.write("Unable to get Port details from Config\n")
sys.exit(1)
- # Start the Eventing Server, UDP Server
+ # Creating the Eventing Server, UDP Server for IPv4 packets
+ try:
+ serverv4 = UDPServerv4((SERVER_ADDRESSv4, port),
+ GlusterEventsRequestHandler)
+ except socket.error as e:
+ sys.stderr.write("Failed to start Eventsd for IPv4: {0}\n".format(e))
+ sys.exit(1)
+ # Creating the Eventing Server, UDP Server for IPv6 packets
try:
- server = SocketServer.ThreadingUDPServer(
- (SERVER_ADDRESS, port),
- GlusterEventsRequestHandler)
+ serverv6 = UDPServerv6((SERVER_ADDRESSv6, port),
+ GlusterEventsRequestHandler)
except socket.error as e:
- sys.stderr.write("Failed to start Eventsd: {0}\n".format(e))
+ sys.stderr.write("Failed to start Eventsd for IPv6: {0}\n".format(e))
sys.exit(1)
- server.serve_forever()
+ server_thread1 = threading.Thread(target=UDP_server_thread,
+ args=(serverv4,))
+ server_thread2 = threading.Thread(target=UDP_server_thread,
+ args=(serverv6,))
+ server_thread1.start()
+ server_thread2.start()
def get_args():
diff --git a/events/src/handlers.py b/events/src/handlers.py
index 21d3e83de54..7746d488bf3 100644
--- a/events/src/handlers.py
+++ b/events/src/handlers.py
@@ -23,7 +23,7 @@ def generic_handler(ts, key, data):
def handle_event_volume_set(ts, key, data):
"""
- Recieved data will have all the options as one string, split into
+ Received data will have all the options as one string, split into
list of options. "key1,value1,key2,value2" into
[[key1, value1], [key2, value2]]
"""
diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py
index 7f80f791c4d..4d2e5f35b1c 100644
--- a/events/src/peer_eventsapi.py
+++ b/events/src/peer_eventsapi.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
@@ -17,26 +17,49 @@ from errno import EEXIST
import fcntl
from errno import EACCES, EAGAIN
import signal
+import sys
+import time
import requests
from prettytable import PrettyTable
-from gluster.cliutils import (Cmd, execute, node_output_ok, node_output_notok,
+from gluster.cliutils import (Cmd, node_output_ok, node_output_notok,
sync_file_to_peers, GlusterCmdException,
- output_error, execute_in_peers, runcli)
-from events.utils import LockedOpen
-
-from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
- WEBHOOKS_FILE,
- DEFAULT_CONFIG_FILE,
- CUSTOM_CONFIG_FILE,
- CUSTOM_CONFIG_FILE_TO_SYNC,
- EVENTSD,
- CONFIG_KEYS,
- BOOL_CONFIGS,
- INT_CONFIGS,
- PID_FILE,
- RESTART_CONFIGS)
+ output_error, execute_in_peers, runcli,
+ set_common_args_func)
+from gfevents.utils import LockedOpen, get_jwt_token, save_https_cert
+
+from gfevents.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
+ WEBHOOKS_FILE,
+ DEFAULT_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE_TO_SYNC,
+ EVENTSD,
+ CONFIG_KEYS,
+ BOOL_CONFIGS,
+ INT_CONFIGS,
+ PID_FILE,
+ RESTART_CONFIGS,
+ ERROR_INVALID_CONFIG,
+ ERROR_WEBHOOK_NOT_EXISTS,
+ ERROR_CONFIG_SYNC_FAILED,
+ ERROR_WEBHOOK_ALREADY_EXISTS,
+ ERROR_PARTIAL_SUCCESS,
+ ERROR_ALL_NODES_STATUS_NOT_OK,
+ ERROR_SAME_CONFIG,
+ ERROR_WEBHOOK_SYNC_FAILED,
+ CERTS_DIR)
+
+
+def handle_output_error(err, errcode=1, json_output=False):
+ if json_output:
+ print (json.dumps({
+ "output": "",
+ "error": err
+ }))
+ sys.exit(errcode)
+ else:
+ output_error(err, errcode)
def file_content_overwrite(fname, data):
@@ -46,15 +69,27 @@ def file_content_overwrite(fname, data):
os.rename(fname + ".tmp", fname)
-def create_custom_config_file_if_not_exists():
- mkdirp(os.path.dirname(CUSTOM_CONFIG_FILE))
+def create_custom_config_file_if_not_exists(args):
+ try:
+ config_dir = os.path.dirname(CUSTOM_CONFIG_FILE)
+ mkdirp(config_dir)
+ except OSError as e:
+ handle_output_error("Failed to create dir %s: %s" % (config_dir, e),
+ json_output=args.json)
+
if not os.path.exists(CUSTOM_CONFIG_FILE):
with open(CUSTOM_CONFIG_FILE, "w") as f:
f.write("{}")
-def create_webhooks_file_if_not_exists():
- mkdirp(os.path.dirname(WEBHOOKS_FILE))
+def create_webhooks_file_if_not_exists(args):
+ try:
+ webhooks_dir = os.path.dirname(WEBHOOKS_FILE)
+ mkdirp(webhooks_dir)
+ except OSError as e:
+ handle_output_error("Failed to create dir %s: %s" % (webhooks_dir, e),
+ json_output=args.json)
+
if not os.path.exists(WEBHOOKS_FILE):
with open(WEBHOOKS_FILE, "w") as f:
f.write("{}")
@@ -75,11 +110,9 @@ def mkdirp(path, exit_on_err=False, logger=None):
"""
try:
os.makedirs(path)
- except (OSError, IOError) as e:
- if e.errno == EEXIST and os.path.isdir(path):
- pass
- else:
- output_error("Fail to create dir %s: %s" % (path, e))
+ except OSError as e:
+ if e.errno != EEXIST or not os.path.isdir(path):
+ raise
def is_active():
@@ -111,33 +144,81 @@ def reload_service():
return (0, "", "")
-def sync_to_peers():
+def rows_to_json(json_out, column_name, rows):
+ num_ok_rows = 0
+ for row in rows:
+ num_ok_rows += 1 if row.ok else 0
+ json_out.append({
+ "node": row.hostname,
+ "node_status": "UP" if row.node_up else "DOWN",
+ column_name: "OK" if row.ok else "NOT OK",
+ "error": row.error
+ })
+ return num_ok_rows
+
+
+def rows_to_table(table, rows):
+ num_ok_rows = 0
+ for row in rows:
+ num_ok_rows += 1 if row.ok else 0
+ table.add_row([row.hostname,
+ "UP" if row.node_up else "DOWN",
+ "OK" if row.ok else "NOT OK: {0}".format(
+ row.error)])
+ return num_ok_rows
+
+
+def sync_to_peers(args):
if os.path.exists(WEBHOOKS_FILE):
try:
sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC)
except GlusterCmdException as e:
- output_error("Failed to sync Webhooks file: [Error: {0}]"
- "{1}".format(e[0], e[2]))
+ # Print stdout if stderr is empty
+ errmsg = e.message[2] if e.message[2] else e.message[1]
+ handle_output_error("Failed to sync Webhooks file: [Error: {0}]"
+ "{1}".format(e.message[0], errmsg),
+ errcode=ERROR_WEBHOOK_SYNC_FAILED,
+ json_output=args.json)
if os.path.exists(CUSTOM_CONFIG_FILE):
try:
sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC)
except GlusterCmdException as e:
- output_error("Failed to sync Config file: [Error: {0}]"
- "{1}".format(e[0], e[2]))
+ # Print stdout if stderr is empty
+ errmsg = e.message[2] if e.message[2] else e.message[1]
+ handle_output_error("Failed to sync Config file: [Error: {0}]"
+ "{1}".format(e.message[0], errmsg),
+ errcode=ERROR_CONFIG_SYNC_FAILED,
+ json_output=args.json)
out = execute_in_peers("node-reload")
- table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"])
- table.align["NODE STATUS"] = "r"
- table.align["SYNC STATUS"] = "r"
+ if not args.json:
+ table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"])
+ table.align["NODE STATUS"] = "r"
+ table.align["SYNC STATUS"] = "r"
- for p in out:
- table.add_row([p.hostname,
- "UP" if p.node_up else "DOWN",
- "OK" if p.ok else "NOT OK: {0}".format(
- p.error)])
+ json_out = []
+ if args.json:
+ num_ok_rows = rows_to_json(json_out, "sync_status", out)
+ else:
+ num_ok_rows = rows_to_table(table, out)
+
+ ret = 0
+ if num_ok_rows == 0:
+ ret = ERROR_ALL_NODES_STATUS_NOT_OK
+ elif num_ok_rows != len(out):
+ ret = ERROR_PARTIAL_SUCCESS
+
+ if args.json:
+ print (json.dumps({
+ "output": json_out,
+ "error": ""
+ }))
+ else:
+ print (table)
- print (table)
+ # If sync status is not ok for any node set error code as partial success
+ sys.exit(ret)
def node_output_handle(resp):
@@ -148,29 +229,24 @@ def node_output_handle(resp):
node_output_notok(err)
-def action_handle(action):
+def action_handle(action, json_output=False):
out = execute_in_peers("node-" + action)
column_name = action.upper()
if action == "status":
column_name = EVENTSD.upper()
- table = PrettyTable(["NODE", "NODE STATUS", column_name + " STATUS"])
- table.align["NODE STATUS"] = "r"
- table.align[column_name + " STATUS"] = "r"
-
- for p in out:
- status_col_val = "OK" if p.ok else "NOT OK: {0}".format(
- p.error)
- if action == "status":
- status_col_val = "DOWN"
- if p.ok:
- status_col_val = p.output
+ if not json_output:
+ table = PrettyTable(["NODE", "NODE STATUS", column_name + " STATUS"])
+ table.align["NODE STATUS"] = "r"
+ table.align[column_name + " STATUS"] = "r"
- table.add_row([p.hostname,
- "UP" if p.node_up else "DOWN",
- status_col_val])
+ json_out = []
+ if json_output:
+ rows_to_json(json_out, column_name.lower() + "_status", out)
+ else:
+ rows_to_table(table, out)
- print (table)
+ return json_out if json_output else table
class NodeReload(Cmd):
@@ -184,7 +260,14 @@ class ReloadCmd(Cmd):
name = "reload"
def run(self, args):
- action_handle("reload")
+ out = action_handle("reload", args.json)
+ if args.json:
+ print (json.dumps({
+ "output": out,
+ "error": ""
+ }))
+ else:
+ print (out)
class NodeStatus(Cmd):
@@ -202,12 +285,25 @@ class StatusCmd(Cmd):
if os.path.exists(WEBHOOKS_FILE):
webhooks = json.load(open(WEBHOOKS_FILE))
- print ("Webhooks: " + ("" if webhooks else "None"))
- for w in webhooks:
- print (w)
-
- print ()
- action_handle("status")
+ json_out = {"webhooks": [], "data": []}
+ if args.json:
+ json_out["webhooks"] = webhooks.keys()
+ else:
+ print ("Webhooks: " + ("" if webhooks else "None"))
+ for w in webhooks:
+ print (w)
+
+ print ()
+
+ out = action_handle("status", args.json)
+ if args.json:
+ json_out["data"] = out
+ print (json.dumps({
+ "output": json_out,
+ "error": ""
+ }))
+ else:
+ print (out)
class WebhookAddCmd(Cmd):
@@ -217,19 +313,24 @@ class WebhookAddCmd(Cmd):
parser.add_argument("url", help="URL of Webhook")
parser.add_argument("--bearer_token", "-t", help="Bearer Token",
default="")
+ parser.add_argument("--secret", "-s",
+ help="Secret to add JWT Bearer Token", default="")
def run(self, args):
- create_webhooks_file_if_not_exists()
+ create_webhooks_file_if_not_exists(args)
with LockedOpen(WEBHOOKS_FILE, 'r+'):
data = json.load(open(WEBHOOKS_FILE))
if data.get(args.url, None) is not None:
- output_error("Webhook already exists")
+ handle_output_error("Webhook already exists",
+ errcode=ERROR_WEBHOOK_ALREADY_EXISTS,
+ json_output=args.json)
- data[args.url] = args.bearer_token
+ data[args.url] = {"token": args.bearer_token,
+ "secret": args.secret}
file_content_overwrite(WEBHOOKS_FILE, data)
- sync_to_peers()
+ sync_to_peers(args)
class WebhookModCmd(Cmd):
@@ -239,19 +340,31 @@ class WebhookModCmd(Cmd):
parser.add_argument("url", help="URL of Webhook")
parser.add_argument("--bearer_token", "-t", help="Bearer Token",
default="")
+ parser.add_argument("--secret", "-s",
+ help="Secret to add JWT Bearer Token", default="")
def run(self, args):
- create_webhooks_file_if_not_exists()
+ create_webhooks_file_if_not_exists(args)
with LockedOpen(WEBHOOKS_FILE, 'r+'):
data = json.load(open(WEBHOOKS_FILE))
if data.get(args.url, None) is None:
- output_error("Webhook does not exists")
+ handle_output_error("Webhook does not exists",
+ errcode=ERROR_WEBHOOK_NOT_EXISTS,
+ json_output=args.json)
+
+ if isinstance(data[args.url], str):
+ data[args.url]["token"] = data[args.url]
+
+ if args.bearer_token != "":
+ data[args.url]["token"] = args.bearer_token
+
+ if args.secret != "":
+ data[args.url]["secret"] = args.secret
- data[args.url] = args.bearer_token
file_content_overwrite(WEBHOOKS_FILE, data)
- sync_to_peers()
+ sync_to_peers(args)
class WebhookDelCmd(Cmd):
@@ -261,17 +374,19 @@ class WebhookDelCmd(Cmd):
parser.add_argument("url", help="URL of Webhook")
def run(self, args):
- create_webhooks_file_if_not_exists()
+ create_webhooks_file_if_not_exists(args)
with LockedOpen(WEBHOOKS_FILE, 'r+'):
data = json.load(open(WEBHOOKS_FILE))
if data.get(args.url, None) is None:
- output_error("Webhook does not exists")
+ handle_output_error("Webhook does not exists",
+ errcode=ERROR_WEBHOOK_NOT_EXISTS,
+ json_output=args.json)
del data[args.url]
file_content_overwrite(WEBHOOKS_FILE, data)
- sync_to_peers()
+ sync_to_peers(args)
class NodeWebhookTestCmd(Cmd):
@@ -280,16 +395,57 @@ class NodeWebhookTestCmd(Cmd):
def args(self, parser):
parser.add_argument("url")
parser.add_argument("bearer_token")
+ parser.add_argument("secret")
def run(self, args):
http_headers = {}
+ hashval = ""
if args.bearer_token != ".":
- http_headers["Authorization"] = "Bearer " + args.bearer_token
+ hashval = args.bearer_token
- try:
- resp = requests.post(args.url, headers=http_headers)
- except requests.ConnectionError as e:
- node_output_notok("{0}".format(e))
+ if args.secret != ".":
+ hashval = get_jwt_token(args.secret, "TEST", int(time.time()))
+
+ if hashval:
+ http_headers["Authorization"] = "Bearer " + hashval
+
+ urldata = requests.utils.urlparse(args.url)
+ parts = urldata.netloc.split(":")
+ domain = parts[0]
+ # Default https port if not specified
+ port = 443
+ if len(parts) == 2:
+ port = int(parts[1])
+
+ cert_path = os.path.join(CERTS_DIR, args.url.replace("/", "_").strip())
+ verify = True
+ while True:
+ try:
+ resp = requests.post(args.url, headers=http_headers,
+ verify=verify)
+ # Successful webhook push
+ break
+ except requests.exceptions.SSLError as e:
+ # If verify is equal to cert path, but still failed with
+ # SSLError, Looks like some issue with custom downloaded
+ # certificate, Try with verify = false
+ if verify == cert_path:
+ verify = False
+ continue
+
+ # If verify is instance of bool and True, then custom cert
+ # is required, download the cert and retry
+ try:
+ save_https_cert(domain, port, cert_path)
+ verify = cert_path
+ except Exception:
+ verify = False
+
+ # Done with collecting cert, continue
+ continue
+ except Exception as e:
+ node_output_notok("{0}".format(e))
+ break
if resp.status_code != 200:
node_output_notok("{0}".format(resp.status_code))
@@ -303,28 +459,51 @@ class WebhookTestCmd(Cmd):
def args(self, parser):
parser.add_argument("url", help="URL of Webhook")
parser.add_argument("--bearer_token", "-t", help="Bearer Token")
+ parser.add_argument("--secret", "-s",
+ help="Secret to generate Bearer Token")
def run(self, args):
url = args.url
bearer_token = args.bearer_token
+ secret = args.secret
+
if not args.url:
url = "."
if not args.bearer_token:
bearer_token = "."
+ if not args.secret:
+ secret = "."
- out = execute_in_peers("node-webhook-test", [url, bearer_token])
+ out = execute_in_peers("node-webhook-test", [url, bearer_token,
+ secret])
- table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"])
- table.align["NODE STATUS"] = "r"
- table.align["WEBHOOK STATUS"] = "r"
+ if not args.json:
+ table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"])
+ table.align["NODE STATUS"] = "r"
+ table.align["WEBHOOK STATUS"] = "r"
- for p in out:
- table.add_row([p.hostname,
- "UP" if p.node_up else "DOWN",
- "OK" if p.ok else "NOT OK: {0}".format(
- p.error)])
+ num_ok_rows = 0
+ json_out = []
+ if args.json:
+ num_ok_rows = rows_to_json(json_out, "webhook_status", out)
+ else:
+ num_ok_rows = rows_to_table(table, out)
+
+ ret = 0
+ if num_ok_rows == 0:
+ ret = ERROR_ALL_NODES_STATUS_NOT_OK
+ elif num_ok_rows != len(out):
+ ret = ERROR_PARTIAL_SUCCESS
+
+ if args.json:
+ print (json.dumps({
+ "output": json_out,
+ "error": ""
+ }))
+ else:
+ print (table)
- print (table)
+ sys.exit(ret)
class ConfigGetCmd(Cmd):
@@ -339,16 +518,30 @@ class ConfigGetCmd(Cmd):
data.update(json.load(open(CUSTOM_CONFIG_FILE)))
if args.name is not None and args.name not in CONFIG_KEYS:
- output_error("Invalid Config item")
+ handle_output_error("Invalid Config item",
+ errcode=ERROR_INVALID_CONFIG,
+ json_output=args.json)
+
+ if args.json:
+ json_out = {}
+ if args.name is None:
+ json_out = data
+ else:
+ json_out[args.name] = data[args.name]
- table = PrettyTable(["NAME", "VALUE"])
- if args.name is None:
- for k, v in data.items():
- table.add_row([k, v])
+ print (json.dumps({
+ "output": json_out,
+ "error": ""
+ }))
else:
- table.add_row([args.name, data[args.name]])
+ table = PrettyTable(["NAME", "VALUE"])
+ if args.name is None:
+ for k, v in data.items():
+ table.add_row([k, v])
+ else:
+ table.add_row([args.name, data[args.name]])
- print (table)
+ print (table)
def read_file_content_json(fname):
@@ -370,9 +563,11 @@ class ConfigSetCmd(Cmd):
def run(self, args):
if args.name not in CONFIG_KEYS:
- output_error("Invalid Config item")
+ handle_output_error("Invalid Config item",
+ errcode=ERROR_INVALID_CONFIG,
+ json_output=args.json)
- create_custom_config_file_if_not_exists()
+ create_custom_config_file_if_not_exists(args)
with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'):
data = json.load(open(DEFAULT_CONFIG_FILE))
@@ -382,7 +577,9 @@ class ConfigSetCmd(Cmd):
# Do Nothing if same as previous value
if data[args.name] == args.value:
- return
+ handle_output_error("Config value not changed. Same config",
+ errcode=ERROR_SAME_CONFIG,
+ json_output=args.json)
# TODO: Validate Value
new_data = read_file_content_json(CUSTOM_CONFIG_FILE)
@@ -402,10 +599,11 @@ class ConfigSetCmd(Cmd):
if args.name in RESTART_CONFIGS:
restart = True
- sync_to_peers()
if restart:
print ("\nRestart glustereventsd in all nodes")
+ sync_to_peers(args)
+
class ConfigResetCmd(Cmd):
name = "config-reset"
@@ -414,7 +612,7 @@ class ConfigResetCmd(Cmd):
parser.add_argument("name", help="Config Name or all")
def run(self, args):
- create_custom_config_file_if_not_exists()
+ create_custom_config_file_if_not_exists(args)
with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'):
changed_keys = []
@@ -422,8 +620,14 @@ class ConfigResetCmd(Cmd):
if os.path.exists(CUSTOM_CONFIG_FILE):
data = read_file_content_json(CUSTOM_CONFIG_FILE)
- if not data:
- return
+ # If No data available in custom config or, the specific config
+ # item is not available in custom config
+ if not data or \
+ (args.name != "all" and data.get(args.name, None) is None):
+ handle_output_error("Config value not reset. Already "
+ "set to default value",
+ errcode=ERROR_SAME_CONFIG,
+ json_output=args.json)
if args.name.lower() == "all":
for k, v in data.items():
@@ -443,17 +647,23 @@ class ConfigResetCmd(Cmd):
restart = True
break
- sync_to_peers()
if restart:
print ("\nRestart glustereventsd in all nodes")
+ sync_to_peers(args)
+
class SyncCmd(Cmd):
name = "sync"
def run(self, args):
- sync_to_peers()
+ sync_to_peers(args)
+
+
+def common_args(parser):
+ parser.add_argument("--json", help="JSON Output", action="store_true")
if __name__ == "__main__":
+ set_common_args_func(common_args)
runcli()
diff --git a/events/src/utils.py b/events/src/utils.py
index e69d6577ff0..6d4e0791a2b 100644
--- a/events/src/utils.py
+++ b/events/src/utils.py
@@ -9,18 +9,34 @@
# cases as published by the Free Software Foundation.
#
+import sys
import json
import os
import logging
+import logging.handlers
import fcntl
-from errno import ESRCH, EBADF
-
-from eventsapiconf import (LOG_FILE,
- WEBHOOKS_FILE,
- DEFAULT_CONFIG_FILE,
- CUSTOM_CONFIG_FILE,
- UUID_FILE)
-import eventtypes
+from errno import EBADF
+from threading import Thread
+import multiprocessing
+try:
+ from queue import Queue
+except ImportError:
+ from Queue import Queue
+from datetime import datetime, timedelta
+import base64
+import hmac
+from hashlib import sha256
+from calendar import timegm
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from gfevents.eventsapiconf import (LOG_FILE,
+ WEBHOOKS_FILE,
+ DEFAULT_CONFIG_FILE,
+ CUSTOM_CONFIG_FILE,
+ UUID_FILE,
+ CERTS_DIR)
+from gfevents import eventtypes
# Webhooks list
@@ -34,6 +50,7 @@ _config = {}
# Init Logger instance
logger = logging.getLogger(__name__)
NodeID = None
+webhooks_pool = None
def boolify(value):
@@ -44,6 +61,12 @@ def boolify(value):
return False
+def log_event(data):
+ # Log all published events unless it is disabled
+ if not _config.get("disable-events-log", False):
+ logger.info(repr(data))
+
+
def get_node_uuid():
val = None
with open(UUID_FILE) as f:
@@ -54,10 +77,10 @@ def get_node_uuid():
return val
-def get_config(key):
+def get_config(key, default_value=None):
if not _config:
load_config()
- return _config.get(key, None)
+ return _config.get(key, default_value)
def get_event_type_name(idx):
@@ -76,7 +99,7 @@ def setup_logger():
logger.setLevel(logging.INFO)
# create the logging file handler
- fh = logging.FileHandler(LOG_FILE)
+ fh = logging.handlers.WatchedFileHandler(LOG_FILE)
formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
"[%(module)s - %(lineno)s:%(funcName)s] "
@@ -107,7 +130,7 @@ def load_log_level():
be triggered during init and when SIGUSR2.
"""
global logger, _log_level
- new_log_level = _config.get("log_level", "INFO")
+ new_log_level = _config.get("log-level", "INFO")
if _log_level != new_log_level:
logger.setLevel(getattr(logging, new_log_level.upper()))
_log_level = new_log_level.upper()
@@ -149,6 +172,9 @@ def publish(ts, event_key, data):
"event": get_event_type_name(event_key),
"message": data
}
+
+ log_event(message)
+
if _webhooks:
plugin_webhook(message)
else:
@@ -170,35 +196,121 @@ def autoload_webhooks():
load_webhooks()
-def plugin_webhook(message):
+def base64_urlencode(inp):
+ return base64.urlsafe_b64encode(inp).replace("=", "").strip()
+
+
+def get_jwt_token(secret, event_type, event_ts, jwt_expiry_time_seconds=60):
+ exp = datetime.utcnow() + timedelta(seconds=jwt_expiry_time_seconds)
+ payload = {
+ "exp": timegm(exp.utctimetuple()),
+ "iss": "gluster",
+ "sub": event_type,
+ "iat": event_ts
+ }
+ header = '{"alg":"HS256","typ":"JWT"}'
+ payload = json.dumps(payload, separators=(',', ':'), sort_keys=True)
+ msg = base64_urlencode(header) + "." + base64_urlencode(payload)
+ return "%s.%s" % (
+ msg,
+ base64_urlencode(hmac.HMAC(str(secret), msg, sha256).digest())
+ )
+
+
+def save_https_cert(domain, port, cert_path):
+ import ssl
+
+ # Cert file already available for this URL
+ if os.path.exists(cert_path):
+ return
+
+ cert_data = ssl.get_server_certificate((domain, port))
+ with open(cert_path, "w") as f:
+ f.write(cert_data)
+
+
+def publish_to_webhook(url, token, secret, message_queue):
# Import requests here since not used in any other place
import requests
- message_json = json.dumps(message, sort_keys=True)
- logger.debug("EVENT: {0}".format(message_json))
- for url, token in _webhooks.items():
- http_headers = {"Content-Type": "application/json"}
+ http_headers = {"Content-Type": "application/json"}
+ urldata = requests.utils.urlparse(url)
+ parts = urldata.netloc.split(":")
+ domain = parts[0]
+ # Default https port if not specified
+ port = 443
+ if len(parts) == 2:
+ port = int(parts[1])
+
+ cert_path = os.path.join(CERTS_DIR, url.replace("/", "_").strip())
+
+ while True:
+ hashval = ""
+ event_type, event_ts, message_json = message_queue.get()
if token != "" and token is not None:
- http_headers["Authorization"] = "Bearer " + token
+ hashval = token
- try:
- resp = requests.post(url, headers=http_headers, data=message_json)
- except requests.ConnectionError as e:
- logger.warn("Event push failed to URL: {url}, "
- "Event: {event}, "
- "Status: {error}".format(
- url=url,
- event=message_json,
- error=e))
- continue
-
- if resp.status_code != 200:
- logger.warn("Event push failed to URL: {url}, "
- "Event: {event}, "
- "Status Code: {status_code}".format(
- url=url,
- event=message_json,
- status_code=resp.status_code))
+ if secret != "" and secret is not None:
+ hashval = get_jwt_token(secret, event_type, event_ts)
+
+ if hashval:
+ http_headers["Authorization"] = "Bearer " + hashval
+
+ verify = True
+ while True:
+ try:
+ resp = requests.post(url, headers=http_headers,
+ data=message_json,
+ verify=verify)
+ # Successful webhook push
+ message_queue.task_done()
+ if resp.status_code != 200:
+ logger.warn("Event push failed to URL: {url}, "
+ "Event: {event}, "
+ "Status Code: {status_code}".format(
+ url=url,
+ event=message_json,
+ status_code=resp.status_code))
+ break
+ except requests.exceptions.SSLError as e:
+ # If verify is equal to cert path, but still failed with
+ # SSLError, Looks like some issue with custom downloaded
+ # certificate, Try with verify = false
+ if verify == cert_path:
+ logger.warn("Event push failed with certificate, "
+ "ignoring verification url={0} "
+ "Error={1}".format(url, e))
+ verify = False
+ continue
+
+ # If verify is instance of bool and True, then custom cert
+ # is required, download the cert and retry
+ try:
+ save_https_cert(domain, port, cert_path)
+ verify = cert_path
+ except Exception as ex:
+ verify = False
+ logger.warn("Unable to get Server certificate, "
+ "ignoring verification url={0} "
+ "Error={1}".format(url, ex))
+
+ # Done with collecting cert, continue
+ continue
+ except Exception as e:
+ logger.warn("Event push failed to URL: {url}, "
+ "Event: {event}, "
+ "Status: {error}".format(
+ url=url,
+ event=message_json,
+ error=e))
+ message_queue.task_done()
+ break
+
+
+def plugin_webhook(message):
+ message_json = json.dumps(message, sort_keys=True)
+ logger.debug("EVENT: {0}".format(message_json))
+ webhooks_pool.send(message["event"], message["ts"], message_json)
class LockedOpen(object):
@@ -274,3 +386,60 @@ class PidFile(object):
def __exit__(self, _exc_type, _exc_value, _traceback):
self.cleanup()
+
+
+def webhook_monitor(proc_queue, webhooks):
+ queues = {}
+ for url, data in webhooks.items():
+ if isinstance(data, str):
+ token = data
+ secret = None
+ else:
+ token = data["token"]
+ secret = data["secret"]
+
+ queues[url] = Queue()
+ t = Thread(target=publish_to_webhook, args=(url, token, secret,
+ queues[url]))
+ t.start()
+
+ # Get the message sent to Process queue and distribute to all thread queues
+ while True:
+ message = proc_queue.get()
+ for _, q in queues.items():
+ q.put(message)
+
+
+class WebhookThreadPool(object):
+ def start(self):
+ # Separate process to emit messages to webhooks
+ # which maintains one thread per webhook. Separate
+ # process is required since on reload we need to stop
+ # and start the thread pool. In Python Threads can't be stopped
+ # so terminate the process and start again. Note: In transit
+ # events will be missed during reload
+ self.queue = multiprocessing.Queue()
+ self.proc = multiprocessing.Process(target=webhook_monitor,
+ args=(self.queue, _webhooks))
+ self.proc.start()
+
+ def restart(self):
+ # In transit messages are skipped, since webhooks monitor process
+ # is terminated.
+ self.proc.terminate()
+ self.start()
+
+ def send(self, event_type, event_ts, message):
+ self.queue.put((event_type, event_ts, message))
+
+
+def init_webhook_pool():
+ global webhooks_pool
+ webhooks_pool = WebhookThreadPool()
+ webhooks_pool.start()
+
+
+def restart_webhook_pool():
+ global webhooks_pool
+ if webhooks_pool is not None:
+ webhooks_pool.restart()