diff options
Diffstat (limited to 'events/src/peer_eventsapi.py')
| -rw-r--r-- | events/src/peer_eventsapi.py | 669 |
1 files changed, 669 insertions, 0 deletions
diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py new file mode 100644 index 00000000000..4d2e5f35b1c --- /dev/null +++ b/events/src/peer_eventsapi.py @@ -0,0 +1,669 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. +# +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +from __future__ import print_function +import os +import json +from errno import EEXIST +import fcntl +from errno import EACCES, EAGAIN +import signal +import sys +import time + +import requests +from prettytable import PrettyTable + +from gluster.cliutils import (Cmd, node_output_ok, node_output_notok, + sync_file_to_peers, GlusterCmdException, + output_error, execute_in_peers, runcli, + set_common_args_func) +from gfevents.utils import LockedOpen, get_jwt_token, save_https_cert + +from gfevents.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, + WEBHOOKS_FILE, + DEFAULT_CONFIG_FILE, + CUSTOM_CONFIG_FILE, + CUSTOM_CONFIG_FILE_TO_SYNC, + EVENTSD, + CONFIG_KEYS, + BOOL_CONFIGS, + INT_CONFIGS, + PID_FILE, + RESTART_CONFIGS, + ERROR_INVALID_CONFIG, + ERROR_WEBHOOK_NOT_EXISTS, + ERROR_CONFIG_SYNC_FAILED, + ERROR_WEBHOOK_ALREADY_EXISTS, + ERROR_PARTIAL_SUCCESS, + ERROR_ALL_NODES_STATUS_NOT_OK, + ERROR_SAME_CONFIG, + ERROR_WEBHOOK_SYNC_FAILED, + CERTS_DIR) + + +def handle_output_error(err, errcode=1, json_output=False): + if json_output: + print (json.dumps({ + "output": "", + "error": err + })) + sys.exit(errcode) + else: + output_error(err, errcode) + + +def file_content_overwrite(fname, data): + with open(fname + ".tmp", "w") as f: + f.write(json.dumps(data)) + + os.rename(fname + ".tmp", fname) + + +def create_custom_config_file_if_not_exists(args): + try: + config_dir = os.path.dirname(CUSTOM_CONFIG_FILE) + mkdirp(config_dir) + except OSError as e: + handle_output_error("Failed to create dir %s: %s" % (config_dir, e), + json_output=args.json) + + if not os.path.exists(CUSTOM_CONFIG_FILE): + with open(CUSTOM_CONFIG_FILE, "w") as f: + f.write("{}") + + +def create_webhooks_file_if_not_exists(args): + try: + webhooks_dir = os.path.dirname(WEBHOOKS_FILE) + mkdirp(webhooks_dir) + except OSError as e: + handle_output_error("Failed to create dir %s: %s" % (webhooks_dir, e), + json_output=args.json) + + if not os.path.exists(WEBHOOKS_FILE): + with open(WEBHOOKS_FILE, "w") as f: + f.write("{}") + + +def boolify(value): + val = False + if value.lower() in ["enabled", "true", "on", "yes"]: + val = True + return val + + +def mkdirp(path, exit_on_err=False, logger=None): + """ + Try creating required directory structure + ignore EEXIST and raise exception for rest of the errors. + Print error in stderr and exit + """ + try: + os.makedirs(path) + except OSError as e: + if e.errno != EEXIST or not os.path.isdir(path): + raise + + +def is_active(): + state = False + try: + with open(PID_FILE, "a+") as f: + fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + state = False + except (IOError, OSError) as e: + if e.errno in (EACCES, EAGAIN): + # cannot grab. so, process still running..move on + state = True + else: + state = False + return state + + +def reload_service(): + pid = None + if is_active(): + with open(PID_FILE) as f: + try: + pid = int(f.read().strip()) + except ValueError: + pid = None + if pid is not None: + os.kill(pid, signal.SIGUSR2) + + return (0, "", "") + + +def rows_to_json(json_out, column_name, rows): + num_ok_rows = 0 + for row in rows: + num_ok_rows += 1 if row.ok else 0 + json_out.append({ + "node": row.hostname, + "node_status": "UP" if row.node_up else "DOWN", + column_name: "OK" if row.ok else "NOT OK", + "error": row.error + }) + return num_ok_rows + + +def rows_to_table(table, rows): + num_ok_rows = 0 + for row in rows: + num_ok_rows += 1 if row.ok else 0 + table.add_row([row.hostname, + "UP" if row.node_up else "DOWN", + "OK" if row.ok else "NOT OK: {0}".format( + row.error)]) + return num_ok_rows + + +def sync_to_peers(args): + if os.path.exists(WEBHOOKS_FILE): + try: + sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC) + except GlusterCmdException as e: + # Print stdout if stderr is empty + errmsg = e.message[2] if e.message[2] else e.message[1] + handle_output_error("Failed to sync Webhooks file: [Error: {0}]" + "{1}".format(e.message[0], errmsg), + errcode=ERROR_WEBHOOK_SYNC_FAILED, + json_output=args.json) + + if os.path.exists(CUSTOM_CONFIG_FILE): + try: + sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC) + except GlusterCmdException as e: + # Print stdout if stderr is empty + errmsg = e.message[2] if e.message[2] else e.message[1] + handle_output_error("Failed to sync Config file: [Error: {0}]" + "{1}".format(e.message[0], errmsg), + errcode=ERROR_CONFIG_SYNC_FAILED, + json_output=args.json) + + out = execute_in_peers("node-reload") + if not args.json: + table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"]) + table.align["NODE STATUS"] = "r" + table.align["SYNC STATUS"] = "r" + + json_out = [] + if args.json: + num_ok_rows = rows_to_json(json_out, "sync_status", out) + else: + num_ok_rows = rows_to_table(table, out) + + ret = 0 + if num_ok_rows == 0: + ret = ERROR_ALL_NODES_STATUS_NOT_OK + elif num_ok_rows != len(out): + ret = ERROR_PARTIAL_SUCCESS + + if args.json: + print (json.dumps({ + "output": json_out, + "error": "" + })) + else: + print (table) + + # If sync status is not ok for any node set error code as partial success + sys.exit(ret) + + +def node_output_handle(resp): + rc, out, err = resp + if rc == 0: + node_output_ok(out) + else: + node_output_notok(err) + + +def action_handle(action, json_output=False): + out = execute_in_peers("node-" + action) + column_name = action.upper() + if action == "status": + column_name = EVENTSD.upper() + + if not json_output: + table = PrettyTable(["NODE", "NODE STATUS", column_name + " STATUS"]) + table.align["NODE STATUS"] = "r" + table.align[column_name + " STATUS"] = "r" + + json_out = [] + if json_output: + rows_to_json(json_out, column_name.lower() + "_status", out) + else: + rows_to_table(table, out) + + return json_out if json_output else table + + +class NodeReload(Cmd): + name = "node-reload" + + def run(self, args): + node_output_handle(reload_service()) + + +class ReloadCmd(Cmd): + name = "reload" + + def run(self, args): + out = action_handle("reload", args.json) + if args.json: + print (json.dumps({ + "output": out, + "error": "" + })) + else: + print (out) + + +class NodeStatus(Cmd): + name = "node-status" + + def run(self, args): + node_output_ok("UP" if is_active() else "DOWN") + + +class StatusCmd(Cmd): + name = "status" + + def run(self, args): + webhooks = {} + if os.path.exists(WEBHOOKS_FILE): + webhooks = json.load(open(WEBHOOKS_FILE)) + + json_out = {"webhooks": [], "data": []} + if args.json: + json_out["webhooks"] = webhooks.keys() + else: + print ("Webhooks: " + ("" if webhooks else "None")) + for w in webhooks: + print (w) + + print () + + out = action_handle("status", args.json) + if args.json: + json_out["data"] = out + print (json.dumps({ + "output": json_out, + "error": "" + })) + else: + print (out) + + +class WebhookAddCmd(Cmd): + name = "webhook-add" + + def args(self, parser): + parser.add_argument("url", help="URL of Webhook") + parser.add_argument("--bearer_token", "-t", help="Bearer Token", + default="") + parser.add_argument("--secret", "-s", + help="Secret to add JWT Bearer Token", default="") + + def run(self, args): + create_webhooks_file_if_not_exists(args) + + with LockedOpen(WEBHOOKS_FILE, 'r+'): + data = json.load(open(WEBHOOKS_FILE)) + if data.get(args.url, None) is not None: + handle_output_error("Webhook already exists", + errcode=ERROR_WEBHOOK_ALREADY_EXISTS, + json_output=args.json) + + data[args.url] = {"token": args.bearer_token, + "secret": args.secret} + file_content_overwrite(WEBHOOKS_FILE, data) + + sync_to_peers(args) + + +class WebhookModCmd(Cmd): + name = "webhook-mod" + + def args(self, parser): + parser.add_argument("url", help="URL of Webhook") + parser.add_argument("--bearer_token", "-t", help="Bearer Token", + default="") + parser.add_argument("--secret", "-s", + help="Secret to add JWT Bearer Token", default="") + + def run(self, args): + create_webhooks_file_if_not_exists(args) + + with LockedOpen(WEBHOOKS_FILE, 'r+'): + data = json.load(open(WEBHOOKS_FILE)) + if data.get(args.url, None) is None: + handle_output_error("Webhook does not exists", + errcode=ERROR_WEBHOOK_NOT_EXISTS, + json_output=args.json) + + if isinstance(data[args.url], str): + data[args.url]["token"] = data[args.url] + + if args.bearer_token != "": + data[args.url]["token"] = args.bearer_token + + if args.secret != "": + data[args.url]["secret"] = args.secret + + file_content_overwrite(WEBHOOKS_FILE, data) + + sync_to_peers(args) + + +class WebhookDelCmd(Cmd): + name = "webhook-del" + + def args(self, parser): + parser.add_argument("url", help="URL of Webhook") + + def run(self, args): + create_webhooks_file_if_not_exists(args) + + with LockedOpen(WEBHOOKS_FILE, 'r+'): + data = json.load(open(WEBHOOKS_FILE)) + if data.get(args.url, None) is None: + handle_output_error("Webhook does not exists", + errcode=ERROR_WEBHOOK_NOT_EXISTS, + json_output=args.json) + + del data[args.url] + file_content_overwrite(WEBHOOKS_FILE, data) + + sync_to_peers(args) + + +class NodeWebhookTestCmd(Cmd): + name = "node-webhook-test" + + def args(self, parser): + parser.add_argument("url") + parser.add_argument("bearer_token") + parser.add_argument("secret") + + def run(self, args): + http_headers = {} + hashval = "" + if args.bearer_token != ".": + hashval = args.bearer_token + + if args.secret != ".": + hashval = get_jwt_token(args.secret, "TEST", int(time.time())) + + if hashval: + http_headers["Authorization"] = "Bearer " + hashval + + urldata = requests.utils.urlparse(args.url) + parts = urldata.netloc.split(":") + domain = parts[0] + # Default https port if not specified + port = 443 + if len(parts) == 2: + port = int(parts[1]) + + cert_path = os.path.join(CERTS_DIR, args.url.replace("/", "_").strip()) + verify = True + while True: + try: + resp = requests.post(args.url, headers=http_headers, + verify=verify) + # Successful webhook push + break + except requests.exceptions.SSLError as e: + # If verify is equal to cert path, but still failed with + # SSLError, Looks like some issue with custom downloaded + # certificate, Try with verify = false + if verify == cert_path: + verify = False + continue + + # If verify is instance of bool and True, then custom cert + # is required, download the cert and retry + try: + save_https_cert(domain, port, cert_path) + verify = cert_path + except Exception: + verify = False + + # Done with collecting cert, continue + continue + except Exception as e: + node_output_notok("{0}".format(e)) + break + + if resp.status_code != 200: + node_output_notok("{0}".format(resp.status_code)) + + node_output_ok() + + +class WebhookTestCmd(Cmd): + name = "webhook-test" + + def args(self, parser): + parser.add_argument("url", help="URL of Webhook") + parser.add_argument("--bearer_token", "-t", help="Bearer Token") + parser.add_argument("--secret", "-s", + help="Secret to generate Bearer Token") + + def run(self, args): + url = args.url + bearer_token = args.bearer_token + secret = args.secret + + if not args.url: + url = "." + if not args.bearer_token: + bearer_token = "." + if not args.secret: + secret = "." + + out = execute_in_peers("node-webhook-test", [url, bearer_token, + secret]) + + if not args.json: + table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"]) + table.align["NODE STATUS"] = "r" + table.align["WEBHOOK STATUS"] = "r" + + num_ok_rows = 0 + json_out = [] + if args.json: + num_ok_rows = rows_to_json(json_out, "webhook_status", out) + else: + num_ok_rows = rows_to_table(table, out) + + ret = 0 + if num_ok_rows == 0: + ret = ERROR_ALL_NODES_STATUS_NOT_OK + elif num_ok_rows != len(out): + ret = ERROR_PARTIAL_SUCCESS + + if args.json: + print (json.dumps({ + "output": json_out, + "error": "" + })) + else: + print (table) + + sys.exit(ret) + + +class ConfigGetCmd(Cmd): + name = "config-get" + + def args(self, parser): + parser.add_argument("--name", help="Config Name") + + def run(self, args): + data = json.load(open(DEFAULT_CONFIG_FILE)) + if os.path.exists(CUSTOM_CONFIG_FILE): + data.update(json.load(open(CUSTOM_CONFIG_FILE))) + + if args.name is not None and args.name not in CONFIG_KEYS: + handle_output_error("Invalid Config item", + errcode=ERROR_INVALID_CONFIG, + json_output=args.json) + + if args.json: + json_out = {} + if args.name is None: + json_out = data + else: + json_out[args.name] = data[args.name] + + print (json.dumps({ + "output": json_out, + "error": "" + })) + else: + table = PrettyTable(["NAME", "VALUE"]) + if args.name is None: + for k, v in data.items(): + table.add_row([k, v]) + else: + table.add_row([args.name, data[args.name]]) + + print (table) + + +def read_file_content_json(fname): + content = "{}" + with open(fname) as f: + content = f.read() + if content.strip() == "": + content = "{}" + + return json.loads(content) + + +class ConfigSetCmd(Cmd): + name = "config-set" + + def args(self, parser): + parser.add_argument("name", help="Config Name") + parser.add_argument("value", help="Config Value") + + def run(self, args): + if args.name not in CONFIG_KEYS: + handle_output_error("Invalid Config item", + errcode=ERROR_INVALID_CONFIG, + json_output=args.json) + + create_custom_config_file_if_not_exists(args) + + with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'): + data = json.load(open(DEFAULT_CONFIG_FILE)) + if os.path.exists(CUSTOM_CONFIG_FILE): + config_json = read_file_content_json(CUSTOM_CONFIG_FILE) + data.update(config_json) + + # Do Nothing if same as previous value + if data[args.name] == args.value: + handle_output_error("Config value not changed. Same config", + errcode=ERROR_SAME_CONFIG, + json_output=args.json) + + # TODO: Validate Value + new_data = read_file_content_json(CUSTOM_CONFIG_FILE) + + v = args.value + if args.name in BOOL_CONFIGS: + v = boolify(args.value) + + if args.name in INT_CONFIGS: + v = int(args.value) + + new_data[args.name] = v + file_content_overwrite(CUSTOM_CONFIG_FILE, new_data) + + # If any value changed which requires restart of REST server + restart = False + if args.name in RESTART_CONFIGS: + restart = True + + if restart: + print ("\nRestart glustereventsd in all nodes") + + sync_to_peers(args) + + +class ConfigResetCmd(Cmd): + name = "config-reset" + + def args(self, parser): + parser.add_argument("name", help="Config Name or all") + + def run(self, args): + create_custom_config_file_if_not_exists(args) + + with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'): + changed_keys = [] + data = {} + if os.path.exists(CUSTOM_CONFIG_FILE): + data = read_file_content_json(CUSTOM_CONFIG_FILE) + + # If No data available in custom config or, the specific config + # item is not available in custom config + if not data or \ + (args.name != "all" and data.get(args.name, None) is None): + handle_output_error("Config value not reset. Already " + "set to default value", + errcode=ERROR_SAME_CONFIG, + json_output=args.json) + + if args.name.lower() == "all": + for k, v in data.items(): + changed_keys.append(k) + + # Reset all keys + file_content_overwrite(CUSTOM_CONFIG_FILE, {}) + else: + changed_keys.append(args.name) + del data[args.name] + file_content_overwrite(CUSTOM_CONFIG_FILE, data) + + # If any value changed which requires restart of REST server + restart = False + for key in changed_keys: + if key in RESTART_CONFIGS: + restart = True + break + + if restart: + print ("\nRestart glustereventsd in all nodes") + + sync_to_peers(args) + + +class SyncCmd(Cmd): + name = "sync" + + def run(self, args): + sync_to_peers(args) + + +def common_args(parser): + parser.add_argument("--json", help="JSON Output", action="store_true") + + +if __name__ == "__main__": + set_common_args_func(common_args) + runcli() |
