-rw-r--r-- | cli/src/cli-cmd-parser.c | 4
-rw-r--r-- | cli/src/cli-rpc-ops.c | 80
-rw-r--r-- | cli/src/cli-xml-output.c | 68
-rw-r--r-- | cli/src/cli.h | 3
-rw-r--r-- | geo-replication/syncdaemon/Makefile.am | 3
-rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 25
-rw-r--r-- | geo-replication/syncdaemon/gsyncdstatus.py | 317
-rw-r--r-- | geo-replication/syncdaemon/master.py | 374
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 58
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 12
-rw-r--r-- | geo-replication/tests/unit/test_gsyncdstatus.py | 193
-rw-r--r-- | rpc/rpc-lib/src/protocol-common.h | 18
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-geo-rep.c | 479
13 files changed, 917 insertions(+), 717 deletions(-)
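Before the per-file hunks, a condensed sketch of the atomic status-update pattern that the new geo-replication/syncdaemon/gsyncdstatus.py module below is built around: lock the per-brick status file, merge fields into its JSON contents, write a sibling temporary file, rename it over the original and fsync the directory. This is a simplified illustration only — it drops the re-open-after-lock check that the real LockedOpen class performs to avoid acting on a replaced ("ghost") file — not the module itself.

import fcntl
import json
import os
import tempfile


def atomic_status_update(path, mergerfunc):
    # The real module pre-creates the status file, so "r+" is safe here.
    with open(path, "r+") as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        try:
            data = json.load(f)
        except ValueError:
            data = {}
        data = mergerfunc(data)
        dirname = os.path.dirname(os.path.abspath(path))
        # Write a sibling temp file and rename it over the original so
        # readers never observe a partially written status file.
        with tempfile.NamedTemporaryFile("w", dir=dirname, delete=False) as tf:
            tf.write(json.dumps(data))
            tempname = tf.name
        os.rename(tempname, path)
        # Persist the rename itself.
        dirfd = os.open(dirname, os.O_DIRECTORY)
        os.fsync(dirfd)
        os.close(dirfd)

# e.g. atomic_status_update("brick_b1.status",
#                           lambda d: dict(d, worker_status="Active"))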
diff --git a/cli/src/cli-cmd-parser.c b/cli/src/cli-cmd-parser.c index 65ccfcdcc5f..d70f6ee1e30 100644 --- a/cli/src/cli-cmd-parser.c +++ b/cli/src/cli-cmd-parser.c @@ -2274,8 +2274,8 @@ config_parse (const char **words, int wordcount, dict_t *dict, ret = -1; goto out; } - snprintf (append_str, 300, "now:%" GF_PRI_SECOND ".%06"GF_PRI_SUSECONDS, - tv.tv_sec, tv.tv_usec); + snprintf (append_str, 300, "%" GF_PRI_SECOND, + tv.tv_sec); } ret = dict_set_dynstr (dict, "op_value", append_str); diff --git a/cli/src/cli-rpc-ops.c b/cli/src/cli-rpc-ops.c index 091608d8881..49c8761bbab 100644 --- a/cli/src/cli-rpc-ops.c +++ b/cli/src/cli-rpc-ops.c @@ -4406,18 +4406,24 @@ get_struct_variable (int mem_num, gf_gsync_status_t *sts_val) case 1: return (sts_val->master); case 2: return (sts_val->brick); case 3: return (sts_val->slave_user); - case 4: return (sts_val->slave_node); - case 5: return (sts_val->worker_status); - case 6: return (sts_val->checkpoint_status); + case 4: return (sts_val->slave); + case 5: return (sts_val->slave_node); + case 6: return (sts_val->worker_status); case 7: return (sts_val->crawl_status); - case 8: return (sts_val->files_syncd); - case 9: return (sts_val->files_remaining); - case 10: return (sts_val->bytes_remaining); - case 11: return (sts_val->purges_remaining); - case 12: return (sts_val->total_files_skipped); - case 13: return (sts_val->brick_host_uuid); - case 14: return (sts_val->slavekey); - case 15: return (sts_val->session_slave); + case 8: return (sts_val->last_synced); + case 9: return (sts_val->entry); + case 10: return (sts_val->data); + case 11: return (sts_val->meta); + case 12: return (sts_val->failures); + case 13: return (sts_val->checkpoint_time); + case 14: return (sts_val->checkpoint_completed); + case 15: return (sts_val->checkpoint_completion_time); + case 16: return (sts_val->brick_host_uuid); + case 17: return (sts_val->last_synced_utc); + case 18: return (sts_val->checkpoint_time_utc); + case 19: return (sts_val->checkpoint_completion_time_utc); + case 20: return (sts_val->slavekey); + case 21: return (sts_val->session_slave); default: goto out; } @@ -4435,7 +4441,7 @@ gf_cli_print_status (char **title_values, int i = 0; int j = 0; int ret = 0; - int status_fields = 7; /* Indexed at 0 */ + int status_fields = 8; /* Indexed at 0 */ int total_spacing = 0; char **output_values = NULL; char *tmp = NULL; @@ -4494,13 +4500,15 @@ gf_cli_print_status (char **title_values, strlen(title_values[j])); output_values[j][spacing[j]] = '\0'; } - cli_out ("%s %s %s %s %s %s %s %s %s %s %s %s", + cli_out ("%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s", output_values[0], output_values[1], output_values[2], output_values[3], output_values[4], output_values[5], output_values[6], output_values[7], output_values[8], output_values[9], - output_values[10], output_values[11]); + output_values[10], output_values[11], + output_values[12], output_values[13], + output_values[14], output_values[15]); /* setting and printing the hyphens */ memset (hyphens, '-', total_spacing); @@ -4527,13 +4535,15 @@ gf_cli_print_status (char **title_values, output_values[j][spacing[j]] = '\0'; } - cli_out ("%s %s %s %s %s %s %s %s %s %s %s %s", + cli_out ("%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s", output_values[0], output_values[1], output_values[2], output_values[3], output_values[4], output_values[5], output_values[6], output_values[7], output_values[8], output_values[9], - output_values[10], output_values[11]); + output_values[10], output_values[11], + output_values[12], 
output_values[13], + output_values[14], output_values[15]); } out: @@ -4552,6 +4562,23 @@ out: } int +gf_gsync_status_t_comparator (const void *p, const void *q) +{ + char *slavekey1 = NULL; + char *slavekey2 = NULL; + + slavekey1 = get_struct_variable (20, (*(gf_gsync_status_t **)p)); + slavekey2 = get_struct_variable (20, (*(gf_gsync_status_t **)q)); + if (!slavekey1 || !slavekey2) { + gf_log ("cli", GF_LOG_ERROR, + "struct member empty."); + return 0; + } + + return strcmp (slavekey1, slavekey2); +} + +int gf_cli_read_status_data (dict_t *dict, gf_gsync_status_t **sts_vals, int *spacing, int gsync_count, @@ -4586,6 +4613,11 @@ gf_cli_read_status_data (dict_t *dict, } } + /* Sort based on Session Slave */ + qsort(sts_vals, gsync_count, + sizeof(gf_gsync_status_t *), + gf_gsync_status_t_comparator); + out: return ret; } @@ -4596,18 +4628,20 @@ gf_cli_gsync_status_output (dict_t *dict, gf_boolean_t is_detail) int gsync_count = 0; int i = 0; int ret = 0; - int spacing[13] = {0}; - int num_of_fields = 13; + int spacing[16] = {0}; + int num_of_fields = 16; char errmsg[1024] = ""; char *master = NULL; char *slave = NULL; char *title_values[] = {"MASTER NODE", "MASTER VOL", "MASTER BRICK", "SLAVE USER", - "SLAVE", - "STATUS", "CHECKPOINT STATUS", - "CRAWL STATUS", "FILES SYNCD", - "FILES PENDING", "BYTES PENDING", - "DELETES PENDING", "FILES SKIPPED"}; + "SLAVE", "SLAVE NODE", + "STATUS", "CRAWL STATUS", + "LAST_SYNCED", "ENTRY", + "DATA", "META", "FAILURES", + "CHECKPOINT TIME", + "CHECKPOINT COMPLETED", + "CHECKPOINT COMPLETION TIME"}; gf_gsync_status_t **sts_vals = NULL; /* Checks if any session is active or not */ diff --git a/cli/src/cli-xml-output.c b/cli/src/cli-xml-output.c index cbb4c1f58e7..d7322d5bb0d 100644 --- a/cli/src/cli-xml-output.c +++ b/cli/src/cli-xml-output.c @@ -3839,25 +3839,6 @@ out: #if (HAVE_LIB_XML) int -gf_gsync_status_t_comparator (const void *p, const void *q) -{ - char *master1 = NULL; - char *master2 = NULL; - - master1 = get_struct_variable (1, (*(gf_gsync_status_t **)p)); - master2 = get_struct_variable (1, (*(gf_gsync_status_t **)q)); - if (!master1 || !master2) { - gf_log ("cli", GF_LOG_ERROR, - "struct member empty."); - return 0; - } - - return strcmp (master1,master2); -} -#endif - -#if (HAVE_LIB_XML) -int cli_xml_output_vol_gsync_status (dict_t *dict, xmlTextWriterPtr writer) { @@ -3865,8 +3846,7 @@ cli_xml_output_vol_gsync_status (dict_t *dict, int i = 1; int j = 0; int count = 0; - const int number_of_fields = 13; - const int number_of_basic_fields = 8; + const int number_of_fields = 20; int closed = 1; int session_closed = 1; gf_gsync_status_t **status_values = NULL; @@ -3878,18 +3858,31 @@ cli_xml_output_vol_gsync_status (dict_t *dict, char *slave = NULL; char *slave_next = NULL; char *title_values[] = {"master_node", - "master_node_uuid", + "", "master_brick", "slave_user", "slave", + "slave_node", "status", - "checkpoint_status", "crawl_status", - "files_syncd", - "files_pending", - "bytes_pending", - "deletes_pending", - "files_skipped"}; + /* last_synced */ + "", + "entry", + "data", + "meta", + "failures", + /* checkpoint_time */ + "", + "checkpoint_completed", + /* checkpoint_completion_time */ + "", + "master_node_uuid", + /* last_synced_utc */ + "last_synced", + /* checkpoint_time_utc */ + "checkpoint_time", + /* checkpoint_completion_time_utc */ + "checkpoint_completion_time"}; GF_ASSERT (dict); @@ -3963,7 +3956,7 @@ cli_xml_output_vol_gsync_status (dict_t *dict, session_closed = 0; - tmp = get_struct_variable (15, status_values[i]); + tmp = 
get_struct_variable (21, status_values[i]); if (!tmp) { gf_log ("cli", GF_LOG_ERROR, "struct member empty."); @@ -3980,18 +3973,11 @@ cli_xml_output_vol_gsync_status (dict_t *dict, XML_RET_CHECK_AND_GOTO (ret, out); for (j = 0; j < number_of_fields; j++) { - // if detail option is not set and field is not under - // basic fields or if field is volume then skip - if(!status_detail && j >= number_of_basic_fields) + /* XML ignore fields */ + if (strcmp(title_values[j], "") == 0) continue; - // Displaying the master_node uuid as second field - - if (j == 1) - tmp = get_struct_variable (13, - status_values[i]); - else - tmp = get_struct_variable (j, status_values[i]); + tmp = get_struct_variable (j, status_values[i]); if (!tmp) { gf_log ("cli", GF_LOG_ERROR, "struct member empty."); @@ -4009,8 +3995,8 @@ cli_xml_output_vol_gsync_status (dict_t *dict, XML_RET_CHECK_AND_GOTO (ret, out); if (i+1 < count) { - slave = get_struct_variable (14, status_values[i]); - slave_next = get_struct_variable (14, + slave = get_struct_variable (20, status_values[i]); + slave_next = get_struct_variable (20, status_values[i+1]); volume = get_struct_variable (1, status_values[i]); volume_next = get_struct_variable (1, diff --git a/cli/src/cli.h b/cli/src/cli.h index c0750f2dd74..243935230d1 100644 --- a/cli/src/cli.h +++ b/cli/src/cli.h @@ -449,4 +449,7 @@ print_quota_list_header (int type); void print_quota_list_empty (char *path, int type); +int +gf_gsync_status_t_comparator (const void *p, const void *q); + #endif /* __CLI_H__ */ diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am index 885963eae2b..ed0f5e40924 100644 --- a/geo-replication/syncdaemon/Makefile.am +++ b/geo-replication/syncdaemon/Makefile.am @@ -2,6 +2,7 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \ resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ - $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py + $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py \ + gsyncdstatus.py CLEANFILES = diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index b9ee5aec8c7..32e4eb7828d 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -27,12 +27,13 @@ from ipaddr import IPAddress, IPNetwork from gconf import gconf from syncdutils import FreeObject, norm, grabpidfile, finalize -from syncdutils import log_raise_exception, privileged, update_file +from syncdutils import log_raise_exception, privileged from syncdutils import GsyncdError, select, set_term_handler from configinterface import GConffile, upgrade_config_file import resource from monitor import monitor from changelogagent import agent, Changelog +from gsyncdstatus import set_monitor_status, GeorepStatus class GLogger(Logger): @@ -267,7 +268,7 @@ def main_i(): op.add_option('--socketdir', metavar='DIR') op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs) - op.add_option('--checkpoint', metavar='LABEL', default='') + op.add_option('--checkpoint', metavar='LABEL', default='0') # tunables for failover/failback mechanism: # None - gsyncd behaves as normal @@ -315,6 +316,8 @@ def main_i(): action='callback', callback=store_local) op.add_option('--delete', dest='delete', action='callback', callback=store_local_curry(True)) + op.add_option('--status-get', 
dest='status_get', action='callback', + callback=store_local_curry(True)) op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a), setattr( @@ -583,15 +586,8 @@ def main_i(): GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf') if confdata.op == 'set': logging.info('checkpoint %s set' % confdata.val) - gcnf.delete('checkpoint_completed') - gcnf.delete('checkpoint_target') elif confdata.op == 'del': logging.info('checkpoint info was reset') - # if it is removing 'checkpoint' then we need - # to remove 'checkpoint_completed' and 'checkpoint_target' too - gcnf.delete('checkpoint_completed') - gcnf.delete('checkpoint_target') - except IOError: if sys.exc_info()[1].errno == ENOENT: # directory of log path is not present, @@ -607,7 +603,7 @@ def main_i(): create = rconf.get('create') if create: if getattr(gconf, 'state_file', None): - update_file(gconf.state_file, lambda f: f.write(create + '\n')) + set_monitor_status(gconf.state_file, create) return go_daemon = rconf['go_daemon'] @@ -615,6 +611,15 @@ def main_i(): be_agent = rconf.get('agent') rscs, local, remote = makersc(args) + + status_get = rconf.get('status_get') + if status_get: + for brick in gconf.path: + brick_status = GeorepStatus(gconf.state_file, brick) + checkpoint_time = int(getattr(gconf, "checkpoint", "0")) + brick_status.print_status(checkpoint_time=checkpoint_time) + return + if not be_monitor and isinstance(remote, resource.SSH) and \ go_daemon == 'should': go_daemon = 'postconn' diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py new file mode 100644 index 00000000000..a49b9c23dea --- /dev/null +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import fcntl +import os +import tempfile +import urllib +import json +import time +from datetime import datetime + +DEFAULT_STATUS = "N/A" +MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped") +STATUS_VALUES = (DEFAULT_STATUS, + "Initializing...", + "Active", + "Passive", + "Faulty") + +CRAWL_STATUS_VALUES = (DEFAULT_STATUS, + "Hybrid Crawl", + "History Crawl", + "Changelog Crawl") + + +def human_time(ts): + try: + return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S") + except ValueError: + return DEFAULT_STATUS + + +def human_time_utc(ts): + try: + return datetime.utcfromtimestamp( + float(ts)).strftime("%Y-%m-%d %H:%M:%S") + except ValueError: + return DEFAULT_STATUS + + +def get_default_values(): + return { + "slave_node": DEFAULT_STATUS, + "worker_status": DEFAULT_STATUS, + "last_synced": 0, + "crawl_status": DEFAULT_STATUS, + "entry": 0, + "data": 0, + "meta": 0, + "failures": 0, + "checkpoint_completed": DEFAULT_STATUS, + "checkpoint_time": 0, + "checkpoint_completion_time": 0} + + +class LockedOpen(object): + + def __init__(self, filename, *args, **kwargs): + self.filename = filename + self.open_args = args + self.open_kwargs = kwargs + self.fileobj = None + + def __enter__(self): + """ + If two processes compete to update a file, The first process + gets the lock and the second process is blocked in the fcntl.flock() + call. 
When first process replaces the file and releases the lock, + the already open file descriptor in the second process now points + to a "ghost" file(not reachable by any path name) with old contents. + To avoid that conflict, check the fd already opened is same or + not. Open new one if not same + """ + f = open(self.filename, *self.open_args, **self.open_kwargs) + while True: + fcntl.flock(f, fcntl.LOCK_EX) + fnew = open(self.filename, *self.open_args, **self.open_kwargs) + if os.path.sameopenfile(f.fileno(), fnew.fileno()): + fnew.close() + break + else: + f.close() + f = fnew + self.fileobj = f + return f + + def __exit__(self, _exc_type, _exc_value, _traceback): + self.fileobj.close() + + +def set_monitor_status(status_file, status): + fd = os.open(status_file, os.O_CREAT | os.O_RDWR) + os.close(fd) + with LockedOpen(status_file, 'r+'): + with tempfile.NamedTemporaryFile('w', dir=os.path.dirname(status_file), + delete=False) as tf: + tf.write(status) + tempname = tf.name + + os.rename(tempname, status_file) + dirfd = os.open(os.path.dirname(os.path.abspath(status_file)), + os.O_DIRECTORY) + os.fsync(dirfd) + os.close(dirfd) + + +class GeorepStatus(object): + def __init__(self, monitor_status_file, brick): + self.work_dir = os.path.dirname(monitor_status_file) + self.monitor_status_file = monitor_status_file + self.filename = os.path.join(self.work_dir, + "brick_%s.status" + % urllib.quote_plus(brick)) + + fd = os.open(self.filename, os.O_CREAT | os.O_RDWR) + os.close(fd) + fd = os.open(self.monitor_status_file, os.O_CREAT | os.O_RDWR) + os.close(fd) + self.brick = brick + self.default_values = get_default_values() + + def _update(self, mergerfunc): + with LockedOpen(self.filename, 'r+') as f: + try: + data = json.load(f) + except ValueError: + data = self.default_values + + data = mergerfunc(data) + with tempfile.NamedTemporaryFile( + 'w', + dir=os.path.dirname(self.filename), + delete=False) as tf: + tf.write(data) + tempname = tf.name + + os.rename(tempname, self.filename) + dirfd = os.open(os.path.dirname(os.path.abspath(self.filename)), + os.O_DIRECTORY) + os.fsync(dirfd) + os.close(dirfd) + + def reset_on_worker_start(self): + def merger(data): + data["slave_node"] = DEFAULT_STATUS + data["crawl_status"] = DEFAULT_STATUS + data["entry"] = 0 + data["data"] = 0 + data["meta"] = 0 + return json.dumps(data) + + self._update(merger) + + def set_field(self, key, value): + def merger(data): + data[key] = value + return json.dumps(data) + + self._update(merger) + + def set_last_synced(self, value, checkpoint_time): + def merger(data): + data["last_synced"] = value[0] + + # If checkpoint is not set or reset + # or if last set checkpoint is changed + if checkpoint_time == 0 or \ + checkpoint_time != data["checkpoint_time"]: + data["checkpoint_time"] = 0 + data["checkpoint_completion_time"] = 0 + data["checkpoint_completed"] = "No" + + # If checkpoint is completed and not marked as completed + # previously then update the checkpoint completed time + if checkpoint_time > 0 and checkpoint_time <= value[0]: + if data["checkpoint_completed"] == "No": + data["checkpoint_time"] = checkpoint_time + data["checkpoint_completion_time"] = int(time.time()) + data["checkpoint_completed"] = "Yes" + return json.dumps(data) + + self._update(merger) + + def set_worker_status(self, status): + self.set_field("worker_status", status) + + def set_worker_crawl_status(self, status): + self.set_field("crawl_status", status) + + def set_slave_node(self, slave_node): + def merger(data): + data["slave_node"] = 
slave_node + return json.dumps(data) + + self._update(merger) + + def inc_value(self, key, value): + def merger(data): + data[key] = data.get(key, 0) + value + return json.dumps(data) + + self._update(merger) + + def dec_value(self, key, value): + def merger(data): + data[key] = data.get(key, 0) - value + if data[key] < 0: + data[key] = 0 + return json.dumps(data) + + self._update(merger) + + def set_active(self): + self.set_field("worker_status", "Active") + + def set_passive(self): + self.set_field("worker_status", "Passive") + + def get_monitor_status(self): + data = "" + with open(self.monitor_status_file, "r") as f: + data = f.read().strip() + return data + + def get_status(self, checkpoint_time=0): + """ + Monitor Status ---> Created Started Paused Stopped + ---------------------------------------------------------------------- + slave_node N/A VALUE VALUE N/A + status Created VALUE Paused Stopped + last_synced N/A VALUE VALUE VALUE + crawl_status N/A VALUE N/A N/A + entry N/A VALUE N/A N/A + data N/A VALUE N/A N/A + meta N/A VALUE N/A N/A + failures N/A VALUE VALUE VALUE + checkpoint_completed N/A VALUE VALUE VALUE + checkpoint_time N/A VALUE VALUE VALUE + checkpoint_completed_time N/A VALUE VALUE VALUE + """ + data = self.default_values + with open(self.filename) as f: + try: + data.update(json.load(f)) + except ValueError: + pass + monitor_status = self.get_monitor_status() + + if monitor_status in ["Created", "Paused", "Stopped"]: + data["worker_status"] = monitor_status + + # Checkpoint adjustments + if checkpoint_time == 0: + data["checkpoint_completed"] = DEFAULT_STATUS + data["checkpoint_time"] = DEFAULT_STATUS + data["checkpoint_completion_time"] = DEFAULT_STATUS + else: + if checkpoint_time != data["checkpoint_time"]: + if checkpoint_time <= data["last_synced"]: + data["checkpoint_completed"] = "Yes" + data["checkpoint_time"] = checkpoint_time + data["checkpoint_completion_time"] = data["last_synced"] + else: + data["checkpoint_completed"] = "No" + data["checkpoint_time"] = checkpoint_time + data["checkpoint_completion_time"] = DEFAULT_STATUS + + if data["checkpoint_time"] not in [0, DEFAULT_STATUS]: + chkpt_time = data["checkpoint_time"] + data["checkpoint_time"] = human_time(chkpt_time) + data["checkpoint_time_utc"] = human_time_utc(chkpt_time) + + if data["checkpoint_completion_time"] not in [0, DEFAULT_STATUS]: + chkpt_completion_time = data["checkpoint_completion_time"] + data["checkpoint_completion_time"] = human_time( + chkpt_completion_time) + data["checkpoint_completion_time_utc"] = human_time_utc( + chkpt_completion_time) + + if data["last_synced"] == 0: + data["last_synced"] = DEFAULT_STATUS + data["last_synced_utc"] = DEFAULT_STATUS + else: + last_synced = data["last_synced"] + data["last_synced"] = human_time(last_synced) + data["last_synced_utc"] = human_time_utc(last_synced) + + if data["worker_status"] != "Active": + data["last_synced"] = DEFAULT_STATUS + data["last_synced_utc"] = DEFAULT_STATUS + data["crawl_status"] = DEFAULT_STATUS + data["entry"] = DEFAULT_STATUS + data["data"] = DEFAULT_STATUS + data["meta"] = DEFAULT_STATUS + data["failures"] = DEFAULT_STATUS + data["checkpoint_completed"] = DEFAULT_STATUS + data["checkpoint_time"] = DEFAULT_STATUS + data["checkpoint_completed_time"] = DEFAULT_STATUS + data["checkpoint_time_utc"] = DEFAULT_STATUS + data["checkpoint_completion_time_utc"] = DEFAULT_STATUS + + if data["worker_status"] not in ["Active", "Passive"]: + data["slave_node"] = DEFAULT_STATUS + + return data + + def print_status(self, 
checkpoint_time=0): + for key, value in self.get_status(checkpoint_time).items(): + print ("%s: %s" % (key, value)) diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 38535884ec6..8e4c43046b0 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -15,17 +15,15 @@ import stat import json import logging import fcntl -import socket import string import errno import tarfile -from errno import ENOENT, ENODATA, EPIPE, EEXIST, EACCES, EAGAIN +from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN from threading import Condition, Lock from datetime import datetime from gconf import gconf -from tempfile import NamedTemporaryFile from syncdutils import Thread, GsyncdError, boolify, escape -from syncdutils import unescape, select, gauxpfx, md5hex, selfkill +from syncdutils import unescape, gauxpfx, md5hex, selfkill from syncdutils import lstat, errno_wrap from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable @@ -397,18 +395,6 @@ class GMasterCommon(object): raise return default_data - def update_crawl_data(self): - if getattr(gconf, 'state_detail_file', None): - try: - same_dir = os.path.dirname(gconf.state_detail_file) - with NamedTemporaryFile(dir=same_dir, delete=False) as tmp: - json.dump(self.total_crawl_stats, tmp) - tmp.flush() - os.fsync(tmp.fileno()) - os.rename(tmp.name, gconf.state_detail_file) - except (IOError, OSError): - raise - def __init__(self, master, slave): self.master = master self.slave = slave @@ -434,14 +420,12 @@ class GMasterCommon(object): self.total_turns = int(gconf.turns) self.crawl_start = datetime.now() self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0} - self.total_crawl_stats = None self.start = None self.change_seen = None # the actual volinfo we make use of self.volinfo = None self.terminate = False self.sleep_interval = 1 - self.checkpoint_thread = None self.current_files_skipped_count = 0 self.skipped_gfid_list = [] self.unlinked_gfids = [] @@ -493,7 +477,6 @@ class GMasterCommon(object): logging.debug("Got the lock") return True - def should_crawl(self): if not gconf.use_meta_volume: return gconf.glusterd_uuid in self.master.server.node_uuid() @@ -503,7 +486,6 @@ class GMasterCommon(object): sys.exit(1) return self.mgmt_lock() - def register(self): self.register() @@ -542,10 +524,8 @@ class GMasterCommon(object): if self.volinfo['retval']: logging.warn("master cluster's info may not be valid %d" % self.volinfo['retval']) - self.start_checkpoint_thread() else: raise GsyncdError("master volinfo unavailable") - self.total_crawl_stats = self.get_initial_crawl_data() self.lastreport['time'] = time.time() logging.info('crawl interval: %d seconds' % self.sleep_interval) @@ -570,7 +550,7 @@ class GMasterCommon(object): t0 = t1 self.update_worker_remote_node() if not crawl: - self.update_worker_health("Passive") + self.status.set_passive() # bring up _this_ brick to the cluster stime # which is min of cluster (but max of the replicas) brick_stime = self.xtime('.', self.slave) @@ -597,35 +577,14 @@ class GMasterCommon(object): time.sleep(5) continue - self.update_worker_health("Active") + + self.status.set_active() self.crawl() + if oneshot: return time.sleep(self.sleep_interval) - @classmethod - def _checkpt_param(cls, chkpt, prm, xtimish=True): - """use config backend to lookup a parameter belonging to - checkpoint @chkpt""" - cprm = gconf.configinterface.get_realtime('checkpoint_' + prm) - if not cprm: - return - chkpt_mapped, val = cprm.split(':', 1) - if 
unescape(chkpt_mapped) != chkpt: - return - if xtimish: - val = cls.deserialize_xtime(val) - return val - - @classmethod - def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True): - """use config backend to store a parameter associated - with checkpoint @chkpt""" - if xtimish: - val = cls.serialize_xtime(val) - gconf.configinterface.set( - 'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) - @staticmethod def humantime(*tpair): """format xtime-like (sec, nsec) pair to human readable format""" @@ -654,116 +613,6 @@ class GMasterCommon(object): string.zfill(m, 2), string.zfill(s, 2)) return date - def checkpt_service(self, chan, chkpt): - """checkpoint service loop - - monitor and verify checkpoint status for @chkpt, and listen - for incoming requests for whom we serve a pretty-formatted - status report""" - while True: - chkpt = gconf.configinterface.get_realtime("checkpoint") - if not chkpt: - gconf.configinterface.delete("checkpoint_completed") - gconf.configinterface.delete("checkpoint_target") - # dummy loop for the case when there is no checkpt set - select([chan], [], []) - conn, _ = chan.accept() - conn.send('\0') - conn.close() - continue - - checkpt_tgt = self._checkpt_param(chkpt, 'target') - if not checkpt_tgt: - checkpt_tgt = self.xtime('.') - if isinstance(checkpt_tgt, int): - raise GsyncdError("master root directory is " - "unaccessible (%s)", - os.strerror(checkpt_tgt)) - self._set_checkpt_param(chkpt, 'target', checkpt_tgt) - logging.debug("checkpoint target %s has been determined " - "for checkpoint %s" % - (repr(checkpt_tgt), chkpt)) - - # check if the label is 'now' - chkpt_lbl = chkpt - try: - x1, x2 = chkpt.split(':') - if x1 == 'now': - chkpt_lbl = "as of " + self.humantime(x2) - except: - pass - completed = self._checkpt_param(chkpt, 'completed', xtimish=False) - if completed: - completed = tuple(int(x) for x in completed.split('.')) - s, _, _ = select([chan], [], [], (not completed) and 5 or None) - # either request made and we re-check to not - # give back stale data, or we still hunting for completion - if (self.native_xtime(checkpt_tgt) and ( - self.native_xtime(checkpt_tgt) < self.volmark)): - # indexing has been reset since setting the checkpoint - status = "is invalid" - else: - xtr = self.xtime('.', self.slave) - if isinstance(xtr, int): - raise GsyncdError("slave root directory is " - "unaccessible (%s)", - os.strerror(xtr)) - ncompleted = self.xtime_geq(xtr, checkpt_tgt) - if completed and not ncompleted: # stale data - logging.warn("completion time %s for checkpoint %s " - "became stale" % - (self.humantime(*completed), chkpt)) - completed = None - gconf.configinterface.delete('checkpoint_completed') - if ncompleted and not completed: # just reaching completion - completed = "%.6f" % time.time() - self._set_checkpt_param( - chkpt, 'completed', completed, xtimish=False) - completed = tuple(int(x) for x in completed.split('.')) - logging.info("checkpoint %s completed" % chkpt) - status = completed and \ - "completed at " + self.humantime(completed[0]) or \ - "not reached yet" - if s: - conn = None - try: - conn, _ = chan.accept() - try: - conn.send("checkpoint %s is %s\0" % - (chkpt_lbl, status)) - except: - exc = sys.exc_info()[1] - if ((isinstance(exc, OSError) or isinstance( - exc, IOError)) and exc.errno == EPIPE): - logging.debug('checkpoint client disconnected') - else: - raise - finally: - if conn: - conn.close() - - def start_checkpoint_thread(self): - """prepare and start checkpoint service""" - if self.checkpoint_thread or not ( - 
getattr(gconf, 'state_socket_unencoded', None) and getattr( - gconf, 'socketdir', None) - ): - return - chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - state_socket = os.path.join( - gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket") - try: - os.unlink(state_socket) - except: - if sys.exc_info()[0] == OSError: - pass - chan.bind(state_socket) - chan.listen(1) - chkpt = gconf.configinterface.get_realtime("checkpoint") - t = Thread(target=self.checkpt_service, args=(chan, chkpt)) - t.start() - self.checkpoint_thread = t - def add_job(self, path, label, job, *a, **kw): """insert @job function to job table at @path with @label""" if self.jobtab.get(path) is None: @@ -937,11 +786,15 @@ class GMasterChangelogMixin(GMasterCommon): files_pending['purge'] += 1 def log_failures(failures, entry_key, gfid_prefix, log_prefix): + num_failures = 0 for failure in failures: st = lstat(os.path.join(gfid_prefix, failure[0][entry_key])) if not isinstance(st, int): + num_failures += 1 logging.warn('%s FAILED: %s' % (log_prefix, repr(failure))) + self.status.inc_value("failures", num_failures) + for e in clist: e = e.strip() et = e[self.IDX_START:self.IDX_END] # entry type @@ -1040,12 +893,18 @@ class GMasterChangelogMixin(GMasterCommon): else: logging.warn('got invalid changelog type: %s' % (et)) logging.debug('entries: %s' % repr(entries)) - if not retry: - self.update_worker_cumilitive_status(files_pending) + + # Increment counters for Status + self.status.inc_value("entry", len(entries)) + self.files_in_batch = len(datas) + self.status.inc_value("data", self.files_in_batch) + # sync namespace if entries: failures = self.slave.server.entry_ops(entries) log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') + self.status.dec_value("entry", len(entries)) + # sync metadata if meta_gfid: meta_entries = [] @@ -1059,8 +918,11 @@ class GMasterChangelogMixin(GMasterCommon): continue meta_entries.append(edct('META', go=go[0], stat=st)) if meta_entries: + self.status.inc_value("meta", len(entries)) failures = self.slave.server.meta_ops(meta_entries) log_failures(failures, 'go', '', 'META') + self.status.dec_value("meta", len(entries)) + # sync data if datas: self.a_syncdata(datas) @@ -1112,9 +974,17 @@ class GMasterChangelogMixin(GMasterCommon): if done: xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) + chkpt_time = gconf.configinterface.get_realtime( + "checkpoint") + checkpoint_time = 0 + if chkpt_time is not None: + checkpoint_time = int(chkpt_time) + + self.status.set_last_synced(xtl, checkpoint_time) map(self.changelog_done_func, changes) self.archive_and_purge_changelogs(changes) - self.update_worker_files_syncd() + self.status.dec_value("data", self.files_in_batch) + self.files_in_batch = 0 break # We do not know which changelog transfer failed, retry everything. @@ -1124,14 +994,22 @@ class GMasterChangelogMixin(GMasterCommon): logging.warn('changelogs %s could not be processed - ' 'moving on...' 
% ' '.join(map(os.path.basename, changes))) - self.update_worker_total_files_skipped( - self.current_files_skipped_count) + self.status.inc_value("failures", + self.current_files_skipped_count) logging.warn('SKIPPED GFID = %s' % ','.join(self.skipped_gfid_list)) - self.update_worker_files_syncd() + + self.files_in_batch = 0 if done: xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) + chkpt_time = gconf.configinterface.get_realtime( + "checkpoint") + checkpoint_time = 0 + if chkpt_time is not None: + checkpoint_time = int(chkpt_time) + + self.status.set_last_synced(xtl, checkpoint_time) map(self.changelog_done_func, changes) self.archive_and_purge_changelogs(changes) break @@ -1152,161 +1030,12 @@ class GMasterChangelogMixin(GMasterCommon): if not stime == URXTIME: self.sendmark(path, stime) - def get_worker_status_file(self): - file_name = gconf.local_path + '.status' - file_name = file_name.replace("/", "_") - worker_status_file = gconf.georep_session_working_dir + file_name - return worker_status_file - - def update_worker_status(self, key, value): - default_data = {"remote_node": "N/A", - "worker status": "Not Started", - "crawl status": "N/A", - "files_syncd": 0, - "files_remaining": 0, - "bytes_remaining": 0, - "purges_remaining": 0, - "total_files_skipped": 0} - worker_status_file = self.get_worker_status_file() - try: - with open(worker_status_file, 'r+') as f: - loaded_data = json.load(f) - loaded_data[key] = value - os.ftruncate(f.fileno(), 0) - os.lseek(f.fileno(), 0, os.SEEK_SET) - json.dump(loaded_data, f) - f.flush() - os.fsync(f.fileno()) - except (IOError, OSError, ValueError): - logging.info('Creating new %s' % worker_status_file) - try: - with open(worker_status_file, 'wb') as f: - default_data[key] = value - json.dump(default_data, f) - f.flush() - os.fsync(f.fileno()) - except: - raise - - def update_worker_cumilitive_status(self, files_pending): - default_data = {"remote_node": "N/A", - "worker status": "Not Started", - "crawl status": "N/A", - "files_syncd": 0, - "files_remaining": 0, - "bytes_remaining": 0, - "purges_remaining": 0, - "total_files_skipped": 0} - worker_status_file = self.get_worker_status_file() - try: - with open(worker_status_file, 'r+') as f: - loaded_data = json.load(f) - loaded_data['files_remaining'] = files_pending['count'] - loaded_data['bytes_remaining'] = files_pending['bytes'] - loaded_data['purges_remaining'] = files_pending['purge'] - os.ftruncate(f.fileno(), 0) - os.lseek(f.fileno(), 0, os.SEEK_SET) - json.dump(loaded_data, f) - f.flush() - os.fsync(f.fileno()) - except (IOError, OSError, ValueError): - logging.info('Creating new %s' % worker_status_file) - try: - with open(worker_status_file, 'wb') as f: - default_data['files_remaining'] = files_pending['count'] - default_data['bytes_remaining'] = files_pending['bytes'] - default_data['purges_remaining'] = files_pending['purge'] - json.dump(default_data, f) - f.flush() - os.fsync(f.fileno()) - except: - raise - def update_worker_remote_node(self): node = sys.argv[-1] - node = node.split("@")[-1] + node_data = node.split("@") + node = node_data[-1] remote_node_ip = node.split(":")[0] - remote_node_vol = node.split(":")[3] - remote_node = remote_node_ip + '::' + remote_node_vol - self.update_worker_status('remote_node', remote_node) - - def update_worker_health(self, state): - self.update_worker_status('worker status', state) - - def update_worker_crawl_status(self, state): - self.update_worker_status('crawl status', state) - - def update_worker_files_syncd(self): - 
default_data = {"remote_node": "N/A", - "worker status": "Not Started", - "crawl status": "N/A", - "files_syncd": 0, - "files_remaining": 0, - "bytes_remaining": 0, - "purges_remaining": 0, - "total_files_skipped": 0} - worker_status_file = self.get_worker_status_file() - try: - with open(worker_status_file, 'r+') as f: - loaded_data = json.load(f) - loaded_data['files_syncd'] += loaded_data['files_remaining'] - loaded_data['files_remaining'] = 0 - loaded_data['bytes_remaining'] = 0 - loaded_data['purges_remaining'] = 0 - os.ftruncate(f.fileno(), 0) - os.lseek(f.fileno(), 0, os.SEEK_SET) - json.dump(loaded_data, f) - f.flush() - os.fsync(f.fileno()) - except (IOError, OSError, ValueError): - logging.info('Creating new %s' % worker_status_file) - try: - with open(worker_status_file, 'wb') as f: - json.dump(default_data, f) - f.flush() - os.fsync(f.fileno()) - except: - raise - - def update_worker_files_remaining(self, state): - self.update_worker_status('files_remaining', state) - - def update_worker_bytes_remaining(self, state): - self.update_worker_status('bytes_remaining', state) - - def update_worker_purges_remaining(self, state): - self.update_worker_status('purges_remaining', state) - - def update_worker_total_files_skipped(self, value): - default_data = {"remote_node": "N/A", - "worker status": "Not Started", - "crawl status": "N/A", - "files_syncd": 0, - "files_remaining": 0, - "bytes_remaining": 0, - "purges_remaining": 0, - "total_files_skipped": 0} - worker_status_file = self.get_worker_status_file() - try: - with open(worker_status_file, 'r+') as f: - loaded_data = json.load(f) - loaded_data['total_files_skipped'] = value - loaded_data['files_remaining'] -= value - os.ftruncate(f.fileno(), 0) - os.lseek(f.fileno(), 0, os.SEEK_SET) - json.dump(loaded_data, f) - f.flush() - os.fsync(f.fileno()) - except (IOError, OSError, ValueError): - logging.info('Creating new %s' % worker_status_file) - try: - with open(worker_status_file, 'wb') as f: - default_data['total_files_skipped'] = value - json.dump(default_data, f) - f.flush() - os.fsync(f.fileno()) - except: - raise + self.status.set_slave_node(remote_node_ip) def changelogs_batch_process(self, changes): changelogs_batches = [] @@ -1331,7 +1060,7 @@ class GMasterChangelogMixin(GMasterCommon): self.process(batch) def crawl(self): - self.update_worker_crawl_status("Changelog Crawl") + self.status.set_worker_crawl_status("Changelog Crawl") changes = [] # get stime (from the brick) and purge changelogs # that are _historical_ to that time. 
@@ -1355,16 +1084,17 @@ class GMasterChangelogMixin(GMasterCommon): self.changelogs_batch_process(changes) - def register(self, register_time, changelog_agent): + def register(self, register_time, changelog_agent, status): self.changelog_agent = changelog_agent self.sleep_interval = int(gconf.change_interval) self.changelog_done_func = self.changelog_agent.done self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), ".processed") + self.status = status class GMasterChangeloghistoryMixin(GMasterChangelogMixin): - def register(self, register_time, changelog_agent): + def register(self, register_time, changelog_agent, status): self.changelog_agent = changelog_agent self.changelog_register_time = register_time self.history_crawl_start_time = register_time @@ -1372,10 +1102,11 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): self.history_turns = 0 self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), ".history/.processed") + self.status = status def crawl(self): self.history_turns += 1 - self.update_worker_crawl_status("History Crawl") + self.status.set_worker_crawl_status("History Crawl") purge_time = self.get_purge_time() logging.info('starting history crawl... turns: %s, stime: %s' @@ -1455,7 +1186,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin): XSYNC_MAX_ENTRIES = 1 << 13 - def register(self, register_time=None, changelog_agent=None): + def register(self, register_time=None, changelog_agent=None, status=None): + self.status = status self.counter = 0 self.comlist = [] self.stimes = [] @@ -1486,7 +1218,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): t.start() logging.info('starting hybrid crawl..., stime: %s' % repr(self.get_purge_time())) - self.update_worker_crawl_status("Hybrid Crawl") + self.status.set_worker_crawl_status("Hybrid Crawl") while True: try: item = self.comlist.pop(0) diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 029726c7a5a..ba5c8e32514 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -22,10 +22,12 @@ from errno import EEXIST import re import random from gconf import gconf -from syncdutils import update_file, select, waitpid +from syncdutils import select, waitpid from syncdutils import set_term_handler, is_host_local, GsyncdError from syncdutils import escape, Thread, finalize, memoize +from gsyncdstatus import GeorepStatus, set_monitor_status + ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError @@ -125,46 +127,22 @@ class Volinfo(object): def disperse_count(self): return int(self.get('disperseCount')[0].text) + class Monitor(object): """class which spawns and manages gsyncd workers""" ST_INIT = 'Initializing...' 
- ST_STABLE = 'Stable' - ST_FAULTY = 'faulty' + ST_STARTED = 'Started' + ST_STABLE = 'Active' + ST_FAULTY = 'Faulty' ST_INCON = 'inconsistent' _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON] def __init__(self): self.lock = Lock() self.state = {} - - def set_state(self, state, w=None): - """set the state that can be used by external agents - like glusterd for status reporting""" - computestate = lambda: self.state and self._ST_ORD[ - max(self._ST_ORD.index(s) for s in self.state.values())] - if w: - self.lock.acquire() - old_state = computestate() - self.state[w] = state - state = computestate() - self.lock.release() - if state != old_state: - self.set_state(state) - else: - if getattr(gconf, 'state_file', None): - # If previous state is paused, suffix the - # new state with '(Paused)' - try: - with open(gconf.state_file, "r") as f: - content = f.read() - if "paused" in content.lower(): - state = state + '(Paused)' - except IOError: - pass - logging.info('new state: %s' % state) - update_file(gconf.state_file, lambda f: f.write(state + '\n')) + self.status = {} @staticmethod def terminate(): @@ -174,8 +152,7 @@ class Monitor(object): # give a chance to graceful exit os.kill(-os.getpid(), signal.SIGTERM) - - def monitor(self, w, argv, cpids, agents, slave_vol, slave_host): + def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master): """the monitor loop Basic logic is a blantantly simple blunt heuristics: @@ -194,8 +171,11 @@ class Monitor(object): blown worker blows up on EPIPE if the net goes down, due to the keep-alive thread) """ + if not self.status.get(w[0], None): + self.status[w[0]] = GeorepStatus(gconf.state_file, w[0]) - self.set_state(self.ST_INIT, w) + set_monitor_status(gconf.state_file, self.ST_STARTED) + self.status[w[0]].set_worker_status(self.ST_INIT) ret = 0 @@ -310,7 +290,7 @@ class Monitor(object): nwait(apid) #wait for agent ret = nwait(cpid) if ret is None: - self.set_state(self.ST_STABLE, w) + self.status[w[0]].set_worker_status(self.ST_STABLE) #If worker dies, agent terminates on EOF. #So lets wait for agent first. 
nwait(apid) @@ -320,12 +300,12 @@ class Monitor(object): else: ret = exit_status(ret) if ret in (0, 1): - self.set_state(self.ST_FAULTY, w) + self.status[w[0]].set_worker_status(self.ST_FAULTY) time.sleep(10) - self.set_state(self.ST_INCON, w) + self.status[w[0]].set_worker_status(self.ST_INCON) return ret - def multiplex(self, wspx, suuid, slave_vol, slave_host): + def multiplex(self, wspx, suuid, slave_vol, slave_host, master): argv = sys.argv[:] for o in ('-N', '--no-daemon', '--monitor'): while o in argv: @@ -339,7 +319,7 @@ class Monitor(object): for wx in wspx: def wmon(w): cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol, - slave_host) + slave_host, master) time.sleep(1) self.lock.acquire() for cpid in cpids: @@ -401,7 +381,7 @@ def distribute(*resources): for idx, brick in enumerate(mvol.bricks) if is_host_local(brick['host'])] logging.info('worker specs: ' + repr(workerspex)) - return workerspex, suuid, slave_vol, slave_host + return workerspex, suuid, slave_vol, slave_host, master def monitor(*resources): diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index d3d1ee36e01..6bf1ad03e70 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -38,6 +38,7 @@ from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable from syncdutils import ChangelogException from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION +from gsyncdstatus import GeorepStatus UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') @@ -611,6 +612,9 @@ class Server(object): def collect_failure(e, cmd_ret): # We do this for failing fops on Slave # Master should be logging this + if cmd_ret is None: + return + if cmd_ret == EEXIST: disk_gfid = cls.gfid_mnt(e['entry']) if isinstance(disk_gfid, basestring): @@ -1344,6 +1348,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): os.close(int(ra)) os.close(int(wa)) changelog_agent = RepceClient(int(inf), int(ouf)) + status = GeorepStatus(gconf.state_file, gconf.local_path) + status.reset_on_worker_start() rv = changelog_agent.version() if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: raise GsyncdError( @@ -1367,13 +1373,13 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): g2.CHANGELOG_CONN_RETRIES) register_time = int(time.time()) - g2.register(register_time, changelog_agent) - g3.register(register_time, changelog_agent) + g2.register(register_time, changelog_agent, status) + g3.register(register_time, changelog_agent, status) except ChangelogException as e: logging.error("Changelog register failed, %s" % e) sys.exit(1) - g1.register() + g1.register(status=status) logging.info("Register time: %s" % register_time) # oneshot: Try to use changelog history api, if not # available switch to FS crawl diff --git a/geo-replication/tests/unit/test_gsyncdstatus.py b/geo-replication/tests/unit/test_gsyncdstatus.py new file mode 100644 index 00000000000..a65d659e356 --- /dev/null +++ b/geo-replication/tests/unit/test_gsyncdstatus.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+# + +import unittest +import os +import urllib + +from syncdaemon.gstatus import GeorepStatus, set_monitor_status +from syncdaemon.gstatus import get_default_values +from syncdaemon.gstatus import MONITOR_STATUS, DEFAULT_STATUS +from syncdaemon.gstatus import STATUS_VALUES, CRAWL_STATUS_VALUES +from syncdaemon.gstatus import human_time, human_time_utc + + +class GeorepStatusTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.work_dir = os.path.dirname(os.path.abspath(__file__)) + cls.monitor_status_file = os.path.join(cls.work_dir, "monitor.status") + cls.brick = "/exports/bricks/b1" + cls.status = GeorepStatus(cls.monitor_status_file, cls.brick) + cls.statusfile = os.path.join(cls.work_dir, + "brick_%s.status" + % urllib.quote_plus(cls.brick)) + + @classmethod + def tearDownClass(cls): + os.remove(cls.statusfile) + os.remove(cls.monitor_status_file) + + def _filter_dict(self, inp, keys): + op = {} + for k in keys: + op[k] = inp.get(k, None) + return op + + def test_monitor_status_file_created(self): + self.assertTrue(os.path.exists(self.monitor_status_file)) + + def test_status_file_created(self): + self.assertTrue(os.path.exists(self.statusfile)) + + def test_set_monitor_status(self): + for st in MONITOR_STATUS: + set_monitor_status(self.monitor_status_file, st) + self.assertTrue(self.status.get_monitor_status(), st) + + def test_default_values_test(self): + self.assertTrue(get_default_values(), { + "slave_node": DEFAULT_STATUS, + "worker_status": DEFAULT_STATUS, + "last_synced": 0, + "last_synced_utc": 0, + "crawl_status": DEFAULT_STATUS, + "entry": 0, + "data": 0, + "metadata": 0, + "failures": 0, + "checkpoint_completed": False, + "checkpoint_time": 0, + "checkpoint_time_utc": 0, + "checkpoint_completion_time": 0, + "checkpoint_completion_time_utc": 0 + }) + + def test_human_time(self): + self.assertTrue(human_time(1429174398), "2015-04-16 14:23:18") + + def test_human_time_utc(self): + self.assertTrue(human_time_utc(1429174398), "2015-04-16 08:53:18") + + def test_invalid_human_time(self): + self.assertTrue(human_time(142917439), DEFAULT_STATUS) + self.assertTrue(human_time("abcdef"), DEFAULT_STATUS) + + def test_invalid_human_time_utc(self): + self.assertTrue(human_time_utc(142917439), DEFAULT_STATUS) + self.assertTrue(human_time_utc("abcdef"), DEFAULT_STATUS) + + def test_worker_status(self): + set_monitor_status(self.monitor_status_file, "Started") + for st in STATUS_VALUES: + self.status.set_worker_status(st) + self.assertTrue(self.status.get_status()["worker_status"], st) + + def test_crawl_status(self): + set_monitor_status(self.monitor_status_file, "Started") + self.status.set_active() + for st in CRAWL_STATUS_VALUES: + self.status.set_worker_crawl_status(st) + self.assertTrue(self.status.get_status()["crawl_status"], st) + + def test_slave_node(self): + set_monitor_status(self.monitor_status_file, "Started") + self.status.set_active() + self.status.set_slave_node("fvm2") + self.assertTrue(self.status.get_status()["slave_node"], "fvm2") + + self.status.set_worker_status("Passive") + self.status.set_slave_node("fvm2") + self.assertTrue(self.status.get_status()["slave_node"], "fvm2") + + def test_active_worker_status(self): + set_monitor_status(self.monitor_status_file, "Started") + self.status.set_active() + self.assertTrue(self.status.get_status()["worker_status"], "Active") + + def test_passive_worker_status(self): + set_monitor_status(self.monitor_status_file, "Started") + self.status.set_passive() + 
self.assertTrue(self.status.get_status()["worker_status"], "Passive") + + def test_set_field(self): + set_monitor_status(self.monitor_status_file, "Started") + self.status.set_active() + self.status.set_field("entry", 42) + self.assertTrue(self.status.get_status()["entry"], 42) + + def test_inc_value(self): + set_monitor_status(self.monitor_status_file, "Started") + self.status.set_active() + self.status.set_field("entry", 0) + self.status.inc_value("entry", 2) + self.assertTrue(self.status.get_status()["entry"], 2) + + self.status.set_field("data", 0) + self.status.inc_value("data", 2) + self.assertTrue(self.status.get_status()["data"], 2) + + self.status.set_field("meta", 0) + self.status.inc_value("meta", 2) + self.assertTrue(self.status.get_status()["meta"], 2) + + self.status.set_field("failures", 0) + self.status.inc_value("failures", 2) + self.assertTrue(self.status.get_status()["failures"], 2) + + def test_dec_value(self): + set_monitor_status(self.monitor_status_file, "Started") + self.status.set_active() + + self.status.set_field("entry", 4) + self.status.inc_value("entry", 2) + self.assertTrue(self.status.get_status()["entry"], 2) + + self.status.set_field("data", 4) + self.status.inc_value("data", 2) + self.assertTrue(self.status.get_status()["data"], 2) + + self.status.set_field("meta", 4) + self.status.inc_value("meta", 2) + self.assertTrue(self.status.get_status()["meta"], 2) + + self.status.set_field("failures", 4) + self.status.inc_value("failures", 2) + self.assertTrue(self.status.get_status()["failures"], 2) + + def test_worker_status_when_monitor_status_created(self): + set_monitor_status(self.monitor_status_file, "Created") + for st in STATUS_VALUES: + self.status.set_worker_status(st) + self.assertTrue(self.status.get_status()["worker_status"], + "Created") + + def test_worker_status_when_monitor_status_paused(self): + set_monitor_status(self.monitor_status_file, "Paused") + for st in STATUS_VALUES: + self.status.set_worker_status(st) + self.assertTrue(self.status.get_status()["worker_status"], + "Paused") + + def test_worker_status_when_monitor_status_stopped(self): + set_monitor_status(self.monitor_status_file, "Stopped") + for st in STATUS_VALUES: + self.status.set_worker_status(st) + self.assertTrue(self.status.get_status()["worker_status"], + "Stopped") + + def test_status_when_worker_status_active(self): + set_monitor_status(self.monitor_status_file, "Started") + self.status.set_active() + + +if __name__ == "__main__": + unittest.main() diff --git a/rpc/rpc-lib/src/protocol-common.h b/rpc/rpc-lib/src/protocol-common.h index 3b964331892..2f7f23f3b45 100644 --- a/rpc/rpc-lib/src/protocol-common.h +++ b/rpc/rpc-lib/src/protocol-common.h @@ -248,15 +248,21 @@ struct gf_gsync_detailed_status_ { char master[NAME_MAX]; char brick[NAME_MAX]; char slave_user[NAME_MAX]; + char slave[NAME_MAX]; char slave_node[NAME_MAX]; char worker_status[NAME_MAX]; - char checkpoint_status[NAME_MAX]; char crawl_status[NAME_MAX]; - char files_syncd[NAME_MAX]; - char files_remaining[NAME_MAX]; - char bytes_remaining[NAME_MAX]; - char purges_remaining[NAME_MAX]; - char total_files_skipped[NAME_MAX]; + char last_synced[NAME_MAX]; + char last_synced_utc[NAME_MAX]; + char entry[NAME_MAX]; + char data[NAME_MAX]; + char meta[NAME_MAX]; + char failures[NAME_MAX]; + char checkpoint_time[NAME_MAX]; + char checkpoint_time_utc[NAME_MAX]; + char checkpoint_completed[NAME_MAX]; + char checkpoint_completion_time[NAME_MAX]; + char checkpoint_completion_time_utc[NAME_MAX]; char brick_host_uuid[NAME_MAX]; 
char slavekey[NAME_MAX]; char session_slave[NAME_MAX]; diff --git a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c index 708d6d3816d..24768e38231 100644 --- a/xlators/mgmt/glusterd/src/glusterd-geo-rep.c +++ b/xlators/mgmt/glusterd/src/glusterd-geo-rep.c @@ -667,6 +667,128 @@ glusterd_gsync_get_config (char *master, char *slave, char *conf_path, dict_t *d } static int +_fcbk_statustostruct (char *resbuf, size_t blen, FILE *fp, + void *data) +{ + char *ptr = NULL; + char *v = NULL; + char *k = NULL; + gf_gsync_status_t *sts_val = NULL; + + sts_val = (gf_gsync_status_t *)data; + + for (;;) { + errno = 0; + ptr = fgets (resbuf, blen, fp); + if (!ptr) + break; + v = resbuf + strlen(resbuf) - 1; + while (isspace (*v)) + /* strip trailing space */ + *v-- = '\0'; + if (v == resbuf) + /* skip empty line */ + continue; + v = strchr (resbuf, ':'); + if (!v) + return -1; + *v++ = '\0'; + while (isspace (*v)) + v++; + v = gf_strdup (v); + if (!v) + return -1; + + k = gf_strdup (resbuf); + if (!k) + return -1; + + if (strcmp (k, "worker_status") == 0) { + memcpy (sts_val->worker_status, v, + strlen(v)); + sts_val->worker_status[strlen(v)] = '\0'; + } else if (strcmp (k, "slave_node") == 0) { + memcpy (sts_val->slave_node, v, + strlen(v)); + sts_val->slave_node[strlen(v)] = '\0'; + } else if (strcmp (k, "crawl_status") == 0) { + memcpy (sts_val->crawl_status, v, + strlen(v)); + sts_val->crawl_status[strlen(v)] = '\0'; + } else if (strcmp (k, "last_synced") == 0) { + memcpy (sts_val->last_synced, v, + strlen(v)); + sts_val->last_synced[strlen(v)] = '\0'; + } else if (strcmp (k, "last_synced_utc") == 0) { + memcpy (sts_val->last_synced_utc, v, + strlen(v)); + sts_val->last_synced_utc[strlen(v)] = '\0'; + } else if (strcmp (k, "entry") == 0) { + memcpy (sts_val->entry, v, + strlen(v)); + sts_val->entry[strlen(v)] = '\0'; + } else if (strcmp (k, "data") == 0) { + memcpy (sts_val->data, v, + strlen(v)); + sts_val->data[strlen(v)] = '\0'; + } else if (strcmp (k, "meta") == 0) { + memcpy (sts_val->meta, v, + strlen(v)); + sts_val->meta[strlen(v)] = '\0'; + } else if (strcmp (k, "failures") == 0) { + memcpy (sts_val->failures, v, + strlen(v)); + sts_val->failures[strlen(v)] = '\0'; + } else if (strcmp (k, "checkpoint_time") == 0) { + memcpy (sts_val->checkpoint_time, v, + strlen(v)); + sts_val->checkpoint_time[strlen(v)] = '\0'; + } else if (strcmp (k, "checkpoint_time_utc") == 0) { + memcpy (sts_val->checkpoint_time_utc, v, + strlen(v)); + sts_val->checkpoint_time_utc[strlen(v)] = '\0'; + } else if (strcmp (k, "checkpoint_completed") == 0) { + memcpy (sts_val->checkpoint_completed, v, + strlen(v)); + sts_val->checkpoint_completed[strlen(v)] = '\0'; + } else if (strcmp (k, "checkpoint_completion_time") == 0) { + memcpy (sts_val->checkpoint_completion_time, v, + strlen(v)); + sts_val->checkpoint_completion_time[strlen(v)] = '\0'; + } else if (strcmp (k, "checkpoint_completion_time_utc") == 0) { + memcpy (sts_val->checkpoint_completion_time_utc, v, + strlen(v)); + sts_val->checkpoint_completion_time_utc[strlen(v)] = + '\0'; + } + } + + return errno ? 
-1 : 0; +} + + +static int +glusterd_gsync_get_status (char *master, char *slave, char *conf_path, + char *brick_path, gf_gsync_status_t *sts_val) +{ + /* key + value, where value must be able to accommodate a path */ + char resbuf[256 + PATH_MAX] = {0,}; + runner_t runner = {0,}; + + runinit (&runner); + runner_add_args (&runner, GSYNCD_PREFIX"/gsyncd", "-c", NULL); + runner_argprintf (&runner, "%s", conf_path); + runner_argprintf (&runner, "--iprefix=%s", DATADIR); + runner_argprintf (&runner, ":%s", master); + runner_add_args (&runner, slave, "--status-get", NULL); + runner_add_args (&runner, "--path", brick_path, NULL); + + return glusterd_query_extutil_generic (resbuf, sizeof (resbuf), + &runner, sts_val, + _fcbk_statustostruct); +} + +static int glusterd_gsync_get_param_file (char *prmfile, const char *param, char *master, char *slave, char *conf_path) { @@ -2804,7 +2926,6 @@ gd_pause_or_resume_gsync (dict_t *dict, char *master, char *slave, gf_boolean_t is_template_in_use = _gf_false; char monitor_status[NAME_MAX] = {0,}; char *statefile = NULL; - char *token = NULL; xlator_t *this = NULL; this = THIS; @@ -2869,10 +2990,10 @@ gd_pause_or_resume_gsync (dict_t *dict, char *master, char *slave, do not update status again*/ if (strstr (monitor_status, "Paused")) goto out; - (void) strcat (monitor_status, "(Paused)"); + ret = glusterd_create_status_file ( master, slave, slave_host, slave_vol, - monitor_status); + "Paused"); if (ret) { gf_log (this->name, GF_LOG_ERROR, "Unable to update state_file." @@ -2893,10 +3014,10 @@ gd_pause_or_resume_gsync (dict_t *dict, char *master, char *slave, goto out; } } else { - token = strtok (monitor_status, "("); ret = glusterd_create_status_file (master, slave, slave_host, - slave_vol, token); + slave_vol, + "Started"); if (ret) { gf_log (this->name, GF_LOG_ERROR, "Resume Failed: Unable to update " @@ -3321,159 +3442,6 @@ out: return ret; } -static int -glusterd_parse_gsync_status (char *buf, gf_gsync_status_t *sts_val) -{ - int ret = -1; - int i = -1; - int num_of_fields = 8; - char *token = NULL; - char **tokens = NULL; - char **ptr = NULL; - char *save_ptr = NULL; - char na_buf[] = "N/A"; - - if (!buf) { - gf_log ("", GF_LOG_ERROR, "Empty buf"); - goto out; - } - - tokens = calloc (num_of_fields, sizeof (char *)); - if (!tokens) { - gf_log ("", GF_LOG_ERROR, "Out of memory"); - goto out; - } - - ptr = tokens; - - for (token = strtok_r (buf, ",", &save_ptr); token; - token = strtok_r (NULL, ",", &save_ptr)) { - *ptr = gf_strdup(token); - if (!*ptr) { - gf_log ("", GF_LOG_ERROR, "Out of memory"); - goto out; - } - ptr++; - } - - for (i = 0; i < num_of_fields; i++) { - token = strtok_r (tokens[i], ":", &save_ptr); - token = strtok_r (NULL, "\0", &save_ptr); - token++; - - /* token NULL check */ - if (!token && (i != 0) && - (i != 5) && (i != 7)) - token = na_buf; - - if (i == 0) { - if (!token) - token = na_buf; - else { - token++; - if (!token) - token = na_buf; - else - token[strlen(token) - 1] = '\0'; - } - memcpy (sts_val->slave_node, token, strlen(token)); - } - if (i == 1) - memcpy (sts_val->files_syncd, token, strlen(token)); - if (i == 2) - memcpy (sts_val->purges_remaining, token, strlen(token)); - if (i == 3) - memcpy (sts_val->total_files_skipped, token, strlen(token)); - if (i == 4) - memcpy (sts_val->files_remaining, token, strlen(token)); - if (i == 5) { - if (!token) - token = na_buf; - else { - token++; - if (!token) - token = na_buf; - else - token[strlen(token) - 1] = '\0'; - } - memcpy (sts_val->worker_status, token, strlen(token)); 
- }
- if (i == 6)
- memcpy (sts_val->bytes_remaining, token, strlen(token));
- if (i == 7) {
- if (!token)
- token = na_buf;
- else {
- token++;
- if (!token)
- token = na_buf;
- else
- token[strlen(token) - 2] = '\0';
- }
- memcpy (sts_val->crawl_status, token, strlen(token));
- }
- }
-
- ret = 0;
-out:
- for (i = 0; i< num_of_fields; i++)
- if (tokens[i])
- GF_FREE(tokens[i]);
-
- gf_log ("", GF_LOG_DEBUG, "Returning %d", ret);
- return ret;
-}
-
-static int
-glusterd_gsync_fetch_status_extra (char *path, gf_gsync_status_t *sts_val)
-{
- char sockpath[PATH_MAX] = {0,};
- struct sockaddr_un sa = {0,};
- int s = -1;
- struct pollfd pfd = {0,};
- int ret = 0;
-
- glusterd_set_socket_filepath (path, sockpath, sizeof (sockpath));
-
- strncpy(sa.sun_path, sockpath, sizeof(sa.sun_path));
- if (sa.sun_path[sizeof (sa.sun_path) - 1])
- return -1;
- sa.sun_family = AF_UNIX;
-
- s = socket(AF_UNIX, SOCK_STREAM, 0);
- if (s == -1)
- return -1;
- ret = fcntl (s, F_GETFL);
- if (ret != -1)
- ret = fcntl (s, F_SETFL, ret | O_NONBLOCK);
- if (ret == -1)
- goto out;
-
- ret = connect (s, (struct sockaddr *)&sa, sizeof (sa));
- if (ret == -1)
- goto out;
- pfd.fd = s;
- pfd.events = POLLIN;
- /* we don't want to hang on gsyncd */
- if (poll (&pfd, 1, 5000) < 1 ||
- !(pfd.revents & POLLIN)) {
- ret = -1;
- goto out;
- }
- ret = read(s, sts_val->checkpoint_status,
- sizeof(sts_val->checkpoint_status));
- /* we expect a terminating 0 byte */
- if (ret == 0 || (ret > 0 && sts_val->checkpoint_status[ret - 1]))
- ret = -1;
- if (ret > 0) {
- ret = 0;
- }
-
-out:
- close (s);
- return ret;
-}
-
 int
 glusterd_fetch_values_from_config (char *master, char *slave,
 char *confpath, dict_t *confd,
@@ -3567,6 +3535,7 @@ glusterd_read_status_file (glusterd_volinfo_t *volinfo, char *slave,
 gf_boolean_t is_template_in_use = _gf_false;
 glusterd_conf_t *priv = NULL;
 struct stat stbuf = {0,};
+ dict_t *statusd = NULL;
 
 GF_ASSERT (THIS);
 GF_ASSERT (THIS->private);
@@ -3657,114 +3626,7 @@ fetch_data:
 goto out;
 }
 
- /* Creating the brick state file's path */
- memset(brick_state_file, '\0', PATH_MAX);
- memcpy (brick_path, brickinfo->path, PATH_MAX - 1);
- for (i = 0; i < strlen(brick_path) - 1; i++)
- if (brick_path[i] == '/')
- brick_path[i] = '_';
- ret = snprintf(brick_state_file, PATH_MAX - 1, "%s%s.status",
- georep_session_wrkng_dir, brick_path);
- brick_state_file[ret] = '\0';
-
- gf_log ("", GF_LOG_DEBUG, "brick_state_file = %s", brick_state_file);
-
- memset (tmp, '\0', sizeof(tmp));
-
- ret = glusterd_gsync_read_frm_status (brick_state_file,
- tmp, sizeof (tmp));
- if (ret <= 0) {
- gf_log ("", GF_LOG_ERROR, "Unable to read the status"
- "file for %s brick for %s(master), %s(slave) "
- "session", brickinfo->path, master, slave);
- memcpy (sts_val->slave_node, slave, strlen(slave));
- sts_val->slave_node[strlen(slave)] = '\0';
- ret = snprintf (sts_val->worker_status, sizeof(sts_val->worker_status), "N/A");
- sts_val->worker_status[ret] = '\0';
- ret = snprintf (sts_val->checkpoint_status, sizeof(sts_val->checkpoint_status), "N/A");
- sts_val->checkpoint_status[ret] = '\0';
- ret = snprintf (sts_val->crawl_status, sizeof(sts_val->crawl_status), "N/A");
- sts_val->crawl_status[ret] = '\0';
- ret = snprintf (sts_val->files_syncd, sizeof(sts_val->files_syncd), "N/A");
- sts_val->files_syncd[ret] = '\0';
- ret = snprintf (sts_val->purges_remaining, sizeof(sts_val->purges_remaining), "N/A");
- sts_val->purges_remaining[ret] = '\0';
- ret = snprintf (sts_val->total_files_skipped, sizeof(sts_val->total_files_skipped), "N/A");
- sts_val->total_files_skipped[ret] = '\0';
- ret = snprintf (sts_val->files_remaining, sizeof(sts_val->files_remaining), "N/A");
- sts_val->files_remaining[ret] = '\0';
- ret = snprintf (sts_val->bytes_remaining, sizeof(sts_val->bytes_remaining), "N/A");
- sts_val->bytes_remaining[ret] = '\0';
- goto store_status;
- }
-
- ret = glusterd_gsync_fetch_status_extra (socketfile, sts_val);
- if (ret || strlen(sts_val->checkpoint_status) == 0) {
- gf_log ("", GF_LOG_DEBUG, "No checkpoint status"
- "for %s(master), %s(slave)", master, slave);
- ret = snprintf (sts_val->checkpoint_status, sizeof(sts_val->checkpoint_status), "N/A");
- sts_val->checkpoint_status[ret] = '\0';
- }
-
- ret = glusterd_parse_gsync_status (tmp, sts_val);
- if (ret) {
- gf_log ("", GF_LOG_ERROR,
- "Unable to parse the gsync status for %s",
- brickinfo->path);
- memcpy (sts_val->slave_node, slave, strlen(slave));
- sts_val->slave_node[strlen(slave)] = '\0';
- ret = snprintf (sts_val->worker_status, sizeof(sts_val->worker_status), "N/A");
- sts_val->worker_status[ret] = '\0';
- ret = snprintf (sts_val->checkpoint_status, sizeof(sts_val->checkpoint_status), "N/A");
- sts_val->checkpoint_status[ret] = '\0';
- ret = snprintf (sts_val->crawl_status, sizeof(sts_val->crawl_status), "N/A");
- sts_val->crawl_status[ret] = '\0';
- ret = snprintf (sts_val->files_syncd, sizeof(sts_val->files_syncd), "N/A");
- sts_val->files_syncd[ret] = '\0';
- ret = snprintf (sts_val->purges_remaining, sizeof(sts_val->purges_remaining), "N/A");
- sts_val->purges_remaining[ret] = '\0';
- ret = snprintf (sts_val->total_files_skipped, sizeof(sts_val->total_files_skipped), "N/A");
- sts_val->total_files_skipped[ret] = '\0';
- ret = snprintf (sts_val->files_remaining, sizeof(sts_val->files_remaining), "N/A");
- sts_val->files_remaining[ret] = '\0';
- ret = snprintf (sts_val->bytes_remaining, sizeof(sts_val->bytes_remaining), "N/A");
- sts_val->bytes_remaining[ret] = '\0';
- }
-
-store_status:
- if ((strcmp (monitor_status, "Stable"))) {
- memcpy (sts_val->worker_status, monitor_status, strlen(monitor_status));
- sts_val->worker_status[strlen(monitor_status)] = '\0';
- ret = snprintf (sts_val->crawl_status, sizeof(sts_val->crawl_status), "N/A");
- sts_val->crawl_status[ret] = '\0';
- ret = snprintf (sts_val->checkpoint_status, sizeof(sts_val->checkpoint_status), "N/A");
- sts_val->checkpoint_status[ret] = '\0';
- }
-
- if (is_template_in_use) {
- ret = snprintf (sts_val->worker_status,
- sizeof(sts_val->worker_status),
- "Config Corrupted");
- sts_val->worker_status[ret] = '\0';
- }
-
- if (strcmp (sts_val->worker_status, "Active")) {
- ret = snprintf (sts_val->checkpoint_status, sizeof(sts_val->checkpoint_status), "N/A");
- sts_val->checkpoint_status[ret] = '\0';
- ret = snprintf (sts_val->crawl_status, sizeof(sts_val->crawl_status), "N/A");
- sts_val->crawl_status[ret] = '\0';
- }
-
- if (!strcmp (sts_val->slave_node, "N/A")) {
- memcpy (sts_val->slave_node, slave, strlen(slave));
- sts_val->slave_node[strlen(slave)] = '\0';
- }
-
- memcpy (sts_val->node, node, strlen(node));
- sts_val->node[strlen(node)] = '\0';
- memcpy (sts_val->brick, brickinfo->path, strlen(brickinfo->path));
- sts_val->brick[strlen(brickinfo->path)] = '\0';
-
+ /* Slave Key */
 ret = glusterd_get_slave (volinfo, slave, &slavekey);
 if (ret < 0) {
 GF_FREE (sts_val);
@@ -3773,14 +3635,87 @@ store_status:
 memcpy (sts_val->slavekey, slavekey, strlen(slavekey));
 sts_val->slavekey[strlen(slavekey)] = '\0';
 
+ /* Master Volume */
+ memcpy (sts_val->master, master, strlen(master));
+ sts_val->master[strlen(master)] = '\0';
+
+ /* Master Brick Node */
+ memcpy (sts_val->node, node, strlen(node));
+ sts_val->node[strlen(node)] = '\0';
+
+ /* Master Brick Path */
+ memcpy (sts_val->brick, brickinfo->path,
+ strlen(brickinfo->path));
+ sts_val->brick[strlen(brickinfo->path)] = '\0';
+
+ /* Brick Host UUID */
 brick_host_uuid = uuid_utoa(brickinfo->uuid);
 brick_host_uuid_length = strlen (brick_host_uuid);
 memcpy (sts_val->brick_host_uuid, brick_host_uuid,
 brick_host_uuid_length);
 sts_val->brick_host_uuid[brick_host_uuid_length] = '\0';
 
- memcpy (sts_val->master, master, strlen(master));
- sts_val->master[strlen(master)] = '\0';
+ /* Slave */
+ memcpy (sts_val->slave, slave, strlen(slave));
+ sts_val->slave[strlen(slave)] = '\0';
+
+ snprintf (sts_val->slave_node,
+ sizeof(sts_val->slave_node), "N/A");
+
+ snprintf (sts_val->worker_status,
+ sizeof(sts_val->worker_status), "N/A");
+
+ snprintf (sts_val->crawl_status,
+ sizeof(sts_val->crawl_status), "N/A");
+
+ snprintf (sts_val->last_synced,
+ sizeof(sts_val->last_synced), "N/A");
+
+ snprintf (sts_val->last_synced_utc,
+ sizeof(sts_val->last_synced_utc), "N/A");
+
+ snprintf (sts_val->entry, sizeof(sts_val->entry), "N/A");
+
+ snprintf (sts_val->data, sizeof(sts_val->data), "N/A");
+
+ snprintf (sts_val->meta, sizeof(sts_val->meta), "N/A");
+
+ snprintf (sts_val->failures, sizeof(sts_val->failures), "N/A");
+
+ snprintf (sts_val->checkpoint_time,
+ sizeof(sts_val->checkpoint_time), "N/A");
+
+ snprintf (sts_val->checkpoint_time_utc,
+ sizeof(sts_val->checkpoint_time_utc), "N/A");
+
+ snprintf (sts_val->checkpoint_completed,
+ sizeof(sts_val->checkpoint_completed), "N/A");
+
+ snprintf (sts_val->checkpoint_completion_time,
+ sizeof(sts_val->checkpoint_completion_time),
+ "N/A");
+
+ snprintf (sts_val->checkpoint_completion_time_utc,
+ sizeof(sts_val->checkpoint_completion_time_utc),
+ "N/A");
+
+ /* Get all the other values from Gsyncd */
+ ret = glusterd_gsync_get_status (master, slave, conf_path,
+ brickinfo->path, sts_val);
+
+ if (ret) {
+ gf_log ("", GF_LOG_ERROR, "Unable to get status data "
+ "for %s(master), %s(slave), %s(brick)",
+ master, slave, brickinfo->path);
+ ret = -1;
+ goto out;
+ }
+
+ if (is_template_in_use) {
+ snprintf (sts_val->worker_status,
+ sizeof(sts_val->worker_status),
+ "Config Corrupted");
+ }
 
 ret = dict_get_str (volinfo->gsync_slaves, slavekey,
 &slaveentry);
@@ -3809,8 +3744,10 @@ store_status:
 strlen(slaveuser));
 sts_val->slave_user[strlen(slaveuser)] = '\0';
 
- snprintf (sts_val_name, sizeof (sts_val_name), "status_value%d", gsync_count);
- ret = dict_set_bin (dict, sts_val_name, sts_val, sizeof(gf_gsync_status_t));
+ snprintf (sts_val_name, sizeof (sts_val_name),
+ "status_value%d", gsync_count);
+ ret = dict_set_bin (dict, sts_val_name, sts_val,
+ sizeof(gf_gsync_status_t));
 if (ret) {
 GF_FREE (sts_val);
 goto out;
@@ -5258,7 +5195,7 @@ glusterd_create_essential_dir_files (glusterd_volinfo_t *volinfo, dict_t *dict,
 }
 } else {
 ret = glusterd_create_status_file (volinfo->volname, slave,
 slave_host, slave_vol,
- "Not Started");
+ "Created");
 if (ret || lstat (statefile, &stbuf)) {
 snprintf (errmsg, sizeof (errmsg), "Unable to create %s"
 ". Error : %s", statefile, strerror (errno));
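
With this change glusterd stops scraping the per-brick ".status" file and polling the checkpoint socket itself; instead it pre-fills every gf_gsync_status_t field with "N/A", runs gsyncd with "--status-get --path <brick>", and lets a callback copy whatever key/value pairs gsyncd reports into the struct. The callback's definition (_fcbk_statustostruct) is not part of this hunk, so the following is only a minimal, self-contained sketch of that style of line-oriented "key: value" parsing; the struct, field names, and sample buffer are simplified stand-ins and not the actual gsyncd output format.

/* sketch.c: illustrative key/value status parsing, not glusterd code */
#include <stdio.h>
#include <string.h>

struct demo_status {
        char worker_status[64];
        char crawl_status[64];
        char last_synced[64];
};

/* Copy a value into the matching fixed-size field, if the key is known. */
static void
demo_set_field (struct demo_status *st, const char *key, const char *val)
{
        if (strcmp (key, "worker_status") == 0)
                snprintf (st->worker_status, sizeof (st->worker_status),
                          "%s", val);
        else if (strcmp (key, "crawl_status") == 0)
                snprintf (st->crawl_status, sizeof (st->crawl_status),
                          "%s", val);
        else if (strcmp (key, "last_synced") == 0)
                snprintf (st->last_synced, sizeof (st->last_synced),
                          "%s", val);
}

int
main (void)
{
        /* Hypothetical response buffer; in glusterd the real one is filled
         * by glusterd_query_extutil_generic() running gsyncd. */
        char buf[] = "worker_status: Active\n"
                     "crawl_status: Changelog Crawl\n"
                     "last_synced: N/A\n";
        struct demo_status st = {{0}};
        char *save = NULL;
        char *line = NULL;

        for (line = strtok_r (buf, "\n", &save); line;
             line = strtok_r (NULL, "\n", &save)) {
                char *sep = strchr (line, ':');
                if (!sep)
                        continue;
                *sep = '\0';
                /* skip the separator and one following space, if present */
                demo_set_field (&st, line, sep + (sep[1] == ' ' ? 2 : 1));
        }

        printf ("worker=%s crawl=%s last_synced=%s\n",
                st.worker_status, st.crawl_status, st.last_synced);
        return 0;
}

Because unknown keys are simply ignored and every field starts out as "N/A", fields that an older or partially initialised gsyncd does not report still print sensibly in the CLI status table, which appears to be the intent behind the blanket snprintf("N/A") initialisation above.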