mirror of https://github.com/ARMmbed/mbed-os.git
2285 lines
98 KiB
Python
2285 lines
98 KiB
Python
"""
|
|
mbed SDK
|
|
Copyright (c) 2011-2014 ARM Limited
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
|
|
Author: Przemyslaw Wirkus <Przemyslaw.wirkus@arm.com>
|
|
"""
|
|
from __future__ import print_function
|
|
import six
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import json
|
|
import uuid
|
|
import pprint
|
|
import random
|
|
import argparse
|
|
import datetime
|
|
import threading
|
|
import ctypes
|
|
import functools
|
|
from colorama import Fore, Back, Style
|
|
from prettytable import PrettyTable, HEADER
|
|
from copy import copy, deepcopy
|
|
|
|
from time import sleep, time
|
|
try:
|
|
from Queue import Queue, Empty
|
|
except ImportError:
|
|
from queue import Queue, Empty
|
|
from os.path import join, exists, basename, relpath, isdir, isfile
|
|
from threading import Thread, Lock
|
|
from multiprocessing import Pool, cpu_count
|
|
from subprocess import Popen, PIPE
|
|
|
|
# Imports related to mbed build api
|
|
from tools.tests import TESTS
|
|
from tools.tests import TEST_MAP
|
|
from tools.paths import BUILD_DIR
|
|
from tools.paths import HOST_TESTS
|
|
from tools.utils import ToolException
|
|
from tools.utils import NotSupportedException
|
|
from tools.utils import construct_enum
|
|
from tools.targets import TARGET_MAP, Target
|
|
from tools.config import Config
|
|
import tools.test_configs as TestConfig
|
|
from tools.build_api import build_project, build_mbed_libs, build_lib
|
|
from tools.build_api import get_target_supported_toolchains
|
|
from tools.build_api import get_config
|
|
from tools.resources import Resources, MbedIgnoreSet, IGNORE_FILENAME
|
|
from tools.libraries import LIBRARIES, LIBRARY_MAP
|
|
from tools.options import extract_profile
|
|
from tools.toolchains import TOOLCHAIN_PATHS
|
|
from tools.toolchains import TOOLCHAINS
|
|
from tools.test_exporters import ReportExporter, ResultExporterType
|
|
from tools.utils import argparse_filestring_type
|
|
from tools.utils import argparse_uppercase_type
|
|
from tools.utils import argparse_lowercase_type
|
|
from tools.utils import argparse_many
|
|
from tools.notifier.mock import MockNotifier
|
|
from tools.notifier.term import TerminalNotifier
|
|
|
|
import tools.host_tests.host_tests_plugins as host_tests_plugins
|
|
|
|
try:
|
|
import mbed_lstools
|
|
from tools.compliance.ioper_runner import get_available_oper_test_scopes
|
|
except:
|
|
pass
|
|
|
|
|
|
class ProcessObserver(Thread):
    """ Background reader that pumps a process' stdout into a queue.

    The thread marks itself as a daemon and starts itself on construction.
    Every read from ``proc.stdout`` is published on ``self.queue`` so a
    consumer can poll the output instead of blocking on the pipe.
    """
    def __init__(self, proc):
        """ Wrap *proc* and start reading immediately.

        *proc* must expose ``stdout.read(n)`` and ``terminate()``.
        """
        Thread.__init__(self)
        self.proc = proc
        self.queue = Queue()
        self.daemon = True
        self.active = True
        self.start()

    def run(self):
        """ Forward stdout to the queue until stopped or the stream ends.
        """
        while self.active:
            c = self.proc.stdout.read(1)
            self.queue.put(c)
            if not c:
                # An empty read signals end-of-stream. It is published once
                # (as before) and then the loop ends; otherwise the thread
                # would spin, flooding the queue with empty reads.
                break

    def stop(self):
        """ Ask the reader loop to finish and terminate the observed process.

        Termination is best effort: any error raised by ``terminate()`` is
        deliberately ignored.
        """
        self.active = False
        try:
            self.proc.terminate()
        except Exception:
            pass
|
|
|
|
|
|
class SingleTestExecutor(threading.Thread):
    """ Example: Single test class in separate thread usage
    """
    def __init__(self, single_test):
        """ Remember the runner object whose execute() will be driven by run().
        """
        self.single_test = single_test
        threading.Thread.__init__(self)

    def run(self):
        """ Execute the tests, then print the requested summaries and timing.
        """
        start = time()
        # Execute tests depending on options and filter applied.
        # execute() returns more values than are needed here (the result
        # tuple also carries extended summaries and build data), so only the
        # first two are picked by position instead of unpacking a fixed count.
        execution_results = self.single_test.execute()
        test_summary = execution_results[0]
        shuffle_seed = execution_results[1]
        elapsed_time = time() - start

        # Human readable summary
        if not self.single_test.opts_suppress_summary:
            # prints well-formed summary with results (SQL table like)
            print(self.single_test.generate_test_summary(test_summary,
                                                         shuffle_seed))
        if self.single_test.opts_test_x_toolchain_summary:
            # prints well-formed summary with results (SQL table like)
            # table shows text x toolchain test result matrix
            print(self.single_test.generate_test_summary_by_target(
                test_summary, shuffle_seed))
        print("Completed in %.2f sec"% (elapsed_time))
|
|
|
|
|
|
class SingleTestRunner(object):
|
|
""" Object wrapper for single test run which may involve multiple MUTs
|
|
"""
|
|
# Compiled in __init__ from the keys of TEST_RESULT_MAPPING
RE_DETECT_TESTCASE_RESULT = None

# Return codes for test script
TEST_RESULT_OK = "OK"
TEST_RESULT_FAIL = "FAIL"
TEST_RESULT_ERROR = "ERROR"
TEST_RESULT_UNDEF = "UNDEF"
TEST_RESULT_IOERR_COPY = "IOERR_COPY"
TEST_RESULT_IOERR_DISK = "IOERR_DISK"
TEST_RESULT_IOERR_SERIAL = "IOERR_SERIAL"
TEST_RESULT_TIMEOUT = "TIMEOUT"
TEST_RESULT_NO_IMAGE = "NO_IMAGE"
TEST_RESULT_MBED_ASSERT = "MBED_ASSERT"
TEST_RESULT_BUILD_FAILED = "BUILD_FAILED"
TEST_RESULT_NOT_SUPPORTED = "NOT_SUPPORTED"

GLOBAL_LOOPS_COUNT = 1  # How many times each test should be repeated
TEST_LOOPS_LIST = []    # We redefine no.of loops per test_id
TEST_LOOPS_DICT = {}    # TEST_LOOPS_LIST in dict format: { test_id : test_loop_count}

muts = {}       # MUTs descriptor (from external file)
test_spec = {}  # Test specification (from external file)

# mbed test suite -> SingleTestRunner
TEST_RESULT_MAPPING = {"success" : TEST_RESULT_OK,
                       "failure" : TEST_RESULT_FAIL,
                       "error" : TEST_RESULT_ERROR,
                       "ioerr_copy" : TEST_RESULT_IOERR_COPY,
                       "ioerr_disk" : TEST_RESULT_IOERR_DISK,
                       "ioerr_serial" : TEST_RESULT_IOERR_SERIAL,
                       "timeout" : TEST_RESULT_TIMEOUT,
                       "no_image" : TEST_RESULT_NO_IMAGE,
                       "end" : TEST_RESULT_UNDEF,
                       "mbed_assert" : TEST_RESULT_MBED_ASSERT,
                       "build_failed" : TEST_RESULT_BUILD_FAILED,
                       # Misspelled key kept so anything already emitting it
                       # is still recognised.
                       "not_supproted" : TEST_RESULT_NOT_SUPPORTED,
                       # Correct spelling; without it a "not_supported" token
                       # would never map to NOT_SUPPORTED.
                       "not_supported" : TEST_RESULT_NOT_SUPPORTED
                       }
|
|
|
|
def __init__(self,
             _global_loops_count=1,
             _test_loops_list=None,
             _muts=None,
             _clean=False,
             _parser=None,
             _opts=None,
             _opts_log_file_name=None,
             _opts_report_html_file_name=None,
             _opts_report_junit_file_name=None,
             _opts_report_build_file_name=None,
             _opts_report_text_file_name=None,
             _opts_build_report=None,
             _opts_build_properties=None,
             _test_spec=None,
             _opts_goanna_for_mbed_sdk=None,
             _opts_goanna_for_tests=None,
             _opts_shuffle_test_order=False,
             _opts_shuffle_test_seed=None,
             _opts_test_by_names=None,
             _opts_peripheral_by_names=None,
             _opts_test_only_peripheral=False,
             _opts_test_only_common=False,
             _opts_verbose_skipped_tests=False,
             _opts_verbose_test_result_only=False,
             _opts_verbose=False,
             _opts_firmware_global_name=None,
             _opts_only_build_tests=False,
             _opts_parallel_test_exec=False,
             _opts_suppress_summary=False,
             _opts_test_x_toolchain_summary=False,
             _opts_copy_method=None,
             _opts_mut_reset_type=None,
             _opts_jobs=None,
             _opts_waterfall_test=None,
             _opts_consolidate_waterfall_test=None,
             _opts_extend_test_timeout=None,
             _opts_auto_detect=None,
             _opts_include_non_automated=False):
    """ Let's try hard to init this object

    Stores every ``_opts_*`` argument on the instance as ``self.opts_*``,
    normalises the loop counters, compiles the result-detection regex and
    creates the default logger.

    ``_muts``, ``_test_spec``, ``_opts_build_report`` and
    ``_opts_build_properties`` default to None, meaning "a fresh empty
    dict". A dict passed explicitly (even an empty one) is stored as is,
    so callers that rely on it being filled in place keep working.
    """
    from colorama import init
    init()

    PATTERN = "\\{(" + "|".join(self.TEST_RESULT_MAPPING.keys()) + ")\\}"
    self.RE_DETECT_TESTCASE_RESULT = re.compile(PATTERN)

    # Settings related to test loops counters.
    # Anything that cannot be read as an integer >= 1 falls back to 1.
    try:
        _global_loops_count = int(_global_loops_count)
    except (TypeError, ValueError, OverflowError):
        _global_loops_count = 1
    if _global_loops_count < 1:
        _global_loops_count = 1
    self.GLOBAL_LOOPS_COUNT = _global_loops_count
    self.TEST_LOOPS_LIST = _test_loops_list if _test_loops_list else []
    self.TEST_LOOPS_DICT = self.test_loop_list_to_dict(_test_loops_list)

    self.shuffle_random_seed = 0.0
    self.SHUFFLE_SEED_ROUND = 10

    # MUT list and test specification storage.
    # Fresh dicts are created per instance; a shared default dict would
    # leak state from one runner into the next.
    self.muts = _muts if _muts is not None else {}
    self.test_spec = _test_spec if _test_spec is not None else {}

    # Settings passed e.g. from command line
    self.opts_log_file_name = _opts_log_file_name
    self.opts_report_html_file_name = _opts_report_html_file_name
    self.opts_report_junit_file_name = _opts_report_junit_file_name
    self.opts_report_build_file_name = _opts_report_build_file_name
    self.opts_report_text_file_name = _opts_report_text_file_name
    self.opts_goanna_for_mbed_sdk = _opts_goanna_for_mbed_sdk
    self.opts_goanna_for_tests = _opts_goanna_for_tests
    self.opts_shuffle_test_order = _opts_shuffle_test_order
    self.opts_shuffle_test_seed = _opts_shuffle_test_seed
    self.opts_test_by_names = _opts_test_by_names
    self.opts_peripheral_by_names = _opts_peripheral_by_names
    self.opts_test_only_peripheral = _opts_test_only_peripheral
    self.opts_test_only_common = _opts_test_only_common
    self.opts_verbose_skipped_tests = _opts_verbose_skipped_tests
    self.opts_verbose_test_result_only = _opts_verbose_test_result_only
    self.opts_verbose = _opts_verbose
    self.opts_firmware_global_name = _opts_firmware_global_name
    self.opts_only_build_tests = _opts_only_build_tests
    self.opts_parallel_test_exec = _opts_parallel_test_exec
    self.opts_suppress_summary = _opts_suppress_summary
    self.opts_test_x_toolchain_summary = _opts_test_x_toolchain_summary
    self.opts_copy_method = _opts_copy_method
    self.opts_mut_reset_type = _opts_mut_reset_type
    self.opts_jobs = _opts_jobs if _opts_jobs is not None else 1
    self.opts_waterfall_test = _opts_waterfall_test
    self.opts_consolidate_waterfall_test = _opts_consolidate_waterfall_test
    self.opts_extend_test_timeout = _opts_extend_test_timeout
    self.opts_clean = _clean
    self.opts_parser = _parser
    self.opts = _opts
    self.opts_auto_detect = _opts_auto_detect
    self.opts_include_non_automated = _opts_include_non_automated

    # These two are handed to the build functions as report/properties
    # sinks, so each instance needs its own unless the caller supplies one.
    self.build_report = _opts_build_report if _opts_build_report is not None else {}
    self.build_properties = _opts_build_properties if _opts_build_properties is not None else {}

    # File / screen logger initialization
    self.logger = CLITestLogger(file_name=self.opts_log_file_name)  # Default test logger
|
|
|
|
def dump_options(self):
    """ Return a dict of the common runner settings, each rendered with str().

    Every key maps to the string form of the matching ``self.opts_<key>``
    attribute; an extra ``"_dummy"`` key always holds an empty string.

    Example:
    data = self.dump_options()
    or
    data_str = json.dumps(self.dump_options())
    """
    option_names = (
        "log_file_name",
        "shuffle_test_order",
        "shuffle_test_seed",
        "test_by_names",
        "peripheral_by_names",
        "test_only_peripheral",
        "test_only_common",
        "verbose",
        "firmware_global_name",
        "only_build_tests",
        "copy_method",
        "mut_reset_type",
        "jobs",
        "extend_test_timeout",
    )
    result = {}
    for option_name in option_names:
        result[option_name] = str(getattr(self, "opts_" + option_name))
    result["_dummy"] = ''
    return result
|
|
|
|
def shuffle_random_func(self):
    """ Return the shuffle seed currently stored on the runner.

    Serves as the "random" source when the test order is shuffled, so the
    resulting order depends only on the seed.
    """
    seed = self.shuffle_random_seed
    return seed
|
|
|
|
def is_shuffle_seed_float(self):
    """ Return True if self.shuffle_random_seed can be converted to float

    Returns False for values float() rejects, including None and other
    non-numeric objects, which raise TypeError rather than ValueError.
    """
    try:
        float(self.shuffle_random_seed)
    except (TypeError, ValueError):
        return False
    return True
|
|
|
|
# NOTE(review): these look like class-level containers; __init__ does not
# rebind them, so they would be shared between runner instances - confirm.

# Target / toolchain specific properties: { target : { toolchain : properties } }
test_suite_properties_ext = dict()
# Test results, one entry per executed (or failed-to-build) test
test_summary = list()
# Test results in the extended data structure
test_summary_ext = dict()
execute_thread_slice_lock = Lock()
|
|
|
|
def execute_thread_slice(self, q, target, toolchains, clean, test_ids, build_report, build_properties):
    """ Build and run the selected tests for one target on each toolchain.

    For every toolchain this builds the mbed libs, the libraries the valid
    tests depend on and each test project, then (unless only building)
    runs the test through self.handle(). Results are collected in
    self.test_summary, self.test_summary_ext and
    self.test_suite_properties_ext.

    q                -- queue that receives one completion token
                        (target + '_'.join(toolchains)) when this call ends
    target           -- target name, a key of TARGET_MAP
    toolchains       -- iterable of toolchain names to build with
    clean            -- request a clean build
    test_ids         -- test ids used to filter the tests (see get_valid_tests)
    build_report     -- passed to the build functions as ``report``
    build_properties -- passed to the build functions as ``properties``

    The token is put on q even when an exception escapes, so a caller
    waiting on q.get() is not blocked forever; the exception still
    propagates.
    """
    try:
        for toolchain in toolchains:
            # Check the target before looking it up, so an unknown platform
            # is reported and skipped instead of failing on the lookup.
            if target not in TARGET_MAP:
                print(self.logger.log_line(
                    self.logger.LogType.NOTIF,
                    'Skipped tests for %s target. Target platform not found' %
                    (target)))
                continue

            T = TARGET_MAP[target]

            # Test suite properties returned to external tools like CI
            test_suite_properties = {
                'jobs': self.opts_jobs,
                'clean': clean,
                'target': target,
                'vendor': T.extra_labels[0],
                'test_ids': ', '.join(test_ids),
                'toolchain': toolchain,
                'shuffle_random_seed': self.shuffle_random_seed
            }

            # Let's build our test
            clean_mbed_libs_options = (self.opts_goanna_for_mbed_sdk or
                                       self.opts_clean or clean)

            profile = extract_profile(self.opts_parser, self.opts, toolchain)
            stats_depth = self.opts.stats_depth or 2

            try:
                build_mbed_libs_result = build_mbed_libs(
                    T, toolchain,
                    clean=clean_mbed_libs_options,
                    jobs=self.opts_jobs,
                    report=build_report,
                    properties=build_properties,
                    build_profile=profile,
                    notify=TerminalNotifier())

                if not build_mbed_libs_result:
                    print(self.logger.log_line(
                        self.logger.LogType.NOTIF,
                        'Skipped tests for %s target. Toolchain %s is not '
                        'supported for this target'% (T.name, toolchain)))
                    continue

            except ToolException:
                print(self.logger.log_line(
                    self.logger.LogType.ERROR,
                    'There were errors while building MBED libs for %s using %s'
                    % (target, toolchain)))
                continue

            build_dir = join(BUILD_DIR, "test", target, toolchain)

            test_suite_properties['build_mbed_libs_result'] = build_mbed_libs_result
            test_suite_properties['build_dir'] = build_dir
            test_suite_properties['skipped'] = []

            # Enumerate through all tests and shuffle test order if requested
            test_map_keys = sorted(TEST_MAP.keys())

            if self.opts_shuffle_test_order:
                # random.shuffle() no longer accepts a custom random
                # function (removed in Python 3.11), so the same
                # Fisher-Yates pass is done by hand with the seed source.
                # The modulo keeps the index in range for seeds outside
                # [0, 1) and changes nothing for seeds inside it.
                for i in reversed(range(1, len(test_map_keys))):
                    j = int(self.shuffle_random_func() * (i + 1)) % (i + 1)
                    test_map_keys[i], test_map_keys[j] = test_map_keys[j], test_map_keys[i]

            valid_test_map_keys = self.get_valid_tests(test_map_keys, target, toolchain, test_ids, self.opts_include_non_automated)
            skipped_test_map_keys = self.get_skipped_tests(test_map_keys, valid_test_map_keys)

            for skipped_test_id in skipped_test_map_keys:
                test_suite_properties['skipped'].append(skipped_test_id)

            # First pass through all tests and determine which libraries need to be built
            libraries = []
            for test_id in valid_test_map_keys:
                test = TEST_MAP[test_id]

                # Detect which lib should be added to test
                # Some libs have to compiled like RTOS or ETH
                for lib in LIBRARIES:
                    if lib['build_dir'] in test.dependencies and lib['id'] not in libraries:
                        libraries.append(lib['id'])

            clean_project_options = True if self.opts_goanna_for_tests or clean or self.opts_clean else None

            # Build all required libraries
            for lib_id in libraries:
                try:
                    build_lib(lib_id,
                              T,
                              toolchain,
                              clean=clean_mbed_libs_options,
                              jobs=self.opts_jobs,
                              report=build_report,
                              properties=build_properties,
                              build_profile=profile,
                              notify=TerminalNotifier())

                except ToolException:
                    print(self.logger.log_line(
                        self.logger.LogType.ERROR,
                        'There were errors while building library %s' % lib_id))
                    continue

            for test_id in valid_test_map_keys:
                test = TEST_MAP[test_id]

                test_suite_properties['test.libs.%s.%s.%s'% (target, toolchain, test_id)] = ', '.join(libraries)

                # TODO: move this 2 below loops to separate function
                INC_DIRS = []
                for lib_id in libraries:
                    if 'inc_dirs_ext' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['inc_dirs_ext']:
                        INC_DIRS.extend(LIBRARY_MAP[lib_id]['inc_dirs_ext'])

                MACROS = []
                for lib_id in libraries:
                    if 'macros' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['macros']:
                        MACROS.extend(LIBRARY_MAP[lib_id]['macros'])
                MACROS.append('TEST_SUITE_TARGET_NAME="%s"'% target)
                MACROS.append('TEST_SUITE_TEST_ID="%s"'% test_id)
                test_uuid = uuid.uuid4()
                MACROS.append('TEST_SUITE_UUID="%s"'% str(test_uuid))

                # Prepare extended test results data structure (it can be used to generate detailed test report)
                if target not in self.test_summary_ext:
                    self.test_summary_ext[target] = {}  # test_summary_ext : toolchain
                if toolchain not in self.test_summary_ext[target]:
                    self.test_summary_ext[target][toolchain] = {}    # test_summary_ext : toolchain : target

                project_name = self.opts_firmware_global_name if self.opts_firmware_global_name else None
                try:
                    path = build_project(
                        test.source_dir, join(build_dir, test_id), T,
                        toolchain, test.dependencies, clean=clean_project_options,
                        name=project_name, macros=MACROS,
                        inc_dirs=INC_DIRS, jobs=self.opts_jobs, report=build_report,
                        properties=build_properties, project_id=test_id,
                        project_description=test.get_description(),
                        build_profile=profile, stats_depth=stats_depth,
                        notify=TerminalNotifier(),
                    )

                except Exception as e:
                    project_name_str = project_name if project_name is not None else test_id

                    # Any build exception is recorded as a result rather
                    # than aborting the remaining tests.
                    test_result = self.TEST_RESULT_FAIL

                    if isinstance(e, ToolException):
                        print(self.logger.log_line(
                            self.logger.LogType.ERROR,
                            'There were errors while building project %s' %
                            project_name_str))
                        test_result = self.TEST_RESULT_BUILD_FAILED
                    elif isinstance(e, NotSupportedException):
                        print(self.logger.log_line(
                            self.logger.LogType.INFO,
                            'Project %s is not supported' % project_name_str))
                        test_result = self.TEST_RESULT_NOT_SUPPORTED

                    # Append test results to global test summary
                    self.test_summary.append(
                        (test_result, target, toolchain, test_id,
                         test.get_description(), 0, 0, '-')
                    )

                    # Add detailed test result to test summary structure
                    if test_id not in self.test_summary_ext[target][toolchain]:
                        self.test_summary_ext[target][toolchain][test_id] = []

                    self.test_summary_ext[target][toolchain][test_id].append({ 0: {
                        'result' : test_result,
                        'output' : '',
                        'target_name' : target,
                        'target_name_unique': target,
                        'toolchain_name' : toolchain,
                        'id' : test_id,
                        'description' : test.get_description(),
                        'elapsed_time' : 0,
                        'duration' : 0,
                        'copy_method' : None
                    }})
                    continue

                if self.opts_only_build_tests:
                    # With this option we are skipping testing phase
                    continue

                # Test duration can be increased by global value
                test_duration = test.duration
                if self.opts_extend_test_timeout is not None:
                    test_duration += self.opts_extend_test_timeout

                # For an automated test the duration act as a timeout after
                # which the test gets interrupted
                test_spec = self.shape_test_request(target, path, test_id, test_duration)
                test_loops = self.get_test_loop_count(test_id)

                test_suite_properties['test.duration.%s.%s.%s'% (target, toolchain, test_id)] = test_duration
                test_suite_properties['test.loops.%s.%s.%s'% (target, toolchain, test_id)] = test_loops
                test_suite_properties['test.path.%s.%s.%s'% (target, toolchain, test_id)] = path

                # read MUTs, test specification and perform tests
                handle_results = self.handle(test_spec, target, toolchain, test_loops=test_loops)

                if handle_results is None:
                    continue

                for handle_result in handle_results:
                    if not handle_result:
                        continue
                    single_test_result, detailed_test_results = handle_result

                    # Append test results to global test summary
                    if single_test_result is not None:
                        self.test_summary.append(single_test_result)

                    # Add detailed test result to test summary structure.
                    # This dict is keyed by test id, so only the test id is
                    # checked before creating the list.
                    if test_id not in self.test_summary_ext[target][toolchain]:
                        self.test_summary_ext[target][toolchain][test_id] = []

                    append_test_result = detailed_test_results

                    # If waterfall and consolidate-waterfall options are enabled,
                    # only include the last test result in the report.
                    if self.opts_waterfall_test and self.opts_consolidate_waterfall_test:
                        # Looked up by key len - 1, exactly as before.
                        # NOTE(review): assumes the results are keyed by
                        # consecutive loop numbers from 0 - confirm.
                        append_test_result = {0: detailed_test_results[len(detailed_test_results) - 1]}

                    self.test_summary_ext[target][toolchain][test_id].append(append_test_result)

            test_suite_properties['skipped'] = ', '.join(test_suite_properties['skipped'])
            self.test_suite_properties_ext[target][toolchain] = test_suite_properties
    finally:
        # Always signal completion, even on failure, so a consumer blocked
        # on q.get() is released.
        q.put(target + '_'.join(toolchains))
    return
|
|
|
|
def execute(self):
    """ Run the test specification for every target and collect the results.

    Picks the shuffle seed, then calls execute_thread_slice() once per
    target of self.test_spec['targets'], either in one thread per target
    (parallel option) or one after another.

    Returns a 6-tuple: (test_summary, shuffle_random_seed,
    test_summary_ext, test_suite_properties_ext, build_report,
    build_properties).
    """
    clean = self.test_spec.get('clean', False)
    test_ids = self.test_spec.get('test_ids', [])
    q = Queue()

    # Start from a random seed; a user supplied seed replaces it only when
    # it really is a number. The supplied value itself is converted, so a
    # malformed seed falls back to the random one instead of raising.
    self.shuffle_random_seed = round(random.random(), self.SHUFFLE_SEED_ROUND)
    if self.opts_shuffle_test_seed is not None:
        try:
            self.shuffle_random_seed = round(float(self.opts_shuffle_test_seed), self.SHUFFLE_SEED_ROUND)
        except (TypeError, ValueError):
            pass

    if self.opts_parallel_test_exec:
        ###################################################################
        # Experimental, parallel test execution per singletest instance.
        ###################################################################
        execute_threads = []  # Threads used to build mbed SDL, libs, test cases and execute tests
        # Note: We are building here in parallel for each target separately!
        # So we are not building the same thing multiple times and compilers
        # in separate threads do not collide.
        # Inside execute_thread_slice() function function handle() will be called to
        # get information about available MUTs (per target).
        for target, toolchains in self.test_spec['targets'].items():
            self.test_suite_properties_ext[target] = {}
            t = threading.Thread(target=self.execute_thread_slice, args = (q, target, toolchains, clean, test_ids, self.build_report, self.build_properties))
            t.daemon = True
            t.start()
            execute_threads.append(t)

        for t in execute_threads:
            q.get()  # t.join() would block some threads because we should not wait in any order for thread end
    else:
        # Serialized (not parallel) test execution
        for target, toolchains in self.test_spec['targets'].items():
            if target not in self.test_suite_properties_ext:
                self.test_suite_properties_ext[target] = {}

            self.execute_thread_slice(q, target, toolchains, clean, test_ids, self.build_report, self.build_properties)
            q.get()

    return self.test_summary, self.shuffle_random_seed, self.test_summary_ext, self.test_suite_properties_ext, self.build_report, self.build_properties
|
|
|
|
def get_valid_tests(self, test_map_keys, target, toolchain, test_ids, include_non_automated):
    """ Filter test_map_keys down to the tests that should run.

    A test id is kept, in input order, only if it passes the name filters,
    the peripheral/common/automation options and is supported for the
    given target and toolchain. Skip notices are printed only when
    self.opts_verbose_skipped_tests is set.
    """
    def announce(message):
        # Single place that writes a skip notice through the logger
        print(self.logger.log_line(self.logger.LogType.INFO, message))

    accepted_ids = []

    for test_id in test_map_keys:
        test = TEST_MAP[test_id]

        # Explicit name filters (option and test specification)
        if self.opts_test_by_names and test_id not in self.opts_test_by_names:
            continue
        if test_ids and test_id not in test_ids:
            continue

        # Peripheral-only run: drop tests without peripherals
        if self.opts_test_only_peripheral and not test.peripherals:
            if self.opts_verbose_skipped_tests:
                announce('Common test skipped for target %s' % target)
            continue

        # We will skip tests not forced with -p option
        if (self.opts_peripheral_by_names and test.peripherals and
                all((name not in self.opts_peripheral_by_names)
                    for name in test.peripherals)):
            if self.opts_verbose_skipped_tests:
                announce('Common test skipped for target %s' % target)
            continue

        # Common-only run: drop tests that need peripherals
        if self.opts_test_only_common and test.peripherals:
            if self.opts_verbose_skipped_tests:
                announce('Peripheral test skipped for target %s' % target)
            continue

        if not include_non_automated and not test.automated:
            if self.opts_verbose_skipped_tests:
                announce('Non automated test skipped for target %s' % target)
            continue

        if not test.is_supported(target, toolchain):
            continue

        # Two cases bypass the peripheral availability check:
        #  - build-only runs of tests with no peripherals specified
        #  - a -p peripheral option is given and this test id is not
        #    listed in it
        bypass_peripheral_check = (
            (test.peripherals is None and self.opts_only_build_tests) or
            (self.opts_peripheral_by_names and
             test_id not in self.opts_peripheral_by_names))

        if not bypass_peripheral_check and not self.is_peripherals_available(target, test.peripherals):
            if self.opts_verbose_skipped_tests:
                if test.peripherals:
                    announce('Peripheral %s test skipped for target %s' %
                             (",".join(test.peripherals), target))
                else:
                    announce('Test %s skipped for target %s' %
                             (test_id, target))
            continue

        # The test has made it through all the filters, so add it to the valid tests list
        accepted_ids.append(test_id)

    return accepted_ids
|
|
|
|
def get_skipped_tests(self, all_test_map_keys, valid_test_map_keys):
    """ Return the test ids present in all_test_map_keys but not in
    valid_test_map_keys, as a list.

    NOTE: the result goes through sets, so input order is not preserved
    and duplicates collapse.
    """
    every_id = set(all_test_map_keys)
    kept_ids = set(valid_test_map_keys)
    return list(every_id - kept_ids)
|
|
|
|
def generate_test_summary_by_target(self, test_summary, shuffle_seed=None):
    """ Prints well-formed summary with results (SQL table like)
        table shows text x toolchain test result matrix

    One table is produced per target found in test_summary. Its columns
    are "Target", "Test ID", "Test Description" followed by one column
    per toolchain that has results for that target. Each cell holds the
    result of that test on that toolchain, or an empty string when there
    is none.

    test_summary -- sequence of result tuples indexed as
                    (result, target, toolchain, test id, description, ...)
    shuffle_seed -- seed to report; when falsy self.shuffle_random_seed
                    is reported instead

    Returns the summary as a string.
    """
    RESULT_INDEX = 0
    TARGET_INDEX = 1
    TOOLCHAIN_INDEX = 2
    TEST_INDEX = 3
    DESC_INDEX = 4

    unique_targets = get_unique_value_from_summary(test_summary, TARGET_INDEX)
    unique_tests = get_unique_value_from_summary(test_summary, TEST_INDEX)
    unique_test_desc = get_unique_value_from_summary_ext(test_summary, TEST_INDEX, DESC_INDEX)

    result = "Test summary:\n"
    for target in unique_targets:
        result_dict = {}  # test : { toolchain : result }
        unique_target_toolchains = []
        for test in test_summary:
            if test[TARGET_INDEX] != target:
                continue
            if test[TOOLCHAIN_INDEX] not in unique_target_toolchains:
                unique_target_toolchains.append(test[TOOLCHAIN_INDEX])
            if test[TEST_INDEX] not in result_dict:
                result_dict[test[TEST_INDEX]] = {}
            result_dict[test[TEST_INDEX]][test[TOOLCHAIN_INDEX]] = test[RESULT_INDEX]

        pt_cols = ["Target", "Test ID", "Test Description"] + unique_target_toolchains
        pt = PrettyTable(pt_cols, junction_char="|", hrules=HEADER)
        for col in pt_cols:
            pt.align[col] = "l"
        pt.padding_width = 1  # One space between column edges and contents (default)

        for test in unique_tests:
            if test in result_dict and test in unique_test_desc:
                test_results = result_dict[test]
                row = [target, test, unique_test_desc[test]]
                # Cells follow the same toolchain list and order as the
                # header, with a placeholder for a missing result, so the
                # row always lines up with the columns.
                for toolchain in unique_target_toolchains:
                    row.append(test_results.get(toolchain, ''))
                pt.add_row(row)
        result += pt.get_string()
    shuffle_seed_text = "Shuffle Seed: %.*f"% (self.SHUFFLE_SEED_ROUND,
                                               shuffle_seed if shuffle_seed else self.shuffle_random_seed)
    result += "\n%s"% (shuffle_seed_text if self.opts_shuffle_test_order else '')
    return result
|
|
|
|
def generate_test_summary(self, test_summary, shuffle_seed=None):
    """ Build the flat result table plus a per-result count line.

    Every entry of test_summary becomes one table row. Results whose
    first field is one of the known TEST_RESULT_* codes are counted, and
    the non-zero counts are listed after the table. The shuffle seed line
    is appended only when shuffling is enabled.

    Returns the summary as a string.
    """
    # Pretty table package is used to print results
    column_names = ["Result", "Target", "Toolchain", "Test ID", "Test Description",
                    "Elapsed Time (sec)", "Timeout (sec)", "Loops"]
    pt = PrettyTable(column_names, junction_char="|", hrules=HEADER)
    # The first five columns are left aligned
    for left_aligned in column_names[:5]:
        pt.align[left_aligned] = "l"
    pt.padding_width = 1  # One space between column edges and contents (default)

    counted_codes = (
        self.TEST_RESULT_OK,
        self.TEST_RESULT_FAIL,
        self.TEST_RESULT_ERROR,
        self.TEST_RESULT_UNDEF,
        self.TEST_RESULT_IOERR_COPY,
        self.TEST_RESULT_IOERR_DISK,
        self.TEST_RESULT_IOERR_SERIAL,
        self.TEST_RESULT_NO_IMAGE,
        self.TEST_RESULT_TIMEOUT,
        self.TEST_RESULT_MBED_ASSERT,
        self.TEST_RESULT_BUILD_FAILED,
        self.TEST_RESULT_NOT_SUPPORTED,
    )
    result_dict = {}
    for code in counted_codes:
        result_dict[code] = 0

    for test in test_summary:
        outcome = test[0]
        if outcome in result_dict:
            result_dict[outcome] += 1
        pt.add_row(test)

    # Print result count (only results that occurred at least once)
    occurred = {k: v for k, v in result_dict.items() if v != 0}
    count_line = ' / '.join(['%s %s' % (value, key) for (key, value) in occurred.items()])

    seed_to_show = shuffle_seed if shuffle_seed else self.shuffle_random_seed
    shuffle_seed_text = "Shuffle Seed: %.*f\n"% (self.SHUFFLE_SEED_ROUND, seed_to_show)
    trailer = shuffle_seed_text if self.opts_shuffle_test_order else ''

    pieces = ["Test summary:\n", pt.get_string(), "\n",
              "Result: " + count_line, "\n%s"% trailer]
    return "".join(pieces)
|
|
|
|
def test_loop_list_to_dict(self, test_loops_str):
|
|
""" Transforms test_id=X,test_id=X,test_id=X into dictionary {test_id : test_id_loops_count}
|
|
"""
|
|
result = {}
|
|
if test_loops_str:
|
|
test_loops = test_loops_str
|
|
for test_loop in test_loops:
|
|
test_loop_count = test_loop.split('=')
|
|
if len(test_loop_count) == 2:
|
|
_test_id, _test_loops = test_loop_count
|
|
try:
|
|
_test_loops = int(_test_loops)
|
|
except:
|
|
continue
|
|
result[_test_id] = _test_loops
|
|
return result
|
|
|
|
def get_test_loop_count(self, test_id):
    """ Return how many times the test with the given test_id should run.

    A per-test value from self.TEST_LOOPS_DICT takes precedence; any other
    test id gets self.GLOBAL_LOOPS_COUNT.
    """
    default_loops = self.GLOBAL_LOOPS_COUNT
    return self.TEST_LOOPS_DICT.get(test_id, default_loops)
|
|
|
|
def delete_file(self, file_path):
    """ Remove file from the system

    Returns (True, "") on success, or (False, exception) where exception
    is whatever os.remove() raised.
    """
    try:
        os.remove(file_path)
    except Exception as error:
        return False, error
    return True, ""
|
|
|
|
def handle_mut(self, mut, data, target_name, toolchain_name, test_loops=1):
    """ Test is being invoked for given MUT.

        @param mut MUT description dictionary ('mcu', 'disk', 'port', ...) or None
        @param data Test specification dictionary ('test_id', 'image', 'mcu', optional 'duration')
        @param target_name Target name passed to the host test and stored in the results
        @param toolchain_name Toolchain name stored in the results
        @param test_loops Maximum number of times the test is executed
        @return None when no MUT was given, when the MUT could not be found by
                mbed-ls (auto-detect mode) or when it has no 'disk' / 'port'.
                Otherwise tuple (summary, detailed_test_results) where summary is
                (result, target_name_unique, toolchain_name, test_id,
                test_description, elapsed_time, timeout, 'ok/all' loops string)
                and detailed_test_results is {loop_index: {result details}}
    """
    # Get test information, image and test timeout
    test_id = data['test_id']
    test = TEST_MAP[test_id]
    test_description = TEST_MAP[test_id].get_description()
    image = data["image"]
    duration = data.get("duration", 10)

    if mut is None:
        print("Error: No Mbed available: MUT[%s]" % data['mcu'])
        return None

    mcu = mut['mcu']
    copy_method = mut.get('copy_method')        # Available board configuration selection e.g. core selection etc.

    selected_copy_method = self.opts_copy_method if copy_method is None else copy_method

    # Tests can be looped so test results must be stored for the same test
    test_all_result = []
    # Test results for one test ran few times
    detailed_test_results = {}  # { Loop_number: { results ... } }

    for test_index in range(test_loops):

        # If mbedls is available and we are auto detecting MUT info,
        # update MUT info (mounting may changed)
        if get_module_avail('mbed_lstools') and self.opts_auto_detect:
            platform_name_filter = [mcu]
            muts_list = {}
            found = False

            # Poll mbed-ls up to 60 times, waiting 3 seconds between attempts
            for i in range(0, 60):
                print('Looking for %s with MBEDLS' % mcu)
                muts_list = get_autodetected_MUTS_list(platform_name_filter=platform_name_filter)

                if 1 not in muts_list:
                    sleep(3)
                else:
                    found = True
                    break

            if not found:
                print("Error: mbed not found with MBEDLS: %s" % data['mcu'])
                return None
            else:
                mut = muts_list[1]

        disk = mut.get('disk')
        port = mut.get('port')

        if disk is None or port is None:
            return None

        target_by_mcu = TARGET_MAP[mut['mcu']]
        target_name_unique = mut['mcu_unique'] if 'mcu_unique' in mut else mut['mcu']
        # Some extra stuff can be declared in MUTs structure
        reset_type = mut.get('reset_type')  # reboot.txt, reset.txt, shutdown.txt
        reset_tout = mut.get('reset_tout')  # COPY_IMAGE -> RESET_PROC -> SLEEP(RESET_TOUT)

        # When the build and test system were separate, this was relative to a
        # base network folder base path: join(NETWORK_BASE_PATH, ).
        # "image" is now a list representing a development image and an update image
        # (for device management). When testing, we only use the development image.
        image_path = image[0]

        single_test_result = self.TEST_RESULT_UNDEF # single test run result
        _copy_method = selected_copy_method

        if not exists(image_path):
            single_test_result = self.TEST_RESULT_NO_IMAGE
            # Nothing was executed: report zero test time and the requested
            # timeout so the code below never reads unassigned variables
            single_testduration = 0
            single_timeout = duration
            single_test_output = self.logger.log_line(self.logger.LogType.ERROR, 'Image file does not exist: %s'% image_path)
            print(single_test_output)
        else:
            # Host test execution
            host_test_verbose = self.opts_verbose_test_result_only or self.opts_verbose
            host_test_reset = self.opts_mut_reset_type if reset_type is None else reset_type
            host_test_result = self.run_host_test(test.host_test,
                                                  image_path, disk, port, duration,
                                                  micro=target_name,
                                                  verbose=host_test_verbose,
                                                  reset=host_test_reset,
                                                  reset_tout=reset_tout,
                                                  copy_method=selected_copy_method,
                                                  program_cycle_s=target_by_mcu.program_cycle_s)
            single_test_result, single_test_output, single_testduration, single_timeout = host_test_result

        # Store test result
        test_all_result.append(single_test_result)
        elapsed_time = single_testduration  # Time of single test case execution after reset

        detailed_test_results[test_index] = {
            'result' : single_test_result,
            'output' : single_test_output,
            'target_name' : target_name,
            'target_name_unique' : target_name_unique,
            'toolchain_name' : toolchain_name,
            'id' : test_id,
            'description' : test_description,
            'elapsed_time' : round(elapsed_time, 2),
            'duration' : single_timeout,
            'copy_method' : _copy_method,
        }

        print(self.print_test_result(
            single_test_result, target_name_unique, toolchain_name, test_id,
            test_description, elapsed_time, single_timeout))

        # If we perform waterfall test we test until we get OK and we stop testing
        if self.opts_waterfall_test and single_test_result == self.TEST_RESULT_OK:
            break

    return (self.shape_global_test_loop_result(test_all_result, self.opts_waterfall_test and self.opts_consolidate_waterfall_test),
            target_name_unique,
            toolchain_name,
            test_id,
            test_description,
            round(elapsed_time, 2),
            single_timeout,
            self.shape_test_loop_ok_result_count(test_all_result)), detailed_test_results
|
|
|
|
def handle(self, test_spec, target_name, toolchain_name, test_loops=1):
    """ Decodes JSON test specification and runs the test, using handle_mut(),
        on every MUT from self.muts whose 'mcu' equals the requested one.

        @return List with one handle_mut() result per matching MUT
                (empty list when no MUT matches)
    """
    data = json.loads(test_spec)
    return [self.handle_mut(candidate, data, target_name, toolchain_name, test_loops=test_loops)
            for candidate in self.muts.values()
            if candidate['mcu'] == data['mcu']]
|
|
|
|
def print_test_result(self, test_result, target_name, toolchain_name,
                      test_id, test_description, elapsed_time, duration):
    """ Builds one-line test result summary using convention:
        TargetTest::<target>::<toolchain>::<test id>::<description> [<result>] in <time> of <timeout> sec

        @return Summary string wrapped in magenta colour codes
    """
    time_info = " in %.2f of %d sec" % (round(elapsed_time, 2), duration)
    header = "::".join(["TargetTest",
                        target_name,
                        toolchain_name,
                        test_id,
                        test_description])
    return Fore.MAGENTA + (header + " [" + test_result +"]" + time_info) + Fore.RESET
|
|
|
|
def shape_test_loop_ok_result_count(self, test_all_result):
    """ Reformats list of results to '<number of OK results>/<number of results>' string
    """
    passed = sum(1 for res in test_all_result if res == self.TEST_RESULT_OK)
    return "%d/%d"% (passed, len(test_all_result))
|
|
|
|
def shape_global_test_loop_result(self, test_all_result, waterfall_and_consolidate):
    """ Reduces list of results from all test loops to a single result

        @param test_all_result List of results, one per executed loop
        @param waterfall_and_consolidate If truthy a single OK result is enough
               to report OK for the whole series
        @return The common result when all loops ended with the same result,
                self.TEST_RESULT_OK when consolidating and any loop passed,
                self.TEST_RESULT_FAIL otherwise (also for an empty list)
    """
    if not test_all_result:
        # No loop was executed, there is nothing we could report as passed
        return self.TEST_RESULT_FAIL

    first_result = test_all_result[0]
    if all(first_result == res for res in test_all_result):
        return first_result
    if waterfall_and_consolidate and any(res == self.TEST_RESULT_OK for res in test_all_result):
        return self.TEST_RESULT_OK
    return self.TEST_RESULT_FAIL
|
|
|
|
def run_host_test(self, name, image_path, disk, port, duration,
                  micro=None, reset=None, reset_tout=None,
                  verbose=False, copy_method=None, program_cycle_s=None):
    """ Function creates new process with host test configured with particular test case.
        Function also is polling for serial port activity from process to catch all data
        printed by test runner and host test during test execution

        @param name Host test script name (without '.py'), executed from HOST_TESTS directory
        @param duration Test timeout in seconds, output is collected for up to 2 * duration.
               Host test output may redefine it once with "HOST: Property 'timeout' = '<n>'"
        @return Tuple (result, output string, test case duration in seconds, timeout used)
    """

    def get_char_from_queue(obs):
        """ Get character from queue safe way
        """
        try:
            c = obs.queue.get(block=True, timeout=0.5)
        except Empty:
            c = None
        return c

    def filter_queue_char(c):
        """ Filters out non ASCII characters from serial port
        """
        if ord(c) not in range(128):
            c = ' '
        return c

    def get_test_result(output):
        """ Parse test 'output' data
        """
        result = self.TEST_RESULT_TIMEOUT
        for line in "".join(output).splitlines():
            search_result = self.RE_DETECT_TESTCASE_RESULT.search(line)
            if search_result and len(search_result.groups()):
                result = self.TEST_RESULT_MAPPING[search_result.groups(0)[0]]
                break
        return result

    def get_auto_property_value(property_name, line):
        """ Scans auto detection line from MUT and returns scanned parameter 'property_name'
            Returns string
        """
        result = None
        if re.search("HOST: Property '%s'"% property_name, line) is not None:
            # Raw string: '\w' and '\d' are regex classes, not string escapes
            prop_match = re.search(r"HOST: Property '%s' = '([\w\d _]+)'"% property_name, line)
            if prop_match is not None and len(prop_match.groups()) == 1:
                result = prop_match.groups()[0]
        return result

    cmd = ["python",
           '%s.py'% name,
           '-d', disk,
           '-f', '"%s"'% image_path,
           '-p', port,
           '-t', str(duration),
           '-C', str(program_cycle_s)]

    if get_module_avail('mbed_lstools') and self.opts_auto_detect:
        cmd += ['--auto']

    # Add extra parameters to host_test
    if copy_method is not None:
        cmd += ["-c", copy_method]
    if micro is not None:
        cmd += ["-m", micro]
    if reset is not None:
        cmd += ["-r", reset]
    if reset_tout is not None:
        cmd += ["-R", str(reset_tout)]

    if verbose:
        print(Fore.MAGENTA + "Executing '" + " ".join(cmd) + "'" + Fore.RESET)
        print("Test::Output::Start")

    proc = Popen(cmd, stdout=PIPE, cwd=HOST_TESTS)
    obs = ProcessObserver(proc)
    update_once_flag = {}   # Stores flags checking if some auto-parameter was already set
    line = ''
    output = []
    start_time = time()
    while (time() - start_time) < (2 * duration):
        c = get_char_from_queue(obs)
        if c:
            if verbose:
                sys.stdout.write(c)
            c = filter_queue_char(c)
            output.append(c)
            # Give the mbed under test a way to communicate the end of the test
            if c in ['\n', '\r']:

                # Checking for auto-detection information from the test about MUT reset moment
                if 'reset_target' not in update_once_flag and "HOST: Reset target..." in line:
                    # We will update this marker only once to prevent multiple time resets
                    update_once_flag['reset_target'] = True
                    start_time = time()

                # Checking for auto-detection information from the test about timeout
                auto_timeout_val = get_auto_property_value('timeout', line)
                if 'timeout' not in update_once_flag and auto_timeout_val is not None:
                    # We will update this marker only once to prevent multiple time resets
                    update_once_flag['timeout'] = True
                    duration = int(auto_timeout_val)

                # Detect mbed assert:
                if 'mbed assertation failed: ' in line:
                    output.append('{{mbed_assert}}')
                    break

                # Check for test end
                if '{end}' in line:
                    break
                line = ''
            else:
                line += c
    end_time = time()
    testcase_duration = end_time - start_time   # Test case duration from reset to {end}

    # Pick up one more character which may still be waiting in the queue
    c = get_char_from_queue(obs)

    if c:
        if verbose:
            sys.stdout.write(c)
        c = filter_queue_char(c)
        output.append(c)

    if verbose:
        print("Test::Output::Finish")
    # Stop test process
    obs.stop()

    result = get_test_result(output)
    return (result, "".join(output), testcase_duration, duration)
|
|
|
|
def is_peripherals_available(self, target_mcu_name, peripherals=None):
    """ Checks if any MUT from self.muts is of type target_mcu_name and, when
        'peripherals' is given, declares all of the requested peripherals.
        A MUT without 'peripherals' entry never satisfies a peripherals request.
    """
    required = None if peripherals is None else set(peripherals)
    for mut in self.muts.values():
        # Target MCU name check
        if mut["mcu"] != target_mcu_name:
            continue
        # Peripherals check
        if required is None:
            return True
        if 'peripherals' in mut and required.issubset(set(mut['peripherals'])):
            return True
    return False
|
|
|
|
def shape_test_request(self, mcu, image_path, test_id, duration=10):
    """ Serializes test specification (mcu, image, duration, test_id) to JSON string
    """
    return json.dumps({
        "mcu": mcu,
        "image": image_path,
        "duration": duration,
        "test_id": test_id,
    })
|
|
|
|
|
|
def get_unique_value_from_summary(test_summary, index):
    """ Gets sorted list of unique values stored at position 'index' of
        each test summary row
    """
    unique_values = []
    for row in test_summary:
        if row[index] not in unique_values:
            unique_values.append(row[index])
    unique_values.sort()
    return unique_values
|
|
|
|
|
|
def get_unique_value_from_summary_ext(test_summary, index_key, index_val):
    """ Builds dictionary {row[index_key]: row[index_val]} from test summary rows.
        For repeated keys the value from the first row with that key is kept.
    """
    unique_values = {}
    for row in test_summary:
        unique_values.setdefault(row[index_key], row[index_val])
    return unique_values
|
|
|
|
|
|
def show_json_file_format_error(json_spec_filename, line, column):
    """ Prints JSON broken content

        Prints up to five lines preceding the defect, the broken line itself
        and a '---^' marker pointing at the reported column.

        @param json_spec_filename Path to the JSON file which failed to parse
        @param line Line number (1 based) of the defect
        @param column Column number (1 based) of the defect
    """
    with open(json_spec_filename) as data_file:
        for line_no, json_line in enumerate(data_file, 1):
            if line_no + 5 >= line: # Print last few lines before error
                print('Line %d:\t'%line_no + json_line)
            if line_no == line:
                # Pad with the width of the 'Line N:' prefix so the marker
                # lines up with the file content printed above
                print('%s\t%s^' % (' ' * len('Line %d:' % line_no),
                                   '-' * (column - 1)))
                break
|
|
|
|
|
|
def json_format_error_defect_pos(json_error_msg):
    """ Gets first error line and column in JSON file format.
        Parsed from exception thrown by json.loads() string

        @return List [line, column]; a value which is not present in the
                message is reported as 0
    """
    def first_number_after(keyword):
        # First occurrence of '<keyword> <digits>' in the message
        found = re.search('%s ([0-9]+)' % keyword, json_error_msg)
        return int(found.group(1)) if found is not None else 0

    return [first_number_after('line'), first_number_after('column')]
|
|
|
|
|
|
def get_json_data_from_file(json_spec_filename, verbose=False):
    """ Loads JSON formatted file into Python data structure

        Problems are reported on screen: for malformed JSON the parser message
        and the surroundings of the defect are printed, for a file which can
        not be opened the I/O error is printed.

        @return Parsed data or None when file could not be opened or parsed
    """
    parsed = None
    try:
        with open(json_spec_filename) as data_file:
            try:
                parsed = json.load(data_file)
            except ValueError as json_error_msg:
                parsed = None
                print('JSON file %s parsing failed. Reason: %s' %
                      (json_spec_filename, json_error_msg))
                # We can print where error occurred inside JSON file if we can parse exception msg
                defect_pos = json_format_error_defect_pos(str(json_error_msg))
                if defect_pos is not None:
                    print()
                    show_json_file_format_error(json_spec_filename, defect_pos[0], defect_pos[1])

    except IOError as fileopen_error_msg:
        print('JSON file %s not opened. Reason: %s\n'%
              (json_spec_filename, fileopen_error_msg))

    if verbose and parsed:
        pprint.PrettyPrinter(indent=4).pprint(parsed)
    return parsed
|
|
|
|
|
|
def print_muts_configuration_from_json(json_data, join_delim=", ", platform_filter=None):
    """ Formats MUTs configuration as a text table: one row per MUT, one
        column per property used by any of the MUTs (plus leading 'index').

        @param json_data Dictionary {mut index: {property: value}}
        @param join_delim Delimiter used to join values which are lists
        @param platform_filter Optional regex, MUTs with 'mcu' not matching it are not listed
        @return Table as string
    """
    # We need to check all unique properties for each defined MUT
    muts_info_cols = []
    for mut_info in json_data.values():
        for mut_property in mut_info:
            if mut_property not in muts_info_cols:
                muts_info_cols.append(mut_property)

    # Prepare pretty table object to display all MUTs
    pt_cols = ["index"] + muts_info_cols
    pt = PrettyTable(pt_cols, junction_char="|", hrules=HEADER)
    for col in pt_cols:
        pt.align[col] = "l"

    # Add rows to pretty print object
    for mut_index, mut_info in json_data.items():
        if platform_filter and 'mcu' in mut_info:
            if re.search(platform_filter, mut_info['mcu']) is None:
                continue

        row = [mut_index]
        for col in muts_info_cols:
            # Properties not defined for this MUT are shown as None
            cell_val = mut_info.get(col)
            if isinstance(cell_val, list):
                cell_val = join_delim.join(cell_val)
            row.append(cell_val)
        pt.add_row(row)
    return pt.get_string()
|
|
|
|
|
|
def print_test_configuration_from_json(json_data, join_delim=", "):
    """ Formats test specification as a text table: one row per target, one
        column per toolchain used in the specification.

        Cell markers: 'Yes' toolchain requested for target, '-' not requested,
        '*' requested toolchain is not supported by the target (also appended
        to names of targets missing in TARGET_MAP), '#' path configured for
        the toolchain in TOOLCHAIN_PATHS is not a directory.

        @return Table followed, when needed, by the list of detected conflicts
    """
    # We need to check all toolchains for each device
    toolchains_info_cols = []
    for targets in json_data.values():      # key should be 'targets'
        for target in targets:
            for toolchain in targets[target]:
                if toolchain not in toolchains_info_cols:
                    toolchains_info_cols.append(toolchain)
    toolchain_cols = sorted(toolchains_info_cols)

    # Prepare pretty table object to display test specification
    pt_cols = ["mcu"] + toolchain_cols
    pt = PrettyTable(pt_cols, junction_char="|", hrules=HEADER)
    for col in pt_cols:
        pt.align[col] = "l"

    toolchain_conflicts = {}        # { target : [conflicted toolchains] }
    toolchain_path_conflicts = []
    for targets in json_data.values():      # key should be 'targets'
        for target in targets:
            supported = get_target_supported_toolchains(target) or []
            row = [target if target in TARGET_MAP else "%s*"% target]
            requested = targets[target]

            for toolchain in toolchain_cols:
                # Add marker inside table about target usage / conflict
                is_requested = toolchain in requested
                cell_val = 'Yes' if is_requested else '-'

                # Check for conflicts: target vs toolchain
                if is_requested and toolchain not in supported:
                    toolchain_conflicts.setdefault(target, []).append(toolchain)
                    cell_val += '*'

                # Check for conflicts: toolchain vs toolchain path
                if toolchain in TOOLCHAIN_PATHS and not os.path.isdir(TOOLCHAIN_PATHS[toolchain]):
                    if toolchain not in toolchain_path_conflicts:
                        toolchain_path_conflicts.append(toolchain)
                    cell_val += '#'
                row.append(cell_val)
            pt.add_row(row)

    # generate result string
    chunks = [pt.get_string()]    # Test specification table
    if toolchain_conflicts or toolchain_path_conflicts:
        chunks.append("\n")
        chunks.append("Toolchain conflicts:\n")
        for target, conflicted in toolchain_conflicts.items():
            if target not in TARGET_MAP:
                chunks.append("\t* Target %s unknown\n"% (target))
            sufix = 's' if len(conflicted) > 1 else ''
            chunks.append("\t* Target %s does not support %s toolchain%s\n"% (target, join_delim.join(conflicted), sufix))

        for toolchain in toolchain_path_conflicts:
            # Let's check toolchain configuration
            if toolchain in TOOLCHAIN_PATHS:
                toolchain_path = TOOLCHAIN_PATHS[toolchain]
                if not os.path.isdir(toolchain_path):
                    chunks.append("\t# Toolchain %s path not found: %s\n"% (toolchain, toolchain_path))
    return "".join(chunks)
|
|
|
|
|
|
def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=',',platform_filter=None):
    """ Generates table with all test cases from TEST_MAP and their properties.
        When result_summary is set and no platform_filter is given, two more
        tables follow: overall automation coverage and automation coverage
        per test id prefix (test id without its last '_' separated part).

        @param cols Test properties to show, default set is used when None
        @param join_delim Delimiter used to join properties which are lists
        @param platform_filter Optional regex, only test ids matching it are listed
        @return All tables as one string
    """
    def id_prefix(test_id):
        # 'MBED_A1' -> 'MBED', id without '_' -> ''
        return test_id.rpartition('_')[0]

    # get all unique test ID prefixes
    unique_test_id = sorted(set(id_prefix(test['id']) for test in TESTS))
    counter_dict_test_id_types = dict.fromkeys(unique_test_id, 0)
    counter_dict_test_id_types_all = dict.fromkeys(unique_test_id, 0)

    default_properties = ['id',
                          'automated',
                          'description',
                          'peripherals',
                          'host_test',
                          'duration']
    test_properties = default_properties if cols is None else cols

    # All tests status table print
    pt = PrettyTable(test_properties, junction_char="|", hrules=HEADER)
    for col in test_properties:
        pt.align[col] = "l"
    pt.align['duration'] = "r"
    pt.padding_width = 1 # One space between column edges and contents (default)

    counter_all = 0
    counter_automated = 0

    for test_id in sorted(TEST_MAP.keys()):
        # Filter out platforms using regex
        if platform_filter is not None and re.search(platform_filter, test_id) is None:
            continue
        test = TEST_MAP[test_id]
        test_id_prefix = id_prefix(test_id)

        row = []
        for col in test_properties:
            col_value = test[col]
            if isinstance(col_value, list):
                col_value = join_delim.join(col_value)
            elif col_value == None:
                col_value = "-"
            row.append(col_value)

        if test['automated'] == True:
            counter_dict_test_id_types[test_id_prefix] += 1
            counter_automated += 1
        pt.add_row(row)
        # Update counters
        counter_all += 1
        counter_dict_test_id_types_all[test_id_prefix] += 1

    chunks = [pt.get_string(), "\n\n"]

    if result_summary and not platform_filter:
        # Automation result summary
        pt = PrettyTable(['automated', 'all', 'percent [%]', 'progress'],
                         junction_char="|", hrules=HEADER)
        for col in ('automated', 'all', 'percent [%]'):
            pt.align[col] = "r"

        percent_progress = round(100.0 * counter_automated / float(counter_all), 1)
        str_progress = progress_bar(percent_progress, 75)
        pt.add_row([counter_automated, counter_all, percent_progress, str_progress])
        chunks += ["Automation coverage:\n", pt.get_string(), "\n\n"]

        # Test automation coverage table print
        pt = PrettyTable(['id', 'automated', 'all', 'percent [%]', 'progress'],
                         junction_char="|", hrules=HEADER)
        pt.align['id'] = "l"
        for col in ('automated', 'all', 'percent [%]'):
            pt.align[col] = "r"
        for unique_id in unique_test_id:
            automated_count = counter_dict_test_id_types[unique_id]
            all_count = counter_dict_test_id_types_all[unique_id]
            percent_progress = round(100.0 * automated_count / float(all_count), 1)
            str_progress = progress_bar(percent_progress, 75)
            pt.add_row([unique_id,
                        automated_count,
                        all_count,
                        percent_progress,
                        "[" + str_progress + "]"])
        chunks += ["Test automation coverage:\n", pt.get_string(), "\n\n"]
    return "".join(chunks)
|
|
|
|
|
|
def progress_bar(percent_progress, saturation=0):
    """ This function creates progress bar with optional simple saturation mark

        @param percent_progress Progress in percent (int or float), two percent
               are shown as one '#' of the 50 characters long bar
        @param saturation Position of the mark in percent; when greater than 0
               one extra character is inserted at index saturation // 2:
               '!' if position 38 of the bar is not filled yet, '|' otherwise
        @return Progress bar string
    """
    # Callers pass floats (e.g. round(x, 1)); str repetition requires an int
    step = int(percent_progress // 2)   # Scale by to (scale: 1 - 50)
    str_progress = '#' * step + '.' * int(50 - step)
    c = '!' if str_progress[38] == '.' else '|'
    if saturation > 0:
        saturation = saturation // 2
        str_progress = str_progress[:saturation] + c + str_progress[saturation:]
    return str_progress
|
|
|
|
|
|
def singletest_in_cli_mode(single_test):
    """ Runs SingleTestRunner object in CLI (Command line interface) mode

        Executes the tests, prints the summaries which were not disabled in
        options, prints build report and stores the reports requested in
        options (HTML, JUnit, text, JUnit build report) in files.

        @return returns success code (0 == success) for building and running tests
    """
    start = time()
    # Execute tests depending on options and filter applied
    (test_summary, shuffle_seed, test_summary_ext,
     test_suite_properties_ext, build_report, build_properties) = single_test.execute()
    elapsed_time = time() - start

    # Human readable summary
    if not single_test.opts_suppress_summary:
        # prints well-formed summary with results (SQL table like)
        print(single_test.generate_test_summary(test_summary, shuffle_seed))
    if single_test.opts_test_x_toolchain_summary:
        # prints well-formed summary with results (SQL table like)
        # table shows text x toolchain test result matrix
        print(single_test.generate_test_summary_by_target(test_summary,
                                                          shuffle_seed))

    print("Completed in %.2f sec" % elapsed_time)

    # Write summary of the builds
    status = ReportExporter(ResultExporterType.PRINT, package="build").report(build_report)

    # Store extra test reports in files: (output file name, report format)
    test_reports = (
        (single_test.opts_report_html_file_name, ResultExporterType.HTML),
        (single_test.opts_report_junit_file_name, ResultExporterType.JUNIT),
        (single_test.opts_report_text_file_name, ResultExporterType.TEXT),
    )
    for report_file_name, report_type in test_reports:
        if report_file_name:
            report_exporter = ReportExporter(report_type)
            report_exporter.report_to_file(test_summary_ext, report_file_name, test_suite_properties=test_suite_properties_ext)

    if single_test.opts_report_build_file_name:
        # Export build results as JUnit XML report to separate file
        report_exporter = ReportExporter(ResultExporterType.JUNIT, package="build")
        report_exporter.report_to_file(build_report, single_test.opts_report_build_file_name, test_suite_properties=build_properties)

    # Returns True if no build failures of the test projects or their dependencies
    return status
|
|
|
|
class TestLogger(object):
    """ Base class for loggers collecting events of a test suite pass
    """
    def __init__(self, store_log=True):
        """ store_log decides if log entries are kept in memory (self.log)
            or only handed back to the caller
        """
        self.log = []
        self.log_to_file = False
        self.log_file_name = None
        self.store_log = store_log

        # Types of log entries
        self.LogType = construct_enum(INFO='Info',
                                      WARN='Warning',
                                      NOTIF='Notification',
                                      ERROR='Error',
                                      EXCEPT='Exception')

        # CREATE: create or overwrite existing log file
        # APPEND: append to existing log file
        self.LogToFileAttr = construct_enum(CREATE=1, APPEND=2)

    def log_line(self, LogType, log_line, timestamp=True, line_delim='\n'):
        """ Creates log entry (dictionary) stamped with current time for one
            line of text, stores it in memory when storing is enabled and
            returns it
        """
        log_entry = {
            'log_type' : LogType,
            'log_timestamp' : time(),
            'log_line' : log_line,
            '_future' : None,
        }
        if self.store_log:
            # Store log in memory
            self.log.append(log_entry)
        return log_entry
|
|
|
|
|
|
class CLITestLogger(TestLogger):
    """ Logger used with CLI (Command line interface) test suite. Logs on screen and to file if needed
    """
    def __init__(self, store_log=True, file_name=None):
        """ @param store_log If True log entries are also kept in memory
            @param file_name Optional path of a file every logged line is appended to
        """
        # Forward store_log, otherwise the base class default silently wins
        TestLogger.__init__(self, store_log=store_log)
        self.log_file_name = file_name
        #self.TIMESTAMP_FORMAT = '%y-%m-%d %H:%M:%S' # Full date and time
        self.TIMESTAMP_FORMAT = '%H:%M:%S' # Time only

    def log_print(self, log_entry, timestamp=True):
        """ Formats log entry as '[<time>] <type>: <line>' string, the
            '[<time>] ' prefix is skipped when timestamp is False
        """
        ts = log_entry['log_timestamp']
        timestamp_str = datetime.datetime.fromtimestamp(ts).strftime("[%s] "% self.TIMESTAMP_FORMAT) if timestamp else ''
        log_line_str = "%(log_type)s: %(log_line)s"% (log_entry)
        return timestamp_str + log_line_str

    def log_line(self, LogType, log_line, timestamp=True, line_delim='\n'):
        """ Logs line, if log file output was specified log line will be appended
            at the end of log file

            @return Formatted log line (see log_print)
        """
        log_entry = TestLogger.log_line(self, LogType, log_line)
        log_line_str = self.log_print(log_entry, timestamp)
        if self.log_file_name is not None:
            try:
                with open(self.log_file_name, 'a') as f:
                    f.write(log_line_str + line_delim)
            except IOError:
                # Best effort only: problems with the log file must not break the test run
                pass
        return log_line_str
|
|
|
|
def get_module_avail(module_name):
    """ Tells if module called module_name was already imported,
        i.e. is registered in sys.modules
    """
    return module_name in sys.modules
|
|
|
|
def get_autodetected_MUTS_list(platform_name_filter=None):
    """ Lists mbed devices with mbed_lstools and converts the list to MUTs
        dictionary using get_autodetected_MUTS()

        @param platform_name_filter Optional list of platform names to keep
        @return Dictionary in MUTs file format
    """
    oldError = None
    if os.name == 'nt':
        # Disable Windows error box temporarily
        oldError = ctypes.windll.kernel32.SetErrorMode(1) #note that SEM_FAILCRITICALERRORS = 1

    try:
        mbeds = mbed_lstools.create()
        detect_muts_list = mbeds.list_mbeds()
    finally:
        # Restore previous error mode also when detection fails
        if os.name == 'nt':
            ctypes.windll.kernel32.SetErrorMode(oldError)

    return get_autodetected_MUTS(detect_muts_list, platform_name_filter=platform_name_filter)
|
|
|
|
def get_autodetected_MUTS(mbeds_list, platform_name_filter=None):
    """ Converts list of devices reported by mbed_lstools to artificial MUTS
        dictionary (muts_all.json format) indexed with consecutive numbers
        starting from 1. For an empty list an empty dictionary is returned.

        Example of getting the input list:
            if get_module_avail('mbed_lstools'):
                mbeds = mbed_lstools.create()
                mbeds_list = mbeds.list_mbeds()

        Example of one input item:
            {'platform_name': 'NUCLEO_F302R8', 'mount_point': 'E:',
             'target_id': '07050200623B61125D5EF72A', 'serial_port': u'COM34'}

        @param mbeds_list list of mbeds captured from mbed_lstools
        @param platform_name_filter Optional list of platform names, devices
               with other 'platform_name' are skipped
    """
    muts = {}
    for mbed in mbeds_list:
        platform_name = mbed['platform_name']
        # Filter the MUTS if a filter is specified
        if platform_name_filter and platform_name not in platform_name_filter:
            continue

        # For mcu_unique - we are assigning 'platform_name_unique' value from mbedls output (if its existing)
        # if not we are creating our own unique value (last few chars from platform's target_id).
        if 'platform_name_unique' in mbed:
            mcu_unique = mbed['platform_name_unique']
        else:
            mcu_unique = "%s[%s]" % (platform_name, mbed['target_id'][-4:])

        muts[len(muts) + 1] = {
            'mcu': platform_name,
            'mcu_unique' : mcu_unique,
            'port': mbed['serial_port'],
            'disk': mbed['mount_point'],
            'peripherals': [],     # No peripheral detection
        }
    return muts
|
|
|
|
|
|
def get_autodetected_TEST_SPEC(mbeds_list,
                               use_default_toolchain=True,
                               use_supported_toolchains=False,
                               toolchain_filter=None,
                               platform_name_filter=None):
    """ Generates artificial test_spec ({'targets': {mcu: [toolchains]}}) for
        detected devices. Devices unknown to TARGET_MAP or rejected by
        platform_name_filter are skipped; when nothing is left the 'targets'
        description is empty.

        mbeds_list - iterable of MUT descriptions, each with 'mcu' entry
        use_default_toolchain - if True add default toolchain to test_spec
        use_supported_toolchains - if True add all supported toolchains to test_spec
        toolchain_filter - if [...list of toolchains...] add from all toolchains only those in filter to test_spec
        platform_name_filter - if not None only listed platforms are used
    """
    targets = {}
    for mut in mbeds_list:
        mcu = mut['mcu']
        if platform_name_filter is not None and not (platform_name_filter and mcu in platform_name_filter):
            continue
        if mcu not in TARGET_MAP:
            continue

        default_toolchain = TARGET_MAP[mcu].default_toolchain
        supported_toolchains = TARGET_MAP[mcu].supported_toolchains

        # Decide which toolchains should be added to test specification toolchain pool for each target
        toolchains = []
        if use_default_toolchain:
            toolchains.append(default_toolchain)
        if use_supported_toolchains:
            toolchains.extend(supported_toolchains)
        if toolchain_filter is not None:
            all_toolchains = supported_toolchains + [default_toolchain]
            toolchains.extend(tc for tc in toolchain_filter if tc in all_toolchains)

        # Remove duplicated toolchains
        targets[mcu] = list(set(toolchains))
    return {'targets': targets}
|
|
|
|
|
|
def get_default_test_options_parser():
    """ Get common test script options used by CLI, web services etc.

    Creates a fresh argparse.ArgumentParser and registers the test runner
    options on it. Nothing is parsed here; the caller is expected to invoke
    parse_args() on the returned object.

    The '--auto', '--tc' and '--oper' options are only registered when
    get_module_avail('mbed_lstools') reports the module as available.

    Returns the configured argparse.ArgumentParser instance.
    """
    parser = argparse.ArgumentParser()

    # --- Input specification files ---
    parser.add_argument('-i', '--tests',
                        dest='test_spec_filename',
                        metavar="FILE",
                        type=argparse_filestring_type,
                        help='Points to file with test specification')

    parser.add_argument('-M', '--MUTS',
                        dest='muts_spec_filename',
                        metavar="FILE",
                        type=argparse_filestring_type,
                        help='Points to file with MUTs specification (overwrites settings.py and private_settings.py)')

    # NOTE: no default= is passed here, so the parsed value is None when the
    # option is omitted, despite what the help text says.
    parser.add_argument("-j", "--jobs",
                        dest='jobs',
                        metavar="NUMBER",
                        type=int,
                        help="Define number of compilation jobs. Default value is 1")

    if get_module_avail('mbed_lstools'):
        # Additional features available when mbed_lstools is installed on host and imported
        # mbed_lstools allow users to detect connected to host mbed-enabled devices
        parser.add_argument('--auto',
                            dest='auto_detect',
                            action="store_true",
                            help='Use mbed-ls module to detect all connected mbed devices')

        # Accepted values: every known toolchain plus the two pseudo-names
        toolchain_list = list(TOOLCHAINS) + ["DEFAULT", "ALL"]
        parser.add_argument('--tc',
                            dest='toolchains_filter',
                            type=argparse_many(argparse_uppercase_type(toolchain_list, "toolchains")),
                            help="Toolchain filter for --auto argument. Use toolchains names separated by comma, 'default' or 'all' to select toolchains")

        # Quoted, comma separated list of scopes, used only in the help text
        test_scopes = ','.join(["'%s'" % n for n in get_available_oper_test_scopes()])
        parser.add_argument('--oper',
                            dest='operability_checks',
                            type=argparse_lowercase_type(get_available_oper_test_scopes(), "scopes"),
                            help='Perform interoperability tests between host and connected mbed devices. Available test scopes are: %s' % test_scopes)

    # --- Build options and test selection ---
    parser.add_argument('--clean',
                        dest='clean',
                        action="store_true",
                        help='Clean the build directory')

    parser.add_argument('-P', '--only-peripherals',
                        dest='test_only_peripheral',
                        default=False,
                        action="store_true",
                        help='Test only peripheral declared for MUT and skip common tests')

    # May be given several times; each occurrence is appended to the list
    parser.add_argument("--profile", dest="profile", action="append",
                        type=argparse_filestring_type,
                        default=[])

    parser.add_argument('-C', '--only-commons',
                        dest='test_only_common',
                        default=False,
                        action="store_true",
                        help='Test only board internals. Skip perpherials tests and perform common tests')

    parser.add_argument('-n', '--test-by-names',
                        dest='test_by_names',
                        type=argparse_many(str),
                        help='Runs only test enumerated it this switch. Use comma to separate test case names')

    parser.add_argument('-p', '--peripheral-by-names',
                        dest='peripheral_by_names',
                        type=argparse_many(str),
                        help='Forces discovery of particular peripherals. Use comma to separate peripheral names')

    # --- Flashing and reset methods (valid values come from host test plugins) ---
    copy_methods = host_tests_plugins.get_plugin_caps('CopyMethod')
    copy_methods_str = "Plugin support: " + ', '.join(copy_methods)

    parser.add_argument('-c', '--copy-method',
                        dest='copy_method',
                        type=argparse_uppercase_type(copy_methods, "flash method"),
                        help="Select binary copy (flash) method. Default is Python's shutil.copy() method. %s"% copy_methods_str)

    reset_methods = host_tests_plugins.get_plugin_caps('ResetMethod')
    reset_methods_str = "Plugin support: " + ', '.join(reset_methods)

    parser.add_argument('-r', '--reset-type',
                        dest='mut_reset_type',
                        default=None,
                        type=argparse_uppercase_type(reset_methods, "reset method"),
                        help='Extra reset method used to reset MUT by host test script. %s'% reset_methods_str)

    # --- Static analysis ---
    parser.add_argument('-g', '--goanna-for-tests',
                        dest='goanna_for_tests',
                        action="store_true",
                        help='Run Goanna static analyse tool for tests. (Project will be rebuilded)')

    parser.add_argument('-G', '--goanna-for-sdk',
                        dest='goanna_for_mbed_sdk',
                        action="store_true",
                        help='Run Goanna static analyse tool for mbed SDK (Project will be rebuilded)')

    # --- Summaries and informational modes ---
    parser.add_argument('-s', '--suppress-summary',
                        dest='suppress_summary',
                        default=False,
                        action="store_true",
                        help='Suppresses display of wellformatted table with test results')

    parser.add_argument('-t', '--test-summary',
                        dest='test_x_toolchain_summary',
                        default=False,
                        action="store_true",
                        help='Displays wellformatted table with test x toolchain test result per target')

    parser.add_argument('-A', '--test-automation-report',
                        dest='test_automation_report',
                        default=False,
                        action="store_true",
                        help='Prints information about all tests and exits')

    parser.add_argument('-R', '--test-case-report',
                        dest='test_case_report',
                        default=False,
                        action="store_true",
                        help='Prints information about all test cases and exits')

    parser.add_argument("-S", "--supported-toolchains",
                        action="store_true",
                        dest="supported_toolchains",
                        default=False,
                        help="Displays supported matrix of MCUs and toolchains")

    parser.add_argument("-O", "--only-build",
                        action="store_true",
                        dest="only_build_tests",
                        default=False,
                        help="Only build tests, skips actual test procedures (flashing etc.)")

    parser.add_argument('--parallel',
                        dest='parallel_test_exec',
                        default=False,
                        action="store_true",
                        help='Experimental, you execute test runners for connected to your host MUTs in parallel (speeds up test result collection)')

    parser.add_argument('--config',
                        dest='verbose_test_configuration_only',
                        default=False,
                        action="store_true",
                        help='Displays full test specification and MUTs configration and exits')

    # --- Test looping / waterfall ---
    parser.add_argument('--loops',
                        dest='test_loops_list',
                        type=argparse_many(str),
                        help='Set no. of loops per test. Format: TEST_1=1,TEST_2=2,TEST_3=3')

    # NOTE: as with --jobs, no default= is passed; the value is None if omitted.
    parser.add_argument('--global-loops',
                        dest='test_global_loops_value',
                        type=int,
                        help='Set global number of test loops per test. Default value is set 1')

    parser.add_argument('--consolidate-waterfall',
                        dest='consolidate_waterfall_test',
                        default=False,
                        action="store_true",
                        help='Used with --waterfall argument. Adds only one test to report reflecting outcome of waterfall test.')

    parser.add_argument('-W', '--waterfall',
                        dest='waterfall_test',
                        default=False,
                        action="store_true",
                        help='Used with --loops or --global-loops arguments. Tests until OK result occurs and assumes test passed')

    parser.add_argument('-N', '--firmware-name',
                        dest='firmware_global_name',
                        help='Set global name for all produced projects. Note, proper file extension will be added by buid scripts')

    # --- Execution order, filtering and timeouts ---
    parser.add_argument('-u', '--shuffle',
                        dest='shuffle_test_order',
                        default=False,
                        action="store_true",
                        help='Shuffles test execution order')

    # No type= is given, so the seed is delivered to the caller as a string
    parser.add_argument('--shuffle-seed',
                        dest='shuffle_test_seed',
                        default=None,
                        help='Shuffle seed (If you want to reproduce your shuffle order please use seed provided in test summary)')

    parser.add_argument('-f', '--filter',
                        dest='general_filter_regex',
                        type=argparse_many(str),
                        default=None,
                        help='For some commands you can use filter to filter out results')

    parser.add_argument('--inc-timeout',
                        dest='extend_test_timeout',
                        metavar="NUMBER",
                        type=int,
                        help='You can increase global timeout for each test by specifying additional test timeout in seconds')

    # --- Logging and report output files ---
    parser.add_argument('-l', '--log',
                        dest='log_file_name',
                        help='Log events to external file (note not all console entries may be visible in log file)')

    parser.add_argument('--report-html',
                        dest='report_html_file_name',
                        help='You can log test suite results in form of HTML report')

    parser.add_argument('--report-junit',
                        dest='report_junit_file_name',
                        help='You can log test suite results in form of JUnit compliant XML report')

    parser.add_argument("--report-build",
                        dest="report_build_file_name",
                        help="Output the build results to a junit xml file")

    parser.add_argument("--report-text",
                        dest="report_text_file_name",
                        help="Output the build results to a text file")

    # --- Verbosity and miscellaneous ---
    parser.add_argument('--verbose-skipped',
                        dest='verbose_skipped_tests',
                        default=False,
                        action="store_true",
                        help='Prints some extra information about skipped tests')

    parser.add_argument('-V', '--verbose-test-result',
                        dest='verbose_test_result_only',
                        default=False,
                        action="store_true",
                        help='Prints test serial output')

    parser.add_argument('-v', '--verbose',
                        dest='verbose',
                        default=False,
                        action="store_true",
                        help='Verbose mode (prints some extra information)')

    parser.add_argument('--version',
                        dest='version',
                        default=False,
                        action="store_true",
                        help='Prints script version and exits')

    parser.add_argument('--stats-depth',
                        dest='stats_depth',
                        default=2,
                        type=int,
                        help="Depth level for static memory report")
    return parser
|
|
|
|
def test_path_to_name(path, base):
|
|
"""Change all slashes in a path into hyphens
|
|
This creates a unique cross-platform test name based on the path
|
|
This can eventually be overriden by a to-be-determined meta-data mechanism"""
|
|
name_parts = []
|
|
head, tail = os.path.split(relpath(path,base))
|
|
while (tail and tail != "."):
|
|
name_parts.insert(0, tail)
|
|
head, tail = os.path.split(head)
|
|
|
|
return "-".join(name_parts).lower()
|
|
|
|
def get_test_config(config_name, target_name):
    """Resolve a test configuration to a file path.

    config_name: either a path to a custom configuration file, or the name of
                 an mbed OS interface ("ethernet", "wifi_odin", etc.)
    target_name: name of the target, forwarded to the interface lookup

    Returns config_name unchanged when it exists on disk; otherwise returns
    whatever TestConfig.get_config_path() yields for the interface name.
    """
    if not exists(config_name):
        # Not an existing path: treat it as an mbed OS interface name
        return TestConfig.get_config_path(config_name, target_name)

    # An existing path was given: it is the configuration itself
    return config_name
|
|
|
|
|
|
def find_tests(base_dir, target_name, toolchain_name, icetea, greentea, app_config=None):
    """ Finds all tests in a directory recursively

    :param base_dir: path to the directory to scan for tests (ex. 'path/to/project')
    :param target_name: name of the target to use for scanning (ex. 'K64F')
    :param toolchain_name: name of the toolchain (ex. 'GCC_ARM'); accepted for
                           interface compatibility, not referenced by the body
    :param icetea: when true, collect tests from 'TEST_APPS/device' folders
    :param greentea: when true, collect tests from 'TESTS/<group>/<case>' folders
    :param app_config: location of a chosen mbed_app.json file

    returns a dictionary where keys are the test name, and the values are
    lists of paths needed to build the test (the test folder first, followed
    by any 'COMMON' folders that apply to it).
    """

    def shares_base_and_group(base_pred, group_pred, name_base_group_case):
        # True for tests living in the given TESTS root and test group
        _, base, group, _ = name_base_group_case
        return base == base_pred and group == group_pred

    def shares_base(base_pred, name_base_group_case):
        # True for tests living in the given TESTS root
        _, base, _, _ = name_base_group_case
        return base == base_pred

    special_dirs = ['host_tests', 'COMMON']

    # Tests keyed by their full identity: (name, base, group, case)
    found = {}
    # Shared folders as (applies-to predicate, path) pairs
    shared = []

    config = Config(target_name, base_dir, app_config)

    # The scan records ignored directories; the test folders are among them
    base_resources = Resources(MockNotifier(), collect_ignores=True)
    base_resources.scan_with_config(base_dir, config)

    if greentea:
        tests_roots = [d for d in base_resources.ignored_dirs if basename(d) == 'TESTS']
        ignoreset = MbedIgnoreSet()

        for tests_root in tests_roots:
            root_ignore = join(tests_root, IGNORE_FILENAME)
            if isfile(root_ignore):
                ignoreset.add_mbedignore(tests_root, root_ignore)

            for group_name in os.listdir(tests_root):
                group_path = join(tests_root, group_name)
                if not isdir(group_path) or ignoreset.is_ignored(group_path):
                    continue

                group_ignore = join(group_path, IGNORE_FILENAME)
                if isfile(group_ignore):
                    ignoreset.add_mbedignore(group_path, group_ignore)

                for case_name in os.listdir(group_path):
                    case_path = join(group_path, case_name)
                    if not isdir(case_path) or ignoreset.is_ignored(case_path):
                        continue

                    if group_name not in special_dirs and case_name not in special_dirs:
                        identity = (test_path_to_name(case_path, base_dir),
                                    tests_root, group_name, case_name)
                        found[identity] = [case_path]

                    if case_name == 'COMMON':
                        # Shared by every test case of this group
                        applies = functools.partial(
                            shares_base_and_group, tests_root, group_name)
                        shared.append((applies, case_path))

                if group_name == 'COMMON':
                    # Shared by every test under this TESTS root
                    applies = functools.partial(shares_base, tests_root)
                    shared.append((applies, group_path))

    if icetea:
        apps_roots = [d for d in base_resources.ignored_dirs if basename(d) == 'TEST_APPS']
        for apps_root in apps_roots:
            if not isdir(apps_root):
                continue
            for subdir in os.listdir(apps_root):
                subdir_path = join(apps_root, subdir)
                # Only the 'device' sub-folder contains test applications
                if isdir(subdir_path) and 'device' == subdir:
                    for app_name in os.listdir(subdir_path):
                        app_path = join(subdir_path, app_name)
                        identity = (test_path_to_name(app_path, base_dir),
                                    apps_root, subdir, app_name)
                        found[identity] = [app_path]

    # Attach every shared folder to the tests it applies to
    for applies, shared_path in shared:
        for identity, paths in six.iteritems(found):
            if applies(identity):
                paths.append(shared_path)

    # Callers only care about the test name, drop the rest of the identity
    return {name: paths for (name, _, _, _), paths in six.iteritems(found)}
|
|
|
|
|
|
def print_tests(tests, format="list", sort=True):
    """Print a dictionary of tests (as returned from "find_tests").

    tests: mapping of test name -> list of paths; the first path of each list
           is the test's own folder and is the one that gets printed
    format: "list" for a human readable listing, "json" for a JSON object
            mapping each test name to its path
    sort: when true (the default) tests are printed ordered by name,
          otherwise in the iteration order of `tests`

    Any other format prints an error message and exits with status 1.
    """
    names = sorted(tests) if sort else list(tests)

    if format == "list":
        for test_name in names:
            test_path = tests[test_name][0]
            print("Test Case:")
            print(" Name: %s" % test_name)
            print(" Path: %s" % test_path)
    elif format == "json":
        # Iterate names explicitly: iterating the dict itself yields keys
        # only, which cannot be unpacked into (name, paths) pairs.
        print(json.dumps({test_name: tests[test_name][0] for test_name in names},
                         indent=2))
    else:
        print("Unknown format '%s'" % format)
        sys.exit(1)
|
|
|
|
def norm_relative_path(path, start):
    """Return `path` normalized, made relative to `start`, with '/' separators.

    Behaves like os.path.relpath applied to the normalized path, then
    converts any backslashes in the result to forward slashes so that
    Windows-style paths come out in Unix style.
    """
    relative = os.path.relpath(os.path.normpath(path), start)
    return relative.replace("\\", "/")
|
|
|
|
|
|
def build_test_worker(*args, **kwargs):
    """Worker function used for building tests in parallel.

    `kwargs` must contain a 'toolchain_paths' mapping; its entries are copied
    into this process' TOOLCHAIN_PATHS and the key is then removed. The
    remaining `args` and `kwargs` are passed directly to `build_project`.

    Returns a dictionary with the following structure:

    {
        'result': `True` if `build_project` completed, `False` otherwise
        'reason': Instance of the exception thrown on failure. Only set for
                  NotSupportedException, ToolException and KeyboardInterrupt;
                  any other exception is printed and leaves this key absent
        'bin_file': Path to the created binary if `build_project` was
                    successful. Not present otherwise
        'args': The positional arguments this worker was called with
        'kwargs': The keyword arguments that were passed to `build_project`.
                  This includes arguments that were modified (ex. report)
    }
    """
    outcome = {'result': False, 'args': args, 'kwargs': kwargs}

    # Use parent TOOLCHAIN_PATHS variable
    for name, location in kwargs['toolchain_paths'].items():
        TOOLCHAIN_PATHS[name] = location
    del kwargs['toolchain_paths']

    try:
        bin_file, _ = build_project(*args, **kwargs)
        outcome.update(result=True, bin_file=bin_file, kwargs=kwargs)
    except (NotSupportedException, ToolException, KeyboardInterrupt) as error:
        # Expected failure modes are handed back to the parent process
        outcome['reason'] = error
    except BaseException:
        # Print unhandled exceptions here
        import traceback
        traceback.print_exc(file=sys.stdout)

    return outcome
|
|
|
|
|
|
def build_tests(tests, base_source_paths, build_path, target, toolchain_name,
                clean=False, notify=None, jobs=1, macros=None,
                silent=False, report=None, properties=None,
                continue_on_build_fail=False, app_config=None,
                build_profile=None, stats_depth=None, ignore=None,
                resource_filter=None, coverage_patterns=None):
    """Given the data structure from 'find_tests' and the typical build parameters,
    build all the tests

    Each test is built by `build_test_worker` in a multiprocessing pool of
    `jobs` processes (cpu_count() when `jobs` is falsy). Worker results are
    polled; deferred notifications are replayed on a copy of `notify`, the
    worker's report is merged into `report`, and the binary of every
    successful build is recorded in the returned structure.

    tests: mapping of test name -> path or list of paths; the first path is
           the test folder, used for the build directory and project name
    target: a Target instance or the name of a target in TARGET_MAP
    continue_on_build_fail: when false, the pool is terminated as soon as one
                            build fails

    A build counts as failed when the worker did not succeed, unless the
    worker reported a NotSupportedException (unsupported tests are skipped,
    not failed).

    Raises ToolException when the polling budget is exhausted; any exception
    raised while processing a worker result is re-raised after the pool has
    been terminated.

    Returns a tuple of the build result (True or False) followed by the test
    build data structure"""

    execution_directory = "."
    base_path = norm_relative_path(build_path, execution_directory)

    if isinstance(target, Target):
        target_name = target.name
    else:
        target_name = target
        target = TARGET_MAP[target_name]
    cfg, _, _, _ = get_config(base_source_paths, target, app_config=app_config)

    baud_rate = 9600
    if 'platform.stdio-baud-rate' in cfg:
        baud_rate = cfg['platform.stdio-baud-rate'].value

    test_build = {
        "platform": target_name,
        "toolchain": toolchain_name,
        "base_path": base_path,
        "baud_rate": baud_rate,
        "binary_type": "bootable",
        "tests": {},
        "test_apps": {}
    }

    jobs_count = int(jobs if jobs else cpu_count())
    p = Pool(processes=jobs_count)
    results = []
    for test_name, test_paths in tests.items():
        if not isinstance(test_paths, list):
            test_paths = [test_paths]

        test_build_path = os.path.join(build_path, test_paths[0])
        src_paths = base_source_paths + test_paths
        test_case_folder_name = os.path.basename(test_paths[0])

        args = (src_paths, test_build_path, deepcopy(target), toolchain_name)
        kwargs = {
            # Parallelism comes from the pool, so each build is single-job
            'jobs': 1,
            'clean': clean,
            'macros': macros,
            'name': test_case_folder_name,
            'project_id': test_name,
            'report': report,
            'properties': properties,
            'app_config': app_config,
            'build_profile': build_profile,
            'toolchain_paths': TOOLCHAIN_PATHS,
            'stats_depth': stats_depth,
            # Workers collect notifications; they are replayed below
            'notify': MockNotifier(),
            'coverage_patterns': coverage_patterns,
            'resource_filter': resource_filter
        }

        results.append(p.apply_async(build_test_worker, args, kwargs))

    p.close()
    result = True
    itr = 0
    while len(results):
        itr += 1
        # NOTE(review): 360000 polls with a 0.01s sleep each is about one
        # hour, not the 10 minutes stated in the message - confirm which of
        # the two is intended before changing either.
        if itr > 360000:
            p.terminate()
            p.join()
            raise ToolException("Compile did not finish in 10 minutes")
        else:
            sleep(0.01)
            pending = 0
            # Iterate over a snapshot: finished entries are removed from
            # `results` inside the loop.
            for r in list(results):
                if r.ready():
                    try:
                        worker_result = r.get()
                        results.remove(r)

                        # Push all deferred notifications out to the actual notifier
                        new_notify = deepcopy(notify)
                        for message in worker_result['kwargs']['notify'].messages:
                            new_notify.notify(message)

                        # Take report from the kwargs and merge it into existing report
                        if report:
                            report_entry = worker_result['kwargs']['report'][target_name][toolchain_name]
                            report_entry[worker_result['kwargs']['project_id'].upper()][0][0]['output'] = new_notify.get_output()
                            for test_key in report_entry.keys():
                                report[target_name][toolchain_name][test_key] = report_entry[test_key]

                        # Set the overall result to a failure if a build failure occurred.
                        # The worker's 'result' flag is the source of truth: 'reason'
                        # holds an exception instance (always truthy) and is absent
                        # when the worker hit an unexpected exception.
                        if (not worker_result.get('result') and
                                not isinstance(worker_result.get('reason'),
                                               NotSupportedException)):
                            result = False
                            break

                        # Adding binary path to test build result
                        if ('result' in worker_result and
                                worker_result['result'] and
                                'bin_file' in worker_result):
                            bin_file = norm_relative_path(worker_result['bin_file'], execution_directory)

                            test_key = 'test_apps' if 'test_apps-' in worker_result['kwargs']['project_id'] else 'tests'
                            test_build[test_key][worker_result['kwargs']['project_id']] = {
                                "binaries": [
                                    {
                                        "path": bin_file
                                    }
                                ]
                            }

                            print('Image: %s\n' % bin_file)

                    except BaseException:
                        # Drop queued work and stop the pool before propagating
                        if p._taskqueue.queue:
                            p._taskqueue.queue.clear()
                            sleep(0.5)
                        p.terminate()
                        p.join()
                        raise
                else:
                    pending += 1
                    # Every worker slot is busy; later results cannot be ready
                    if pending >= jobs_count:
                        break

            # Break as soon as possible if there is a failure and we are not
            # continuing on build failures
            if not result and not continue_on_build_fail:
                if p._taskqueue.queue:
                    p._taskqueue.queue.clear()
                    sleep(0.5)
                p.terminate()
                break

    p.join()

    test_builds = {}
    test_builds["%s-%s" % (target_name, toolchain_name)] = test_build

    return result, test_builds
|
|
|
|
|
|
def test_spec_from_test_builds(test_builds):
|
|
for build in test_builds:
|
|
# Convert TZ target name to test spec platform name
|
|
#
|
|
# 1. All TZ targets should have name pattern: PLATFORM_[NPSA_]S/NS, where:
|
|
# (1) 'PLATFORM' for test spec platform name
|
|
# (2) 'NPSA' for non-PSA targets. Defaults to PSA target if absent.
|
|
# (3) 'S'/'NS' for secure/non-secure targets
|
|
# 2. Secure target may participate in Greentea, so its name is also truncated here.
|
|
if Target.get_target(test_builds[build]['platform']).is_TrustZone_target:
|
|
if test_builds[build]['platform'].endswith('_NS'):
|
|
test_builds[build]['platform'] = test_builds[build]['platform'][:-3]
|
|
elif test_builds[build]['platform'].endswith('_S'):
|
|
test_builds[build]['platform'] = test_builds[build]['platform'][:-2]
|
|
|
|
if test_builds[build]['platform'].endswith('_NPSA'):
|
|
test_builds[build]['platform'] = test_builds[build]['platform'][:-5]
|
|
# Convert Dual Core target name to test spec platform name
|
|
if test_builds[build]['platform'].endswith('_CM4') or test_builds[build]['platform'].endswith('_CM7'):
|
|
test_builds[build]['platform'] = test_builds[build]['platform'][:-4]
|
|
return {
|
|
"builds": test_builds
|
|
}
|