diff options
author | Aravinda VK <avishwan@redhat.com> | 2019-10-23 10:10:12 +0530 |
---|---|---|
committer | Amar Tumballi <amarts@gmail.com> | 2019-11-07 06:24:39 +0000 |
commit | 0fc68040b72fc94dec3874345547e294b9ec1f45 (patch) | |
tree | ea053550c79c3903804a7e3c3812551a41e17b01 /geo-replication/syncdaemon/libgfchangelog.py | |
parent | 0ab6c178468b6cce095c54ab62cfa51162d01fcc (diff) |
georep: Merge Worker and Agent as a single process
- libgfchangelog is simplified by removing unnecessary API Class
- Merged Agent logic into Worker instead of running Worker and Agent as
two separate processes and maintaining RPC between Worker and Agent.
- Geo-rep command Pause and Resume will continue without any changes.
But Agent functionality also gets paused with that.
Updates: #755
Change-Id: Ie2c00fa7dddf21f180f0649e0aaf084d29023c98
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon/libgfchangelog.py')
-rw-r--r-- | geo-replication/syncdaemon/libgfchangelog.py | 247 |
1 files changed, 124 insertions, 123 deletions
diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py index 8d129567075..34beadb3552 100644 --- a/geo-replication/syncdaemon/libgfchangelog.py +++ b/geo-replication/syncdaemon/libgfchangelog.py @@ -16,126 +16,127 @@ from py2py3 import gr_cl_history_changelog, gr_cl_done, gr_create_string_buffer from py2py3 import gr_cl_register, gr_cl_history_done, bytearray_to_str -class Changes(object): - libgfc = CDLL(find_library("gfchangelog"), mode=RTLD_GLOBAL, - use_errno=True) - - @classmethod - def geterrno(cls): - return get_errno() - - @classmethod - def raise_changelog_err(cls): - errn = cls.geterrno() - raise ChangelogException(errn, os.strerror(errn)) - - @classmethod - def _get_api(cls, call): - return getattr(cls.libgfc, call) - - @classmethod - def cl_init(cls): - ret = cls._get_api('gf_changelog_init')(None) - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_register(cls, brick, path, log_file, log_level, retries=0): - ret = gr_cl_register(cls, brick, path, log_file, log_level, retries) - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_scan(cls): - ret = cls._get_api('gf_changelog_scan')() - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_startfresh(cls): - ret = cls._get_api('gf_changelog_start_fresh')() - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_getchanges(cls): - """ remove hardcoding for path name length """ - def clsort(f): - return f.split('.')[-1] - changes = [] - buf = gr_create_string_buffer(4096) - call = cls._get_api('gf_changelog_next_change') - - while True: - ret = call(buf, 4096) - if ret in (0, -1): - break - # py2 and py3 compatibility - result = bytearray_to_str(buf.raw[:ret - 1]) - changes.append(result) - if ret == -1: - cls.raise_changelog_err() - # cleanup tracker - cls.cl_startfresh() - return sorted(changes, key=clsort) - - @classmethod - def cl_done(cls, clfile): - ret = gr_cl_done(cls, clfile) - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_history_scan(cls): - ret = cls._get_api('gf_history_changelog_scan')() - if ret == -1: - cls.raise_changelog_err() - - return ret - - @classmethod - def cl_history_changelog(cls, changelog_path, start, end, num_parallel): - actual_end = c_ulong() - ret = gr_cl_history_changelog(cls, changelog_path, start, end, - num_parallel, byref(actual_end)) - if ret == -1: - cls.raise_changelog_err() - - if ret == -2: - raise ChangelogHistoryNotAvailable() - - return (ret, actual_end.value) - - @classmethod - def cl_history_startfresh(cls): - ret = cls._get_api('gf_history_changelog_start_fresh')() - if ret == -1: - cls.raise_changelog_err() - - @classmethod - def cl_history_getchanges(cls): - """ remove hardcoding for path name length """ - def clsort(f): - return f.split('.')[-1] - - changes = [] - buf = gr_create_string_buffer(4096) - call = cls._get_api('gf_history_changelog_next_change') - - while True: - ret = call(buf, 4096) - if ret in (0, -1): - break - # py2 and py3 compatibility - result = bytearray_to_str(buf.raw[:ret - 1]) - changes.append(result) - if ret == -1: - cls.raise_changelog_err() - - return sorted(changes, key=clsort) - - @classmethod - def cl_history_done(cls, clfile): - ret = gr_cl_history_done(cls, clfile) - if ret == -1: - cls.raise_changelog_err() +libgfc = CDLL( + find_library("gfchangelog"), + mode=RTLD_GLOBAL, + use_errno=True +) + + +def _raise_changelog_err(): + errn = get_errno() + raise ChangelogException(errn, os.strerror(errn)) + + +def _init(): + if libgfc.gf_changelog_init(None) == -1: + _raise_changelog_err() + + +def register(brick, path, log_file, log_level, retries=0): + _init() + + ret = gr_cl_register(libgfc, brick, path, log_file, log_level, retries) + + if ret == -1: + _raise_changelog_err() + + +def scan(): + ret = libgfc.gf_changelog_scan() + if ret == -1: + _raise_changelog_err() + + +def startfresh(): + ret = libgfc.gf_changelog_start_fresh() + if ret == -1: + _raise_changelog_err() + + +def getchanges(): + def clsort(cfile): + return cfile.split('.')[-1] + + changes = [] + buf = gr_create_string_buffer(4096) + call = libgfc.gf_changelog_next_change + + while True: + ret = call(buf, 4096) + if ret in (0, -1): + break + + # py2 and py3 compatibility + result = bytearray_to_str(buf.raw[:ret - 1]) + changes.append(result) + + if ret == -1: + _raise_changelog_err() + + # cleanup tracker + startfresh() + + return sorted(changes, key=clsort) + + +def done(clfile): + ret = gr_cl_done(libgfc, clfile) + if ret == -1: + _raise_changelog_err() + + +def history_scan(): + ret = libgfc.gf_history_changelog_scan() + if ret == -1: + _raise_changelog_err() + + return ret + + +def history_changelog(changelog_path, start, end, num_parallel): + actual_end = c_ulong() + ret = gr_cl_history_changelog(libgfc, changelog_path, start, end, + num_parallel, byref(actual_end)) + if ret == -1: + _raise_changelog_err() + + if ret == -2: + raise ChangelogHistoryNotAvailable() + + return (ret, actual_end.value) + + +def history_startfresh(): + ret = libgfc.gf_history_changelog_start_fresh() + if ret == -1: + _raise_changelog_err() + + +def history_getchanges(): + def clsort(cfile): + return cfile.split('.')[-1] + + changes = [] + buf = gr_create_string_buffer(4096) + call = libgfc.gf_history_changelog_next_change + + while True: + ret = call(buf, 4096) + if ret in (0, -1): + break + + # py2 and py3 compatibility + result = bytearray_to_str(buf.raw[:ret - 1]) + changes.append(result) + + if ret == -1: + _raise_changelog_err() + + return sorted(changes, key=clsort) + + +def history_done(clfile): + ret = gr_cl_history_done(libgfc, clfile) + if ret == -1: + _raise_changelog_err() |