diff --git a/tools/test_api.py b/tools/test_api.py index fd2b4c760a..2d5561d873 100644 --- a/tools/test_api.py +++ b/tools/test_api.py @@ -31,13 +31,11 @@ import datetime import threading import ctypes import functools -import subprocess from colorama import Fore, Back, Style from prettytable import PrettyTable from copy import copy, deepcopy from time import sleep, time - try: from Queue import Queue, Empty except ImportError: @@ -116,7 +114,6 @@ class ProcessObserver(Thread): class SingleTestExecutor(threading.Thread): """ Example: Single test class in separate thread usage """ - def __init__(self, single_test): self.single_test = single_test threading.Thread.__init__(self) @@ -137,7 +134,7 @@ class SingleTestExecutor(threading.Thread): # table shows text x toolchain test result matrix print(self.single_test.generate_test_summary_by_target( test_summary, shuffle_seed)) - print("Completed in %.2f sec" % (elapsed_time)) + print("Completed in %.2f sec"% (elapsed_time)) class SingleTestRunner(object): @@ -160,26 +157,26 @@ class SingleTestRunner(object): TEST_RESULT_NOT_SUPPORTED = "NOT_SUPPORTED" GLOBAL_LOOPS_COUNT = 1 # How many times each test should be repeated - TEST_LOOPS_LIST = [] # We redefine no.of loops per test_id - TEST_LOOPS_DICT = {} # TEST_LOOPS_LIST in dict format: { test_id : test_loop_count} + TEST_LOOPS_LIST = [] # We redefine no.of loops per test_id + TEST_LOOPS_DICT = {} # TEST_LOOPS_LIST in dict format: { test_id : test_loop_count} - muts = {} # MUTs descriptor (from external file) - test_spec = {} # Test specification (from external file) + muts = {} # MUTs descriptor (from external file) + test_spec = {} # Test specification (from external file) # mbed test suite -> SingleTestRunner - TEST_RESULT_MAPPING = {"success": TEST_RESULT_OK, - "failure": TEST_RESULT_FAIL, - "error": TEST_RESULT_ERROR, - "ioerr_copy": TEST_RESULT_IOERR_COPY, - "ioerr_disk": TEST_RESULT_IOERR_DISK, - "ioerr_serial": TEST_RESULT_IOERR_SERIAL, - "timeout": TEST_RESULT_TIMEOUT, - "no_image": TEST_RESULT_NO_IMAGE, - "end": TEST_RESULT_UNDEF, - "mbed_assert": TEST_RESULT_MBED_ASSERT, - "build_failed": TEST_RESULT_BUILD_FAILED, - "not_supproted": TEST_RESULT_NOT_SUPPORTED - } + TEST_RESULT_MAPPING = {"success" : TEST_RESULT_OK, + "failure" : TEST_RESULT_FAIL, + "error" : TEST_RESULT_ERROR, + "ioerr_copy" : TEST_RESULT_IOERR_COPY, + "ioerr_disk" : TEST_RESULT_IOERR_DISK, + "ioerr_serial" : TEST_RESULT_IOERR_SERIAL, + "timeout" : TEST_RESULT_TIMEOUT, + "no_image" : TEST_RESULT_NO_IMAGE, + "end" : TEST_RESULT_UNDEF, + "mbed_assert" : TEST_RESULT_MBED_ASSERT, + "build_failed" : TEST_RESULT_BUILD_FAILED, + "not_supproted" : TEST_RESULT_NOT_SUPPORTED + } def __init__(self, _global_loops_count=1, @@ -289,18 +286,17 @@ class SingleTestRunner(object): # Database related initializations self.db_logger = factory_db_logger(self.opts_db_url) - self.db_logger_build_id = None # Build ID (database index of build_id table) + self.db_logger_build_id = None # Build ID (database index of build_id table) # Let's connect to database to set up credentials and confirm database is ready if self.db_logger: - self.db_logger.connect_url(self.opts_db_url) # Save db access info inside db_logger object + self.db_logger.connect_url(self.opts_db_url) # Save db access info inside db_logger object if self.db_logger.is_connected(): # Get hostname and uname so we can use it as build description # when creating new build_id in external database (_hostname, _uname) = self.db_logger.get_hostname() _host_location = os.path.dirname(os.path.abspath(__file__)) build_id_type = None if self.opts_only_build_tests is None else self.db_logger.BUILD_ID_TYPE_BUILD_ONLY - self.db_logger_build_id = self.db_logger.get_next_build_id(_hostname, desc=_uname, - location=_host_location, type=build_id_type) + self.db_logger_build_id = self.db_logger.get_next_build_id(_hostname, desc=_uname, location=_host_location, type=build_id_type) self.db_logger.disconnect() def dump_options(self): @@ -311,23 +307,23 @@ class SingleTestRunner(object): or data_str = json.dumps(self.dump_options()) """ - result = {"db_url": str(self.opts_db_url), - "log_file_name": str(self.opts_log_file_name), - "shuffle_test_order": str(self.opts_shuffle_test_order), - "shuffle_test_seed": str(self.opts_shuffle_test_seed), - "test_by_names": str(self.opts_test_by_names), - "peripheral_by_names": str(self.opts_peripheral_by_names), - "test_only_peripheral": str(self.opts_test_only_peripheral), - "test_only_common": str(self.opts_test_only_common), - "verbose": str(self.opts_verbose), - "firmware_global_name": str(self.opts_firmware_global_name), - "only_build_tests": str(self.opts_only_build_tests), - "copy_method": str(self.opts_copy_method), - "mut_reset_type": str(self.opts_mut_reset_type), - "jobs": str(self.opts_jobs), - "extend_test_timeout": str(self.opts_extend_test_timeout), - "_dummy": '' - } + result = {"db_url" : str(self.opts_db_url), + "log_file_name" : str(self.opts_log_file_name), + "shuffle_test_order" : str(self.opts_shuffle_test_order), + "shuffle_test_seed" : str(self.opts_shuffle_test_seed), + "test_by_names" : str(self.opts_test_by_names), + "peripheral_by_names" : str(self.opts_peripheral_by_names), + "test_only_peripheral" : str(self.opts_test_only_peripheral), + "test_only_common" : str(self.opts_test_only_common), + "verbose" : str(self.opts_verbose), + "firmware_global_name" : str(self.opts_firmware_global_name), + "only_build_tests" : str(self.opts_only_build_tests), + "copy_method" : str(self.opts_copy_method), + "mut_reset_type" : str(self.opts_mut_reset_type), + "jobs" : str(self.opts_jobs), + "extend_test_timeout" : str(self.opts_extend_test_timeout), + "_dummy" : '' + } return result def shuffle_random_func(self): @@ -369,6 +365,7 @@ class SingleTestRunner(object): 'shuffle_random_seed': self.shuffle_random_seed } + # print '=== %s::%s ===' % (target, toolchain) # Let's build our test if target not in TARGET_MAP: @@ -398,7 +395,7 @@ class SingleTestRunner(object): print(self.logger.log_line( self.logger.LogType.NOTIF, 'Skipped tests for %s target. Toolchain %s is not ' - 'supported for this target' % (T.name, toolchain))) + 'supported for this target'% (T.name, toolchain))) continue except ToolException: @@ -441,13 +438,13 @@ class SingleTestRunner(object): _extra=json.dumps(self.dump_options())) self.db_logger.disconnect(); - valid_test_map_keys = self.get_valid_tests(test_map_keys, target, toolchain, test_ids, - self.opts_include_non_automated) + valid_test_map_keys = self.get_valid_tests(test_map_keys, target, toolchain, test_ids, self.opts_include_non_automated) skipped_test_map_keys = self.get_skipped_tests(test_map_keys, valid_test_map_keys) for skipped_test_id in skipped_test_map_keys: test_suite_properties['skipped'].append(skipped_test_id) + # First pass through all tests and determine which libraries need to be built libraries = [] for test_id in valid_test_map_keys: @@ -459,6 +456,7 @@ class SingleTestRunner(object): if lib['build_dir'] in test.dependencies and lib['id'] not in libraries: libraries.append(lib['id']) + clean_project_options = True if self.opts_goanna_for_tests or clean or self.opts_clean else None # Build all required libraries @@ -480,10 +478,11 @@ class SingleTestRunner(object): 'There were errors while building library %s' % lib_id)) continue + for test_id in valid_test_map_keys: test = TEST_MAP[test_id] - test_suite_properties['test.libs.%s.%s.%s' % (target, toolchain, test_id)] = ', '.join(libraries) + test_suite_properties['test.libs.%s.%s.%s'% (target, toolchain, test_id)] = ', '.join(libraries) # TODO: move this 2 below loops to separate function INC_DIRS = [] @@ -495,18 +494,18 @@ class SingleTestRunner(object): for lib_id in libraries: if 'macros' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['macros']: MACROS.extend(LIBRARY_MAP[lib_id]['macros']) - MACROS.append('TEST_SUITE_TARGET_NAME="%s"' % target) - MACROS.append('TEST_SUITE_TEST_ID="%s"' % test_id) + MACROS.append('TEST_SUITE_TARGET_NAME="%s"'% target) + MACROS.append('TEST_SUITE_TEST_ID="%s"'% test_id) test_uuid = uuid.uuid4() - MACROS.append('TEST_SUITE_UUID="%s"' % str(test_uuid)) + MACROS.append('TEST_SUITE_UUID="%s"'% str(test_uuid)) # Prepare extended test results data structure (it can be used to generate detailed test report) if target not in self.test_summary_ext: self.test_summary_ext[target] = {} # test_summary_ext : toolchain if toolchain not in self.test_summary_ext[target]: - self.test_summary_ext[target][toolchain] = {} # test_summary_ext : toolchain : target + self.test_summary_ext[target][toolchain] = {} # test_summary_ext : toolchain : target - tt_test_id = "%s::%s::%s" % (toolchain, target, test_id) # For logging only + tt_test_id = "%s::%s::%s" % (toolchain, target, test_id) # For logging only project_name = self.opts_firmware_global_name if self.opts_firmware_global_name else None try: @@ -524,6 +523,7 @@ class SingleTestRunner(object): except Exception as e: project_name_str = project_name if project_name is not None else test_id + test_result = self.TEST_RESULT_FAIL if isinstance(e, ToolException): @@ -538,6 +538,7 @@ class SingleTestRunner(object): 'Project %s is not supported' % project_name_str)) test_result = self.TEST_RESULT_NOT_SUPPORTED + # Append test results to global test summary self.test_summary.append( (test_result, target, toolchain, test_id, @@ -548,17 +549,17 @@ class SingleTestRunner(object): if test_id not in self.test_summary_ext[target][toolchain]: self.test_summary_ext[target][toolchain][test_id] = [] - self.test_summary_ext[target][toolchain][test_id].append({0: { - 'result': test_result, - 'output': '', - 'target_name': target, + self.test_summary_ext[target][toolchain][test_id].append({ 0: { + 'result' : test_result, + 'output' : '', + 'target_name' : target, 'target_name_unique': target, - 'toolchain_name': toolchain, - 'id': test_id, - 'description': test.get_description(), - 'elapsed_time': 0, - 'duration': 0, - 'copy_method': None + 'toolchain_name' : toolchain, + 'id' : test_id, + 'description' : test.get_description(), + 'elapsed_time' : 0, + 'duration' : 0, + 'copy_method' : None }}) continue @@ -576,9 +577,9 @@ class SingleTestRunner(object): test_spec = self.shape_test_request(target, path, test_id, test_duration) test_loops = self.get_test_loop_count(test_id) - test_suite_properties['test.duration.%s.%s.%s' % (target, toolchain, test_id)] = test_duration - test_suite_properties['test.loops.%s.%s.%s' % (target, toolchain, test_id)] = test_loops - test_suite_properties['test.path.%s.%s.%s' % (target, toolchain, test_id)] = path + test_suite_properties['test.duration.%s.%s.%s'% (target, toolchain, test_id)] = test_duration + test_suite_properties['test.loops.%s.%s.%s'% (target, toolchain, test_id)] = test_loops + test_suite_properties['test.path.%s.%s.%s'% (target, toolchain, test_id)] = path # read MUTs, test specification and perform tests handle_results = self.handle(test_spec, target, toolchain, test_loops=test_loops) @@ -626,11 +627,12 @@ class SingleTestRunner(object): if self.opts_shuffle_test_seed is not None and self.is_shuffle_seed_float(): self.shuffle_random_seed = round(float(self.opts_shuffle_test_seed), self.SHUFFLE_SEED_ROUND) + if self.opts_parallel_test_exec: ################################################################### # Experimental, parallel test execution per singletest instance. ################################################################### - execute_threads = [] # Threads used to build mbed SDL, libs, test cases and execute tests + execute_threads = [] # Threads used to build mbed SDL, libs, test cases and execute tests # Note: We are building here in parallel for each target separately! # So we are not building the same thing multiple times and compilers # in separate threads do not collide. @@ -638,29 +640,26 @@ class SingleTestRunner(object): # get information about available MUTs (per target). for target, toolchains in self.test_spec['targets'].items(): self.test_suite_properties_ext[target] = {} - t = threading.Thread(target=self.execute_thread_slice, args=( - q, target, toolchains, clean, test_ids, self.build_report, self.build_properties)) + t = threading.Thread(target=self.execute_thread_slice, args = (q, target, toolchains, clean, test_ids, self.build_report, self.build_properties)) t.daemon = True t.start() execute_threads.append(t) for t in execute_threads: - q.get() # t.join() would block some threads because we should not wait in any order for thread end + q.get() # t.join() would block some threads because we should not wait in any order for thread end else: # Serialized (not parallel) test execution for target, toolchains in self.test_spec['targets'].items(): if target not in self.test_suite_properties_ext: self.test_suite_properties_ext[target] = {} - self.execute_thread_slice(q, target, toolchains, clean, test_ids, self.build_report, - self.build_properties) + self.execute_thread_slice(q, target, toolchains, clean, test_ids, self.build_report, self.build_properties) q.get() if self.db_logger: self.db_logger.reconnect(); if self.db_logger.is_connected(): - self.db_logger.update_build_id_info(self.db_logger_build_id, - _status_fk=self.db_logger.BUILD_ID_STATUS_COMPLETED) + self.db_logger.update_build_id_info(self.db_logger_build_id, _status_fk=self.db_logger.BUILD_ID_STATUS_COMPLETED) self.db_logger.disconnect(); return self.test_summary, self.shuffle_random_seed, self.test_summary_ext, self.test_suite_properties_ext, self.build_report, self.build_properties @@ -684,8 +683,8 @@ class SingleTestRunner(object): continue if (self.opts_peripheral_by_names and test.peripherals and - not any((i in self.opts_peripheral_by_names) - for i in test.peripherals)): + not any((i in self.opts_peripheral_by_names) + for i in test.peripherals)): # We will skip tests not forced with -p option if self.opts_verbose_skipped_tests: print(self.logger.log_line( @@ -756,7 +755,7 @@ class SingleTestRunner(object): result = "Test summary:\n" for target in unique_targets: - result_dict = {} # test : { toolchain : result } + result_dict = {} # test : { toolchain : result } unique_target_toolchains = [] for test in test_summary: if test[TARGET_INDEX] == target: @@ -770,7 +769,7 @@ class SingleTestRunner(object): pt = PrettyTable(pt_cols) for col in pt_cols: pt.align[col] = "l" - pt.padding_width = 1 # One space between column edges and contents (default) + pt.padding_width = 1 # One space between column edges and contents (default) for test in unique_tests: if test in result_dict: @@ -782,40 +781,40 @@ class SingleTestRunner(object): row.append(test_results[toolchain]) pt.add_row(row) result += pt.get_string() - shuffle_seed_text = "Shuffle Seed: %.*f" % (self.SHUFFLE_SEED_ROUND, - shuffle_seed if shuffle_seed else self.shuffle_random_seed) - result += "\n%s" % (shuffle_seed_text if self.opts_shuffle_test_order else '') + shuffle_seed_text = "Shuffle Seed: %.*f"% (self.SHUFFLE_SEED_ROUND, + shuffle_seed if shuffle_seed else self.shuffle_random_seed) + result += "\n%s"% (shuffle_seed_text if self.opts_shuffle_test_order else '') return result def generate_test_summary(self, test_summary, shuffle_seed=None): """ Prints well-formed summary with results (SQL table like) table shows target x test results matrix across """ - success_code = 0 # Success code that can be leter returned to + success_code = 0 # Success code that can be leter returned to result = "Test summary:\n" # Pretty table package is used to print results pt = PrettyTable(["Result", "Target", "Toolchain", "Test ID", "Test Description", "Elapsed Time (sec)", "Timeout (sec)", "Loops"]) - pt.align["Result"] = "l" # Left align - pt.align["Target"] = "l" # Left align - pt.align["Toolchain"] = "l" # Left align - pt.align["Test ID"] = "l" # Left align - pt.align["Test Description"] = "l" # Left align - pt.padding_width = 1 # One space between column edges and contents (default) + pt.align["Result"] = "l" # Left align + pt.align["Target"] = "l" # Left align + pt.align["Toolchain"] = "l" # Left align + pt.align["Test ID"] = "l" # Left align + pt.align["Test Description"] = "l" # Left align + pt.padding_width = 1 # One space between column edges and contents (default) - result_dict = {self.TEST_RESULT_OK: 0, - self.TEST_RESULT_FAIL: 0, - self.TEST_RESULT_ERROR: 0, - self.TEST_RESULT_UNDEF: 0, - self.TEST_RESULT_IOERR_COPY: 0, - self.TEST_RESULT_IOERR_DISK: 0, - self.TEST_RESULT_IOERR_SERIAL: 0, - self.TEST_RESULT_NO_IMAGE: 0, - self.TEST_RESULT_TIMEOUT: 0, - self.TEST_RESULT_MBED_ASSERT: 0, - self.TEST_RESULT_BUILD_FAILED: 0, - self.TEST_RESULT_NOT_SUPPORTED: 0 - } + result_dict = {self.TEST_RESULT_OK : 0, + self.TEST_RESULT_FAIL : 0, + self.TEST_RESULT_ERROR : 0, + self.TEST_RESULT_UNDEF : 0, + self.TEST_RESULT_IOERR_COPY : 0, + self.TEST_RESULT_IOERR_DISK : 0, + self.TEST_RESULT_IOERR_SERIAL : 0, + self.TEST_RESULT_NO_IMAGE : 0, + self.TEST_RESULT_TIMEOUT : 0, + self.TEST_RESULT_MBED_ASSERT : 0, + self.TEST_RESULT_BUILD_FAILED : 0, + self.TEST_RESULT_NOT_SUPPORTED : 0 + } for test in test_summary: if test[0] in result_dict: @@ -825,11 +824,10 @@ class SingleTestRunner(object): result += "\n" # Print result count - result += "Result: " + ' / '.join( - ['%s %s' % (value, key) for (key, value) in {k: v for k, v in result_dict.items() if v != 0}.items()]) - shuffle_seed_text = "Shuffle Seed: %.*f\n" % (self.SHUFFLE_SEED_ROUND, - shuffle_seed if shuffle_seed else self.shuffle_random_seed) - result += "\n%s" % (shuffle_seed_text if self.opts_shuffle_test_order else '') + result += "Result: " + ' / '.join(['%s %s' % (value, key) for (key, value) in {k: v for k, v in result_dict.items() if v != 0}.items()]) + shuffle_seed_text = "Shuffle Seed: %.*f\n"% (self.SHUFFLE_SEED_ROUND, + shuffle_seed if shuffle_seed else self.shuffle_random_seed) + result += "\n%s"% (shuffle_seed_text if self.opts_shuffle_test_order else '') return result def test_loop_list_to_dict(self, test_loops_str): @@ -885,7 +883,7 @@ class SingleTestRunner(object): return None mcu = mut['mcu'] - copy_method = mut.get('copy_method') # Available board configuration selection e.g. core selection etc. + copy_method = mut.get('copy_method') # Available board configuration selection e.g. core selection etc. if self.db_logger: self.db_logger.reconnect() @@ -941,14 +939,13 @@ class SingleTestRunner(object): # Host test execution start_host_exec_time = time() - single_test_result = self.TEST_RESULT_UNDEF # single test run result + single_test_result = self.TEST_RESULT_UNDEF # single test run result _copy_method = selected_copy_method if not exists(image_path): single_test_result = self.TEST_RESULT_NO_IMAGE elapsed_time = 0 - single_test_output = self.logger.log_line(self.logger.LogType.ERROR, - 'Image file does not exist: %s' % image_path) + single_test_output = self.logger.log_line(self.logger.LogType.ERROR, 'Image file does not exist: %s'% image_path) print(single_test_output) else: # Host test execution @@ -968,20 +965,20 @@ class SingleTestRunner(object): # Store test result test_all_result.append(single_test_result) - total_elapsed_time = time() - start_host_exec_time # Test time with copy (flashing) / reset + total_elapsed_time = time() - start_host_exec_time # Test time with copy (flashing) / reset elapsed_time = single_testduration # TIme of single test case execution after reset detailed_test_results[test_index] = { - 'result': single_test_result, - 'output': single_test_output, - 'target_name': target_name, - 'target_name_unique': target_name_unique, - 'toolchain_name': toolchain_name, - 'id': test_id, - 'description': test_description, - 'elapsed_time': round(elapsed_time, 2), - 'duration': single_timeout, - 'copy_method': _copy_method, + 'result' : single_test_result, + 'output' : single_test_output, + 'target_name' : target_name, + 'target_name_unique' : target_name_unique, + 'toolchain_name' : toolchain_name, + 'id' : test_id, + 'description' : test_description, + 'elapsed_time' : round(elapsed_time, 2), + 'duration' : single_timeout, + 'copy_method' : _copy_method, } print(self.print_test_result( @@ -1009,8 +1006,7 @@ class SingleTestRunner(object): if self.db_logger: self.db_logger.disconnect() - return (self.shape_global_test_loop_result(test_all_result, - self.opts_waterfall_test and self.opts_consolidate_waterfall_test), + return (self.shape_global_test_loop_result(test_all_result, self.opts_waterfall_test and self.opts_consolidate_waterfall_test), target_name_unique, toolchain_name, test_id, @@ -1048,7 +1044,7 @@ class SingleTestRunner(object): tokens.append(test_description) separator = "::" time_info = " in %.2f of %d sec" % (round(elapsed_time, 2), duration) - result = separator.join(tokens) + " [" + test_result + "]" + time_info + result = separator.join(tokens) + " [" + test_result +"]" + time_info return Fore.MAGENTA + result + Fore.RESET def shape_test_loop_ok_result_count(self, test_all_result): @@ -1056,7 +1052,7 @@ class SingleTestRunner(object): """ test_loop_count = len(test_all_result) test_loop_ok_result = test_all_result.count(self.TEST_RESULT_OK) - return "%d/%d" % (test_loop_ok_result, test_loop_count) + return "%d/%d"% (test_loop_ok_result, test_loop_count) def shape_global_test_loop_result(self, test_all_result, waterfall_and_consolidate): """ Reformats list of results to simple string @@ -1110,16 +1106,16 @@ class SingleTestRunner(object): Returns string """ result = None - if re.search("HOST: Property '%s'" % property_name, line) is not None: - property = re.search("HOST: Property '%s' = '([\w\d _]+)'" % property_name, line) + if re.search("HOST: Property '%s'"% property_name, line) is not None: + property = re.search("HOST: Property '%s' = '([\w\d _]+)'"% property_name, line) if property is not None and len(property.groups()) == 1: result = property.groups()[0] return result cmd = ["python", - '%s.py' % name, + '%s.py'% name, '-d', disk, - '-f', '"%s"' % image_path, + '-f', '"%s"'% image_path, '-p', port, '-t', str(duration), '-C', str(program_cycle_s)] @@ -1143,7 +1139,7 @@ class SingleTestRunner(object): proc = Popen(cmd, stdout=PIPE, cwd=HOST_TESTS) obs = ProcessObserver(proc) - update_once_flag = {} # Stores flags checking if some auto-parameter was already set + update_once_flag = {} # Stores flags checking if some auto-parameter was already set line = '' output = [] start_time = time() @@ -1182,7 +1178,7 @@ class SingleTestRunner(object): else: line += c end_time = time() - testcase_duration = end_time - start_time # Test case duration from reset to {end} + testcase_duration = end_time - start_time # Test case duration from reset to {end} c = get_char_from_queue(obs) @@ -1259,11 +1255,11 @@ def show_json_file_format_error(json_spec_filename, line, column): with open(json_spec_filename) as data_file: line_no = 1 for json_line in data_file: - if line_no + 5 >= line: # Print last few lines before error - print('Line %d:\t' % line_no + json_line) + if line_no + 5 >= line: # Print last few lines before error + print('Line %d:\t'%line_no + json_line) if line_no == line: - print('%s\t%s^'(' ' * len('Line %d:' % line_no), - '-' * (column - 1))) + print('%s\t%s^' (' ' * len('Line %d:' % line_no), + '-' * (column - 1))) break line_no += 1 @@ -1311,7 +1307,7 @@ def get_json_data_from_file(json_spec_filename, verbose=False): show_json_file_format_error(json_spec_filename, line, column) except IOError as fileopen_error_msg: - print('JSON file %s not opened. Reason: %s\n' % + print('JSON file %s not opened. Reason: %s\n'% (json_spec_filename, fileopen_error_msg)) if verbose and result: pp = pprint.PrettyPrinter(indent=4) @@ -1384,7 +1380,7 @@ def print_test_configuration_from_json(json_data, join_delim=", "): target_supported_toolchains = get_target_supported_toolchains(target) if not target_supported_toolchains: target_supported_toolchains = [] - target_name = target if target in TARGET_MAP else "%s*" % target + target_name = target if target in TARGET_MAP else "%s*"% target row = [target_name] toolchains = targets[target] @@ -1415,27 +1411,27 @@ def print_test_configuration_from_json(json_data, join_delim=", "): pt.add_row(row) # generate result string - result = pt.get_string() # Test specification table + result = pt.get_string() # Test specification table if toolchain_conflicts or toolchain_path_conflicts: result += "\n" result += "Toolchain conflicts:\n" for target in toolchain_conflicts: if target not in TARGET_MAP: - result += "\t* Target %s unknown\n" % (target) + result += "\t* Target %s unknown\n"% (target) conflict_target_list = join_delim.join(toolchain_conflicts[target]) sufix = 's' if len(toolchain_conflicts[target]) > 1 else '' - result += "\t* Target %s does not support %s toolchain%s\n" % (target, conflict_target_list, sufix) + result += "\t* Target %s does not support %s toolchain%s\n"% (target, conflict_target_list, sufix) for toolchain in toolchain_path_conflicts: - # Let's check toolchain configuration + # Let's check toolchain configuration if toolchain in TOOLCHAIN_PATHS: toolchain_path = TOOLCHAIN_PATHS[toolchain] if not os.path.isdir(toolchain_path): - result += "\t# Toolchain %s path not found: %s\n" % (toolchain, toolchain_path) + result += "\t# Toolchain %s path not found: %s\n"% (toolchain, toolchain_path) return result -def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=',', platform_filter=None): +def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=',',platform_filter=None): """ Generates table summary with all test cases and additional test cases information using pretty print functionality. Allows test suite user to see test cases @@ -1466,7 +1462,7 @@ def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=',' counter_all = 0 counter_automated = 0 - pt.padding_width = 1 # One space between column edges and contents (default) + pt.padding_width = 1 # One space between column edges and contents (default) for test_id in sorted(TEST_MAP.keys()): if platform_filter is not None: @@ -1520,8 +1516,7 @@ def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=',' pt.align['percent [%]'] = "r" for unique_id in unique_test_id: # print "\t\t%s: %d / %d" % (unique_id, counter_dict_test_id_types[unique_id], counter_dict_test_id_types_all[unique_id]) - percent_progress = round( - 100.0 * counter_dict_test_id_types[unique_id] / float(counter_dict_test_id_types_all[unique_id]), 1) + percent_progress = round(100.0 * counter_dict_test_id_types[unique_id] / float(counter_dict_test_id_types_all[unique_id]), 1) str_progress = progress_bar(percent_progress, 75) row = [unique_id, counter_dict_test_id_types[unique_id], @@ -1538,7 +1533,7 @@ def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=',' def progress_bar(percent_progress, saturation=0): """ This function creates progress bar with optional simple saturation mark """ - step = int(percent_progress / 2) # Scale by to (scale: 1 - 50) + step = int(percent_progress / 2) # Scale by to (scale: 1 - 50) str_progress = '#' * step + '.' * int(50 - step) c = '!' if str_progress[38] == '.' else '|' if saturation > 0: @@ -1578,32 +1573,26 @@ def singletest_in_cli_mode(single_test): if single_test.opts_report_html_file_name: # Export results in form of HTML report to separate file report_exporter = ReportExporter(ResultExporterType.HTML) - report_exporter.report_to_file(test_summary_ext, single_test.opts_report_html_file_name, - test_suite_properties=test_suite_properties_ext) + report_exporter.report_to_file(test_summary_ext, single_test.opts_report_html_file_name, test_suite_properties=test_suite_properties_ext) if single_test.opts_report_junit_file_name: # Export results in form of JUnit XML report to separate file report_exporter = ReportExporter(ResultExporterType.JUNIT) - report_exporter.report_to_file(test_summary_ext, single_test.opts_report_junit_file_name, - test_suite_properties=test_suite_properties_ext) + report_exporter.report_to_file(test_summary_ext, single_test.opts_report_junit_file_name, test_suite_properties=test_suite_properties_ext) if single_test.opts_report_text_file_name: # Export results in form of a text file report_exporter = ReportExporter(ResultExporterType.TEXT) - report_exporter.report_to_file(test_summary_ext, single_test.opts_report_text_file_name, - test_suite_properties=test_suite_properties_ext) + report_exporter.report_to_file(test_summary_ext, single_test.opts_report_text_file_name, test_suite_properties=test_suite_properties_ext) if single_test.opts_report_build_file_name: # Export build results as html report to sparate file report_exporter = ReportExporter(ResultExporterType.JUNIT, package="build") - report_exporter.report_to_file(build_report, single_test.opts_report_build_file_name, - test_suite_properties=build_properties) + report_exporter.report_to_file(build_report, single_test.opts_report_build_file_name, test_suite_properties=build_properties) # Returns True if no build failures of the test projects or their dependencies return status - class TestLogger(): """ Super-class for logging and printing ongoing events for test suite pass """ - def __init__(self, store_log=True): """ We can control if logger actually stores log in memory or just handled all log entries immediately @@ -1619,18 +1608,18 @@ class TestLogger(): ERROR='Error', EXCEPT='Exception') - self.LogToFileAttr = construct_enum(CREATE=1, # Create or overwrite existing log file - APPEND=2) # Append to existing log file + self.LogToFileAttr = construct_enum(CREATE=1, # Create or overwrite existing log file + APPEND=2) # Append to existing log file def log_line(self, LogType, log_line, timestamp=True, line_delim='\n'): """ Log one line of text """ log_timestamp = time() - log_entry = {'log_type': LogType, - 'log_timestamp': log_timestamp, - 'log_line': log_line, - '_future': None - } + log_entry = {'log_type' : LogType, + 'log_timestamp' : log_timestamp, + 'log_line' : log_line, + '_future' : None + } # Store log in memory if self.store_log: self.log.append(log_entry) @@ -1640,20 +1629,18 @@ class TestLogger(): class CLITestLogger(TestLogger): """ Logger used with CLI (Command line interface) test suite. Logs on screen and to file if needed """ - def __init__(self, store_log=True, file_name=None): TestLogger.__init__(self) self.log_file_name = file_name - # self.TIMESTAMP_FORMAT = '%y-%m-%d %H:%M:%S' # Full date and time - self.TIMESTAMP_FORMAT = '%H:%M:%S' # Time only + #self.TIMESTAMP_FORMAT = '%y-%m-%d %H:%M:%S' # Full date and time + self.TIMESTAMP_FORMAT = '%H:%M:%S' # Time only def log_print(self, log_entry, timestamp=True): """ Prints on screen formatted log entry """ ts = log_entry['log_timestamp'] - timestamp_str = datetime.datetime.fromtimestamp(ts).strftime( - "[%s] " % self.TIMESTAMP_FORMAT) if timestamp else '' - log_line_str = "%(log_type)s: %(log_line)s" % (log_entry) + timestamp_str = datetime.datetime.fromtimestamp(ts).strftime("[%s] "% self.TIMESTAMP_FORMAT) if timestamp else '' + log_line_str = "%(log_type)s: %(log_line)s"% (log_entry) return timestamp_str + log_line_str def log_line(self, LogType, log_line, timestamp=True, line_delim='\n'): @@ -1691,7 +1678,7 @@ def detect_database_verbose(db_url): if result is not None: # Parsing passed (db_type, username, password, host, db_name) = result - # print "DB type '%s', user name '%s', password '%s', host '%s', db name '%s'"% result + #print "DB type '%s', user name '%s', password '%s', host '%s', db name '%s'"% result # Let's try to connect db_ = factory_db_logger(db_url) if db_ is not None: @@ -1715,12 +1702,11 @@ def get_module_avail(module_name): """ return module_name in sys.modules.keys() - def get_autodetected_MUTS_list(platform_name_filter=None): oldError = None if os.name == 'nt': # Disable Windows error box temporarily - oldError = ctypes.windll.kernel32.SetErrorMode(1) # note that SEM_FAILCRITICALERRORS = 1 + oldError = ctypes.windll.kernel32.SetErrorMode(1) #note that SEM_FAILCRITICALERRORS = 1 mbeds = mbed_lstools.create() detect_muts_list = mbeds.list_mbeds() @@ -1730,7 +1716,6 @@ def get_autodetected_MUTS_list(platform_name_filter=None): return get_autodetected_MUTS(detect_muts_list, platform_name_filter=platform_name_filter) - def get_autodetected_MUTS(mbeds_list, platform_name_filter=None): """ Function detects all connected to host mbed-enabled devices and generates artificial MUTS file. If function fails to auto-detect devices it will return empty dictionary. @@ -1742,7 +1727,7 @@ def get_autodetected_MUTS(mbeds_list, platform_name_filter=None): @param mbeds_list list of mbeds captured from mbed_lstools @param platform_name You can filter 'platform_name' with list of filtered targets from 'platform_name_filter' """ - result = {} # Should be in muts_all.json format + result = {} # Should be in muts_all.json format # Align mbeds_list from mbed_lstools to MUT file format (JSON dictionary with muts) # mbeds_list = [{'platform_name': 'NUCLEO_F302R8', 'mount_point': 'E:', 'target_id': '07050200623B61125D5EF72A', 'serial_port': u'COM34'}] index = 1 @@ -1755,11 +1740,10 @@ def get_autodetected_MUTS(mbeds_list, platform_name_filter=None): # For mcu_unique - we are assigning 'platform_name_unique' value from mbedls output (if its existing) # if not we are creating our own unique value (last few chars from platform's target_id). m = {'mcu': mut['platform_name'], - 'mcu_unique': mut['platform_name_unique'] if 'platform_name_unique' in mut else "%s[%s]" % ( - mut['platform_name'], mut['target_id'][-4:]), + 'mcu_unique' : mut['platform_name_unique'] if 'platform_name_unique' in mut else "%s[%s]" % (mut['platform_name'], mut['target_id'][-4:]), 'port': mut['serial_port'], 'disk': mut['mount_point'], - 'peripherals': [] # No peripheral detection + 'peripherals': [] # No peripheral detection } if index not in result: result[index] = {} @@ -1780,7 +1764,7 @@ def get_autodetected_TEST_SPEC(mbeds_list, use_supported_toolchains - if True add all supported toolchains to test_spec toolchain_filter - if [...list of toolchains...] add from all toolchains only those in filter to test_spec """ - result = {'targets': {}} + result = {'targets': {} } for mut in mbeds_list: mcu = mut['mcu'] @@ -1838,7 +1822,7 @@ def get_default_test_options_parser(): toolchain_list = list(TOOLCHAINS) + ["DEFAULT", "ALL"] parser.add_argument('--tc', dest='toolchains_filter', - type=argparse_many(argparse_uppercase_type(toolchain_list, "toolchains")), + type=argparse_many(argparse_uppercase_type(toolchain_list, "toolchains")), help="Toolchain filter for --auto argument. Use toolchains names separated by comma, 'default' or 'all' to select toolchains") test_scopes = ','.join(["'%s'" % n for n in get_available_oper_test_scopes()]) @@ -1874,9 +1858,9 @@ def get_default_test_options_parser(): help='Runs only test enumerated it this switch. Use comma to separate test case names') parser.add_argument('-p', '--peripheral-by-names', - dest='peripheral_by_names', - type=argparse_many(str), - help='Forces discovery of particular peripherals. Use comma to separate peripheral names') + dest='peripheral_by_names', + type=argparse_many(str), + help='Forces discovery of particular peripherals. Use comma to separate peripheral names') copy_methods = host_tests_plugins.get_plugin_caps('CopyMethod') copy_methods_str = "Plugin support: " + ', '.join(copy_methods) @@ -1884,7 +1868,7 @@ def get_default_test_options_parser(): parser.add_argument('-c', '--copy-method', dest='copy_method', type=argparse_uppercase_type(copy_methods, "flash method"), - help="Select binary copy (flash) method. Default is Python's shutil.copy() method. %s" % copy_methods_str) + help="Select binary copy (flash) method. Default is Python's shutil.copy() method. %s"% copy_methods_str) reset_methods = host_tests_plugins.get_plugin_caps('ResetMethod') reset_methods_str = "Plugin support: " + ', '.join(reset_methods) @@ -1893,7 +1877,7 @@ def get_default_test_options_parser(): dest='mut_reset_type', default=None, type=argparse_uppercase_type(reset_methods, "reset method"), - help='Extra reset method used to reset MUT by host test script. %s' % reset_methods_str) + help='Extra reset method used to reset MUT by host test script. %s'% reset_methods_str) parser.add_argument('-g', '--goanna-for-tests', dest='goanna_for_tests', @@ -2057,20 +2041,18 @@ def get_default_test_options_parser(): help="Depth level for static memory report") return parser - def test_path_to_name(path, base): """Change all slashes in a path into hyphens This creates a unique cross-platform test name based on the path This can eventually be overriden by a to-be-determined meta-data mechanism""" name_parts = [] - head, tail = os.path.split(relpath(path, base)) + head, tail = os.path.split(relpath(path,base)) while (tail and tail != "."): name_parts.insert(0, tail) head, tail = os.path.split(head) return "-".join(name_parts).lower() - def get_test_config(config_name, target_name): """Finds the path to a test configuration file config_name: path to a custom configuration file OR mbed OS interface "ethernet, wifi_odin, etc" @@ -2169,7 +2151,6 @@ def print_tests(tests, format="list", sort=True): print("Unknown format '%s'" % format) sys.exit(1) - def norm_relative_path(path, start): """This function will create a normalized, relative path. It mimics the python os.path.relpath function, but also normalizes a Windows-syle path @@ -2326,16 +2307,16 @@ def build_tests(tests, base_source_paths, build_path, target, toolchain_name, # Set the overall result to a failure if a build failure occurred if ('reason' in worker_result and - not worker_result['reason'] and - not isinstance(worker_result['reason'], NotSupportedException)): + not worker_result['reason'] and + not isinstance(worker_result['reason'], NotSupportedException)): result = False break # Adding binary path to test build result if ('result' in worker_result and - worker_result['result'] and - 'bin_file' in worker_result): + worker_result['result'] and + 'bin_file' in worker_result): bin_file = norm_relative_path(worker_result['bin_file'], execution_directory) test_key = 'test_apps' if 'test_apps-' in worker_result['kwargs']['project_id'] else 'tests' @@ -2382,6 +2363,4 @@ def build_tests(tests, base_source_paths, build_path, target, toolchain_name, def test_spec_from_test_builds(test_builds): return { "builds": test_builds - } - - + } \ No newline at end of file