diff options
Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 4 | ||||
-rw-r--r-- | geo-replication/syncdaemon/gsyncdstatus.py | 50 | ||||
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 5 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 7 | ||||
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 19 |
5 files changed, 77 insertions, 8 deletions
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index ee2a9b334d3..3718ba83141 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -39,6 +39,7 @@ from changelogagent import agent, Changelog from gsyncdstatus import set_monitor_status, GeorepStatus from libcxattr import Xattr import struct +from syncdutils import get_master_and_slave_data_from_args ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError @@ -698,8 +699,11 @@ def main_i(): status_get = rconf.get('status_get') if status_get: + master_name, slave_data = get_master_and_slave_data_from_args(args) for brick in gconf.path: brick_status = GeorepStatus(gconf.state_file, brick, + master_name, + slave_data, getattr(gconf, "pid_file", None)) checkpoint_time = int(getattr(gconf, "checkpoint", "0")) brick_status.print_status(checkpoint_time=checkpoint_time) diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py index d36ddce865e..f0836edbb26 100644 --- a/geo-replication/syncdaemon/gsyncdstatus.py +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -17,6 +17,8 @@ import json import time from datetime import datetime from errno import EACCES, EAGAIN, ENOENT +from syncdutils import EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event +from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED DEFAULT_STATUS = "N/A" MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped") @@ -115,7 +117,12 @@ def set_monitor_status(status_file, status): class GeorepStatus(object): - def __init__(self, monitor_status_file, brick, monitor_pid_file=None): + def __init__(self, monitor_status_file, brick, master, slave, + monitor_pid_file=None): + self.master = master + slv_data = slave.split("::") + self.slave_host = slv_data[0] + self.slave_volume = slv_data[1].split(":")[0] # Remove Slave UUID self.work_dir = os.path.dirname(monitor_status_file) self.monitor_status_file = monitor_status_file self.filename = os.path.join(self.work_dir, @@ -138,6 +145,10 @@ class GeorepStatus(object): data = self.default_values data = mergerfunc(data) + # If Data is not changed by merger func + if not data: + return False + with tempfile.NamedTemporaryFile( 'w', dir=os.path.dirname(self.filename), @@ -150,6 +161,7 @@ class GeorepStatus(object): os.O_DIRECTORY) os.fsync(dirfd) os.close(dirfd) + return True def reset_on_worker_start(self): def merger(data): @@ -164,10 +176,24 @@ class GeorepStatus(object): def set_field(self, key, value): def merger(data): + # Current data and prev data is same + if data[key] == value: + return {} + data[key] = value return json.dumps(data) - self._update(merger) + return self._update(merger) + + def trigger_gf_event_checkpoint_completion(self, checkpoint_time, + checkpoint_completion_time): + gf_event(EVENT_GEOREP_CHECKPOINT_COMPLETED, + master_volume=self.master, + slave_host=self.slave_host, + slave_volume=self.slave_volume, + brick_path=self.brick, + checkpoint_time=checkpoint_time, + checkpoint_completion_time=checkpoint_completion_time) def set_last_synced(self, value, checkpoint_time): def merger(data): @@ -185,9 +211,13 @@ class GeorepStatus(object): # previously then update the checkpoint completed time if checkpoint_time > 0 and checkpoint_time <= value[0]: if data["checkpoint_completed"] == "No": + curr_time = int(time.time()) data["checkpoint_time"] = checkpoint_time - data["checkpoint_completion_time"] = int(time.time()) + data["checkpoint_completion_time"] = curr_time data["checkpoint_completed"] = "Yes" + self.trigger_gf_event_checkpoint_completion( + checkpoint_time, curr_time) + return json.dumps(data) self._update(merger) @@ -222,10 +252,20 @@ class GeorepStatus(object): self._update(merger) def set_active(self): - self.set_field("worker_status", "Active") + if self.set_field("worker_status", "Active"): + gf_event(EVENT_GEOREP_ACTIVE, + master_volume=self.master, + slave_host=self.slave_host, + slave_volume=self.slave_volume, + brick_path=self.brick) def set_passive(self): - self.set_field("worker_status", "Passive") + if self.set_field("worker_status", "Passive"): + gf_event(EVENT_GEOREP_PASSIVE, + master_volume=self.master, + slave_host=self.slave_host, + slave_volume=self.slave_volume, + brick_path=self.brick) def get_monitor_status(self): data = "" diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 22cd1cc3a86..7eddd26d5ea 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -212,7 +212,10 @@ class Monitor(object): """ if not self.status.get(w[0]['dir'], None): self.status[w[0]['dir']] = GeorepStatus(gconf.state_file, - w[0]['dir']) + w[0]['dir'], + master, + "%s::%s" % (slave_host, + slave_vol)) set_monitor_status(gconf.state_file, self.ST_STARTED) self.status[w[0]['dir']].set_worker_status(self.ST_INIT) diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 9dd8988dc6d..0b756b750e7 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -40,7 +40,7 @@ from syncdutils import ChangelogException, ChangelogHistoryNotAvailable from syncdutils import get_changelog_log_level from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION from gsyncdstatus import GeorepStatus - +from syncdutils import get_master_and_slave_data_from_args UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I) @@ -1541,7 +1541,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): changelog_register_failed = False (inf, ouf, ra, wa) = gconf.rpc_fd.split(',') changelog_agent = RepceClient(int(inf), int(ouf)) - status = GeorepStatus(gconf.state_file, gconf.local_path) + master_name, slave_data = get_master_and_slave_data_from_args( + sys.argv) + status = GeorepStatus(gconf.state_file, gconf.local_path, + master_name, slave_data) status.reset_on_worker_start() rv = changelog_agent.version() if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 5dabbeaccef..5b926e0c271 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -28,10 +28,17 @@ sys.path.insert(1, GLUSTERFS_LIBEXECDIR) EVENTS_ENABLED = True try: from events.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY + from events.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE + from events.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE + from events.eventtypes import GEOREP_CHECKPOINT_COMPLETED \ + as EVENT_GEOREP_CHECKPOINT_COMPLETED except ImportError: # Events APIs not installed, dummy eventtypes with None EVENTS_ENABLED = False EVENT_GEOREP_FAULTY = None + EVENT_GEOREP_ACTIVE = None + EVENT_GEOREP_PASSIVE = None + EVENT_GEOREP_CHECKPOINT_COMPLETED = None try: from cPickle import PickleError @@ -542,3 +549,15 @@ class GlusterLogLevel(object): def get_changelog_log_level(lvl): return getattr(GlusterLogLevel, lvl, GlusterLogLevel.INFO) + + +def get_master_and_slave_data_from_args(args): + master_name = None + slave_data = None + for arg in args: + if arg.startswith(":"): + master_name = arg.replace(":", "") + if "::" in arg: + slave_data = arg.replace("ssh://", "") + + return (master_name, slave_data) |