diff options
Diffstat (limited to 'geo-replication/syncdaemon/syncdutils.py')
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 832 |
1 files changed, 711 insertions, 121 deletions
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 987e1bf186e..a3df103e76c 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -15,52 +15,104 @@ import time import fcntl import shutil import logging +import errno +import threading +import subprocess import socket +from subprocess import PIPE from threading import Lock, Thread as baseThread -from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED -from errno import EINTR, ENOENT, EPERM, ESTALE, errorcode +from errno import (EACCES, EAGAIN, EPIPE, ENOTCONN, ENOMEM, ECONNABORTED, + EINTR, ENOENT, ESTALE, EBUSY, ENODATA, errorcode, EIO) from signal import signal, SIGTERM import select as oselect from os import waitpid as owaitpid - +import xml.etree.ElementTree as XET +from select import error as SelectError try: from cPickle import PickleError except ImportError: - # py 3 from pickle import PickleError -from gconf import gconf - +from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE +sys.path.insert(1, GLUSTERFS_LIBEXECDIR) +EVENTS_ENABLED = True try: - # py 3 - from urllib import parse as urllib + from gfevents.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY + from gfevents.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE + from gfevents.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE + from gfevents.eventtypes import GEOREP_CHECKPOINT_COMPLETED \ + as EVENT_GEOREP_CHECKPOINT_COMPLETED except ImportError: - import urllib + # Events APIs not installed, dummy eventtypes with None + EVENTS_ENABLED = False + EVENT_GEOREP_FAULTY = None + EVENT_GEOREP_ACTIVE = None + EVENT_GEOREP_PASSIVE = None + EVENT_GEOREP_CHECKPOINT_COMPLETED = None -try: - from hashlib import md5 as md5 -except ImportError: - # py 2.4 - from md5 import new as md5 +import gsyncdconfig as gconf +from rconf import rconf + +from hashlib import sha256 as sha256 + +ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') # auxiliary gfid based access prefix _CL_AUX_GFID_PFX = ".gfid/" -GF_OP_RETRIES = 20 +ROOT_GFID = "00000000-0000-0000-0000-000000000001" +GF_OP_RETRIES = 10 -CHANGELOG_AGENT_SERVER_VERSION = 1.0 -CHANGELOG_AGENT_CLIENT_VERSION = 1.0 +GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' + +NodeID = None +rsync_version = None +unshare_mnt_propagation = None +slv_bricks = None +SPACE_ESCAPE_CHAR = "%20" +NEWLINE_ESCAPE_CHAR = "%0A" +PERCENTAGE_ESCAPE_CHAR = "%25" + +final_lock = Lock() + +def sup(x, *a, **kw): + """a rubyesque "super" for python ;) + + invoke caller method in parent class with given args. + """ + return getattr(super(type(x), x), + sys._getframe(1).f_code.co_name)(*a, **kw) def escape(s): """the chosen flavor of string escaping, used all over to turn whatever data to creatable representation""" - return urllib.quote_plus(s) + return s.replace("/", "-").strip("-") + + +def escape_space_newline(s): + return s.replace("%", PERCENTAGE_ESCAPE_CHAR)\ + .replace(" ", SPACE_ESCAPE_CHAR)\ + .replace("\n", NEWLINE_ESCAPE_CHAR) + +def unescape_space_newline(s): + return s.replace(SPACE_ESCAPE_CHAR, " ")\ + .replace(NEWLINE_ESCAPE_CHAR, "\n")\ + .replace(PERCENTAGE_ESCAPE_CHAR, "%") -def unescape(s): - """inverse of .escape""" - return urllib.unquote_plus(s) +# gf_mount_ready() returns 1 if all subvols are up, else 0 +def gf_mount_ready(): + ret = errno_wrap(Xattr.lgetxattr, + ['.', 'dht.subvol.status', 16], + [ENOENT, ENOTSUP, ENODATA], [ENOMEM]) + if isinstance(ret, int): + logging.error("failed to get the xattr value") + return 1 + ret = ret.rstrip('\x00') + if ret == "1": + return 1 + return 0 def norm(s): if s: @@ -119,17 +171,26 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url): """ Setup GConf ssh control path parameters """ - gconf.ssh_ctl_dir = ctld + rconf.ssh_ctl_dir = ctld content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr, resource_url) - content_md5 = md5hex(content) - fname = os.path.join(gconf.ssh_ctl_dir, - "%s.mft" % content_md5) + encoded_content = content.encode() + content_sha256 = sha256hex(encoded_content) + """ + The length of ctl_path for ssh connection should not be > 108. + ssh fails with ctl_path too long if it is so. But when rsync + is piped to ssh, it is not taking > 90. Hence using first 32 + bytes of hash. Hash collision doesn't matter as only one sock + file is created per directory. + """ + content_sha256 = content_sha256[:32] + fname = os.path.join(rconf.ssh_ctl_dir, + "%s.mft" % content_sha256) - create_manifest(fname, content) - ssh_ctl_path = os.path.join(gconf.ssh_ctl_dir, - "%s.sock" % content_md5) - gconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path] + create_manifest(fname, encoded_content) + ssh_ctl_path = os.path.join(rconf.ssh_ctl_dir, + "%s.sock" % content_sha256) + rconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path] def grabfile(fname, content=None): @@ -139,7 +200,7 @@ def grabfile(fname, content=None): """ # damn those messy open() mode codes fd = os.open(fname, os.O_CREAT | os.O_RDWR) - f = os.fdopen(fd, 'r+b', 0) + f = os.fdopen(fd, 'r+') try: fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB) except: @@ -153,34 +214,34 @@ def grabfile(fname, content=None): try: f.truncate() f.write(content) + f.flush() except: f.close() raise - gconf.permanent_handles.append(f) + rconf.permanent_handles.append(f) return f def grabpidfile(fname=None, setpid=True): """.grabfile customization for pid files""" if not fname: - fname = gconf.pid_file + fname = gconf.get("pid-file") content = None if setpid: content = str(os.getpid()) + '\n' return grabfile(fname, content=content) -final_lock = Lock() - -def finalize(*a, **kw): +def finalize(*args, **kwargs): """all those messy final steps we go trough upon termination Do away with pidfile, ssh control dir and logging. """ + final_lock.acquire() - if getattr(gconf, 'pid_file', None): - rm_pidf = gconf.pid_file_owned - if gconf.cpid: + if gconf.get('pid_file'): + rm_pidf = rconf.pid_file_owned + if rconf.cpid: # exit path from parent branch of daemonization rm_pidf = False while True: @@ -188,39 +249,50 @@ def finalize(*a, **kw): if not f: # child has already taken over pidfile break - if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid: + if os.waitpid(rconf.cpid, os.WNOHANG)[0] == rconf.cpid: # child has terminated rm_pidf = True break time.sleep(0.1) if rm_pidf: try: - os.unlink(gconf.pid_file) + os.unlink(rconf.pid_file) except: ex = sys.exc_info()[1] if ex.errno == ENOENT: pass else: raise - if gconf.ssh_ctl_dir and not gconf.cpid: + if rconf.ssh_ctl_dir and not rconf.cpid: def handle_rm_error(func, path, exc_info): if exc_info[1].errno == ENOENT: return raise exc_info[1] - shutil.rmtree(gconf.ssh_ctl_dir, onerror=handle_rm_error) - if getattr(gconf, 'state_socket', None): - try: - os.unlink(gconf.state_socket) - except: - if sys.exc_info()[0] == OSError: + shutil.rmtree(rconf.ssh_ctl_dir, onerror=handle_rm_error) + + """ Unmount if not done """ + if rconf.mount_point: + if rconf.mountbroker: + umount_cmd = rconf.mbr_umount_cmd + [rconf.mount_point, 'lazy'] + else: + umount_cmd = ['umount', '-l', rconf.mount_point] + p0 = subprocess.Popen(umount_cmd, stderr=subprocess.PIPE, + universal_newlines=True) + _, errdata = p0.communicate() + if p0.returncode == 0: + try: + os.rmdir(rconf.mount_point) + except OSError: pass + else: + pass - if gconf.log_exit: + if rconf.log_exit: logging.info("exiting.") sys.stdout.flush() sys.stderr.flush() - os._exit(kw.get('exval', 0)) + os._exit(kwargs.get('exval', 0)) def log_raise_exception(excont): @@ -230,6 +302,7 @@ def log_raise_exception(excont): Translate some weird sounding but well understood exceptions into human-friendly lingo """ + is_filelog = False for h in logging.getLogger().handlers: fno = getattr(getattr(h, 'stream', None), 'fileno', None) @@ -250,9 +323,9 @@ def log_raise_exception(excont): ((isinstance(exc, OSError) or isinstance(exc, IOError)) and exc.errno == EPIPE): logging.error('connection to peer is broken') - if hasattr(gconf, 'transport'): - gconf.transport.wait() - if gconf.transport.returncode == 127: + if hasattr(rconf, 'transport'): + rconf.transport.wait() + if rconf.transport.returncode == 127: logging.error("getting \"No such file or directory\"" "errors is most likely due to " "MISCONFIGURATION, please remove all " @@ -266,18 +339,29 @@ def log_raise_exception(excont): "<SLAVEVOL> config remote-gsyncd " "<GSYNCD_PATH> (Example GSYNCD_PATH: " "`/usr/libexec/glusterfs/gsyncd`)") - gconf.transport.terminate_geterr() + rconf.transport.terminate_geterr() elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED): - logging.error('glusterfs session went down [%s]', - errorcode[exc.errno]) + logging.error(lf('Gluster Mount process exited', + error=errorcode[exc.errno])) + elif isinstance(exc, OSError) and exc.errno == EIO: + logging.error("Getting \"Input/Output error\" " + "is most likely due to " + "a. Brick is down or " + "b. Split brain issue.") + logging.error("This is expected as per design to " + "keep the consistency of the file system. " + "Once the above issue is resolved " + "geo-replication would automatically " + "proceed further.") + logtag = "FAIL" else: logtag = "FAIL" if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG): logtag = "FULL EXCEPTION TRACE" if logtag: logging.exception(logtag + ": ") - sys.stderr.write("failed with %s.\n" % type(exc).__name__) + sys.stderr.write("failed with %s: %s.\n" % (type(exc).__name__, exc)) excont.exval = 1 sys.exit(excont.exval) @@ -300,20 +384,20 @@ class Thread(baseThread): function coughs up an exception """ - def __init__(self, *a, **kw): - tf = kw.get('target') + def __init__(self, *args, **kwargs): + tf = kwargs.get('target') if tf: - def twrap(*aa): + def twrap(*aargs): excont = FreeObject(exval=0) try: - tf(*aa) + tf(*aargs) except: try: log_raise_exception(excont) finally: finalize(exval=excont.exval) - kw['target'] = twrap - baseThread.__init__(self, *a, **kw) + kwargs['target'] = twrap + baseThread.__init__(self, *args, **kwargs) self.setDaemon(True) @@ -321,6 +405,33 @@ class GsyncdError(Exception): pass +class _MetaXattr(object): + + """singleton class, a lazy wrapper around the + libcxattr module + + libcxattr (a heavy import due to ctypes) is + loaded only when when the single + instance is tried to be used. + + This reduces runtime for those invocations + which do not need filesystem manipulation + (eg. for config, url parsing) + """ + + def __getattr__(self, meth): + from libcxattr import Xattr as LXattr + xmeth = [m for m in dir(LXattr) if m[0] != '_'] + if meth not in xmeth: + return + for m in xmeth: + setattr(self, m, getattr(LXattr, m)) + return getattr(self, meth) + + +Xattr = _MetaXattr() + + def getusername(uid=None): if uid is None: uid = os.geteuid() @@ -351,81 +462,63 @@ def boolify(s): lstr = s.lower() if lstr in true_list: rv = True - elif not lstr in false_list: - logging.warn("Unknown string (%s) in string to boolean conversion " - "defaulting to False\n" % (s)) + elif lstr not in false_list: + logging.warn(lf("Unknown string in \"string to boolean\" conversion, " + "defaulting to False", + str=s)) return rv -def eintr_wrap(func, exc, *a): +def eintr_wrap(func, exc, *args): """ wrapper around syscalls resilient to interrupt caused by signals """ while True: try: - return func(*a) + return func(*args) except exc: ex = sys.exc_info()[1] if not ex.args[0] == EINTR: raise -def select(*a): - return eintr_wrap(oselect.select, oselect.error, *a) +def select(*args): + return eintr_wrap(oselect.select, oselect.error, *args) + +def waitpid(*args): + return eintr_wrap(owaitpid, OSError, *args) -def waitpid(*a): - return eintr_wrap(owaitpid, OSError, *a) +def term_handler_default_hook(signum, frame): + finalize(signum, frame, exval=1) -def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})): + +def set_term_handler(hook=term_handler_default_hook): signal(SIGTERM, hook) -def is_host_local(host): - locaddr = False - for ai in socket.getaddrinfo(host, None): - # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators - # /mgmt/glusterd/src/glusterd-utils.c#L125 - if ai[0] == socket.AF_INET: - if ai[-1][0].split(".")[0] == "127": - locaddr = True - break - elif ai[0] == socket.AF_INET6: - if ai[-1][0] == "::1": - locaddr = True +def get_node_uuid(): + global NodeID + if NodeID is not None: + return NodeID + + NodeID = "" + with open(UUID_FILE) as f: + for line in f: + if line.startswith("UUID="): + NodeID = line.strip().split("=")[-1] break - else: - continue - try: - # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue, - # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587 - s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP) - except socket.error: - ex = sys.exc_info()[1] - if ex.errno != EPERM: - raise - f = None - try: - f = open("/proc/sys/net/ipv4/ip_nonlocal_bind") - if int(f.read()) != 0: - raise GsyncdError( - "non-local bind is set and not allowed to create " - "raw sockets, cannot determine if %s is local" % host) - s = socket.socket(ai[0], socket.SOCK_DGRAM) - finally: - if f: - f.close() - try: - s.bind(ai[-1]) - locaddr = True - break - except: - pass - s.close() - return locaddr + + if NodeID == "": + raise GsyncdError("Failed to get Host UUID from %s" % UUID_FILE) + return NodeID + + +def is_host_local(host_id): + return host_id == get_node_uuid() def funcode(f): @@ -461,8 +554,8 @@ def gauxpfx(): return _CL_AUX_GFID_PFX -def md5hex(s): - return md5(s).hexdigest() +def sha256hex(s): + return sha256(s).hexdigest() def selfkill(sig=SIGTERM): @@ -480,34 +573,60 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]): ex = sys.exc_info()[1] if ex.errno in errnos: return ex.errno - if not ex.errno in retry_errnos: + if ex.errno not in retry_errnos: raise nr_tries += 1 if nr_tries == GF_OP_RETRIES: # probably a screwed state, cannot do much... - logging.warn('reached maximum retries (%s)...%s' % - (repr(arg), ex)) - return ex.errno + logging.warn(lf('reached maximum retries', + args=repr(arg), + error=ex)) + raise time.sleep(0.250) # retry the call def lstat(e): - return errno_wrap(os.lstat, [e], [ENOENT], [ESTALE]) + return errno_wrap(os.lstat, [e], [ENOENT], [ESTALE, EBUSY]) + +def get_gfid_from_mnt(gfidpath): + return errno_wrap(Xattr.lgetxattr, + [gfidpath, 'glusterfs.gfid.string', + GX_GFID_CANONICAL_LEN], [ENOENT], [ESTALE]) + + +def matching_disk_gfid(gfid, entry): + disk_gfid = get_gfid_from_mnt(entry) + if isinstance(disk_gfid, int): + return False -class NoPurgeTimeAvailable(Exception): + if not gfid == disk_gfid: + return False + + return True + + +class NoStimeAvailable(Exception): pass class PartialHistoryAvailable(Exception): pass + class ChangelogHistoryNotAvailable(Exception): pass + class ChangelogException(OSError): pass +def gf_event(event_type, **kwargs): + if EVENTS_ENABLED: + from gfevents.gf_event import gf_event as gfevent + gfevent(event_type, **kwargs) + + class GlusterLogLevel(object): NONE = 0 EMERG = 1 @@ -523,3 +642,474 @@ class GlusterLogLevel(object): def get_changelog_log_level(lvl): return getattr(GlusterLogLevel, lvl, GlusterLogLevel.INFO) + + +def get_master_and_slave_data_from_args(args): + master_name = None + slave_data = None + for arg in args: + if arg.startswith(":"): + master_name = arg.replace(":", "") + if "::" in arg: + slave_data = arg.replace("ssh://", "") + + return (master_name, slave_data) + +def unshare_propagation_supported(): + global unshare_mnt_propagation + if unshare_mnt_propagation is not None: + return unshare_mnt_propagation + + unshare_mnt_propagation = False + p = subprocess.Popen(["unshare", "--help"], + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=True) + out, err = p.communicate() + if p.returncode == 0: + if "propagation" in out: + unshare_mnt_propagation = True + + return unshare_mnt_propagation + + +def get_rsync_version(rsync_cmd): + global rsync_version + if rsync_version is not None: + return rsync_version + + rsync_version = "0" + p = subprocess.Popen([rsync_cmd, "--version"], + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=True) + out, err = p.communicate() + if p.returncode == 0: + rsync_version = out.split(" ", 4)[3] + + return rsync_version + + +def get_slv_dir_path(slv_host, slv_volume, gfid): + global slv_bricks + + dir_path = ENOENT + pfx = gauxpfx() + + if not slv_bricks: + slv_info = Volinfo(slv_volume, slv_host, master=False) + slv_bricks = slv_info.bricks + # Result of readlink would be of format as below. + # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename" + for brick in slv_bricks: + dir_path = errno_wrap(os.path.join, + [brick['dir'], + ".glusterfs", gfid[0:2], + gfid[2:4], + gfid], [ENOENT], [ESTALE]) + if dir_path != ENOENT: + try: + realpath = errno_wrap(os.readlink, [dir_path], + [ENOENT], [ESTALE]) + if not isinstance(realpath, int): + realpath_parts = realpath.split('/') + pargfid = realpath_parts[-2] + basename = realpath_parts[-1] + dir_entry = os.path.join(pfx, pargfid, basename) + return dir_entry + except OSError: + # .gfid/GFID + gfidpath = unescape_space_newline(os.path.join(pfx, gfid)) + realpath = errno_wrap(Xattr.lgetxattr_buf, + [gfidpath, 'glusterfs.gfid2path'], [ENOENT], [ESTALE]) + if not isinstance(realpath, int): + basename = os.path.basename(realpath).rstrip('\x00') + dirpath = os.path.dirname(realpath) + if dirpath == "/": + pargfid = ROOT_GFID + else: + dirpath = dirpath.strip("/") + pargfid = get_gfid_from_mnt(dirpath) + if isinstance(pargfid, int): + return None + dir_entry = os.path.join(pfx, pargfid, basename) + return dir_entry + + return None + + +def lf(event, **kwargs): + """ + Log Format helper function, log messages can be + easily modified to structured log format. + lf("Config Change", sync_jobs=4, brick=/bricks/b1) will be + converted as "Config Change [{brick=/bricks/b1}, {sync_jobs=4}]" + """ + msgparts = [] + for k, v in kwargs.items(): + msgparts.append("{%s=%s}" % (k, v)) + return "%s [%s]" % (event, ", ".join(msgparts)) + + +class Popen(subprocess.Popen): + + """customized subclass of subprocess.Popen with a ring + buffer for children error output""" + + @classmethod + def init_errhandler(cls): + """start the thread which handles children's error output""" + cls.errstore = {} + + def tailer(): + while True: + errstore = cls.errstore.copy() + try: + poe, _, _ = select( + [po.stderr for po in errstore], [], [], 1) + except (ValueError, SelectError): + # stderr is already closed wait for some time before + # checking next error + time.sleep(0.5) + continue + for po in errstore: + if po.stderr not in poe: + continue + po.lock.acquire() + try: + if po.on_death_row: + continue + la = errstore[po] + try: + fd = po.stderr.fileno() + except ValueError: # file is already closed + time.sleep(0.5) + continue + + try: + l = os.read(fd, 1024) + except OSError: + time.sleep(0.5) + continue + + if not l: + continue + tots = len(l) + for lx in la: + tots += len(lx) + while tots > 1 << 20 and la: + tots -= len(la.pop(0)) + la.append(l) + finally: + po.lock.release() + t = Thread(target=tailer) + t.start() + cls.errhandler = t + + @classmethod + def fork(cls): + """fork wrapper that restarts errhandler thread in child""" + pid = os.fork() + if not pid: + cls.init_errhandler() + return pid + + def __init__(self, args, *a, **kw): + """customizations for subprocess.Popen instantiation + + - 'close_fds' is taken to be the default + - if child's stderr is chosen to be managed, + register it with the error handler thread + """ + self.args = args + if 'close_fds' not in kw: + kw['close_fds'] = True + self.lock = threading.Lock() + self.on_death_row = False + self.elines = [] + try: + sup(self, args, *a, **kw) + except: + ex = sys.exc_info()[1] + if not isinstance(ex, OSError): + raise + raise GsyncdError("""execution of "%s" failed with %s (%s)""" % + (args[0], errno.errorcode[ex.errno], + os.strerror(ex.errno))) + if kw.get('stderr') == subprocess.PIPE: + assert(getattr(self, 'errhandler', None)) + self.errstore[self] = [] + + def errlog(self): + """make a log about child's failure event""" + logging.error(lf("command returned error", + cmd=" ".join(self.args), + error=self.returncode)) + lp = '' + + def logerr(l): + logging.error(self.args[0] + "> " + l) + for l in self.elines: + ls = l.split('\n') + ls[0] = lp + ls[0] + lp = ls.pop() + for ll in ls: + logerr(ll) + if lp: + logerr(lp) + + def errfail(self): + """fail nicely if child did not terminate with success""" + self.errlog() + finalize(exval=1) + + def terminate_geterr(self, fail_on_err=True): + """kill child, finalize stderr harvesting (unregister + from errhandler, set up .elines), fail on error if + asked for + """ + self.lock.acquire() + try: + self.on_death_row = True + finally: + self.lock.release() + elines = self.errstore.pop(self) + if self.poll() is None: + self.terminate() + if self.poll() is None: + time.sleep(0.1) + self.kill() + self.wait() + while True: + if not select([self.stderr], [], [], 0.1)[0]: + break + b = os.read(self.stderr.fileno(), 1024) + if b: + elines.append(b.decode()) + else: + break + self.stderr.close() + self.elines = elines + if fail_on_err and self.returncode != 0: + self.errfail() + + +def host_brick_split(value): + """ + IPv6 compatible way to split and get the host + and brick information. Example inputs: + node1.example.com:/exports/bricks/brick1/brick + fe80::af0f:df82:844f:ef66%utun0:/exports/bricks/brick1/brick + """ + parts = value.split(":") + brick = parts[-1] + hostparts = parts[0:-1] + return (":".join(hostparts), brick) + + +class Volinfo(object): + + def __init__(self, vol, host='localhost', prelude=[], master=True): + if master: + gluster_cmd_dir = gconf.get("gluster-command-dir") + else: + gluster_cmd_dir = gconf.get("slave-gluster-command-dir") + + gluster_cmd = os.path.join(gluster_cmd_dir, 'gluster') + po = Popen(prelude + [gluster_cmd, '--xml', '--remote-host=' + host, + 'volume', 'info', vol], + stdout=PIPE, stderr=PIPE, universal_newlines=True) + vix = po.stdout.read() + po.wait() + po.terminate_geterr() + vi = XET.fromstring(vix) + if vi.find('opRet').text != '0': + if prelude: + via = '(via %s) ' % prelude.join(' ') + else: + via = ' ' + raise GsyncdError('getting volume info of %s%s ' + 'failed with errorcode %s' % + (vol, via, vi.find('opErrno').text)) + self.tree = vi + self.volume = vol + self.host = host + + def get(self, elem): + return self.tree.findall('.//' + elem) + + def is_tier(self): + return (self.get('typeStr')[0].text == 'Tier') + + def is_hot(self, brickpath): + logging.debug('brickpath: ' + repr(brickpath)) + return brickpath in self.hot_bricks + + @property + @memoize + def bricks(self): + def bparse(b): + host, dirp = host_brick_split(b.find("name").text) + return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text} + return [bparse(b) for b in self.get('brick')] + + @property + @memoize + def uuid(self): + ids = self.get('id') + if len(ids) != 1: + raise GsyncdError("volume info of %s obtained from %s: " + "ambiguous uuid" % (self.volume, self.host)) + return ids[0].text + + def replica_count(self, tier, hot): + if (tier and hot): + return int(self.get('hotBricks/hotreplicaCount')[0].text) + elif (tier and not hot): + return int(self.get('coldBricks/coldreplicaCount')[0].text) + else: + return int(self.get('replicaCount')[0].text) + + def disperse_count(self, tier, hot): + if (tier and hot): + # Tiering doesn't support disperse volume as hot brick, + # hence no xml output, so returning 0. In case, if it's + # supported later, we should change here. + return 0 + elif (tier and not hot): + return int(self.get('coldBricks/colddisperseCount')[0].text) + else: + return int(self.get('disperseCount')[0].text) + + def distribution_count(self, tier, hot): + if (tier and hot): + return int(self.get('hotBricks/hotdistCount')[0].text) + elif (tier and not hot): + return int(self.get('coldBricks/colddistCount')[0].text) + else: + return int(self.get('distCount')[0].text) + + @property + @memoize + def hot_bricks(self): + return [b.text for b in self.get('hotBricks/brick')] + + def get_hot_bricks_count(self, tier): + if (tier): + return int(self.get('hotBricks/hotbrickCount')[0].text) + else: + return 0 + + +class VolinfoFromGconf(object): + # Glusterd will generate following config items before Geo-rep start + # So that Geo-rep need not run gluster commands from inside + # Volinfo object API/interface kept as is so that caller need not + # change anything except calling this instead of Volinfo() + # + # master-bricks= + # master-bricks=NODEID:HOSTNAME:PATH,.. + # slave-bricks=NODEID:HOSTNAME,.. + # master-volume-id= + # slave-volume-id= + # master-replica-count= + # master-disperse_count= + def __init__(self, vol, host='localhost', master=True): + self.volume = vol + self.host = host + self.master = master + + def is_tier(self): + return False + + def is_hot(self, brickpath): + return False + + def is_uuid(self, value): + try: + uuid.UUID(value) + return True + except ValueError: + return False + + def possible_path(self, value): + return "/" in value + + @property + @memoize + def bricks(self): + pfx = "master-" if self.master else "slave-" + bricks_data = gconf.get(pfx + "bricks") + if bricks_data is None: + return [] + + bricks_data = bricks_data.split(",") + bricks_data = [b.strip() for b in bricks_data] + out = [] + for b in bricks_data: + parts = b.split(":") + b_uuid = None + if self.is_uuid(parts[0]): + b_uuid = parts[0] + # Set all parts except first + parts = parts[1:] + + if self.possible_path(parts[-1]): + bpath = parts[-1] + # Set all parts except last + parts = parts[0:-1] + + out.append({ + "host": ":".join(parts), # if remaining parts are IPv6 name + "dir": bpath, + "uuid": b_uuid + }) + + return out + + @property + @memoize + def uuid(self): + if self.master: + return gconf.get("master-volume-id") + else: + return gconf.get("slave-volume-id") + + def replica_count(self, tier, hot): + return gconf.get("master-replica-count") + + def disperse_count(self, tier, hot): + return gconf.get("master-disperse-count") + + def distribution_count(self, tier, hot): + return gconf.get("master-distribution-count") + + @property + @memoize + def hot_bricks(self): + return [] + + def get_hot_bricks_count(self, tier): + return 0 + + +def can_ssh(host, port=22): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.connect((host, port)) + flag = True + except socket.error: + flag = False + + s.close() + return flag + + +def get_up_nodes(hosts, port): + # List of hosts with Hostname/IP and UUID + up_nodes = [] + for h in hosts: + if can_ssh(h[0], port): + up_nodes.append(h) + + return up_nodes |
