diff options
Diffstat (limited to 'geo-replication/syncdaemon/syncdutils.py')
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 190 |
1 files changed, 161 insertions, 29 deletions
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 6acc9f17ad7..a3df103e76c 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -21,8 +21,8 @@ import subprocess import socket from subprocess import PIPE from threading import Lock, Thread as baseThread -from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED -from errno import EINTR, ENOENT, ESTALE, EBUSY, errorcode +from errno import (EACCES, EAGAIN, EPIPE, ENOTCONN, ENOMEM, ECONNABORTED, + EINTR, ENOENT, ESTALE, EBUSY, ENODATA, errorcode, EIO) from signal import signal, SIGTERM import select as oselect from os import waitpid as owaitpid @@ -37,10 +37,10 @@ from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE sys.path.insert(1, GLUSTERFS_LIBEXECDIR) EVENTS_ENABLED = True try: - from events.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY - from events.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE - from events.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE - from events.eventtypes import GEOREP_CHECKPOINT_COMPLETED \ + from gfevents.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY + from gfevents.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE + from gfevents.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE + from gfevents.eventtypes import GEOREP_CHECKPOINT_COMPLETED \ as EVENT_GEOREP_CHECKPOINT_COMPLETED except ImportError: # Events APIs not installed, dummy eventtypes with None @@ -55,17 +55,19 @@ from rconf import rconf from hashlib import sha256 as sha256 +ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') + # auxiliary gfid based access prefix _CL_AUX_GFID_PFX = ".gfid/" +ROOT_GFID = "00000000-0000-0000-0000-000000000001" GF_OP_RETRIES = 10 GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' -CHANGELOG_AGENT_SERVER_VERSION = 1.0 -CHANGELOG_AGENT_CLIENT_VERSION = 1.0 NodeID = None rsync_version = None unshare_mnt_propagation = None +slv_bricks = None SPACE_ESCAPE_CHAR = "%20" NEWLINE_ESCAPE_CHAR = "%0A" PERCENTAGE_ESCAPE_CHAR = "%25" @@ -98,6 +100,19 @@ def unescape_space_newline(s): .replace(NEWLINE_ESCAPE_CHAR, "\n")\ .replace(PERCENTAGE_ESCAPE_CHAR, "%") +# gf_mount_ready() returns 1 if all subvols are up, else 0 +def gf_mount_ready(): + ret = errno_wrap(Xattr.lgetxattr, + ['.', 'dht.subvol.status', 16], + [ENOENT, ENOTSUP, ENODATA], [ENOMEM]) + + if isinstance(ret, int): + logging.error("failed to get the xattr value") + return 1 + ret = ret.rstrip('\x00') + if ret == "1": + return 1 + return 0 def norm(s): if s: @@ -159,7 +174,8 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url): rconf.ssh_ctl_dir = ctld content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr, resource_url) - content_sha256 = sha256hex(content) + encoded_content = content.encode() + content_sha256 = sha256hex(encoded_content) """ The length of ctl_path for ssh connection should not be > 108. ssh fails with ctl_path too long if it is so. But when rsync @@ -171,7 +187,7 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url): fname = os.path.join(rconf.ssh_ctl_dir, "%s.mft" % content_sha256) - create_manifest(fname, content) + create_manifest(fname, encoded_content) ssh_ctl_path = os.path.join(rconf.ssh_ctl_dir, "%s.sock" % content_sha256) rconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path] @@ -184,7 +200,7 @@ def grabfile(fname, content=None): """ # damn those messy open() mode codes fd = os.open(fname, os.O_CREAT | os.O_RDWR) - f = os.fdopen(fd, 'r+b', 0) + f = os.fdopen(fd, 'r+') try: fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB) except: @@ -198,6 +214,7 @@ def grabfile(fname, content=None): try: f.truncate() f.write(content) + f.flush() except: f.close() raise @@ -260,7 +277,8 @@ def finalize(*args, **kwargs): umount_cmd = rconf.mbr_umount_cmd + [rconf.mount_point, 'lazy'] else: umount_cmd = ['umount', '-l', rconf.mount_point] - p0 = subprocess.Popen(umount_cmd, stderr=subprocess.PIPE) + p0 = subprocess.Popen(umount_cmd, stderr=subprocess.PIPE, + universal_newlines=True) _, errdata = p0.communicate() if p0.returncode == 0: try: @@ -326,13 +344,24 @@ def log_raise_exception(excont): ECONNABORTED): logging.error(lf('Gluster Mount process exited', error=errorcode[exc.errno])) + elif isinstance(exc, OSError) and exc.errno == EIO: + logging.error("Getting \"Input/Output error\" " + "is most likely due to " + "a. Brick is down or " + "b. Split brain issue.") + logging.error("This is expected as per design to " + "keep the consistency of the file system. " + "Once the above issue is resolved " + "geo-replication would automatically " + "proceed further.") + logtag = "FAIL" else: logtag = "FAIL" if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG): logtag = "FULL EXCEPTION TRACE" if logtag: logging.exception(logtag + ": ") - sys.stderr.write("failed with %s.\n" % type(exc).__name__) + sys.stderr.write("failed with %s: %s.\n" % (type(exc).__name__, exc)) excont.exval = 1 sys.exit(excont.exval) @@ -559,7 +588,6 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]): def lstat(e): return errno_wrap(os.lstat, [e], [ENOENT], [ESTALE, EBUSY]) - def get_gfid_from_mnt(gfidpath): return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', @@ -595,7 +623,7 @@ class ChangelogException(OSError): def gf_event(event_type, **kwargs): if EVENTS_ENABLED: - from events.gf_event import gf_event as gfevent + from gfevents.gf_event import gf_event as gfevent gfevent(event_type, **kwargs) @@ -635,7 +663,8 @@ def unshare_propagation_supported(): unshare_mnt_propagation = False p = subprocess.Popen(["unshare", "--help"], stderr=subprocess.PIPE, - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, + universal_newlines=True) out, err = p.communicate() if p.returncode == 0: if "propagation" in out: @@ -652,7 +681,8 @@ def get_rsync_version(rsync_cmd): rsync_version = "0" p = subprocess.Popen([rsync_cmd, "--version"], stderr=subprocess.PIPE, - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, + universal_newlines=True) out, err = p.communicate() if p.returncode == 0: rsync_version = out.split(" ", 4)[3] @@ -660,17 +690,65 @@ def get_rsync_version(rsync_cmd): return rsync_version +def get_slv_dir_path(slv_host, slv_volume, gfid): + global slv_bricks + + dir_path = ENOENT + pfx = gauxpfx() + + if not slv_bricks: + slv_info = Volinfo(slv_volume, slv_host, master=False) + slv_bricks = slv_info.bricks + # Result of readlink would be of format as below. + # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename" + for brick in slv_bricks: + dir_path = errno_wrap(os.path.join, + [brick['dir'], + ".glusterfs", gfid[0:2], + gfid[2:4], + gfid], [ENOENT], [ESTALE]) + if dir_path != ENOENT: + try: + realpath = errno_wrap(os.readlink, [dir_path], + [ENOENT], [ESTALE]) + if not isinstance(realpath, int): + realpath_parts = realpath.split('/') + pargfid = realpath_parts[-2] + basename = realpath_parts[-1] + dir_entry = os.path.join(pfx, pargfid, basename) + return dir_entry + except OSError: + # .gfid/GFID + gfidpath = unescape_space_newline(os.path.join(pfx, gfid)) + realpath = errno_wrap(Xattr.lgetxattr_buf, + [gfidpath, 'glusterfs.gfid2path'], [ENOENT], [ESTALE]) + if not isinstance(realpath, int): + basename = os.path.basename(realpath).rstrip('\x00') + dirpath = os.path.dirname(realpath) + if dirpath == "/": + pargfid = ROOT_GFID + else: + dirpath = dirpath.strip("/") + pargfid = get_gfid_from_mnt(dirpath) + if isinstance(pargfid, int): + return None + dir_entry = os.path.join(pfx, pargfid, basename) + return dir_entry + + return None + + def lf(event, **kwargs): """ Log Format helper function, log messages can be easily modified to structured log format. lf("Config Change", sync_jobs=4, brick=/bricks/b1) will be - converted as "Config Change<TAB>brick=/bricks/b1<TAB>sync_jobs=4" + converted as "Config Change [{brick=/bricks/b1}, {sync_jobs=4}]" """ - msg = event + msgparts = [] for k, v in kwargs.items(): - msg += "\t{0}={1}".format(k, v) - return msg + msgparts.append("{%s=%s}" % (k, v)) + return "%s [%s]" % (event, ", ".join(msgparts)) class Popen(subprocess.Popen): @@ -807,7 +885,7 @@ class Popen(subprocess.Popen): break b = os.read(self.stderr.fileno(), 1024) if b: - elines.append(b) + elines.append(b.decode()) else: break self.stderr.close() @@ -816,12 +894,31 @@ class Popen(subprocess.Popen): self.errfail() +def host_brick_split(value): + """ + IPv6 compatible way to split and get the host + and brick information. Example inputs: + node1.example.com:/exports/bricks/brick1/brick + fe80::af0f:df82:844f:ef66%utun0:/exports/bricks/brick1/brick + """ + parts = value.split(":") + brick = parts[-1] + hostparts = parts[0:-1] + return (":".join(hostparts), brick) + + class Volinfo(object): - def __init__(self, vol, host='localhost', prelude=[]): - po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, + def __init__(self, vol, host='localhost', prelude=[], master=True): + if master: + gluster_cmd_dir = gconf.get("gluster-command-dir") + else: + gluster_cmd_dir = gconf.get("slave-gluster-command-dir") + + gluster_cmd = os.path.join(gluster_cmd_dir, 'gluster') + po = Popen(prelude + [gluster_cmd, '--xml', '--remote-host=' + host, 'volume', 'info', vol], - stdout=PIPE, stderr=PIPE) + stdout=PIPE, stderr=PIPE, universal_newlines=True) vix = po.stdout.read() po.wait() po.terminate_geterr() @@ -852,7 +949,7 @@ class Volinfo(object): @memoize def bricks(self): def bparse(b): - host, dirp = b.find("name").text.split(':', 2) + host, dirp = host_brick_split(b.find("name").text) return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text} return [bparse(b) for b in self.get('brick')] @@ -884,6 +981,14 @@ class Volinfo(object): else: return int(self.get('disperseCount')[0].text) + def distribution_count(self, tier, hot): + if (tier and hot): + return int(self.get('hotBricks/hotdistCount')[0].text) + elif (tier and not hot): + return int(self.get('coldBricks/colddistCount')[0].text) + else: + return int(self.get('distCount')[0].text) + @property @memoize def hot_bricks(self): @@ -900,7 +1005,7 @@ class VolinfoFromGconf(object): # Glusterd will generate following config items before Geo-rep start # So that Geo-rep need not run gluster commands from inside # Volinfo object API/interface kept as is so that caller need not - # change anything exept calling this instead of Volinfo() + # change anything except calling this instead of Volinfo() # # master-bricks= # master-bricks=NODEID:HOSTNAME:PATH,.. @@ -920,6 +1025,16 @@ class VolinfoFromGconf(object): def is_hot(self, brickpath): return False + def is_uuid(self, value): + try: + uuid.UUID(value) + return True + except ValueError: + return False + + def possible_path(self, value): + return "/" in value + @property @memoize def bricks(self): @@ -933,8 +1048,22 @@ class VolinfoFromGconf(object): out = [] for b in bricks_data: parts = b.split(":") - bpath = parts[2] if len(parts) == 3 else "" - out.append({"host": parts[1], "dir": bpath, "uuid": parts[0]}) + b_uuid = None + if self.is_uuid(parts[0]): + b_uuid = parts[0] + # Set all parts except first + parts = parts[1:] + + if self.possible_path(parts[-1]): + bpath = parts[-1] + # Set all parts except last + parts = parts[0:-1] + + out.append({ + "host": ":".join(parts), # if remaining parts are IPv6 name + "dir": bpath, + "uuid": b_uuid + }) return out @@ -952,6 +1081,9 @@ class VolinfoFromGconf(object): def disperse_count(self, tier, hot): return gconf.get("master-disperse-count") + def distribution_count(self, tier, hot): + return gconf.get("master-distribution-count") + @property @memoize def hot_bricks(self): |
