diff options
Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r-- | geo-replication/syncdaemon/Makefile.am | 3 | ||||
-rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 25 | ||||
-rw-r--r-- | geo-replication/syncdaemon/gsyncdstatus.py | 317 | ||||
-rw-r--r-- | geo-replication/syncdaemon/master.py | 374 | ||||
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 58 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 12 |
6 files changed, 415 insertions, 374 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index 885963eae2b..ed0f5e40924 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -2,6 +2,7 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \ resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ - $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py + $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py \ + gsyncdstatus.py CLEANFILES = diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index b9ee5aec8c7..32e4eb7828d 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -27,12 +27,13 @@ from ipaddr import IPAddress, IPNetwork from gconf import gconf from syncdutils import FreeObject, norm, grabpidfile, finalize -from syncdutils import log_raise_exception, privileged, update_file +from syncdutils import log_raise_exception, privileged from syncdutils import GsyncdError, select, set_term_handler from configinterface import GConffile, upgrade_config_file import resource from monitor import monitor from changelogagent import agent, Changelog +from gsyncdstatus import set_monitor_status, GeorepStatus class GLogger(Logger): @@ -267,7 +268,7 @@ def main_i(): op.add_option('--socketdir', metavar='DIR') op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs) - op.add_option('--checkpoint', metavar='LABEL', default='') + op.add_option('--checkpoint', metavar='LABEL', default='0') # tunables for failover/failback mechanism: # None - gsyncd behaves as normal @@ -315,6 +316,8 @@ def main_i(): action='callback', callback=store_local) op.add_option('--delete', dest='delete', action='callback', callback=store_local_curry(True)) + op.add_option('--status-get', dest='status_get', action='callback', + callback=store_local_curry(True)) op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a), setattr( @@ -583,15 +586,8 @@ def main_i(): GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf') if confdata.op == 'set': logging.info('checkpoint %s set' % confdata.val) - gcnf.delete('checkpoint_completed') - gcnf.delete('checkpoint_target') elif confdata.op == 'del': logging.info('checkpoint info was reset') - # if it is removing 'checkpoint' then we need - # to remove 'checkpoint_completed' and 'checkpoint_target' too - gcnf.delete('checkpoint_completed') - gcnf.delete('checkpoint_target') - except IOError: if sys.exc_info()[1].errno == ENOENT: # directory of log path is not present, @@ -607,7 +603,7 @@ def main_i(): create = rconf.get('create') if create: if getattr(gconf, 'state_file', None): - update_file(gconf.state_file, lambda f: f.write(create + '\n')) + set_monitor_status(gconf.state_file, create) return go_daemon = rconf['go_daemon'] @@ -615,6 +611,15 @@ def main_i(): be_agent = rconf.get('agent') rscs, local, remote = makersc(args) + + status_get = rconf.get('status_get') + if status_get: + for brick in gconf.path: + brick_status = GeorepStatus(gconf.state_file, brick) + checkpoint_time = int(getattr(gconf, "checkpoint", "0")) + brick_status.print_status(checkpoint_time=checkpoint_time) + return + if not be_monitor and isinstance(remote, resource.SSH) and \ go_daemon == 'should': go_daemon = 'postconn' diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py new file mode 100644 index 00000000000..a49b9c23dea --- /dev/null +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import fcntl +import os +import tempfile +import urllib +import json +import time +from datetime import datetime + +DEFAULT_STATUS = "N/A" +MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped") +STATUS_VALUES = (DEFAULT_STATUS, + "Initializing...", + "Active", + "Passive", + "Faulty") + +CRAWL_STATUS_VALUES = (DEFAULT_STATUS, + "Hybrid Crawl", + "History Crawl", + "Changelog Crawl") + + +def human_time(ts): + try: + return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S") + except ValueError: + return DEFAULT_STATUS + + +def human_time_utc(ts): + try: + return datetime.utcfromtimestamp( + float(ts)).strftime("%Y-%m-%d %H:%M:%S") + except ValueError: + return DEFAULT_STATUS + + +def get_default_values(): + return { + "slave_node": DEFAULT_STATUS, + "worker_status": DEFAULT_STATUS, + "last_synced": 0, + "crawl_status": DEFAULT_STATUS, + "entry": 0, + "data": 0, + "meta": 0, + "failures": 0, + "checkpoint_completed": DEFAULT_STATUS, + "checkpoint_time": 0, + "checkpoint_completion_time": 0} + + +class LockedOpen(object): + + def __init__(self, filename, *args, **kwargs): + self.filename = filename + self.open_args = args + self.open_kwargs = kwargs + self.fileobj = None + + def __enter__(self): + """ + If two processes compete to update a file, The first process + gets the lock and the second process is blocked in the fcntl.flock() + call. When first process replaces the file and releases the lock, + the already open file descriptor in the second process now points + to a "ghost" file(not reachable by any path name) with old contents. + To avoid that conflict, check the fd already opened is same or + not. Open new one if not same + """ + f = open(self.filename, *self.open_args, **self.open_kwargs) + while True: + fcntl.flock(f, fcntl.LOCK_EX) + fnew = open(self.filename, *self.open_args, **self.open_kwargs) + if os.path.sameopenfile(f.fileno(), fnew.fileno()): + fnew.close() + break + else: + f.close() + f = fnew + self.fileobj = f + return f + + def __exit__(self, _exc_type, _exc_value, _traceback): + self.fileobj.close() + + +def set_monitor_status(status_file, status): + fd = os.open(status_file, os.O_CREAT | os.O_RDWR) + os.close(fd) + with LockedOpen(status_file, 'r+'): + with tempfile.NamedTemporaryFile('w', dir=os.path.dirname(status_file), + delete=False) as tf: + tf.write(status) + tempname = tf.name + + os.rename(tempname, status_file) + dirfd = os.open(os.path.dirname(os.path.abspath(status_file)), + os.O_DIRECTORY) + os.fsync(dirfd) + os.close(dirfd) + + +class GeorepStatus(object): + def __init__(self, monitor_status_file, brick): + self.work_dir = os.path.dirname(monitor_status_file) + self.monitor_status_file = monitor_status_file + self.filename = os.path.join(self.work_dir, + "brick_%s.status" + % urllib.quote_plus(brick)) + + fd = os.open(self.filename, os.O_CREAT | os.O_RDWR) + os.close(fd) + fd = os.open(self.monitor_status_file, os.O_CREAT | os.O_RDWR) + os.close(fd) + self.brick = brick + self.default_values = get_default_values() + + def _update(self, mergerfunc): + with LockedOpen(self.filename, 'r+') as f: + try: + data = json.load(f) + except ValueError: + data = self.default_values + + data = mergerfunc(data) + with tempfile.NamedTemporaryFile( + 'w', + dir=os.path.dirname(self.filename), + delete=False) as tf: + tf.write(data) + tempname = tf.name + + os.rename(tempname, self.filename) + dirfd = os.open(os.path.dirname(os.path.abspath(self.filename)), + os.O_DIRECTORY) + os.fsync(dirfd) + os.close(dirfd) + + def reset_on_worker_start(self): + def merger(data): + data["slave_node"] = DEFAULT_STATUS + data["crawl_status"] = DEFAULT_STATUS + data["entry"] = 0 + data["data"] = 0 + data["meta"] = 0 + return json.dumps(data) + + self._update(merger) + + def set_field(self, key, value): + def merger(data): + data[key] = value + return json.dumps(data) + + self._update(merger) + + def set_last_synced(self, value, checkpoint_time): + def merger(data): + data["last_synced"] = value[0] + + # If checkpoint is not set or reset + # or if last set checkpoint is changed + if checkpoint_time == 0 or \ + checkpoint_time != data["checkpoint_time"]: + data["checkpoint_time"] = 0 + data["checkpoint_completion_time"] = 0 + data["checkpoint_completed"] = "No" + + # If checkpoint is completed and not marked as completed + # previously then update the checkpoint completed time + if checkpoint_time > 0 and checkpoint_time <= value[0]: + if data["checkpoint_completed"] == "No": + data["checkpoint_time"] = checkpoint_time + data["checkpoint_completion_time"] = int(time.time()) + data["checkpoint_completed"] = "Yes" + return json.dumps(data) + + self._update(merger) + + def set_worker_status(self, status): + self.set_field("worker_status", status) + + def set_worker_crawl_status(self, status): + self.set_field("crawl_status", status) + + def set_slave_node(self, slave_node): + def merger(data): + data["slave_node"] = slave_node + return json.dumps(data) + + self._update(merger) + + def inc_value(self, key, value): + def merger(data): + data[key] = data.get(key, 0) + value + return json.dumps(data) + + self._update(merger) + + def dec_value(self, key, value): + def merger(data): + data[key] = data.get(key, 0) - value + if data[key] < 0: + data[key] = 0 + return json.dumps(data) + + self._update(merger) + + def set_active(self): + self.set_field("worker_status", "Active") + + def set_passive(self): + self.set_field("worker_status", "Passive") + + def get_monitor_status(self): + data = "" + with open(self.monitor_status_file, "r") as f: + data = f.read().strip() + return data + + def get_status(self, checkpoint_time=0): + """ + Monitor Status ---> Created Started Paused Stopped + ---------------------------------------------------------------------- + slave_node N/A VALUE VALUE N/A + status Created VALUE Paused Stopped + last_synced N/A VALUE VALUE VALUE + crawl_status N/A VALUE N/A N/A + entry N/A VALUE N/A N/A + data N/A VALUE N/A N/A + meta N/A VALUE N/A N/A + failures N/A VALUE VALUE VALUE + checkpoint_completed N/A VALUE VALUE VALUE + checkpoint_time N/A VALUE VALUE VALUE + checkpoint_completed_time N/A VALUE VALUE VALUE + """ + data = self.default_values + with open(self.filename) as f: + try: + data.update(json.load(f)) + except ValueError: + pass + monitor_status = self.get_monitor_status() + + if monitor_status in ["Created", "Paused", "Stopped"]: + data["worker_status"] = monitor_status + + # Checkpoint adjustments + if checkpoint_time == 0: + data["checkpoint_completed"] = DEFAULT_STATUS + data["checkpoint_time"] = DEFAULT_STATUS + data["checkpoint_completion_time"] = DEFAULT_STATUS + else: + if checkpoint_time != data["checkpoint_time"]: + if checkpoint_time <= data["last_synced"]: + data["checkpoint_completed"] = "Yes" + data["checkpoint_time"] = checkpoint_time + data["checkpoint_completion_time"] = data["last_synced"] + else: + data["checkpoint_completed"] = "No" + data["checkpoint_time"] = checkpoint_time + data["checkpoint_completion_time"] = DEFAULT_STATUS + + if data["checkpoint_time"] not in [0, DEFAULT_STATUS]: + chkpt_time = data["checkpoint_time"] + data["checkpoint_time"] = human_time(chkpt_time) + data["checkpoint_time_utc"] = human_time_utc(chkpt_time) + + if data["checkpoint_completion_time"] not in [0, DEFAULT_STATUS]: + chkpt_completion_time = data["checkpoint_completion_time"] + data["checkpoint_completion_time"] = human_time( + chkpt_completion_time) + data["checkpoint_completion_time_utc"] = human_time_utc( + chkpt_completion_time) + + if data["last_synced"] == 0: + data["last_synced"] = DEFAULT_STATUS + data["last_synced_utc"] = DEFAULT_STATUS + else: + last_synced = data["last_synced"] + data["last_synced"] = human_time(last_synced) + data["last_synced_utc"] = human_time_utc(last_synced) + + if data["worker_status"] != "Active": + data["last_synced"] = DEFAULT_STATUS + data["last_synced_utc"] = DEFAULT_STATUS + data["crawl_status"] = DEFAULT_STATUS + data["entry"] = DEFAULT_STATUS + data["data"] = DEFAULT_STATUS + data["meta"] = DEFAULT_STATUS + data["failures"] = DEFAULT_STATUS + data["checkpoint_completed"] = DEFAULT_STATUS + data["checkpoint_time"] = DEFAULT_STATUS + data["checkpoint_completed_time"] = DEFAULT_STATUS + data["checkpoint_time_utc"] = DEFAULT_STATUS + data["checkpoint_completion_time_utc"] = DEFAULT_STATUS + + if data["worker_status"] not in ["Active", "Passive"]: + data["slave_node"] = DEFAULT_STATUS + + return data + + def print_status(self, checkpoint_time=0): + for key, value in self.get_status(checkpoint_time).items(): + print ("%s: %s" % (key, value)) diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 38535884ec6..8e4c43046b0 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -15,17 +15,15 @@ import stat import json import logging import fcntl -import socket import string import errno import tarfile -from errno import ENOENT, ENODATA, EPIPE, EEXIST, EACCES, EAGAIN +from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN from threading import Condition, Lock from datetime import datetime from gconf import gconf -from tempfile import NamedTemporaryFile from syncdutils import Thread, GsyncdError, boolify, escape -from syncdutils import unescape, select, gauxpfx, md5hex, selfkill +from syncdutils import unescape, gauxpfx, md5hex, selfkill from syncdutils import lstat, errno_wrap from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable @@ -397,18 +395,6 @@ class GMasterCommon(object): raise return default_data - def update_crawl_data(self): - if getattr(gconf, 'state_detail_file', None): - try: - same_dir = os.path.dirname(gconf.state_detail_file) - with NamedTemporaryFile(dir=same_dir, delete=False) as tmp: - json.dump(self.total_crawl_stats, tmp) - tmp.flush() - os.fsync(tmp.fileno()) - os.rename(tmp.name, gconf.state_detail_file) - except (IOError, OSError): - raise - def __init__(self, master, slave): self.master = master self.slave = slave @@ -434,14 +420,12 @@ class GMasterCommon(object): self.total_turns = int(gconf.turns) self.crawl_start = datetime.now() self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0} - self.total_crawl_stats = None self.start = None self.change_seen = None # the actual volinfo we make use of self.volinfo = None self.terminate = False self.sleep_interval = 1 - self.checkpoint_thread = None self.current_files_skipped_count = 0 self.skipped_gfid_list = [] self.unlinked_gfids = [] @@ -493,7 +477,6 @@ class GMasterCommon(object): logging.debug("Got the lock") return True - def should_crawl(self): if not gconf.use_meta_volume: return gconf.glusterd_uuid in self.master.server.node_uuid() @@ -503,7 +486,6 @@ class GMasterCommon(object): sys.exit(1) return self.mgmt_lock() - def register(self): self.register() @@ -542,10 +524,8 @@ class GMasterCommon(object): if self.volinfo['retval']: logging.warn("master cluster's info may not be valid %d" % self.volinfo['retval']) - self.start_checkpoint_thread() else: raise GsyncdError("master volinfo unavailable") - self.total_crawl_stats = self.get_initial_crawl_data() self.lastreport['time'] = time.time() logging.info('crawl interval: %d seconds' % self.sleep_interval) @@ -570,7 +550,7 @@ class GMasterCommon(object): t0 = t1 self.update_worker_remote_node() if not crawl: - self.update_worker_health("Passive") + self.status.set_passive() # bring up _this_ brick to the cluster stime # which is min of cluster (but max of the replicas) brick_stime = self.xtime('.', self.slave) @@ -597,35 +577,14 @@ class GMasterCommon(object): time.sleep(5) continue - self.update_worker_health("Active") + + self.status.set_active() self.crawl() + if oneshot: return time.sleep(self.sleep_interval) - @classmethod - def _checkpt_param(cls, chkpt, prm, xtimish=True): - """use config backend to lookup a parameter belonging to - checkpoint @chkpt""" - cprm = gconf.configinterface.get_realtime('checkpoint_' + prm) - if not cprm: - return - chkpt_mapped, val = cprm.split(':', 1) - if unescape(chkpt_mapped) != chkpt: - return - if xtimish: - val = cls.deserialize_xtime(val) - return val - - @classmethod - def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True): - """use config backend to store a parameter associated - with checkpoint @chkpt""" - if xtimish: - val = cls.serialize_xtime(val) - gconf.configinterface.set( - 'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) - @staticmethod def humantime(*tpair): """format xtime-like (sec, nsec) pair to human readable format""" @@ -654,116 +613,6 @@ class GMasterCommon(object): string.zfill(m, 2), string.zfill(s, 2)) return date - def checkpt_service(self, chan, chkpt): - """checkpoint service loop - - monitor and verify checkpoint status for @chkpt, and listen - for incoming requests for whom we serve a pretty-formatted - status report""" - while True: - chkpt = gconf.configinterface.get_realtime("checkpoint") - if not chkpt: - gconf.configinterface.delete("checkpoint_completed") - gconf.configinterface.delete("checkpoint_target") - # dummy loop for the case when there is no checkpt set - select([chan], [], []) - conn, _ = chan.accept() - conn.send('\0') - conn.close() - continue - - checkpt_tgt = self._checkpt_param(chkpt, 'target') - if not checkpt_tgt: - checkpt_tgt = self.xtime('.') - if isinstance(checkpt_tgt, int): - raise GsyncdError("master root directory is " - "unaccessible (%s)", - os.strerror(checkpt_tgt)) - self._set_checkpt_param(chkpt, 'target', checkpt_tgt) - logging.debug("checkpoint target %s has been determined " - "for checkpoint %s" % - (repr(checkpt_tgt), chkpt)) - - # check if the label is 'now' - chkpt_lbl = chkpt - try: - x1, x2 = chkpt.split(':') - if x1 == 'now': - chkpt_lbl = "as of " + self.humantime(x2) - except: - pass - completed = self._checkpt_param(chkpt, 'completed', xtimish=False) - if completed: - completed = tuple(int(x) for x in completed.split('.')) - s, _, _ = select([chan], [], [], (not completed) and 5 or None) - # either request made and we re-check to not - # give back stale data, or we still hunting for completion - if (self.native_xtime(checkpt_tgt) and ( - self.native_xtime(checkpt_tgt) < self.volmark)): - # indexing has been reset since setting the checkpoint - status = "is invalid" - else: - xtr = self.xtime('.', self.slave) - if isinstance(xtr, int): - raise GsyncdError("slave root directory is " - "unaccessible (%s)", - os.strerror(xtr)) - ncompleted = self.xtime_geq(xtr, checkpt_tgt) - if completed and not ncompleted: # stale data - logging.warn("completion time %s for checkpoint %s " - "became stale" % - (self.humantime(*completed), chkpt)) - completed = None - gconf.configinterface.delete('checkpoint_completed') - if ncompleted and not completed: # just reaching completion - completed = "%.6f" % time.time() - self._set_checkpt_param( - chkpt, 'completed', completed, xtimish=False) - completed = tuple(int(x) for x in completed.split('.')) - logging.info("checkpoint %s completed" % chkpt) - status = completed and \ - "completed at " + self.humantime(completed[0]) or \ - "not reached yet" - if s: - conn = None - try: - conn, _ = chan.accept() - try: - conn.send("checkpoint %s is %s\0" % - (chkpt_lbl, status)) - except: - exc = sys.exc_info()[1] - if ((isinstance(exc, OSError) or isinstance( - exc, IOError)) and exc.errno == EPIPE): - logging.debug('checkpoint client disconnected') - else: - raise - finally: - if conn: - conn.close() - - def start_checkpoint_thread(self): - """prepare and start checkpoint service""" - if self.checkpoint_thread or not ( - getattr(gconf, 'state_socket_unencoded', None) and getattr( - gconf, 'socketdir', None) - ): - return - chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - state_socket = os.path.join( - gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket") - try: - os.unlink(state_socket) - except: - if sys.exc_info()[0] == OSError: - pass - chan.bind(state_socket) - chan.listen(1) - chkpt = gconf.configinterface.get_realtime("checkpoint") - t = Thread(target=self.checkpt_service, args=(chan, chkpt)) - t.start() - self.checkpoint_thread = t - def add_job(self, path, label, job, *a, **kw): """insert @job function to job table at @path with @label""" if self.jobtab.get(path) is None: @@ -937,11 +786,15 @@ class GMasterChangelogMixin(GMasterCommon): files_pending['purge'] += 1 def log_failures(failures, entry_key, gfid_prefix, log_prefix): + num_failures = 0 for failure in failures: st = lstat(os.path.join(gfid_prefix, failure[0][entry_key])) if not isinstance(st, int): + num_failures += 1 logging.warn('%s FAILED: %s' % (log_prefix, repr(failure))) + self.status.inc_value("failures", num_failures) + for e in clist: e = e.strip() et = e[self.IDX_START:self.IDX_END] # entry type @@ -1040,12 +893,18 @@ class GMasterChangelogMixin(GMasterCommon): else: logging.warn('got invalid changelog type: %s' % (et)) logging.debug('entries: %s' % repr(entries)) - if not retry: - self.update_worker_cumilitive_status(files_pending) + + # Increment counters for Status + self.status.inc_value("entry", len(entries)) + self.files_in_batch = len(datas) + self.status.inc_value("data", self.files_in_batch) + # sync namespace if entries: failures = self.slave.server.entry_ops(entries) log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') + self.status.dec_value("entry", len(entries)) + # sync metadata if meta_gfid: meta_entries = [] @@ -1059,8 +918,11 @@ class GMasterChangelogMixin(GMasterCommon): continue meta_entries.append(edct('META', go=go[0], stat=st)) if meta_entries: + self.status.inc_value("meta", len(entries)) failures = self.slave.server.meta_ops(meta_entries) log_failures(failures, 'go', '', 'META') + self.status.dec_value("meta", len(entries)) + # sync data if datas: self.a_syncdata(datas) @@ -1112,9 +974,17 @@ class GMasterChangelogMixin(GMasterCommon): if done: xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) + chkpt_time = gconf.configinterface.get_realtime( + "checkpoint") + checkpoint_time = 0 + if chkpt_time is not None: + checkpoint_time = int(chkpt_time) + + self.status.set_last_synced(xtl, checkpoint_time) map(self.changelog_done_func, changes) self.archive_and_purge_changelogs(changes) - self.update_worker_files_syncd() + self.status.dec_value("data", self.files_in_batch) + self.files_in_batch = 0 break # We do not know which changelog transfer failed, retry everything. @@ -1124,14 +994,22 @@ class GMasterChangelogMixin(GMasterCommon): logging.warn('changelogs %s could not be processed - ' 'moving on...' % ' '.join(map(os.path.basename, changes))) - self.update_worker_total_files_skipped( - self.current_files_skipped_count) + self.status.inc_value("failures", + self.current_files_skipped_count) logging.warn('SKIPPED GFID = %s' % ','.join(self.skipped_gfid_list)) - self.update_worker_files_syncd() + + self.files_in_batch = 0 if done: xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) + chkpt_time = gconf.configinterface.get_realtime( + "checkpoint") + checkpoint_time = 0 + if chkpt_time is not None: + checkpoint_time = int(chkpt_time) + + self.status.set_last_synced(xtl, checkpoint_time) map(self.changelog_done_func, changes) self.archive_and_purge_changelogs(changes) break @@ -1152,161 +1030,12 @@ class GMasterChangelogMixin(GMasterCommon): if not stime == URXTIME: self.sendmark(path, stime) - def get_worker_status_file(self): - file_name = gconf.local_path + '.status' - file_name = file_name.replace("/", "_") - worker_status_file = gconf.georep_session_working_dir + file_name - return worker_status_file - - def update_worker_status(self, key, value): - default_data = {"remote_node": "N/A", - "worker status": "Not Started", - "crawl status": "N/A", - "files_syncd": 0, - "files_remaining": 0, - "bytes_remaining": 0, - "purges_remaining": 0, - "total_files_skipped": 0} - worker_status_file = self.get_worker_status_file() - try: - with open(worker_status_file, 'r+') as f: - loaded_data = json.load(f) - loaded_data[key] = value - os.ftruncate(f.fileno(), 0) - os.lseek(f.fileno(), 0, os.SEEK_SET) - json.dump(loaded_data, f) - f.flush() - os.fsync(f.fileno()) - except (IOError, OSError, ValueError): - logging.info('Creating new %s' % worker_status_file) - try: - with open(worker_status_file, 'wb') as f: - default_data[key] = value - json.dump(default_data, f) - f.flush() - os.fsync(f.fileno()) - except: - raise - - def update_worker_cumilitive_status(self, files_pending): - default_data = {"remote_node": "N/A", - "worker status": "Not Started", - "crawl status": "N/A", - "files_syncd": 0, - "files_remaining": 0, - "bytes_remaining": 0, - "purges_remaining": 0, - "total_files_skipped": 0} - worker_status_file = self.get_worker_status_file() - try: - with open(worker_status_file, 'r+') as f: - loaded_data = json.load(f) - loaded_data['files_remaining'] = files_pending['count'] - loaded_data['bytes_remaining'] = files_pending['bytes'] - loaded_data['purges_remaining'] = files_pending['purge'] - os.ftruncate(f.fileno(), 0) - os.lseek(f.fileno(), 0, os.SEEK_SET) - json.dump(loaded_data, f) - f.flush() - os.fsync(f.fileno()) - except (IOError, OSError, ValueError): - logging.info('Creating new %s' % worker_status_file) - try: - with open(worker_status_file, 'wb') as f: - default_data['files_remaining'] = files_pending['count'] - default_data['bytes_remaining'] = files_pending['bytes'] - default_data['purges_remaining'] = files_pending['purge'] - json.dump(default_data, f) - f.flush() - os.fsync(f.fileno()) - except: - raise - def update_worker_remote_node(self): node = sys.argv[-1] - node = node.split("@")[-1] + node_data = node.split("@") + node = node_data[-1] remote_node_ip = node.split(":")[0] - remote_node_vol = node.split(":")[3] - remote_node = remote_node_ip + '::' + remote_node_vol - self.update_worker_status('remote_node', remote_node) - - def update_worker_health(self, state): - self.update_worker_status('worker status', state) - - def update_worker_crawl_status(self, state): - self.update_worker_status('crawl status', state) - - def update_worker_files_syncd(self): - default_data = {"remote_node": "N/A", - "worker status": "Not Started", - "crawl status": "N/A", - "files_syncd": 0, - "files_remaining": 0, - "bytes_remaining": 0, - "purges_remaining": 0, - "total_files_skipped": 0} - worker_status_file = self.get_worker_status_file() - try: - with open(worker_status_file, 'r+') as f: - loaded_data = json.load(f) - loaded_data['files_syncd'] += loaded_data['files_remaining'] - loaded_data['files_remaining'] = 0 - loaded_data['bytes_remaining'] = 0 - loaded_data['purges_remaining'] = 0 - os.ftruncate(f.fileno(), 0) - os.lseek(f.fileno(), 0, os.SEEK_SET) - json.dump(loaded_data, f) - f.flush() - os.fsync(f.fileno()) - except (IOError, OSError, ValueError): - logging.info('Creating new %s' % worker_status_file) - try: - with open(worker_status_file, 'wb') as f: - json.dump(default_data, f) - f.flush() - os.fsync(f.fileno()) - except: - raise - - def update_worker_files_remaining(self, state): - self.update_worker_status('files_remaining', state) - - def update_worker_bytes_remaining(self, state): - self.update_worker_status('bytes_remaining', state) - - def update_worker_purges_remaining(self, state): - self.update_worker_status('purges_remaining', state) - - def update_worker_total_files_skipped(self, value): - default_data = {"remote_node": "N/A", - "worker status": "Not Started", - "crawl status": "N/A", - "files_syncd": 0, - "files_remaining": 0, - "bytes_remaining": 0, - "purges_remaining": 0, - "total_files_skipped": 0} - worker_status_file = self.get_worker_status_file() - try: - with open(worker_status_file, 'r+') as f: - loaded_data = json.load(f) - loaded_data['total_files_skipped'] = value - loaded_data['files_remaining'] -= value - os.ftruncate(f.fileno(), 0) - os.lseek(f.fileno(), 0, os.SEEK_SET) - json.dump(loaded_data, f) - f.flush() - os.fsync(f.fileno()) - except (IOError, OSError, ValueError): - logging.info('Creating new %s' % worker_status_file) - try: - with open(worker_status_file, 'wb') as f: - default_data['total_files_skipped'] = value - json.dump(default_data, f) - f.flush() - os.fsync(f.fileno()) - except: - raise + self.status.set_slave_node(remote_node_ip) def changelogs_batch_process(self, changes): changelogs_batches = [] @@ -1331,7 +1060,7 @@ class GMasterChangelogMixin(GMasterCommon): self.process(batch) def crawl(self): - self.update_worker_crawl_status("Changelog Crawl") + self.status.set_worker_crawl_status("Changelog Crawl") changes = [] # get stime (from the brick) and purge changelogs # that are _historical_ to that time. @@ -1355,16 +1084,17 @@ class GMasterChangelogMixin(GMasterCommon): self.changelogs_batch_process(changes) - def register(self, register_time, changelog_agent): + def register(self, register_time, changelog_agent, status): self.changelog_agent = changelog_agent self.sleep_interval = int(gconf.change_interval) self.changelog_done_func = self.changelog_agent.done self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), ".processed") + self.status = status class GMasterChangeloghistoryMixin(GMasterChangelogMixin): - def register(self, register_time, changelog_agent): + def register(self, register_time, changelog_agent, status): self.changelog_agent = changelog_agent self.changelog_register_time = register_time self.history_crawl_start_time = register_time @@ -1372,10 +1102,11 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): self.history_turns = 0 self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), ".history/.processed") + self.status = status def crawl(self): self.history_turns += 1 - self.update_worker_crawl_status("History Crawl") + self.status.set_worker_crawl_status("History Crawl") purge_time = self.get_purge_time() logging.info('starting history crawl... turns: %s, stime: %s' @@ -1455,7 +1186,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin): XSYNC_MAX_ENTRIES = 1 << 13 - def register(self, register_time=None, changelog_agent=None): + def register(self, register_time=None, changelog_agent=None, status=None): + self.status = status self.counter = 0 self.comlist = [] self.stimes = [] @@ -1486,7 +1218,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): t.start() logging.info('starting hybrid crawl..., stime: %s' % repr(self.get_purge_time())) - self.update_worker_crawl_status("Hybrid Crawl") + self.status.set_worker_crawl_status("Hybrid Crawl") while True: try: item = self.comlist.pop(0) diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 029726c7a5a..ba5c8e32514 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -22,10 +22,12 @@ from errno import EEXIST import re import random from gconf import gconf -from syncdutils import update_file, select, waitpid +from syncdutils import select, waitpid from syncdutils import set_term_handler, is_host_local, GsyncdError from syncdutils import escape, Thread, finalize, memoize +from gsyncdstatus import GeorepStatus, set_monitor_status + ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError @@ -125,46 +127,22 @@ class Volinfo(object): def disperse_count(self): return int(self.get('disperseCount')[0].text) + class Monitor(object): """class which spawns and manages gsyncd workers""" ST_INIT = 'Initializing...' - ST_STABLE = 'Stable' - ST_FAULTY = 'faulty' + ST_STARTED = 'Started' + ST_STABLE = 'Active' + ST_FAULTY = 'Faulty' ST_INCON = 'inconsistent' _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON] def __init__(self): self.lock = Lock() self.state = {} - - def set_state(self, state, w=None): - """set the state that can be used by external agents - like glusterd for status reporting""" - computestate = lambda: self.state and self._ST_ORD[ - max(self._ST_ORD.index(s) for s in self.state.values())] - if w: - self.lock.acquire() - old_state = computestate() - self.state[w] = state - state = computestate() - self.lock.release() - if state != old_state: - self.set_state(state) - else: - if getattr(gconf, 'state_file', None): - # If previous state is paused, suffix the - # new state with '(Paused)' - try: - with open(gconf.state_file, "r") as f: - content = f.read() - if "paused" in content.lower(): - state = state + '(Paused)' - except IOError: - pass - logging.info('new state: %s' % state) - update_file(gconf.state_file, lambda f: f.write(state + '\n')) + self.status = {} @staticmethod def terminate(): @@ -174,8 +152,7 @@ class Monitor(object): # give a chance to graceful exit os.kill(-os.getpid(), signal.SIGTERM) - - def monitor(self, w, argv, cpids, agents, slave_vol, slave_host): + def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master): """the monitor loop Basic logic is a blantantly simple blunt heuristics: @@ -194,8 +171,11 @@ class Monitor(object): blown worker blows up on EPIPE if the net goes down, due to the keep-alive thread) """ + if not self.status.get(w[0], None): + self.status[w[0]] = GeorepStatus(gconf.state_file, w[0]) - self.set_state(self.ST_INIT, w) + set_monitor_status(gconf.state_file, self.ST_STARTED) + self.status[w[0]].set_worker_status(self.ST_INIT) ret = 0 @@ -310,7 +290,7 @@ class Monitor(object): nwait(apid) #wait for agent ret = nwait(cpid) if ret is None: - self.set_state(self.ST_STABLE, w) + self.status[w[0]].set_worker_status(self.ST_STABLE) #If worker dies, agent terminates on EOF. #So lets wait for agent first. nwait(apid) @@ -320,12 +300,12 @@ class Monitor(object): else: ret = exit_status(ret) if ret in (0, 1): - self.set_state(self.ST_FAULTY, w) + self.status[w[0]].set_worker_status(self.ST_FAULTY) time.sleep(10) - self.set_state(self.ST_INCON, w) + self.status[w[0]].set_worker_status(self.ST_INCON) return ret - def multiplex(self, wspx, suuid, slave_vol, slave_host): + def multiplex(self, wspx, suuid, slave_vol, slave_host, master): argv = sys.argv[:] for o in ('-N', '--no-daemon', '--monitor'): while o in argv: @@ -339,7 +319,7 @@ class Monitor(object): for wx in wspx: def wmon(w): cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol, - slave_host) + slave_host, master) time.sleep(1) self.lock.acquire() for cpid in cpids: @@ -401,7 +381,7 @@ def distribute(*resources): for idx, brick in enumerate(mvol.bricks) if is_host_local(brick['host'])] logging.info('worker specs: ' + repr(workerspex)) - return workerspex, suuid, slave_vol, slave_host + return workerspex, suuid, slave_vol, slave_host, master def monitor(*resources): diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index d3d1ee36e01..6bf1ad03e70 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -38,6 +38,7 @@ from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable from syncdutils import ChangelogException from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION +from gsyncdstatus import GeorepStatus UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') @@ -611,6 +612,9 @@ class Server(object): def collect_failure(e, cmd_ret): # We do this for failing fops on Slave # Master should be logging this + if cmd_ret is None: + return + if cmd_ret == EEXIST: disk_gfid = cls.gfid_mnt(e['entry']) if isinstance(disk_gfid, basestring): @@ -1344,6 +1348,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): os.close(int(ra)) os.close(int(wa)) changelog_agent = RepceClient(int(inf), int(ouf)) + status = GeorepStatus(gconf.state_file, gconf.local_path) + status.reset_on_worker_start() rv = changelog_agent.version() if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: raise GsyncdError( @@ -1367,13 +1373,13 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): g2.CHANGELOG_CONN_RETRIES) register_time = int(time.time()) - g2.register(register_time, changelog_agent) - g3.register(register_time, changelog_agent) + g2.register(register_time, changelog_agent, status) + g3.register(register_time, changelog_agent, status) except ChangelogException as e: logging.error("Changelog register failed, %s" % e) sys.exit(1) - g1.register() + g1.register(status=status) logging.info("Register time: %s" % register_time) # oneshot: Try to use changelog history api, if not # available switch to FS crawl |