diff options
Diffstat (limited to 'geo-replication/syncdaemon/gsyncdstatus.py')
-rw-r--r-- | geo-replication/syncdaemon/gsyncdstatus.py | 317 |
1 files changed, 317 insertions, 0 deletions
diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py new file mode 100644 index 00000000000..a49b9c23dea --- /dev/null +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import fcntl +import os +import tempfile +import urllib +import json +import time +from datetime import datetime + +DEFAULT_STATUS = "N/A" +MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped") +STATUS_VALUES = (DEFAULT_STATUS, + "Initializing...", + "Active", + "Passive", + "Faulty") + +CRAWL_STATUS_VALUES = (DEFAULT_STATUS, + "Hybrid Crawl", + "History Crawl", + "Changelog Crawl") + + +def human_time(ts): + try: + return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S") + except ValueError: + return DEFAULT_STATUS + + +def human_time_utc(ts): + try: + return datetime.utcfromtimestamp( + float(ts)).strftime("%Y-%m-%d %H:%M:%S") + except ValueError: + return DEFAULT_STATUS + + +def get_default_values(): + return { + "slave_node": DEFAULT_STATUS, + "worker_status": DEFAULT_STATUS, + "last_synced": 0, + "crawl_status": DEFAULT_STATUS, + "entry": 0, + "data": 0, + "meta": 0, + "failures": 0, + "checkpoint_completed": DEFAULT_STATUS, + "checkpoint_time": 0, + "checkpoint_completion_time": 0} + + +class LockedOpen(object): + + def __init__(self, filename, *args, **kwargs): + self.filename = filename + self.open_args = args + self.open_kwargs = kwargs + self.fileobj = None + + def __enter__(self): + """ + If two processes compete to update a file, The first process + gets the lock and the second process is blocked in the fcntl.flock() + call. When first process replaces the file and releases the lock, + the already open file descriptor in the second process now points + to a "ghost" file(not reachable by any path name) with old contents. + To avoid that conflict, check the fd already opened is same or + not. Open new one if not same + """ + f = open(self.filename, *self.open_args, **self.open_kwargs) + while True: + fcntl.flock(f, fcntl.LOCK_EX) + fnew = open(self.filename, *self.open_args, **self.open_kwargs) + if os.path.sameopenfile(f.fileno(), fnew.fileno()): + fnew.close() + break + else: + f.close() + f = fnew + self.fileobj = f + return f + + def __exit__(self, _exc_type, _exc_value, _traceback): + self.fileobj.close() + + +def set_monitor_status(status_file, status): + fd = os.open(status_file, os.O_CREAT | os.O_RDWR) + os.close(fd) + with LockedOpen(status_file, 'r+'): + with tempfile.NamedTemporaryFile('w', dir=os.path.dirname(status_file), + delete=False) as tf: + tf.write(status) + tempname = tf.name + + os.rename(tempname, status_file) + dirfd = os.open(os.path.dirname(os.path.abspath(status_file)), + os.O_DIRECTORY) + os.fsync(dirfd) + os.close(dirfd) + + +class GeorepStatus(object): + def __init__(self, monitor_status_file, brick): + self.work_dir = os.path.dirname(monitor_status_file) + self.monitor_status_file = monitor_status_file + self.filename = os.path.join(self.work_dir, + "brick_%s.status" + % urllib.quote_plus(brick)) + + fd = os.open(self.filename, os.O_CREAT | os.O_RDWR) + os.close(fd) + fd = os.open(self.monitor_status_file, os.O_CREAT | os.O_RDWR) + os.close(fd) + self.brick = brick + self.default_values = get_default_values() + + def _update(self, mergerfunc): + with LockedOpen(self.filename, 'r+') as f: + try: + data = json.load(f) + except ValueError: + data = self.default_values + + data = mergerfunc(data) + with tempfile.NamedTemporaryFile( + 'w', + dir=os.path.dirname(self.filename), + delete=False) as tf: + tf.write(data) + tempname = tf.name + + os.rename(tempname, self.filename) + dirfd = os.open(os.path.dirname(os.path.abspath(self.filename)), + os.O_DIRECTORY) + os.fsync(dirfd) + os.close(dirfd) + + def reset_on_worker_start(self): + def merger(data): + data["slave_node"] = DEFAULT_STATUS + data["crawl_status"] = DEFAULT_STATUS + data["entry"] = 0 + data["data"] = 0 + data["meta"] = 0 + return json.dumps(data) + + self._update(merger) + + def set_field(self, key, value): + def merger(data): + data[key] = value + return json.dumps(data) + + self._update(merger) + + def set_last_synced(self, value, checkpoint_time): + def merger(data): + data["last_synced"] = value[0] + + # If checkpoint is not set or reset + # or if last set checkpoint is changed + if checkpoint_time == 0 or \ + checkpoint_time != data["checkpoint_time"]: + data["checkpoint_time"] = 0 + data["checkpoint_completion_time"] = 0 + data["checkpoint_completed"] = "No" + + # If checkpoint is completed and not marked as completed + # previously then update the checkpoint completed time + if checkpoint_time > 0 and checkpoint_time <= value[0]: + if data["checkpoint_completed"] == "No": + data["checkpoint_time"] = checkpoint_time + data["checkpoint_completion_time"] = int(time.time()) + data["checkpoint_completed"] = "Yes" + return json.dumps(data) + + self._update(merger) + + def set_worker_status(self, status): + self.set_field("worker_status", status) + + def set_worker_crawl_status(self, status): + self.set_field("crawl_status", status) + + def set_slave_node(self, slave_node): + def merger(data): + data["slave_node"] = slave_node + return json.dumps(data) + + self._update(merger) + + def inc_value(self, key, value): + def merger(data): + data[key] = data.get(key, 0) + value + return json.dumps(data) + + self._update(merger) + + def dec_value(self, key, value): + def merger(data): + data[key] = data.get(key, 0) - value + if data[key] < 0: + data[key] = 0 + return json.dumps(data) + + self._update(merger) + + def set_active(self): + self.set_field("worker_status", "Active") + + def set_passive(self): + self.set_field("worker_status", "Passive") + + def get_monitor_status(self): + data = "" + with open(self.monitor_status_file, "r") as f: + data = f.read().strip() + return data + + def get_status(self, checkpoint_time=0): + """ + Monitor Status ---> Created Started Paused Stopped + ---------------------------------------------------------------------- + slave_node N/A VALUE VALUE N/A + status Created VALUE Paused Stopped + last_synced N/A VALUE VALUE VALUE + crawl_status N/A VALUE N/A N/A + entry N/A VALUE N/A N/A + data N/A VALUE N/A N/A + meta N/A VALUE N/A N/A + failures N/A VALUE VALUE VALUE + checkpoint_completed N/A VALUE VALUE VALUE + checkpoint_time N/A VALUE VALUE VALUE + checkpoint_completed_time N/A VALUE VALUE VALUE + """ + data = self.default_values + with open(self.filename) as f: + try: + data.update(json.load(f)) + except ValueError: + pass + monitor_status = self.get_monitor_status() + + if monitor_status in ["Created", "Paused", "Stopped"]: + data["worker_status"] = monitor_status + + # Checkpoint adjustments + if checkpoint_time == 0: + data["checkpoint_completed"] = DEFAULT_STATUS + data["checkpoint_time"] = DEFAULT_STATUS + data["checkpoint_completion_time"] = DEFAULT_STATUS + else: + if checkpoint_time != data["checkpoint_time"]: + if checkpoint_time <= data["last_synced"]: + data["checkpoint_completed"] = "Yes" + data["checkpoint_time"] = checkpoint_time + data["checkpoint_completion_time"] = data["last_synced"] + else: + data["checkpoint_completed"] = "No" + data["checkpoint_time"] = checkpoint_time + data["checkpoint_completion_time"] = DEFAULT_STATUS + + if data["checkpoint_time"] not in [0, DEFAULT_STATUS]: + chkpt_time = data["checkpoint_time"] + data["checkpoint_time"] = human_time(chkpt_time) + data["checkpoint_time_utc"] = human_time_utc(chkpt_time) + + if data["checkpoint_completion_time"] not in [0, DEFAULT_STATUS]: + chkpt_completion_time = data["checkpoint_completion_time"] + data["checkpoint_completion_time"] = human_time( + chkpt_completion_time) + data["checkpoint_completion_time_utc"] = human_time_utc( + chkpt_completion_time) + + if data["last_synced"] == 0: + data["last_synced"] = DEFAULT_STATUS + data["last_synced_utc"] = DEFAULT_STATUS + else: + last_synced = data["last_synced"] + data["last_synced"] = human_time(last_synced) + data["last_synced_utc"] = human_time_utc(last_synced) + + if data["worker_status"] != "Active": + data["last_synced"] = DEFAULT_STATUS + data["last_synced_utc"] = DEFAULT_STATUS + data["crawl_status"] = DEFAULT_STATUS + data["entry"] = DEFAULT_STATUS + data["data"] = DEFAULT_STATUS + data["meta"] = DEFAULT_STATUS + data["failures"] = DEFAULT_STATUS + data["checkpoint_completed"] = DEFAULT_STATUS + data["checkpoint_time"] = DEFAULT_STATUS + data["checkpoint_completed_time"] = DEFAULT_STATUS + data["checkpoint_time_utc"] = DEFAULT_STATUS + data["checkpoint_completion_time_utc"] = DEFAULT_STATUS + + if data["worker_status"] not in ["Active", "Passive"]: + data["slave_node"] = DEFAULT_STATUS + + return data + + def print_status(self, checkpoint_time=0): + for key, value in self.get_status(checkpoint_time).items(): + print ("%s: %s" % (key, value)) |