Merge remote-tracking branch 'upstream/master'

pull/420/head
Toyomasa Watarai 2014-07-31 10:33:24 +09:00
commit 49231e22ad
8 changed files with 596 additions and 274 deletions

View File

@@ -271,8 +271,7 @@ static void k64f_tx_reclaim(struct k64f_enetdata *k64f_enet)
// Traverse all descriptors, looking for the ones modified by the uDMA
i = k64f_enet->tx_consume_index;
while(i != k64f_enet->tx_produce_index) {
if (bdPtr[i].controlExtend2 & TX_DESC_UPDATED_MASK) { // descriptor updated by uDMA
while(i != k64f_enet->tx_produce_index && !(bdPtr[i].control & kEnetTxBdReady)) {
if (k64f_enet->txb_aligned[i]) {
free(k64f_enet->txb_aligned[i]);
k64f_enet->txb_aligned[i] = NULL;
@@ -282,8 +281,7 @@ static void k64f_tx_reclaim(struct k64f_enetdata *k64f_enet)
}
osSemaphoreRelease(k64f_enet->xTXDCountSem.id);
bdPtr[i].controlExtend2 &= ~TX_DESC_UPDATED_MASK;
}
i = (i + 1) % ENET_TX_RING_LEN;
i = (i + 1) % ENET_TX_RING_LEN;
}
k64f_enet->tx_consume_index = i;
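
Taken together, this hunk changes the reclaim loop's stop condition: rather than skipping descriptors whose TX_DESC_UPDATED_MASK is clear, the walk now stops at the first descriptor the uDMA still owns (kEnetTxBdReady set) and frees everything before it. A minimal Python sketch of that consume-until-ready pattern (all names hypothetical, not from the driver):

def tx_reclaim(ring, consume_index, produce_index, dma_owns, free_buffer):
    # Walk from the consumer toward the producer, freeing finished buffers,
    # and stop at the first descriptor the DMA has not released yet
    i = consume_index
    while i != produce_index and not dma_owns(ring[i]):
        free_buffer(ring[i])
        i = (i + 1) % len(ring)
    return i  # the new consume index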

View File

@@ -37,6 +37,9 @@ SDFileSystem sd(p11, p12, p13, p14, "sd");
#elif defined(TARGET_LPC11U68)
SDFileSystem sd(D11, D12, D13, D10, "sd");
#elif defined(TARGET_LPC1549)
SDFileSystem sd(D11, D12, D13, D10, "sd");
#else
SDFileSystem sd(p11, p12, p13, p14, "sd");
#endif

View File

@@ -45,35 +45,86 @@ if __name__ == '__main__':
parser = get_default_options_parser()
# Extra libraries
parser.add_option("-r", "--rtos", action="store_true", dest="rtos",
default=False, help="Compile the rtos")
parser.add_option("-e", "--eth", action="store_true", dest="eth",
default=False, help="Compile the ethernet library")
parser.add_option("-U", "--usb_host", action="store_true", dest="usb_host",
default=False, help="Compile the USB Host library")
parser.add_option("-u", "--usb", action="store_true", dest="usb",
default=False, help="Compile the USB Device library")
parser.add_option("-d", "--dsp", action="store_true", dest="dsp",
default=False, help="Compile the DSP library")
parser.add_option("-f", "--fat", action="store_true", dest="fat",
default=False, help="Compile FS ad SD card file system library")
parser.add_option("-b", "--ublox", action="store_true", dest="ublox",
default=False, help="Compile the u-blox library")
parser.add_option("-D", "", action="append", dest="macros",
parser.add_option("-r", "--rtos",
action="store_true",
dest="rtos",
default=False,
help="Compile the rtos")
parser.add_option("-e", "--eth",
action="store_true", dest="eth",
default=False,
help="Compile the ethernet library")
parser.add_option("-U", "--usb_host",
action="store_true",
dest="usb_host",
default=False,
help="Compile the USB Host library")
parser.add_option("-u", "--usb",
action="store_true",
dest="usb",
default=False,
help="Compile the USB Device library")
parser.add_option("-d", "--dsp",
action="store_true",
dest="dsp",
default=False,
help="Compile the DSP library")
parser.add_option("-F", "--fat",
action="store_true",
dest="fat",
default=False,
help="Compile FS ad SD card file system library")
parser.add_option("-b", "--ublox",
action="store_true",
dest="ublox",
default=False,
help="Compile the u-blox library")
parser.add_option("-D", "",
action="append",
dest="macros",
help="Add a macro definition")
parser.add_option("-S", "--supported-toolchains", action="store_true", dest="supported_toolchains",
default=False, help="Displays supported matrix of MCUs and toolchains")
parser.add_option("", "--cppcheck", action="store_true", dest="cppcheck_validation",
default=False, help="Forces 'cppcheck' static code analysis")
parser.add_option("-v", "--verbose", action="store_true", dest="verbose",
default=False, help="Verbose diagnostic output")
parser.add_option("-x", "--extra-verbose-notifications", action="store_true", dest="extra_verbose_notify",
default=False, help="Makes compiler more verbose, CI friendly.")
parser.add_option("-S", "--supported-toolchains",
action="store_true",
dest="supported_toolchains",
default=False,
help="Displays supported matrix of MCUs and toolchains")
parser.add_option("", "--cppcheck",
action="store_true",
dest="cppcheck_validation",
default=False,
help="Forces 'cppcheck' static code analysis")
parser.add_option('-f', '--filter',
dest='general_filter_regex',
default=None,
help='For some commands you can use filter to filter out results')
parser.add_option("-v", "--verbose",
action="store_true",
dest="verbose",
default=False,
help="Verbose diagnostic output")
parser.add_option("-x", "--extra-verbose-notifications",
action="store_true",
dest="extra_verbose_notify",
default=False,
help="Makes compiler more verbose, CI friendly.")
(options, args) = parser.parse_args()
# Only prints matrix of supported toolchains
if options.supported_toolchains:
print mcu_toolchain_matrix()
print mcu_toolchain_matrix(platform_filter=options.general_filter_regex)
exit(0)
# Get target list
@@ -177,9 +228,9 @@ if __name__ == '__main__':
print "Completed in: (%.2f)s" % (time() - start)
print
print print_build_results(successes, "Build successes:")
print print_build_results(skipped, "Build skipped:")
print print_build_results(failures, "Build failures:")
print print_build_results(successes, "Build successes:"),
print print_build_results(skipped, "Build skipped:"),
print print_build_results(failures, "Build failures:"),
if failures:
sys.exit(1)
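
Beyond the reflowed add_option() calls, the functional change in build.py is the new -f/--filter option, whose regex reaches mcu_toolchain_matrix() via options.general_filter_regex. A self-contained sketch of the flag as the diff defines it (the argv below is a made-up example):

from optparse import OptionParser

parser = OptionParser()
parser.add_option('-f', '--filter',
                  dest='general_filter_regex',
                  default=None,
                  help='For some commands you can use filter to filter out results')
(options, args) = parser.parse_args(['-f', 'K64F|LPC15'])
print options.general_filter_regex  # -> K64F|LPC15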

View File

@@ -17,6 +17,7 @@ limitations under the License.
"""
import tempfile
import re
from os.path import join, exists, basename
from shutil import rmtree
from types import ListType
@@ -161,7 +162,7 @@ def build_lib(lib_id, target, toolchain, options=None, verbose=False, clean=Fals
lib.dependencies, options,
verbose=verbose, clean=clean, macros=MACROS, notify=notify, inc_dirs=lib.inc_dirs)
else:
print '\n\nLibrary "%s" is not yet supported on target %s with toolchain %s' % (lib_id, target.name, toolchain)
print 'Library "%s" is not yet supported on target %s with toolchain %s' % (lib_id, target.name, toolchain)
# We do have unique legacy conventions about how we build and package the mbed library
@@ -242,7 +243,7 @@ def get_unique_supported_toolchains():
return unique_supported_toolchains
def mcu_toolchain_matrix(verbose_html=False):
def mcu_toolchain_matrix(verbose_html=False, platform_filter=None):
""" Shows target map using prettytable """
unique_supported_toolchains = get_unique_supported_toolchains()
from prettytable import PrettyTable # Only use it in this function so building works without extra modules
@@ -256,7 +257,14 @@ def mcu_toolchain_matrix(verbose_html=False):
pt.align["Platform"] = "l"
perm_counter = 0
target_counter = 0
for target in sorted(TARGET_NAMES):
if platform_filter is not None:
# Filter out platforms using regex
if re.search(platform_filter, target) is None:
continue
target_counter += 1
row = [target] # First column is platform name
default_toolchain = TARGET_MAP[target].default_toolchain
for unique_toolchain in unique_supported_toolchains:
@@ -275,7 +283,7 @@
result += "*Default - default on-line compiler\n"
result += "*Supported - supported off-line compiler\n"
result += "\n"
result += "Total platforms: %d\n"% (len(TARGET_NAMES))
result += "Total platforms: %d\n"% (target_counter)
result += "Total permutations: %d"% (perm_counter)
return result
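
The platform_filter plumbing reduces to a skip-unless-match loop, and target_counter then counts only the surviving rows, which is what the reworked "Total platforms" line reports. A minimal sketch with made-up target names:

import re

def filtered_targets(target_names, platform_filter=None):
    # None means no filter: every platform is kept and counted
    kept = []
    for target in sorted(target_names):
        if platform_filter is not None and re.search(platform_filter, target) is None:
            continue
        kept.append(target)
    return kept

print filtered_targets(['K64F', 'LPC1768', 'LPC1549'], 'LPC15')  # -> ['LPC1549']
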
@@ -489,4 +497,5 @@ def print_build_results(result_list, build_name):
if result_list:
result += build_name + "\n"
result += "\n".join([" * %s" % f for f in result_list])
result += "\n"
return result
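
The extra newline pairs with the trailing commas added to build.py above: in Python 2, print expr, suppresses print's own newline, so the one appended here becomes the only separator between summary blocks. A runnable illustration (result data made up):

def report(build_name, result_list):
    # Same shape as print_build_results(): header, bulleted list, one newline
    out = build_name + "\n"
    out += "\n".join([" * %s" % f for f in result_list])
    return out + "\n"

print report("Build successes:", ["K64F::GCC_ARM"]),  # trailing comma: no blank line
print report("Build failures:", ["LPC1768::IAR"]),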

View File

@@ -22,11 +22,11 @@ except ImportError, e:
print "Error: Can't import 'serial' module: %s"% e
exit(-1)
import os
from optparse import OptionParser
from time import sleep
from sys import stdout
class Mbed:
"""
Base class for a host driven test
@@ -34,20 +34,34 @@ class Mbed:
def __init__(self):
parser = OptionParser()
parser.add_option("-m", "--micro", dest="micro",
help="The target microcontroller ", metavar="MICRO")
parser.add_option("-m", "--micro",
dest="micro",
help="The target microcontroller ",
metavar="MICRO")
parser.add_option("-p", "--port", dest="port",
help="The serial port of the target mbed (ie: COM3)", metavar="PORT")
parser.add_option("-p", "--port",
dest="port",
help="The serial port of the target mbed (ie: COM3)",
metavar="PORT")
parser.add_option("-d", "--disk", dest="disk",
help="The target disk path", metavar="DISK_PATH")
parser.add_option("-d", "--disk",
dest="disk",
help="The target disk path",
metavar="DISK_PATH")
parser.add_option("-t", "--timeout", dest="timeout",
help="Timeout", metavar="TIMEOUT")
parser.add_option("-t", "--timeout",
dest="timeout",
help="Timeout",
metavar="TIMEOUT")
parser.add_option("-e", "--extra", dest="extra",
help="Extra serial port (used by some tests)", metavar="EXTRA")
parser.add_option("-e", "--extra",
dest="extra",
help="Extra serial port (used by some tests)",
metavar="EXTRA")
parser.add_option("-r", "--reset",
dest="forced_reset_type",
help="Forces different type of reset")
(self.options, _) = parser.parse_args()
@@ -112,9 +126,21 @@ class Mbed:
result = False
return result
def touch_file(self, path):
with open(path, 'a'):
os.utime(path, None)
def reset(self):
self.safe_sendBreak(self.serial) # Instead of serial.sendBreak()
# Give time to wait for the image loading
""" reboot.txt - startup from standby state, reboots when in run mode.
shutdown.txt - shutdown from run mode
reset.txt - reset fpga during run mode """
if self.options.forced_reset_type:
path = os.path.join(self.disk, self.options.forced_reset_type.lower())
if self.options.forced_reset_type.endswith('.txt'):
self.touch_file(path)
else:
self.safe_sendBreak(self.serial) # Instead of serial.sendBreak()
# Give time to wait for the image loading
sleep(2)
def flush(self):
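
With the -r/--reset option added above, the host test can replace the serial break with a marker file dropped on the mbed disk (reboot.txt, shutdown.txt or reset.txt, per the docstring). The touch itself, as a standalone sketch with a hypothetical mount point:

import os

def touch(path):
    # Create the marker file if needed and bump its timestamp; the interface
    # firmware watches the disk and performs the action the file name encodes
    with open(path, 'a'):
        os.utime(path, None)

touch(os.path.join('/media/MBED', 'reboot.txt'))  # hypothetical disk path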

View File

@@ -68,8 +68,8 @@ IAR_PATH = "C:/Program Files (x86)/IAR Systems/Embedded Workbench 6.0/arm"
CW_GCC_PATH = "C:/Freescale/CW MCU v10.3/Cross_Tools/arm-none-eabi-gcc-4_6_2/bin"
CW_EWL_PATH = "C:/Freescale/CW MCU v10.3/MCU/ARM_GCC_Support/ewl/lib"
# Goanna static analyzer
GOANNA_PATH = "c:/Program Files (x86)/RedLizards/Goanna Central 3.1.4/bin"
# Goanna static analyser. Please overload it in private_settings.py
GOANNA_PATH = "c:/Program Files (x86)/RedLizards/Goanna Central 3.2.3/bin"
# cppcheck path (command) and output message format
CPPCHECK_CMD = ["cppcheck", "--enable=all"]

View File

@@ -20,26 +20,10 @@ Author: Przemyslaw Wirkus <Przemyslaw.wirkus@arm.com>
-------------------------------------------------------------------------------
Usage: singletest.py [options]
Call:
singletest.py --help
This script allows you to run mbed defined test cases for particular MCU(s)
and corresponding toolchain(s).
Options:
-h, --help show this help message and exit
-i FILE, --tests=FILE
Points to file with test specification
-M FILE, --MUTS=FILE Points to file with MUTs specification (overwrites
settings.py and private_settings.py)
-g, --goanna-for-tests
Run Goanna static analyse tool for tests
-G, --goanna-for-sdk Run Goanna static analyse tool for mbed SDK
-s, --suppress-summary
Suppresses display of well-formatted table with test
results
-v, --verbose Verbose mode (prints some extra information)
Example: singletest.py -i test_spec.json -M muts_all.json
to get help information.
-------------------------------------------------------------------------------
@@ -92,6 +76,8 @@ import re
import os
from types import ListType
import random
import thread
import threading
from os.path import join, abspath, dirname, exists, basename
from shutil import copy
@@ -116,14 +102,12 @@ from workspace_tools.targets import TARGET_MAP
from workspace_tools.tests import TEST_MAP
from workspace_tools.tests import TESTS
from workspace_tools.libraries import LIBRARIES, LIBRARY_MAP
from workspace_tools.utils import construct_enum
# Be sure that the tools directory is in the search path
ROOT = abspath(join(dirname(__file__), ".."))
sys.path.insert(0, ROOT)
# Imports related to mbed build api
from workspace_tools.settings import MUTs
class ProcessObserver(Thread):
def __init__(self, proc):
@@ -167,6 +151,9 @@ class SingleTestRunner(object):
TEST_LOOPS_LIST = [] # We redefine no.of loops per test_id
TEST_LOOPS_DICT = {} # TEST_LOOPS_LIST in dict format: { test_id : test_loop_count}
muts = {} # MUTs descriptor (from external file)
test_spec = {} # Test specification (from external file)
# mbed test suite -> SingleTestRunner
TEST_RESULT_MAPPING = {"success" : TEST_RESULT_OK,
"failure" : TEST_RESULT_FAIL,
@@ -178,9 +165,29 @@ class SingleTestRunner(object):
"no_image" : TEST_RESULT_NO_IMAGE,
"end" : TEST_RESULT_UNDEF}
def __init__(self, _global_loops_count=1, _test_loops_list=""):
pattern = "\\{(" + "|".join(self.TEST_RESULT_MAPPING.keys()) + ")\\}"
self.RE_DETECT_TESTCASE_RESULT = re.compile(pattern)
def __init__(self,
_global_loops_count=1,
_test_loops_list="",
_muts={},
_test_spec={},
_opts_goanna_for_mbed_sdk=None,
_opts_goanna_for_tests=None,
_opts_shuffle_test_order=False,
_opts_shuffle_test_seed=None,
_opts_test_by_names=None,
_opts_test_only_peripheral=False,
_opts_test_only_common=False,
_opts_verbose_skipped_tests=False,
_opts_verbose=False,
_opts_firmware_global_name=None,
_opts_only_build_tests=False,
_opts_suppress_summary=False,
_opts_test_x_toolchain_summary=False
):
""" Let's try hard to init this object """
PATTERN = "\\{(" + "|".join(self.TEST_RESULT_MAPPING.keys()) + ")\\}"
self.RE_DETECT_TESTCASE_RESULT = re.compile(PATTERN)
# Settings related to test loops counters
try:
_global_loops_count = int(_global_loops_count)
except:
@@ -191,6 +198,285 @@ class SingleTestRunner(object):
self.TEST_LOOPS_LIST = _test_loops_list if _test_loops_list else []
self.TEST_LOOPS_DICT = self.test_loop_list_to_dict(_test_loops_list)
self.shuffle_random_seed = 0.0
self.SHUFFLE_SEED_ROUND = 10
# MUT list and test specification storage
self.muts = _muts
self.test_spec = _test_spec
# Settings passed e.g. from command line
self.opts_goanna_for_mbed_sdk = _opts_goanna_for_mbed_sdk
self.opts_goanna_for_tests = _opts_goanna_for_tests
self.opts_shuffle_test_order = _opts_shuffle_test_order
self.opts_shuffle_test_seed = _opts_shuffle_test_seed
self.opts_test_by_names = _opts_test_by_names
self.opts_test_only_peripheral = _opts_test_only_peripheral
self.opts_test_only_common = _opts_test_only_common
self.opts_verbose_skipped_tests = _opts_verbose_skipped_tests
self.opts_verbose = _opts_verbose
self.opts_firmware_global_name = _opts_firmware_global_name
self.opts_only_build_tests = _opts_only_build_tests
self.opts_suppress_summary = _opts_suppress_summary
self.opts_test_x_toolchain_summary = _opts_test_x_toolchain_summary
# With this lock we should control access to certain resources inside this class
self.resource_lock = thread.allocate_lock()
self.RestRequest = construct_enum(REST_MUTS='muts',
REST_TEST_SPEC='test_spec',
REST_TEST_RESULTS='test_results')
def get_rest_result_template(self, result, command, success_code):
result = {"result": result,
"command" : command,
"success_code": success_code}
return result
# REST API handlers for Flask framework
def rest_api_status(self):
""" Returns current test execution status. E.g. running / finished etc. """
with self.resource_lock:
pass
def rest_api_config(self):
""" Returns configuration passed to SingleTest executor """
with self.resource_lock:
pass
def rest_api_log(self):
""" Returns current test log """
with self.resource_lock:
pass
def rest_api_request_handler(self, request_type):
""" Returns various data structures. Both static and mutable during test """
result = {}
success_code = 0
with self.resource_lock:
if request_type == self.RestRequest.REST_MUTS:
result = self.muts # Returns MUTs
elif request_type == self.RestRequest.REST_TEST_SPEC:
result = self.test_spec # Returns Test Specification
elif request_type == self.RestRequest.REST_TEST_RESULTS:
pass # Returns test results
else:
success_code = -1
return json.dumps(self.get_rest_result_template(result, 'request/' + request_type, success_code), indent=4)
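
Every REST reply goes through get_rest_result_template(), so clients always receive the same three-field envelope. A sketch of the serialized shape (payload made up):

import json

def rest_result_template(result, command, success_code):
    # Mirrors get_rest_result_template(): payload, echoed command, and
    # 0 on success / -1 for an unknown request type
    return {"result": result, "command": command, "success_code": success_code}

print json.dumps(rest_result_template({"mut_1": {}}, 'request/muts', 0), indent=4)
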
def shuffle_random_func(self):
return self.shuffle_random_seed
def is_shuffle_seed_float(self):
""" return true if function parameter can be converted to float """
result = True
try:
float(self.shuffle_random_seed)
except ValueError:
result = False
return result
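
shuffle_random_func() exists so random.shuffle() can be driven by a recorded seed: Python 2's shuffle takes an optional zero-argument function returning a float in [0, 1), and a constant source makes the order reproducible. A minimal sketch:

import random

seed = round(random.random(), 10)    # SHUFFLE_SEED_ROUND decimal places
tests = ['test_a', 'test_b', 'test_c']
random.shuffle(tests, lambda: seed)  # constant source -> reproducible order
print tests
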
def execute(self):
clean = self.test_spec.get('clean', False)
test_ids = self.test_spec.get('test_ids', [])
# Here we store test results
test_summary = []
# Generate seed for shuffle if seed is not provided in options
self.shuffle_random_seed = round(random.random(), self.SHUFFLE_SEED_ROUND)
if self.opts_shuffle_test_seed is not None and self.is_shuffle_seed_float():
self.shuffle_random_seed = round(float(self.opts_shuffle_test_seed), self.SHUFFLE_SEED_ROUND)
for target, toolchains in self.test_spec['targets'].iteritems():
for toolchain in toolchains:
# print '=== %s::%s ===' % (target, toolchain)
# Let's build our test
if target not in TARGET_MAP:
print 'Skipped tests for %s target. Target platform not found' % (target)
continue
T = TARGET_MAP[target]
build_mbed_libs_options = ["analyze"] if self.opts_goanna_for_mbed_sdk else None
clean_mbed_libs_options = True if self.opts_goanna_for_mbed_sdk or clean else None
build_mbed_libs_result = build_mbed_libs(T,
toolchain,
options=build_mbed_libs_options,
clean=clean_mbed_libs_options)
if not build_mbed_libs_result:
print 'Skipped tests for %s target. Toolchain %s is not yet supported for this target' % (T.name, toolchain)
continue
build_dir = join(BUILD_DIR, "test", target, toolchain)
# Enumerate through all tests
test_map_keys = TEST_MAP.keys()
if self.opts_shuffle_test_order:
random.shuffle(test_map_keys, self.shuffle_random_func)
for test_id in test_map_keys:
test = TEST_MAP[test_id]
if self.opts_test_by_names and test_id not in self.opts_test_by_names.split(','):
continue
if test_ids and test_id not in test_ids:
continue
if self.opts_test_only_peripheral and not test.peripherals:
if self.opts_verbose_skipped_tests:
print "TargetTest::%s::NotPeripheralTestSkipped()" % (target)
continue
if self.opts_test_only_common and test.peripherals:
if self.opts_verbose_skipped_tests:
print "TargetTest::%s::PeripheralTestSkipped()" % (target)
continue
if test.automated and test.is_supported(target, toolchain):
if not is_peripherals_available(target, test.peripherals):
if self.opts_verbose_skipped_tests:
test_peripherals = test.peripherals if test.peripherals else []
print "TargetTest::%s::TestSkipped(%s)" % (target, ",".join(test_peripherals))
continue
build_project_options = ["analyze"] if self.opts_goanna_for_tests else None
clean_project_options = True if self.opts_goanna_for_tests or clean else None
# Detect which lib should be added to test
# Some libs have to be compiled, like RTOS or ETH
libraries = []
for lib in LIBRARIES:
if lib['build_dir'] in test.dependencies:
libraries.append(lib['id'])
# Build libs for test
for lib_id in libraries:
build_lib(lib_id,
T,
toolchain,
options=build_project_options,
verbose=self.opts_verbose,
clean=clean_mbed_libs_options)
# TODO: move these two loops below to a separate function
INC_DIRS = []
for lib_id in libraries:
if 'inc_dirs_ext' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['inc_dirs_ext']:
INC_DIRS.extend(LIBRARY_MAP[lib_id]['inc_dirs_ext'])
MACROS = []
for lib_id in libraries:
if 'macros' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['macros']:
MACROS.extend(LIBRARY_MAP[lib_id]['macros'])
project_name = self.opts_firmware_global_name if self.opts_firmware_global_name else None
path = build_project(test.source_dir,
join(build_dir, test_id),
T,
toolchain,
test.dependencies,
options=build_project_options,
clean=clean_project_options,
verbose=self.opts_verbose,
name=project_name,
macros=MACROS,
inc_dirs=INC_DIRS)
if self.opts_only_build_tests:
# With this option we are skipping testing phase
continue
# For an automated test the duration acts as a timeout after
# which the test gets interrupted
test_spec = shape_test_request(target, path, test_id, test.duration)
test_loops = self.get_test_loop_count(test_id)
single_test_result = self.handle(test_spec, target, toolchain, test_loops=test_loops)
if single_test_result is not None:
test_summary.append(single_test_result)
return test_summary, self.shuffle_random_seed
def generate_test_summary_by_target(self, test_summary, shuffle_seed=None):
""" Prints well-formed summary with results (SQL table like)
table shows test x toolchain test result matrix """
RESULT_INDEX = 0
TARGET_INDEX = 1
TOOLCHAIN_INDEX = 2
TEST_INDEX = 3
DESC_INDEX = 4
unique_targets = get_unique_value_from_summary(test_summary, TARGET_INDEX)
unique_tests = get_unique_value_from_summary(test_summary, TEST_INDEX)
unique_test_desc = get_unique_value_from_summary_ext(test_summary, TEST_INDEX, DESC_INDEX)
unique_toolchains = get_unique_value_from_summary(test_summary, TOOLCHAIN_INDEX)
result = "Test summary:\n"
result_dict = {} # test : { toolchain : result }
for target in unique_targets:
for test in test_summary:
if test[TEST_INDEX] not in result_dict:
result_dict[test[TEST_INDEX]] = {}
result_dict[test[TEST_INDEX]][test[TOOLCHAIN_INDEX]] = test[RESULT_INDEX]
pt_cols = ["Target", "Test ID", "Test Description"] + unique_toolchains
pt = PrettyTable(pt_cols)
for col in pt_cols:
pt.align[col] = "l"
pt.padding_width = 1 # One space between column edges and contents (default)
for test in unique_tests:
test_results = result_dict[test]
row = [target, test, unique_test_desc[test]]
for toolchain in unique_toolchains:
row.append(test_results[toolchain])
pt.add_row(row)
result += pt.get_string()
shuffle_seed_text = "Shuffle Seed: %.*f"% (self.SHUFFLE_SEED_ROUND,
shuffle_seed if shuffle_seed else self.shuffle_random_seed)
result += "\n%s"% (shuffle_seed_text if self.opts_shuffle_test_order else '')
return result
def generate_test_summary(self, test_summary, shuffle_seed=None):
""" Prints well-formed summary with results (SQL table like)
table shows target x test results matrix """
result = "Test summary:\n"
# Pretty table package is used to print results
pt = PrettyTable(["Result", "Target", "Toolchain", "Test ID", "Test Description",
"Elapsed Time (sec)", "Timeout (sec)", "Loops"])
pt.align["Result"] = "l" # Left align
pt.align["Target"] = "l" # Left align
pt.align["Toolchain"] = "l" # Left align
pt.align["Test ID"] = "l" # Left align
pt.align["Test Description"] = "l" # Left align
pt.padding_width = 1 # One space between column edges and contents (default)
result_dict = {self.TEST_RESULT_OK : 0,
self.TEST_RESULT_FAIL : 0,
self.TEST_RESULT_ERROR : 0,
self.TEST_RESULT_UNDEF : 0,
self.TEST_RESULT_IOERR_COPY : 0,
self.TEST_RESULT_IOERR_DISK : 0,
self.TEST_RESULT_IOERR_SERIAL : 0,
self.TEST_RESULT_NO_IMAGE : 0,
self.TEST_RESULT_TIMEOUT : 0
}
for test in test_summary:
if test[0] in result_dict:
result_dict[test[0]] += 1
pt.add_row(test)
result += pt.get_string()
result += "\n"
# Print result count
result += "Result: " + ' / '.join(['%s %s' % (value, key) for (key, value) in {k: v for k, v in result_dict.items() if v != 0}.iteritems()])
shuffle_seed_text = "Shuffle Seed: %.*f\n"% (self.SHUFFLE_SEED_ROUND,
shuffle_seed if shuffle_seed else self.shuffle_random_seed)
result += "\n%s"% (shuffle_seed_text if self.opts_shuffle_test_order else '')
return result
def test_loop_list_to_dict(self, test_loops_str):
""" Transforms test_id=X,test_id=X,test_id=X into dictionary {test_id : test_id_loops_count} """
result = {}
@@ -242,7 +528,7 @@ class SingleTestRunner(object):
result = False
return result, resutl_msg, copy_method
def delete_file(file_path):
def delete_file(self, file_path):
""" Remove file from the system """
result = True
resutl_msg = ""
@@ -352,10 +638,10 @@ class SingleTestRunner(object):
cmd = ["python", "%s.py" % name, '-p', port, '-d', disk, '-t', str(duration), "-e", extra_serial]
proc = Popen(cmd, stdout=PIPE, cwd=HOST_TESTS)
obs = ProcessObserver(proc)
start = time()
start_time = time()
line = ''
output = []
while (time() - start) < duration:
while (time() - start_time) < duration:
try:
c = obs.queue.get(block=True, timeout=1)
except Empty, _:
@@ -433,6 +719,40 @@ def shape_test_request(mcu, image_path, test_id, duration=10):
return json.dumps(test_spec)
def show_json_file_format_error(json_spec_filename, line, column):
""" Prints JSON broken content """
with open(json_spec_filename) as data_file:
line_no = 1
for json_line in data_file:
if line_no + 5 >= line: # Print last few lines before error
print 'Line %d:\t'%line_no + json_line, # Prints line
if line_no == line:
print ' ' * len('Line %d:'%line_no) + '\t', '-' * (column-1) + '^'
break
line_no += 1
def json_format_error_defect_pos(json_error_msg):
""" Gets first error line and column in JSON file format.
Parsed from exception thrown by json.loads() string """
result = None
line, column = 0, 0
# Line value search
line_search = re.search('line [0-9]+', json_error_msg)
if line_search is not None:
ls = line_search.group().split(' ')
if len(ls) == 2:
line = int(ls[1])
# Column position search
column_search = re.search('column [0-9]+', json_error_msg)
if column_search is not None:
cs = column_search.group().split(' ')
if len(cs) == 2:
column = int(cs[1])
result = [line, column]
return result
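
json_format_error_defect_pos() scrapes the "line N column M" fragment from the ValueError text raised by json.load(); show_json_file_format_error() then echoes the offending lines with a caret. The parsing idea, as a self-contained sketch against a made-up message:

import re

def defect_pos(json_error_msg):
    # Same parsing idea: pull the first "line N" / "column M" fragments
    # out of the exception text
    line = re.search('line ([0-9]+)', json_error_msg)
    column = re.search('column ([0-9]+)', json_error_msg)
    if line and column:
        return [int(line.group(1)), int(column.group(1))]
    return None

print defect_pos("Expecting , delimiter: line 7 column 12 (char 110)")  # -> [7, 12]
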
def get_json_data_from_file(json_spec_filename, verbose=False):
""" Loads from file JSON formatted string to data structure """
result = None
@@ -442,7 +762,15 @@ def get_json_data_from_file(json_spec_filename, verbose=False):
result = json.load(data_file)
except ValueError as json_error_msg:
result = None
print "Error in '%s' file: %s" % (json_spec_filename, json_error_msg)
print "Error in '%s' file. %s" % (json_spec_filename, json_error_msg)
# We can print where error occurred inside JSON file if we can parse exception msg
json_format_defect_pos = json_format_error_defect_pos(str(json_error_msg))
if json_format_defect_pos is not None:
line = json_format_defect_pos[0]
column = json_format_defect_pos[1]
print
show_json_file_format_error(json_spec_filename, line, column)
except IOError as fileopen_error_msg:
print "Error: %s" % (fileopen_error_msg)
if verbose and result:
@@ -634,7 +962,7 @@ def progress_bar(percent_progress, saturation=0):
step = int(percent_progress / 2) # Scale by to (scale: 1 - 50)
str_progress = '#' * step + '.' * int(50 - step)
c = '!' if str_progress[38] == '.' else '|'
if (saturation > 0):
if saturation > 0:
saturation = saturation / 2
str_progress = str_progress[:saturation] + c + str_progress[saturation:]
return str_progress
@@ -661,80 +989,29 @@ def get_unique_value_from_summary_ext(test_summary, index_key, index_val):
return result
def generate_test_summary_by_target(test_summary):
""" Prints well-formed summary with results (SQL table like)
table shows test x toolchain test result matrix """
RESULT_INDEX = 0
TARGET_INDEX = 1
TOOLCHAIN_INDEX = 2
TEST_INDEX = 3
DESC_INDEX = 4
class SingleTestExecutor(threading.Thread):
def __init__(self, singletest):
threading.Thread.__init__(self)
unique_targets = get_unique_value_from_summary(test_summary, TARGET_INDEX)
unique_tests = get_unique_value_from_summary(test_summary, TEST_INDEX)
unique_test_desc = get_unique_value_from_summary_ext(test_summary, TEST_INDEX, DESC_INDEX)
unique_toolchains = get_unique_value_from_summary(test_summary, TOOLCHAIN_INDEX)
def run(self):
start = time()
result = "Test summary:\n"
result_dict = {} # test : { toolchain : result }
for target in unique_targets:
for test in test_summary:
if test[TEST_INDEX] not in result_dict:
result_dict[test[TEST_INDEX]] = { }
result_dict[test[TEST_INDEX]][test[TOOLCHAIN_INDEX]] = test[RESULT_INDEX]
# Execute tests depending on options and filter applied
test_summary, shuffle_seed = single_test.execute()
pt_cols = ["Target", "Test ID", "Test Description"] + unique_toolchains
pt = PrettyTable(pt_cols)
for col in pt_cols:
pt.align[col] = "l"
pt.padding_width = 1 # One space between column edges and contents (default)
elapsed_time = time() - start
for test in unique_tests:
test_results = result_dict[test]
row = [target, test, unique_test_desc[test]]
for toolchain in unique_toolchains:
row.append(test_results[toolchain])
pt.add_row(row)
result += pt.get_string()
result += "\n\n"
return result
# Human readable summary
if not single_test.opts_suppress_summary:
# prints well-formed summary with results (SQL table like)
print single_test.generate_test_summary(test_summary, shuffle_seed)
if single_test.opts_test_x_toolchain_summary:
# prints well-formed summary with results (SQL table like)
# table shows test x toolchain test result matrix
print single_test.generate_test_summary_by_target(test_summary, shuffle_seed)
def generate_test_summary(test_summary):
""" Prints well-formed summary with results (SQL table like)
table shows target x test results matrix """
result = "Test summary:\n"
# Pretty table package is used to print results
pt = PrettyTable(["Result", "Target", "Toolchain", "Test ID", "Test Description",
"Elapsed Time (sec)", "Timeout (sec)", "Loops"])
pt.align["Result"] = "l" # Left align
pt.align["Target"] = "l" # Left align
pt.align["Toolchain"] = "l" # Left align
pt.align["Test ID"] = "l" # Left align
pt.align["Test Description"] = "l" # Left align
pt.padding_width = 1 # One space between column edges and contents (default)
result_dict = {single_test.TEST_RESULT_OK : 0,
single_test.TEST_RESULT_FAIL : 0,
single_test.TEST_RESULT_ERROR : 0,
single_test.TEST_RESULT_UNDEF : 0,
single_test.TEST_RESULT_IOERR_COPY : 0,
single_test.TEST_RESULT_IOERR_DISK : 0,
single_test.TEST_RESULT_IOERR_SERIAL : 0,
single_test.TEST_RESULT_NO_IMAGE : 0,
single_test.TEST_RESULT_TIMEOUT : 0 }
for test in test_summary:
if test[0] in result_dict:
result_dict[test[0]] += 1
pt.add_row(test)
result += pt.get_string()
result += "\n"
# Print result count
result += "Result: " + ' / '.join(['%s %s' % (value, key) for (key, value) in {k: v for k, v in result_dict.items() if v != 0}.iteritems()])
result += "\n"
return result
print "Completed in %d sec"% (elapsed_time)
if __name__ == '__main__':
@@ -754,13 +1031,13 @@ if __name__ == '__main__':
dest='goanna_for_tests',
metavar=False,
action="store_true",
help='Run Goanna static analyse tool for tests')
help='Run Goanna static analyse tool for tests. (Project will be rebuilt)')
parser.add_option('-G', '--goanna-for-sdk',
dest='goanna_for_mbed_sdk',
metavar=False,
action="store_true",
help='Run Goanna static analyse tool for mbed SDK')
help='Run Goanna static analyse tool for mbed SDK (Project will be rebuilt)')
parser.add_option('-s', '--suppress-summary',
dest='suppress_summary',
@@ -836,12 +1113,32 @@ if __name__ == '__main__':
dest='firmware_global_name',
help='Set global name for all produced projects. E.g. you can call all test binaries firmware.bin')
parser.add_option('-u', '--shuffle-tests',
parser.add_option('-u', '--shuffle',
dest='shuffle_test_order',
default=False,
action="store_true",
help='Shuffles test execution order')
parser.add_option('', '--shuffle-seed',
dest='shuffle_test_seed',
default=None,
help='Shuffle seed (If you want to reproduce your shuffle order please use seed provided in test summary)')
parser.add_option('-f', '--filter',
dest='general_filter_regex',
default=None,
help='For some commands you can use filter to filter out results')
parser.add_option('', '--rest-api',
dest='rest_api_enabled',
default=False,
action="store_true",
help='Enables REST API.')
parser.add_option('', '--rest-api-port',
dest='rest_api_port_no',
help='Sets port for REST API interface')
parser.add_option('', '--verbose-skipped',
dest='verbose_skipped_tests',
default=False,
@@ -878,7 +1175,7 @@ if __name__ == '__main__':
# Only prints matrix of supported toolchains
if opts.supported_toolchains:
mcu_toolchain_matrix()
print mcu_toolchain_matrix(platform_filter=opts.general_filter_regex)
exit(0)
# Open file with test specification
@@ -886,15 +1183,16 @@ if __name__ == '__main__':
# should be covered by the test scenario
test_spec = get_json_data_from_file(opts.test_spec_filename) if opts.test_spec_filename else None
if test_spec is None:
parser.print_help()
if not opts.test_spec_filename:
parser.print_help()
exit(-1)
# Get extra MUTs if applicable
if opts.muts_spec_filename:
MUTs = get_json_data_from_file(opts.muts_spec_filename)
MUTs = get_json_data_from_file(opts.muts_spec_filename) if opts.muts_spec_filename else None
if MUTs is None:
parser.print_help()
if not opts.muts_spec_filename:
parser.print_help()
exit(-1)
# Only prints read MUTs configuration
@@ -912,131 +1210,63 @@ if __name__ == '__main__':
if test_spec and opts.verbose:
print print_test_configuration_from_json(test_spec)
# Magic happens here... ;)
start = time()
single_test = SingleTestRunner(_global_loops_count=opts.test_global_loops_value, _test_loops_list=opts.test_loops_list)
if opts.only_build_tests:
# We are skipping testing phase, and suppress summary
opts.suppress_summary = True
clean = test_spec.get('clean', False)
test_ids = test_spec.get('test_ids', [])
groups = test_spec.get('test_groups', [])
single_test = SingleTestRunner(_global_loops_count=opts.test_global_loops_value,
_test_loops_list=opts.test_loops_list,
_muts=MUTs,
_test_spec=test_spec,
_opts_goanna_for_mbed_sdk=opts.goanna_for_mbed_sdk,
_opts_goanna_for_tests=opts.goanna_for_tests,
_opts_shuffle_test_order=opts.shuffle_test_order,
_opts_shuffle_test_seed=opts.shuffle_test_seed,
_opts_test_by_names=opts.test_by_names,
_opts_test_only_peripheral=opts.test_only_peripheral,
_opts_test_only_common=opts.test_only_common,
_opts_verbose_skipped_tests=opts.verbose_skipped_tests,
_opts_verbose=opts.verbose,
_opts_firmware_global_name=opts.firmware_global_name,
_opts_only_build_tests=opts.only_build_tests,
_opts_suppress_summary=opts.suppress_summary,
_opts_test_x_toolchain_summary=opts.test_x_toolchain_summary
)
# Here we store test results
test_summary = []
try:
st_exec_thread = SingleTestExecutor(single_test)
except KeyboardInterrupt, e:
print "\n[CTRL+c] exit"
st_exec_thread.start()
for target, toolchains in test_spec['targets'].iteritems():
for toolchain in toolchains:
# print '=== %s::%s ===' % (target, toolchain)
# Let's build our test
if target not in TARGET_MAP:
print 'Skipped tests for %s target. Target platform not found' % (target)
continue
if opts.rest_api_enabled:
# Enable REST API
from flask import Flask
app = Flask(__name__)
T = TARGET_MAP[target]
build_mbed_libs_options = ["analyze"] if opts.goanna_for_mbed_sdk else None
build_mbed_libs_result = build_mbed_libs(T, toolchain, options=build_mbed_libs_options)
if not build_mbed_libs_result:
print 'Skipped tests for %s target. Toolchain %s is not yet supported for this target' % (T.name, toolchain)
continue
@app.route('/')
def hello_world():
return 'Hello World!'
build_dir = join(BUILD_DIR, "test", target, toolchain)
@app.route('/status')
def rest_api_status():
return single_test.rest_api_status() # TODO
# Enumerate through all tests
test_map_keys = TEST_MAP.keys()
if opts.shuffle_test_order:
random.shuffle(test_map_keys)
@app.route('/config')
def rest_api_config():
return single_test.rest_api_config() # TODO
for test_id in test_map_keys:
test = TEST_MAP[test_id]
if opts.test_by_names and test_id not in opts.test_by_names.split(','):
continue
@app.route('/log')
def rest_api_log():
return single_test.rest_api_log() # TODO
if test_ids and test_id not in test_ids:
continue
@app.route('/request/<request_type>') # 'muts', 'test_spec', 'test_results'
def rest_api_request_handler(request_type):
result = single_test.rest_api_request_handler(request_type) # TODO
return result
if opts.test_only_peripheral and not test.peripherals:
if opts.verbose_skipped_tests:
print "TargetTest::%s::NotPeripheralTestSkipped()" % (target)
continue
if opts.test_only_common and test.peripherals:
if opts.verbose_skipped_tests:
print "TargetTest::%s::PeripheralTestSkipped()" % (target)
continue
if test.automated and test.is_supported(target, toolchain):
if not is_peripherals_available(target, test.peripherals):
if opts.verbose_skipped_tests:
test_peripherals = test.peripherals if test.peripherals else []
print "TargetTest::%s::TestSkipped(%s)" % (target, ",".join(test_peripherals))
continue
# This is basic structure storing test results
test_result = {
'target': target,
'toolchain': toolchain,
'test_id': test_id,
}
build_project_options = ["analyze"] if opts.goanna_for_tests else None
# Detect which lib should be added to test
# Some libs have to be compiled, like RTOS or ETH
libraries = []
for lib in LIBRARIES:
if lib['build_dir'] in test.dependencies:
libraries.append(lib['id'])
# Build libs for test
for lib_id in libraries:
build_lib(lib_id, T, toolchain, options=build_project_options,
verbose=opts.verbose, clean=clean)
# TODO: move these two loops below to a separate function
INC_DIRS = []
for lib_id in libraries:
if 'inc_dirs_ext' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['inc_dirs_ext']:
INC_DIRS.extend(LIBRARY_MAP[lib_id]['inc_dirs_ext'])
MACROS = []
for lib_id in libraries:
if 'macros' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['macros']:
MACROS.extend(LIBRARY_MAP[lib_id]['macros'])
project_name = opts.firmware_global_name if opts.firmware_global_name else None
path = build_project(test.source_dir, join(build_dir, test_id),
T, toolchain, test.dependencies,
options=build_project_options,
clean=clean,
verbose=opts.verbose,
name=project_name,
macros=MACROS,
inc_dirs=INC_DIRS)
test_result_cache = join(dirname(path), "test_result.json")
if opts.only_build_tests:
# We are skipping testing phase, and suppress summary
opts.suppress_summary = True
continue
# For an automated test the duration acts as a timeout after
# which the test gets interrupted
test_spec = shape_test_request(target, path, test_id, test.duration)
test_loops = single_test.get_test_loop_count(test_id)
single_test_result = single_test.handle(test_spec, target, toolchain, test_loops=test_loops)
if single_test_result is not None:
test_summary.append(single_test_result)
# print test_spec, target, toolchain
elapsed_time = time() - start
# Human readable summary
if not opts.suppress_summary:
# prints well-formed summary with results (SQL table like)
print generate_test_summary(test_summary)
if opts.test_x_toolchain_summary:
# prints well-formed summary with results (SQL table like)
# table shows test x toolchain test result matrix
print generate_test_summary_by_target(test_summary)
print "Completed in %d sec" % (time() - start)
rest_api_port = int(opts.rest_api_port_no) if opts.rest_api_port_no else 5555
app.debug = False
app.run(port=rest_api_port) # Blocking Flask REST API web service
else:
st_exec_thread.join()
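
Untangled from the interleaved removals above, the new __main__ flow is: construct the fully parameterized SingleTestRunner, hand it to a SingleTestExecutor thread, then either block in Flask (when --rest-api is given) or join the thread. Reduced to a runnable miniature (stand-ins noted in comments):

import threading
import time

class Executor(threading.Thread):
    # Same shape as SingleTestExecutor: run() does the work, while the
    # main thread either serves REST requests or simply waits
    def run(self):
        time.sleep(0.1)  # stand-in for single_test.execute()

rest_api_enabled = False  # stand-in for opts.rest_api_enabled
worker = Executor()
worker.start()
if rest_api_enabled:
    pass  # app.run(port=5555) would block here, serving the REST API
else:
    worker.join()  # wait for the test run to finish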

View File

@@ -95,3 +95,8 @@ def args_error(parser, message):
print "\n\n%s\n\n" % message
parser.print_help()
sys.exit()
def construct_enum(**enums):
""" Create your own pseudo-enums """
return type('Enum', (), enums)
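
construct_enum() builds a throwaway class whose attributes are the keyword arguments; singletest.py uses it for its RestRequest pseudo-enum. A usage sketch, reusing the RestRequest members from the diff and assuming construct_enum() as defined above:

RestRequest = construct_enum(REST_MUTS='muts',
                             REST_TEST_SPEC='test_spec',
                             REST_TEST_RESULTS='test_results')
print RestRequest.REST_MUTS  # -> muts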