diff options
Diffstat (limited to 'geo-replication')
-rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 48 | ||||
-rw-r--r-- | geo-replication/syncdaemon/gsyncdstatus.py | 22 | ||||
-rw-r--r-- | geo-replication/syncdaemon/master.py | 241 | ||||
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 61 | ||||
-rw-r--r-- | geo-replication/syncdaemon/repce.py | 8 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 92 | ||||
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 27 |
7 files changed, 289 insertions, 210 deletions
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 3ddcb7f5454..932e37d1124 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -39,7 +39,7 @@ from changelogagent import agent, Changelog from gsyncdstatus import set_monitor_status, GeorepStatus, human_time_utc from libcxattr import Xattr import struct -from syncdutils import get_master_and_slave_data_from_args +from syncdutils import get_master_and_slave_data_from_args, lf ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError @@ -127,24 +127,30 @@ def slave_vol_uuid_get(host, vol): stdin=None, stdout=PIPE, stderr=PIPE) vix, err = po.communicate() if po.returncode != 0: - logging.info("Volume info failed, unable to get " - "volume uuid of %s present in %s," - "returning empty string: %s" % - (vol, host, po.returncode)) + logging.info(lf("Volume info failed, unable to get " + "volume uuid of slavevol, " + "returning empty string", + slavevol=vol, + slavehost=host, + error=po.returncode)) return "" vi = XET.fromstring(vix) if vi.find('opRet').text != '0': - logging.info("Unable to get volume uuid of %s, " - "present in %s returning empty string: %s" % - (vol, host, vi.find('opErrstr').text)) + logging.info(lf("Unable to get volume uuid of slavevol, " + "returning empty string", + slavevol=vol, + slavehost=host, + error=vi.find('opErrstr').text)) return "" try: voluuid = vi.find("volInfo/volumes/volume/id").text except (ParseError, AttributeError, ValueError) as e: - logging.info("Parsing failed to volume uuid of %s, " - "present in %s returning empty string: %s" % - (vol, host, e)) + logging.info(lf("Parsing failed to volume uuid of slavevol, " + "returning empty string", + slavevol=vol, + slavehost=host, + error=e)) voluuid = "" return voluuid @@ -692,16 +698,18 @@ def main_i(): if confdata.op == 'set': if confdata.opt == 'checkpoint': - logging.info("Checkpoint Set: %s" % ( - human_time_utc(confdata.val))) + logging.info(lf("Checkpoint Set", + time=human_time_utc(confdata.val))) else: - logging.info("Config Set: %s = %s" % ( - confdata.opt, confdata.val)) + logging.info(lf("Config Set", + config=confdata.opt, + value=confdata.val)) elif confdata.op == 'del': if confdata.opt == 'checkpoint': logging.info("Checkpoint Reset") else: - logging.info("Config Reset: %s" % confdata.opt) + logging.info(lf("Config Reset", + config=confdata.opt)) except IOError: if sys.exc_info()[1].errno == ENOENT: # directory of log path is not present, @@ -722,7 +730,8 @@ def main_i(): try: GLogger._gsyncd_loginit(log_file=gconf.log_file, label='monitor') gconf.log_exit = False - logging.info("Monitor Status: %s" % create) + logging.info(lf("Monitor Status Change", + status=create)) except IOError: if sys.exc_info()[1].errno == ENOENT: # If log dir not present @@ -772,7 +781,8 @@ def main_i(): if be_agent: os.setsid() - logging.debug('rpc_fd: %s' % repr(gconf.rpc_fd)) + logging.debug(lf("RPC FD", + rpc_fd=repr(gconf.rpc_fd))) return agent(Changelog(), gconf.rpc_fd) if be_monitor: @@ -786,7 +796,7 @@ def main_i(): remote.connect_remote(go_daemon='done') local.connect() if ffd: - logging.info ("Closing feedback fd, waking up the monitor") + logging.info("Closing feedback fd, waking up the monitor") os.close(ffd) local.service_loop(*[r for r in [remote] if r]) diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py index dd363baf181..38ca92c73a9 100644 --- a/geo-replication/syncdaemon/gsyncdstatus.py +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -20,7 +20,7 @@ from errno import EACCES, EAGAIN, ENOENT import logging from syncdutils import EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event -from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED +from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED, lf DEFAULT_STATUS = "N/A" MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped") @@ -225,10 +225,10 @@ class GeorepStatus(object): data["checkpoint_time"] = checkpoint_time data["checkpoint_completion_time"] = curr_time data["checkpoint_completed"] = "Yes" - logging.info("Checkpoint completed. Checkpoint " - "Time: %s, Completion Time: %s" % ( - human_time_utc(checkpoint_time), - human_time_utc(curr_time))) + logging.info(lf("Checkpoint completed", + checkpoint_time=human_time_utc( + checkpoint_time), + completion_time=human_time_utc(curr_time))) self.trigger_gf_event_checkpoint_completion( checkpoint_time, curr_time) @@ -238,11 +238,13 @@ class GeorepStatus(object): def set_worker_status(self, status): if self.set_field("worker_status", status): - logging.info("Worker Status: %s" % status) + logging.info(lf("Worker Status Change", + status=status)) def set_worker_crawl_status(self, status): if self.set_field("crawl_status", status): - logging.info("Crawl Status: %s" % status) + logging.info(lf("Crawl Status Change", + status=status)) def set_slave_node(self, slave_node): def merger(data): @@ -269,12 +271,14 @@ class GeorepStatus(object): def set_active(self): if self.set_field("worker_status", "Active"): - logging.info("Worker Status: Active") + logging.info(lf("Worker Status Change", + status="Active")) self.send_event(EVENT_GEOREP_ACTIVE) def set_passive(self): if self.set_field("worker_status", "Passive"): - logging.info("Worker Status: Passive") + logging.info(lf("Worker Status Change", + status="Passive")) self.send_event(EVENT_GEOREP_PASSIVE) def get_monitor_status(self): diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index aebaf31dcff..17ec550aafa 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -24,7 +24,7 @@ from datetime import datetime from gconf import gconf from syncdutils import Thread, GsyncdError, boolify, escape from syncdutils import unescape, gauxpfx, md5hex, selfkill -from syncdutils import lstat, errno_wrap, FreeObject +from syncdutils import lstat, errno_wrap, FreeObject, lf from syncdutils import NoStimeAvailable, PartialHistoryAvailable URXTIME = (-1, 0) @@ -54,8 +54,8 @@ def _volinfo_hook_relax_foreign(self): fgn_vi = volinfo_sys[self.KFGN] if fgn_vi: expiry = fgn_vi['timeout'] - int(time.time()) + 1 - logging.info('foreign volume info found, waiting %d sec for expiry' % - expiry) + logging.info(lf('foreign volume info found, waiting for expiry', + expiry=expiry)) time.sleep(expiry) volinfo_sys = self.get_sys_volinfo() return volinfo_sys @@ -90,7 +90,8 @@ def gmaster_builder(excrawl=None): modemixin = 'normal' changemixin = 'xsync' if gconf.change_detector == 'xsync' \ else excrawl or gconf.change_detector - logging.debug('setting up %s change detection mode' % changemixin) + logging.debug(lf('setting up change detection mode', + mode=changemixin)) modemixin = getattr(this, modemixin.capitalize() + 'Mixin') crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin') sendmarkmixin = boolify( @@ -256,7 +257,7 @@ class TarSSHEngine(object): """ def a_syncdata(self, files): - logging.debug('files: %s' % (files)) + logging.debug(lf("Files", files=files)) for f in files: pb = self.syncer.add(f) @@ -264,7 +265,7 @@ class TarSSHEngine(object): def regjob(se, xte, pb): rv = pb.wait() if rv[0]: - logging.debug('synced ' + se) + logging.debug(lf('synced', file=se)) return True else: # stat check for file presence @@ -290,16 +291,16 @@ class RsyncEngine(object): """Sync engine that uses rsync(1) for data transfers""" def a_syncdata(self, files): - logging.debug('files: %s' % (files)) + logging.debug(lf("files", files=files)) for f in files: - logging.debug('candidate for syncing %s' % f) + logging.debug(lf('candidate for syncing', file=f)) pb = self.syncer.add(f) def regjob(se, xte, pb): rv = pb.wait() if rv[0]: - logging.debug('synced ' + se) + logging.debug(lf('synced', file=se)) return True else: # stat to check if the file exist @@ -431,16 +432,16 @@ class GMasterCommon(object): fcntl.lockf(gconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) if not gconf.active_earlier: gconf.active_earlier = True - logging.info("Got lock : %s : Becoming ACTIVE" - % gconf.local_path) + logging.info(lf("Got lock Becoming ACTIVE", + brick=gconf.local_path)) return True except: ex = sys.exc_info()[1] if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN): if not gconf.passive_earlier: gconf.passive_earlier = True - logging.info("Didn't get lock : %s : Becoming PASSIVE" - % gconf.local_path) + logging.info(lf("Didn't get lock Becoming PASSIVE", + brick=gconf.local_path)) return False raise @@ -449,7 +450,7 @@ class GMasterCommon(object): + str(gconf.subvol_num) + ".lock" mgmt_lock_dir = os.path.join(gconf.meta_volume_mnt, "geo-rep") path = os.path.join(mgmt_lock_dir, bname) - logging.debug("lock_file_path: %s" % path) + logging.debug(lf("lock file path", path=path)) try: fd = os.open(path, os.O_CREAT | os.O_RDWR) except OSError: @@ -477,15 +478,16 @@ class GMasterCommon(object): # cannot grab, it's taken if not gconf.passive_earlier: gconf.passive_earlier = True - logging.info("Didn't get lock : %s : Becoming PASSIVE" - % gconf.local_path) + logging.info(lf("Didn't get lock Becoming PASSIVE", + brick=gconf.local_path)) gconf.mgmt_lock_fd = fd return False raise if not gconf.active_earlier: gconf.active_earlier = True - logging.info("Got lock : %s : Becoming ACTIVE" % gconf.local_path) + logging.info(lf("Got lock Becoming ACTIVE", + brick=gconf.local_path)) return True def should_crawl(self): @@ -533,8 +535,8 @@ class GMasterCommon(object): gconf.configinterface.set('volume_id', self.uuid) if self.volinfo: if self.volinfo['retval']: - logging.warn("master cluster's info may not be valid %d" % - self.volinfo['retval']) + logging.warn(lf("master cluster's info may not be valid", + error=self.volinfo['retval'])) else: raise GsyncdError("master volinfo unavailable") self.lastreport['time'] = time.time() @@ -566,8 +568,10 @@ class GMasterCommon(object): brick_stime = self.xtime('.', self.slave) cluster_stime = self.master.server.aggregated.stime_mnt( '.', '.'.join([str(self.uuid), str(gconf.slave_id)])) - logging.debug("Cluster stime: %s | Brick stime: %s" % - (repr(cluster_stime), repr(brick_stime))) + logging.debug(lf("Crawl info", + cluster_stime=cluster_stime, + brick_stime=brick_stime)) + if not isinstance(cluster_stime, int): if brick_stime < cluster_stime: self.slave.server.set_stime( @@ -773,8 +777,8 @@ class GMasterChangelogMixin(GMasterCommon): st = lstat(os.path.join(gfid_prefix, failure[0][entry_key])) if not isinstance(st, int): num_failures += 1 - logging.error('%s FAILED: %s' % (log_prefix, - repr(failure))) + logging.error(lf('%s FAILED' % log_prefix, + data=failure)) if failure[0]['op'] == 'MKDIR': raise GsyncdError("The above directory failed to sync." " Please fix it to proceed further.") @@ -826,8 +830,8 @@ class GMasterChangelogMixin(GMasterCommon): if self.name == 'live_changelog' or \ self.name == 'history_changelog': if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY: - logging.debug('skip ENTRY op: %s if hot tier brick' - % (ec[self.POS_TYPE])) + logging.debug(lf('skip ENTRY op if hot tier brick', + op=ec[self.POS_TYPE])) continue # Data and Meta operations are decided while parsing @@ -917,7 +921,8 @@ class GMasterChangelogMixin(GMasterCommon): go = os.path.join(pfx, gfid) st = lstat(go) if isinstance(st, int): - logging.debug('file %s got purged in the interim' % go) + logging.debug(lf('file got purged in the interim', + file=go)) continue if ty == 'LINK': @@ -930,7 +935,9 @@ class GMasterChangelogMixin(GMasterCommon): entries.append( edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) else: - logging.warn('ignoring %s [op %s]' % (gfid, ty)) + logging.warn(lf('ignoring op', + gfid=gfid, + type=ty)) elif et == self.TYPE_GFID: # If self.unlinked_gfids is available, then that means it is # retrying the changelog second time. Do not add the GFID's @@ -962,7 +969,8 @@ class GMasterChangelogMixin(GMasterCommon): (boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)): datas.add(os.path.join(pfx, ec[0])) else: - logging.warn('got invalid changelog type: %s' % (et)) + logging.warn(lf('got invalid fop type', + type=et)) logging.debug('entries: %s' % repr(entries)) # Increment counters for Status @@ -1011,7 +1019,8 @@ class GMasterChangelogMixin(GMasterCommon): else: st = lstat(go[0]) if isinstance(st, int): - logging.debug('file %s got purged in the interim' % go[0]) + logging.debug(lf('file got purged in the interim', + file=go[0])) continue meta_entries.append(edct('META', go=go[0], stat=st)) if meta_entries: @@ -1067,7 +1076,8 @@ class GMasterChangelogMixin(GMasterCommon): self.a_syncdata(self.datas_in_batch) else: for change in changes: - logging.debug('processing change %s' % change) + logging.debug(lf('processing change', + changelog=change)) self.process_change(change, done, retry) if not retry: # number of changelogs processed in the batch @@ -1111,9 +1121,9 @@ class GMasterChangelogMixin(GMasterCommon): retry = True tries += 1 if tries == int(gconf.max_rsync_retries): - logging.error('changelogs %s could not be processed ' - 'completely - moving on...' % - ' '.join(map(os.path.basename, changes))) + logging.error(lf('changelogs could not be processed ' + 'completely - moving on...', + files=map(os.path.basename, changes))) # Reset data counter on failure self.status.dec_value("data", self.files_in_batch) @@ -1133,8 +1143,8 @@ class GMasterChangelogMixin(GMasterCommon): # entry_ops() that failed... so we retry the _whole_ changelog # again. # TODO: remove entry retries when it's gets fixed. - logging.warn('incomplete sync, retrying changelogs: %s' % - ' '.join(map(os.path.basename, changes))) + logging.warn(lf('incomplete sync, retrying changelogs', + files=map(os.path.basename, changes))) # Reset the Data counter before Retry self.status.dec_value("data", self.files_in_batch) @@ -1145,43 +1155,44 @@ class GMasterChangelogMixin(GMasterCommon): # Log the Skipped Entry ops range if any if self.skipped_entry_changelogs_first is not None and \ self.skipped_entry_changelogs_last is not None: - logging.info("Skipping already processed entry " - "ops from CHANGELOG.{0} to CHANGELOG.{1} " - "Num: {2}".format( - self.skipped_entry_changelogs_first, - self.skipped_entry_changelogs_last, - self.num_skipped_entry_changelogs)) + logging.info(lf("Skipping already processed entry ops", + from_changelog=self.skipped_entry_changelogs_first, + to_changelog=self.skipped_entry_changelogs_last, + num_changelogs=self.num_skipped_entry_changelogs)) # Log Current batch details if changes: logging.info( - "Entry Time Taken (UNL:{0} RMD:{1} CRE:{2} MKN:{3} " - "MKD:{4} REN:{5} LIN:{6} SYM:{7}): {8:.4f} " - "secs ".format ( - self.batch_stats["UNLINK"], self.batch_stats["RMDIR"], - self.batch_stats["CREATE"], self.batch_stats["MKNOD"], - self.batch_stats["MKDIR"], self.batch_stats["RENAME"], - self.batch_stats["LINK"], self.batch_stats["SYMLINK"], - self.batch_stats["ENTRY_SYNC_TIME"])) + lf("Entry Time Taken", + UNL=self.batch_stats["UNLINK"], + RMD=self.batch_stats["RMDIR"], + CRE=self.batch_stats["CREATE"], + MKN=self.batch_stats["MKNOD"], + MKD=self.batch_stats["MKDIR"], + REN=self.batch_stats["RENAME"], + LIN=self.batch_stats["LINK"], + SYM=self.batch_stats["SYMLINK"], + duration="%.4f" % self.batch_stats["ENTRY_SYNC_TIME"])) + logging.info( - "Metadata Time Taken (SETA:{0}): {1:.4f} secs. " - "Data Time Taken (SETX:{2} XATT:{3} DATA:{4}): " - "{5:.4f} secs".format( - self.batch_stats["SETATTR"], - self.batch_stats["META_SYNC_TIME"], - self.batch_stats["SETXATTR"], self.batch_stats["XATTROP"], - self.batch_stats["DATA"], - time.time() - self.batch_stats["DATA_START_TIME"])) + lf("Data/Metadata Time Taken", + SETA=self.batch_stats["SETATTR"], + meta_duration="%.4f" % self.batch_stats["META_SYNC_TIME"], + SETX=self.batch_stats["SETXATTR"], + XATT=self.batch_stats["XATTROP"], + DATA=self.batch_stats["DATA"], + data_duration="%.4f" % ( + time.time() - self.batch_stats["DATA_START_TIME"]))) + logging.info( - "{0} mode completed in {1:.4f} seconds " - "({2} - {3} Num: {4}) stime: {5}, entry_stime: {6}".format( - self.name, - time.time() - self.batch_start_time, - changes[0].split("/")[-1], - changes[-1].split("/")[-1], - len(changes), - repr(self.get_data_stime()), - repr(self.get_entry_stime()))) + lf("Batch Completed", + mode=self.name, + duration="%.4f" % (time.time() - self.batch_start_time), + changelog_start=changes[0].split(".")[-1], + changelog_end=changes[-1].split(".")[-1], + num_changelogs=len(changes), + stime=self.get_data_stime(), + entry_stime=self.get_entry_stime())) def upd_entry_stime(self, stime): self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY, @@ -1231,7 +1242,8 @@ class GMasterChangelogMixin(GMasterCommon): changelogs_batches[-1].append(c) for batch in changelogs_batches: - logging.debug('processing changes %s' % repr(batch)) + logging.debug(lf('processing changes', + batch=batch)) self.process(batch) def crawl(self): @@ -1246,13 +1258,14 @@ class GMasterChangelogMixin(GMasterCommon): changes = self.changelog_agent.getchanges() if changes: if data_stime: - logging.info("slave's time: %s" % repr(data_stime)) + logging.info(lf("slave's time", + stime=data_stime)) processed = [x for x in changes if int(x.split('.')[-1]) < data_stime[0]] for pr in processed: - logging.info( - 'skipping already processed change: %s...' % - os.path.basename(pr)) + logging.debug( + lf('skipping already processed change', + changelog=os.path.basename(pr))) self.changelog_done_func(pr) changes.remove(pr) self.archive_and_purge_changelogs(processed) @@ -1289,10 +1302,11 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): data_stime = self.get_data_stime() end_time = int(time.time()) - logging.info('starting history crawl... turns: %s, stime: %s, ' - 'etime: %s, entry_stime: %s' - % (self.history_turns, repr(data_stime), - repr(end_time), self.get_entry_stime())) + logging.info(lf('starting history crawl', + turns=self.history_turns, + stime=data_stime, + etime=end_time, + entry_stime=self.get_entry_stime())) if not data_stime or data_stime == URXTIME: raise NoStimeAvailable() @@ -1320,12 +1334,13 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): changes = self.changelog_agent.history_getchanges() if changes: if data_stime: - logging.info("slave's time: %s" % repr(data_stime)) + logging.info(lf("slave's time", + stime=data_stime)) processed = [x for x in changes if int(x.split('.')[-1]) < data_stime[0]] for pr in processed: - logging.info('skipping already processed change: ' - '%s...' % os.path.basename(pr)) + logging.debug(lf('skipping already processed change', + changelog=os.path.basename(pr))) self.changelog_done_func(pr) changes.remove(pr) @@ -1333,10 +1348,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): history_turn_time = int(time.time()) - self.history_crawl_start_time - logging.info('finished history crawl syncing, endtime: %s, ' - 'stime: %s, entry_stime: %s' - % (actual_end, repr(self.get_data_stime()), - self.get_entry_stime())) + logging.info(lf('finished history crawl', + endtime=actual_end, + stime=self.get_data_stime(), + entry_stime=self.get_entry_stime())) # If TS returned from history_changelog is < register_time # then FS crawl may be required, since history is only available @@ -1376,7 +1391,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin): self.stimes = [] self.sleep_interval = 60 self.tempdir = self.setup_working_dir() - logging.info('Working dir: %s' % self.tempdir) + logging.info(lf('Working dir', + path=self.tempdir)) self.tempdir = os.path.join(self.tempdir, 'xsync') self.processed_changelogs_dir = self.tempdir self.name = "xsync" @@ -1400,25 +1416,28 @@ class GMasterXsyncMixin(GMasterChangelogMixin): self.Xcrawl() t = Thread(target=Xsyncer) t.start() - logging.info('starting hybrid crawl..., stime: %s' - % repr(self.get_data_stime())) + logging.info(lf('starting hybrid crawl', + stime=self.get_data_stime())) self.status.set_worker_crawl_status("Hybrid Crawl") while True: try: item = self.comlist.pop(0) if item[0] == 'finale': - logging.info('finished hybrid crawl syncing, stime: %s' - % repr(self.get_data_stime())) + logging.info(lf('finished hybrid crawl', + stime=self.get_data_stime())) break elif item[0] == 'xsync': - logging.info('processing xsync changelog %s' % (item[1])) + logging.info(lf('processing xsync changelog', + path=item[1])) self.process([item[1]], 0) self.archive_and_purge_changelogs([item[1]]) elif item[0] == 'stime': - logging.debug('setting slave time: %s' % repr(item[1])) + logging.debug(lf('setting slave time', + time=item[1])) self.upd_stime(item[1][1], item[1][0]) else: - logging.warn('unknown tuple in comlist (%s)' % repr(item)) + logging.warn(lf('unknown tuple in comlist', + entry=item)) except IndexError: time.sleep(1) @@ -1496,8 +1515,9 @@ class GMasterXsyncMixin(GMasterChangelogMixin): xtr_root = self.xtime('.', self.slave) if isinstance(xtr_root, int): if xtr_root != ENOENT: - logging.warn("slave cluster not returning the " - "correct xtime for root (%d)" % xtr_root) + logging.warn(lf("slave cluster not returning the " + "correct xtime for root", + xtime=xtr_root)) xtr_root = self.minus_infinity xtl = self.xtime(path) if isinstance(xtl, int): @@ -1505,8 +1525,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin): xtr = self.xtime(path, self.slave) if isinstance(xtr, int): if xtr != ENOENT: - logging.warn("slave cluster not returning the " - "correct xtime for %s (%d)" % (path, xtr)) + logging.warn(lf("slave cluster not returning the " + "correct xtime", + path=path, + xtime=xtr)) xtr = self.minus_infinity xtr = max(xtr, xtr_root) zero_zero = (0, 0) @@ -1521,27 +1543,32 @@ class GMasterXsyncMixin(GMasterChangelogMixin): dem = self.master.server.entries(path) pargfid = self.master.server.gfid(path) if isinstance(pargfid, int): - logging.warn('skipping directory %s' % (path)) + logging.warn(lf('skipping directory', + path=path)) for e in dem: bname = e e = os.path.join(path, e) xte = self.xtime(e) if isinstance(xte, int): - logging.warn("irregular xtime for %s: %s" % - (e, errno.errorcode[xte])) + logging.warn(lf("irregular xtime", + path=e, + error=errno.errorcode[xte])) continue if not self.need_sync(e, xte, xtr): continue st = self.master.server.lstat(e) if isinstance(st, int): - logging.warn('%s got purged in the interim ...' % e) + logging.warn(lf('got purged in the interim', + path=e)) continue if self.is_sticky(e, st.st_mode): - logging.debug('ignoring sticky bit file %s' % e) + logging.debug(lf('ignoring sticky bit file', + path=e)) continue gfid = self.master.server.gfid(e) if isinstance(gfid, int): - logging.warn('skipping entry %s..' % e) + logging.warn(lf('skipping entry', + path=e)) continue mo = st.st_mode self.counter += 1 if ((stat.S_ISDIR(mo) or @@ -1704,14 +1731,12 @@ class Syncer(object): pb.close() start = time.time() po = self.sync_engine(pb, self.log_err) - logging.info("Sync Time Taken (Job:{0} " - "Files:{1} ReturnCode:{2}): " - "{3:.4f} secs".format( - job_id, - len(pb), - po.returncode, - time.time() - start - )) + logging.info(lf("Sync Time Taken", + job=job_id, + num_files=len(pb), + return_code=po.returncode, + duration="%.4f" % (time.time() - start))) + if po.returncode == 0: ret = (True, 0) elif po.returncode in self.errnos_ok: diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index c54c07d600c..b65f1948050 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -22,7 +22,7 @@ from errno import ECHILD, ESRCH import re import random from gconf import gconf -from syncdutils import select, waitpid, errno_wrap +from syncdutils import select, waitpid, errno_wrap, lf from syncdutils import set_term_handler, is_host_local, GsyncdError from syncdutils import escape, Thread, finalize, memoize from syncdutils import gf_event, EVENT_GEOREP_FAULTY @@ -63,15 +63,17 @@ def get_slave_bricks_status(host, vol): po.wait() po.terminate_geterr(fail_on_err=False) if po.returncode != 0: - logging.info("Volume status command failed, unable to get " - "list of up nodes of %s, returning empty list: %s" % - (vol, po.returncode)) + logging.info(lf("Volume status command failed, unable to get " + "list of up nodes, returning empty list", + volume=vol, + error=po.returncode)) return [] vi = XET.fromstring(vix) if vi.find('opRet').text != '0': - logging.info("Unable to get list of up nodes of %s, " - "returning empty list: %s" % - (vol, vi.find('opErrstr').text)) + logging.info(lf("Unable to get list of up nodes, " + "returning empty list", + volume=vol, + error=vi.find('opErrstr').text)) return [] up_hosts = set() @@ -81,8 +83,10 @@ def get_slave_bricks_status(host, vol): if el.find('status').text == '1': up_hosts.add(el.find('hostname').text) except (ParseError, AttributeError, ValueError) as e: - logging.info("Parsing failed to get list of up nodes of %s, " - "returning empty list: %s" % (vol, e)) + logging.info(lf("Parsing failed to get list of up nodes, " + "returning empty list", + volume=vol, + error=e)) return list(up_hosts) @@ -271,8 +275,9 @@ class Monitor(object): # Spawn the worker and agent in lock to avoid fd leak self.lock.acquire() - logging.info('starting gsyncd worker(%s). Slave node: %s' % - (w[0]['dir'], remote_host)) + logging.info(lf('starting gsyncd worker', + brick=w[0]['dir'], + slave_node=remote_host)) # Couple of pipe pairs for RPC communication b/w # worker and changelog agent. @@ -336,15 +341,16 @@ class Monitor(object): if ret_agent is not None: # Agent is died Kill Worker - logging.info("Changelog Agent died, " - "Aborting Worker(%s)" % w[0]['dir']) + logging.info(lf("Changelog Agent died, Aborting Worker", + brick=w[0]['dir'])) errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) nwait(cpid) nwait(apid) if ret is not None: - logging.info("worker(%s) died before establishing " - "connection" % w[0]['dir']) + logging.info(lf("worker died before establishing " + "connection", + brick=w[0]['dir'])) nwait(apid) # wait for agent else: logging.debug("worker(%s) connected" % w[0]['dir']) @@ -353,15 +359,16 @@ class Monitor(object): ret_agent = nwait(apid, os.WNOHANG) if ret is not None: - logging.info("worker(%s) died in startup " - "phase" % w[0]['dir']) + logging.info(lf("worker died in startup phase", + brick=w[0]['dir'])) nwait(apid) # wait for agent break if ret_agent is not None: # Agent is died Kill Worker - logging.info("Changelog Agent died, Aborting " - "Worker(%s)" % w[0]['dir']) + logging.info(lf("Changelog Agent died, Aborting " + "Worker", + brick=w[0]['dir'])) errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) nwait(cpid) nwait(apid) @@ -369,13 +376,15 @@ class Monitor(object): time.sleep(1) else: - logging.info("worker(%s) not confirmed in %d sec, aborting it. " - "Gsyncd invocation on remote slave via SSH or " - "gluster master mount might have hung. Please " - "check the above logs for exact issue and check " - "master or slave volume for errors. Restarting " - "master/slave volume accordingly might help." - % (w[0]['dir'], conn_timeout)) + logging.info( + lf("Worker not confirmed after wait, aborting it. " + "Gsyncd invocation on remote slave via SSH or " + "gluster master mount might have hung. Please " + "check the above logs for exact issue and check " + "master or slave volume for errors. Restarting " + "master/slave volume accordingly might help.", + brick=w[0]['dir'], + timeout=conn_timeout)) errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH]) nwait(apid) # wait for agent ret = nwait(cpid) diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py index d7b17dda796..0ac144930db 100644 --- a/geo-replication/syncdaemon/repce.py +++ b/geo-replication/syncdaemon/repce.py @@ -29,7 +29,7 @@ except ImportError: # py 3 import pickle -from syncdutils import Thread, select +from syncdutils import Thread, select, lf pickle_proto = -1 repce_version = 1.0 @@ -203,8 +203,10 @@ class RepceClient(object): meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)}) exc, res = rjob.wait() if exc: - logging.error('call %s (%s) failed on peer with %s' % - (repr(rjob), meth, str(type(res).__name__))) + logging.error(lf('call failed on peer', + call=repr(rjob), + method=meth, + error=str(type(res).__name__))) raise res logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res))) return res diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 5d7234358fb..37f6e1cabc1 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -41,7 +41,7 @@ from syncdutils import get_changelog_log_level, get_rsync_version from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION from gsyncdstatus import GeorepStatus from syncdutils import get_master_and_slave_data_from_args -from syncdutils import mntpt_list +from syncdutils import mntpt_list, lf UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I) @@ -228,11 +228,9 @@ class Popen(subprocess.Popen): def errlog(self): """make a log about child's failure event""" - filling = "" - if self.elines: - filling = ", saying:" - logging.error("""command "%s" returned with %s%s""" % - (" ".join(self.args), repr(self.returncode), filling)) + logging.error(lf("command returned error", + cmd=" ".join(self.args), + error=self.returncode)) lp = '' def logerr(l): @@ -725,11 +723,12 @@ class Server(object): def rename_with_disk_gfid_confirmation(gfid, entry, en): if not matching_disk_gfid(gfid, entry): - logging.error("RENAME ignored: " - "source entry:%s(gfid:%s) does not match with " - "on-disk gfid(%s), when attempting to rename " - "to %s" % - (entry, gfid, cls.gfid_mnt(entry), en)) + logging.error(lf("RENAME ignored: source entry does not match " + "with on-disk gfid", + source=entry, + gfid=gfid, + disk_gfid=cls.gfid_mnt(entry), + target=en)) return cmd_ret = errno_wrap(os.rename, @@ -769,12 +768,17 @@ class Server(object): logging.debug("Removed %s => %s/%s recursively" % (gfid, pg, bname)) else: - logging.warn("Recursive remove %s => %s/%s" - "failed: %s" % (gfid, pg, bname, - os.strerror(er1))) + logging.warn(lf("Recursive remove failed", + gfid=gfid, + pgfid=pg, + bname=bname, + error=os.strerror(er1))) else: - logging.warn("Failed to remove %s => %s/%s. %s" % - (gfid, pg, bname, os.strerror(er))) + logging.warn(lf("Failed to remove", + gfid=gfid, + pgfid=pg, + bname=bname, + error=os.strerror(er))) elif op in ['CREATE', 'MKNOD']: slink = os.path.join(pfx, gfid) st = lstat(slink) @@ -833,10 +837,11 @@ class Server(object): except OSError as e: if e.errno == ENOTEMPTY: logging.error( - "Unable to delete directory " - "{0}, Both Old({1}) and New{2}" - " directories exists".format( - entry, entry, en)) + lf("Unable to delete directory" + ", Both Old and New" + " directories exists", + old=entry, + new=en)) else: raise else: @@ -1011,8 +1016,8 @@ class SlaveLocal(object): time.sleep(int(gconf.timeout)) if lp == self.server.last_keep_alive: logging.info( - "connection inactive for %d seconds, stopping" % - int(gconf.timeout)) + lf("connection inactive, stopping", + timeout=int(gconf.timeout))) break else: select((), (), ()) @@ -1114,7 +1119,9 @@ class SlaveRemote(object): if kw.get("log_err", False): for errline in stderr.strip().split("\n")[:-1]: - logging.error("SYNC Error(Rsync): %s" % errline) + logging.error(lf("SYNC Error", + sync_engine="Rsync", + error=errline)) if log_rsync_performance: rsync_msg = [] @@ -1129,7 +1136,8 @@ class SlaveRemote(object): line.startswith("Total bytes received:") or \ line.startswith("sent "): rsync_msg.append(line) - logging.info("rsync performance: %s" % ", ".join(rsync_msg)) + logging.info(lf("rsync performance", + data=", ".join(rsync_msg))) return po @@ -1169,7 +1177,9 @@ class SlaveRemote(object): if log_err: for errline in stderr1.strip().split("\n")[:-1]: - logging.error("SYNC Error(Untar): %s" % errline) + logging.error(lf("SYNC Error", + sync_engine="Tarssh", + error=errline)) return p1 @@ -1389,7 +1399,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): if rv: rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \ (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0) - logging.warn('stale mount possibly left behind on ' + d) + logging.warn(lf('stale mount possibly left behind', + path=d)) raise GsyncdError("cleaning up temp mountpoint %s " "failed with status %d" % (d, rv)) @@ -1478,7 +1489,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): # if cli terminated with error due to being # refused by glusterd, what it put # out on stdout is a diagnostic message - logging.error('glusterd answered: %s' % self.mntpt) + logging.error(lf('glusterd answered', mnt=self.mntpt)) def connect(self): """inhibit the resource beyond @@ -1488,7 +1499,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): with given backend """ - logging.info ("Mounting gluster volume locally...") + logging.info("Mounting gluster volume locally...") t0 = time.time() label = getattr(gconf, 'mountbroker', None) if not label and not privileged(): @@ -1500,8 +1511,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + self.host, 'volfile-id=' + self.volume, 'client-pid=-1'] mounter(params).inhibit(*[l for l in [label] if l]) - logging.info ("Mounted gluster volume. Time taken: {0:.4f} " - "secs".format((time.time() - t0))) + logging.info(lf("Mounted gluster volume", + duration="%.4f" % (time.time() - t0))) def connect_remote(self, *a, **kw): sup(self, *a, **kw) @@ -1643,11 +1654,12 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): g2.register(register_time, changelog_agent, status) g3.register(register_time, changelog_agent, status) except ChangelogException as e: - logging.error("Changelog register failed, %s" % e) + logging.error(lf("Changelog register failed", error=e)) sys.exit(1) g1.register(status=status) - logging.info("Register time: %s" % register_time) + logging.info(lf("Register time", + time=register_time)) # oneshot: Try to use changelog history api, if not # available switch to FS crawl # Note: if config.change_detector is xsync then @@ -1655,8 +1667,9 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): try: g3.crawlwrap(oneshot=True) except PartialHistoryAvailable as e: - logging.info('Partial history available, using xsync crawl' - ' after consuming history till %s' % str(e)) + logging.info(lf('Partial history available, using xsync crawl' + ' after consuming history', + till=e)) g1.crawlwrap(oneshot=True, register_time=register_time) except ChangelogHistoryNotAvailable: logging.info('Changelog history not available, using xsync') @@ -1665,13 +1678,14 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): logging.info('No stime available, using xsync crawl') g1.crawlwrap(oneshot=True, register_time=register_time) except ChangelogException as e: - logging.error("Changelog History Crawl failed, %s" % e) + logging.error(lf("Changelog History Crawl failed", + error=e)) sys.exit(1) try: g2.crawlwrap() except ChangelogException as e: - logging.error("Changelog crawl failed, %s" % e) + logging.error(lf("Changelog crawl failed", error=e)) sys.exit(1) else: sup(self, *args) @@ -1763,14 +1777,14 @@ class SSH(AbstractUrl, SlaveRemote): self.inner_rsc.url) deferred = go_daemon == 'postconn' - logging.info ("Initializing SSH connection between master and slave...") + logging.info("Initializing SSH connection between master and slave...") t0 = time.time() ret = sup(self, gconf.ssh_command.split() + ["-p", str(gconf.ssh_port)] + gconf.ssh_ctl_args + [self.remote_addr], slave=self.inner_rsc.url, deferred=deferred) - logging.info ("SSH connection between master and slave established. " - "Time taken: {0:.4f} secs".format((time.time() - t0))) + logging.info(lf("SSH connection between master and slave established.", + duration="%.4f" % (time.time() - t0))) if deferred: # send a message to peer so that we can wait for diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 321e0d32ccc..b5f09459c57 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -304,8 +304,8 @@ def log_raise_exception(excont): gconf.transport.terminate_geterr() elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED): - logging.error('glusterfs session went down [%s]', - errorcode[exc.errno]) + logging.error(lf('glusterfs session went down', + error=errorcode[exc.errno])) else: logtag = "FAIL" if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG): @@ -387,8 +387,9 @@ def boolify(s): if lstr in true_list: rv = True elif not lstr in false_list: - logging.warn("Unknown string (%s) in string to boolean conversion " - "defaulting to False\n" % (s)) + logging.warn(lf("Unknown string in \"string to boolean\" conversion, " + "defaulting to False", + str=s)) return rv @@ -497,8 +498,9 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]): nr_tries += 1 if nr_tries == GF_OP_RETRIES: # probably a screwed state, cannot do much... - logging.warn('reached maximum retries (%s)...%s' % - (repr(arg), ex)) + logging.warn(lf('reached maximum retries', + args=repr(arg), + error=ex)) raise time.sleep(0.250) # retry the call @@ -572,3 +574,16 @@ def get_rsync_version(rsync_cmd): rsync_version = out.split(" ", 4)[3] return rsync_version + + +def lf(event, **kwargs): + """ + Log Format helper function, log messages can be + easily modified to structured log format. + lf("Config Change", sync_jobs=4, brick=/bricks/b1) will be + converted as "Config Change<TAB>brick=/bricks/b1<TAB>sync_jobs=4" + """ + msg = event + for k, v in kwargs.items(): + msg += "\t{0}={1}".format(k, v) + return msg |