From 118ce698e8af425bf75ceab2c9e71cfdaa0ac848 Mon Sep 17 00:00:00 2001 From: Csaba Henk Date: Sun, 27 May 2012 03:56:24 +0530 Subject: geo-rep: checkpointing - gluster vol geo-rep M S conf checkpoint sets a checkpoint with LABEL (the keyword "now" is special, it's rendered to the label "as of ") that's used to refer to the checkpoint in the sequel. (Technically, gsyncd makes a note of the xtime of master's root as of setting the checkpoint, called the "checkpoint target".) - gluster vol geo-rep M S conf \!checkpoint deletes the checkpoint. - gluster vol geo-rep M S stat if status is OK, and there is a checkpoint configured, the checkpoint info is appended to status (either "not yet reached", or "completed at "). (Technically, the worker runs a thread that monitors / serializes / verifies checkpoint status, and answers checkpoint status requests through a UNIX socket; monitoring boils down to querying the xtime of slave's root and comparing with the target.) - gluster vol geo-rep M S conf log-file | xargs grep checkpoint displays the checkpoint history. Set, delete and completion events are logged properly. Change-Id: I4398e0819f1504e6e496b4209e91a0e156e1a0f8 BUG: 826512 Signed-off-by: Csaba Henk Reviewed-on: http://review.gluster.com/3491 Tested-by: Gluster Build System Reviewed-by: Venky Shankar --- xlators/features/marker/utils/syncdaemon/gsyncd.py | 56 ++++++--- xlators/features/marker/utils/syncdaemon/master.py | 129 ++++++++++++++++++++- .../features/marker/utils/syncdaemon/syncdutils.py | 6 + 3 files changed, 172 insertions(+), 19 deletions(-) (limited to 'xlators/features/marker/utils') diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py index 9ac32ce4267..d68cea6725e 100644 --- a/xlators/features/marker/utils/syncdaemon/gsyncd.py +++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py @@ -58,6 +58,25 @@ class GLogger(Logger): logging.getLogger().handlers = [] logging.basicConfig(**lprm) + @classmethod + def _gsyncd_loginit(cls, **kw): + lkw = {} + if gconf.log_level: + lkw['level'] = gconf.log_level + if kw.get('log_file'): + if kw['log_file'] in ('-', '/dev/stderr'): + lkw['stream'] = sys.stderr + elif kw['log_file'] == '/dev/stdout': + lkw['stream'] = sys.stdout + else: + lkw['filename'] = kw['log_file'] + + cls.setup(label=kw.get('label'), **lkw) + + lkw.update({'saved_label': kw.get('label')}) + gconf.log_metadata = lkw + gconf.log_exit = True + def startup(**kw): """set up logging, pidfile grabbing, daemonization""" if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn': @@ -88,22 +107,7 @@ def startup(**kw): select((x,), (), ()) os.close(x) - lkw = {} - if gconf.log_level: - lkw['level'] = gconf.log_level - if kw.get('log_file'): - if kw['log_file'] in ('-', '/dev/stderr'): - lkw['stream'] = sys.stderr - elif kw['log_file'] == '/dev/stdout': - lkw['stream'] = sys.stdout - else: - lkw['filename'] = kw['log_file'] - - GLogger.setup(label=kw.get('label'), **lkw) - - lkw.update({'saved_label': kw.get('label')}) - gconf.log_metadata = lkw - gconf.log_exit = True + GLogger._gsyncd_loginit(**kw) def main(): """main routine, signal/exception handling boilerplates""" @@ -166,6 +170,8 @@ def main_i(): op.add_option('--sync-jobs', metavar='N', type=int, default=3) op.add_option('--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP) op.add_option('--allow-network', metavar='IPS', default='') + op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs) + op.add_option('--checkpoint', metavar='LABEL', default='') op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local) # duh. need to specify dest or value will be mapped to None :S @@ -278,6 +284,7 @@ def main_i(): rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf") gcnf = GConffile(rconf['config_file'], canon_peers, defaults.__dict__, opts.__dict__, namedict) + checkpoint_change = False if confdata: opt_ok = norm(confdata.opt) in tunables + [None] if confdata.op == 'check': @@ -293,7 +300,14 @@ def main_i(): gcnf.set(confdata.opt, confdata.val, confdata.rx) elif confdata.op == 'del': gcnf.delete(confdata.opt, confdata.rx) - return + # when modifying checkpoint, it's important to make a log + # of that, so in that case we go on to set up logging even + # if its just config invocation + if confdata.opt == 'checkpoint' and confdata.op in ('set', 'del') and \ + not confdata.rx: + checkpoint_change = True + if not checkpoint_change: + return gconf.__dict__.update(defaults.__dict__) gcnf.update_to(gconf.__dict__) @@ -331,6 +345,14 @@ def main_i(): raise GsyncdError('cannot recognize log level "%s"' % lvl0) gconf.log_level = lvl2 + if checkpoint_change: + GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf') + if confdata.op == 'set': + logging.info('checkpoint %s set' % confdata.val) + elif confdata.op == 'del': + logging.info('checkpoint info was reset') + return + go_daemon = rconf['go_daemon'] be_monitor = rconf.get('monitor') diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py index 8e196f8c5f4..4826037f134 100644 --- a/xlators/features/marker/utils/syncdaemon/master.py +++ b/xlators/features/marker/utils/syncdaemon/master.py @@ -5,12 +5,20 @@ import stat import random import signal import logging +import socket import errno -from errno import ENOENT, ENODATA +from errno import ENOENT, ENODATA, EPIPE from threading import currentThread, Condition, Lock +from datetime import datetime +try: + from hashlib import md5 as md5 +except ImportError: + # py 2.4 + from md5 import new as md5 from gconf import gconf -from syncdutils import FreeObject, Thread, GsyncdError, boolify +from syncdutils import FreeObject, Thread, GsyncdError, boolify, \ + escape, unescape, select URXTIME = (-1, 0) @@ -113,6 +121,122 @@ class GMaster(object): # the actual volinfo we make use of self.volinfo = None self.terminate = False + self.checkpoint_thread = None + + @staticmethod + def _checkpt_param(chkpt, prm, timish=True): + """use config backend to lookup a parameter belonging to + checkpoint @chkpt""" + cprm = getattr(gconf, 'checkpoint_' + prm, None) + if not cprm: + return + chkpt_mapped, val = cprm.split(':', 1) + if unescape(chkpt_mapped) != chkpt: + return + if timish: + val = tuple(int(x) for x in val.split(".")) + return val + + @staticmethod + def _set_checkpt_param(chkpt, prm, val, timish=True): + """use config backend to store a parameter associated + with checkpoint @chkpt""" + if timish: + val = "%d.%d" % tuple(val) + gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) + + @staticmethod + def humantime(*tpair): + """format xtime-like (sec, nsec) pair to human readable format""" + ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\ + strftime("%Y-%m-%d %H:%M:%S") + if len(tpair) > 1: + ts += '.' + str(tpair[1]) + return ts + + def checkpt_service(self, chan, chkpt, tgt): + """checkpoint service loop + + monitor and verify checkpoint status for @chkpt, and listen + for incoming requests for whom we serve a pretty-formatted + status report""" + if not chkpt: + # dummy loop for the case when there is no checkpt set + while True: + select([chan], [], []) + conn, _ = chan.accept() + conn.send('\0') + conn.close() + completed = self._checkpt_param(chkpt, 'completed') + while True: + s,_,_ = select([chan], [], [], (not completed) and 5 or None) + # either request made and we re-check to not + # give back stale data, or we still hunting for completion + if tgt < self.volmark: + # indexing has been reset since setting the checkpoint + status = "is invalid" + else: + xtr = self.xtime('.', self.slave) + if isinstance(xtr, int): + raise GsyncdError("slave root directory is unaccessible (%s)", + os.strerror(xtr)) + ncompleted = (xtr >= tgt) + if completed and not ncompleted: # stale data + logging.warn("completion time %s for checkpoint %s became stale" % \ + (self.humantime(*completed), chkpt)) + completed = None + gconf.confdata.delete('checkpoint-completed') + if ncompleted and not completed: # just reaching completion + completed = [ int(x) for x in ("%.6f" % time.time()).split('.') ] + self._set_checkpt_param(chkpt, 'completed', completed) + logging.info("checkpoint %s completed" % chkpt) + status = completed and \ + "completed at " + self.humantime(completed[0]) or \ + "not reached yet" + if s: + conn = None + try: + conn, _ = chan.accept() + try: + conn.send(" | checkpoint %s %s\0" % (chkpt, status)) + except: + exc = sys.exc_info()[1] + if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \ + exc.errno == EPIPE: + logging.debug('checkpoint client disconnected') + else: + raise + finally: + if conn: + conn.close() + + def start_checkpoint_thread(self): + """prepare and start checkpoint service""" + if self.checkpoint_thread or not getattr(gconf, 'state_socket_unencoded', None): + return + chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + state_socket = "/tmp/%s.socket" % md5(gconf.state_socket_unencoded).hexdigest() + try: + os.unlink(state_socket) + except: + if sys.exc_info()[0] == OSError: + pass + chan.bind(state_socket) + chan.listen(1) + checkpt_tgt = None + if gconf.checkpoint: + checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target') + if not checkpt_tgt: + checkpt_tgt = self.xtime('.') + if isinstance(checkpt_tgt, int): + raise GsyncdError("master root directory is unaccessible (%s)", + os.strerror(checkpt_tgt)) + self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt) + logging.debug("checkpoint target %d.%d has been determined for checkpoint %s" % \ + (checkpt_tgt[0], checkpt_tgt[1], gconf.checkpoint)) + t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt)) + t.start() + self.checkpoint_thread = t def crawl_loop(self): """start the keep-alive thread and iterate .crawl""" @@ -291,6 +415,7 @@ class GMaster(object): if self.volinfo: if self.volinfo['retval']: raise GsyncdError ("master is corrupt") + self.start_checkpoint_thread() else: if should_display_info or self.crawls == 0: if self.inter_master: diff --git a/xlators/features/marker/utils/syncdaemon/syncdutils.py b/xlators/features/marker/utils/syncdaemon/syncdutils.py index f786bc34326..1d4eb20032c 100644 --- a/xlators/features/marker/utils/syncdaemon/syncdutils.py +++ b/xlators/features/marker/utils/syncdaemon/syncdutils.py @@ -138,6 +138,12 @@ def finalize(*a, **kw): raise if gconf.ssh_ctl_dir and not gconf.cpid: shutil.rmtree(gconf.ssh_ctl_dir) + if getattr(gconf, 'state_socket', None): + try: + os.unlink(gconf.state_socket) + except: + if sys.exc_info()[0] == OSError: + pass if gconf.log_exit: logging.info("exiting.") sys.stdout.flush() -- cgit