From b13c483dca20e4015b958f8959328e665a357f60 Mon Sep 17 00:00:00 2001 From: Avra Sengupta Date: Sat, 1 Jun 2013 16:17:57 +0530 Subject: gsyncd: distribute the crawling load * also consume changelog for change detection. * Status fixes * Use new libgfchangelog done API * process (and sync) one changelog at a time Change-Id: I24891615bb762e0741b1819ddfdef8802326cb16 BUG: 847839 Original Author: Csaba Henk Original Author: Aravinda VK Original Author: Venky Shankar Original Author: Amar Tumballi Original Author: Avra Sengupta Signed-off-by: Avra Sengupta Reviewed-on: http://review.gluster.org/5131 Reviewed-by: Vijay Bellur Tested-by: Vijay Bellur --- geo-replication/syncdaemon/syncdutils.py | 102 ++++++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 1 deletion(-) (limited to 'geo-replication/syncdaemon/syncdutils.py') diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 0764c07904d..720200018e5 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -5,8 +5,9 @@ import time import fcntl import shutil import logging +import socket from threading import Lock, Thread as baseThread -from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, errorcode +from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, ENOENT, EPERM, ESTALE, errorcode from signal import signal, SIGTERM, SIGKILL from time import sleep import select as oselect @@ -25,6 +26,15 @@ try: except ImportError: import urllib +try: + from hashlib import md5 as md5 +except ImportError: + # py 2.4 + from md5 import new as md5 + +# auxillary gfid based access prefix +_CL_AUX_GFID_PFX = ".gfid/" + def escape(s): """the chosen flavor of string escaping, used all over to turn whatever data to creatable representation""" @@ -286,3 +296,93 @@ def waitpid (*a): def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})): signal(SIGTERM, hook) + +def is_host_local(host): + locaddr = False + for ai in socket.getaddrinfo(host, None): + # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators/mgmt/glusterd/src/glusterd-utils.c#L125 + if ai[0] == socket.AF_INET: + if ai[-1][0].split(".")[0] == "127": + locaddr = True + break + elif ai[0] == socket.AF_INET6: + if ai[-1][0] == "::1": + locaddr = True + break + else: + continue + try: + # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue, + # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587 + s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP) + except socket.error: + ex = sys.exc_info()[1] + if ex.errno != EPERM: + raise + f = None + try: + f = open("/proc/sys/net/ipv4/ip_nonlocal_bind") + if int(f.read()) != 0: + raise GsyncdError( + "non-local bind is set and not allowed to create raw sockets, " + "cannot determine if %s is local" % host) + s = socket.socket(ai[0], socket.SOCK_DGRAM) + finally: + if f: + f.close() + try: + s.bind(ai[-1]) + locaddr = True + break + except: + pass + s.close() + return locaddr + +def funcode(f): + fc = getattr(f, 'func_code', None) + if not fc: + # python 3 + fc = f.__code__ + return fc + +def memoize(f): + fc = funcode(f) + fn = fc.co_name + def ff(self, *a, **kw): + rv = getattr(self, '_' + fn, None) + if rv == None: + rv = f(self, *a, **kw) + setattr(self, '_' + fn, rv) + return rv + return ff + +def umask(): + return os.umask(0) + +def entry2pb(e): + return e.rsplit('/', 1) + +def gauxpfx(): + return _CL_AUX_GFID_PFX + +def md5hex(s): + return md5(s).hexdigest() + +def selfkill(sig=SIGTERM): + os.kill(os.getpid(), sig) + +def errno_wrap(call, arg=[], errnos=[]): + """ wrapper around calls resilient to errnos. + retry in case of ESTALE + """ + while True: + try: + return call(*arg) + except OSError: + ex = sys.exc_info()[1] + if ex.errno in errnos: + return ex.errno + if not ex.errno == ESTALE: + raise + time.sleep(0.5) # retry the call -- cgit