summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/resource.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r--geo-replication/syncdaemon/resource.py189
1 files changed, 34 insertions, 155 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 0ca023cd8c5..a9810ae325b 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -15,18 +15,15 @@ import stat
import time
import signal
import fcntl
-import errno
import types
import struct
import socket
import logging
import tempfile
-import threading
import subprocess
import errno
from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES
from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM
-from select import error as SelectError
import shutil
from gconf import gconf
@@ -43,7 +40,7 @@ from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
from syncdutils import GX_GFID_CANONICAL_LEN
from gsyncdstatus import GeorepStatus
from syncdutils import get_master_and_slave_data_from_args
-from syncdutils import mntpt_list, lf
+from syncdutils import mntpt_list, lf, Popen, sup, Volinfo
from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
@@ -52,14 +49,9 @@ UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
-def sup(x, *a, **kw):
- """a rubyesque "super" for python ;)
-
- invoke caller method in parent class with given args.
- """
- return getattr(super(type(x), x),
- sys._getframe(1).f_code.co_name)(*a, **kw)
-
+slv_volume = None
+slv_host = None
+slv_bricks = None
def desugar(ustr):
"""transform sugared url strings to standard <scheme>://<urlbody> form
@@ -114,149 +106,6 @@ def parse_url(ustr):
return getattr(this, sch.upper())(path)
-class Popen(subprocess.Popen):
-
- """customized subclass of subprocess.Popen with a ring
- buffer for children error output"""
-
- @classmethod
- def init_errhandler(cls):
- """start the thread which handles children's error output"""
- cls.errstore = {}
-
- def tailer():
- while True:
- errstore = cls.errstore.copy()
- try:
- poe, _, _ = select(
- [po.stderr for po in errstore], [], [], 1)
- except (ValueError, SelectError):
- # stderr is already closed wait for some time before
- # checking next error
- time.sleep(0.5)
- continue
- for po in errstore:
- if po.stderr not in poe:
- continue
- po.lock.acquire()
- try:
- if po.on_death_row:
- continue
- la = errstore[po]
- try:
- fd = po.stderr.fileno()
- except ValueError: # file is already closed
- time.sleep(0.5)
- continue
-
- try:
- l = os.read(fd, 1024)
- except OSError:
- time.sleep(0.5)
- continue
-
- if not l:
- continue
- tots = len(l)
- for lx in la:
- tots += len(lx)
- while tots > 1 << 20 and la:
- tots -= len(la.pop(0))
- la.append(l)
- finally:
- po.lock.release()
- t = syncdutils.Thread(target=tailer)
- t.start()
- cls.errhandler = t
-
- @classmethod
- def fork(cls):
- """fork wrapper that restarts errhandler thread in child"""
- pid = os.fork()
- if not pid:
- cls.init_errhandler()
- return pid
-
- def __init__(self, args, *a, **kw):
- """customizations for subprocess.Popen instantiation
-
- - 'close_fds' is taken to be the default
- - if child's stderr is chosen to be managed,
- register it with the error handler thread
- """
- self.args = args
- if 'close_fds' not in kw:
- kw['close_fds'] = True
- self.lock = threading.Lock()
- self.on_death_row = False
- self.elines = []
- try:
- sup(self, args, *a, **kw)
- except:
- ex = sys.exc_info()[1]
- if not isinstance(ex, OSError):
- raise
- raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
- (args[0], errno.errorcode[ex.errno],
- os.strerror(ex.errno)))
- if kw.get('stderr') == subprocess.PIPE:
- assert(getattr(self, 'errhandler', None))
- self.errstore[self] = []
-
- def errlog(self):
- """make a log about child's failure event"""
- logging.error(lf("command returned error",
- cmd=" ".join(self.args),
- error=self.returncode))
- lp = ''
-
- def logerr(l):
- logging.error(self.args[0] + "> " + l)
- for l in self.elines:
- ls = l.split('\n')
- ls[0] = lp + ls[0]
- lp = ls.pop()
- for ll in ls:
- logerr(ll)
- if lp:
- logerr(lp)
-
- def errfail(self):
- """fail nicely if child did not terminate with success"""
- self.errlog()
- syncdutils.finalize(exval=1)
-
- def terminate_geterr(self, fail_on_err=True):
- """kill child, finalize stderr harvesting (unregister
- from errhandler, set up .elines), fail on error if
- asked for
- """
- self.lock.acquire()
- try:
- self.on_death_row = True
- finally:
- self.lock.release()
- elines = self.errstore.pop(self)
- if self.poll() is None:
- self.terminate()
- if self.poll() is None:
- time.sleep(0.1)
- self.kill()
- self.wait()
- while True:
- if not select([self.stderr], [], [], 0.1)[0]:
- break
- b = os.read(self.stderr.fileno(), 1024)
- if b:
- elines.append(b)
- else:
- break
- self.stderr.close()
- self.elines = elines
- if fail_on_err and self.returncode != 0:
- self.errfail()
-
-
class Server(object):
"""singleton implemening those filesystem access primitives
@@ -776,6 +625,31 @@ class Server(object):
if isinstance(st, int):
blob = entry_pack_mkdir(
gfid, bname, e['mode'], e['uid'], e['gid'])
+ else:
+ # If gfid of a directory exists on slave but path based
+ # create is getting EEXIST. This means the directory is
+ # renamed in master but recorded as MKDIR during hybrid
+ # crawl. Get the directory path by reading the backend
+ # symlink and trying to rename to new name as said by
+ # master.
+ global slv_bricks
+ global slv_volume
+ global slv_host
+ if not slv_bricks:
+ slv_info = Volinfo (slv_volume, slv_host)
+ slv_bricks = slv_info.bricks
+ # Result of readlink would be of format as below.
+ # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
+ realpath = os.readlink(os.path.join(slv_bricks[0]['dir'],
+ ".glusterfs", gfid[0:2],
+ gfid[2:4], gfid))
+ realpath_parts = realpath.split('/')
+ src_pargfid = realpath_parts[-2]
+ src_basename = realpath_parts[-1]
+ src_entry = os.path.join(pfx, src_pargfid, src_basename)
+ logging.info(lf("Special case: rename on mkdir",
+ gfid=gfid, entry=repr(entry)))
+ rename_with_disk_gfid_confirmation(gfid, src_entry, entry)
elif op == 'LINK':
slink = os.path.join(pfx, gfid)
st = lstat(slink)
@@ -1309,6 +1183,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def __init__(self, path):
self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)
+ global slv_volume
+ global slv_host
+ slv_volume = self.volume
+ slv_host = self.host
+
def canonical_path(self):
return ':'.join([gethostbyname(self.host), self.volume])