diff options
Diffstat (limited to 'events/src/peer_eventsapi.py')
| -rw-r--r-- | events/src/peer_eventsapi.py | 562 |
1 files changed, 355 insertions, 207 deletions
diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py index 7887d77351c..4d2e5f35b1c 100644 --- a/events/src/peer_eventsapi.py +++ b/events/src/peer_eventsapi.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> @@ -14,24 +14,52 @@ from __future__ import print_function import os import json from errno import EEXIST +import fcntl +from errno import EACCES, EAGAIN +import signal +import sys +import time import requests -import fasteners from prettytable import PrettyTable -from gluster.cliutils import (Cmd, execute, node_output_ok, node_output_notok, +from gluster.cliutils import (Cmd, node_output_ok, node_output_notok, sync_file_to_peers, GlusterCmdException, - output_error, execute_in_peers, runcli) - -from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, - WEBHOOKS_FILE, - DEFAULT_CONFIG_FILE, - CUSTOM_CONFIG_FILE, - CUSTOM_CONFIG_FILE_TO_SYNC, - EVENTSD, - CONFIG_KEYS, - BOOL_CONFIGS, - RESTART_CONFIGS) + output_error, execute_in_peers, runcli, + set_common_args_func) +from gfevents.utils import LockedOpen, get_jwt_token, save_https_cert + +from gfevents.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, + WEBHOOKS_FILE, + DEFAULT_CONFIG_FILE, + CUSTOM_CONFIG_FILE, + CUSTOM_CONFIG_FILE_TO_SYNC, + EVENTSD, + CONFIG_KEYS, + BOOL_CONFIGS, + INT_CONFIGS, + PID_FILE, + RESTART_CONFIGS, + ERROR_INVALID_CONFIG, + ERROR_WEBHOOK_NOT_EXISTS, + ERROR_CONFIG_SYNC_FAILED, + ERROR_WEBHOOK_ALREADY_EXISTS, + ERROR_PARTIAL_SUCCESS, + ERROR_ALL_NODES_STATUS_NOT_OK, + ERROR_SAME_CONFIG, + ERROR_WEBHOOK_SYNC_FAILED, + CERTS_DIR) + + +def handle_output_error(err, errcode=1, json_output=False): + if json_output: + print (json.dumps({ + "output": "", + "error": err + })) + sys.exit(errcode) + else: + output_error(err, errcode) def file_content_overwrite(fname, data): @@ -41,15 +69,27 @@ def file_content_overwrite(fname, data): os.rename(fname + ".tmp", fname) -def create_custom_config_file_if_not_exists(): - mkdirp(os.path.dirname(CUSTOM_CONFIG_FILE)) +def create_custom_config_file_if_not_exists(args): + try: + config_dir = os.path.dirname(CUSTOM_CONFIG_FILE) + mkdirp(config_dir) + except OSError as e: + handle_output_error("Failed to create dir %s: %s" % (config_dir, e), + json_output=args.json) + if not os.path.exists(CUSTOM_CONFIG_FILE): with open(CUSTOM_CONFIG_FILE, "w") as f: f.write("{}") -def create_webhooks_file_if_not_exists(): - mkdirp(os.path.dirname(WEBHOOKS_FILE)) +def create_webhooks_file_if_not_exists(args): + try: + webhooks_dir = os.path.dirname(WEBHOOKS_FILE) + mkdirp(webhooks_dir) + except OSError as e: + handle_output_error("Failed to create dir %s: %s" % (webhooks_dir, e), + json_output=args.json) + if not os.path.exists(WEBHOOKS_FILE): with open(WEBHOOKS_FILE, "w") as f: f.write("{}") @@ -70,104 +110,115 @@ def mkdirp(path, exit_on_err=False, logger=None): """ try: os.makedirs(path) - except (OSError, IOError) as e: - if e.errno == EEXIST and os.path.isdir(path): - pass - else: - output_error("Fail to create dir %s: %s" % (path, e)) - - -def is_enabled(service): - rc, out, err = execute(["systemctl", "is-enabled", service]) - return rc == 0 - - -def is_active(service): - rc, out, err = execute(["systemctl", "is-active", service]) - return rc == 0 + except OSError as e: + if e.errno != EEXIST or not os.path.isdir(path): + raise -def enable_service(service): - if not is_enabled(service): - cmd = ["systemctl", "enable", service] - return execute(cmd) - - return (0, "", "") +def is_active(): + state = False + try: + with open(PID_FILE, "a+") as f: + fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + state = False + except (IOError, OSError) as e: + if e.errno in (EACCES, EAGAIN): + # cannot grab. so, process still running..move on + state = True + else: + state = False + return state -def disable_service(service): - if is_enabled(service): - cmd = ["systemctl", "disable", service] - return execute(cmd) +def reload_service(): + pid = None + if is_active(): + with open(PID_FILE) as f: + try: + pid = int(f.read().strip()) + except ValueError: + pid = None + if pid is not None: + os.kill(pid, signal.SIGUSR2) return (0, "", "") -def start_service(service): - rc, out, err = enable_service(service) - if rc != 0: - return (rc, out, err) - - cmd = ["systemctl", "start", service] - return execute(cmd) - - -def stop_service(service): - rc, out, err = disable_service(service) - if rc != 0: - return (rc, out, err) - - cmd = ["systemctl", "stop", service] - return execute(cmd) - - -def restart_service(service): - rc, out, err = stop_service(service) - if rc != 0: - return (rc, out, err) +def rows_to_json(json_out, column_name, rows): + num_ok_rows = 0 + for row in rows: + num_ok_rows += 1 if row.ok else 0 + json_out.append({ + "node": row.hostname, + "node_status": "UP" if row.node_up else "DOWN", + column_name: "OK" if row.ok else "NOT OK", + "error": row.error + }) + return num_ok_rows - return start_service(service) +def rows_to_table(table, rows): + num_ok_rows = 0 + for row in rows: + num_ok_rows += 1 if row.ok else 0 + table.add_row([row.hostname, + "UP" if row.node_up else "DOWN", + "OK" if row.ok else "NOT OK: {0}".format( + row.error)]) + return num_ok_rows -def reload_service(service): - if is_active(service): - cmd = ["systemctl", "reload", service] - return execute(cmd) - return (0, "", "") - - -def sync_to_peers(restart=False): +def sync_to_peers(args): if os.path.exists(WEBHOOKS_FILE): try: sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC) except GlusterCmdException as e: - output_error("Failed to sync Webhooks file: [Error: {0}]" - "{1}".format(e[0], e[2])) + # Print stdout if stderr is empty + errmsg = e.message[2] if e.message[2] else e.message[1] + handle_output_error("Failed to sync Webhooks file: [Error: {0}]" + "{1}".format(e.message[0], errmsg), + errcode=ERROR_WEBHOOK_SYNC_FAILED, + json_output=args.json) if os.path.exists(CUSTOM_CONFIG_FILE): try: sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC) except GlusterCmdException as e: - output_error("Failed to sync Config file: [Error: {0}]" - "{1}".format(e[0], e[2])) - - action = "node-reload" - if restart: - action = "node-restart" - - out = execute_in_peers(action) - table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"]) - table.align["NODE STATUS"] = "r" - table.align["SYNC STATUS"] = "r" + # Print stdout if stderr is empty + errmsg = e.message[2] if e.message[2] else e.message[1] + handle_output_error("Failed to sync Config file: [Error: {0}]" + "{1}".format(e.message[0], errmsg), + errcode=ERROR_CONFIG_SYNC_FAILED, + json_output=args.json) + + out = execute_in_peers("node-reload") + if not args.json: + table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"]) + table.align["NODE STATUS"] = "r" + table.align["SYNC STATUS"] = "r" - for p in out: - table.add_row([p.hostname, - "UP" if p.node_up else "DOWN", - "OK" if p.ok else "NOT OK: {0}".format( - p.error)]) + json_out = [] + if args.json: + num_ok_rows = rows_to_json(json_out, "sync_status", out) + else: + num_ok_rows = rows_to_table(table, out) + + ret = 0 + if num_ok_rows == 0: + ret = ERROR_ALL_NODES_STATUS_NOT_OK + elif num_ok_rows != len(out): + ret = ERROR_PARTIAL_SUCCESS + + if args.json: + print (json.dumps({ + "output": json_out, + "error": "" + })) + else: + print (table) - print (table) + # If sync status is not ok for any node set error code as partial success + sys.exit(ret) def node_output_handle(resp): @@ -178,92 +229,52 @@ def node_output_handle(resp): node_output_notok(err) -def action_handle(action): +def action_handle(action, json_output=False): out = execute_in_peers("node-" + action) column_name = action.upper() if action == "status": column_name = EVENTSD.upper() - table = PrettyTable(["NODE", "NODE STATUS", column_name + " STATUS"]) - table.align["NODE STATUS"] = "r" - table.align[column_name + " STATUS"] = "r" - - for p in out: - status_col_val = "OK" if p.ok else "NOT OK: {0}".format( - p.error) - if action == "status": - status_col_val = "DOWN" - if p.ok: - status_col_val = p.output - - table.add_row([p.hostname, - "UP" if p.node_up else "DOWN", - status_col_val]) - - print (table) - - -class NodeStart(Cmd): - name = "node-start" - - def run(self, args): - node_output_handle(start_service(EVENTSD)) - - -class StartCmd(Cmd): - name = "start" - - def run(self, args): - action_handle("start") - - -class NodeStop(Cmd): - name = "node-stop" - - def run(self, args): - node_output_handle(stop_service(EVENTSD)) - - -class StopCmd(Cmd): - name = "stop" - - def run(self, args): - action_handle("stop") - - -class NodeRestart(Cmd): - name = "node-restart" - - def run(self, args): - node_output_handle(restart_service(EVENTSD)) - + if not json_output: + table = PrettyTable(["NODE", "NODE STATUS", column_name + " STATUS"]) + table.align["NODE STATUS"] = "r" + table.align[column_name + " STATUS"] = "r" -class RestartCmd(Cmd): - name = "restart" + json_out = [] + if json_output: + rows_to_json(json_out, column_name.lower() + "_status", out) + else: + rows_to_table(table, out) - def run(self, args): - action_handle("restart") + return json_out if json_output else table class NodeReload(Cmd): name = "node-reload" def run(self, args): - node_output_handle(reload_service(EVENTSD)) + node_output_handle(reload_service()) class ReloadCmd(Cmd): name = "reload" def run(self, args): - action_handle("reload") + out = action_handle("reload", args.json) + if args.json: + print (json.dumps({ + "output": out, + "error": "" + })) + else: + print (out) class NodeStatus(Cmd): name = "node-status" def run(self, args): - node_output_ok("UP" if is_active(EVENTSD) else "DOWN") + node_output_ok("UP" if is_active() else "DOWN") class StatusCmd(Cmd): @@ -274,12 +285,25 @@ class StatusCmd(Cmd): if os.path.exists(WEBHOOKS_FILE): webhooks = json.load(open(WEBHOOKS_FILE)) - print ("Webhooks: " + ("" if webhooks else "None")) - for w in webhooks: - print (w) - - print () - action_handle("status") + json_out = {"webhooks": [], "data": []} + if args.json: + json_out["webhooks"] = webhooks.keys() + else: + print ("Webhooks: " + ("" if webhooks else "None")) + for w in webhooks: + print (w) + + print () + + out = action_handle("status", args.json) + if args.json: + json_out["data"] = out + print (json.dumps({ + "output": json_out, + "error": "" + })) + else: + print (out) class WebhookAddCmd(Cmd): @@ -289,19 +313,24 @@ class WebhookAddCmd(Cmd): parser.add_argument("url", help="URL of Webhook") parser.add_argument("--bearer_token", "-t", help="Bearer Token", default="") + parser.add_argument("--secret", "-s", + help="Secret to add JWT Bearer Token", default="") def run(self, args): - create_webhooks_file_if_not_exists() + create_webhooks_file_if_not_exists(args) - with fasteners.InterProcessLock(WEBHOOKS_FILE): + with LockedOpen(WEBHOOKS_FILE, 'r+'): data = json.load(open(WEBHOOKS_FILE)) if data.get(args.url, None) is not None: - output_error("Webhook already exists") + handle_output_error("Webhook already exists", + errcode=ERROR_WEBHOOK_ALREADY_EXISTS, + json_output=args.json) - data[args.url] = args.bearer_token + data[args.url] = {"token": args.bearer_token, + "secret": args.secret} file_content_overwrite(WEBHOOKS_FILE, data) - sync_to_peers() + sync_to_peers(args) class WebhookModCmd(Cmd): @@ -311,19 +340,31 @@ class WebhookModCmd(Cmd): parser.add_argument("url", help="URL of Webhook") parser.add_argument("--bearer_token", "-t", help="Bearer Token", default="") + parser.add_argument("--secret", "-s", + help="Secret to add JWT Bearer Token", default="") def run(self, args): - create_webhooks_file_if_not_exists() + create_webhooks_file_if_not_exists(args) - with fasteners.InterProcessLock(WEBHOOKS_FILE): + with LockedOpen(WEBHOOKS_FILE, 'r+'): data = json.load(open(WEBHOOKS_FILE)) if data.get(args.url, None) is None: - output_error("Webhook does not exists") + handle_output_error("Webhook does not exists", + errcode=ERROR_WEBHOOK_NOT_EXISTS, + json_output=args.json) + + if isinstance(data[args.url], str): + data[args.url]["token"] = data[args.url] + + if args.bearer_token != "": + data[args.url]["token"] = args.bearer_token + + if args.secret != "": + data[args.url]["secret"] = args.secret - data[args.url] = args.bearer_token file_content_overwrite(WEBHOOKS_FILE, data) - sync_to_peers() + sync_to_peers(args) class WebhookDelCmd(Cmd): @@ -333,17 +374,19 @@ class WebhookDelCmd(Cmd): parser.add_argument("url", help="URL of Webhook") def run(self, args): - create_webhooks_file_if_not_exists() + create_webhooks_file_if_not_exists(args) - with fasteners.InterProcessLock(WEBHOOKS_FILE): + with LockedOpen(WEBHOOKS_FILE, 'r+'): data = json.load(open(WEBHOOKS_FILE)) if data.get(args.url, None) is None: - output_error("Webhook does not exists") + handle_output_error("Webhook does not exists", + errcode=ERROR_WEBHOOK_NOT_EXISTS, + json_output=args.json) del data[args.url] file_content_overwrite(WEBHOOKS_FILE, data) - sync_to_peers() + sync_to_peers(args) class NodeWebhookTestCmd(Cmd): @@ -352,16 +395,57 @@ class NodeWebhookTestCmd(Cmd): def args(self, parser): parser.add_argument("url") parser.add_argument("bearer_token") + parser.add_argument("secret") def run(self, args): http_headers = {} + hashval = "" if args.bearer_token != ".": - http_headers["Authorization"] = "Bearer " + args.bearer_token - - try: - resp = requests.post(args.url, headers=http_headers) - except requests.ConnectionError as e: - node_output_notok("{0}".format(e)) + hashval = args.bearer_token + + if args.secret != ".": + hashval = get_jwt_token(args.secret, "TEST", int(time.time())) + + if hashval: + http_headers["Authorization"] = "Bearer " + hashval + + urldata = requests.utils.urlparse(args.url) + parts = urldata.netloc.split(":") + domain = parts[0] + # Default https port if not specified + port = 443 + if len(parts) == 2: + port = int(parts[1]) + + cert_path = os.path.join(CERTS_DIR, args.url.replace("/", "_").strip()) + verify = True + while True: + try: + resp = requests.post(args.url, headers=http_headers, + verify=verify) + # Successful webhook push + break + except requests.exceptions.SSLError as e: + # If verify is equal to cert path, but still failed with + # SSLError, Looks like some issue with custom downloaded + # certificate, Try with verify = false + if verify == cert_path: + verify = False + continue + + # If verify is instance of bool and True, then custom cert + # is required, download the cert and retry + try: + save_https_cert(domain, port, cert_path) + verify = cert_path + except Exception: + verify = False + + # Done with collecting cert, continue + continue + except Exception as e: + node_output_notok("{0}".format(e)) + break if resp.status_code != 200: node_output_notok("{0}".format(resp.status_code)) @@ -375,28 +459,51 @@ class WebhookTestCmd(Cmd): def args(self, parser): parser.add_argument("url", help="URL of Webhook") parser.add_argument("--bearer_token", "-t", help="Bearer Token") + parser.add_argument("--secret", "-s", + help="Secret to generate Bearer Token") def run(self, args): url = args.url bearer_token = args.bearer_token + secret = args.secret + if not args.url: url = "." if not args.bearer_token: bearer_token = "." + if not args.secret: + secret = "." - out = execute_in_peers("node-webhook-test", [url, bearer_token]) + out = execute_in_peers("node-webhook-test", [url, bearer_token, + secret]) - table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"]) - table.align["NODE STATUS"] = "r" - table.align["WEBHOOK STATUS"] = "r" + if not args.json: + table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"]) + table.align["NODE STATUS"] = "r" + table.align["WEBHOOK STATUS"] = "r" - for p in out: - table.add_row([p.hostname, - "UP" if p.node_up else "DOWN", - "OK" if p.ok else "NOT OK: {0}".format( - p.error)]) + num_ok_rows = 0 + json_out = [] + if args.json: + num_ok_rows = rows_to_json(json_out, "webhook_status", out) + else: + num_ok_rows = rows_to_table(table, out) + + ret = 0 + if num_ok_rows == 0: + ret = ERROR_ALL_NODES_STATUS_NOT_OK + elif num_ok_rows != len(out): + ret = ERROR_PARTIAL_SUCCESS + + if args.json: + print (json.dumps({ + "output": json_out, + "error": "" + })) + else: + print (table) - print (table) + sys.exit(ret) class ConfigGetCmd(Cmd): @@ -411,16 +518,30 @@ class ConfigGetCmd(Cmd): data.update(json.load(open(CUSTOM_CONFIG_FILE))) if args.name is not None and args.name not in CONFIG_KEYS: - output_error("Invalid Config item") + handle_output_error("Invalid Config item", + errcode=ERROR_INVALID_CONFIG, + json_output=args.json) + + if args.json: + json_out = {} + if args.name is None: + json_out = data + else: + json_out[args.name] = data[args.name] - table = PrettyTable(["NAME", "VALUE"]) - if args.name is None: - for k, v in data.items(): - table.add_row([k, v]) + print (json.dumps({ + "output": json_out, + "error": "" + })) else: - table.add_row([args.name, data[args.name]]) + table = PrettyTable(["NAME", "VALUE"]) + if args.name is None: + for k, v in data.items(): + table.add_row([k, v]) + else: + table.add_row([args.name, data[args.name]]) - print (table) + print (table) def read_file_content_json(fname): @@ -442,9 +563,13 @@ class ConfigSetCmd(Cmd): def run(self, args): if args.name not in CONFIG_KEYS: - output_error("Invalid Config item") + handle_output_error("Invalid Config item", + errcode=ERROR_INVALID_CONFIG, + json_output=args.json) - with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE): + create_custom_config_file_if_not_exists(args) + + with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'): data = json.load(open(DEFAULT_CONFIG_FILE)) if os.path.exists(CUSTOM_CONFIG_FILE): config_json = read_file_content_json(CUSTOM_CONFIG_FILE) @@ -452,16 +577,20 @@ class ConfigSetCmd(Cmd): # Do Nothing if same as previous value if data[args.name] == args.value: - return + handle_output_error("Config value not changed. Same config", + errcode=ERROR_SAME_CONFIG, + json_output=args.json) # TODO: Validate Value - create_custom_config_file_if_not_exists() new_data = read_file_content_json(CUSTOM_CONFIG_FILE) v = args.value if args.name in BOOL_CONFIGS: v = boolify(args.value) + if args.name in INT_CONFIGS: + v = int(args.value) + new_data[args.name] = v file_content_overwrite(CUSTOM_CONFIG_FILE, new_data) @@ -470,7 +599,10 @@ class ConfigSetCmd(Cmd): if args.name in RESTART_CONFIGS: restart = True - sync_to_peers(restart=restart) + if restart: + print ("\nRestart glustereventsd in all nodes") + + sync_to_peers(args) class ConfigResetCmd(Cmd): @@ -480,14 +612,22 @@ class ConfigResetCmd(Cmd): parser.add_argument("name", help="Config Name or all") def run(self, args): - with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE): + create_custom_config_file_if_not_exists(args) + + with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'): changed_keys = [] data = {} if os.path.exists(CUSTOM_CONFIG_FILE): data = read_file_content_json(CUSTOM_CONFIG_FILE) - if not data: - return + # If No data available in custom config or, the specific config + # item is not available in custom config + if not data or \ + (args.name != "all" and data.get(args.name, None) is None): + handle_output_error("Config value not reset. Already " + "set to default value", + errcode=ERROR_SAME_CONFIG, + json_output=args.json) if args.name.lower() == "all": for k, v in data.items(): @@ -507,15 +647,23 @@ class ConfigResetCmd(Cmd): restart = True break - sync_to_peers(restart=restart) + if restart: + print ("\nRestart glustereventsd in all nodes") + + sync_to_peers(args) class SyncCmd(Cmd): name = "sync" def run(self, args): - sync_to_peers() + sync_to_peers(args) + + +def common_args(parser): + parser.add_argument("--json", help="JSON Output", action="store_true") if __name__ == "__main__": + set_common_args_func(common_args) runcli() |
