summaryrefslogtreecommitdiffstats
path: root/geo-replication
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2015-03-12 16:07:13 +0530
committerVijay Bellur <vbellur@redhat.com>2015-05-05 02:15:24 -0700
commit98b69412e92742e0638ef8bd76223671386f5a39 (patch)
tree4d02c8989c50c7b219404900bc7beac327b19dca /geo-replication
parente02ac3c28241ff004d6cfbfc03975822146ce5dd (diff)
geo-rep: Status Enhancements
Discussion in gluster-devel http://www.gluster.org/pipermail/gluster-devel/2015-April/044301.html MASTER NODE - Master Volume Node MASTER VOL - Master Volume name MASTER BRICK - Master Volume Brick SLAVE USER - Slave User to which Geo-rep session is established SLAVE - <SLAVE_NODE>::<SLAVE_VOL> used in Geo-rep Create command SLAVE NODE - Slave Node to which Master worker is connected STATUS - Worker Status(Created, Initializing, Active, Passive, Faulty, Paused, Stopped) CRAWL STATUS - Crawl type(Hybrid Crawl, History Crawl, Changelog Crawl) LAST_SYNCED - Last Synced Time(Local Time in CLI output and UTC in XML output) ENTRY - Number of entry Operations pending.(Resets on worker restart) DATA - Number of Data operations pending(Resets on worker restart) META - Number of Meta operations pending(Resets on worker restart) FAILURES - Number of Failures CHECKPOINT TIME - Checkpoint set Time(Local Time in CLI output and UTC in XML output) CHECKPOINT COMPLETED - Yes/No or N/A CHECKPOINT COMPLETION TIME - Checkpoint Completed Time(Local Time in CLI output and UTC in XML output) XML output: <?xml version="1.0" encoding="UTF-8" standalone="yes"?> cliOutput> geoRep> volume> name> sessions> session> session_slave> pair> master_node> master_brick> slave_user> slave/> slave_node> status> crawl_status> entry> data> meta> failures> checkpoint_completed> master_node_uuid> last_synced> checkpoint_time> checkpoint_completion_time> BUG: 1212410 Change-Id: I944a6c3c67f1e6d6baf9670b474233bec8f61ea3 Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/10121 Tested-by: NetBSD Build System Reviewed-by: Kotresh HR <khiremat@redhat.com> Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'geo-replication')
-rw-r--r--geo-replication/syncdaemon/Makefile.am3
-rw-r--r--geo-replication/syncdaemon/gsyncd.py25
-rw-r--r--geo-replication/syncdaemon/gsyncdstatus.py317
-rw-r--r--geo-replication/syncdaemon/master.py374
-rw-r--r--geo-replication/syncdaemon/monitor.py58
-rw-r--r--geo-replication/syncdaemon/resource.py12
-rw-r--r--geo-replication/tests/unit/test_gsyncdstatus.py193
7 files changed, 608 insertions, 374 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am
index 885963eae2b..ed0f5e40924 100644
--- a/geo-replication/syncdaemon/Makefile.am
+++ b/geo-replication/syncdaemon/Makefile.am
@@ -2,6 +2,7 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon
syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \
resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \
- $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py
+ $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py \
+ gsyncdstatus.py
CLEANFILES =
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index b9ee5aec8c7..32e4eb7828d 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -27,12 +27,13 @@ from ipaddr import IPAddress, IPNetwork
from gconf import gconf
from syncdutils import FreeObject, norm, grabpidfile, finalize
-from syncdutils import log_raise_exception, privileged, update_file
+from syncdutils import log_raise_exception, privileged
from syncdutils import GsyncdError, select, set_term_handler
from configinterface import GConffile, upgrade_config_file
import resource
from monitor import monitor
from changelogagent import agent, Changelog
+from gsyncdstatus import set_monitor_status, GeorepStatus
class GLogger(Logger):
@@ -267,7 +268,7 @@ def main_i():
op.add_option('--socketdir', metavar='DIR')
op.add_option('--state-socket-unencoded', metavar='SOCKF',
type=str, action='callback', callback=store_abs)
- op.add_option('--checkpoint', metavar='LABEL', default='')
+ op.add_option('--checkpoint', metavar='LABEL', default='0')
# tunables for failover/failback mechanism:
# None - gsyncd behaves as normal
@@ -315,6 +316,8 @@ def main_i():
action='callback', callback=store_local)
op.add_option('--delete', dest='delete', action='callback',
callback=store_local_curry(True))
+ op.add_option('--status-get', dest='status_get', action='callback',
+ callback=store_local_curry(True))
op.add_option('--debug', dest="go_daemon", action='callback',
callback=lambda *a: (store_local_curry('dont')(*a),
setattr(
@@ -583,15 +586,8 @@ def main_i():
GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf')
if confdata.op == 'set':
logging.info('checkpoint %s set' % confdata.val)
- gcnf.delete('checkpoint_completed')
- gcnf.delete('checkpoint_target')
elif confdata.op == 'del':
logging.info('checkpoint info was reset')
- # if it is removing 'checkpoint' then we need
- # to remove 'checkpoint_completed' and 'checkpoint_target' too
- gcnf.delete('checkpoint_completed')
- gcnf.delete('checkpoint_target')
-
except IOError:
if sys.exc_info()[1].errno == ENOENT:
# directory of log path is not present,
@@ -607,7 +603,7 @@ def main_i():
create = rconf.get('create')
if create:
if getattr(gconf, 'state_file', None):
- update_file(gconf.state_file, lambda f: f.write(create + '\n'))
+ set_monitor_status(gconf.state_file, create)
return
go_daemon = rconf['go_daemon']
@@ -615,6 +611,15 @@ def main_i():
be_agent = rconf.get('agent')
rscs, local, remote = makersc(args)
+
+ status_get = rconf.get('status_get')
+ if status_get:
+ for brick in gconf.path:
+ brick_status = GeorepStatus(gconf.state_file, brick)
+ checkpoint_time = int(getattr(gconf, "checkpoint", "0"))
+ brick_status.print_status(checkpoint_time=checkpoint_time)
+ return
+
if not be_monitor and isinstance(remote, resource.SSH) and \
go_daemon == 'should':
go_daemon = 'postconn'
diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py
new file mode 100644
index 00000000000..a49b9c23dea
--- /dev/null
+++ b/geo-replication/syncdaemon/gsyncdstatus.py
@@ -0,0 +1,317 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import fcntl
+import os
+import tempfile
+import urllib
+import json
+import time
+from datetime import datetime
+
+DEFAULT_STATUS = "N/A"
+MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped")
+STATUS_VALUES = (DEFAULT_STATUS,
+ "Initializing...",
+ "Active",
+ "Passive",
+ "Faulty")
+
+CRAWL_STATUS_VALUES = (DEFAULT_STATUS,
+ "Hybrid Crawl",
+ "History Crawl",
+ "Changelog Crawl")
+
+
+def human_time(ts):
+ try:
+ return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S")
+ except ValueError:
+ return DEFAULT_STATUS
+
+
+def human_time_utc(ts):
+ try:
+ return datetime.utcfromtimestamp(
+ float(ts)).strftime("%Y-%m-%d %H:%M:%S")
+ except ValueError:
+ return DEFAULT_STATUS
+
+
+def get_default_values():
+ return {
+ "slave_node": DEFAULT_STATUS,
+ "worker_status": DEFAULT_STATUS,
+ "last_synced": 0,
+ "crawl_status": DEFAULT_STATUS,
+ "entry": 0,
+ "data": 0,
+ "meta": 0,
+ "failures": 0,
+ "checkpoint_completed": DEFAULT_STATUS,
+ "checkpoint_time": 0,
+ "checkpoint_completion_time": 0}
+
+
+class LockedOpen(object):
+
+ def __init__(self, filename, *args, **kwargs):
+ self.filename = filename
+ self.open_args = args
+ self.open_kwargs = kwargs
+ self.fileobj = None
+
+ def __enter__(self):
+ """
+ If two processes compete to update a file, The first process
+ gets the lock and the second process is blocked in the fcntl.flock()
+ call. When first process replaces the file and releases the lock,
+ the already open file descriptor in the second process now points
+ to a "ghost" file(not reachable by any path name) with old contents.
+ To avoid that conflict, check the fd already opened is same or
+ not. Open new one if not same
+ """
+ f = open(self.filename, *self.open_args, **self.open_kwargs)
+ while True:
+ fcntl.flock(f, fcntl.LOCK_EX)
+ fnew = open(self.filename, *self.open_args, **self.open_kwargs)
+ if os.path.sameopenfile(f.fileno(), fnew.fileno()):
+ fnew.close()
+ break
+ else:
+ f.close()
+ f = fnew
+ self.fileobj = f
+ return f
+
+ def __exit__(self, _exc_type, _exc_value, _traceback):
+ self.fileobj.close()
+
+
+def set_monitor_status(status_file, status):
+ fd = os.open(status_file, os.O_CREAT | os.O_RDWR)
+ os.close(fd)
+ with LockedOpen(status_file, 'r+'):
+ with tempfile.NamedTemporaryFile('w', dir=os.path.dirname(status_file),
+ delete=False) as tf:
+ tf.write(status)
+ tempname = tf.name
+
+ os.rename(tempname, status_file)
+ dirfd = os.open(os.path.dirname(os.path.abspath(status_file)),
+ os.O_DIRECTORY)
+ os.fsync(dirfd)
+ os.close(dirfd)
+
+
+class GeorepStatus(object):
+ def __init__(self, monitor_status_file, brick):
+ self.work_dir = os.path.dirname(monitor_status_file)
+ self.monitor_status_file = monitor_status_file
+ self.filename = os.path.join(self.work_dir,
+ "brick_%s.status"
+ % urllib.quote_plus(brick))
+
+ fd = os.open(self.filename, os.O_CREAT | os.O_RDWR)
+ os.close(fd)
+ fd = os.open(self.monitor_status_file, os.O_CREAT | os.O_RDWR)
+ os.close(fd)
+ self.brick = brick
+ self.default_values = get_default_values()
+
+ def _update(self, mergerfunc):
+ with LockedOpen(self.filename, 'r+') as f:
+ try:
+ data = json.load(f)
+ except ValueError:
+ data = self.default_values
+
+ data = mergerfunc(data)
+ with tempfile.NamedTemporaryFile(
+ 'w',
+ dir=os.path.dirname(self.filename),
+ delete=False) as tf:
+ tf.write(data)
+ tempname = tf.name
+
+ os.rename(tempname, self.filename)
+ dirfd = os.open(os.path.dirname(os.path.abspath(self.filename)),
+ os.O_DIRECTORY)
+ os.fsync(dirfd)
+ os.close(dirfd)
+
+ def reset_on_worker_start(self):
+ def merger(data):
+ data["slave_node"] = DEFAULT_STATUS
+ data["crawl_status"] = DEFAULT_STATUS
+ data["entry"] = 0
+ data["data"] = 0
+ data["meta"] = 0
+ return json.dumps(data)
+
+ self._update(merger)
+
+ def set_field(self, key, value):
+ def merger(data):
+ data[key] = value
+ return json.dumps(data)
+
+ self._update(merger)
+
+ def set_last_synced(self, value, checkpoint_time):
+ def merger(data):
+ data["last_synced"] = value[0]
+
+ # If checkpoint is not set or reset
+ # or if last set checkpoint is changed
+ if checkpoint_time == 0 or \
+ checkpoint_time != data["checkpoint_time"]:
+ data["checkpoint_time"] = 0
+ data["checkpoint_completion_time"] = 0
+ data["checkpoint_completed"] = "No"
+
+ # If checkpoint is completed and not marked as completed
+ # previously then update the checkpoint completed time
+ if checkpoint_time > 0 and checkpoint_time <= value[0]:
+ if data["checkpoint_completed"] == "No":
+ data["checkpoint_time"] = checkpoint_time
+ data["checkpoint_completion_time"] = int(time.time())
+ data["checkpoint_completed"] = "Yes"
+ return json.dumps(data)
+
+ self._update(merger)
+
+ def set_worker_status(self, status):
+ self.set_field("worker_status", status)
+
+ def set_worker_crawl_status(self, status):
+ self.set_field("crawl_status", status)
+
+ def set_slave_node(self, slave_node):
+ def merger(data):
+ data["slave_node"] = slave_node
+ return json.dumps(data)
+
+ self._update(merger)
+
+ def inc_value(self, key, value):
+ def merger(data):
+ data[key] = data.get(key, 0) + value
+ return json.dumps(data)
+
+ self._update(merger)
+
+ def dec_value(self, key, value):
+ def merger(data):
+ data[key] = data.get(key, 0) - value
+ if data[key] < 0:
+ data[key] = 0
+ return json.dumps(data)
+
+ self._update(merger)
+
+ def set_active(self):
+ self.set_field("worker_status", "Active")
+
+ def set_passive(self):
+ self.set_field("worker_status", "Passive")
+
+ def get_monitor_status(self):
+ data = ""
+ with open(self.monitor_status_file, "r") as f:
+ data = f.read().strip()
+ return data
+
+ def get_status(self, checkpoint_time=0):
+ """
+ Monitor Status ---> Created Started Paused Stopped
+ ----------------------------------------------------------------------
+ slave_node N/A VALUE VALUE N/A
+ status Created VALUE Paused Stopped
+ last_synced N/A VALUE VALUE VALUE
+ crawl_status N/A VALUE N/A N/A
+ entry N/A VALUE N/A N/A
+ data N/A VALUE N/A N/A
+ meta N/A VALUE N/A N/A
+ failures N/A VALUE VALUE VALUE
+ checkpoint_completed N/A VALUE VALUE VALUE
+ checkpoint_time N/A VALUE VALUE VALUE
+ checkpoint_completed_time N/A VALUE VALUE VALUE
+ """
+ data = self.default_values
+ with open(self.filename) as f:
+ try:
+ data.update(json.load(f))
+ except ValueError:
+ pass
+ monitor_status = self.get_monitor_status()
+
+ if monitor_status in ["Created", "Paused", "Stopped"]:
+ data["worker_status"] = monitor_status
+
+ # Checkpoint adjustments
+ if checkpoint_time == 0:
+ data["checkpoint_completed"] = DEFAULT_STATUS
+ data["checkpoint_time"] = DEFAULT_STATUS
+ data["checkpoint_completion_time"] = DEFAULT_STATUS
+ else:
+ if checkpoint_time != data["checkpoint_time"]:
+ if checkpoint_time <= data["last_synced"]:
+ data["checkpoint_completed"] = "Yes"
+ data["checkpoint_time"] = checkpoint_time
+ data["checkpoint_completion_time"] = data["last_synced"]
+ else:
+ data["checkpoint_completed"] = "No"
+ data["checkpoint_time"] = checkpoint_time
+ data["checkpoint_completion_time"] = DEFAULT_STATUS
+
+ if data["checkpoint_time"] not in [0, DEFAULT_STATUS]:
+ chkpt_time = data["checkpoint_time"]
+ data["checkpoint_time"] = human_time(chkpt_time)
+ data["checkpoint_time_utc"] = human_time_utc(chkpt_time)
+
+ if data["checkpoint_completion_time"] not in [0, DEFAULT_STATUS]:
+ chkpt_completion_time = data["checkpoint_completion_time"]
+ data["checkpoint_completion_time"] = human_time(
+ chkpt_completion_time)
+ data["checkpoint_completion_time_utc"] = human_time_utc(
+ chkpt_completion_time)
+
+ if data["last_synced"] == 0:
+ data["last_synced"] = DEFAULT_STATUS
+ data["last_synced_utc"] = DEFAULT_STATUS
+ else:
+ last_synced = data["last_synced"]
+ data["last_synced"] = human_time(last_synced)
+ data["last_synced_utc"] = human_time_utc(last_synced)
+
+ if data["worker_status"] != "Active":
+ data["last_synced"] = DEFAULT_STATUS
+ data["last_synced_utc"] = DEFAULT_STATUS
+ data["crawl_status"] = DEFAULT_STATUS
+ data["entry"] = DEFAULT_STATUS
+ data["data"] = DEFAULT_STATUS
+ data["meta"] = DEFAULT_STATUS
+ data["failures"] = DEFAULT_STATUS
+ data["checkpoint_completed"] = DEFAULT_STATUS
+ data["checkpoint_time"] = DEFAULT_STATUS
+ data["checkpoint_completed_time"] = DEFAULT_STATUS
+ data["checkpoint_time_utc"] = DEFAULT_STATUS
+ data["checkpoint_completion_time_utc"] = DEFAULT_STATUS
+
+ if data["worker_status"] not in ["Active", "Passive"]:
+ data["slave_node"] = DEFAULT_STATUS
+
+ return data
+
+ def print_status(self, checkpoint_time=0):
+ for key, value in self.get_status(checkpoint_time).items():
+ print ("%s: %s" % (key, value))
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 38535884ec6..8e4c43046b0 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -15,17 +15,15 @@ import stat
import json
import logging
import fcntl
-import socket
import string
import errno
import tarfile
-from errno import ENOENT, ENODATA, EPIPE, EEXIST, EACCES, EAGAIN
+from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN
from threading import Condition, Lock
from datetime import datetime
from gconf import gconf
-from tempfile import NamedTemporaryFile
from syncdutils import Thread, GsyncdError, boolify, escape
-from syncdutils import unescape, select, gauxpfx, md5hex, selfkill
+from syncdutils import unescape, gauxpfx, md5hex, selfkill
from syncdutils import lstat, errno_wrap
from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
@@ -397,18 +395,6 @@ class GMasterCommon(object):
raise
return default_data
- def update_crawl_data(self):
- if getattr(gconf, 'state_detail_file', None):
- try:
- same_dir = os.path.dirname(gconf.state_detail_file)
- with NamedTemporaryFile(dir=same_dir, delete=False) as tmp:
- json.dump(self.total_crawl_stats, tmp)
- tmp.flush()
- os.fsync(tmp.fileno())
- os.rename(tmp.name, gconf.state_detail_file)
- except (IOError, OSError):
- raise
-
def __init__(self, master, slave):
self.master = master
self.slave = slave
@@ -434,14 +420,12 @@ class GMasterCommon(object):
self.total_turns = int(gconf.turns)
self.crawl_start = datetime.now()
self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
- self.total_crawl_stats = None
self.start = None
self.change_seen = None
# the actual volinfo we make use of
self.volinfo = None
self.terminate = False
self.sleep_interval = 1
- self.checkpoint_thread = None
self.current_files_skipped_count = 0
self.skipped_gfid_list = []
self.unlinked_gfids = []
@@ -493,7 +477,6 @@ class GMasterCommon(object):
logging.debug("Got the lock")
return True
-
def should_crawl(self):
if not gconf.use_meta_volume:
return gconf.glusterd_uuid in self.master.server.node_uuid()
@@ -503,7 +486,6 @@ class GMasterCommon(object):
sys.exit(1)
return self.mgmt_lock()
-
def register(self):
self.register()
@@ -542,10 +524,8 @@ class GMasterCommon(object):
if self.volinfo['retval']:
logging.warn("master cluster's info may not be valid %d" %
self.volinfo['retval'])
- self.start_checkpoint_thread()
else:
raise GsyncdError("master volinfo unavailable")
- self.total_crawl_stats = self.get_initial_crawl_data()
self.lastreport['time'] = time.time()
logging.info('crawl interval: %d seconds' % self.sleep_interval)
@@ -570,7 +550,7 @@ class GMasterCommon(object):
t0 = t1
self.update_worker_remote_node()
if not crawl:
- self.update_worker_health("Passive")
+ self.status.set_passive()
# bring up _this_ brick to the cluster stime
# which is min of cluster (but max of the replicas)
brick_stime = self.xtime('.', self.slave)
@@ -597,35 +577,14 @@ class GMasterCommon(object):
time.sleep(5)
continue
- self.update_worker_health("Active")
+
+ self.status.set_active()
self.crawl()
+
if oneshot:
return
time.sleep(self.sleep_interval)
- @classmethod
- def _checkpt_param(cls, chkpt, prm, xtimish=True):
- """use config backend to lookup a parameter belonging to
- checkpoint @chkpt"""
- cprm = gconf.configinterface.get_realtime('checkpoint_' + prm)
- if not cprm:
- return
- chkpt_mapped, val = cprm.split(':', 1)
- if unescape(chkpt_mapped) != chkpt:
- return
- if xtimish:
- val = cls.deserialize_xtime(val)
- return val
-
- @classmethod
- def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True):
- """use config backend to store a parameter associated
- with checkpoint @chkpt"""
- if xtimish:
- val = cls.serialize_xtime(val)
- gconf.configinterface.set(
- 'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
-
@staticmethod
def humantime(*tpair):
"""format xtime-like (sec, nsec) pair to human readable format"""
@@ -654,116 +613,6 @@ class GMasterCommon(object):
string.zfill(m, 2), string.zfill(s, 2))
return date
- def checkpt_service(self, chan, chkpt):
- """checkpoint service loop
-
- monitor and verify checkpoint status for @chkpt, and listen
- for incoming requests for whom we serve a pretty-formatted
- status report"""
- while True:
- chkpt = gconf.configinterface.get_realtime("checkpoint")
- if not chkpt:
- gconf.configinterface.delete("checkpoint_completed")
- gconf.configinterface.delete("checkpoint_target")
- # dummy loop for the case when there is no checkpt set
- select([chan], [], [])
- conn, _ = chan.accept()
- conn.send('\0')
- conn.close()
- continue
-
- checkpt_tgt = self._checkpt_param(chkpt, 'target')
- if not checkpt_tgt:
- checkpt_tgt = self.xtime('.')
- if isinstance(checkpt_tgt, int):
- raise GsyncdError("master root directory is "
- "unaccessible (%s)",
- os.strerror(checkpt_tgt))
- self._set_checkpt_param(chkpt, 'target', checkpt_tgt)
- logging.debug("checkpoint target %s has been determined "
- "for checkpoint %s" %
- (repr(checkpt_tgt), chkpt))
-
- # check if the label is 'now'
- chkpt_lbl = chkpt
- try:
- x1, x2 = chkpt.split(':')
- if x1 == 'now':
- chkpt_lbl = "as of " + self.humantime(x2)
- except:
- pass
- completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
- if completed:
- completed = tuple(int(x) for x in completed.split('.'))
- s, _, _ = select([chan], [], [], (not completed) and 5 or None)
- # either request made and we re-check to not
- # give back stale data, or we still hunting for completion
- if (self.native_xtime(checkpt_tgt) and (
- self.native_xtime(checkpt_tgt) < self.volmark)):
- # indexing has been reset since setting the checkpoint
- status = "is invalid"
- else:
- xtr = self.xtime('.', self.slave)
- if isinstance(xtr, int):
- raise GsyncdError("slave root directory is "
- "unaccessible (%s)",
- os.strerror(xtr))
- ncompleted = self.xtime_geq(xtr, checkpt_tgt)
- if completed and not ncompleted: # stale data
- logging.warn("completion time %s for checkpoint %s "
- "became stale" %
- (self.humantime(*completed), chkpt))
- completed = None
- gconf.configinterface.delete('checkpoint_completed')
- if ncompleted and not completed: # just reaching completion
- completed = "%.6f" % time.time()
- self._set_checkpt_param(
- chkpt, 'completed', completed, xtimish=False)
- completed = tuple(int(x) for x in completed.split('.'))
- logging.info("checkpoint %s completed" % chkpt)
- status = completed and \
- "completed at " + self.humantime(completed[0]) or \
- "not reached yet"
- if s:
- conn = None
- try:
- conn, _ = chan.accept()
- try:
- conn.send("checkpoint %s is %s\0" %
- (chkpt_lbl, status))
- except:
- exc = sys.exc_info()[1]
- if ((isinstance(exc, OSError) or isinstance(
- exc, IOError)) and exc.errno == EPIPE):
- logging.debug('checkpoint client disconnected')
- else:
- raise
- finally:
- if conn:
- conn.close()
-
- def start_checkpoint_thread(self):
- """prepare and start checkpoint service"""
- if self.checkpoint_thread or not (
- getattr(gconf, 'state_socket_unencoded', None) and getattr(
- gconf, 'socketdir', None)
- ):
- return
- chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- state_socket = os.path.join(
- gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")
- try:
- os.unlink(state_socket)
- except:
- if sys.exc_info()[0] == OSError:
- pass
- chan.bind(state_socket)
- chan.listen(1)
- chkpt = gconf.configinterface.get_realtime("checkpoint")
- t = Thread(target=self.checkpt_service, args=(chan, chkpt))
- t.start()
- self.checkpoint_thread = t
-
def add_job(self, path, label, job, *a, **kw):
"""insert @job function to job table at @path with @label"""
if self.jobtab.get(path) is None:
@@ -937,11 +786,15 @@ class GMasterChangelogMixin(GMasterCommon):
files_pending['purge'] += 1
def log_failures(failures, entry_key, gfid_prefix, log_prefix):
+ num_failures = 0
for failure in failures:
st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
if not isinstance(st, int):
+ num_failures += 1
logging.warn('%s FAILED: %s' % (log_prefix, repr(failure)))
+ self.status.inc_value("failures", num_failures)
+
for e in clist:
e = e.strip()
et = e[self.IDX_START:self.IDX_END] # entry type
@@ -1040,12 +893,18 @@ class GMasterChangelogMixin(GMasterCommon):
else:
logging.warn('got invalid changelog type: %s' % (et))
logging.debug('entries: %s' % repr(entries))
- if not retry:
- self.update_worker_cumilitive_status(files_pending)
+
+ # Increment counters for Status
+ self.status.inc_value("entry", len(entries))
+ self.files_in_batch = len(datas)
+ self.status.inc_value("data", self.files_in_batch)
+
# sync namespace
if entries:
failures = self.slave.server.entry_ops(entries)
log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
+ self.status.dec_value("entry", len(entries))
+
# sync metadata
if meta_gfid:
meta_entries = []
@@ -1059,8 +918,11 @@ class GMasterChangelogMixin(GMasterCommon):
continue
meta_entries.append(edct('META', go=go[0], stat=st))
if meta_entries:
+ self.status.inc_value("meta", len(entries))
failures = self.slave.server.meta_ops(meta_entries)
log_failures(failures, 'go', '', 'META')
+ self.status.dec_value("meta", len(entries))
+
# sync data
if datas:
self.a_syncdata(datas)
@@ -1112,9 +974,17 @@ class GMasterChangelogMixin(GMasterCommon):
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
+ chkpt_time = gconf.configinterface.get_realtime(
+ "checkpoint")
+ checkpoint_time = 0
+ if chkpt_time is not None:
+ checkpoint_time = int(chkpt_time)
+
+ self.status.set_last_synced(xtl, checkpoint_time)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
- self.update_worker_files_syncd()
+ self.status.dec_value("data", self.files_in_batch)
+ self.files_in_batch = 0
break
# We do not know which changelog transfer failed, retry everything.
@@ -1124,14 +994,22 @@ class GMasterChangelogMixin(GMasterCommon):
logging.warn('changelogs %s could not be processed - '
'moving on...' %
' '.join(map(os.path.basename, changes)))
- self.update_worker_total_files_skipped(
- self.current_files_skipped_count)
+ self.status.inc_value("failures",
+ self.current_files_skipped_count)
logging.warn('SKIPPED GFID = %s' %
','.join(self.skipped_gfid_list))
- self.update_worker_files_syncd()
+
+ self.files_in_batch = 0
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
+ chkpt_time = gconf.configinterface.get_realtime(
+ "checkpoint")
+ checkpoint_time = 0
+ if chkpt_time is not None:
+ checkpoint_time = int(chkpt_time)
+
+ self.status.set_last_synced(xtl, checkpoint_time)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
break
@@ -1152,161 +1030,12 @@ class GMasterChangelogMixin(GMasterCommon):
if not stime == URXTIME:
self.sendmark(path, stime)
- def get_worker_status_file(self):
- file_name = gconf.local_path + '.status'
- file_name = file_name.replace("/", "_")
- worker_status_file = gconf.georep_session_working_dir + file_name
- return worker_status_file
-
- def update_worker_status(self, key, value):
- default_data = {"remote_node": "N/A",
- "worker status": "Not Started",
- "crawl status": "N/A",
- "files_syncd": 0,
- "files_remaining": 0,
- "bytes_remaining": 0,
- "purges_remaining": 0,
- "total_files_skipped": 0}
- worker_status_file = self.get_worker_status_file()
- try:
- with open(worker_status_file, 'r+') as f:
- loaded_data = json.load(f)
- loaded_data[key] = value
- os.ftruncate(f.fileno(), 0)
- os.lseek(f.fileno(), 0, os.SEEK_SET)
- json.dump(loaded_data, f)
- f.flush()
- os.fsync(f.fileno())
- except (IOError, OSError, ValueError):
- logging.info('Creating new %s' % worker_status_file)
- try:
- with open(worker_status_file, 'wb') as f:
- default_data[key] = value
- json.dump(default_data, f)
- f.flush()
- os.fsync(f.fileno())
- except:
- raise
-
- def update_worker_cumilitive_status(self, files_pending):
- default_data = {"remote_node": "N/A",
- "worker status": "Not Started",
- "crawl status": "N/A",
- "files_syncd": 0,
- "files_remaining": 0,
- "bytes_remaining": 0,
- "purges_remaining": 0,
- "total_files_skipped": 0}
- worker_status_file = self.get_worker_status_file()
- try:
- with open(worker_status_file, 'r+') as f:
- loaded_data = json.load(f)
- loaded_data['files_remaining'] = files_pending['count']
- loaded_data['bytes_remaining'] = files_pending['bytes']
- loaded_data['purges_remaining'] = files_pending['purge']
- os.ftruncate(f.fileno(), 0)
- os.lseek(f.fileno(), 0, os.SEEK_SET)
- json.dump(loaded_data, f)
- f.flush()
- os.fsync(f.fileno())
- except (IOError, OSError, ValueError):
- logging.info('Creating new %s' % worker_status_file)
- try:
- with open(worker_status_file, 'wb') as f:
- default_data['files_remaining'] = files_pending['count']
- default_data['bytes_remaining'] = files_pending['bytes']
- default_data['purges_remaining'] = files_pending['purge']
- json.dump(default_data, f)
- f.flush()
- os.fsync(f.fileno())
- except:
- raise
-
def update_worker_remote_node(self):
node = sys.argv[-1]
- node = node.split("@")[-1]
+ node_data = node.split("@")
+ node = node_data[-1]
remote_node_ip = node.split(":")[0]
- remote_node_vol = node.split(":")[3]
- remote_node = remote_node_ip + '::' + remote_node_vol
- self.update_worker_status('remote_node', remote_node)
-
- def update_worker_health(self, state):
- self.update_worker_status('worker status', state)
-
- def update_worker_crawl_status(self, state):
- self.update_worker_status('crawl status', state)
-
- def update_worker_files_syncd(self):
- default_data = {"remote_node": "N/A",
- "worker status": "Not Started",
- "crawl status": "N/A",
- "files_syncd": 0,
- "files_remaining": 0,
- "bytes_remaining": 0,
- "purges_remaining": 0,
- "total_files_skipped": 0}
- worker_status_file = self.get_worker_status_file()
- try:
- with open(worker_status_file, 'r+') as f:
- loaded_data = json.load(f)
- loaded_data['files_syncd'] += loaded_data['files_remaining']
- loaded_data['files_remaining'] = 0
- loaded_data['bytes_remaining'] = 0
- loaded_data['purges_remaining'] = 0
- os.ftruncate(f.fileno(), 0)
- os.lseek(f.fileno(), 0, os.SEEK_SET)
- json.dump(loaded_data, f)
- f.flush()
- os.fsync(f.fileno())
- except (IOError, OSError, ValueError):
- logging.info('Creating new %s' % worker_status_file)
- try:
- with open(worker_status_file, 'wb') as f:
- json.dump(default_data, f)
- f.flush()
- os.fsync(f.fileno())
- except:
- raise
-
- def update_worker_files_remaining(self, state):
- self.update_worker_status('files_remaining', state)
-
- def update_worker_bytes_remaining(self, state):
- self.update_worker_status('bytes_remaining', state)
-
- def update_worker_purges_remaining(self, state):
- self.update_worker_status('purges_remaining', state)
-
- def update_worker_total_files_skipped(self, value):
- default_data = {"remote_node": "N/A",
- "worker status": "Not Started",
- "crawl status": "N/A",
- "files_syncd": 0,
- "files_remaining": 0,
- "bytes_remaining": 0,
- "purges_remaining": 0,
- "total_files_skipped": 0}
- worker_status_file = self.get_worker_status_file()
- try:
- with open(worker_status_file, 'r+') as f:
- loaded_data = json.load(f)
- loaded_data['total_files_skipped'] = value
- loaded_data['files_remaining'] -= value
- os.ftruncate(f.fileno(), 0)
- os.lseek(f.fileno(), 0, os.SEEK_SET)
- json.dump(loaded_data, f)
- f.flush()
- os.fsync(f.fileno())
- except (IOError, OSError, ValueError):
- logging.info('Creating new %s' % worker_status_file)
- try:
- with open(worker_status_file, 'wb') as f:
- default_data['total_files_skipped'] = value
- json.dump(default_data, f)
- f.flush()
- os.fsync(f.fileno())
- except:
- raise
+ self.status.set_slave_node(remote_node_ip)
def changelogs_batch_process(self, changes):
changelogs_batches = []
@@ -1331,7 +1060,7 @@ class GMasterChangelogMixin(GMasterCommon):
self.process(batch)
def crawl(self):
- self.update_worker_crawl_status("Changelog Crawl")
+ self.status.set_worker_crawl_status("Changelog Crawl")
changes = []
# get stime (from the brick) and purge changelogs
# that are _historical_ to that time.
@@ -1355,16 +1084,17 @@ class GMasterChangelogMixin(GMasterCommon):
self.changelogs_batch_process(changes)
- def register(self, register_time, changelog_agent):
+ def register(self, register_time, changelog_agent, status):
self.changelog_agent = changelog_agent
self.sleep_interval = int(gconf.change_interval)
self.changelog_done_func = self.changelog_agent.done
self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
".processed")
+ self.status = status
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
- def register(self, register_time, changelog_agent):
+ def register(self, register_time, changelog_agent, status):
self.changelog_agent = changelog_agent
self.changelog_register_time = register_time
self.history_crawl_start_time = register_time
@@ -1372,10 +1102,11 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
self.history_turns = 0
self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
".history/.processed")
+ self.status = status
def crawl(self):
self.history_turns += 1
- self.update_worker_crawl_status("History Crawl")
+ self.status.set_worker_crawl_status("History Crawl")
purge_time = self.get_purge_time()
logging.info('starting history crawl... turns: %s, stime: %s'
@@ -1455,7 +1186,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
XSYNC_MAX_ENTRIES = 1 << 13
- def register(self, register_time=None, changelog_agent=None):
+ def register(self, register_time=None, changelog_agent=None, status=None):
+ self.status = status
self.counter = 0
self.comlist = []
self.stimes = []
@@ -1486,7 +1218,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
t.start()
logging.info('starting hybrid crawl..., stime: %s'
% repr(self.get_purge_time()))
- self.update_worker_crawl_status("Hybrid Crawl")
+ self.status.set_worker_crawl_status("Hybrid Crawl")
while True:
try:
item = self.comlist.pop(0)
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 029726c7a5a..ba5c8e32514 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -22,10 +22,12 @@ from errno import EEXIST
import re
import random
from gconf import gconf
-from syncdutils import update_file, select, waitpid
+from syncdutils import select, waitpid
from syncdutils import set_term_handler, is_host_local, GsyncdError
from syncdutils import escape, Thread, finalize, memoize
+from gsyncdstatus import GeorepStatus, set_monitor_status
+
ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
@@ -125,46 +127,22 @@ class Volinfo(object):
def disperse_count(self):
return int(self.get('disperseCount')[0].text)
+
class Monitor(object):
"""class which spawns and manages gsyncd workers"""
ST_INIT = 'Initializing...'
- ST_STABLE = 'Stable'
- ST_FAULTY = 'faulty'
+ ST_STARTED = 'Started'
+ ST_STABLE = 'Active'
+ ST_FAULTY = 'Faulty'
ST_INCON = 'inconsistent'
_ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]
def __init__(self):
self.lock = Lock()
self.state = {}
-
- def set_state(self, state, w=None):
- """set the state that can be used by external agents
- like glusterd for status reporting"""
- computestate = lambda: self.state and self._ST_ORD[
- max(self._ST_ORD.index(s) for s in self.state.values())]
- if w:
- self.lock.acquire()
- old_state = computestate()
- self.state[w] = state
- state = computestate()
- self.lock.release()
- if state != old_state:
- self.set_state(state)
- else:
- if getattr(gconf, 'state_file', None):
- # If previous state is paused, suffix the
- # new state with '(Paused)'
- try:
- with open(gconf.state_file, "r") as f:
- content = f.read()
- if "paused" in content.lower():
- state = state + '(Paused)'
- except IOError:
- pass
- logging.info('new state: %s' % state)
- update_file(gconf.state_file, lambda f: f.write(state + '\n'))
+ self.status = {}
@staticmethod
def terminate():
@@ -174,8 +152,7 @@ class Monitor(object):
# give a chance to graceful exit
os.kill(-os.getpid(), signal.SIGTERM)
-
- def monitor(self, w, argv, cpids, agents, slave_vol, slave_host):
+ def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master):
"""the monitor loop
Basic logic is a blantantly simple blunt heuristics:
@@ -194,8 +171,11 @@ class Monitor(object):
blown worker blows up on EPIPE if the net goes down,
due to the keep-alive thread)
"""
+ if not self.status.get(w[0], None):
+ self.status[w[0]] = GeorepStatus(gconf.state_file, w[0])
- self.set_state(self.ST_INIT, w)
+ set_monitor_status(gconf.state_file, self.ST_STARTED)
+ self.status[w[0]].set_worker_status(self.ST_INIT)
ret = 0
@@ -310,7 +290,7 @@ class Monitor(object):
nwait(apid) #wait for agent
ret = nwait(cpid)
if ret is None:
- self.set_state(self.ST_STABLE, w)
+ self.status[w[0]].set_worker_status(self.ST_STABLE)
#If worker dies, agent terminates on EOF.
#So lets wait for agent first.
nwait(apid)
@@ -320,12 +300,12 @@ class Monitor(object):
else:
ret = exit_status(ret)
if ret in (0, 1):
- self.set_state(self.ST_FAULTY, w)
+ self.status[w[0]].set_worker_status(self.ST_FAULTY)
time.sleep(10)
- self.set_state(self.ST_INCON, w)
+ self.status[w[0]].set_worker_status(self.ST_INCON)
return ret
- def multiplex(self, wspx, suuid, slave_vol, slave_host):
+ def multiplex(self, wspx, suuid, slave_vol, slave_host, master):
argv = sys.argv[:]
for o in ('-N', '--no-daemon', '--monitor'):
while o in argv:
@@ -339,7 +319,7 @@ class Monitor(object):
for wx in wspx:
def wmon(w):
cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
- slave_host)
+ slave_host, master)
time.sleep(1)
self.lock.acquire()
for cpid in cpids:
@@ -401,7 +381,7 @@ def distribute(*resources):
for idx, brick in enumerate(mvol.bricks)
if is_host_local(brick['host'])]
logging.info('worker specs: ' + repr(workerspex))
- return workerspex, suuid, slave_vol, slave_host
+ return workerspex, suuid, slave_vol, slave_host, master
def monitor(*resources):
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index d3d1ee36e01..6bf1ad03e70 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -38,6 +38,7 @@ from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
from syncdutils import ChangelogException
from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
+from gsyncdstatus import GeorepStatus
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
@@ -611,6 +612,9 @@ class Server(object):
def collect_failure(e, cmd_ret):
# We do this for failing fops on Slave
# Master should be logging this
+ if cmd_ret is None:
+ return
+
if cmd_ret == EEXIST:
disk_gfid = cls.gfid_mnt(e['entry'])
if isinstance(disk_gfid, basestring):
@@ -1344,6 +1348,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
os.close(int(ra))
os.close(int(wa))
changelog_agent = RepceClient(int(inf), int(ouf))
+ status = GeorepStatus(gconf.state_file, gconf.local_path)
+ status.reset_on_worker_start()
rv = changelog_agent.version()
if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:
raise GsyncdError(
@@ -1367,13 +1373,13 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
g2.CHANGELOG_CONN_RETRIES)
register_time = int(time.time())
- g2.register(register_time, changelog_agent)
- g3.register(register_time, changelog_agent)
+ g2.register(register_time, changelog_agent, status)
+ g3.register(register_time, changelog_agent, status)
except ChangelogException as e:
logging.error("Changelog register failed, %s" % e)
sys.exit(1)
- g1.register()
+ g1.register(status=status)
logging.info("Register time: %s" % register_time)
# oneshot: Try to use changelog history api, if not
# available switch to FS crawl
diff --git a/geo-replication/tests/unit/test_gsyncdstatus.py b/geo-replication/tests/unit/test_gsyncdstatus.py
new file mode 100644
index 00000000000..a65d659e356
--- /dev/null
+++ b/geo-replication/tests/unit/test_gsyncdstatus.py
@@ -0,0 +1,193 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import unittest
+import os
+import urllib
+
+from syncdaemon.gstatus import GeorepStatus, set_monitor_status
+from syncdaemon.gstatus import get_default_values
+from syncdaemon.gstatus import MONITOR_STATUS, DEFAULT_STATUS
+from syncdaemon.gstatus import STATUS_VALUES, CRAWL_STATUS_VALUES
+from syncdaemon.gstatus import human_time, human_time_utc
+
+
+class GeorepStatusTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.work_dir = os.path.dirname(os.path.abspath(__file__))
+ cls.monitor_status_file = os.path.join(cls.work_dir, "monitor.status")
+ cls.brick = "/exports/bricks/b1"
+ cls.status = GeorepStatus(cls.monitor_status_file, cls.brick)
+ cls.statusfile = os.path.join(cls.work_dir,
+ "brick_%s.status"
+ % urllib.quote_plus(cls.brick))
+
+ @classmethod
+ def tearDownClass(cls):
+ os.remove(cls.statusfile)
+ os.remove(cls.monitor_status_file)
+
+ def _filter_dict(self, inp, keys):
+ op = {}
+ for k in keys:
+ op[k] = inp.get(k, None)
+ return op
+
+ def test_monitor_status_file_created(self):
+ self.assertTrue(os.path.exists(self.monitor_status_file))
+
+ def test_status_file_created(self):
+ self.assertTrue(os.path.exists(self.statusfile))
+
+ def test_set_monitor_status(self):
+ for st in MONITOR_STATUS:
+ set_monitor_status(self.monitor_status_file, st)
+ self.assertTrue(self.status.get_monitor_status(), st)
+
+ def test_default_values_test(self):
+ self.assertTrue(get_default_values(), {
+ "slave_node": DEFAULT_STATUS,
+ "worker_status": DEFAULT_STATUS,
+ "last_synced": 0,
+ "last_synced_utc": 0,
+ "crawl_status": DEFAULT_STATUS,
+ "entry": 0,
+ "data": 0,
+ "metadata": 0,
+ "failures": 0,
+ "checkpoint_completed": False,
+ "checkpoint_time": 0,
+ "checkpoint_time_utc": 0,
+ "checkpoint_completion_time": 0,
+ "checkpoint_completion_time_utc": 0
+ })
+
+ def test_human_time(self):
+ self.assertTrue(human_time(1429174398), "2015-04-16 14:23:18")
+
+ def test_human_time_utc(self):
+ self.assertTrue(human_time_utc(1429174398), "2015-04-16 08:53:18")
+
+ def test_invalid_human_time(self):
+ self.assertTrue(human_time(142917439), DEFAULT_STATUS)
+ self.assertTrue(human_time("abcdef"), DEFAULT_STATUS)
+
+ def test_invalid_human_time_utc(self):
+ self.assertTrue(human_time_utc(142917439), DEFAULT_STATUS)
+ self.assertTrue(human_time_utc("abcdef"), DEFAULT_STATUS)
+
+ def test_worker_status(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ for st in STATUS_VALUES:
+ self.status.set_worker_status(st)
+ self.assertTrue(self.status.get_status()["worker_status"], st)
+
+ def test_crawl_status(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_active()
+ for st in CRAWL_STATUS_VALUES:
+ self.status.set_worker_crawl_status(st)
+ self.assertTrue(self.status.get_status()["crawl_status"], st)
+
+ def test_slave_node(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_active()
+ self.status.set_slave_node("fvm2")
+ self.assertTrue(self.status.get_status()["slave_node"], "fvm2")
+
+ self.status.set_worker_status("Passive")
+ self.status.set_slave_node("fvm2")
+ self.assertTrue(self.status.get_status()["slave_node"], "fvm2")
+
+ def test_active_worker_status(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_active()
+ self.assertTrue(self.status.get_status()["worker_status"], "Active")
+
+ def test_passive_worker_status(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_passive()
+ self.assertTrue(self.status.get_status()["worker_status"], "Passive")
+
+ def test_set_field(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_active()
+ self.status.set_field("entry", 42)
+ self.assertTrue(self.status.get_status()["entry"], 42)
+
+ def test_inc_value(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_active()
+ self.status.set_field("entry", 0)
+ self.status.inc_value("entry", 2)
+ self.assertTrue(self.status.get_status()["entry"], 2)
+
+ self.status.set_field("data", 0)
+ self.status.inc_value("data", 2)
+ self.assertTrue(self.status.get_status()["data"], 2)
+
+ self.status.set_field("meta", 0)
+ self.status.inc_value("meta", 2)
+ self.assertTrue(self.status.get_status()["meta"], 2)
+
+ self.status.set_field("failures", 0)
+ self.status.inc_value("failures", 2)
+ self.assertTrue(self.status.get_status()["failures"], 2)
+
+ def test_dec_value(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_active()
+
+ self.status.set_field("entry", 4)
+ self.status.inc_value("entry", 2)
+ self.assertTrue(self.status.get_status()["entry"], 2)
+
+ self.status.set_field("data", 4)
+ self.status.inc_value("data", 2)
+ self.assertTrue(self.status.get_status()["data"], 2)
+
+ self.status.set_field("meta", 4)
+ self.status.inc_value("meta", 2)
+ self.assertTrue(self.status.get_status()["meta"], 2)
+
+ self.status.set_field("failures", 4)
+ self.status.inc_value("failures", 2)
+ self.assertTrue(self.status.get_status()["failures"], 2)
+
+ def test_worker_status_when_monitor_status_created(self):
+ set_monitor_status(self.monitor_status_file, "Created")
+ for st in STATUS_VALUES:
+ self.status.set_worker_status(st)
+ self.assertTrue(self.status.get_status()["worker_status"],
+ "Created")
+
+ def test_worker_status_when_monitor_status_paused(self):
+ set_monitor_status(self.monitor_status_file, "Paused")
+ for st in STATUS_VALUES:
+ self.status.set_worker_status(st)
+ self.assertTrue(self.status.get_status()["worker_status"],
+ "Paused")
+
+ def test_worker_status_when_monitor_status_stopped(self):
+ set_monitor_status(self.monitor_status_file, "Stopped")
+ for st in STATUS_VALUES:
+ self.status.set_worker_status(st)
+ self.assertTrue(self.status.get_status()["worker_status"],
+ "Stopped")
+
+ def test_status_when_worker_status_active(self):
+ set_monitor_status(self.monitor_status_file, "Started")
+ self.status.set_active()
+
+
+if __name__ == "__main__":
+ unittest.main()