diff --git a/workspace_tools/singletest.py b/workspace_tools/singletest.py index 73d741c342..b8d7a6757f 100644 --- a/workspace_tools/singletest.py +++ b/workspace_tools/singletest.py @@ -43,6 +43,7 @@ import sys import json import optparse import pprint +import re from prettytable import PrettyTable from serial import Serial @@ -70,59 +71,23 @@ from workspace_tools.settings import MUTs class SingleTestRunner(): """ Object wrapper for single test run which may involve multiple MUTs.""" + + re_detect_testcase_result = None + + TEST_RESULT_UNDEF = "UNDEF" + + # mbed test suite -> SingleTestRunner + TEST_RESULT_MAPPING = {"success" : "OK", + "failure" : "FAIL", + "error" : "ERROR", + "end" : TEST_RESULT_UNDEF} + def __init__(self): - pass + PATTERN = "\{(" + "|".join(self.TEST_RESULT_MAPPING.keys()) + ")\}" + self.re_detect_testcase_result = re.compile(PATTERN) - def reset(self, mcu_name, serial, - verbose=False, sleep_before_reset=0, sleep_after_reset=0): - """ - Functions resets target using various methods (e.g. serial break) - depending on target type. - """ - if sleep_before_reset > 0: - sleep(sleep_before_reset) - if verbose: - verbose_msg = "Reset::cmd(sendBreak)" - # Reset type decision - if mcu_name.startswith('NRF51822'): # Nordic - call(["nrfjprog", "-r"]) - verbose_msg = "Reset::cmd(nrfjprog)" - elif mcu_name.startswith('NUCLEO'): # ST NUCLEO - call(["ST-LINK_CLI.exe", "-Rst"]) - verbose_msg = "Reset::cmd(ST-LINK_CLI.exe)" - else: - serial.sendBreak() - - if sleep_before_reset > 0: - sleep(sleep_after_reset) - if verbose: - print verbose_msg - - def flush_serial(self, serial): - """ Flushing serial in/out. """ - serial.flushInput() - serial.flushOutput() - - def is_peripherals_available(self, target, peripherals=None): - """ Checks if specified target should run specific peripheral test case.""" - if peripherals is not None: - peripherals = set(peripherals) - - for id, mut in MUTs.iteritems(): - # Target check - if mut["mcu"] != target: - continue - # Peripherals check - if peripherals is not None: - if 'peripherals' not in mut: - continue - if not peripherals.issubset(set(mut['peripherals'])): - continue - return True - return False - - def run_host_test(self, name, target_name, disk, port, - duration, extra_serial, verbose=False): + def run_host_test(self, target_name, port, + duration, verbose=False): """ Functions resets target and grabs by timeouted pooling test log via serial port. @@ -130,24 +95,23 @@ class SingleTestRunner(): """ output = "" # Prepare serial for receiving data from target - baud = 9600 + BAUD = 9600 serial = Serial(port, timeout=1) - serial.setBaudrate(baud) - self.flush_serial(serial) + serial.setBaudrate(BAUD) + flush_serial(serial) # Resetting target and pooling - self.reset(target_name, serial, verbose=verbose) - start = time() + reset(target_name, serial, verbose=verbose) + start_serial_timeour = time() try: - while (time() - start) < duration: + while (time() - start_serial_timeour) < duration: test_output = serial.read(512) output += test_output - self.flush_serial(serial) + flush_serial(serial) if '{end}' in output: break - except KeyboardInterrupt, _: print "CTRL+C break" - self.flush_serial(serial) + flush_serial(serial) serial.close() # Handle verbose mode @@ -155,34 +119,16 @@ class SingleTestRunner(): print "Test::Output::Start" print output print "Test::Output::Finish" - + # Parse test 'output' data - result = "UNDEF" + result = self.TEST_RESULT_UNDEF for line in output.splitlines(): - if '{success}' in line: - result = "OK" - if '{failure}' in line: - result = "FAIL" - if '{error}' in line: - result = "ERROR" - if '{end}' in line: + m = self.re_detect_testcase_result.search(line) + if m and len(m.groups()): + result = self.TEST_RESULT_MAPPING[m.groups(0)[0]] break return result - def print_test_result(self, test_result, target_name, toolchain_name, - test_id, test_description, elapsed_time, duration): - """ Use specific convention to pront test result and related data.""" - tokens = [] - tokens.append("TargetTest") - tokens.append(target_name) - tokens.append(toolchain_name) - tokens.append(test_id) - tokens.append(test_description) - separator = "::" - time_info = " in %d of %d sec" % (elapsed_time, duration) - result = separator.join(tokens) + " [" + test_result +"]" + time_info - return result - def handle(self, test_spec, target_name, toolchain_name): """ Function determines MUT's mbed disk/port and copies binary to @@ -238,14 +184,78 @@ class SingleTestRunner(): # Host test execution start = time() - test_result = self.run_host_test(test.host_test, target_name, disk, port, duration, extra_serial) + test_result = self.run_host_test(target_name, port, duration) elapsed_time = time() - start - print self.print_test_result(test_result, target_name, toolchain_name, - test_id, test_description, elapsed_time, duration) + print print_test_result(test_result, target_name, toolchain_name, + test_id, test_description, elapsed_time, duration) return (test_result, target_name, toolchain_name, test_id, test_description, round(elapsed_time, 2), duration) +def flush_serial(serial): + """ Flushing serial in/out. """ + serial.flushInput() + serial.flushOutput() + + +def reset(mcu_name, serial, verbose=False, sleep_before_reset=0, sleep_after_reset=0): + """ + Functions resets target using various methods (e.g. serial break) + depending on target type. + """ + if sleep_before_reset > 0: + sleep(sleep_before_reset) + if verbose: + verbose_msg = "Reset::cmd(sendBreak)" + # Reset type decision + if mcu_name.startswith('NRF51822'): # Nordic + call(["nrfjprog", "-r"]) + verbose_msg = "Reset::cmd(nrfjprog)" + elif mcu_name.startswith('NUCLEO'): # ST NUCLEO + call(["ST-LINK_CLI.exe", "-Rst"]) + verbose_msg = "Reset::cmd(ST-LINK_CLI.exe)" + else: + serial.sendBreak() + + if sleep_before_reset > 0: + sleep(sleep_after_reset) + if verbose: + print verbose_msg + + +def is_peripherals_available(target, peripherals=None): + """ Checks if specified target should run specific peripheral test case.""" + if peripherals is not None: + peripherals = set(peripherals) + for id, mut in MUTs.iteritems(): + # Target check + if mut["mcu"] != target: + continue + # Peripherals check + if peripherals is not None: + if 'peripherals' not in mut: + continue + if not peripherals.issubset(set(mut['peripherals'])): + continue + return True + return False + + +def print_test_result(test_result, target_name, toolchain_name, + test_id, test_description, elapsed_time, duration): + """ Use specific convention to pront test result and related data.""" + tokens = [] + tokens.append("TargetTest") + tokens.append(target_name) + tokens.append(toolchain_name) + tokens.append(test_id) + tokens.append(test_description) + separator = "::" + time_info = " in %d of %d sec" % (elapsed_time, duration) + result = separator.join(tokens) + " [" + test_result +"]" + time_info + return result + + def shape_test_request(mcu, image_path, test_id, duration=10): """ Function prepares JOSN structure describing test specification.""" test_spec = { @@ -299,7 +309,9 @@ if __name__ == '__main__': action="store_true", help='Verbose mode (pronts some extra information)') - parser.epilog="Example: singletest.py -i test_spec.json" + parser.description = """This script allows you to run mbed defined test cases for particular MCU(s) and corresponding toolchain(s).""" + parser.epilog = """Example: singletest.py -i test_spec.json [-M muts_all.json]""" + (opts, args) = parser.parse_args() # Open file with test specification @@ -310,12 +322,13 @@ if __name__ == '__main__': parser.print_help() exit(-1) + # Get extra MUTs if applicable if opts.muts_spec_filename: MUTs = get_json_data_from_file(opts.muts_spec_filename, opts.verbose) if MUTs is None: parser.print_help() exit(-1) - + # Magic happens here... ;) start = time() single_test = SingleTestRunner() @@ -340,7 +353,7 @@ if __name__ == '__main__': continue if test.automated and test.is_supported(target, toolchain): - if not single_test.is_peripherals_available(target, test.peripherals): + if not is_peripherals_available(target, test.peripherals): if opts.verbose: print "TargetTest::%s::TestSkipped(%s)" % (target, ",".join(test.peripherals)) continue