diff options
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r-- | geo-replication/syncdaemon/master.py | 374 |
1 files changed, 53 insertions, 321 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 38535884ec6..8e4c43046b0 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -15,17 +15,15 @@ import stat import json import logging import fcntl -import socket import string import errno import tarfile -from errno import ENOENT, ENODATA, EPIPE, EEXIST, EACCES, EAGAIN +from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN from threading import Condition, Lock from datetime import datetime from gconf import gconf -from tempfile import NamedTemporaryFile from syncdutils import Thread, GsyncdError, boolify, escape -from syncdutils import unescape, select, gauxpfx, md5hex, selfkill +from syncdutils import unescape, gauxpfx, md5hex, selfkill from syncdutils import lstat, errno_wrap from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable @@ -397,18 +395,6 @@ class GMasterCommon(object): raise return default_data - def update_crawl_data(self): - if getattr(gconf, 'state_detail_file', None): - try: - same_dir = os.path.dirname(gconf.state_detail_file) - with NamedTemporaryFile(dir=same_dir, delete=False) as tmp: - json.dump(self.total_crawl_stats, tmp) - tmp.flush() - os.fsync(tmp.fileno()) - os.rename(tmp.name, gconf.state_detail_file) - except (IOError, OSError): - raise - def __init__(self, master, slave): self.master = master self.slave = slave @@ -434,14 +420,12 @@ class GMasterCommon(object): self.total_turns = int(gconf.turns) self.crawl_start = datetime.now() self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0} - self.total_crawl_stats = None self.start = None self.change_seen = None # the actual volinfo we make use of self.volinfo = None self.terminate = False self.sleep_interval = 1 - self.checkpoint_thread = None self.current_files_skipped_count = 0 self.skipped_gfid_list = [] self.unlinked_gfids = [] @@ -493,7 +477,6 @@ class GMasterCommon(object): logging.debug("Got the lock") return True - def should_crawl(self): if not gconf.use_meta_volume: return gconf.glusterd_uuid in self.master.server.node_uuid() @@ -503,7 +486,6 @@ class GMasterCommon(object): sys.exit(1) return self.mgmt_lock() - def register(self): self.register() @@ -542,10 +524,8 @@ class GMasterCommon(object): if self.volinfo['retval']: logging.warn("master cluster's info may not be valid %d" % self.volinfo['retval']) - self.start_checkpoint_thread() else: raise GsyncdError("master volinfo unavailable") - self.total_crawl_stats = self.get_initial_crawl_data() self.lastreport['time'] = time.time() logging.info('crawl interval: %d seconds' % self.sleep_interval) @@ -570,7 +550,7 @@ class GMasterCommon(object): t0 = t1 self.update_worker_remote_node() if not crawl: - self.update_worker_health("Passive") + self.status.set_passive() # bring up _this_ brick to the cluster stime # which is min of cluster (but max of the replicas) brick_stime = self.xtime('.', self.slave) @@ -597,35 +577,14 @@ class GMasterCommon(object): time.sleep(5) continue - self.update_worker_health("Active") + + self.status.set_active() self.crawl() + if oneshot: return time.sleep(self.sleep_interval) - @classmethod - def _checkpt_param(cls, chkpt, prm, xtimish=True): - """use config backend to lookup a parameter belonging to - checkpoint @chkpt""" - cprm = gconf.configinterface.get_realtime('checkpoint_' + prm) - if not cprm: - return - chkpt_mapped, val = cprm.split(':', 1) - if unescape(chkpt_mapped) != chkpt: - return - if xtimish: - val = cls.deserialize_xtime(val) - return val - - @classmethod - def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True): - """use config backend to store a parameter associated - with checkpoint @chkpt""" - if xtimish: - val = cls.serialize_xtime(val) - gconf.configinterface.set( - 'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) - @staticmethod def humantime(*tpair): """format xtime-like (sec, nsec) pair to human readable format""" @@ -654,116 +613,6 @@ class GMasterCommon(object): string.zfill(m, 2), string.zfill(s, 2)) return date - def checkpt_service(self, chan, chkpt): - """checkpoint service loop - - monitor and verify checkpoint status for @chkpt, and listen - for incoming requests for whom we serve a pretty-formatted - status report""" - while True: - chkpt = gconf.configinterface.get_realtime("checkpoint") - if not chkpt: - gconf.configinterface.delete("checkpoint_completed") - gconf.configinterface.delete("checkpoint_target") - # dummy loop for the case when there is no checkpt set - select([chan], [], []) - conn, _ = chan.accept() - conn.send('\0') - conn.close() - continue - - checkpt_tgt = self._checkpt_param(chkpt, 'target') - if not checkpt_tgt: - checkpt_tgt = self.xtime('.') - if isinstance(checkpt_tgt, int): - raise GsyncdError("master root directory is " - "unaccessible (%s)", - os.strerror(checkpt_tgt)) - self._set_checkpt_param(chkpt, 'target', checkpt_tgt) - logging.debug("checkpoint target %s has been determined " - "for checkpoint %s" % - (repr(checkpt_tgt), chkpt)) - - # check if the label is 'now' - chkpt_lbl = chkpt - try: - x1, x2 = chkpt.split(':') - if x1 == 'now': - chkpt_lbl = "as of " + self.humantime(x2) - except: - pass - completed = self._checkpt_param(chkpt, 'completed', xtimish=False) - if completed: - completed = tuple(int(x) for x in completed.split('.')) - s, _, _ = select([chan], [], [], (not completed) and 5 or None) - # either request made and we re-check to not - # give back stale data, or we still hunting for completion - if (self.native_xtime(checkpt_tgt) and ( - self.native_xtime(checkpt_tgt) < self.volmark)): - # indexing has been reset since setting the checkpoint - status = "is invalid" - else: - xtr = self.xtime('.', self.slave) - if isinstance(xtr, int): - raise GsyncdError("slave root directory is " - "unaccessible (%s)", - os.strerror(xtr)) - ncompleted = self.xtime_geq(xtr, checkpt_tgt) - if completed and not ncompleted: # stale data - logging.warn("completion time %s for checkpoint %s " - "became stale" % - (self.humantime(*completed), chkpt)) - completed = None - gconf.configinterface.delete('checkpoint_completed') - if ncompleted and not completed: # just reaching completion - completed = "%.6f" % time.time() - self._set_checkpt_param( - chkpt, 'completed', completed, xtimish=False) - completed = tuple(int(x) for x in completed.split('.')) - logging.info("checkpoint %s completed" % chkpt) - status = completed and \ - "completed at " + self.humantime(completed[0]) or \ - "not reached yet" - if s: - conn = None - try: - conn, _ = chan.accept() - try: - conn.send("checkpoint %s is %s\0" % - (chkpt_lbl, status)) - except: - exc = sys.exc_info()[1] - if ((isinstance(exc, OSError) or isinstance( - exc, IOError)) and exc.errno == EPIPE): - logging.debug('checkpoint client disconnected') - else: - raise - finally: - if conn: - conn.close() - - def start_checkpoint_thread(self): - """prepare and start checkpoint service""" - if self.checkpoint_thread or not ( - getattr(gconf, 'state_socket_unencoded', None) and getattr( - gconf, 'socketdir', None) - ): - return - chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - state_socket = os.path.join( - gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket") - try: - os.unlink(state_socket) - except: - if sys.exc_info()[0] == OSError: - pass - chan.bind(state_socket) - chan.listen(1) - chkpt = gconf.configinterface.get_realtime("checkpoint") - t = Thread(target=self.checkpt_service, args=(chan, chkpt)) - t.start() - self.checkpoint_thread = t - def add_job(self, path, label, job, *a, **kw): """insert @job function to job table at @path with @label""" if self.jobtab.get(path) is None: @@ -937,11 +786,15 @@ class GMasterChangelogMixin(GMasterCommon): files_pending['purge'] += 1 def log_failures(failures, entry_key, gfid_prefix, log_prefix): + num_failures = 0 for failure in failures: st = lstat(os.path.join(gfid_prefix, failure[0][entry_key])) if not isinstance(st, int): + num_failures += 1 logging.warn('%s FAILED: %s' % (log_prefix, repr(failure))) + self.status.inc_value("failures", num_failures) + for e in clist: e = e.strip() et = e[self.IDX_START:self.IDX_END] # entry type @@ -1040,12 +893,18 @@ class GMasterChangelogMixin(GMasterCommon): else: logging.warn('got invalid changelog type: %s' % (et)) logging.debug('entries: %s' % repr(entries)) - if not retry: - self.update_worker_cumilitive_status(files_pending) + + # Increment counters for Status + self.status.inc_value("entry", len(entries)) + self.files_in_batch = len(datas) + self.status.inc_value("data", self.files_in_batch) + # sync namespace if entries: failures = self.slave.server.entry_ops(entries) log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') + self.status.dec_value("entry", len(entries)) + # sync metadata if meta_gfid: meta_entries = [] @@ -1059,8 +918,11 @@ class GMasterChangelogMixin(GMasterCommon): continue meta_entries.append(edct('META', go=go[0], stat=st)) if meta_entries: + self.status.inc_value("meta", len(entries)) failures = self.slave.server.meta_ops(meta_entries) log_failures(failures, 'go', '', 'META') + self.status.dec_value("meta", len(entries)) + # sync data if datas: self.a_syncdata(datas) @@ -1112,9 +974,17 @@ class GMasterChangelogMixin(GMasterCommon): if done: xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) + chkpt_time = gconf.configinterface.get_realtime( + "checkpoint") + checkpoint_time = 0 + if chkpt_time is not None: + checkpoint_time = int(chkpt_time) + + self.status.set_last_synced(xtl, checkpoint_time) map(self.changelog_done_func, changes) self.archive_and_purge_changelogs(changes) - self.update_worker_files_syncd() + self.status.dec_value("data", self.files_in_batch) + self.files_in_batch = 0 break # We do not know which changelog transfer failed, retry everything. @@ -1124,14 +994,22 @@ class GMasterChangelogMixin(GMasterCommon): logging.warn('changelogs %s could not be processed - ' 'moving on...' % ' '.join(map(os.path.basename, changes))) - self.update_worker_total_files_skipped( - self.current_files_skipped_count) + self.status.inc_value("failures", + self.current_files_skipped_count) logging.warn('SKIPPED GFID = %s' % ','.join(self.skipped_gfid_list)) - self.update_worker_files_syncd() + + self.files_in_batch = 0 if done: xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) + chkpt_time = gconf.configinterface.get_realtime( + "checkpoint") + checkpoint_time = 0 + if chkpt_time is not None: + checkpoint_time = int(chkpt_time) + + self.status.set_last_synced(xtl, checkpoint_time) map(self.changelog_done_func, changes) self.archive_and_purge_changelogs(changes) break @@ -1152,161 +1030,12 @@ class GMasterChangelogMixin(GMasterCommon): if not stime == URXTIME: self.sendmark(path, stime) - def get_worker_status_file(self): - file_name = gconf.local_path + '.status' - file_name = file_name.replace("/", "_") - worker_status_file = gconf.georep_session_working_dir + file_name - return worker_status_file - - def update_worker_status(self, key, value): - default_data = {"remote_node": "N/A", - "worker status": "Not Started", - "crawl status": "N/A", - "files_syncd": 0, - "files_remaining": 0, - "bytes_remaining": 0, - "purges_remaining": 0, - "total_files_skipped": 0} - worker_status_file = self.get_worker_status_file() - try: - with open(worker_status_file, 'r+') as f: - loaded_data = json.load(f) - loaded_data[key] = value - os.ftruncate(f.fileno(), 0) - os.lseek(f.fileno(), 0, os.SEEK_SET) - json.dump(loaded_data, f) - f.flush() - os.fsync(f.fileno()) - except (IOError, OSError, ValueError): - logging.info('Creating new %s' % worker_status_file) - try: - with open(worker_status_file, 'wb') as f: - default_data[key] = value - json.dump(default_data, f) - f.flush() - os.fsync(f.fileno()) - except: - raise - - def update_worker_cumilitive_status(self, files_pending): - default_data = {"remote_node": "N/A", - "worker status": "Not Started", - "crawl status": "N/A", - "files_syncd": 0, - "files_remaining": 0, - "bytes_remaining": 0, - "purges_remaining": 0, - "total_files_skipped": 0} - worker_status_file = self.get_worker_status_file() - try: - with open(worker_status_file, 'r+') as f: - loaded_data = json.load(f) - loaded_data['files_remaining'] = files_pending['count'] - loaded_data['bytes_remaining'] = files_pending['bytes'] - loaded_data['purges_remaining'] = files_pending['purge'] - os.ftruncate(f.fileno(), 0) - os.lseek(f.fileno(), 0, os.SEEK_SET) - json.dump(loaded_data, f) - f.flush() - os.fsync(f.fileno()) - except (IOError, OSError, ValueError): - logging.info('Creating new %s' % worker_status_file) - try: - with open(worker_status_file, 'wb') as f: - default_data['files_remaining'] = files_pending['count'] - default_data['bytes_remaining'] = files_pending['bytes'] - default_data['purges_remaining'] = files_pending['purge'] - json.dump(default_data, f) - f.flush() - os.fsync(f.fileno()) - except: - raise - def update_worker_remote_node(self): node = sys.argv[-1] - node = node.split("@")[-1] + node_data = node.split("@") + node = node_data[-1] remote_node_ip = node.split(":")[0] - remote_node_vol = node.split(":")[3] - remote_node = remote_node_ip + '::' + remote_node_vol - self.update_worker_status('remote_node', remote_node) - - def update_worker_health(self, state): - self.update_worker_status('worker status', state) - - def update_worker_crawl_status(self, state): - self.update_worker_status('crawl status', state) - - def update_worker_files_syncd(self): - default_data = {"remote_node": "N/A", - "worker status": "Not Started", - "crawl status": "N/A", - "files_syncd": 0, - "files_remaining": 0, - "bytes_remaining": 0, - "purges_remaining": 0, - "total_files_skipped": 0} - worker_status_file = self.get_worker_status_file() - try: - with open(worker_status_file, 'r+') as f: - loaded_data = json.load(f) - loaded_data['files_syncd'] += loaded_data['files_remaining'] - loaded_data['files_remaining'] = 0 - loaded_data['bytes_remaining'] = 0 - loaded_data['purges_remaining'] = 0 - os.ftruncate(f.fileno(), 0) - os.lseek(f.fileno(), 0, os.SEEK_SET) - json.dump(loaded_data, f) - f.flush() - os.fsync(f.fileno()) - except (IOError, OSError, ValueError): - logging.info('Creating new %s' % worker_status_file) - try: - with open(worker_status_file, 'wb') as f: - json.dump(default_data, f) - f.flush() - os.fsync(f.fileno()) - except: - raise - - def update_worker_files_remaining(self, state): - self.update_worker_status('files_remaining', state) - - def update_worker_bytes_remaining(self, state): - self.update_worker_status('bytes_remaining', state) - - def update_worker_purges_remaining(self, state): - self.update_worker_status('purges_remaining', state) - - def update_worker_total_files_skipped(self, value): - default_data = {"remote_node": "N/A", - "worker status": "Not Started", - "crawl status": "N/A", - "files_syncd": 0, - "files_remaining": 0, - "bytes_remaining": 0, - "purges_remaining": 0, - "total_files_skipped": 0} - worker_status_file = self.get_worker_status_file() - try: - with open(worker_status_file, 'r+') as f: - loaded_data = json.load(f) - loaded_data['total_files_skipped'] = value - loaded_data['files_remaining'] -= value - os.ftruncate(f.fileno(), 0) - os.lseek(f.fileno(), 0, os.SEEK_SET) - json.dump(loaded_data, f) - f.flush() - os.fsync(f.fileno()) - except (IOError, OSError, ValueError): - logging.info('Creating new %s' % worker_status_file) - try: - with open(worker_status_file, 'wb') as f: - default_data['total_files_skipped'] = value - json.dump(default_data, f) - f.flush() - os.fsync(f.fileno()) - except: - raise + self.status.set_slave_node(remote_node_ip) def changelogs_batch_process(self, changes): changelogs_batches = [] @@ -1331,7 +1060,7 @@ class GMasterChangelogMixin(GMasterCommon): self.process(batch) def crawl(self): - self.update_worker_crawl_status("Changelog Crawl") + self.status.set_worker_crawl_status("Changelog Crawl") changes = [] # get stime (from the brick) and purge changelogs # that are _historical_ to that time. @@ -1355,16 +1084,17 @@ class GMasterChangelogMixin(GMasterCommon): self.changelogs_batch_process(changes) - def register(self, register_time, changelog_agent): + def register(self, register_time, changelog_agent, status): self.changelog_agent = changelog_agent self.sleep_interval = int(gconf.change_interval) self.changelog_done_func = self.changelog_agent.done self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), ".processed") + self.status = status class GMasterChangeloghistoryMixin(GMasterChangelogMixin): - def register(self, register_time, changelog_agent): + def register(self, register_time, changelog_agent, status): self.changelog_agent = changelog_agent self.changelog_register_time = register_time self.history_crawl_start_time = register_time @@ -1372,10 +1102,11 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): self.history_turns = 0 self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), ".history/.processed") + self.status = status def crawl(self): self.history_turns += 1 - self.update_worker_crawl_status("History Crawl") + self.status.set_worker_crawl_status("History Crawl") purge_time = self.get_purge_time() logging.info('starting history crawl... turns: %s, stime: %s' @@ -1455,7 +1186,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin): XSYNC_MAX_ENTRIES = 1 << 13 - def register(self, register_time=None, changelog_agent=None): + def register(self, register_time=None, changelog_agent=None, status=None): + self.status = status self.counter = 0 self.comlist = [] self.stimes = [] @@ -1486,7 +1218,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): t.start() logging.info('starting hybrid crawl..., stime: %s' % repr(self.get_purge_time())) - self.update_worker_crawl_status("Hybrid Crawl") + self.status.set_worker_crawl_status("Hybrid Crawl") while True: try: item = self.comlist.pop(0) |