Remove autoformatting

pull/7745/head
Olli-Pekka Puolitaival 2018-08-15 08:56:26 +03:00
parent 3b3bed2406
commit bf1781b005
1 changed files with 173 additions and 194 deletions

View File

@ -31,13 +31,11 @@ import datetime
import threading
import ctypes
import functools
import subprocess
from colorama import Fore, Back, Style
from prettytable import PrettyTable
from copy import copy, deepcopy
from time import sleep, time
try:
from Queue import Queue, Empty
except ImportError:
@ -116,7 +114,6 @@ class ProcessObserver(Thread):
class SingleTestExecutor(threading.Thread):
""" Example: Single test class in separate thread usage
"""
def __init__(self, single_test):
self.single_test = single_test
threading.Thread.__init__(self)
@ -137,7 +134,7 @@ class SingleTestExecutor(threading.Thread):
# table shows text x toolchain test result matrix
print(self.single_test.generate_test_summary_by_target(
test_summary, shuffle_seed))
print("Completed in %.2f sec" % (elapsed_time))
print("Completed in %.2f sec"% (elapsed_time))
class SingleTestRunner(object):
@ -160,26 +157,26 @@ class SingleTestRunner(object):
TEST_RESULT_NOT_SUPPORTED = "NOT_SUPPORTED"
GLOBAL_LOOPS_COUNT = 1 # How many times each test should be repeated
TEST_LOOPS_LIST = [] # We redefine no.of loops per test_id
TEST_LOOPS_DICT = {} # TEST_LOOPS_LIST in dict format: { test_id : test_loop_count}
TEST_LOOPS_LIST = [] # We redefine no.of loops per test_id
TEST_LOOPS_DICT = {} # TEST_LOOPS_LIST in dict format: { test_id : test_loop_count}
muts = {} # MUTs descriptor (from external file)
test_spec = {} # Test specification (from external file)
muts = {} # MUTs descriptor (from external file)
test_spec = {} # Test specification (from external file)
# mbed test suite -> SingleTestRunner
TEST_RESULT_MAPPING = {"success": TEST_RESULT_OK,
"failure": TEST_RESULT_FAIL,
"error": TEST_RESULT_ERROR,
"ioerr_copy": TEST_RESULT_IOERR_COPY,
"ioerr_disk": TEST_RESULT_IOERR_DISK,
"ioerr_serial": TEST_RESULT_IOERR_SERIAL,
"timeout": TEST_RESULT_TIMEOUT,
"no_image": TEST_RESULT_NO_IMAGE,
"end": TEST_RESULT_UNDEF,
"mbed_assert": TEST_RESULT_MBED_ASSERT,
"build_failed": TEST_RESULT_BUILD_FAILED,
"not_supproted": TEST_RESULT_NOT_SUPPORTED
}
TEST_RESULT_MAPPING = {"success" : TEST_RESULT_OK,
"failure" : TEST_RESULT_FAIL,
"error" : TEST_RESULT_ERROR,
"ioerr_copy" : TEST_RESULT_IOERR_COPY,
"ioerr_disk" : TEST_RESULT_IOERR_DISK,
"ioerr_serial" : TEST_RESULT_IOERR_SERIAL,
"timeout" : TEST_RESULT_TIMEOUT,
"no_image" : TEST_RESULT_NO_IMAGE,
"end" : TEST_RESULT_UNDEF,
"mbed_assert" : TEST_RESULT_MBED_ASSERT,
"build_failed" : TEST_RESULT_BUILD_FAILED,
"not_supproted" : TEST_RESULT_NOT_SUPPORTED
}
def __init__(self,
_global_loops_count=1,
@ -289,18 +286,17 @@ class SingleTestRunner(object):
# Database related initializations
self.db_logger = factory_db_logger(self.opts_db_url)
self.db_logger_build_id = None # Build ID (database index of build_id table)
self.db_logger_build_id = None # Build ID (database index of build_id table)
# Let's connect to database to set up credentials and confirm database is ready
if self.db_logger:
self.db_logger.connect_url(self.opts_db_url) # Save db access info inside db_logger object
self.db_logger.connect_url(self.opts_db_url) # Save db access info inside db_logger object
if self.db_logger.is_connected():
# Get hostname and uname so we can use it as build description
# when creating new build_id in external database
(_hostname, _uname) = self.db_logger.get_hostname()
_host_location = os.path.dirname(os.path.abspath(__file__))
build_id_type = None if self.opts_only_build_tests is None else self.db_logger.BUILD_ID_TYPE_BUILD_ONLY
self.db_logger_build_id = self.db_logger.get_next_build_id(_hostname, desc=_uname,
location=_host_location, type=build_id_type)
self.db_logger_build_id = self.db_logger.get_next_build_id(_hostname, desc=_uname, location=_host_location, type=build_id_type)
self.db_logger.disconnect()
def dump_options(self):
@ -311,23 +307,23 @@ class SingleTestRunner(object):
or
data_str = json.dumps(self.dump_options())
"""
result = {"db_url": str(self.opts_db_url),
"log_file_name": str(self.opts_log_file_name),
"shuffle_test_order": str(self.opts_shuffle_test_order),
"shuffle_test_seed": str(self.opts_shuffle_test_seed),
"test_by_names": str(self.opts_test_by_names),
"peripheral_by_names": str(self.opts_peripheral_by_names),
"test_only_peripheral": str(self.opts_test_only_peripheral),
"test_only_common": str(self.opts_test_only_common),
"verbose": str(self.opts_verbose),
"firmware_global_name": str(self.opts_firmware_global_name),
"only_build_tests": str(self.opts_only_build_tests),
"copy_method": str(self.opts_copy_method),
"mut_reset_type": str(self.opts_mut_reset_type),
"jobs": str(self.opts_jobs),
"extend_test_timeout": str(self.opts_extend_test_timeout),
"_dummy": ''
}
result = {"db_url" : str(self.opts_db_url),
"log_file_name" : str(self.opts_log_file_name),
"shuffle_test_order" : str(self.opts_shuffle_test_order),
"shuffle_test_seed" : str(self.opts_shuffle_test_seed),
"test_by_names" : str(self.opts_test_by_names),
"peripheral_by_names" : str(self.opts_peripheral_by_names),
"test_only_peripheral" : str(self.opts_test_only_peripheral),
"test_only_common" : str(self.opts_test_only_common),
"verbose" : str(self.opts_verbose),
"firmware_global_name" : str(self.opts_firmware_global_name),
"only_build_tests" : str(self.opts_only_build_tests),
"copy_method" : str(self.opts_copy_method),
"mut_reset_type" : str(self.opts_mut_reset_type),
"jobs" : str(self.opts_jobs),
"extend_test_timeout" : str(self.opts_extend_test_timeout),
"_dummy" : ''
}
return result
def shuffle_random_func(self):
@ -369,6 +365,7 @@ class SingleTestRunner(object):
'shuffle_random_seed': self.shuffle_random_seed
}
# print '=== %s::%s ===' % (target, toolchain)
# Let's build our test
if target not in TARGET_MAP:
@ -398,7 +395,7 @@ class SingleTestRunner(object):
print(self.logger.log_line(
self.logger.LogType.NOTIF,
'Skipped tests for %s target. Toolchain %s is not '
'supported for this target' % (T.name, toolchain)))
'supported for this target'% (T.name, toolchain)))
continue
except ToolException:
@ -441,13 +438,13 @@ class SingleTestRunner(object):
_extra=json.dumps(self.dump_options()))
self.db_logger.disconnect();
valid_test_map_keys = self.get_valid_tests(test_map_keys, target, toolchain, test_ids,
self.opts_include_non_automated)
valid_test_map_keys = self.get_valid_tests(test_map_keys, target, toolchain, test_ids, self.opts_include_non_automated)
skipped_test_map_keys = self.get_skipped_tests(test_map_keys, valid_test_map_keys)
for skipped_test_id in skipped_test_map_keys:
test_suite_properties['skipped'].append(skipped_test_id)
# First pass through all tests and determine which libraries need to be built
libraries = []
for test_id in valid_test_map_keys:
@ -459,6 +456,7 @@ class SingleTestRunner(object):
if lib['build_dir'] in test.dependencies and lib['id'] not in libraries:
libraries.append(lib['id'])
clean_project_options = True if self.opts_goanna_for_tests or clean or self.opts_clean else None
# Build all required libraries
@ -480,10 +478,11 @@ class SingleTestRunner(object):
'There were errors while building library %s' % lib_id))
continue
for test_id in valid_test_map_keys:
test = TEST_MAP[test_id]
test_suite_properties['test.libs.%s.%s.%s' % (target, toolchain, test_id)] = ', '.join(libraries)
test_suite_properties['test.libs.%s.%s.%s'% (target, toolchain, test_id)] = ', '.join(libraries)
# TODO: move this 2 below loops to separate function
INC_DIRS = []
@ -495,18 +494,18 @@ class SingleTestRunner(object):
for lib_id in libraries:
if 'macros' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['macros']:
MACROS.extend(LIBRARY_MAP[lib_id]['macros'])
MACROS.append('TEST_SUITE_TARGET_NAME="%s"' % target)
MACROS.append('TEST_SUITE_TEST_ID="%s"' % test_id)
MACROS.append('TEST_SUITE_TARGET_NAME="%s"'% target)
MACROS.append('TEST_SUITE_TEST_ID="%s"'% test_id)
test_uuid = uuid.uuid4()
MACROS.append('TEST_SUITE_UUID="%s"' % str(test_uuid))
MACROS.append('TEST_SUITE_UUID="%s"'% str(test_uuid))
# Prepare extended test results data structure (it can be used to generate detailed test report)
if target not in self.test_summary_ext:
self.test_summary_ext[target] = {} # test_summary_ext : toolchain
if toolchain not in self.test_summary_ext[target]:
self.test_summary_ext[target][toolchain] = {} # test_summary_ext : toolchain : target
self.test_summary_ext[target][toolchain] = {} # test_summary_ext : toolchain : target
tt_test_id = "%s::%s::%s" % (toolchain, target, test_id) # For logging only
tt_test_id = "%s::%s::%s" % (toolchain, target, test_id) # For logging only
project_name = self.opts_firmware_global_name if self.opts_firmware_global_name else None
try:
@ -524,6 +523,7 @@ class SingleTestRunner(object):
except Exception as e:
project_name_str = project_name if project_name is not None else test_id
test_result = self.TEST_RESULT_FAIL
if isinstance(e, ToolException):
@ -538,6 +538,7 @@ class SingleTestRunner(object):
'Project %s is not supported' % project_name_str))
test_result = self.TEST_RESULT_NOT_SUPPORTED
# Append test results to global test summary
self.test_summary.append(
(test_result, target, toolchain, test_id,
@ -548,17 +549,17 @@ class SingleTestRunner(object):
if test_id not in self.test_summary_ext[target][toolchain]:
self.test_summary_ext[target][toolchain][test_id] = []
self.test_summary_ext[target][toolchain][test_id].append({0: {
'result': test_result,
'output': '',
'target_name': target,
self.test_summary_ext[target][toolchain][test_id].append({ 0: {
'result' : test_result,
'output' : '',
'target_name' : target,
'target_name_unique': target,
'toolchain_name': toolchain,
'id': test_id,
'description': test.get_description(),
'elapsed_time': 0,
'duration': 0,
'copy_method': None
'toolchain_name' : toolchain,
'id' : test_id,
'description' : test.get_description(),
'elapsed_time' : 0,
'duration' : 0,
'copy_method' : None
}})
continue
@ -576,9 +577,9 @@ class SingleTestRunner(object):
test_spec = self.shape_test_request(target, path, test_id, test_duration)
test_loops = self.get_test_loop_count(test_id)
test_suite_properties['test.duration.%s.%s.%s' % (target, toolchain, test_id)] = test_duration
test_suite_properties['test.loops.%s.%s.%s' % (target, toolchain, test_id)] = test_loops
test_suite_properties['test.path.%s.%s.%s' % (target, toolchain, test_id)] = path
test_suite_properties['test.duration.%s.%s.%s'% (target, toolchain, test_id)] = test_duration
test_suite_properties['test.loops.%s.%s.%s'% (target, toolchain, test_id)] = test_loops
test_suite_properties['test.path.%s.%s.%s'% (target, toolchain, test_id)] = path
# read MUTs, test specification and perform tests
handle_results = self.handle(test_spec, target, toolchain, test_loops=test_loops)
@ -626,11 +627,12 @@ class SingleTestRunner(object):
if self.opts_shuffle_test_seed is not None and self.is_shuffle_seed_float():
self.shuffle_random_seed = round(float(self.opts_shuffle_test_seed), self.SHUFFLE_SEED_ROUND)
if self.opts_parallel_test_exec:
###################################################################
# Experimental, parallel test execution per singletest instance.
###################################################################
execute_threads = [] # Threads used to build mbed SDL, libs, test cases and execute tests
execute_threads = [] # Threads used to build mbed SDL, libs, test cases and execute tests
# Note: We are building here in parallel for each target separately!
# So we are not building the same thing multiple times and compilers
# in separate threads do not collide.
@ -638,29 +640,26 @@ class SingleTestRunner(object):
# get information about available MUTs (per target).
for target, toolchains in self.test_spec['targets'].items():
self.test_suite_properties_ext[target] = {}
t = threading.Thread(target=self.execute_thread_slice, args=(
q, target, toolchains, clean, test_ids, self.build_report, self.build_properties))
t = threading.Thread(target=self.execute_thread_slice, args = (q, target, toolchains, clean, test_ids, self.build_report, self.build_properties))
t.daemon = True
t.start()
execute_threads.append(t)
for t in execute_threads:
q.get() # t.join() would block some threads because we should not wait in any order for thread end
q.get() # t.join() would block some threads because we should not wait in any order for thread end
else:
# Serialized (not parallel) test execution
for target, toolchains in self.test_spec['targets'].items():
if target not in self.test_suite_properties_ext:
self.test_suite_properties_ext[target] = {}
self.execute_thread_slice(q, target, toolchains, clean, test_ids, self.build_report,
self.build_properties)
self.execute_thread_slice(q, target, toolchains, clean, test_ids, self.build_report, self.build_properties)
q.get()
if self.db_logger:
self.db_logger.reconnect();
if self.db_logger.is_connected():
self.db_logger.update_build_id_info(self.db_logger_build_id,
_status_fk=self.db_logger.BUILD_ID_STATUS_COMPLETED)
self.db_logger.update_build_id_info(self.db_logger_build_id, _status_fk=self.db_logger.BUILD_ID_STATUS_COMPLETED)
self.db_logger.disconnect();
return self.test_summary, self.shuffle_random_seed, self.test_summary_ext, self.test_suite_properties_ext, self.build_report, self.build_properties
@ -684,8 +683,8 @@ class SingleTestRunner(object):
continue
if (self.opts_peripheral_by_names and test.peripherals and
not any((i in self.opts_peripheral_by_names)
for i in test.peripherals)):
not any((i in self.opts_peripheral_by_names)
for i in test.peripherals)):
# We will skip tests not forced with -p option
if self.opts_verbose_skipped_tests:
print(self.logger.log_line(
@ -756,7 +755,7 @@ class SingleTestRunner(object):
result = "Test summary:\n"
for target in unique_targets:
result_dict = {} # test : { toolchain : result }
result_dict = {} # test : { toolchain : result }
unique_target_toolchains = []
for test in test_summary:
if test[TARGET_INDEX] == target:
@ -770,7 +769,7 @@ class SingleTestRunner(object):
pt = PrettyTable(pt_cols)
for col in pt_cols:
pt.align[col] = "l"
pt.padding_width = 1 # One space between column edges and contents (default)
pt.padding_width = 1 # One space between column edges and contents (default)
for test in unique_tests:
if test in result_dict:
@ -782,40 +781,40 @@ class SingleTestRunner(object):
row.append(test_results[toolchain])
pt.add_row(row)
result += pt.get_string()
shuffle_seed_text = "Shuffle Seed: %.*f" % (self.SHUFFLE_SEED_ROUND,
shuffle_seed if shuffle_seed else self.shuffle_random_seed)
result += "\n%s" % (shuffle_seed_text if self.opts_shuffle_test_order else '')
shuffle_seed_text = "Shuffle Seed: %.*f"% (self.SHUFFLE_SEED_ROUND,
shuffle_seed if shuffle_seed else self.shuffle_random_seed)
result += "\n%s"% (shuffle_seed_text if self.opts_shuffle_test_order else '')
return result
def generate_test_summary(self, test_summary, shuffle_seed=None):
""" Prints well-formed summary with results (SQL table like)
table shows target x test results matrix across
"""
success_code = 0 # Success code that can be leter returned to
success_code = 0 # Success code that can be leter returned to
result = "Test summary:\n"
# Pretty table package is used to print results
pt = PrettyTable(["Result", "Target", "Toolchain", "Test ID", "Test Description",
"Elapsed Time (sec)", "Timeout (sec)", "Loops"])
pt.align["Result"] = "l" # Left align
pt.align["Target"] = "l" # Left align
pt.align["Toolchain"] = "l" # Left align
pt.align["Test ID"] = "l" # Left align
pt.align["Test Description"] = "l" # Left align
pt.padding_width = 1 # One space between column edges and contents (default)
pt.align["Result"] = "l" # Left align
pt.align["Target"] = "l" # Left align
pt.align["Toolchain"] = "l" # Left align
pt.align["Test ID"] = "l" # Left align
pt.align["Test Description"] = "l" # Left align
pt.padding_width = 1 # One space between column edges and contents (default)
result_dict = {self.TEST_RESULT_OK: 0,
self.TEST_RESULT_FAIL: 0,
self.TEST_RESULT_ERROR: 0,
self.TEST_RESULT_UNDEF: 0,
self.TEST_RESULT_IOERR_COPY: 0,
self.TEST_RESULT_IOERR_DISK: 0,
self.TEST_RESULT_IOERR_SERIAL: 0,
self.TEST_RESULT_NO_IMAGE: 0,
self.TEST_RESULT_TIMEOUT: 0,
self.TEST_RESULT_MBED_ASSERT: 0,
self.TEST_RESULT_BUILD_FAILED: 0,
self.TEST_RESULT_NOT_SUPPORTED: 0
}
result_dict = {self.TEST_RESULT_OK : 0,
self.TEST_RESULT_FAIL : 0,
self.TEST_RESULT_ERROR : 0,
self.TEST_RESULT_UNDEF : 0,
self.TEST_RESULT_IOERR_COPY : 0,
self.TEST_RESULT_IOERR_DISK : 0,
self.TEST_RESULT_IOERR_SERIAL : 0,
self.TEST_RESULT_NO_IMAGE : 0,
self.TEST_RESULT_TIMEOUT : 0,
self.TEST_RESULT_MBED_ASSERT : 0,
self.TEST_RESULT_BUILD_FAILED : 0,
self.TEST_RESULT_NOT_SUPPORTED : 0
}
for test in test_summary:
if test[0] in result_dict:
@ -825,11 +824,10 @@ class SingleTestRunner(object):
result += "\n"
# Print result count
result += "Result: " + ' / '.join(
['%s %s' % (value, key) for (key, value) in {k: v for k, v in result_dict.items() if v != 0}.items()])
shuffle_seed_text = "Shuffle Seed: %.*f\n" % (self.SHUFFLE_SEED_ROUND,
shuffle_seed if shuffle_seed else self.shuffle_random_seed)
result += "\n%s" % (shuffle_seed_text if self.opts_shuffle_test_order else '')
result += "Result: " + ' / '.join(['%s %s' % (value, key) for (key, value) in {k: v for k, v in result_dict.items() if v != 0}.items()])
shuffle_seed_text = "Shuffle Seed: %.*f\n"% (self.SHUFFLE_SEED_ROUND,
shuffle_seed if shuffle_seed else self.shuffle_random_seed)
result += "\n%s"% (shuffle_seed_text if self.opts_shuffle_test_order else '')
return result
def test_loop_list_to_dict(self, test_loops_str):
@ -885,7 +883,7 @@ class SingleTestRunner(object):
return None
mcu = mut['mcu']
copy_method = mut.get('copy_method') # Available board configuration selection e.g. core selection etc.
copy_method = mut.get('copy_method') # Available board configuration selection e.g. core selection etc.
if self.db_logger:
self.db_logger.reconnect()
@ -941,14 +939,13 @@ class SingleTestRunner(object):
# Host test execution
start_host_exec_time = time()
single_test_result = self.TEST_RESULT_UNDEF # single test run result
single_test_result = self.TEST_RESULT_UNDEF # single test run result
_copy_method = selected_copy_method
if not exists(image_path):
single_test_result = self.TEST_RESULT_NO_IMAGE
elapsed_time = 0
single_test_output = self.logger.log_line(self.logger.LogType.ERROR,
'Image file does not exist: %s' % image_path)
single_test_output = self.logger.log_line(self.logger.LogType.ERROR, 'Image file does not exist: %s'% image_path)
print(single_test_output)
else:
# Host test execution
@ -968,20 +965,20 @@ class SingleTestRunner(object):
# Store test result
test_all_result.append(single_test_result)
total_elapsed_time = time() - start_host_exec_time # Test time with copy (flashing) / reset
total_elapsed_time = time() - start_host_exec_time # Test time with copy (flashing) / reset
elapsed_time = single_testduration # TIme of single test case execution after reset
detailed_test_results[test_index] = {
'result': single_test_result,
'output': single_test_output,
'target_name': target_name,
'target_name_unique': target_name_unique,
'toolchain_name': toolchain_name,
'id': test_id,
'description': test_description,
'elapsed_time': round(elapsed_time, 2),
'duration': single_timeout,
'copy_method': _copy_method,
'result' : single_test_result,
'output' : single_test_output,
'target_name' : target_name,
'target_name_unique' : target_name_unique,
'toolchain_name' : toolchain_name,
'id' : test_id,
'description' : test_description,
'elapsed_time' : round(elapsed_time, 2),
'duration' : single_timeout,
'copy_method' : _copy_method,
}
print(self.print_test_result(
@ -1009,8 +1006,7 @@ class SingleTestRunner(object):
if self.db_logger:
self.db_logger.disconnect()
return (self.shape_global_test_loop_result(test_all_result,
self.opts_waterfall_test and self.opts_consolidate_waterfall_test),
return (self.shape_global_test_loop_result(test_all_result, self.opts_waterfall_test and self.opts_consolidate_waterfall_test),
target_name_unique,
toolchain_name,
test_id,
@ -1048,7 +1044,7 @@ class SingleTestRunner(object):
tokens.append(test_description)
separator = "::"
time_info = " in %.2f of %d sec" % (round(elapsed_time, 2), duration)
result = separator.join(tokens) + " [" + test_result + "]" + time_info
result = separator.join(tokens) + " [" + test_result +"]" + time_info
return Fore.MAGENTA + result + Fore.RESET
def shape_test_loop_ok_result_count(self, test_all_result):
@ -1056,7 +1052,7 @@ class SingleTestRunner(object):
"""
test_loop_count = len(test_all_result)
test_loop_ok_result = test_all_result.count(self.TEST_RESULT_OK)
return "%d/%d" % (test_loop_ok_result, test_loop_count)
return "%d/%d"% (test_loop_ok_result, test_loop_count)
def shape_global_test_loop_result(self, test_all_result, waterfall_and_consolidate):
""" Reformats list of results to simple string
@ -1110,16 +1106,16 @@ class SingleTestRunner(object):
Returns string
"""
result = None
if re.search("HOST: Property '%s'" % property_name, line) is not None:
property = re.search("HOST: Property '%s' = '([\w\d _]+)'" % property_name, line)
if re.search("HOST: Property '%s'"% property_name, line) is not None:
property = re.search("HOST: Property '%s' = '([\w\d _]+)'"% property_name, line)
if property is not None and len(property.groups()) == 1:
result = property.groups()[0]
return result
cmd = ["python",
'%s.py' % name,
'%s.py'% name,
'-d', disk,
'-f', '"%s"' % image_path,
'-f', '"%s"'% image_path,
'-p', port,
'-t', str(duration),
'-C', str(program_cycle_s)]
@ -1143,7 +1139,7 @@ class SingleTestRunner(object):
proc = Popen(cmd, stdout=PIPE, cwd=HOST_TESTS)
obs = ProcessObserver(proc)
update_once_flag = {} # Stores flags checking if some auto-parameter was already set
update_once_flag = {} # Stores flags checking if some auto-parameter was already set
line = ''
output = []
start_time = time()
@ -1182,7 +1178,7 @@ class SingleTestRunner(object):
else:
line += c
end_time = time()
testcase_duration = end_time - start_time # Test case duration from reset to {end}
testcase_duration = end_time - start_time # Test case duration from reset to {end}
c = get_char_from_queue(obs)
@ -1259,11 +1255,11 @@ def show_json_file_format_error(json_spec_filename, line, column):
with open(json_spec_filename) as data_file:
line_no = 1
for json_line in data_file:
if line_no + 5 >= line: # Print last few lines before error
print('Line %d:\t' % line_no + json_line)
if line_no + 5 >= line: # Print last few lines before error
print('Line %d:\t'%line_no + json_line)
if line_no == line:
print('%s\t%s^'(' ' * len('Line %d:' % line_no),
'-' * (column - 1)))
print('%s\t%s^' (' ' * len('Line %d:' % line_no),
'-' * (column - 1)))
break
line_no += 1
@ -1311,7 +1307,7 @@ def get_json_data_from_file(json_spec_filename, verbose=False):
show_json_file_format_error(json_spec_filename, line, column)
except IOError as fileopen_error_msg:
print('JSON file %s not opened. Reason: %s\n' %
print('JSON file %s not opened. Reason: %s\n'%
(json_spec_filename, fileopen_error_msg))
if verbose and result:
pp = pprint.PrettyPrinter(indent=4)
@ -1384,7 +1380,7 @@ def print_test_configuration_from_json(json_data, join_delim=", "):
target_supported_toolchains = get_target_supported_toolchains(target)
if not target_supported_toolchains:
target_supported_toolchains = []
target_name = target if target in TARGET_MAP else "%s*" % target
target_name = target if target in TARGET_MAP else "%s*"% target
row = [target_name]
toolchains = targets[target]
@ -1415,27 +1411,27 @@ def print_test_configuration_from_json(json_data, join_delim=", "):
pt.add_row(row)
# generate result string
result = pt.get_string() # Test specification table
result = pt.get_string() # Test specification table
if toolchain_conflicts or toolchain_path_conflicts:
result += "\n"
result += "Toolchain conflicts:\n"
for target in toolchain_conflicts:
if target not in TARGET_MAP:
result += "\t* Target %s unknown\n" % (target)
result += "\t* Target %s unknown\n"% (target)
conflict_target_list = join_delim.join(toolchain_conflicts[target])
sufix = 's' if len(toolchain_conflicts[target]) > 1 else ''
result += "\t* Target %s does not support %s toolchain%s\n" % (target, conflict_target_list, sufix)
result += "\t* Target %s does not support %s toolchain%s\n"% (target, conflict_target_list, sufix)
for toolchain in toolchain_path_conflicts:
# Let's check toolchain configuration
# Let's check toolchain configuration
if toolchain in TOOLCHAIN_PATHS:
toolchain_path = TOOLCHAIN_PATHS[toolchain]
if not os.path.isdir(toolchain_path):
result += "\t# Toolchain %s path not found: %s\n" % (toolchain, toolchain_path)
result += "\t# Toolchain %s path not found: %s\n"% (toolchain, toolchain_path)
return result
def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=',', platform_filter=None):
def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=',',platform_filter=None):
""" Generates table summary with all test cases and additional test cases
information using pretty print functionality. Allows test suite user to
see test cases
@ -1466,7 +1462,7 @@ def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=','
counter_all = 0
counter_automated = 0
pt.padding_width = 1 # One space between column edges and contents (default)
pt.padding_width = 1 # One space between column edges and contents (default)
for test_id in sorted(TEST_MAP.keys()):
if platform_filter is not None:
@ -1520,8 +1516,7 @@ def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=','
pt.align['percent [%]'] = "r"
for unique_id in unique_test_id:
# print "\t\t%s: %d / %d" % (unique_id, counter_dict_test_id_types[unique_id], counter_dict_test_id_types_all[unique_id])
percent_progress = round(
100.0 * counter_dict_test_id_types[unique_id] / float(counter_dict_test_id_types_all[unique_id]), 1)
percent_progress = round(100.0 * counter_dict_test_id_types[unique_id] / float(counter_dict_test_id_types_all[unique_id]), 1)
str_progress = progress_bar(percent_progress, 75)
row = [unique_id,
counter_dict_test_id_types[unique_id],
@ -1538,7 +1533,7 @@ def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=','
def progress_bar(percent_progress, saturation=0):
""" This function creates progress bar with optional simple saturation mark
"""
step = int(percent_progress / 2) # Scale by to (scale: 1 - 50)
step = int(percent_progress / 2) # Scale by to (scale: 1 - 50)
str_progress = '#' * step + '.' * int(50 - step)
c = '!' if str_progress[38] == '.' else '|'
if saturation > 0:
@ -1578,32 +1573,26 @@ def singletest_in_cli_mode(single_test):
if single_test.opts_report_html_file_name:
# Export results in form of HTML report to separate file
report_exporter = ReportExporter(ResultExporterType.HTML)
report_exporter.report_to_file(test_summary_ext, single_test.opts_report_html_file_name,
test_suite_properties=test_suite_properties_ext)
report_exporter.report_to_file(test_summary_ext, single_test.opts_report_html_file_name, test_suite_properties=test_suite_properties_ext)
if single_test.opts_report_junit_file_name:
# Export results in form of JUnit XML report to separate file
report_exporter = ReportExporter(ResultExporterType.JUNIT)
report_exporter.report_to_file(test_summary_ext, single_test.opts_report_junit_file_name,
test_suite_properties=test_suite_properties_ext)
report_exporter.report_to_file(test_summary_ext, single_test.opts_report_junit_file_name, test_suite_properties=test_suite_properties_ext)
if single_test.opts_report_text_file_name:
# Export results in form of a text file
report_exporter = ReportExporter(ResultExporterType.TEXT)
report_exporter.report_to_file(test_summary_ext, single_test.opts_report_text_file_name,
test_suite_properties=test_suite_properties_ext)
report_exporter.report_to_file(test_summary_ext, single_test.opts_report_text_file_name, test_suite_properties=test_suite_properties_ext)
if single_test.opts_report_build_file_name:
# Export build results as html report to sparate file
report_exporter = ReportExporter(ResultExporterType.JUNIT, package="build")
report_exporter.report_to_file(build_report, single_test.opts_report_build_file_name,
test_suite_properties=build_properties)
report_exporter.report_to_file(build_report, single_test.opts_report_build_file_name, test_suite_properties=build_properties)
# Returns True if no build failures of the test projects or their dependencies
return status
class TestLogger():
""" Super-class for logging and printing ongoing events for test suite pass
"""
def __init__(self, store_log=True):
""" We can control if logger actually stores log in memory
or just handled all log entries immediately
@ -1619,18 +1608,18 @@ class TestLogger():
ERROR='Error',
EXCEPT='Exception')
self.LogToFileAttr = construct_enum(CREATE=1, # Create or overwrite existing log file
APPEND=2) # Append to existing log file
self.LogToFileAttr = construct_enum(CREATE=1, # Create or overwrite existing log file
APPEND=2) # Append to existing log file
def log_line(self, LogType, log_line, timestamp=True, line_delim='\n'):
""" Log one line of text
"""
log_timestamp = time()
log_entry = {'log_type': LogType,
'log_timestamp': log_timestamp,
'log_line': log_line,
'_future': None
}
log_entry = {'log_type' : LogType,
'log_timestamp' : log_timestamp,
'log_line' : log_line,
'_future' : None
}
# Store log in memory
if self.store_log:
self.log.append(log_entry)
@ -1640,20 +1629,18 @@ class TestLogger():
class CLITestLogger(TestLogger):
""" Logger used with CLI (Command line interface) test suite. Logs on screen and to file if needed
"""
def __init__(self, store_log=True, file_name=None):
TestLogger.__init__(self)
self.log_file_name = file_name
# self.TIMESTAMP_FORMAT = '%y-%m-%d %H:%M:%S' # Full date and time
self.TIMESTAMP_FORMAT = '%H:%M:%S' # Time only
#self.TIMESTAMP_FORMAT = '%y-%m-%d %H:%M:%S' # Full date and time
self.TIMESTAMP_FORMAT = '%H:%M:%S' # Time only
def log_print(self, log_entry, timestamp=True):
""" Prints on screen formatted log entry
"""
ts = log_entry['log_timestamp']
timestamp_str = datetime.datetime.fromtimestamp(ts).strftime(
"[%s] " % self.TIMESTAMP_FORMAT) if timestamp else ''
log_line_str = "%(log_type)s: %(log_line)s" % (log_entry)
timestamp_str = datetime.datetime.fromtimestamp(ts).strftime("[%s] "% self.TIMESTAMP_FORMAT) if timestamp else ''
log_line_str = "%(log_type)s: %(log_line)s"% (log_entry)
return timestamp_str + log_line_str
def log_line(self, LogType, log_line, timestamp=True, line_delim='\n'):
@ -1691,7 +1678,7 @@ def detect_database_verbose(db_url):
if result is not None:
# Parsing passed
(db_type, username, password, host, db_name) = result
# print "DB type '%s', user name '%s', password '%s', host '%s', db name '%s'"% result
#print "DB type '%s', user name '%s', password '%s', host '%s', db name '%s'"% result
# Let's try to connect
db_ = factory_db_logger(db_url)
if db_ is not None:
@ -1715,12 +1702,11 @@ def get_module_avail(module_name):
"""
return module_name in sys.modules.keys()
def get_autodetected_MUTS_list(platform_name_filter=None):
oldError = None
if os.name == 'nt':
# Disable Windows error box temporarily
oldError = ctypes.windll.kernel32.SetErrorMode(1) # note that SEM_FAILCRITICALERRORS = 1
oldError = ctypes.windll.kernel32.SetErrorMode(1) #note that SEM_FAILCRITICALERRORS = 1
mbeds = mbed_lstools.create()
detect_muts_list = mbeds.list_mbeds()
@ -1730,7 +1716,6 @@ def get_autodetected_MUTS_list(platform_name_filter=None):
return get_autodetected_MUTS(detect_muts_list, platform_name_filter=platform_name_filter)
def get_autodetected_MUTS(mbeds_list, platform_name_filter=None):
""" Function detects all connected to host mbed-enabled devices and generates artificial MUTS file.
If function fails to auto-detect devices it will return empty dictionary.
@ -1742,7 +1727,7 @@ def get_autodetected_MUTS(mbeds_list, platform_name_filter=None):
@param mbeds_list list of mbeds captured from mbed_lstools
@param platform_name You can filter 'platform_name' with list of filtered targets from 'platform_name_filter'
"""
result = {} # Should be in muts_all.json format
result = {} # Should be in muts_all.json format
# Align mbeds_list from mbed_lstools to MUT file format (JSON dictionary with muts)
# mbeds_list = [{'platform_name': 'NUCLEO_F302R8', 'mount_point': 'E:', 'target_id': '07050200623B61125D5EF72A', 'serial_port': u'COM34'}]
index = 1
@ -1755,11 +1740,10 @@ def get_autodetected_MUTS(mbeds_list, platform_name_filter=None):
# For mcu_unique - we are assigning 'platform_name_unique' value from mbedls output (if its existing)
# if not we are creating our own unique value (last few chars from platform's target_id).
m = {'mcu': mut['platform_name'],
'mcu_unique': mut['platform_name_unique'] if 'platform_name_unique' in mut else "%s[%s]" % (
mut['platform_name'], mut['target_id'][-4:]),
'mcu_unique' : mut['platform_name_unique'] if 'platform_name_unique' in mut else "%s[%s]" % (mut['platform_name'], mut['target_id'][-4:]),
'port': mut['serial_port'],
'disk': mut['mount_point'],
'peripherals': [] # No peripheral detection
'peripherals': [] # No peripheral detection
}
if index not in result:
result[index] = {}
@ -1780,7 +1764,7 @@ def get_autodetected_TEST_SPEC(mbeds_list,
use_supported_toolchains - if True add all supported toolchains to test_spec
toolchain_filter - if [...list of toolchains...] add from all toolchains only those in filter to test_spec
"""
result = {'targets': {}}
result = {'targets': {} }
for mut in mbeds_list:
mcu = mut['mcu']
@ -1838,7 +1822,7 @@ def get_default_test_options_parser():
toolchain_list = list(TOOLCHAINS) + ["DEFAULT", "ALL"]
parser.add_argument('--tc',
dest='toolchains_filter',
type=argparse_many(argparse_uppercase_type(toolchain_list, "toolchains")),
type=argparse_many(argparse_uppercase_type(toolchain_list, "toolchains")),
help="Toolchain filter for --auto argument. Use toolchains names separated by comma, 'default' or 'all' to select toolchains")
test_scopes = ','.join(["'%s'" % n for n in get_available_oper_test_scopes()])
@ -1874,9 +1858,9 @@ def get_default_test_options_parser():
help='Runs only test enumerated it this switch. Use comma to separate test case names')
parser.add_argument('-p', '--peripheral-by-names',
dest='peripheral_by_names',
type=argparse_many(str),
help='Forces discovery of particular peripherals. Use comma to separate peripheral names')
dest='peripheral_by_names',
type=argparse_many(str),
help='Forces discovery of particular peripherals. Use comma to separate peripheral names')
copy_methods = host_tests_plugins.get_plugin_caps('CopyMethod')
copy_methods_str = "Plugin support: " + ', '.join(copy_methods)
@ -1884,7 +1868,7 @@ def get_default_test_options_parser():
parser.add_argument('-c', '--copy-method',
dest='copy_method',
type=argparse_uppercase_type(copy_methods, "flash method"),
help="Select binary copy (flash) method. Default is Python's shutil.copy() method. %s" % copy_methods_str)
help="Select binary copy (flash) method. Default is Python's shutil.copy() method. %s"% copy_methods_str)
reset_methods = host_tests_plugins.get_plugin_caps('ResetMethod')
reset_methods_str = "Plugin support: " + ', '.join(reset_methods)
@ -1893,7 +1877,7 @@ def get_default_test_options_parser():
dest='mut_reset_type',
default=None,
type=argparse_uppercase_type(reset_methods, "reset method"),
help='Extra reset method used to reset MUT by host test script. %s' % reset_methods_str)
help='Extra reset method used to reset MUT by host test script. %s'% reset_methods_str)
parser.add_argument('-g', '--goanna-for-tests',
dest='goanna_for_tests',
@ -2057,20 +2041,18 @@ def get_default_test_options_parser():
help="Depth level for static memory report")
return parser
def test_path_to_name(path, base):
"""Change all slashes in a path into hyphens
This creates a unique cross-platform test name based on the path
This can eventually be overriden by a to-be-determined meta-data mechanism"""
name_parts = []
head, tail = os.path.split(relpath(path, base))
head, tail = os.path.split(relpath(path,base))
while (tail and tail != "."):
name_parts.insert(0, tail)
head, tail = os.path.split(head)
return "-".join(name_parts).lower()
def get_test_config(config_name, target_name):
"""Finds the path to a test configuration file
config_name: path to a custom configuration file OR mbed OS interface "ethernet, wifi_odin, etc"
@ -2169,7 +2151,6 @@ def print_tests(tests, format="list", sort=True):
print("Unknown format '%s'" % format)
sys.exit(1)
def norm_relative_path(path, start):
"""This function will create a normalized, relative path. It mimics the
python os.path.relpath function, but also normalizes a Windows-syle path
@ -2326,16 +2307,16 @@ def build_tests(tests, base_source_paths, build_path, target, toolchain_name,
# Set the overall result to a failure if a build failure occurred
if ('reason' in worker_result and
not worker_result['reason'] and
not isinstance(worker_result['reason'], NotSupportedException)):
not worker_result['reason'] and
not isinstance(worker_result['reason'], NotSupportedException)):
result = False
break
# Adding binary path to test build result
if ('result' in worker_result and
worker_result['result'] and
'bin_file' in worker_result):
worker_result['result'] and
'bin_file' in worker_result):
bin_file = norm_relative_path(worker_result['bin_file'], execution_directory)
test_key = 'test_apps' if 'test_apps-' in worker_result['kwargs']['project_id'] else 'tests'
@ -2382,6 +2363,4 @@ def build_tests(tests, base_source_paths, build_path, target, toolchain_name,
def test_spec_from_test_builds(test_builds):
return {
"builds": test_builds
}
}