diff options
Diffstat (limited to 'events')
-rw-r--r-- | events/src/eventsapiconf.py.in | 1 | ||||
-rw-r--r-- | events/src/glustereventsd.py | 21 | ||||
-rw-r--r-- | events/src/peer_eventsapi.py | 160 | ||||
-rw-r--r-- | events/src/utils.py | 77 |
4 files changed, 143 insertions, 116 deletions
diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in index fad96ca2cc7..ecccd3dbda4 100644 --- a/events/src/eventsapiconf.py.in +++ b/events/src/eventsapiconf.py.in @@ -23,3 +23,4 @@ INT_CONFIGS = ["port"] RESTART_CONFIGS = ["port"] EVENTS_ENABLED = @EVENTS_ENABLED@ UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info" +PID_FILE = "@localstatedir@/run/glustereventsd.pid" diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py index 91a0743ff22..d057e097c97 100644 --- a/events/src/glustereventsd.py +++ b/events/src/glustereventsd.py @@ -15,12 +15,13 @@ import sys import signal import SocketServer import socket +from argparse import ArgumentParser, RawDescriptionHelpFormatter from eventtypes import all_events import handlers import utils -from eventsapiconf import SERVER_ADDRESS -from utils import logger +from eventsapiconf import SERVER_ADDRESS, PID_FILE +from utils import logger, PidFile, PidFileLockFailed class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler): @@ -90,9 +91,23 @@ def init_event_server(): server.serve_forever() +def get_args(): + parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter, + description=__doc__) + parser.add_argument("-p", "--pid-file", help="PID File", + default=PID_FILE) + + return parser.parse_args() + + def main(): + args = get_args() try: - init_event_server() + with PidFile(args.pid_file): + init_event_server() + except PidFileLockFailed as e: + sys.stderr.write("Failed to get lock for pid file({0}): {1}".format( + args.pid_file, e)) except KeyboardInterrupt: sys.exit(1) diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py index f4447784f90..7f80f791c4d 100644 --- a/events/src/peer_eventsapi.py +++ b/events/src/peer_eventsapi.py @@ -14,14 +14,17 @@ from __future__ import print_function import os import json from errno import EEXIST +import fcntl +from errno import EACCES, EAGAIN +import signal import requests -import fasteners from prettytable import PrettyTable from gluster.cliutils import (Cmd, execute, node_output_ok, node_output_notok, sync_file_to_peers, GlusterCmdException, output_error, execute_in_peers, runcli) +from events.utils import LockedOpen from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, WEBHOOKS_FILE, @@ -32,6 +35,7 @@ from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC, CONFIG_KEYS, BOOL_CONFIGS, INT_CONFIGS, + PID_FILE, RESTART_CONFIGS) @@ -78,67 +82,36 @@ def mkdirp(path, exit_on_err=False, logger=None): output_error("Fail to create dir %s: %s" % (path, e)) -def is_enabled(service): - rc, out, err = execute(["systemctl", "is-enabled", service]) - return rc == 0 - - -def is_active(service): - rc, out, err = execute(["systemctl", "is-active", service]) - return rc == 0 - - -def enable_service(service): - if not is_enabled(service): - cmd = ["systemctl", "enable", service] - return execute(cmd) - - return (0, "", "") - - -def disable_service(service): - if is_enabled(service): - cmd = ["systemctl", "disable", service] - return execute(cmd) - - return (0, "", "") - - -def start_service(service): - rc, out, err = enable_service(service) - if rc != 0: - return (rc, out, err) - - cmd = ["systemctl", "start", service] - return execute(cmd) - - -def stop_service(service): - rc, out, err = disable_service(service) - if rc != 0: - return (rc, out, err) - - cmd = ["systemctl", "stop", service] - return execute(cmd) - - -def restart_service(service): - rc, out, err = stop_service(service) - if rc != 0: - return (rc, out, err) - - return start_service(service) +def is_active(): + state = False + try: + with open(PID_FILE, "a+") as f: + fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + state = False + except (IOError, OSError) as e: + if e.errno in (EACCES, EAGAIN): + # cannot grab. so, process still running..move on + state = True + else: + state = False + return state -def reload_service(service): - if is_active(service): - cmd = ["systemctl", "reload", service] - return execute(cmd) +def reload_service(): + pid = None + if is_active(): + with open(PID_FILE) as f: + try: + pid = int(f.read().strip()) + except ValueError: + pid = None + if pid is not None: + os.kill(pid, signal.SIGUSR2) return (0, "", "") -def sync_to_peers(restart=False): +def sync_to_peers(): if os.path.exists(WEBHOOKS_FILE): try: sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC) @@ -153,11 +126,7 @@ def sync_to_peers(restart=False): output_error("Failed to sync Config file: [Error: {0}]" "{1}".format(e[0], e[2])) - action = "node-reload" - if restart: - action = "node-restart" - - out = execute_in_peers(action) + out = execute_in_peers("node-reload") table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"]) table.align["NODE STATUS"] = "r" table.align["SYNC STATUS"] = "r" @@ -204,53 +173,11 @@ def action_handle(action): print (table) -class NodeStart(Cmd): - name = "node-start" - - def run(self, args): - node_output_handle(start_service(EVENTSD)) - - -class StartCmd(Cmd): - name = "start" - - def run(self, args): - action_handle("start") - - -class NodeStop(Cmd): - name = "node-stop" - - def run(self, args): - node_output_handle(stop_service(EVENTSD)) - - -class StopCmd(Cmd): - name = "stop" - - def run(self, args): - action_handle("stop") - - -class NodeRestart(Cmd): - name = "node-restart" - - def run(self, args): - node_output_handle(restart_service(EVENTSD)) - - -class RestartCmd(Cmd): - name = "restart" - - def run(self, args): - action_handle("restart") - - class NodeReload(Cmd): name = "node-reload" def run(self, args): - node_output_handle(reload_service(EVENTSD)) + node_output_handle(reload_service()) class ReloadCmd(Cmd): @@ -264,7 +191,7 @@ class NodeStatus(Cmd): name = "node-status" def run(self, args): - node_output_ok("UP" if is_active(EVENTSD) else "DOWN") + node_output_ok("UP" if is_active() else "DOWN") class StatusCmd(Cmd): @@ -294,7 +221,7 @@ class WebhookAddCmd(Cmd): def run(self, args): create_webhooks_file_if_not_exists() - with fasteners.InterProcessLock(WEBHOOKS_FILE): + with LockedOpen(WEBHOOKS_FILE, 'r+'): data = json.load(open(WEBHOOKS_FILE)) if data.get(args.url, None) is not None: output_error("Webhook already exists") @@ -316,7 +243,7 @@ class WebhookModCmd(Cmd): def run(self, args): create_webhooks_file_if_not_exists() - with fasteners.InterProcessLock(WEBHOOKS_FILE): + with LockedOpen(WEBHOOKS_FILE, 'r+'): data = json.load(open(WEBHOOKS_FILE)) if data.get(args.url, None) is None: output_error("Webhook does not exists") @@ -336,7 +263,7 @@ class WebhookDelCmd(Cmd): def run(self, args): create_webhooks_file_if_not_exists() - with fasteners.InterProcessLock(WEBHOOKS_FILE): + with LockedOpen(WEBHOOKS_FILE, 'r+'): data = json.load(open(WEBHOOKS_FILE)) if data.get(args.url, None) is None: output_error("Webhook does not exists") @@ -445,7 +372,9 @@ class ConfigSetCmd(Cmd): if args.name not in CONFIG_KEYS: output_error("Invalid Config item") - with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE): + create_custom_config_file_if_not_exists() + + with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'): data = json.load(open(DEFAULT_CONFIG_FILE)) if os.path.exists(CUSTOM_CONFIG_FILE): config_json = read_file_content_json(CUSTOM_CONFIG_FILE) @@ -456,7 +385,6 @@ class ConfigSetCmd(Cmd): return # TODO: Validate Value - create_custom_config_file_if_not_exists() new_data = read_file_content_json(CUSTOM_CONFIG_FILE) v = args.value @@ -474,7 +402,9 @@ class ConfigSetCmd(Cmd): if args.name in RESTART_CONFIGS: restart = True - sync_to_peers(restart=restart) + sync_to_peers() + if restart: + print ("\nRestart glustereventsd in all nodes") class ConfigResetCmd(Cmd): @@ -484,7 +414,9 @@ class ConfigResetCmd(Cmd): parser.add_argument("name", help="Config Name or all") def run(self, args): - with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE): + create_custom_config_file_if_not_exists() + + with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'): changed_keys = [] data = {} if os.path.exists(CUSTOM_CONFIG_FILE): @@ -511,7 +443,9 @@ class ConfigResetCmd(Cmd): restart = True break - sync_to_peers(restart=restart) + sync_to_peers() + if restart: + print ("\nRestart glustereventsd in all nodes") class SyncCmd(Cmd): diff --git a/events/src/utils.py b/events/src/utils.py index 386e8f28449..db8ebfe29a9 100644 --- a/events/src/utils.py +++ b/events/src/utils.py @@ -12,6 +12,8 @@ import json import os import logging +import fcntl +from errno import ESRCH, EBADF import requests from eventsapiconf import (LOG_FILE, @@ -168,3 +170,78 @@ def plugin_webhook(message): url=url, event=message_json, status_code=resp.status_code)) + + +class LockedOpen(object): + + def __init__(self, filename, *args, **kwargs): + self.filename = filename + self.open_args = args + self.open_kwargs = kwargs + self.fileobj = None + + def __enter__(self): + """ + If two processes compete to update a file, The first process + gets the lock and the second process is blocked in the fcntl.flock() + call. When first process replaces the file and releases the lock, + the already open file descriptor in the second process now points + to a "ghost" file(not reachable by any path name) with old contents. + To avoid that conflict, check the fd already opened is same or + not. Open new one if not same + """ + f = open(self.filename, *self.open_args, **self.open_kwargs) + while True: + fcntl.flock(f, fcntl.LOCK_EX) + fnew = open(self.filename, *self.open_args, **self.open_kwargs) + if os.path.sameopenfile(f.fileno(), fnew.fileno()): + fnew.close() + break + else: + f.close() + f = fnew + self.fileobj = f + return f + + def __exit__(self, _exc_type, _exc_value, _traceback): + self.fileobj.close() + + +class PidFileLockFailed(Exception): + pass + + +class PidFile(object): + def __init__(self, filename): + self.filename = filename + self.pid = os.getpid() + self.fh = None + + def cleanup(self, remove_file=True): + try: + if self.fh is not None: + self.fh.close() + except IOError as exc: + if exc.errno != EBADF: + raise + finally: + if os.path.isfile(self.filename) and remove_file: + os.remove(self.filename) + + def __enter__(self): + self.fh = open(self.filename, 'a+') + try: + fcntl.flock(self.fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError as exc: + self.cleanup(remove_file=False) + raise PidFileLockFailed(exc) + + self.fh.seek(0) + self.fh.truncate() + self.fh.write("%d\n" % self.pid) + self.fh.flush() + self.fh.seek(0) + return self + + def __exit__(self, _exc_type, _exc_value, _traceback): + self.cleanup() |