FIxed few minor pylint warnings and suggestions

pull/196/head
Przemek Wirkus 2014-02-27 16:47:53 +00:00
parent 415577b393
commit 28d1e5e381
1 changed files with 102 additions and 89 deletions

View File

@ -43,6 +43,7 @@ import sys
import json
import optparse
import pprint
import re
from prettytable import PrettyTable
from serial import Serial
@ -70,59 +71,23 @@ from workspace_tools.settings import MUTs
class SingleTestRunner():
""" Object wrapper for single test run which may involve multiple MUTs."""
re_detect_testcase_result = None
TEST_RESULT_UNDEF = "UNDEF"
# mbed test suite -> SingleTestRunner
TEST_RESULT_MAPPING = {"success" : "OK",
"failure" : "FAIL",
"error" : "ERROR",
"end" : TEST_RESULT_UNDEF}
def __init__(self):
pass
PATTERN = "\{(" + "|".join(self.TEST_RESULT_MAPPING.keys()) + ")\}"
self.re_detect_testcase_result = re.compile(PATTERN)
def reset(self, mcu_name, serial,
verbose=False, sleep_before_reset=0, sleep_after_reset=0):
"""
Functions resets target using various methods (e.g. serial break)
depending on target type.
"""
if sleep_before_reset > 0:
sleep(sleep_before_reset)
if verbose:
verbose_msg = "Reset::cmd(sendBreak)"
# Reset type decision
if mcu_name.startswith('NRF51822'): # Nordic
call(["nrfjprog", "-r"])
verbose_msg = "Reset::cmd(nrfjprog)"
elif mcu_name.startswith('NUCLEO'): # ST NUCLEO
call(["ST-LINK_CLI.exe", "-Rst"])
verbose_msg = "Reset::cmd(ST-LINK_CLI.exe)"
else:
serial.sendBreak()
if sleep_before_reset > 0:
sleep(sleep_after_reset)
if verbose:
print verbose_msg
def flush_serial(self, serial):
""" Flushing serial in/out. """
serial.flushInput()
serial.flushOutput()
def is_peripherals_available(self, target, peripherals=None):
""" Checks if specified target should run specific peripheral test case."""
if peripherals is not None:
peripherals = set(peripherals)
for id, mut in MUTs.iteritems():
# Target check
if mut["mcu"] != target:
continue
# Peripherals check
if peripherals is not None:
if 'peripherals' not in mut:
continue
if not peripherals.issubset(set(mut['peripherals'])):
continue
return True
return False
def run_host_test(self, name, target_name, disk, port,
duration, extra_serial, verbose=False):
def run_host_test(self, target_name, port,
duration, verbose=False):
"""
Functions resets target and grabs by timeouted pooling test log
via serial port.
@ -130,24 +95,23 @@ class SingleTestRunner():
"""
output = ""
# Prepare serial for receiving data from target
baud = 9600
BAUD = 9600
serial = Serial(port, timeout=1)
serial.setBaudrate(baud)
self.flush_serial(serial)
serial.setBaudrate(BAUD)
flush_serial(serial)
# Resetting target and pooling
self.reset(target_name, serial, verbose=verbose)
start = time()
reset(target_name, serial, verbose=verbose)
start_serial_timeour = time()
try:
while (time() - start) < duration:
while (time() - start_serial_timeour) < duration:
test_output = serial.read(512)
output += test_output
self.flush_serial(serial)
flush_serial(serial)
if '{end}' in output:
break
except KeyboardInterrupt, _:
print "CTRL+C break"
self.flush_serial(serial)
flush_serial(serial)
serial.close()
# Handle verbose mode
@ -157,32 +121,14 @@ class SingleTestRunner():
print "Test::Output::Finish"
# Parse test 'output' data
result = "UNDEF"
result = self.TEST_RESULT_UNDEF
for line in output.splitlines():
if '{success}' in line:
result = "OK"
if '{failure}' in line:
result = "FAIL"
if '{error}' in line:
result = "ERROR"
if '{end}' in line:
m = self.re_detect_testcase_result.search(line)
if m and len(m.groups()):
result = self.TEST_RESULT_MAPPING[m.groups(0)[0]]
break
return result
def print_test_result(self, test_result, target_name, toolchain_name,
test_id, test_description, elapsed_time, duration):
""" Use specific convention to pront test result and related data."""
tokens = []
tokens.append("TargetTest")
tokens.append(target_name)
tokens.append(toolchain_name)
tokens.append(test_id)
tokens.append(test_description)
separator = "::"
time_info = " in %d of %d sec" % (elapsed_time, duration)
result = separator.join(tokens) + " [" + test_result +"]" + time_info
return result
def handle(self, test_spec, target_name, toolchain_name):
"""
Function determines MUT's mbed disk/port and copies binary to
@ -238,14 +184,78 @@ class SingleTestRunner():
# Host test execution
start = time()
test_result = self.run_host_test(test.host_test, target_name, disk, port, duration, extra_serial)
test_result = self.run_host_test(target_name, port, duration)
elapsed_time = time() - start
print self.print_test_result(test_result, target_name, toolchain_name,
print print_test_result(test_result, target_name, toolchain_name,
test_id, test_description, elapsed_time, duration)
return (test_result, target_name, toolchain_name,
test_id, test_description, round(elapsed_time, 2), duration)
def flush_serial(serial):
""" Flushing serial in/out. """
serial.flushInput()
serial.flushOutput()
def reset(mcu_name, serial, verbose=False, sleep_before_reset=0, sleep_after_reset=0):
"""
Functions resets target using various methods (e.g. serial break)
depending on target type.
"""
if sleep_before_reset > 0:
sleep(sleep_before_reset)
if verbose:
verbose_msg = "Reset::cmd(sendBreak)"
# Reset type decision
if mcu_name.startswith('NRF51822'): # Nordic
call(["nrfjprog", "-r"])
verbose_msg = "Reset::cmd(nrfjprog)"
elif mcu_name.startswith('NUCLEO'): # ST NUCLEO
call(["ST-LINK_CLI.exe", "-Rst"])
verbose_msg = "Reset::cmd(ST-LINK_CLI.exe)"
else:
serial.sendBreak()
if sleep_before_reset > 0:
sleep(sleep_after_reset)
if verbose:
print verbose_msg
def is_peripherals_available(target, peripherals=None):
""" Checks if specified target should run specific peripheral test case."""
if peripherals is not None:
peripherals = set(peripherals)
for id, mut in MUTs.iteritems():
# Target check
if mut["mcu"] != target:
continue
# Peripherals check
if peripherals is not None:
if 'peripherals' not in mut:
continue
if not peripherals.issubset(set(mut['peripherals'])):
continue
return True
return False
def print_test_result(test_result, target_name, toolchain_name,
test_id, test_description, elapsed_time, duration):
""" Use specific convention to pront test result and related data."""
tokens = []
tokens.append("TargetTest")
tokens.append(target_name)
tokens.append(toolchain_name)
tokens.append(test_id)
tokens.append(test_description)
separator = "::"
time_info = " in %d of %d sec" % (elapsed_time, duration)
result = separator.join(tokens) + " [" + test_result +"]" + time_info
return result
def shape_test_request(mcu, image_path, test_id, duration=10):
""" Function prepares JOSN structure describing test specification."""
test_spec = {
@ -299,7 +309,9 @@ if __name__ == '__main__':
action="store_true",
help='Verbose mode (pronts some extra information)')
parser.epilog="Example: singletest.py -i test_spec.json"
parser.description = """This script allows you to run mbed defined test cases for particular MCU(s) and corresponding toolchain(s)."""
parser.epilog = """Example: singletest.py -i test_spec.json [-M muts_all.json]"""
(opts, args) = parser.parse_args()
# Open file with test specification
@ -310,6 +322,7 @@ if __name__ == '__main__':
parser.print_help()
exit(-1)
# Get extra MUTs if applicable
if opts.muts_spec_filename:
MUTs = get_json_data_from_file(opts.muts_spec_filename, opts.verbose)
if MUTs is None:
@ -340,7 +353,7 @@ if __name__ == '__main__':
continue
if test.automated and test.is_supported(target, toolchain):
if not single_test.is_peripherals_available(target, test.peripherals):
if not is_peripherals_available(target, test.peripherals):
if opts.verbose:
print "TargetTest::%s::TestSkipped(%s)" % (target, ",".join(test.peripherals))
continue