Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r--  geo-replication/syncdaemon/monitor.py | 395
1 file changed, 395 insertions(+), 0 deletions(-)
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
new file mode 100644
index 00000000000..6aa7b9dfc99
--- /dev/null
+++ b/geo-replication/syncdaemon/monitor.py
@@ -0,0 +1,395 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import os
+import sys
+import time
+import signal
+import logging
+import xml.etree.ElementTree as XET
+from threading import Lock
+from errno import ECHILD, ESRCH
+import random
+
+from resource import SSH
+import gsyncdconfig as gconf
+import libgfchangelog
+from rconf import rconf
+from syncdutils import (select, waitpid, errno_wrap, lf, grabpidfile,
+ set_term_handler, GsyncdError,
+ Thread, finalize, Volinfo, VolinfoFromGconf,
+ gf_event, EVENT_GEOREP_FAULTY, get_up_nodes,
+ unshare_propagation_supported)
+from gsyncdstatus import GeorepStatus, set_monitor_status
+import py2py3
+from py2py3 import pipe
+
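+# Python 2.7 added xml.etree.ElementTree.ParseError; older interpreters
+# raise SyntaxError on malformed XML, hence the fallback below.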
+ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
+
+
+def get_subvol_num(brick_idx, vol, hot):
+ tier = vol.is_tier()
+ disperse_count = vol.disperse_count(tier, hot)
+ replica_count = vol.replica_count(tier, hot)
+ distribute_count = vol.distribution_count(tier, hot)
+ gconf.setconfig("master-distribution-count", distribute_count)
+
+ if (tier and not hot):
+ brick_idx = brick_idx - vol.get_hot_bricks_count(tier)
+
+ subvol_size = disperse_count if disperse_count > 0 else replica_count
+ cnt = int((brick_idx + 1) / subvol_size)
+ rem = (brick_idx + 1) % subvol_size
+ if rem > 0:
+ cnt = cnt + 1
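+    # Example: with replica_count == 3 and no disperse, brick_idx == 4
+    # (the fifth brick) gives cnt = int(5 / 3) = 1 and rem = 5 % 3 = 2,
+    # so cnt becomes 2: the brick belongs to the second subvolume.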
+
+ if (tier and hot):
+ return "hot_" + str(cnt)
+ elif (tier and not hot):
+ return "cold_" + str(cnt)
+ else:
+ return str(cnt)
+
+
+class Monitor(object):
+
+ """class which spawns and manages gsyncd workers"""
+
+ ST_INIT = 'Initializing...'
+ ST_STARTED = 'Started'
+ ST_STABLE = 'Active'
+ ST_FAULTY = 'Faulty'
+ ST_INCON = 'inconsistent'
+ _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]
+
+ def __init__(self):
+ self.lock = Lock()
+ self.state = {}
+ self.status = {}
+
+ @staticmethod
+ def terminate():
+        # relax one SIGTERM by setting a handler that restores the
+        # standard handler
+ set_term_handler(lambda *a: set_term_handler())
+ # give a chance to graceful exit
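+        # (negative pid: the signal goes to the entire process group)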
+ errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])
+
+ def monitor(self, w, argv, cpids, slave_vol, slave_host, master,
+ suuid, slavenodes):
+        """the monitor loop
+
+        The basic logic is a blatantly simple, blunt heuristic:
+        if the spawned client survives 60 secs, it's considered OK.
+        This serves us pretty well, as it's not vulnerable to
+        any kind of irregular behavior of the child...
+
+        ... well, except for one: if the child hangs waiting
+        for some event, it can survive aeons yet remain
+        defunct. So we tweak the above logic and expect the
+        worker to send us a signal within 60 secs (in the
+        form of closing its end of a pipe). The worker does
+        this when it's done with the setup stage and ready to
+        enter the service loop (note it's the setup stage
+        that is vulnerable to hangs -- the full-blown worker
+        blows up on EPIPE if the net goes down, due to the
+        keep-alive thread).
+        """
+ if not self.status.get(w[0]['dir'], None):
+ self.status[w[0]['dir']] = GeorepStatus(gconf.get("state-file"),
+ w[0]['host'],
+ w[0]['dir'],
+ w[0]['uuid'],
+ master,
+ "%s::%s" % (slave_host,
+ slave_vol))
+ ret = 0
+
+ def nwait(p, o=0):
+ try:
+ p2, r = waitpid(p, o)
+ if not p2:
+ return
+ return r
+ except OSError as e:
+ # no child process, this happens if the child process
+ # already died and has been cleaned up
+ if e.errno == ECHILD:
+ return -1
+ else:
+ raise
+
+ def exit_signalled(s):
+ """ child terminated due to receipt of SIGUSR1 """
+ return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1))
+
+ def exit_status(s):
+ if os.WIFEXITED(s):
+ return os.WEXITSTATUS(s)
+ return 1
+
+ conn_timeout = gconf.get("connection-timeout")
+ while ret in (0, 1):
+ remote_user, remote_host = w[1][0].split("@")
+ remote_id = w[1][1]
+            # Check the status of the connected slave node. If it is
+            # down, try to connect to a different node that is up.
+ current_slave_host = remote_host
+ slave_up_hosts = get_up_nodes(slavenodes, gconf.get("ssh-port"))
+
+ if (current_slave_host, remote_id) not in slave_up_hosts:
+ if len(slave_up_hosts) > 0:
+ remote_new = random.choice(slave_up_hosts)
+ remote_host = "%s@%s" % (remote_user, remote_new[0])
+ remote_id = remote_new[1]
+
+            # Spawn the worker while holding the lock to avoid fd leaks
+ self.lock.acquire()
+
+ self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
+ logging.info(lf('starting gsyncd worker',
+ brick=w[0]['dir'],
+ slave_node=remote_host))
+
+ pr, pw = pipe()
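+            # Feedback pipe: the worker inherits pw and closes it once
+            # setup is done, so the select() on pr below returns either
+            # then or when the worker dies (exiting also closes pw).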
+ cpid = os.fork()
+ if cpid == 0:
+ os.close(pr)
+
+ args_to_worker = argv + [
+ 'worker',
+ rconf.args.master,
+ rconf.args.slave,
+ '--feedback-fd', str(pw),
+ '--local-path', w[0]['dir'],
+ '--local-node', w[0]['host'],
+ '--local-node-id', w[0]['uuid'],
+ '--slave-id', suuid,
+ '--subvol-num', str(w[2]),
+ '--resource-remote', remote_host,
+ '--resource-remote-id', remote_id
+ ]
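+            # argv was built in multiplex() as [python-basename,
+            # gsyncd-script]; the full list re-executes this gsyncd
+            # with the 'worker' subcommand.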
+
+ if rconf.args.config_file is not None:
+ args_to_worker += ['-c', rconf.args.config_file]
+
+ if w[3]:
+ args_to_worker.append("--is-hottier")
+
+ if rconf.args.debug:
+ args_to_worker.append("--debug")
+
+ access_mount = gconf.get("access-mount")
+ if access_mount:
+ os.execv(sys.executable, args_to_worker)
+ else:
+ if unshare_propagation_supported():
+                    logging.debug("Worker will mount the volume privately")
+ unshare_cmd = ['unshare', '-m', '--propagation',
+ 'private']
+ cmd = unshare_cmd + args_to_worker
+ os.execvp("unshare", cmd)
+ else:
+                    logging.debug("Mount is not private. It will be "
+                                  "lazily unmounted")
+ os.execv(sys.executable, args_to_worker)
+
+ cpids.add(cpid)
+ os.close(pw)
+
+ self.lock.release()
+
+ t0 = time.time()
+ so = select((pr,), (), (), conn_timeout)[0]
+ os.close(pr)
+
+ if so:
+ ret = nwait(cpid, os.WNOHANG)
+
+ if ret is not None:
+ logging.info(lf("worker died before establishing "
+ "connection",
+ brick=w[0]['dir']))
+ else:
+ logging.debug("worker(%s) connected" % w[0]['dir'])
+ while time.time() < t0 + conn_timeout:
+ ret = nwait(cpid, os.WNOHANG)
+
+ if ret is not None:
+ logging.info(lf("worker died in startup phase",
+ brick=w[0]['dir']))
+ break
+
+ time.sleep(1)
+ else:
+ logging.info(
+                    lf("Worker not confirmed after wait, aborting it. "
+                       "Gsyncd invocation on remote slave via SSH or "
+                       "gluster master mount might have hung. Please "
+                       "check the above logs for the exact issue and "
+                       "check the master or slave volume for errors. "
+                       "Restarting the master/slave volume accordingly "
+                       "might help.",
+ brick=w[0]['dir'],
+ timeout=conn_timeout))
+ errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
+ ret = nwait(cpid)
+ if ret is None:
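+                # the earlier WNOHANG waits may not have reaped the
+                # worker; this blocking wait avoids leaving a zombie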
+ ret = nwait(cpid)
+ if exit_signalled(ret):
+ ret = 0
+ else:
+ ret = exit_status(ret)
+ if ret in (0, 1):
+ self.status[w[0]['dir']].set_worker_status(self.ST_FAULTY)
+ gf_event(EVENT_GEOREP_FAULTY,
+ master_volume=master.volume,
+ master_node=w[0]['host'],
+ master_node_id=w[0]['uuid'],
+ slave_host=slave_host,
+ slave_volume=slave_vol,
+ current_slave_host=current_slave_host,
+ brick_path=w[0]['dir'])
+ time.sleep(10)
+ self.status[w[0]['dir']].set_worker_status(self.ST_INCON)
+ return ret
+
+ def multiplex(self, wspx, suuid, slave_vol, slave_host, master, slavenodes):
+ argv = [os.path.basename(sys.executable), sys.argv[0]]
+
+ cpids = set()
+ ta = []
+ for wx in wspx:
+        def wmon(w):
+            # monitor() returns a single exit-status int; there is
+            # nothing to unpack from its return value
+            self.monitor(w, argv, cpids, slave_vol,
+                         slave_host, master, suuid, slavenodes)
+ time.sleep(1)
+ self.lock.acquire()
+ for cpid in cpids:
+ errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
+ self.lock.release()
+ finalize(exval=1)
+ t = Thread(target=wmon, args=[wx])
+ t.start()
+ ta.append(t)
+
+        # Monitor status used to be updated in each monitor thread, but
+        # that must not be done, as it can deadlock a worker start.
+        # set_monitor_status uses flock to synchronize multiple instances
+        # updating the file. Since each monitor thread forks a worker,
+        # the forked processes can hold a reference to the fd of the
+        # status file, deadlocking workers that start later: the flock
+        # is not released until all references to the same fd are
+        # closed. It would also leak fds.
+
+ self.lock.acquire()
+ set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
+ self.lock.release()
+ for t in ta:
+ t.join()
+
+
+def distribute(master, slave):
+ if rconf.args.use_gconf_volinfo:
+ mvol = VolinfoFromGconf(master.volume, master=True)
+ else:
+ mvol = Volinfo(master.volume, master.host, master=True)
+ logging.debug('master bricks: ' + repr(mvol.bricks))
+
+ prelude = [gconf.get("ssh-command")] + \
+ gconf.get("ssh-options").split() + \
+ ["-p", str(gconf.get("ssh-port"))] + \
+ [slave.remote_addr]
+
+ logging.debug('slave SSH gateway: ' + slave.remote_addr)
+
+ if rconf.args.use_gconf_volinfo:
+ svol = VolinfoFromGconf(slave.volume, master=False)
+ else:
+ svol = Volinfo(slave.volume, "localhost", prelude, master=False)
+
+ sbricks = svol.bricks
+ suuid = svol.uuid
+ slave_host = slave.remote_addr.split('@')[-1]
+ slave_vol = slave.volume
+
+ # save this xattr for the session delete command
+ old_stime_xattr_prefix = gconf.get("stime-xattr-prefix", None)
+ new_stime_xattr_prefix = "trusted.glusterfs." + mvol.uuid + "." + \
+ svol.uuid
+ if not old_stime_xattr_prefix or \
+ old_stime_xattr_prefix != new_stime_xattr_prefix:
+ gconf.setconfig("stime-xattr-prefix", new_stime_xattr_prefix)
+
+ logging.debug('slave bricks: ' + repr(sbricks))
+
+ slavenodes = set((b['host'], b["uuid"]) for b in sbricks)
+ rap = SSH.parse_ssh_address(slave)
+ slaves = [(rap['user'] + '@' + h[0], h[1]) for h in slavenodes]
+
+ workerspex = []
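+    # Map each local master brick to a slave endpoint round-robin
+    # (idx % len(slaves)); only bricks owned by this node (matching
+    # local_node_id) get a worker spec.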
+ for idx, brick in enumerate(mvol.bricks):
+ if rconf.args.local_node_id == brick['uuid']:
+ is_hot = mvol.is_hot(":".join([brick['host'], brick['dir']]))
+ workerspex.append((brick,
+ slaves[idx % len(slaves)],
+ get_subvol_num(idx, mvol, is_hot),
+ is_hot))
+ logging.debug('worker specs: ' + repr(workerspex))
+ return workerspex, suuid, slave_vol, slave_host, master, slavenodes
+
+
+def monitor(local, remote):
+    # Check if gsyncd was restarted in the paused state. If so, send
+    # SIGSTOP to the negative of the monitor pid (i.e. its whole
+    # process group) to go back to the paused state.
+ if rconf.args.pause_on_start:
+ errno_wrap(os.kill, [-os.getpid(), signal.SIGSTOP], [ESRCH])
+
+    # oh yeah, Monitor is actually used as a singleton, too
+ return Monitor().multiplex(*distribute(local, remote))
+
+
+def startup(go_daemon=True):
+ """set up logging, pidfile grabbing, daemonization"""
+ pid_file = gconf.get("pid-file")
+ if not grabpidfile():
+ sys.stderr.write("pidfile is taken, exiting.\n")
+ sys.exit(2)
+ rconf.pid_file_owned = True
+
+ if not go_daemon:
+ return
+
+ x, y = pipe()
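+    # As with os.pipe(), x is the read end and y the write end. The
+    # parent keeps y open until it exits; the child's select() on x
+    # (below) then wakes on EOF, so the daemon proceeds only once the
+    # parent is gone.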
+ cpid = os.fork()
+ if cpid:
+ os.close(x)
+ sys.exit()
+ os.close(y)
+ os.setsid()
+ dn = os.open(os.devnull, os.O_RDWR)
+ for f in (sys.stdin, sys.stdout, sys.stderr):
+ os.dup2(dn, f.fileno())
+
+ if not grabpidfile(pid_file + '.tmp'):
+ raise GsyncdError("cannot grab temporary pidfile")
+
+ os.rename(pid_file + '.tmp', pid_file)
+
+ # wait for parent to terminate
+ # so we can start up with
+ # no messing from the dirty
+ # ol' bustard
+ select((x,), (), ())
+ os.close(x)