summaryrefslogtreecommitdiffstats
path: root/events
diff options
context:
space:
mode:
Diffstat (limited to 'events')
-rw-r--r--events/src/eventsapiconf.py.in1
-rw-r--r--events/src/glustereventsd.py21
-rw-r--r--events/src/peer_eventsapi.py160
-rw-r--r--events/src/utils.py77
4 files changed, 143 insertions, 116 deletions
diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in
index fad96ca2cc7..ecccd3dbda4 100644
--- a/events/src/eventsapiconf.py.in
+++ b/events/src/eventsapiconf.py.in
@@ -23,3 +23,4 @@ INT_CONFIGS = ["port"]
RESTART_CONFIGS = ["port"]
EVENTS_ENABLED = @EVENTS_ENABLED@
UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info"
+PID_FILE = "@localstatedir@/run/glustereventsd.pid"
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
index 91a0743ff22..d057e097c97 100644
--- a/events/src/glustereventsd.py
+++ b/events/src/glustereventsd.py
@@ -15,12 +15,13 @@ import sys
import signal
import SocketServer
import socket
+from argparse import ArgumentParser, RawDescriptionHelpFormatter
from eventtypes import all_events
import handlers
import utils
-from eventsapiconf import SERVER_ADDRESS
-from utils import logger
+from eventsapiconf import SERVER_ADDRESS, PID_FILE
+from utils import logger, PidFile, PidFileLockFailed
class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
@@ -90,9 +91,23 @@ def init_event_server():
server.serve_forever()
+def get_args():
+ parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
+ description=__doc__)
+ parser.add_argument("-p", "--pid-file", help="PID File",
+ default=PID_FILE)
+
+ return parser.parse_args()
+
+
def main():
+ args = get_args()
try:
- init_event_server()
+ with PidFile(args.pid_file):
+ init_event_server()
+ except PidFileLockFailed as e:
+ sys.stderr.write("Failed to get lock for pid file({0}): {1}".format(
+ args.pid_file, e))
except KeyboardInterrupt:
sys.exit(1)
diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py
index f4447784f90..7f80f791c4d 100644
--- a/events/src/peer_eventsapi.py
+++ b/events/src/peer_eventsapi.py
@@ -14,14 +14,17 @@ from __future__ import print_function
import os
import json
from errno import EEXIST
+import fcntl
+from errno import EACCES, EAGAIN
+import signal
import requests
-import fasteners
from prettytable import PrettyTable
from gluster.cliutils import (Cmd, execute, node_output_ok, node_output_notok,
sync_file_to_peers, GlusterCmdException,
output_error, execute_in_peers, runcli)
+from events.utils import LockedOpen
from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
WEBHOOKS_FILE,
@@ -32,6 +35,7 @@ from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
CONFIG_KEYS,
BOOL_CONFIGS,
INT_CONFIGS,
+ PID_FILE,
RESTART_CONFIGS)
@@ -78,67 +82,36 @@ def mkdirp(path, exit_on_err=False, logger=None):
output_error("Fail to create dir %s: %s" % (path, e))
-def is_enabled(service):
- rc, out, err = execute(["systemctl", "is-enabled", service])
- return rc == 0
-
-
-def is_active(service):
- rc, out, err = execute(["systemctl", "is-active", service])
- return rc == 0
-
-
-def enable_service(service):
- if not is_enabled(service):
- cmd = ["systemctl", "enable", service]
- return execute(cmd)
-
- return (0, "", "")
-
-
-def disable_service(service):
- if is_enabled(service):
- cmd = ["systemctl", "disable", service]
- return execute(cmd)
-
- return (0, "", "")
-
-
-def start_service(service):
- rc, out, err = enable_service(service)
- if rc != 0:
- return (rc, out, err)
-
- cmd = ["systemctl", "start", service]
- return execute(cmd)
-
-
-def stop_service(service):
- rc, out, err = disable_service(service)
- if rc != 0:
- return (rc, out, err)
-
- cmd = ["systemctl", "stop", service]
- return execute(cmd)
-
-
-def restart_service(service):
- rc, out, err = stop_service(service)
- if rc != 0:
- return (rc, out, err)
-
- return start_service(service)
+def is_active():
+ state = False
+ try:
+ with open(PID_FILE, "a+") as f:
+ fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+ state = False
+ except (IOError, OSError) as e:
+ if e.errno in (EACCES, EAGAIN):
+ # cannot grab. so, process still running..move on
+ state = True
+ else:
+ state = False
+ return state
-def reload_service(service):
- if is_active(service):
- cmd = ["systemctl", "reload", service]
- return execute(cmd)
+def reload_service():
+ pid = None
+ if is_active():
+ with open(PID_FILE) as f:
+ try:
+ pid = int(f.read().strip())
+ except ValueError:
+ pid = None
+ if pid is not None:
+ os.kill(pid, signal.SIGUSR2)
return (0, "", "")
-def sync_to_peers(restart=False):
+def sync_to_peers():
if os.path.exists(WEBHOOKS_FILE):
try:
sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC)
@@ -153,11 +126,7 @@ def sync_to_peers(restart=False):
output_error("Failed to sync Config file: [Error: {0}]"
"{1}".format(e[0], e[2]))
- action = "node-reload"
- if restart:
- action = "node-restart"
-
- out = execute_in_peers(action)
+ out = execute_in_peers("node-reload")
table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"])
table.align["NODE STATUS"] = "r"
table.align["SYNC STATUS"] = "r"
@@ -204,53 +173,11 @@ def action_handle(action):
print (table)
-class NodeStart(Cmd):
- name = "node-start"
-
- def run(self, args):
- node_output_handle(start_service(EVENTSD))
-
-
-class StartCmd(Cmd):
- name = "start"
-
- def run(self, args):
- action_handle("start")
-
-
-class NodeStop(Cmd):
- name = "node-stop"
-
- def run(self, args):
- node_output_handle(stop_service(EVENTSD))
-
-
-class StopCmd(Cmd):
- name = "stop"
-
- def run(self, args):
- action_handle("stop")
-
-
-class NodeRestart(Cmd):
- name = "node-restart"
-
- def run(self, args):
- node_output_handle(restart_service(EVENTSD))
-
-
-class RestartCmd(Cmd):
- name = "restart"
-
- def run(self, args):
- action_handle("restart")
-
-
class NodeReload(Cmd):
name = "node-reload"
def run(self, args):
- node_output_handle(reload_service(EVENTSD))
+ node_output_handle(reload_service())
class ReloadCmd(Cmd):
@@ -264,7 +191,7 @@ class NodeStatus(Cmd):
name = "node-status"
def run(self, args):
- node_output_ok("UP" if is_active(EVENTSD) else "DOWN")
+ node_output_ok("UP" if is_active() else "DOWN")
class StatusCmd(Cmd):
@@ -294,7 +221,7 @@ class WebhookAddCmd(Cmd):
def run(self, args):
create_webhooks_file_if_not_exists()
- with fasteners.InterProcessLock(WEBHOOKS_FILE):
+ with LockedOpen(WEBHOOKS_FILE, 'r+'):
data = json.load(open(WEBHOOKS_FILE))
if data.get(args.url, None) is not None:
output_error("Webhook already exists")
@@ -316,7 +243,7 @@ class WebhookModCmd(Cmd):
def run(self, args):
create_webhooks_file_if_not_exists()
- with fasteners.InterProcessLock(WEBHOOKS_FILE):
+ with LockedOpen(WEBHOOKS_FILE, 'r+'):
data = json.load(open(WEBHOOKS_FILE))
if data.get(args.url, None) is None:
output_error("Webhook does not exists")
@@ -336,7 +263,7 @@ class WebhookDelCmd(Cmd):
def run(self, args):
create_webhooks_file_if_not_exists()
- with fasteners.InterProcessLock(WEBHOOKS_FILE):
+ with LockedOpen(WEBHOOKS_FILE, 'r+'):
data = json.load(open(WEBHOOKS_FILE))
if data.get(args.url, None) is None:
output_error("Webhook does not exists")
@@ -445,7 +372,9 @@ class ConfigSetCmd(Cmd):
if args.name not in CONFIG_KEYS:
output_error("Invalid Config item")
- with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE):
+ create_custom_config_file_if_not_exists()
+
+ with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'):
data = json.load(open(DEFAULT_CONFIG_FILE))
if os.path.exists(CUSTOM_CONFIG_FILE):
config_json = read_file_content_json(CUSTOM_CONFIG_FILE)
@@ -456,7 +385,6 @@ class ConfigSetCmd(Cmd):
return
# TODO: Validate Value
- create_custom_config_file_if_not_exists()
new_data = read_file_content_json(CUSTOM_CONFIG_FILE)
v = args.value
@@ -474,7 +402,9 @@ class ConfigSetCmd(Cmd):
if args.name in RESTART_CONFIGS:
restart = True
- sync_to_peers(restart=restart)
+ sync_to_peers()
+ if restart:
+ print ("\nRestart glustereventsd in all nodes")
class ConfigResetCmd(Cmd):
@@ -484,7 +414,9 @@ class ConfigResetCmd(Cmd):
parser.add_argument("name", help="Config Name or all")
def run(self, args):
- with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE):
+ create_custom_config_file_if_not_exists()
+
+ with LockedOpen(CUSTOM_CONFIG_FILE, 'r+'):
changed_keys = []
data = {}
if os.path.exists(CUSTOM_CONFIG_FILE):
@@ -511,7 +443,9 @@ class ConfigResetCmd(Cmd):
restart = True
break
- sync_to_peers(restart=restart)
+ sync_to_peers()
+ if restart:
+ print ("\nRestart glustereventsd in all nodes")
class SyncCmd(Cmd):
diff --git a/events/src/utils.py b/events/src/utils.py
index 386e8f28449..db8ebfe29a9 100644
--- a/events/src/utils.py
+++ b/events/src/utils.py
@@ -12,6 +12,8 @@
import json
import os
import logging
+import fcntl
+from errno import ESRCH, EBADF
import requests
from eventsapiconf import (LOG_FILE,
@@ -168,3 +170,78 @@ def plugin_webhook(message):
url=url,
event=message_json,
status_code=resp.status_code))
+
+
+class LockedOpen(object):
+
+ def __init__(self, filename, *args, **kwargs):
+ self.filename = filename
+ self.open_args = args
+ self.open_kwargs = kwargs
+ self.fileobj = None
+
+ def __enter__(self):
+ """
+ If two processes compete to update a file, The first process
+ gets the lock and the second process is blocked in the fcntl.flock()
+ call. When first process replaces the file and releases the lock,
+ the already open file descriptor in the second process now points
+ to a "ghost" file(not reachable by any path name) with old contents.
+ To avoid that conflict, check the fd already opened is same or
+ not. Open new one if not same
+ """
+ f = open(self.filename, *self.open_args, **self.open_kwargs)
+ while True:
+ fcntl.flock(f, fcntl.LOCK_EX)
+ fnew = open(self.filename, *self.open_args, **self.open_kwargs)
+ if os.path.sameopenfile(f.fileno(), fnew.fileno()):
+ fnew.close()
+ break
+ else:
+ f.close()
+ f = fnew
+ self.fileobj = f
+ return f
+
+ def __exit__(self, _exc_type, _exc_value, _traceback):
+ self.fileobj.close()
+
+
+class PidFileLockFailed(Exception):
+ pass
+
+
+class PidFile(object):
+ def __init__(self, filename):
+ self.filename = filename
+ self.pid = os.getpid()
+ self.fh = None
+
+ def cleanup(self, remove_file=True):
+ try:
+ if self.fh is not None:
+ self.fh.close()
+ except IOError as exc:
+ if exc.errno != EBADF:
+ raise
+ finally:
+ if os.path.isfile(self.filename) and remove_file:
+ os.remove(self.filename)
+
+ def __enter__(self):
+ self.fh = open(self.filename, 'a+')
+ try:
+ fcntl.flock(self.fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError as exc:
+ self.cleanup(remove_file=False)
+ raise PidFileLockFailed(exc)
+
+ self.fh.seek(0)
+ self.fh.truncate()
+ self.fh.write("%d\n" % self.pid)
+ self.fh.flush()
+ self.fh.seek(0)
+ return self
+
+ def __exit__(self, _exc_type, _exc_value, _traceback):
+ self.cleanup()