-rwxr-xr-x  glustolibs-gluster/glustolibs/gluster/gluster_base_class.py | 210
-rw-r--r--  glustolibs-io/glustolibs/io/memory_and_cpu_utils.py         | 899
2 files changed, 1109 insertions, 0 deletions
diff --git a/glustolibs-gluster/glustolibs/gluster/gluster_base_class.py b/glustolibs-gluster/glustolibs/gluster/gluster_base_class.py
index b43318fe4..baec1be8a 100755
--- a/glustolibs-gluster/glustolibs/gluster/gluster_base_class.py
+++ b/glustolibs-gluster/glustolibs/gluster/gluster_base_class.py
@@ -1105,3 +1105,213 @@ class GlusterBaseClass(TestCase):
                 raise ExecutionError("Force cleanup of nfs-ganesha "
                                      "cluster failed")
             g.log.info("Teardown nfs ganesha cluster succeeded")
+
+    @classmethod
+    def start_memory_and_cpu_usage_logging(cls, interval=60, count=100):
+        """Upload logger script and start logging usage on cluster
+
+        Kwargs:
+            interval(int): Time interval after which logs are to be collected
+                           (Default: 60)
+            count(int): Number of samples to be collected (Default: 100)
+
+        Returns:
+            proc_dict(dict): Dictionary of logging processes
+        """
+        # Imports are done inside the function to keep them optional and
+        # to avoid breakage on installations which don't use the resource
+        # leak library
+        from glustolibs.io.memory_and_cpu_utils import (
+            check_upload_memory_and_cpu_logger_script,
+            log_memory_and_cpu_usage_on_cluster)
+
+        # Check if the script is present on the servers; if not, upload it
+        if not check_upload_memory_and_cpu_logger_script(cls.servers):
+            return None
+
+        # Check if the script is present on the clients; if not, upload it
+        if not check_upload_memory_and_cpu_logger_script(cls.clients):
+            return None
+
+        # Start logging on servers and clients
+        proc_dict = log_memory_and_cpu_usage_on_cluster(
+            cls.servers, cls.clients, cls.id(), interval, count)
+
+        return proc_dict
+
+    @classmethod
+    def compute_and_print_usage_stats(cls, proc_dict, kill_proc=False):
+        """Compute and print CPU and memory usage statistics
+
+        Args:
+            proc_dict(dict): Dictionary of logging processes
+
+        Kwargs:
+            kill_proc(bool): Kill the logging processes if True, else wait
+                             for them to complete execution
+        """
+        # Imports are done inside the function to keep them optional and
+        # to avoid breakage on installations which don't use the resource
+        # leak library
+        from glustolibs.io.memory_and_cpu_utils import (
+            wait_for_logging_processes_to_stop, kill_all_logging_processes,
+            compute_data_usage_stats_on_servers,
+            compute_data_usage_stats_on_clients)
+
+        # Kill the running logging processes or wait for them to stop
+        if kill_proc:
+            nodes = cls.servers + cls.clients
+            ret = kill_all_logging_processes(proc_dict, nodes, cluster=True)
+            if not ret:
+                g.log.error("Unable to stop logging processes.")
+        else:
+            ret = wait_for_logging_processes_to_stop(proc_dict, cluster=True)
+            if not ret:
+                g.log.error("Processes didn't complete; still running.")
+
+        # Compute and print stats for servers
+        ret = compute_data_usage_stats_on_servers(cls.servers, cls.id())
+        g.log.info('*' * 50)
+        g.log.info(ret)  # TODO: Make logged message more structured
+        g.log.info('*' * 50)
+
+        # Compute and print stats for clients
+        ret = compute_data_usage_stats_on_clients(cls.clients, cls.id())
+        g.log.info('*' * 50)
+        g.log.info(ret)  # TODO: Make logged message more structured
+        g.log.info('*' * 50)
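For orientation, here is a minimal sketch of how a testcase might drive these two hooks; the class name, interval/count values and the workload placeholder are hypothetical:

    # Hypothetical testcase built on GlusterBaseClass, assuming the usual
    # setUpClass() has populated cls.servers and cls.clients
    class TestResourceUsage(GlusterBaseClass):

        def test_usage_during_io(self):
            # Sample usage every 30 seconds, 20 samples per process
            proc_dict = self.start_memory_and_cpu_usage_logging(
                interval=30, count=20)
            self.assertIsNotNone(proc_dict, "Failed to start usage logging")

            # ... run the I/O workload under test here ...

            # Wait for the loggers to finish, then print the statistics
            self.compute_and_print_usage_stats(proc_dict, kill_proc=False)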
+
+    @classmethod
+    def check_for_memory_leaks_and_oom_kills_on_servers(cls, gain=30.0):
+        """Check for memory leaks and OOM kills on servers
+
+        Kwargs:
+            gain(float): Accepted amount of leak for a given testcase in MB
+                         (Default: 30)
+
+        Returns:
+            bool: True if memory leaks or OOM kills are observed else False
+        """
+        # Imports are done inside the function to keep them optional and
+        # to avoid breakage on installations which don't use the resource
+        # leak library
+        from glustolibs.io.memory_and_cpu_utils import (
+            check_for_memory_leaks_in_glusterd,
+            check_for_memory_leaks_in_glusterfs,
+            check_for_memory_leaks_in_glusterfsd,
+            check_for_oom_killers_on_servers)
+
+        # Check for memory leaks on glusterd
+        if check_for_memory_leaks_in_glusterd(cls.servers, cls.id(), gain):
+            g.log.error("Memory leak on glusterd.")
+            return True
+
+        # Check for memory leaks on shd
+        if check_for_memory_leaks_in_glusterfs(cls.servers, cls.id(), gain):
+            g.log.error("Memory leak on shd.")
+            return True
+
+        # Check for memory leaks on brick processes
+        if check_for_memory_leaks_in_glusterfsd(cls.servers, cls.id(), gain):
+            g.log.error("Memory leak on brick process.")
+            return True
+
+        # Check for OOM kills of any gluster server process
+        if check_for_oom_killers_on_servers(cls.servers):
+            g.log.error('OOM kills present on servers.')
+            return True
+        return False
+
+    @classmethod
+    def check_for_memory_leaks_and_oom_kills_on_clients(cls, gain=30):
+        """Check for memory leaks and OOM kills on clients
+
+        Kwargs:
+            gain(float): Accepted amount of leak for a given testcase in MB
+                         (Default: 30)
+
+        Returns:
+            bool: True if memory leaks or OOM kills are observed else False
+        """
+        # Imports are done inside the function to keep them optional and
+        # to avoid breakage on installations which don't use the resource
+        # leak library
+        from glustolibs.io.memory_and_cpu_utils import (
+            check_for_memory_leaks_in_glusterfs_fuse,
+            check_for_oom_killers_on_clients)
+
+        # Check for memory leaks on the glusterfs fuse processes
+        if check_for_memory_leaks_in_glusterfs_fuse(cls.clients, cls.id(),
+                                                    gain):
+            g.log.error("Memory leaks observed on FUSE clients.")
+            return True
+
+        # Check for OOM kills on clients
+        if check_for_oom_killers_on_clients(cls.clients):
+            g.log.error("OOM kills present on clients.")
+            return True
+        return False
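A plausible teardown-side use of these checks (a sketch; the ExecutionError messages are illustrative, and get_super_method follows the existing glusto-tests idiom):

        def tearDown(self):
            # Fail the run if any gluster process leaked memory or was
            # OOM killed while the test was executing
            if self.check_for_memory_leaks_and_oom_kills_on_servers():
                raise ExecutionError("Leak or OOM kill seen on servers")
            if self.check_for_memory_leaks_and_oom_kills_on_clients():
                raise ExecutionError("Leak or OOM kill seen on clients")
            self.get_super_method(self, 'tearDown')()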
+
+    @classmethod
+    def check_for_cpu_usage_spikes_on_servers(cls, threshold=3):
+        """Check for CPU usage spikes on servers
+
+        Kwargs:
+            threshold(int): Accepted number of instances of 100% CPU usage
+                            (Default: 3)
+
+        Returns:
+            bool: True if CPU spikes are more than threshold else False
+        """
+        # Imports are done inside the function to keep them optional and
+        # to avoid breakage on installations which don't use the resource
+        # leak library
+        from glustolibs.io.memory_and_cpu_utils import (
+            check_for_cpu_usage_spikes_on_glusterd,
+            check_for_cpu_usage_spikes_on_glusterfs,
+            check_for_cpu_usage_spikes_on_glusterfsd)
+
+        # Check for CPU usage spikes on glusterd
+        if check_for_cpu_usage_spikes_on_glusterd(cls.servers, cls.id(),
+                                                  threshold):
+            g.log.error("CPU usage spikes observed more than threshold "
+                        "on glusterd.")
+            return True
+
+        # Check for CPU usage spikes on shd
+        if check_for_cpu_usage_spikes_on_glusterfs(cls.servers, cls.id(),
+                                                   threshold):
+            g.log.error("CPU usage spikes observed more than threshold "
+                        "on shd.")
+            return True
+
+        # Check for CPU usage spikes on brick processes
+        if check_for_cpu_usage_spikes_on_glusterfsd(cls.servers, cls.id(),
+                                                    threshold):
+            g.log.error("CPU usage spikes observed more than threshold "
+                        "on brick processes.")
+            return True
+        return False
+
+    @classmethod
+    def check_for_cpu_spikes_on_clients(cls, threshold=3):
+        """Check for CPU usage spikes on clients
+
+        Kwargs:
+            threshold(int): Accepted number of instances of 100% CPU usage
+                            (Default: 3)
+
+        Returns:
+            bool: True if CPU spikes are more than threshold else False
+        """
+        # Imports are done inside the function to keep them optional and
+        # to avoid breakage on installations which don't use the resource
+        # leak library
+        from glustolibs.io.memory_and_cpu_utils import (
+            check_for_cpu_usage_spikes_on_glusterfs_fuse)
+
+        ret = check_for_cpu_usage_spikes_on_glusterfs_fuse(cls.clients,
+                                                           cls.id(),
+                                                           threshold)
+        return ret
diff --git a/glustolibs-io/glustolibs/io/memory_and_cpu_utils.py b/glustolibs-io/glustolibs/io/memory_and_cpu_utils.py
new file mode 100644
index 000000000..3d105bf5e
--- /dev/null
+++ b/glustolibs-io/glustolibs/io/memory_and_cpu_utils.py
@@ -0,0 +1,899 @@
+# Copyright (C) 2020 Red Hat, Inc. <http://www.redhat.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from glusto.core import Glusto as g
+
+from glustolibs.gluster.volume_ops import get_volume_status
+from glustolibs.gluster.glusterfile import file_exists
+from glustolibs.misc.misc_libs import upload_scripts, kill_process
+
+import numpy as np
+import pandas as pd
+from statistics import mean, median
+
+
+def check_upload_memory_and_cpu_logger_script(servers):
+    """Check and upload memory_and_cpu_logger.py to servers if not present
+
+    Args:
+        servers(list): List of all servers where the script has to be
+                       uploaded
+
+    Returns:
+        bool: True if the script is present or was uploaded successfully
+              else False
+    """
+    script = "/usr/share/glustolibs/io/scripts/memory_and_cpu_logger.py"
+    is_present = []
+    for server in servers:
+        if not file_exists(server, script):
+            if not upload_scripts(server, script):
+                g.log.error("Unable to upload memory_and_cpu_logger.py on %s",
+                            server)
+                is_present.append(False)
+            else:
+                is_present.append(True)
+    return all(is_present)
+
+
+def _start_logging_processes(process, servers, test_name, interval, count):
+    """Start logging processes on all nodes for a given process
+
+    Args:
+        process(str): Name of the gluster process to be monitored
+        servers(list): Servers on which CPU and memory usage has to be logged
+        test_name(str): Name of testcase for which logs are to be collected
+        interval(int): Time interval after which logs are to be collected
+        count(int): Number of samples to be captured
+
+    Returns:
+        list: A list of logging processes
+    """
+    cmd = ("/usr/bin/env python "
+           "/usr/share/glustolibs/io/scripts/memory_and_cpu_logger.py"
+           " -p %s -t %s -i %d -c %d" % (process, test_name,
+                                         interval, count))
+    logging_process = []
+    for server in servers:
+        proc = g.run_async(server, cmd)
+        logging_process.append(proc)
+    return logging_process
+
+
+def log_memory_and_cpu_usage_on_servers(servers, test_name, interval=60,
+                                        count=100):
+    """Log memory and CPU usage of gluster server processes
+
+    Args:
+        servers(list): Servers on which CPU and memory usage has to be logged
+        test_name(str): Name of the testcase for which logs are to be
+                        collected
+
+    Kwargs:
+        interval(int): Time interval after which logs are to be collected
+                       (Default: 60)
+        count(int): Number of samples to be captured (Default: 100)
+
+    Returns:
+        dict: Logging processes dict for all gluster server processes
+    """
+    logging_process_dict = {}
+    for proc_name in ('glusterd', 'glusterfs', 'glusterfsd'):
+        logging_procs = _start_logging_processes(
+            proc_name, servers, test_name, interval, count)
+        logging_process_dict[proc_name] = logging_procs
+    return logging_process_dict
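As a rough illustration, each call fans out one logger per node and per process; the node name and test name below are placeholders:

    # Starts memory_and_cpu_logger.py on the node, roughly equivalent to:
    #   /usr/bin/env python \
    #       /usr/share/glustolibs/io/scripts/memory_and_cpu_logger.py \
    #       -p glusterd -t test_usage -i 60 -c 100
    procs = log_memory_and_cpu_usage_on_servers(
        ['server1.example.com'], 'test_usage', interval=60, count=100)
    # procs -> {'glusterd': [...], 'glusterfs': [...], 'glusterfsd': [...]}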
+
+
+def log_memory_and_cpu_usage_on_clients(servers, test_name, interval=60,
+                                        count=100):
+    """Log memory and CPU usage of gluster client processes
+
+    Args:
+        servers(list): Clients on which CPU and memory usage has to be logged
+        test_name(str): Name of testcase for which logs are to be collected
+
+    Kwargs:
+        interval(int): Time interval after which logs are to be collected
+                       (Default: 60)
+        count(int): Number of samples to be captured (Default: 100)
+
+    Returns:
+        dict: Logging processes dict for all gluster client processes
+    """
+    logging_process_dict = {}
+    logging_procs = _start_logging_processes(
+        'glusterfs', servers, test_name, interval, count)
+    logging_process_dict['glusterfs'] = logging_procs
+    return logging_process_dict
+
+
+def log_memory_and_cpu_usage_on_cluster(servers, clients, test_name,
+                                        interval=60, count=100):
+    """Log memory and CPU usage on gluster cluster
+
+    Args:
+        servers(list): Servers on which memory and CPU usage is to be logged
+        clients(list): Clients on which memory and CPU usage is to be logged
+        test_name(str): Name of testcase for which logs are to be collected
+
+    Kwargs:
+        interval(int): Time interval after which logs are to be collected
+                       (Default: 60)
+        count(int): Number of samples to be captured (Default: 100)
+
+    Returns:
+        dict: Logging processes dict for all servers and clients
+    """
+    # Start logging on all servers
+    server_logging_processes = log_memory_and_cpu_usage_on_servers(
+        servers, test_name, interval, count)
+    if not server_logging_processes:
+        return {}
+
+    # Start logging on all clients
+    client_logging_processes = log_memory_and_cpu_usage_on_clients(
+        clients, test_name, interval, count)
+    if not client_logging_processes:
+        return {}
+
+    # Combine the dicts
+    logging_process_dict = {}
+    for node_type, proc_dict in (('server', server_logging_processes),
+                                 ('client', client_logging_processes)):
+        logging_process_dict[node_type] = {}
+        for proc in proc_dict:
+            logging_process_dict[node_type][proc] = proc_dict[proc]
+    return logging_process_dict
+
+
+def _process_wait_flag_append(proc, flag):
+    """Wait on a process via async_communicate() and append True to flag"""
+    # If the process has already completed, async_communicate()
+    # throws a ValueError
+    try:
+        proc.async_communicate()
+        flag.append(True)
+    except ValueError:
+        flag.append(True)
+
+
+def wait_for_logging_processes_to_stop(proc_dict, cluster=False):
+    """Wait for all given logging processes to stop
+
+    Args:
+        proc_dict(dict): Dictionary of all the active logging processes
+
+    Kwargs:
+        cluster(bool): True if proc_dict is for the entire cluster else False
+                       (Default: False)
+
+    Returns:
+        bool: True if processes have completed else False
+    """
+    flag = []
+    if cluster:
+        for sub_dict in proc_dict:
+            for proc_name in proc_dict[sub_dict]:
+                for proc in proc_dict[sub_dict][proc_name]:
+                    _process_wait_flag_append(proc, flag)
+    else:
+        for proc_name in proc_dict:
+            for proc in proc_dict[proc_name]:
+                _process_wait_flag_append(proc, flag)
+    return all(flag)
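The combined dict returned by log_memory_and_cpu_usage_on_cluster() is nested by node type and process name; a sketch of its shape and of the wait/kill pattern (node lists illustrative):

    # {
    #     'server': {'glusterd': [proc, ...],
    #                'glusterfs': [proc, ...],
    #                'glusterfsd': [proc, ...]},
    #     'client': {'glusterfs': [proc, ...]},
    # }
    proc_dict = log_memory_and_cpu_usage_on_cluster(
        servers, clients, 'test_usage')
    if not wait_for_logging_processes_to_stop(proc_dict, cluster=True):
        kill_all_logging_processes(proc_dict, servers + clients,
                                   cluster=True)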
+
+
+def kill_all_logging_processes(proc_dict, nodes, cluster=False):
+    """Kill logging processes on all given nodes
+
+    Args:
+        proc_dict(dict): Dictionary of all active logging processes
+        nodes(list): List of nodes where logging has to be stopped
+
+    Kwargs:
+        cluster(bool): True if proc_dict is for a full cluster else False
+                       (Default: False)
+
+    Returns:
+        bool: True if processes were stopped else False
+    """
+    # Kill all logging processes
+    for server in nodes:
+        if not kill_process(server, process_names='memory_and_cpu_logger.py'):
+            g.log.error("Unable to kill some of the processes at %s.", server)
+
+    # This will stop the async threads created by run_async() as the proc
+    # is already killed.
+    ret = wait_for_logging_processes_to_stop(proc_dict, cluster)
+    if ret:
+        return True
+    return False
+
+
+def create_dataframe_from_csv(node, proc_name, test_name):
+    """Create a dataframe from a given process's csv file
+
+    Args:
+        node(str): Node from which the csv is to be picked
+        proc_name(str): Name of process for which the csv is to be picked
+        test_name(str): Name of the testcase for which the csv is to be read
+
+    Returns:
+        dataframe: Pandas dataframe if the csv file exists else None
+    """
+    # Read the csv file generated by memory_and_cpu_logger.py
+    ret, raw_data, _ = g.run(node, "cat /root/{}.csv"
+                             .format(proc_name))
+    if ret:
+        return None
+
+    # Split the complete dump into individual lines
+    data = raw_data.split("\r\n")
+    rows, flag = [], False
+    for line in data:
+        values = line.split(',')
+        if test_name == values[0]:
+            # Reset rows if it's the second instance
+            if flag:
+                rows = []
+            flag = True
+            continue
+
+        # Pick and append values which have a complete entry
+        if flag and len(values) == 4:
+            rows.append(values)
+
+    # Create a pandas dataframe and set the type for columns
+    dataframe = pd.DataFrame(rows[1:], columns=rows[0])
+    conversion_dict = {'Process ID': int,
+                       'CPU Usage': float,
+                       'Memory Usage': float}
+    dataframe = dataframe.astype(conversion_dict)
+    return dataframe
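The parser above implies a csv layout like the following, reconstructed from the code rather than from the logger script itself (the fourth column's name and all values are assumptions): a line carrying the test name delimits each run, followed by a header row and four-column samples.

    # /root/glusterd.csv (inferred layout; values hypothetical)
    #   test_usage,,,
    #   Process ID,CPU Usage,Memory Usage,Time stamp
    #   1234,2.3,145.2,2020-08-17 10:15:01
    #   1234,1.9,145.6,2020-08-17 10:16:01
    df = create_dataframe_from_csv('server1.example.com', 'glusterd',
                                   'test_usage')
    if df is not None:
        print(df['Memory Usage'].max())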
+
+
+def _get_min_max_mean_median(entrylist):
+    """Get the min, max, mean and median of a list
+
+    Args:
+        entrylist(list): List of values to be used
+
+    Returns:
+        dict: Result dict generated from the list
+    """
+    result = {}
+    result['Min'] = min(entrylist)
+    result['Max'] = max(entrylist)
+    result['Mean'] = mean(entrylist)
+    result['Median'] = median(entrylist)
+    return result
+
+
+def _compute_min_max_mean_median(dataframe, data_dict, process, node,
+                                 volume=None, brick=None):
+    """Compute min, max, mean and median for a given process
+
+    Args:
+        dataframe(pandas dataframe): Pandas dataframe of the csv file
+        data_dict(dict): Data dict to which info is to be added
+        process(str): Name of process for which data is to be computed
+        node(str): Node for which min, max, mean and median are to be
+                   computed
+
+    Kwargs:
+        volume(str): Name of the volume for which data is to be computed
+        brick(str): Path of the brick for which data is to be computed
+    """
+    if volume and process == 'glusterfs':
+        # Create subdict inside dict
+        data_dict[node][process][volume] = {}
+        for usage in ('CPU Usage', 'Memory Usage'):
+            # Create usage subdict
+            data_dict[node][process][volume][usage] = {}
+
+            # Clean data and compute values
+            cleaned_usage = list(dataframe[usage].dropna())
+            out = _get_min_max_mean_median(cleaned_usage)
+
+            # Add values to data_dict
+            for key in ('Min', 'Max', 'Mean', 'Median'):
+                data_dict[node][process][volume][usage][key] = out[key]
+
+    elif volume and brick and process == 'glusterfsd':
+        # Create subdicts inside dict
+        data_dict[node][process][volume] = {}
+        data_dict[node][process][volume][brick] = {}
+        for usage in ('CPU Usage', 'Memory Usage'):
+            # Create usage subdict
+            data_dict[node][process][volume][brick][usage] = {}
+
+            # Clean data and compute values
+            cleaned_usage = list(dataframe[usage].dropna())
+            out = _get_min_max_mean_median(cleaned_usage)
+
+            # Add values to data_dict
+            for key in ('Min', 'Max', 'Mean', 'Median'):
+                data_dict[node][process][volume][brick][usage][key] = (
+                    out[key])
+
+    # Compute CPU Usage and Memory Usage for glusterd
+    else:
+        for usage in ('CPU Usage', 'Memory Usage'):
+            # Create usage subdict
+            data_dict[node][process][usage] = {}
+
+            # Clean data and compute values
+            cleaned_usage = list(dataframe[usage].dropna())
+            out = _get_min_max_mean_median(cleaned_usage)
+
+            # Add values to data_dict
+            for key in ('Min', 'Max', 'Mean', 'Median'):
+                data_dict[node][process][usage][key] = out[key]
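Together with the stats functions below, this helper fills data_dict with a nested structure, sketched here with hypothetical node, volume and brick names:

    # {
    #     'server1': {
    #         'glusterd': {'is_restarted': False,
    #                      'CPU Usage': {'Min': 0.0, 'Max': 12.0,
    #                                    'Mean': 2.1, 'Median': 1.8},
    #                      'Memory Usage': {...}},
    #         'glusterfs': {'testvol': {'CPU Usage': {...},
    #                                   'Memory Usage': {...}}},
    #         'glusterfsd': {'testvol': {'/bricks/brick1': {...}}},
    #     }
    # }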
+
+
+def compute_data_usage_stats_on_servers(nodes, test_name):
+    """Compute min, max, mean and median for servers
+
+    Args:
+        nodes(list): Servers whose data is to be used to compute min, max,
+                     mean and median
+        test_name(str): Name of testcase for which data has to be processed
+
+    Returns:
+        dict: dict of min, max, mean and median for a given process
+
+    NOTE:
+        This function always has to be run before cleanup.
+    """
+    data_dict = {}
+    for node in nodes:
+        # Get the volume status on the node
+        volume_status = get_volume_status(node)
+        data_dict[node] = {}
+        for process in ('glusterd', 'glusterfs', 'glusterfsd'):
+
+            # Generate a dataframe from the csv file
+            dataframe = create_dataframe_from_csv(node, process, test_name)
+            if dataframe is None:
+                return {}
+
+            data_dict[node][process] = {}
+            if process == 'glusterd':
+                # Check whether glusterd was restarted
+                if len(set(dataframe['Process ID'])) > 1:
+                    data_dict[node][process]['is_restarted'] = True
+                else:
+                    data_dict[node][process]['is_restarted'] = False
+
+                # Compute min, max, mean and median
+                _compute_min_max_mean_median(dataframe, data_dict, process,
+                                             node)
+                continue
+
+            # Map volumes to volume processes
+            for volume in volume_status.keys():
+                for proc in volume_status[volume][node].keys():
+                    if (proc == 'Self-heal Daemon' and
+                            process == 'glusterfs'):
+                        # Fetch the pid from the volume status output and
+                        # create a dataframe with the entries of only that
+                        # pid
+                        pid = int(volume_status[volume][node][proc]['pid'])
+                        proc_dataframe = dataframe[
+                            dataframe['Process ID'] == pid]
+
+                        # Compute min, max, mean and median
+                        _compute_min_max_mean_median(
+                            proc_dataframe, data_dict, process, node, volume)
+
+                    if (proc.count('/') >= 2 and process == 'glusterfsd'):
+                        # Fetch the pid from the volume status output and
+                        # create a dataframe with the entries of only that
+                        # pid
+                        pid = int(volume_status[volume][node][proc]['pid'])
+                        proc_dataframe = dataframe[
+                            dataframe['Process ID'] == pid]
+
+                        # Compute min, max, mean and median
+                        _compute_min_max_mean_median(
+                            proc_dataframe, data_dict, process, node, volume,
+                            proc)
+
+    return data_dict
+
+
+def compute_data_usage_stats_on_clients(nodes, test_name):
+    """Compute min, max, mean and median for clients
+
+    Args:
+        nodes(list): Clients whose data is to be used to compute min, max,
+                     mean and median
+        test_name(str): Name of the testcase for which data has to be
+                        processed
+
+    Returns:
+        dict: dict of min, max, mean and median for a given process
+    """
+    data_dict = {}
+    for node in nodes:
+        data_dict[node] = {}
+        dataframe = create_dataframe_from_csv(node, 'glusterfs', test_name)
+        if dataframe is None:
+            return {}
+
+        data_dict[node]['glusterfs'] = {}
+        # Compute min, max, mean and median
+        _compute_min_max_mean_median(dataframe, data_dict, 'glusterfs', node)
+
+    return data_dict
+
+
+def _perform_three_point_check_for_memory_leak(dataframe, node, process, gain,
+                                               volume_status=None,
+                                               volume=None):
+    """Perform three point check
+
+    Args:
+        dataframe(pandas dataframe): Pandas dataframe of a given process
+        node(str): Node on which memory leak has to be checked
+        process(str): Name of process for which check has to be done
+        gain(float): Accepted amount of leak for a given testcase in MB
+
+    Kwargs:
+        volume_status(dict): Volume status output on the given node
+        volume(str): Name of volume for which 3 point check has to be done
+
+    Returns:
+        bool: True if memory leak instances are observed else False
+    """
+    # Filter the dataframe by process id if it's a volume specific process
+    pid = None
+    if volume_status and process in ('glusterfs', 'glusterfsd'):
+        pid = int(volume_status[volume][node][process]['pid'])
+        dataframe = dataframe[dataframe['Process ID'] == pid]
+
+    # Compute usage increments through the dataframe
+    memory_increments = list(dataframe['Memory Usage'].diff().dropna())
+
+    # Check if usage grew by more than the accepted amount of leak
+    memory_leak_decision_array = np.where(
+        dataframe['Memory Usage'].diff().dropna() > gain, True, False)
+    instances_of_leak = np.where(memory_leak_decision_array)[0]
+
+    # If memory leak instances are present, check whether the usage
+    # reduced again afterwards
+    count_of_leak_instances = len(instances_of_leak)
+    if count_of_leak_instances > 0:
+        g.log.error('There are %s instances of memory leaks on node %s',
+                    count_of_leak_instances, node)
+        for instance in instances_of_leak:
+            # For instances near the end of the log the operations below
+            # can throw an IndexError, which is handled further down.
+            try:
+                # Flag a leak if the two increments right after the spike
+                # and the final increment are all still larger than the
+                # increment at the spike
+                if all([memory_increments[instance+1] >
+                        memory_increments[instance],
+                        memory_increments[instance+2] >
+                        memory_increments[instance],
+                        memory_increments[-1] >
+                        memory_increments[instance]]):
+                    return True
+
+            except IndexError:
+                # In case of the last log entry rerun the command
+                # and check the difference
+                g.log.info('Instance at last log entry.')
+                if pid:
+                    cmd = ("ps u -p %s | awk 'NR>1 && $11~/%s$/{print "
+                           "$6/1024}'" % (pid, process))
+                else:
+                    cmd = ("ps u -p `pgrep glusterd` | awk 'NR>1 && $11~/"
+                           "glusterd$/{print $6/1024}'")
+                ret, out, _ = g.run(node, cmd)
+                if ret:
+                    g.log.error('Unable to run the command to fetch current '
+                                'memory utilization.')
+                    continue
+                usage_now = float(out.strip())
+                last_entry = dataframe['Memory Usage'].iloc[-1]
+
+                # Check if current memory usage is higher than the last entry
+                fresh_diff = last_entry - usage_now
+                if fresh_diff > gain and last_entry > fresh_diff:
+                    return True
+    return False
+
+
+def check_for_memory_leaks_in_glusterd(nodes, test_name, gain=30.0):
+    """Check for memory leaks in glusterd
+
+    Args:
+        nodes(list): Servers on which memory leaks have to be checked
+        test_name(str): Name of testcase for which memory leaks have to be
+                        checked
+
+    Kwargs:
+        gain(float): Accepted amount of leak for a given testcase in MB
+                     (Default: 30)
+
+    Returns:
+        bool: True if a memory leak was observed else False
+    """
+    is_there_a_leak = []
+    for node in nodes:
+        dataframe = create_dataframe_from_csv(node, 'glusterd', test_name)
+        if dataframe is None:
+            return False
+
+        # Perform the three point check
+        three_point_check = _perform_three_point_check_for_memory_leak(
+            dataframe, node, 'glusterd', gain)
+        if three_point_check:
+            g.log.error("Memory leak observed on node %s in glusterd",
+                        node)
+        is_there_a_leak.append(three_point_check)
+
+    return any(is_there_a_leak)
+
+
+def check_for_memory_leaks_in_glusterfs(nodes, test_name, gain=30.0):
+    """Check for memory leaks in glusterfs
+
+    Args:
+        nodes(list): Servers on which memory leaks have to be checked
+        test_name(str): Name of testcase for which memory leaks have to be
+                        checked
+
+    Kwargs:
+        gain(float): Accepted amount of leak for a given testcase in MB
+                     (Default: 30)
+
+    Returns:
+        bool: True if a memory leak was observed else False
+
+    NOTE:
+        This function should be executed with the volumes present on the
+        cluster.
+    """
+    is_there_a_leak = []
+    for node in nodes:
+        # Get the volume status on the node
+        volume_status = get_volume_status(node)
+        dataframe = create_dataframe_from_csv(node, 'glusterfs', test_name)
+        if dataframe is None:
+            return False
+
+        for volume in volume_status.keys():
+            for process in volume_status[volume][node].keys():
+                # Skip if the process isn't the Self-heal Daemon
+                if process != 'Self-heal Daemon':
+                    continue
+
+                # Perform the three point check
+                three_point_check = (
+                    _perform_three_point_check_for_memory_leak(
+                        dataframe, node, 'glusterfs', gain, volume_status,
+                        volume))
+                if three_point_check:
+                    g.log.error("Memory leak observed on node %s in shd "
+                                "on volume %s", node, volume)
+                is_there_a_leak.append(three_point_check)
+
+    return any(is_there_a_leak)
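To make the heuristic concrete, here is a self-contained sketch of its first stage (the gain filter) on synthetic numbers; the real function additionally re-checks each flagged spike against the later increments:

    import numpy as np
    import pandas as pd

    # Synthetic memory samples in MB with one jump of ~40 MB
    usage = pd.Series([100.0, 101.0, 102.0, 142.0, 142.5, 143.0])
    gain = 30.0

    # Same computation as in _perform_three_point_check_for_memory_leak
    increments = usage.diff().dropna()
    leak_decision = np.where(increments > gain, True, False)
    print(np.where(leak_decision)[0])  # -> [2]: the 40 MB jump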
+
+
+def check_for_memory_leaks_in_glusterfsd(nodes, test_name, gain=30.0):
+    """Check for memory leaks in glusterfsd
+
+    Args:
+        nodes(list): Servers on which memory leaks have to be checked
+        test_name(str): Name of testcase for which memory leaks have to be
+                        checked
+
+    Kwargs:
+        gain(float): Accepted amount of leak for a given testcase in MB
+                     (Default: 30)
+
+    Returns:
+        bool: True if a memory leak was observed else False
+
+    NOTE:
+        This function should be executed with the volumes present on the
+        cluster.
+    """
+    is_there_a_leak = []
+    for node in nodes:
+        # Get the volume status on the node
+        volume_status = get_volume_status(node)
+        dataframe = create_dataframe_from_csv(node, 'glusterfsd', test_name)
+        if dataframe is None:
+            return False
+
+        for volume in volume_status.keys():
+            for process in volume_status[volume][node].keys():
+                # Skip if the process isn't a brick process
+                if not process.count('/'):
+                    continue
+
+                # Perform the three point check
+                three_point_check = (
+                    _perform_three_point_check_for_memory_leak(
+                        dataframe, node, 'glusterfsd', gain, volume_status,
+                        volume))
+                if three_point_check:
+                    g.log.error("Memory leak observed on node %s in brick "
+                                "process for brick %s on volume %s", node,
+                                process, volume)
+                is_there_a_leak.append(three_point_check)
+
+    return any(is_there_a_leak)
+
+
+def check_for_memory_leaks_in_glusterfs_fuse(nodes, test_name, gain=30.0):
+    """Check for memory leaks in glusterfs fuse
+
+    Args:
+        nodes(list): Clients on which memory leaks have to be checked
+        test_name(str): Name of testcase for which memory leaks have to be
+                        checked
+
+    Kwargs:
+        gain(float): Accepted amount of leak for a given testcase in MB
+                     (Default: 30)
+
+    Returns:
+        bool: True if a memory leak was observed else False
+
+    NOTE:
+        This function should be executed when the volume is still mounted.
+    """
+    is_there_a_leak = []
+    for node in nodes:
+        # Generate a dataframe from the csv file
+        dataframe = create_dataframe_from_csv(node, 'glusterfs', test_name)
+        if dataframe is None:
+            return False
+
+        # Perform the three point check
+        three_point_check = _perform_three_point_check_for_memory_leak(
+            dataframe, node, 'glusterfs', gain)
+        if three_point_check:
+            g.log.error("Memory leak observed on node %s for client",
+                        node)
+        is_there_a_leak.append(three_point_check)
+
+    return any(is_there_a_leak)
+
+
+def _check_for_oom_killers(nodes, process, oom_killer_list):
+    """Check for OOM killers for a specific process
+
+    Args:
+        nodes(list): Nodes on which OOM killers have to be checked
+        process(str): Process for which OOM killers have to be checked
+        oom_killer_list(list): A list in which the presence of
+                               OOM killer has to be noted
+    """
+    cmd = ("grep -i 'killed process' /var/log/messages* "
+           "| grep -w '{}'".format(process))
+    ret_dict = g.run_parallel(nodes, cmd)
+    for key in ret_dict.keys():
+        ret, out, _ = ret_dict[key]
+        if not ret:
+            g.log.error('OOM killer observed on %s for %s', key, process)
+            g.log.error(out)
+            oom_killer_list.append(True)
+        else:
+            oom_killer_list.append(False)
+
+
+def check_for_oom_killers_on_servers(nodes):
+    """Check for OOM killers on servers
+
+    Args:
+        nodes(list): Servers on which OOM kills have to be checked
+
+    Returns:
+        bool: True if OOM killers are present on any server else False
+    """
+    oom_killer_list = []
+    for process in ('glusterfs', 'glusterfsd', 'glusterd'):
+        _check_for_oom_killers(nodes, process, oom_killer_list)
+    return any(oom_killer_list)
+
+
+def check_for_oom_killers_on_clients(nodes):
+    """Check for OOM killers on clients
+
+    Args:
+        nodes(list): Clients on which OOM kills have to be checked
+
+    Returns:
+        bool: True if OOM killers are present on any client else False
+    """
+    oom_killer_list = []
+    _check_for_oom_killers(nodes, 'glusterfs', oom_killer_list)
+    return any(oom_killer_list)
+
+
+def _check_for_cpu_usage_spikes(dataframe, node, process, threshold,
+                                volume_status=None, volume=None):
+    """Check for cpu spikes for a given process
+
+    Args:
+        dataframe(pandas dataframe): Pandas dataframe of a given process
+        node(str): Node on which cpu spikes have to be checked
+        process(str): Name of process for which check has to be done
+        threshold(int): Accepted number of 100% CPU usage instances
+
+    Kwargs:
+        volume_status(dict): Volume status output on the given node
+        volume(str): Name of volume for which check has to be done
+
+    Returns:
+        bool: True if the number of instances is more than threshold
+              else False
+    """
+    # Filter the dataframe by process id if it's a volume specific process
+    if volume_status and process in ('glusterfs', 'glusterfsd'):
+        pid = int(volume_status[volume][node][process]['pid'])
+        dataframe = dataframe[dataframe['Process ID'] == pid]
+
+    # Count the instances of 100% CPU usage
+    cpu_spike_decision_array = np.where(
+        dataframe['CPU Usage'].dropna() == 100.0, True, False)
+    instances_of_spikes = np.where(cpu_spike_decision_array)[0]
+
+    return bool(len(instances_of_spikes) > threshold)
+
+
+def check_for_cpu_usage_spikes_on_glusterd(nodes, test_name, threshold=3):
+    """Check for CPU usage spikes on glusterd
+
+    Args:
+        nodes(list): Servers on which CPU usage spikes have to be checked
+        test_name(str): Name of testcase for which CPU usage spikes have to
+                        be checked
+
+    Kwargs:
+        threshold(int): Accepted number of instances of 100% CPU usage
+                        (Default: 3)
+
+    Returns:
+        bool: True if CPU spikes are more than threshold else False
+    """
+    is_there_a_spike = []
+    for node in nodes:
+        dataframe = create_dataframe_from_csv(node, 'glusterd', test_name)
+        if dataframe is None:
+            return False
+
+        # Check for CPU usage spikes
+        cpu_spikes = _check_for_cpu_usage_spikes(
+            dataframe, node, 'glusterd', threshold)
+        if cpu_spikes:
+            g.log.error("CPU usage spikes observed more than "
+                        "threshold %d on node %s for glusterd",
+                        threshold, node)
+        is_there_a_spike.append(cpu_spikes)
+
+    return any(is_there_a_spike)
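The spike counter reduces to a couple of numpy calls; a self-contained sketch on synthetic samples:

    import numpy as np
    import pandas as pd

    # Synthetic CPU samples with four instances of 100% usage
    cpu = pd.Series([5.0, 100.0, 100.0, 7.0, 100.0, 100.0])
    threshold = 3

    # Same counting step as in _check_for_cpu_usage_spikes
    spikes = np.where(np.where(cpu.dropna() == 100.0, True, False))[0]
    print(len(spikes) > threshold)  # -> True (4 spikes > threshold of 3)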
+ """ + is_there_a_spike = [] + for node in nodes: + # Get the volume status on the node + volume_status = get_volume_status(node) + dataframe = create_dataframe_from_csv(node, 'glusterfs', test_name) + if not dataframe: + return False + + for volume in volume_status.keys(): + for process in volume_status[volume][node].keys(): + # Skiping if process isn't Self-heal Deamon + if process != 'Self-heal Daemon': + continue + + # Call function to check for cpu spikes + cpu_spikes = _check_for_cpu_usage_spikes( + dataframe, node, 'glusterfs', threshold, volume_status, + volume) + if cpu_spikes: + g.log.error("CPU usage spikes observed more than " + "threshold %d on node %s on volume %s for shd", + threshold, node, volume) + is_there_a_spike.append(cpu_spikes) + + return any(is_there_a_spike) + + +def check_for_cpu_usage_spikes_on_glusterfsd(nodes, test_name, threshold=3): + """Check for CPU usage spikes in glusterfsd + + Args: + nodes(list): Servers on which memory leaks have to be checked + test_name(str): Name of testcase for which memory leaks has to be checked + + Kwargs: + threshold(int): Accepted amount of instances of 100% CPU usage + (Default:3) + + Returns: + bool: True if CPU spikes are more than threshold else False + + NOTE: + This function should be exuected with the volumes present on the cluster. + """ + is_there_a_spike = [] + for node in nodes: + # Get the volume status on the node + volume_status = get_volume_status(node) + dataframe = create_dataframe_from_csv(node, 'glusterfsd', test_name) + if not dataframe: + return False + + for volume in volume_status.keys(): + for process in volume_status[volume][node].keys(): + # Skiping if process isn't brick process + if process in ('Self-heal Daemon', 'Quota Daemon'): + continue + + # Call function to check for cpu spikes + cpu_spikes = _check_for_cpu_usage_spikes( + dataframe, node, 'glusterfsd', threshold, volume_status, + volume) + if cpu_spikes: + g.log.error("CPU usage spikes observed more than " + "threshold %d on node %s on volume %s for " + "brick process %s", + threshold, node, volume, process) + is_there_a_spike.append(cpu_spikes) + + return any(is_there_a_spike) + + +def check_for_cpu_usage_spikes_on_glusterfs_fuse(nodes, test_name, + threshold=3): + """Check for CPU usage spikes on glusterfs fuse + + Args: + nodes(list): Servers on which memory leaks have to be checked + test_name(str): Name of testcase for which memory leaks has to be checked + + Kwargs: + threshold(int): Accepted amount of instances of 100% CPU usage + (Default:3) + + Returns: + bool: True if CPU spikes are more than threshold else False + + NOTE: + This function should be executed when the volume is still mounted. + """ + is_there_a_spike = [] + for node in nodes: + # Get the volume status on the node + dataframe = create_dataframe_from_csv(node, 'glusterfs', test_name) + if not dataframe: + return False + + # Call function to check for cpu spikes + cpu_spikes = _check_for_cpu_usage_spikes( + dataframe, node, 'glusterfs', threshold) + if cpu_spikes: + g.log.error("CPU usage spikes observed more than " + "threshold %d on node %s for client", + threshold, node) + is_there_a_spike.append(cpu_spikes) + + return any(is_there_a_spike) |