diff options
Diffstat (limited to 'geo-replication/syncdaemon/syncdutils.py')
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 160 |
1 files changed, 126 insertions, 34 deletions
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index fd96ca70b2f..a3df103e76c 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -21,8 +21,8 @@ import subprocess import socket from subprocess import PIPE from threading import Lock, Thread as baseThread -from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED -from errno import EINTR, ENOENT, ESTALE, EBUSY, errorcode +from errno import (EACCES, EAGAIN, EPIPE, ENOTCONN, ENOMEM, ECONNABORTED, + EINTR, ENOENT, ESTALE, EBUSY, ENODATA, errorcode, EIO) from signal import signal, SIGTERM import select as oselect from os import waitpid as owaitpid @@ -37,10 +37,10 @@ from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE sys.path.insert(1, GLUSTERFS_LIBEXECDIR) EVENTS_ENABLED = True try: - from events.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY - from events.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE - from events.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE - from events.eventtypes import GEOREP_CHECKPOINT_COMPLETED \ + from gfevents.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY + from gfevents.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE + from gfevents.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE + from gfevents.eventtypes import GEOREP_CHECKPOINT_COMPLETED \ as EVENT_GEOREP_CHECKPOINT_COMPLETED except ImportError: # Events APIs not installed, dummy eventtypes with None @@ -55,14 +55,15 @@ from rconf import rconf from hashlib import sha256 as sha256 +ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') + # auxiliary gfid based access prefix _CL_AUX_GFID_PFX = ".gfid/" +ROOT_GFID = "00000000-0000-0000-0000-000000000001" GF_OP_RETRIES = 10 GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' -CHANGELOG_AGENT_SERVER_VERSION = 1.0 -CHANGELOG_AGENT_CLIENT_VERSION = 1.0 NodeID = None rsync_version = None unshare_mnt_propagation = None @@ -99,6 +100,19 @@ def unescape_space_newline(s): .replace(NEWLINE_ESCAPE_CHAR, "\n")\ .replace(PERCENTAGE_ESCAPE_CHAR, "%") +# gf_mount_ready() returns 1 if all subvols are up, else 0 +def gf_mount_ready(): + ret = errno_wrap(Xattr.lgetxattr, + ['.', 'dht.subvol.status', 16], + [ENOENT, ENOTSUP, ENODATA], [ENOMEM]) + + if isinstance(ret, int): + logging.error("failed to get the xattr value") + return 1 + ret = ret.rstrip('\x00') + if ret == "1": + return 1 + return 0 def norm(s): if s: @@ -330,13 +344,24 @@ def log_raise_exception(excont): ECONNABORTED): logging.error(lf('Gluster Mount process exited', error=errorcode[exc.errno])) + elif isinstance(exc, OSError) and exc.errno == EIO: + logging.error("Getting \"Input/Output error\" " + "is most likely due to " + "a. Brick is down or " + "b. Split brain issue.") + logging.error("This is expected as per design to " + "keep the consistency of the file system. " + "Once the above issue is resolved " + "geo-replication would automatically " + "proceed further.") + logtag = "FAIL" else: logtag = "FAIL" if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG): logtag = "FULL EXCEPTION TRACE" if logtag: logging.exception(logtag + ": ") - sys.stderr.write("failed with %s.\n" % type(exc).__name__) + sys.stderr.write("failed with %s: %s.\n" % (type(exc).__name__, exc)) excont.exval = 1 sys.exit(excont.exval) @@ -563,7 +588,6 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]): def lstat(e): return errno_wrap(os.lstat, [e], [ENOENT], [ESTALE, EBUSY]) - def get_gfid_from_mnt(gfidpath): return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', @@ -599,7 +623,7 @@ class ChangelogException(OSError): def gf_event(event_type, **kwargs): if EVENTS_ENABLED: - from events.gf_event import gf_event as gfevent + from gfevents.gf_event import gf_event as gfevent gfevent(event_type, **kwargs) @@ -670,9 +694,10 @@ def get_slv_dir_path(slv_host, slv_volume, gfid): global slv_bricks dir_path = ENOENT + pfx = gauxpfx() if not slv_bricks: - slv_info = Volinfo(slv_volume, slv_host) + slv_info = Volinfo(slv_volume, slv_host, master=False) slv_bricks = slv_info.bricks # Result of readlink would be of format as below. # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename" @@ -683,19 +708,32 @@ def get_slv_dir_path(slv_host, slv_volume, gfid): gfid[2:4], gfid], [ENOENT], [ESTALE]) if dir_path != ENOENT: - break - - if not isinstance(dir_path, int): - realpath = errno_wrap(os.readlink, [dir_path], - [ENOENT], [ESTALE]) - - if not isinstance(realpath, int): - realpath_parts = realpath.split('/') - pargfid = realpath_parts[-2] - basename = realpath_parts[-1] - pfx = gauxpfx() - dir_entry = os.path.join(pfx, pargfid, basename) - return dir_entry + try: + realpath = errno_wrap(os.readlink, [dir_path], + [ENOENT], [ESTALE]) + if not isinstance(realpath, int): + realpath_parts = realpath.split('/') + pargfid = realpath_parts[-2] + basename = realpath_parts[-1] + dir_entry = os.path.join(pfx, pargfid, basename) + return dir_entry + except OSError: + # .gfid/GFID + gfidpath = unescape_space_newline(os.path.join(pfx, gfid)) + realpath = errno_wrap(Xattr.lgetxattr_buf, + [gfidpath, 'glusterfs.gfid2path'], [ENOENT], [ESTALE]) + if not isinstance(realpath, int): + basename = os.path.basename(realpath).rstrip('\x00') + dirpath = os.path.dirname(realpath) + if dirpath == "/": + pargfid = ROOT_GFID + else: + dirpath = dirpath.strip("/") + pargfid = get_gfid_from_mnt(dirpath) + if isinstance(pargfid, int): + return None + dir_entry = os.path.join(pfx, pargfid, basename) + return dir_entry return None @@ -705,12 +743,12 @@ def lf(event, **kwargs): Log Format helper function, log messages can be easily modified to structured log format. lf("Config Change", sync_jobs=4, brick=/bricks/b1) will be - converted as "Config Change<TAB>brick=/bricks/b1<TAB>sync_jobs=4" + converted as "Config Change [{brick=/bricks/b1}, {sync_jobs=4}]" """ - msg = event + msgparts = [] for k, v in kwargs.items(): - msg += "\t{0}={1}".format(k, v) - return msg + msgparts.append("{%s=%s}" % (k, v)) + return "%s [%s]" % (event, ", ".join(msgparts)) class Popen(subprocess.Popen): @@ -856,10 +894,29 @@ class Popen(subprocess.Popen): self.errfail() +def host_brick_split(value): + """ + IPv6 compatible way to split and get the host + and brick information. Example inputs: + node1.example.com:/exports/bricks/brick1/brick + fe80::af0f:df82:844f:ef66%utun0:/exports/bricks/brick1/brick + """ + parts = value.split(":") + brick = parts[-1] + hostparts = parts[0:-1] + return (":".join(hostparts), brick) + + class Volinfo(object): - def __init__(self, vol, host='localhost', prelude=[]): - po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, + def __init__(self, vol, host='localhost', prelude=[], master=True): + if master: + gluster_cmd_dir = gconf.get("gluster-command-dir") + else: + gluster_cmd_dir = gconf.get("slave-gluster-command-dir") + + gluster_cmd = os.path.join(gluster_cmd_dir, 'gluster') + po = Popen(prelude + [gluster_cmd, '--xml', '--remote-host=' + host, 'volume', 'info', vol], stdout=PIPE, stderr=PIPE, universal_newlines=True) vix = po.stdout.read() @@ -892,7 +949,7 @@ class Volinfo(object): @memoize def bricks(self): def bparse(b): - host, dirp = b.find("name").text.split(':', 2) + host, dirp = host_brick_split(b.find("name").text) return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text} return [bparse(b) for b in self.get('brick')] @@ -924,6 +981,14 @@ class Volinfo(object): else: return int(self.get('disperseCount')[0].text) + def distribution_count(self, tier, hot): + if (tier and hot): + return int(self.get('hotBricks/hotdistCount')[0].text) + elif (tier and not hot): + return int(self.get('coldBricks/colddistCount')[0].text) + else: + return int(self.get('distCount')[0].text) + @property @memoize def hot_bricks(self): @@ -960,6 +1025,16 @@ class VolinfoFromGconf(object): def is_hot(self, brickpath): return False + def is_uuid(self, value): + try: + uuid.UUID(value) + return True + except ValueError: + return False + + def possible_path(self, value): + return "/" in value + @property @memoize def bricks(self): @@ -973,8 +1048,22 @@ class VolinfoFromGconf(object): out = [] for b in bricks_data: parts = b.split(":") - bpath = parts[2] if len(parts) == 3 else "" - out.append({"host": parts[1], "dir": bpath, "uuid": parts[0]}) + b_uuid = None + if self.is_uuid(parts[0]): + b_uuid = parts[0] + # Set all parts except first + parts = parts[1:] + + if self.possible_path(parts[-1]): + bpath = parts[-1] + # Set all parts except last + parts = parts[0:-1] + + out.append({ + "host": ":".join(parts), # if remaining parts are IPv6 name + "dir": bpath, + "uuid": b_uuid + }) return out @@ -992,6 +1081,9 @@ class VolinfoFromGconf(object): def disperse_count(self, tier, hot): return gconf.get("master-disperse-count") + def distribution_count(self, tier, hot): + return gconf.get("master-distribution-count") + @property @memoize def hot_bricks(self): |
