Merge branch 'master' of github.com:mbedmicro/mbed

pull/445/head
Bogdan Marinescu 2014-08-14 12:40:26 +01:00
commit ba4f09543f
10 changed files with 390 additions and 110 deletions

View File

@ -29,7 +29,7 @@ class EchoTest(Test):
TEST="longer serial test"
check = True
for i in range(1, 100):
self.mbed.serial.write(TEST + "\n")
self.mbed.serial_write(TEST + "\n")
l = self.mbed.serial.readline().strip()
if not l: continue

View File

@ -28,8 +28,7 @@ from time import sleep
from sys import stdout
class Mbed:
"""
Base class for a host driven test
""" Base class for a host driven test
"""
def __init__(self):
parser = OptionParser()
@ -63,8 +62,17 @@ class Mbed:
dest="forced_reset_type",
help="Forces different type of reset")
parser.add_option("-R", "--reset-timeout",
dest="forced_reset_timeout",
metavar="NUMBER",
type="int",
help="When forcing a reset using option -r you can set up after reset timeout in seconds")
(self.options, _) = parser.parse_args()
self.DEFAULT_RESET_TOUT = 2
self.DEFAULT_TOUT = 10
if self.options.port is None:
raise Exception("The serial port of the target mbed have to be provided as command line arguments")
@ -73,10 +81,12 @@ class Mbed:
self.extra_port = self.options.extra
self.extra_serial = None
self.serial = None
self.timeout = 10 if self.options.timeout is None else self.options.timeout
print 'Mbed: "%s" "%s"' % (self.port, self.disk)
self.timeout = self.DEFAULT_TOUT if self.options.timeout is None else self.options.timeout
print 'Host test instrumentation on port: "%s" with serial: "%s"' % (self.port, self.disk)
def init_serial(self, baud=9600, extra_baud=9600):
""" Initialize serial port. Function will return error is port can't be opened or initialized
"""
result = True
try:
self.serial = Serial(self.port, timeout=1)
@ -91,8 +101,18 @@ class Mbed:
self.flush()
return result
def serial_timeout(self, timeout):
""" Wraps self.mbed.serial object timeout property
"""
result = None
if self.serial:
self.serial.timeout = timeout
result = True
return result
def serial_read(self, count=1):
""" Wraps self.mbed.serial object read method """
""" Wraps self.mbed.serial object read method
"""
result = None
if self.serial:
try:
@ -102,13 +122,14 @@ class Mbed:
return result
def serial_write(self, write_buffer):
""" Wraps self.mbed.serial object write method """
result = -1
""" Wraps self.mbed.serial object write method
"""
result = None
if self.serial:
try:
result = self.serial.write(write_buffer)
except:
result = -1
result = None
return result
def safe_sendBreak(self, serial):
@ -132,24 +153,38 @@ class Mbed:
result = False
return result
def touch_file(self, path, name):
with os.open(path, 'a'):
def touch_file(self, path):
""" Touch file and set timestamp to items
"""
with open(path, 'a'):
os.utime(path, None)
def reset_timeout(self, timeout):
""" Timeout executed just after reset command is issued
"""
for n in range(0, timeout):
sleep(1)
def reset(self):
""" reboot.txt - startup from standby state, reboots when in run mode.
""" Reset function. Supports 'standard' send break command via Mbed's CDC,
also handles other reset modes.
E.g. reset by touching file with specific file name:
reboot.txt - startup from standby state, reboots when in run mode.
shutdown.txt - shutdown from run mode
reset.txt - reset fpga during run mode """
if self.options.forced_reset_type:
path = os.path.join([self.disk, self.options.forced_reset_type.lower()])
if self.options.forced_reset_type.endswith('.txt'):
self.touch_file(path)
reset.txt - reset FPGA during run mode
"""
if self.options.forced_reset_type and self.options.forced_reset_type.endswith('.txt'):
reset_file_path = os.path.join(self.disk, self.options.forced_reset_type.lower())
self.touch_file(reset_file_path)
else:
self.safe_sendBreak(self.serial) # Instead of serial.sendBreak()
# Give time to wait for the image loading
sleep(2)
reset_tout_s = self.options.forced_reset_timeout if self.options.forced_reset_timeout is not None else self.DEFAULT_RESET_TOUT
self.reset_timeout(reset_tout_s)
def flush(self):
""" Flush serial ports
"""
self.serial.flushInput()
self.serial.flushOutput()
if self.extra_serial:
@ -158,10 +193,15 @@ class Mbed:
class Test:
""" Baseclass for host test's test runner
"""
def __init__(self):
self.mbed = Mbed()
def run(self):
""" Test runner for host test. This function will start executing
test and forward test result via serial port to test suite
"""
try:
result = self.test()
self.print_result("success" if result else "failure")
@ -170,7 +210,8 @@ class Test:
self.print_result("error")
def setup(self):
""" Setup and check if configuration for test is correct. E.g. if serial port can be opened """
""" Setup and check if configuration for test is correct. E.g. if serial port can be opened
"""
result = True
if not self.mbed.serial:
result = False
@ -178,16 +219,20 @@ class Test:
return result
def notify(self, message):
""" On screen notification function """
""" On screen notification function
"""
print message
stdout.flush()
def print_result(self, result):
""" Test result unified printing function """
""" Test result unified printing function
"""
self.notify("\n{%s}\n{end}" % result)
class DefaultTest(Test):
""" Test class with serial port initialization
"""
def __init__(self):
Test.__init__(self)
serial_init_res = self.mbed.init_serial()
@ -195,6 +240,10 @@ class DefaultTest(Test):
class Simple(DefaultTest):
""" Simple, basic host test's test runner waiting for serial port
output from MUT, no supervision over test running in MUT is executed.
Just waiting for result
"""
def run(self):
try:
while True:
@ -207,5 +256,6 @@ class Simple(DefaultTest):
except KeyboardInterrupt, _:
print "\n[CTRL+c] exit"
if __name__ == '__main__':
Simple().run()

View File

@ -26,7 +26,9 @@ class RTCTest(DefaultTest):
def run(self):
test_result = True
c = self.mbed.serial.timeout = None
if self.mbed.serial_timeout(None) is None:
self.print_result("ioerr_serial")
return
for i in range(0, 5):
c = self.mbed.serial_read(38) # 38 len("[1256729742] [2009-10-28 11:35:42 AM]\n"
if c is None:

View File

@ -31,7 +31,7 @@ class StdioTest(DefaultTest):
for i in range(1, 5):
random_integer = random.randint(-10000, 10000)
print "Generated number: " + str(random_integer)
self.mbed.serial.write(str(random_integer) + "\n")
self.mbed.serial_write(str(random_integer) + "\n")
serial_stdio_msg = ""
ip_msg_timeout = self.mbed.options.timeout

View File

@ -33,7 +33,7 @@ class TCPEchoClientTest(Test):
self.mbed.reset()
print "Sending server IP Address to target..."
connection_str = ip_address + ":" + str(port_no) + "\n"
self.mbed.serial.write(connection_str)
self.mbed.serial_write(connection_str)
class TCPEchoClient_Handler(BaseRequestHandler):

View File

@ -32,7 +32,7 @@ class UDPEchoClientTest(Test):
self.mbed.reset()
print "Sending server IP Address to target..."
connection_str = ip_address + ":" + str(port_no) + "\n"
self.mbed.serial.write(connection_str)
self.mbed.serial_write(connection_str)
class UDPEchoClient_Handler(BaseRequestHandler):

View File

@ -23,7 +23,9 @@ class WaitusTest(DefaultTest):
def run(self):
test_result = True
# First character to start test (to know after reset when test starts)
self.mbed.serial.timeout = None
if self.mbed.serial_timeout(None) is None:
self.print_result("ioerr_serial")
return
c = self.mbed.serial_read(1)
if c is None:
self.print_result("ioerr_serial")

View File

@ -91,7 +91,8 @@ from workspace_tools.test_api import get_default_test_options_parser
def get_version():
""" Returns test script version """
""" Returns test script version
"""
single_test_version_major = 1
single_test_version_minor = 1
return (single_test_version_major, single_test_version_minor)
@ -168,6 +169,7 @@ if __name__ == '__main__':
single_test = SingleTestRunner(_global_loops_count=opts.test_global_loops_value,
_test_loops_list=opts.test_loops_list,
_muts=MUTs,
_opts_log_file_name=opts.log_file_name,
_test_spec=test_spec,
_opts_goanna_for_mbed_sdk=opts.goanna_for_mbed_sdk,
_opts_goanna_for_tests=opts.goanna_for_tests,
@ -184,8 +186,9 @@ if __name__ == '__main__':
_opts_suppress_summary=opts.suppress_summary,
_opts_test_x_toolchain_summary=opts.test_x_toolchain_summary,
_opts_copy_method=opts.copy_method,
_opts_mut_reset_type=opts.mut_reset_type
)
_opts_mut_reset_type=opts.mut_reset_type,
_opts_jobs=opts.jobs,
_opts_extend_test_timeout=opts.extend_test_timeout)
# Runs test suite in CLI mode
singletest_in_cli_mode(single_test)

View File

@ -19,10 +19,13 @@ Author: Przemyslaw Wirkus <Przemyslaw.wirkus@arm.com>
import os
import re
import sys
import json
import time
import pprint
import random
import optparse
import datetime
import threading
from types import ListType
from prettytable import PrettyTable
@ -35,10 +38,11 @@ from threading import Thread
from subprocess import Popen, PIPE, call
# Imports related to mbed build api
from workspace_tools.tests import TESTS
from workspace_tools.tests import TEST_MAP
from workspace_tools.paths import BUILD_DIR
from workspace_tools.paths import HOST_TESTS
from workspace_tools.tests import TEST_MAP
from workspace_tools.tests import TESTS
from workspace_tools.utils import construct_enum
from workspace_tools.targets import TARGET_MAP
from workspace_tools.build_api import build_project, build_mbed_libs, build_lib
from workspace_tools.build_api import get_target_supported_toolchains
@ -68,7 +72,8 @@ class ProcessObserver(Thread):
class SingleTestExecutor(threading.Thread):
""" Example: Single test class in separate thread usage """
""" Example: Single test class in separate thread usage
"""
def __init__(self, single_test):
self.single_test = single_test
threading.Thread.__init__(self)
@ -86,12 +91,12 @@ class SingleTestExecutor(threading.Thread):
# prints well-formed summary with results (SQL table like)
# table shows text x toolchain test result matrix
print self.single_test.generate_test_summary_by_target(test_summary, shuffle_seed)
print "Completed in %d sec"% (elapsed_time)
print "Completed in %.2f sec"% (elapsed_time)
class SingleTestRunner(object):
""" Object wrapper for single test run which may involve multiple MUTs."""
""" Object wrapper for single test run which may involve multiple MUTs
"""
RE_DETECT_TESTCASE_RESULT = None
# Return codes for test script
@ -127,6 +132,7 @@ class SingleTestRunner(object):
_global_loops_count=1,
_test_loops_list=None,
_muts={},
_opts_log_file_name=None,
_test_spec={},
_opts_goanna_for_mbed_sdk=None,
_opts_goanna_for_tests=None,
@ -144,9 +150,10 @@ class SingleTestRunner(object):
_opts_test_x_toolchain_summary=False,
_opts_copy_method=None,
_opts_mut_reset_type=None,
_opts_jobs=None
):
""" Let's try hard to init this object """
_opts_jobs=None,
_opts_extend_test_timeout=None):
""" Let's try hard to init this object
"""
PATTERN = "\\{(" + "|".join(self.TEST_RESULT_MAPPING.keys()) + ")\\}"
self.RE_DETECT_TESTCASE_RESULT = re.compile(PATTERN)
# Settings related to test loops counters
@ -168,6 +175,7 @@ class SingleTestRunner(object):
self.test_spec = _test_spec
# Settings passed e.g. from command line
self.opts_log_file_name = _opts_log_file_name
self.opts_goanna_for_mbed_sdk = _opts_goanna_for_mbed_sdk
self.opts_goanna_for_tests = _opts_goanna_for_tests
self.opts_shuffle_test_order = _opts_shuffle_test_order
@ -184,13 +192,17 @@ class SingleTestRunner(object):
self.opts_test_x_toolchain_summary = _opts_test_x_toolchain_summary
self.opts_copy_method = _opts_copy_method
self.opts_mut_reset_type = _opts_mut_reset_type
self.opts_jobs = _opts_jobs
self.opts_jobs = _opts_jobs if _opts_jobs is not None else 1
self.opts_extend_test_timeout = _opts_extend_test_timeout
self.logger = CLITestLogger(file_name=self.opts_log_file_name) # Default test logger
def shuffle_random_func(self):
return self.shuffle_random_seed
def is_shuffle_seed_float(self):
""" return true if function parameter can be converted to float """
""" return true if function parameter can be converted to float
"""
result = True
try:
float(self.shuffle_random_seed)
@ -214,7 +226,7 @@ class SingleTestRunner(object):
# print '=== %s::%s ===' % (target, toolchain)
# Let's build our test
if target not in TARGET_MAP:
print 'Skipped tests for %s target. Target platform not found' % (target)
print self.logger.log_line(self.logger.LogType.NOTIF, 'Skipped tests for %s target. Target platform not found' % (target))
continue
T = TARGET_MAP[target]
@ -227,7 +239,7 @@ class SingleTestRunner(object):
clean=clean_mbed_libs_options,
jobs=self.opts_jobs)
if not build_mbed_libs_result:
print 'Skipped tests for %s target. Toolchain %s is not yet supported for this target' % (T.name, toolchain)
print self.logger.log_line(self.logger.LogType.NOTIF, 'Skipped tests for %s target. Toolchain %s is not yet supported for this target' % (T.name, toolchain))
continue
build_dir = join(BUILD_DIR, "test", target, toolchain)
@ -247,19 +259,19 @@ class SingleTestRunner(object):
if self.opts_test_only_peripheral and not test.peripherals:
if self.opts_verbose_skipped_tests:
print "TargetTest::%s::NotPeripheralTestSkipped()" % (target)
print self.logger.log_line(self.logger.LogType.INFO, 'Common test skipped for target %s'% (target))
continue
if self.opts_test_only_common and test.peripherals:
if self.opts_verbose_skipped_tests:
print "TargetTest::%s::PeripheralTestSkipped()" % (target)
print self.logger.log_line(self.logger.LogType.INFO, 'Peripheral test skipped for target %s'% (target))
continue
if test.automated and test.is_supported(target, toolchain):
if not self.is_peripherals_available(target, test.peripherals):
if self.opts_verbose_skipped_tests:
test_peripherals = test.peripherals if test.peripherals else []
print "TargetTest::%s::TestSkipped(%s)" % (target, ",".join(test_peripherals))
print self.logger.log_line(self.logger.LogType.INFO, 'Peripheral %s test skipped for target %s'% (",".join(test_peripherals), target))
continue
build_project_options = ["analyze"] if self.opts_goanna_for_tests else None
@ -310,9 +322,14 @@ class SingleTestRunner(object):
# With this option we are skipping testing phase
continue
# Test duration can be increased by global value
test_duration = test.duration
if self.opts_extend_test_timeout is not None:
test_duration += self.opts_extend_test_timeout
# For an automated test the duration act as a timeout after
# which the test gets interrupted
test_spec = self.shape_test_request(target, path, test_id, test.duration)
test_spec = self.shape_test_request(target, path, test_id, test_duration)
test_loops = self.get_test_loop_count(test_id)
single_test_result = self.handle(test_spec, target, toolchain, test_loops=test_loops)
if single_test_result is not None:
@ -321,7 +338,8 @@ class SingleTestRunner(object):
def generate_test_summary_by_target(self, test_summary, shuffle_seed=None):
""" Prints well-formed summary with results (SQL table like)
table shows text x toolchain test result matrix """
table shows text x toolchain test result matrix
"""
RESULT_INDEX = 0
TARGET_INDEX = 1
TOOLCHAIN_INDEX = 2
@ -334,23 +352,30 @@ class SingleTestRunner(object):
unique_toolchains = get_unique_value_from_summary(test_summary, TOOLCHAIN_INDEX)
result = "Test summary:\n"
result_dict = {} # test : { toolchain : result }
for target in unique_targets:
result_dict = {} # test : { toolchain : result }
unique_target_toolchains = []
for test in test_summary:
if test[TARGET_INDEX] == target:
if test[TOOLCHAIN_INDEX] not in unique_target_toolchains:
unique_target_toolchains.append(test[TOOLCHAIN_INDEX])
if test[TEST_INDEX] not in result_dict:
result_dict[test[TEST_INDEX]] = {}
result_dict[test[TEST_INDEX]][test[TOOLCHAIN_INDEX]] = test[RESULT_INDEX]
pt_cols = ["Target", "Test ID", "Test Description"] + unique_toolchains
pt_cols = ["Target", "Test ID", "Test Description"] + unique_target_toolchains
pt = PrettyTable(pt_cols)
for col in pt_cols:
pt.align[col] = "l"
pt.padding_width = 1 # One space between column edges and contents (default)
for test in unique_tests:
if test in result_dict:
test_results = result_dict[test]
if test in unique_test_desc:
row = [target, test, unique_test_desc[test]]
for toolchain in unique_toolchains:
if toolchain in test_results:
row.append(test_results[toolchain])
pt.add_row(row)
result += pt.get_string()
@ -361,7 +386,8 @@ class SingleTestRunner(object):
def generate_test_summary(self, test_summary, shuffle_seed=None):
""" Prints well-formed summary with results (SQL table like)
table shows target x test results matrix across """
table shows target x test results matrix across
"""
result = "Test summary:\n"
# Pretty table package is used to print results
pt = PrettyTable(["Result", "Target", "Toolchain", "Test ID", "Test Description",
@ -399,7 +425,8 @@ class SingleTestRunner(object):
return result
def test_loop_list_to_dict(self, test_loops_str):
""" Transforms test_id=X,test_id=X,test_id=X into dictionary {test_id : test_id_loops_count} """
""" Transforms test_id=X,test_id=X,test_id=X into dictionary {test_id : test_id_loops_count}
"""
result = {}
if test_loops_str:
test_loops = test_loops_str.split(',')
@ -416,7 +443,8 @@ class SingleTestRunner(object):
def get_test_loop_count(self, test_id):
""" This function returns no. of loops per test (deducted by test_id_.
If test is not in list of redefined loop counts it will use default value. """
If test is not in list of redefined loop counts it will use default value.
"""
result = self.GLOBAL_LOOPS_COUNT
if test_id in self.TEST_LOOPS_DICT:
result = self.TEST_LOOPS_DICT[test_id]
@ -433,21 +461,39 @@ class SingleTestRunner(object):
profile.set_preference('browser.download.manager.showWhenStarting', False)
profile.set_preference('browser.download.dir', dest_disk)
profile.set_preference('browser.helperApps.neverAsk.saveToDisk', 'application/octet-stream')
# Launch browser with profile and get file
browser = webdriver.Firefox(profile)
browser.get(file_path)
browser.close()
def file_copy_method_selector(self, image_path, disk, copy_method):
def image_copy_method_selector(self, target_name, image_path, disk, copy_method,
images_config=None, image_dest=None):
""" Function copied image file and fiddles with image configuration files in needed.
This function will select proper image configuration (modify image config file
if needed) after image is copied.
"""
image_dest = image_dest if image_dest is not None else ''
_copy_res, _err_msg, _copy_method = self.file_copy_method_selector(image_path, disk, self.opts_copy_method, image_dest=image_dest)
if images_config is not None:
# For different targets additional configuration file has to be changed
# Here we select target and proper function to handle configuration change
if target == 'ARM_MPS2':
images_cfg_path = images_config
image0file_path = os.path.join(disk, image_dest, basename(image_path))
mps2_set_board_image_file(disk, images_cfg_path, image0file_path)
return _copy_res, _err_msg, _copy_method
def file_copy_method_selector(self, image_path, disk, copy_method, image_dest=''):
""" Copy file depending on method you want to use. Handles exception
and return code from shell copy commands. """
and return code from shell copy commands.
"""
result = True
resutl_msg = ""
if copy_method == 'cp' or copy_method == 'copy' or copy_method == 'xcopy':
source_path = image_path.encode('ascii', 'ignore')
destination_path = os.path.join(disk.encode('ascii', 'ignore'), basename(image_path).encode('ascii', 'ignore'))
destination_path = os.path.join(disk.encode('ascii', 'ignore'), image_dest, basename(image_path).encode('ascii', 'ignore'))
cmd = [copy_method, source_path, destination_path]
try:
ret = call(cmd, shell=True)
@ -457,10 +503,10 @@ class SingleTestRunner(object):
except Exception, e:
resutl_msg = e
result = False
if copy_method == 'firefox':
elif copy_method == 'firefox':
try:
source_path = image_path.encode('ascii', 'ignore')
destination_path = disk.encode('ascii', 'ignore')
destination_path = os.path.join(disk.encode('ascii', 'ignore'), image_dest)
self.file_store_firefox(source_path, destination_path)
except Exception, e:
resutl_msg = e
@ -473,10 +519,12 @@ class SingleTestRunner(object):
except Exception, e:
resutl_msg = e
result = False
return result, resutl_msg, copy_method
def delete_file(self, file_path):
""" Remove file from the system """
""" Remove file from the system
"""
result = True
resutl_msg = ""
try:
@ -488,7 +536,8 @@ class SingleTestRunner(object):
def handle(self, test_spec, target_name, toolchain_name, test_loops=1):
""" Function determines MUT's mbed disk/port and copies binary to
target. Test is being invoked afterwards. """
target. Test is being invoked afterwards.
"""
data = json.loads(test_spec)
# Get test information, image and test timeout
test_id = data['test_id']
@ -511,13 +560,19 @@ class SingleTestRunner(object):
disk = mut['disk']
port = mut['port']
target_by_mcu = TARGET_MAP[mut['mcu']]
# Some extra stuff can be declared in MUTs structure
reset_type = mut.get('reset_type') # reboot.txt, reset.txt, shutdown.txt
reset_tout = mut.get('reset_tout') # COPY_IMAGE -> RESET_PROC -> SLEEP(RESET_TOUT)
image_dest = mut.get('image_dest') # Image file destination DISK + IMAGE_DEST + BINARY_NAME
images_config = mut.get('images_config') # Available images selection via config file
mobo_config = mut.get('mobo_config') # Available board configuration selection e.g. core selection etc.
# Program
# When the build and test system were separate, this was relative to a
# base network folder base path: join(NETWORK_BASE_PATH, )
image_path = image
if not exists(image_path):
print "Error: Image file does not exist: %s" % image_path
print self.logger.log_line(self.logger.LogType.ERROR, 'Image file does not exist: %s' % image_path)
elapsed_time = 0
test_result = self.TEST_RESULT_NO_IMAGE
return (test_result, target_name, toolchain_name,
@ -532,7 +587,10 @@ class SingleTestRunner(object):
test_all_result = []
for test_index in range(test_loops):
# Choose one method of copy files to mbed virtual drive
_copy_res, _err_msg, _copy_method = self.file_copy_method_selector(image_path, disk, self.opts_copy_method)
#_copy_res, _err_msg, _copy_method = self.file_copy_method_selector(image_path, disk, self.opts_copy_method, image_dest=image_dest)
_copy_res, _err_msg, _copy_method = self.image_copy_method_selector(target_name, image_path, disk, self.opts_copy_method,
images_config, image_dest)
# Host test execution
start_host_exec_time = time()
@ -540,7 +598,7 @@ class SingleTestRunner(object):
single_test_result = self.TEST_RESULT_UNDEF # singe test run result
if not _copy_res: # Serial port copy error
single_test_result = self.TEST_RESULT_IOERR_COPY
print "Error: Copy method '%s'. %s"% (_copy_method, _err_msg)
print self.logger.log_line(self.logger.LogType.ERROR, "Copy method '%s' failed. Reason: %s"% (_copy_method, _err_msg))
else:
# Copy Extra Files
if not target_by_mcu.is_disk_virtual and test.extra_files:
@ -552,8 +610,11 @@ class SingleTestRunner(object):
start_host_exec_time = time()
host_test_verbose = self.opts_verbose_test_result_only or self.opts_verbose
host_test_reset = self.opts_mut_reset_type
single_test_result = self.run_host_test(test.host_test, disk, port, duration, verbose=host_test_verbose, reset=host_test_reset)
host_test_reset = self.opts_mut_reset_type if reset_type is None else reset_type
single_test_result = self.run_host_test(test.host_test, disk, port, duration,
verbose=host_test_verbose,
reset=host_test_reset,
reset_tout=reset_tout)
# Store test result
test_all_result.append(single_test_result)
@ -567,7 +628,8 @@ class SingleTestRunner(object):
def print_test_result(self, test_result, target_name, toolchain_name,
test_id, test_description, elapsed_time, duration):
""" Use specific convention to print test result and related data."""
""" Use specific convention to print test result and related data
"""
tokens = []
tokens.append("TargetTest")
tokens.append(target_name)
@ -580,22 +642,25 @@ class SingleTestRunner(object):
return result
def shape_test_loop_ok_result_count(self, test_all_result):
""" Reformats list of results to simple string """
""" Reformats list of results to simple string
"""
test_loop_count = len(test_all_result)
test_loop_ok_result = test_all_result.count(self.TEST_RESULT_OK)
return "%d/%d"% (test_loop_ok_result, test_loop_count)
def shape_global_test_loop_result(self, test_all_result):
""" Reformats list of results to simple string """
""" Reformats list of results to simple string
"""
result = self.TEST_RESULT_FAIL
if all(test_all_result[0] == res for res in test_all_result):
result = test_all_result[0]
return result
def run_host_test(self, name, disk, port, duration, reset=None, verbose=False, extra_serial=None):
def run_host_test(self, name, disk, port, duration, reset=None, reset_tout=None, verbose=False, extra_serial=None):
""" Function creates new process with host test configured with particular test case.
Function also is pooling for serial port activity from process to catch all data
printed by test runner and host test during test execution."""
printed by test runner and host test during test execution
"""
# print "{%s} port:%s disk:%s" % (name, port, disk),
cmd = ["python", "%s.py" % name, '-p', port, '-d', disk, '-t', str(duration)]
@ -604,6 +669,11 @@ class SingleTestRunner(object):
cmd += ["-e", extra_serial]
if reset is not None:
cmd += ["-r", reset]
if reset_tout is not None:
cmd += ["-R", str(reset_tout)]
if verbose:
print "Host test cmd: " + " ".join(cmd)
proc = Popen(cmd, stdout=PIPE, cwd=HOST_TESTS)
obs = ProcessObserver(proc)
@ -612,7 +682,7 @@ class SingleTestRunner(object):
output = []
while (time() - start_time) < duration:
try:
c = obs.queue.get(block=True, timeout=1)
c = obs.queue.get(block=True, timeout=0.5)
except Empty, _:
c = None
@ -620,7 +690,8 @@ class SingleTestRunner(object):
output.append(c)
# Give the mbed under test a way to communicate the end of the test
if c in ['\n', '\r']:
if '{end}' in line: break
if '{end}' in line:
break
line = ''
else:
line += c
@ -644,7 +715,8 @@ class SingleTestRunner(object):
return result
def is_peripherals_available(self, target_mcu_name, peripherals=None):
""" Checks if specified target should run specific peripheral test case."""
""" Checks if specified target should run specific peripheral test case
"""
if peripherals is not None:
peripherals = set(peripherals)
for id, mut in self.muts.iteritems():
@ -661,7 +733,8 @@ class SingleTestRunner(object):
return False
def shape_test_request(self, mcu, image_path, test_id, duration=10):
""" Function prepares JOSN structure describing test specification."""
""" Function prepares JOSN structure describing test specification
"""
test_spec = {
"mcu": mcu,
"image": image_path,
@ -672,7 +745,8 @@ class SingleTestRunner(object):
def get_unique_value_from_summary(test_summary, index):
""" Gets list of unique target names """
""" Gets list of unique target names
"""
result = []
for test in test_summary:
target_name = test[index]
@ -682,7 +756,8 @@ def get_unique_value_from_summary(test_summary, index):
def get_unique_value_from_summary_ext(test_summary, index_key, index_val):
""" Gets list of unique target names and return dictionary """
""" Gets list of unique target names and return dictionary
"""
result = {}
for test in test_summary:
key = test[index_key]
@ -693,7 +768,8 @@ def get_unique_value_from_summary_ext(test_summary, index_key, index_val):
def show_json_file_format_error(json_spec_filename, line, column):
""" Prints JSON broken content """
""" Prints JSON broken content
"""
with open(json_spec_filename) as data_file:
line_no = 1
for json_line in data_file:
@ -707,7 +783,8 @@ def show_json_file_format_error(json_spec_filename, line, column):
def json_format_error_defect_pos(json_error_msg):
""" Gets first error line and column in JSON file format.
Parsed from exception thrown by json.loads() string """
Parsed from exception thrown by json.loads() string
"""
result = None
line, column = 0, 0
# Line value search
@ -727,7 +804,8 @@ def json_format_error_defect_pos(json_error_msg):
def get_json_data_from_file(json_spec_filename, verbose=False):
""" Loads from file JSON formatted string to data structure """
""" Loads from file JSON formatted string to data structure
"""
result = None
try:
with open(json_spec_filename) as data_file:
@ -735,7 +813,7 @@ def get_json_data_from_file(json_spec_filename, verbose=False):
result = json.load(data_file)
except ValueError as json_error_msg:
result = None
print "Error in '%s' file. %s" % (json_spec_filename, json_error_msg)
print self.logger.log_line(self.logger.LogType.ERROR, 'JSON file %s parsing failed. Reason: %s' % (json_spec_filename, json_error_msg))
# We can print where error occurred inside JSON file if we can parse exception msg
json_format_defect_pos = json_format_error_defect_pos(str(json_error_msg))
if json_format_defect_pos is not None:
@ -745,7 +823,8 @@ def get_json_data_from_file(json_spec_filename, verbose=False):
show_json_file_format_error(json_spec_filename, line, column)
except IOError as fileopen_error_msg:
print "Error: %s" % (fileopen_error_msg)
print self.logger.log_line(self.logger.LogType.ERROR, 'JSON file %s not opened. Reason: %s'% (json_spec_filename, fileopen_error_msg))
print
if verbose and result:
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(result)
@ -753,7 +832,8 @@ def get_json_data_from_file(json_spec_filename, verbose=False):
def print_muts_configuration_from_json(json_data, join_delim=", "):
""" Prints MUTs configuration passed to test script for verboseness. """
""" Prints MUTs configuration passed to test script for verboseness
"""
muts_info_cols = []
# We need to check all unique properties for each defined MUT
for k in json_data:
@ -782,7 +862,8 @@ def print_muts_configuration_from_json(json_data, join_delim=", "):
def print_test_configuration_from_json(json_data, join_delim=", "):
""" Prints test specification configuration passed to test script for verboseness. """
""" Prints test specification configuration passed to test script for verboseness
"""
toolchains_info_cols = []
# We need to check all toolchains for each device
for k in json_data:
@ -845,7 +926,8 @@ def print_test_configuration_from_json(json_data, join_delim=", "):
def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=','):
""" Generates table summary with all test cases and additional test cases
information using pretty print functionality. Allows test suite user to
see test cases. """
see test cases
"""
# get all unique test ID prefixes
unique_test_id = []
for test in TESTS:
@ -932,7 +1014,8 @@ def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=','
def progress_bar(percent_progress, saturation=0):
""" This function creates progress bar with optional simple saturation mark"""
""" This function creates progress bar with optional simple saturation mark
"""
step = int(percent_progress / 2) # Scale by to (scale: 1 - 50)
str_progress = '#' * step + '.' * int(50 - step)
c = '!' if str_progress[38] == '.' else '|'
@ -943,7 +1026,8 @@ def progress_bar(percent_progress, saturation=0):
def singletest_in_cli_mode(single_test):
""" Runs SingleTestRunner object in CLI (Command line interface) mode """
""" Runs SingleTestRunner object in CLI (Command line interface) mode
"""
start = time()
# Execute tests depending on options and filter applied
test_summary, shuffle_seed = single_test.execute()
@ -956,11 +1040,135 @@ def singletest_in_cli_mode(single_test):
# prints well-formed summary with results (SQL table like)
# table shows text x toolchain test result matrix
print single_test.generate_test_summary_by_target(test_summary, shuffle_seed)
print "Completed in %d sec"% (elapsed_time)
print "Completed in %.2f sec"% (elapsed_time)
def mps2_set_board_image_file(disk, images_cfg_path, image0file_path, image_name='images.txt'):
""" This function will alter image cfg file.
Main goal of this function is to change number of images to 1, comment all
existing image entries and append at the end of file new entry with test path.
@return True when all steps succeed.
"""
MBED_SDK_TEST_STAMP = 'test suite entry'
image_path = os.path.join(disk, images_cfg_path, image_name)
new_file_lines = [] # New configuration file lines (entries)
# Check each line of the image configuration file
try:
with open(image_path, 'r') as file:
for line in file:
if re.search('^TOTALIMAGES', line):
# Check number of total images, should be 1
new_file_lines.append(re.sub('^TOTALIMAGES:[\t ]*[\d]+', 'TOTALIMAGES: 1', line))
pass
elif re.search('; - %s[\n\r]*$'% MBED_SDK_TEST_STAMP, line):
# Look for test suite entries and remove them
pass # Omit all test suite entries
elif re.search('^IMAGE[\d]+FILE', line):
# Check all image entries and mark the ';'
new_file_lines.append(';' + line) # Comment non test suite lines
else:
# Append line to new file
new_file_lines.append(line)
except IOError as e:
return False
# Add new image entry with proper commented stamp
new_file_lines.append('IMAGE0FILE: %s ; - %s\r\n'% (image0file_path, MBED_SDK_TEST_STAMP))
# Write all lines to file
try:
with open(image_path, 'w') as file:
for line in new_file_lines:
file.write(line),
except IOError as e:
return False
return True
def mps2_select_core(disk, mobo_config_name=""):
""" Function selects actual core
"""
# TODO: implement core selection
pass
def mps2_switch_usb_auto_mounting_after_restart(disk, usb_config_name=""):
""" Function alters configuration to allow USB MSD to be mounted after restarts
"""
# TODO: implement USB MSD restart detection
pass
class TestLogger():
""" Super-class for logging and printing ongoing events for test suite pass
"""
def __init__(self, store_log=True):
""" We can control if logger actually stores log in memory
or just handled all log entries immediately
"""
self.log = []
self.log_to_file = False
self.log_file_name = None
self.store_log = store_log
self.LogType = construct_enum(INFO='Info',
WARN='Warning',
NOTIF='Notification',
ERROR='Error',
EXCEPT='Exception')
self.LogToFileAttr = construct_enum(CREATE=1, # Create or overwrite existing log file
APPEND=2) # Append to existing log file
def log_line(self, LogType, log_line):
""" Log one line of text
"""
log_timestamp = time()
log_entry = {'log_type' : LogType,
'log_timestamp' : log_timestamp,
'log_line' : log_line,
'_future' : None}
# Store log in memory
if self.store_log:
self.log.append(log_entry)
return log_entry
class CLITestLogger(TestLogger):
""" Logger used with CLI (Command line interface) test suite. Logs on screen and to file if needed
"""
def __init__(self, store_log=True, file_name=None):
TestLogger.__init__(self)
self.log_file_name = file_name
#self.TIMESTAMP_FORMAT = '%y-%m-%d %H:%M:%S' # Full date and time
self.TIMESTAMP_FORMAT = '%H:%M:%S' # Time only
def log_print(self, log_entry, timestamp=True):
""" Prints on screen formatted log entry
"""
ts = log_entry['log_timestamp']
timestamp_str = datetime.datetime.fromtimestamp(ts).strftime("[%s] "% self.TIMESTAMP_FORMAT) if timestamp else ''
log_line_str = "%(log_type)s: %(log_line)s"% (log_entry)
return timestamp_str + log_line_str
def log_line(self, LogType, log_line, timestamp=True, line_delim='\n'):
log_entry = TestLogger.log_line(self, LogType, log_line)
log_line_str = self.log_print(log_entry, timestamp)
if self.log_file_name is not None:
try:
with open(self.log_file_name, 'a') as file:
file.write(log_line_str + line_delim)
except IOError:
pass
return log_line_str
def get_default_test_options_parser():
""" Get common test script options used by CLI, webservices etc. """
""" Get common test script options used by CLI, webservices etc.
"""
parser = optparse.OptionParser()
parser.add_option('-i', '--tests',
dest='test_spec_filename',
@ -1062,7 +1270,7 @@ def get_default_test_options_parser():
parser.add_option('', '--firmware-name',
dest='firmware_global_name',
help='Set global name for all produced projects. E.g. you can call all test binaries firmware.bin')
help='Set global name for all produced projects. Note, proper file extension will be added by buid scripts.')
parser.add_option('-u', '--shuffle',
dest='shuffle_test_order',
@ -1085,6 +1293,16 @@ def get_default_test_options_parser():
default=None,
help='For some commands you can use filter to filter out results')
parser.add_option('', '--inc-timeout',
dest='extend_test_timeout',
metavar="NUMBER",
type="int",
help='You can increase global timeout for each test by specifying additional test timeout in seconds')
parser.add_option('-l', '--log',
dest='log_file_name',
help='Log events to external file (note not all console entries may be visible in log file)')
parser.add_option('', '--verbose-skipped',
dest='verbose_skipped_tests',
default=False,

View File

@ -53,7 +53,8 @@ class SingleTestRunnerWebService(SingleTestRunner):
REST_TEST_RESULTS='test_results')
def get_rest_result_template(self, result, command, success_code):
""" Returns common part of every web service request """
""" Returns common part of every web service request
"""
result = {"result" : result,
"command" : command,
"success_code": success_code} # 0 - OK, >0 - Error number
@ -61,12 +62,14 @@ class SingleTestRunnerWebService(SingleTestRunner):
# REST API handlers for Flask framework
def rest_api_status(self):
""" Returns current test execution status. E.g. running / finished etc. """
""" Returns current test execution status. E.g. running / finished etc.
"""
with self.resource_lock:
pass
def rest_api_config(self):
""" Returns configuration passed to SingleTest executor """
""" Returns configuration passed to SingleTest executor
"""
with self.resource_lock:
pass
@ -76,7 +79,8 @@ class SingleTestRunnerWebService(SingleTestRunner):
pass
def rest_api_request_handler(self, request_type):
""" Returns various data structures. Both static and mutable during test """
""" Returns various data structures. Both static and mutable during test
"""
result = {}
success_code = 0
with self.resource_lock:
@ -97,7 +101,8 @@ def singletest_in_webservice_mode():
def get_default_test_webservice_options_parser():
""" Get test script web service options used by CLI, webservices etc. """
""" Get test script web service options used by CLI, webservices etc.
"""
parser = get_default_test_options_parser()
# Things related to web services offered by test suite scripts