"""hostutils module contains wrappers for commands that can be executed on any host in the test environment Supported Wrappers: ------------------- *) rmdir *) mkdir *) mkfs *) execute_command """ import re import os import atfutils from collections import namedtuple from atfglobals import GlobalObj from atfutils import commands system_dirs = re.compile('(/bin|/boot|/dev|/etc|/lib|/mnt|/net|/opt|/root|/sbin|/usr|/var|/sys)\/?$') def rmdir(hostkey, dirpath): """ """ logger = GlobalObj.getLoggerObj() output = atfutils.get_new_output_obj() if system_dirs.match(dirpath): logger.error("System Directiories cannot be deleted") output["exitstatus"] = 1 return output command = ' '.join([commands['unlink_dir_force'], dirpath]) output = execute_command(hostkey, command) return output def mkdir(hostkey, dirpath): """ """ logger = GlobalObj.getLoggerObj() output = atfutils.get_new_output_obj() if system_dirs.match(dirpath): logger.error("System Directiories cannot be created") output["exitstatus"] = 1 return output command = ' '.join([commands['mkdir'], dirpath]) output = execute_command(hostkey, command) return output def mkfs(hostkey, device, fstype, options=None): """ """ command = [commands['mkfs']] # mkfs type if fstype is None: fstype = "xfs" command.extend(["-t", fstype]) # mkfs options if specified if options is not None: command.extend(["-o", options]) # mkfs device command.extend(["-f", device]) command = ' '.join(command) output = execute_command(hostkey, command) return output def mount(hostkey, device, fstype, mountpoint, options=None): """ """ command = [commands['mount']] # mount type if fstype is None: fstype = "xfs" command.extend(["-t", fstype]) # mount options if options is not None: command.extend(["-o", options]) # mount device, mountpoint command.extend([device, mountpoint]) command = ' '.join(command) output = execute_command(hostkey, command) return output def umount(hostkey, mountpoint): """ """ command = ' '.join([commands['umount'], mountpoint]) output = execute_command(hostkey, command) return output def umount_device(hostkey, device): """ """ all_outputs = {} umount_device_output = atfutils.get_new_output_obj() output = find_mountpoints(hostkey, device) assert_success_status = atfutils.assert_success(output["exitstatus"]) if assert_success_status is not 0: return output else: mountpoints = output["stdoutdata"] for mountpoint in mountpoints: output = umount(hostkey, mountpoint) assert_success_status = atfutils.assert_success(output["exitstatus"]) if assert_success_status is not 0: stdoutdata = str(output["stdoutdata"]) if ((stdoutdata.rfind("not found")) or (stdoutdata.rfind("not mount"))): output["exitstatus"] = 0 all_outputs[mountpoint] = output assert_success_status = atfutils.assert_success_of_outputs(all_outputs) if assert_success_status is not 0: umount_device_output["exitstatus"] = 1 umount_device_output["stderrdata"] = "Unable to unmount device %s" % (device) else: umount_device_output["exitstatus"] = 0 umount_device_output["stdoutdata"] = "Successfully able to unmount device %s" % (device) return umount_device_output def md5sum(hostkey, path): """ """ base_command = "rm -rf %s ; arequal-checksum %s" landfill_dir = os.path.join(path, ".landfill") command = base_command % (landfill_dir, path) output = execute_command(hostkey, command) if output['stdoutdata'] is None or (not output['stdoutdata']): output['stdoutdata'] = "" else: output['stdoutdata'] = str(output['stdoutdata']) return output def find_mountpoints(hostkey, device): """ """ base_command = "mount | grep" mountpoints = [] command = ' '.join([base_command, device]) output = execute_command(hostkey, command) assert_success_status = atfutils.assert_success(output['exitstatus']) if assert_success_status is 0: if output["stdoutdata"]: for data in output["stdoutdata"]: mountpoints.append(data.split(" ")[2]) output["stdoutdata"] = mountpoints return output def execute_command(hostkey, command, commandInput=None): """ """ logger = GlobalObj.getLoggerObj() output = atfutils.get_new_output_obj() new_command = _substitute_value_for_variables(command) cm = GlobalObj.getConnectionsManagerObj() host_connection = cm.getConnection(hostkey) if not host_connection: logger.error("SSH Connection Not established to host '%s' " % hostkey) output["exitstatus"] = 1 return output env = GlobalObj.getTestenvObj() host_obj = env.getHost(hostkey) if host_obj is None: logger.error("Invalid Host. %s is not defined in TestEnvironment" % hostkey) output["exitstatus"] = 1 return output hostname = host_obj.hostname logger.debug('%s: Executing Command: %s' % (hostname, command)) output = host_connection.executecommand(new_command, commandInput) atfutils.print_stdout(output['stdoutdata']) atfutils.print_stderr(output['stderrdata']) return output def _substitute_value_for_variables(command): """ """ logger = GlobalObj.getLoggerObj() env = GlobalObj.getTestenvObj() new_command = command pattern_for_variables = re.compile("<[a-z]+\d*\.[a-z]*>") name_to_functions_mapping = { "export" : "getExportdir", "server" : "getServer", "brick" : "getBrick", "volume" : "getVolume", "client" : "getClient", "mountdevice" : "getMountDevice", "mount" : "getMount"} name_pattern = re.compile('(export|server|brick|volume|client|mountdevice|mount)*') variable_to_replace_named_tuple = namedtuple("Vars", ["actual_var", "name", "option"]) variables_to_replace = [] variables = pattern_for_variables.findall(command) if not variables: return new_command else: active_volume = env.getActiveVolume() for variable in variables: if variable not in variables_to_replace: name, option = (variable.strip("<>")).split(".") variables_to_replace.append(variable_to_replace_named_tuple(variable, name, option)) for variable in variables_to_replace: actual_var = variable.actual_var name = variable.name option = variable.option matched_obj = name_pattern.match(name.lower()) result = matched_obj.group(0) if result is '': continue else: funcname = name_to_functions_mapping[result] function = getattr(env, funcname) obj_value = function(name) if obj_value is None: continue else: try: option_value = obj_value.__getattribute__(option) except AttributeError: logger.error("Attribute Error: %s object has no attribute '%s'" % (name, option)) continue else: actual_var_pattern = re.compile(actual_var) new_command = actual_var_pattern.sub(option_value, new_command) variables = pattern_for_variables.findall(command) if not variables: new_command = "" return new_command return new_command