diff options
Diffstat (limited to 'geo-replication/syncdaemon/gsyncdstatus.py')
| -rw-r--r-- | geo-replication/syncdaemon/gsyncdstatus.py | 99 |
1 files changed, 85 insertions, 14 deletions
diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py index f4d50c19469..1a655ff8887 100644 --- a/geo-replication/syncdaemon/gsyncdstatus.py +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 # # Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> # This file is part of GlusterFS. @@ -9,14 +9,22 @@ # cases as published by the Free Software Foundation. # +from __future__ import print_function import fcntl import os import tempfile -import urllib +try: + import urllib.parse as urllib +except ImportError: + import urllib import json import time from datetime import datetime from errno import EACCES, EAGAIN, ENOENT +import logging + +from syncdutils import (EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event, + EVENT_GEOREP_CHECKPOINT_COMPLETED, lf) DEFAULT_STATUS = "N/A" MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped") @@ -52,6 +60,7 @@ def get_default_values(): "slave_node": DEFAULT_STATUS, "worker_status": DEFAULT_STATUS, "last_synced": 0, + "last_synced_entry": 0, "crawl_status": DEFAULT_STATUS, "entry": 0, "data": 0, @@ -94,6 +103,7 @@ class LockedOpen(object): return f def __exit__(self, _exc_type, _exc_value, _traceback): + fcntl.flock(self.fileobj, fcntl.LOCK_UN) self.fileobj.close() @@ -114,7 +124,12 @@ def set_monitor_status(status_file, status): class GeorepStatus(object): - def __init__(self, monitor_status_file, brick, monitor_pid_file=None): + def __init__(self, monitor_status_file, master_node, brick, master_node_id, + master, slave, monitor_pid_file=None): + self.master = master + slv_data = slave.split("::") + self.slave_host = slv_data[0] + self.slave_volume = slv_data[1].split(":")[0] # Remove Slave UUID self.work_dir = os.path.dirname(monitor_status_file) self.monitor_status_file = monitor_status_file self.filename = os.path.join(self.work_dir, @@ -125,18 +140,35 @@ class GeorepStatus(object): os.close(fd) fd = os.open(self.monitor_status_file, os.O_CREAT | os.O_RDWR) os.close(fd) + self.master_node = master_node + self.master_node_id = master_node_id self.brick = brick self.default_values = get_default_values() self.monitor_pid_file = monitor_pid_file + def send_event(self, event_type, **kwargs): + gf_event(event_type, + master_volume=self.master, + master_node=self.master_node, + master_node_id=self.master_node_id, + slave_host=self.slave_host, + slave_volume=self.slave_volume, + brick_path=self.brick, + **kwargs) + def _update(self, mergerfunc): + data = self.default_values with LockedOpen(self.filename, 'r+') as f: try: - data = json.load(f) + data.update(json.load(f)) except ValueError: - data = self.default_values + pass data = mergerfunc(data) + # If Data is not changed by merger func + if not data: + return False + with tempfile.NamedTemporaryFile( 'w', dir=os.path.dirname(self.filename), @@ -149,6 +181,7 @@ class GeorepStatus(object): os.O_DIRECTORY) os.fsync(dirfd) os.close(dirfd) + return True def reset_on_worker_start(self): def merger(data): @@ -163,10 +196,20 @@ class GeorepStatus(object): def set_field(self, key, value): def merger(data): + # Current data and prev data is same + if data[key] == value: + return {} + data[key] = value return json.dumps(data) - self._update(merger) + return self._update(merger) + + def trigger_gf_event_checkpoint_completion(self, checkpoint_time, + checkpoint_completion_time): + self.send_event(EVENT_GEOREP_CHECKPOINT_COMPLETED, + checkpoint_time=checkpoint_time, + checkpoint_completion_time=checkpoint_completion_time) def set_last_synced(self, value, checkpoint_time): def merger(data): @@ -184,18 +227,30 @@ class GeorepStatus(object): # previously then update the checkpoint completed time if checkpoint_time > 0 and checkpoint_time <= value[0]: if data["checkpoint_completed"] == "No": + curr_time = int(time.time()) data["checkpoint_time"] = checkpoint_time - data["checkpoint_completion_time"] = int(time.time()) + data["checkpoint_completion_time"] = curr_time data["checkpoint_completed"] = "Yes" + logging.info(lf("Checkpoint completed", + checkpoint_time=human_time_utc( + checkpoint_time), + completion_time=human_time_utc(curr_time))) + self.trigger_gf_event_checkpoint_completion( + checkpoint_time, curr_time) + return json.dumps(data) self._update(merger) def set_worker_status(self, status): - self.set_field("worker_status", status) + if self.set_field("worker_status", status): + logging.info(lf("Worker Status Change", + status=status)) def set_worker_crawl_status(self, status): - self.set_field("crawl_status", status) + if self.set_field("crawl_status", status): + logging.info(lf("Crawl Status Change", + status=status)) def set_slave_node(self, slave_node): def merger(data): @@ -221,10 +276,16 @@ class GeorepStatus(object): self._update(merger) def set_active(self): - self.set_field("worker_status", "Active") + if self.set_field("worker_status", "Active"): + logging.info(lf("Worker Status Change", + status="Active")) + self.send_event(EVENT_GEOREP_ACTIVE) def set_passive(self): - self.set_field("worker_status", "Passive") + if self.set_field("worker_status", "Passive"): + logging.info(lf("Worker Status Change", + status="Passive")) + self.send_event(EVENT_GEOREP_PASSIVE) def get_monitor_status(self): data = "" @@ -239,6 +300,7 @@ class GeorepStatus(object): slave_node N/A VALUE VALUE N/A status Created VALUE Paused Stopped last_synced N/A VALUE VALUE VALUE + last_synced_entry N/A VALUE VALUE VALUE crawl_status N/A VALUE N/A N/A entry N/A VALUE N/A N/A data N/A VALUE N/A N/A @@ -343,6 +405,15 @@ class GeorepStatus(object): return data - def print_status(self, checkpoint_time=0): - for key, value in self.get_status(checkpoint_time).items(): - print ("%s: %s" % (key, value)) + def print_status(self, checkpoint_time=0, json_output=False): + status_out = self.get_status(checkpoint_time) + if json_output: + out = {} + # Convert all values as string + for k, v in status_out.items(): + out[k] = str(v) + print(json.dumps(out)) + return + + for key, value in status_out.items(): + print(("%s: %s" % (key, value))) |
