""" Parser module contains parsers for parsing: *) TestrunInfo file *) TestEnvironment file *) Testcaselist file. """ import ConfigParser import re from collections import OrderedDict import os from atfglobals import GlobalObj def verify_necessary_options(cp, section, necessary_options): """Helper function for all parsers to verify necessary options in a section of the config file. Parameters: cp : Config Parser Object section: name of the section in ConfigFile necessary_options: Options necessary under Section 'section' Returns: Success: True (if all necessary_options are found in 'section') Failure: False ( if any of the necessaty_options not found in 'section') """ logger = GlobalObj.getLoggerObj() all_options_found = True items = dict(cp.items(section)) for option in necessary_options: if not (items.has_key(option) and items[option]): logger.error("' %s ' Should be defined in Section: %s" % (option, section)) all_options_found = False return all_options_found def parse_testrun_info_file(filename): """ Parse TestrunInfo File """ GlobalObj.initTestrunInfoObj() testruninfo_obj = GlobalObj.getTestrunInfoObj() cp = ConfigParser.SafeConfigParser() necessary_sections = ["keywords", "testunits", "glusterversion"] matched_sections = [] unmatched_sections = [] logger = GlobalObj.getLoggerObj() if not cp.read(filename): logger.error("Error reading file ' %s '.File Not found " % filename) return 1 else: available_sections = cp.sections() found_all_sections = True for section in necessary_sections: matched_obj = re.search(section, str(available_sections), re.IGNORECASE) if not matched_obj: found_all_sections = False unmatched_sections.append(section) else: matched_sections.append(matched_obj.group(0)) if not found_all_sections: for section in unmatched_sections: logger.error("Section %s Not Found" % section) logger.error("Please define the above sections in TestRunInfo File") return 1 else: for section in matched_sections: Map = {} if re.match("keywords", section, re.IGNORECASE): Map = dict(cp.items(section)) testruninfo_obj.addKeywords(Map['keywords']) elif re.match("testunits", section, re.IGNORECASE): Map = dict(cp.items(section)) testunits = Map.values() for testunit in testunits: if testunit: testruninfo_obj.addTestUnits(testunit) elif re.match("glusterversion", section, re.IGNORECASE): Map = dict(cp.items(section)) glusterversion = Map['version'] if not glusterversion: logger.error("version option not defined in GlusterVersion. " + \ "The 'version' option should be defined") return 1 else: testruninfo_obj.addGlusterVersion(glusterversion) return 0 def parse_testcaseslist_file(filename): """ Parse TestCasesList file """ return_status = 1 testcaseslist = [] logger = GlobalObj.getLoggerObj() if not os.path.exists(filename): logger.error("%s file not found." % filename) return return_status testruninfo_obj = GlobalObj.getTestrunInfoObj() glusterversion = testruninfo_obj.getGlusterVersion() testcaseslistfile = open(filename, "r") filedata = testcaseslistfile.readlines() testcaseslist = [] for data in filedata: if (not data.strip() or re.match("^#", data)): continue else: testcaseid, version, keyword = (value.strip() for value in data.split(':')) testcaseslist.append({'testcaseid':testcaseid, 'version':version, 'keyword':keyword}) selected_testcases = [] if glusterversion == "master": for testcase in testcaseslist: selected_testcases.append(testcase['testcaseid']) return selected_testcases else: for testcase in testcaseslist: if re.match("^(<|>|=|<=|>=)", testcase['version']): condition, version = (x.strip() for x in testcase['version'].split()) if ((condition == "<=" and glusterversion <= version) or (condition == ">=" and glusterversion >= version) or (condition == "<" and glusterversion < version) or (condition == ">" and glusterversion > version) or (condition == "=" and glusterversion == version)): selected_testcases.append(testcase['testcaseid']) elif re.search("-", testcase['version']): fromversion, toversion = (x.strip() for x in testcase['version'].split("-")) if (glusterversion >= fromversion and glusterversion <= toversion): selected_testcases.append(testcase['testcaseid']) return selected_testcases def parse_testenv_configfile(filename): """ parse testenv.cfg file """ GlobalObj.initTestenvObj() env = GlobalObj.getTestenvObj() cp = ConfigParser.SafeConfigParser(dict_type=OrderedDict) sections_to_functions_mapping = { "export" : "addExportdir", "server" : "addServer", "brick" : "addBrick", "volume" : "addVolume", "client" : "addClient", "mountdevice" : "addMountDevice", "mount" : "addMount", "defaults" : "addDefaults"} section_pattern = re.compile('(export|server|brick|volume|client|mountdevice|mount)*') logger = GlobalObj.getLoggerObj() if not cp.read(filename): logger.error("Error reading file ' %s '.File Not found " % filename) return 1 else: defaults = dict(cp.defaults()) function = getattr(env, "addDefaults") function(**defaults) sections = cp.sections() for section in sections: items = dict(cp.items(section)) matched_obj = section_pattern.match((section.lower())) result = matched_obj.group(0) if result is '': continue else: funcname = sections_to_functions_mapping[result] function = getattr(env, funcname) necessary_options = [] if re.match('export', result): necessary_options = ["dir"] if verify_necessary_options(cp, section, necessary_options): dir_ = items.pop('dir') function(section, dir_, **items) else: return 1 elif re.match('server', result): necessary_options = ["hostname", "user", "password", "glusterversion"] if verify_necessary_options(cp, section, necessary_options): hostname = items.pop('hostname') user = items.pop('user') password = items.pop('password') glusterversion = items.pop('glusterversion') function(section, hostname, user, password, glusterversion, **items) else: return 1 elif re.match('client', result): necessary_options = ["hostname", "user", "password", "glusterversion"] if verify_necessary_options(cp, section, necessary_options): hostname = items.pop('hostname') user = items.pop('user') password = items.pop('password') glusterversion = items.pop('glusterversion') function(section, hostname, user, password, glusterversion, **items) else: return 1 elif re.match('brick', result): necessary_options = ["hostname", "path"] if verify_necessary_options(cp, section, necessary_options): hostname = items.pop('hostname') path = items.pop('path') function(section, hostname, path, **items) else: return 1 elif re.match('volume', result): necessary_options = ["volumename", "bricks"] if verify_necessary_options(cp, section, necessary_options): volumename = items.pop('volumename') bricks = items.pop('bricks') if (items.has_key('replica') and items['replica']): replica = items.pop('replica') else: replica = None if (items.has_key('stripe') and items['stripe']): stripe = items.pop('stripe') else: stripe = None if (items.has_key('transporttype') and items['transporttype']): transporttype = items.pop('transporttype') else: transporttype = None function(section, volumename, replica, stripe, transporttype, bricks) else: return 1 elif re.match('mountdevice', result): necessary_options = ["hostname", "volumename"] if verify_necessary_options(cp, section, necessary_options): hostname = items.pop('hostname') volumename = items.pop('volumename') function(section, hostname, volumename) elif re.match('mount', result): necessary_options = ["client", "dir", "device"] if verify_necessary_options(cp, section, necessary_options): client = items.pop('client') dir_ = items.pop('dir') device = items.pop('device') function(section, client, dir_, device, **items) else: return 1 return 0 __all__ = ['parse_testrun_info_file', 'parse_testcaseslist_file', 'parse_testenv_configfile']