Merge pull request #966 from PrzemekWirkus/devel_parallel_exec_2

Experimental parallel test execution on MUTs (option --parallel), v2
pull/965/head^2
Martin Kojtal 2015-03-12 10:35:07 +00:00
commit db8ec90de6
2 changed files with 259 additions and 214 deletions


@@ -221,6 +221,7 @@ if __name__ == '__main__':
_opts_verbose=opts.verbose,
_opts_firmware_global_name=opts.firmware_global_name,
_opts_only_build_tests=opts.only_build_tests,
_opts_parallel_test_exec=opts.parallel_test_exec,
_opts_suppress_summary=opts.suppress_summary,
_opts_test_x_toolchain_summary=opts.test_x_toolchain_summary,
_opts_copy_method=opts.copy_method,


@@ -34,7 +34,7 @@ from prettytable import PrettyTable
from time import sleep, time
from Queue import Queue, Empty
from os.path import join, exists, basename
from threading import Thread, Lock
from subprocess import Popen, PIPE
# Imports related to mbed build api
@@ -167,6 +167,7 @@ class SingleTestRunner(object):
_opts_verbose=False,
_opts_firmware_global_name=None,
_opts_only_build_tests=False,
_opts_parallel_test_exec=False,
_opts_suppress_summary=False,
_opts_test_x_toolchain_summary=False,
_opts_copy_method=None,
@@ -217,6 +218,7 @@ class SingleTestRunner(object):
self.opts_verbose = _opts_verbose
self.opts_firmware_global_name = _opts_firmware_global_name
self.opts_only_build_tests = _opts_only_build_tests
self.opts_parallel_test_exec = _opts_parallel_test_exec
self.opts_suppress_summary = _opts_suppress_summary
self.opts_test_x_toolchain_summary = _opts_test_x_toolchain_summary
self.opts_copy_method = _opts_copy_method
@@ -257,7 +259,7 @@ class SingleTestRunner(object):
"shuffle_test_order" : str(self.opts_shuffle_test_order),
"shuffle_test_seed" : str(self.opts_shuffle_test_seed),
"test_by_names" : str(self.opts_test_by_names),
"peripheral_by_names" : str(self.opts_peripheral_by_names),
"peripheral_by_names" : str(self.opts_peripheral_by_names),
"test_only_peripheral" : str(self.opts_test_only_peripheral),
"test_only_common" : str(self.opts_test_only_common),
"verbose" : str(self.opts_verbose),
@@ -284,225 +286,260 @@ class SingleTestRunner(object):
result = False
return result
# This will store target / toolchain specific properties
test_suite_properties_ext = {} # target : toolchain
# Here we store test results
test_summary = []
# Here we store test results in extended data structure
test_summary_ext = {}
execute_thread_slice_lock = Lock()
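
The three class-level attributes above are shared by every worker thread, which is why a Lock arrives with them. A minimal sketch of the pattern, with hypothetical names (not the PR's code), showing lock-guarded updates to a shared result structure:

# Minimal sketch with hypothetical names, not the PR's code: workers
# append to results owned by one shared object; the Lock serializes
# compound updates that are not atomic on their own.
from threading import Thread, Lock

class Runner(object):
    results_lock = Lock()
    summary = []   # shared across worker threads, like test_summary above

    def worker(self, name):
        outcome = (name, 'OK')           # stand-in for a real test result
        with self.results_lock:          # guard the shared structure
            self.summary.append(outcome)

runner = Runner()
threads = [Thread(target=runner.worker, args=('T%d' % i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(runner.summary))            # all four results, no lost updates

A single list.append is atomic under CPython's GIL, but the lock also covers multi-step updates such as building the nested test_summary_ext dictionaries.
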
def execute_thread_slice(self, q, target, toolchains, clean, test_ids):
for toolchain in toolchains:
# print target, toolchain
# Test suite properties returned to external tools like CI
test_suite_properties = {}
test_suite_properties['jobs'] = self.opts_jobs
test_suite_properties['clean'] = clean
test_suite_properties['target'] = target
test_suite_properties['test_ids'] = ', '.join(test_ids)
test_suite_properties['toolchain'] = toolchain
test_suite_properties['shuffle_random_seed'] = self.shuffle_random_seed
# print '=== %s::%s ===' % (target, toolchain)
# Let's build our test
if target not in TARGET_MAP:
print self.logger.log_line(self.logger.LogType.NOTIF, 'Skipped tests for %s target. Target platform not found'% (target))
continue
T = TARGET_MAP[target]
build_mbed_libs_options = ["analyze"] if self.opts_goanna_for_mbed_sdk else None
clean_mbed_libs_options = True if self.opts_goanna_for_mbed_sdk or clean or self.opts_clean else None
try:
build_mbed_libs_result = build_mbed_libs(T,
toolchain,
options=build_mbed_libs_options,
clean=clean_mbed_libs_options,
jobs=self.opts_jobs)
if not build_mbed_libs_result:
print self.logger.log_line(self.logger.LogType.NOTIF, 'Skipped tests for %s target. Toolchain %s is not yet supported for this target'% (T.name, toolchain))
continue
except ToolException:
print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building MBED libs for %s using %s'% (target, toolchain))
#return self.test_summary, self.shuffle_random_seed, self.test_summary_ext, self.test_suite_properties_ext
q.put(target + '_'.join(toolchains))
return
build_dir = join(BUILD_DIR, "test", target, toolchain)
test_suite_properties['build_mbed_libs_result'] = build_mbed_libs_result
test_suite_properties['build_dir'] = build_dir
test_suite_properties['skipped'] = []
# Enumerate through all tests and shuffle test order if requested
test_map_keys = sorted(TEST_MAP.keys())
if self.opts_shuffle_test_order:
random.shuffle(test_map_keys, self.shuffle_random_func)
# Update database with shuffle seed if applicable
if self.db_logger:
self.db_logger.reconnect();
if self.db_logger.is_connected():
self.db_logger.update_build_id_info(self.db_logger_build_id, _shuffle_seed=self.shuffle_random_func())
self.db_logger.disconnect();
if self.db_logger:
self.db_logger.reconnect();
if self.db_logger.is_connected():
# Update MUTs and Test Specification in database
self.db_logger.update_build_id_info(self.db_logger_build_id, _muts=self.muts, _test_spec=self.test_spec)
# Update Extra information in database (some options passed to test suite)
self.db_logger.update_build_id_info(self.db_logger_build_id, _extra=json.dumps(self.dump_options()))
self.db_logger.disconnect();
for test_id in test_map_keys:
test = TEST_MAP[test_id]
if self.opts_test_by_names and test_id not in self.opts_test_by_names.split(','):
continue
if test_ids and test_id not in test_ids:
continue
if self.opts_test_only_peripheral and not test.peripherals:
if self.opts_verbose_skipped_tests:
print self.logger.log_line(self.logger.LogType.INFO, 'Common test skipped for target %s'% (target))
test_suite_properties['skipped'].append(test_id)
continue
if self.opts_peripheral_by_names and test.peripherals and not len([i for i in test.peripherals if i in self.opts_peripheral_by_names.split(',')]):
# We will skip tests not forced with -p option
if self.opts_verbose_skipped_tests:
print self.logger.log_line(self.logger.LogType.INFO, 'Common test skipped for target %s'% (target))
test_suite_properties['skipped'].append(test_id)
continue
if self.opts_test_only_common and test.peripherals:
if self.opts_verbose_skipped_tests:
print self.logger.log_line(self.logger.LogType.INFO, 'Peripheral test skipped for target %s'% (target))
test_suite_properties['skipped'].append(test_id)
continue
if test.automated and test.is_supported(target, toolchain):
if test.peripherals is None and self.opts_only_build_tests:
# When the user passes the 'build only' flag and the test has no
# specified peripherals, we allow the test to build by default
pass
elif self.opts_peripheral_by_names and test_id not in self.opts_peripheral_by_names.split(','):
# If a peripheral is forced with option -p we expect the test
# to be run even if the peripheral is not in the MUTs file.
pass
elif not self.is_peripherals_available(target, test.peripherals):
if self.opts_verbose_skipped_tests:
if test.peripherals:
print self.logger.log_line(self.logger.LogType.INFO, 'Peripheral %s test skipped for target %s'% (",".join(test.peripherals), target))
else:
print self.logger.log_line(self.logger.LogType.INFO, 'Test %s skipped for target %s'% (test_id, target))
test_suite_properties['skipped'].append(test_id)
continue
build_project_options = ["analyze"] if self.opts_goanna_for_tests else None
clean_project_options = True if self.opts_goanna_for_tests or clean or self.opts_clean else None
# Detect which lib should be added to test
# Some libs have to be compiled, e.g. RTOS or ETH
libraries = []
for lib in LIBRARIES:
if lib['build_dir'] in test.dependencies:
libraries.append(lib['id'])
# Build libs for test
for lib_id in libraries:
try:
build_lib(lib_id,
T,
toolchain,
options=build_project_options,
verbose=self.opts_verbose,
clean=clean_mbed_libs_options,
jobs=self.opts_jobs)
except ToolException:
print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building library %s'% (lib_id))
#return self.test_summary, self.shuffle_random_seed, self.test_summary_ext, self.test_suite_properties_ext
q.put(target + '_'.join(toolchains))
return
test_suite_properties['test.libs.%s.%s.%s'% (target, toolchain, test_id)] = ', '.join(libraries)
# TODO: move the two loops below to a separate function
INC_DIRS = []
for lib_id in libraries:
if 'inc_dirs_ext' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['inc_dirs_ext']:
INC_DIRS.extend(LIBRARY_MAP[lib_id]['inc_dirs_ext'])
MACROS = []
for lib_id in libraries:
if 'macros' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['macros']:
MACROS.extend(LIBRARY_MAP[lib_id]['macros'])
MACROS.append('TEST_SUITE_TARGET_NAME="%s"'% target)
MACROS.append('TEST_SUITE_TEST_ID="%s"'% test_id)
test_uuid = uuid.uuid4()
MACROS.append('TEST_SUITE_UUID="%s"'% str(test_uuid))
project_name = self.opts_firmware_global_name if self.opts_firmware_global_name else None
try:
path = build_project(test.source_dir,
join(build_dir, test_id),
T,
toolchain,
test.dependencies,
options=build_project_options,
clean=clean_project_options,
verbose=self.opts_verbose,
name=project_name,
macros=MACROS,
inc_dirs=INC_DIRS,
jobs=self.opts_jobs)
except ToolException:
project_name_str = project_name if project_name is not None else test_id
print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building project %s'% (project_name_str))
# return self.test_summary, self.shuffle_random_seed, self.test_summary_ext, self.test_suite_properties_ext
q.put(target + '_'.join(toolchains))
return
if self.opts_only_build_tests:
# With this option we are skipping testing phase
continue
# Test duration can be increased by global value
test_duration = test.duration
if self.opts_extend_test_timeout is not None:
test_duration += self.opts_extend_test_timeout
# For an automated test the duration acts as a timeout after
# which the test gets interrupted
test_spec = self.shape_test_request(target, path, test_id, test_duration)
test_loops = self.get_test_loop_count(test_id)
test_suite_properties['test.duration.%s.%s.%s'% (target, toolchain, test_id)] = test_duration
test_suite_properties['test.loops.%s.%s.%s'% (target, toolchain, test_id)] = test_loops
test_suite_properties['test.path.%s.%s.%s'% (target, toolchain, test_id)] = path
# read MUTs, test specification and perform tests
single_test_result, detailed_test_results = self.handle(test_spec, target, toolchain, test_loops=test_loops)
# Append test results to global test summary
if single_test_result is not None:
self.test_summary.append(single_test_result)
# Prepare extended test results data structure (it can be used to generate detailed test report)
if toolchain not in self.test_summary_ext:
self.test_summary_ext[toolchain] = {} # test_summary_ext : toolchain
if target not in self.test_summary_ext[toolchain]:
self.test_summary_ext[toolchain][target] = {} # test_summary_ext : toolchain : target
if test_id not in self.test_summary_ext[toolchain][target]:
self.test_summary_ext[toolchain][target][test_id] = detailed_test_results # test_summary_ext : toolchain : target : test_id
test_suite_properties['skipped'] = ', '.join(test_suite_properties['skipped'])
self.test_suite_properties_ext[target][toolchain] = test_suite_properties
# return self.test_summary, self.shuffle_random_seed, test_summary_ext, self.test_suite_properties_ext
q.put(target + '_'.join(toolchains))
return
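
Note the discipline above: execute_thread_slice() puts a token on the queue before every return, including the ToolException error paths, so the collector in execute() can call q.get() once per worker with no risk of blocking forever. A compact sketch of the same invariant (hypothetical names; the PR uses explicit q.put() calls rather than try/finally):

import queue  # the code above uses the Python 2 spelling, 'from Queue import Queue'

def do_builds_and_tests(target):
    # Stand-in for build_mbed_libs() / build_project() / handle()
    if target == 'BAD_TARGET':
        raise RuntimeError('build failed')

def thread_slice(q, target):
    try:
        do_builds_and_tests(target)
    finally:
        q.put(target)  # exactly one token per worker, on every exit path

q = queue.Queue()
for target in ('LPC1768', 'BAD_TARGET'):
    try:
        thread_slice(q, target)
    except RuntimeError:
        pass
assert q.qsize() == 2  # collector can now q.get() once per worker safely
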
def execute(self):
clean = self.test_spec.get('clean', False)
test_ids = self.test_spec.get('test_ids', [])
# This will store target / toolchain specific properties
test_suite_properties_ext = {} # target : toolchain
# Here we store test results
test_summary = []
# Here we store test results in extended data structure
test_summary_ext = {}
q = Queue()
# Generate a seed for shuffling if one was not provided on the command line
self.shuffle_random_seed = round(random.random(), self.SHUFFLE_SEED_ROUND)
if self.opts_shuffle_test_seed is not None and self.is_shuffle_seed_float():
self.shuffle_random_seed = round(float(self.opts_shuffle_test_seed), self.SHUFFLE_SEED_ROUND)
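
The seed handling above makes shuffled runs reproducible: a fresh seed is drawn and rounded to SHUFFLE_SEED_ROUND digits unless a float seed came from the command line, and reusing a seed replays the same order. The Python 2 code feeds a seed-returning function to random.shuffle(); a sketch of the equivalent idea (hypothetical helper, written for Python 3, where that shuffle parameter no longer exists):

import random

SHUFFLE_SEED_ROUND = 4  # assumed precision, mirroring the runner's constant

def shuffle_with_seed(test_ids, seed=None):
    if seed is None:
        # Draw a reportable seed, as execute() does above
        seed = round(random.random(), SHUFFLE_SEED_ROUND)
    order = sorted(test_ids)
    random.Random(seed).shuffle(order)  # deterministic for a given seed
    return order, seed

first, seed = shuffle_with_seed(['MBED_A1', 'MBED_A2', 'MBED_A3'])
replay, _ = shuffle_with_seed(['MBED_A1', 'MBED_A2', 'MBED_A3'], seed)
assert first == replay  # same seed, same test order
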
for target, toolchains in self.test_spec['targets'].iteritems():
test_suite_properties_ext[target] = {}
for toolchain in toolchains:
# Test suite properties returned to external tools like CI
test_suite_properties = {}
test_suite_properties['jobs'] = self.opts_jobs
test_suite_properties['clean'] = clean
test_suite_properties['target'] = target
test_suite_properties['test_ids'] = ', '.join(test_ids)
test_suite_properties['toolchain'] = toolchain
test_suite_properties['shuffle_random_seed'] = self.shuffle_random_seed
if self.opts_parallel_test_exec:
###################################################################
# Experimental, parallel test execution per singletest instance.
###################################################################
execute_threads = [] # Threads used to build mbed SDK, libs, test cases and execute tests
# Note: we build in parallel but each target gets its own thread, so the
# same thing is never built twice and compilers in separate threads do
# not collide.
# Inside execute_thread_slice(), handle() will be called to get
# information about available MUTs (per target).
for target, toolchains in self.test_spec['targets'].iteritems():
self.test_suite_properties_ext[target] = {}
t = threading.Thread(target=self.execute_thread_slice, args = (q, target, toolchains, clean, test_ids))
t.daemon = True
t.start()
execute_threads.append(t)
# print '=== %s::%s ===' % (target, toolchain)
# Let's build our test
if target not in TARGET_MAP:
print self.logger.log_line(self.logger.LogType.NOTIF, 'Skipped tests for %s target. Target platform not found'% (target))
continue
T = TARGET_MAP[target]
build_mbed_libs_options = ["analyze"] if self.opts_goanna_for_mbed_sdk else None
clean_mbed_libs_options = True if self.opts_goanna_for_mbed_sdk or clean or self.opts_clean else None
try:
build_mbed_libs_result = build_mbed_libs(T,
toolchain,
options=build_mbed_libs_options,
clean=clean_mbed_libs_options,
jobs=self.opts_jobs)
if not build_mbed_libs_result:
print self.logger.log_line(self.logger.LogType.NOTIF, 'Skipped tests for %s target. Toolchain %s is not yet supported for this target'% (T.name, toolchain))
continue
except ToolException:
print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building MBED libs for %s using %s'% (target, toolchain))
return test_summary, self.shuffle_random_seed, test_summary_ext, test_suite_properties_ext
build_dir = join(BUILD_DIR, "test", target, toolchain)
test_suite_properties['build_mbed_libs_result'] = build_mbed_libs_result
test_suite_properties['build_dir'] = build_dir
test_suite_properties['skipped'] = []
# Enumerate through all tests and shuffle test order if requested
test_map_keys = sorted(TEST_MAP.keys())
if self.opts_shuffle_test_order:
random.shuffle(test_map_keys, self.shuffle_random_func)
# Update database with shuffle seed if applicable
if self.db_logger:
self.db_logger.reconnect();
if self.db_logger.is_connected():
self.db_logger.update_build_id_info(self.db_logger_build_id, _shuffle_seed=self.shuffle_random_func())
self.db_logger.disconnect();
if self.db_logger:
self.db_logger.reconnect();
if self.db_logger.is_connected():
# Update MUTs and Test Specification in database
self.db_logger.update_build_id_info(self.db_logger_build_id, _muts=self.muts, _test_spec=self.test_spec)
# Update Extra information in database (some options passed to test suite)
self.db_logger.update_build_id_info(self.db_logger_build_id, _extra=json.dumps(self.dump_options()))
self.db_logger.disconnect();
for test_id in test_map_keys:
test = TEST_MAP[test_id]
if self.opts_test_by_names and test_id not in self.opts_test_by_names.split(','):
continue
if test_ids and test_id not in test_ids:
continue
if self.opts_test_only_peripheral and not test.peripherals:
if self.opts_verbose_skipped_tests:
print self.logger.log_line(self.logger.LogType.INFO, 'Common test skipped for target %s'% (target))
test_suite_properties['skipped'].append(test_id)
continue
if self.opts_peripheral_by_names and test.peripherals and not len([i for i in test.peripherals if i in self.opts_peripheral_by_names.split(',')]):
# We will skip tests not forced with -p option
if self.opts_verbose_skipped_tests:
print self.logger.log_line(self.logger.LogType.INFO, 'Common test skipped for target %s'% (target))
test_suite_properties['skipped'].append(test_id)
continue
if self.opts_test_only_common and test.peripherals:
if self.opts_verbose_skipped_tests:
print self.logger.log_line(self.logger.LogType.INFO, 'Peripheral test skipped for target %s'% (target))
test_suite_properties['skipped'].append(test_id)
continue
if test.automated and test.is_supported(target, toolchain):
if test.peripherals is None and self.opts_only_build_tests:
# When the user passes the 'build only' flag and the test has no
# specified peripherals, we allow the test to build by default
pass
elif self.opts_peripheral_by_names and test_id not in self.opts_peripheral_by_names.split(','):
# If a peripheral is forced with option -p we expect the test
# to be run even if the peripheral is not in the MUTs file.
pass
elif not self.is_peripherals_available(target, test.peripherals):
if self.opts_verbose_skipped_tests:
if test.peripherals:
print self.logger.log_line(self.logger.LogType.INFO, 'Peripheral %s test skipped for target %s'% (",".join(test.peripherals), target))
else:
print self.logger.log_line(self.logger.LogType.INFO, 'Test %s skipped for target %s'% (test_id, target))
test_suite_properties['skipped'].append(test_id)
continue
build_project_options = ["analyze"] if self.opts_goanna_for_tests else None
clean_project_options = True if self.opts_goanna_for_tests or clean or self.opts_clean else None
# Detect which lib should be added to test
# Some libs have to be compiled, e.g. RTOS or ETH
libraries = []
for lib in LIBRARIES:
if lib['build_dir'] in test.dependencies:
libraries.append(lib['id'])
# Build libs for test
for lib_id in libraries:
try:
build_lib(lib_id,
T,
toolchain,
options=build_project_options,
verbose=self.opts_verbose,
clean=clean_mbed_libs_options,
jobs=self.opts_jobs)
except ToolException:
print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building library %s'% (lib_id))
return test_summary, self.shuffle_random_seed, test_summary_ext, test_suite_properties_ext
test_suite_properties['test.libs.%s.%s.%s'% (target, toolchain, test_id)] = ', '.join(libraries)
# TODO: move the two loops below to a separate function
INC_DIRS = []
for lib_id in libraries:
if 'inc_dirs_ext' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['inc_dirs_ext']:
INC_DIRS.extend(LIBRARY_MAP[lib_id]['inc_dirs_ext'])
MACROS = []
for lib_id in libraries:
if 'macros' in LIBRARY_MAP[lib_id] and LIBRARY_MAP[lib_id]['macros']:
MACROS.extend(LIBRARY_MAP[lib_id]['macros'])
MACROS.append('TEST_SUITE_TARGET_NAME="%s"'% target)
MACROS.append('TEST_SUITE_TEST_ID="%s"'% test_id)
test_uuid = uuid.uuid4()
MACROS.append('TEST_SUITE_UUID="%s"'% str(test_uuid))
project_name = self.opts_firmware_global_name if self.opts_firmware_global_name else None
try:
path = build_project(test.source_dir,
join(build_dir, test_id),
T,
toolchain,
test.dependencies,
options=build_project_options,
clean=clean_project_options,
verbose=self.opts_verbose,
name=project_name,
macros=MACROS,
inc_dirs=INC_DIRS,
jobs=self.opts_jobs)
except ToolException:
project_name_str = project_name if project_name is not None else test_id
print self.logger.log_line(self.logger.LogType.ERROR, 'There were errors while building project %s'% (project_name_str))
return test_summary, self.shuffle_random_seed, test_summary_ext, test_suite_properties_ext
if self.opts_only_build_tests:
# With this option we are skipping testing phase
continue
# Test duration can be increased by global value
test_duration = test.duration
if self.opts_extend_test_timeout is not None:
test_duration += self.opts_extend_test_timeout
# For an automated test the duration acts as a timeout after
# which the test gets interrupted
test_spec = self.shape_test_request(target, path, test_id, test_duration)
test_loops = self.get_test_loop_count(test_id)
test_suite_properties['test.duration.%s.%s.%s'% (target, toolchain, test_id)] = test_duration
test_suite_properties['test.loops.%s.%s.%s'% (target, toolchain, test_id)] = test_loops
test_suite_properties['test.path.%s.%s.%s'% (target, toolchain, test_id)] = path
# read MUTs, test specification and perform tests
single_test_result, detailed_test_results = self.handle(test_spec, target, toolchain, test_loops=test_loops)
# Append test results to global test summary
if single_test_result is not None:
test_summary.append(single_test_result)
# Prepare extended test results data structure (it can be used to generate detailed test report)
if toolchain not in test_summary_ext:
test_summary_ext[toolchain] = {} # test_summary_ext : toolchain
if target not in test_summary_ext[toolchain]:
test_summary_ext[toolchain][target] = {} # test_summary_ext : toolchain : target
if test_id not in test_summary_ext[toolchain][target]:
test_summary_ext[toolchain][target][test_id] = detailed_test_results # test_summary_ext : toolchain : target : test_id
test_suite_properties['skipped'] = ', '.join(test_suite_properties['skipped'])
test_suite_properties_ext[target][toolchain] = test_suite_properties
for t in execute_threads:
q.get() # Wait for any worker to finish; t.join() would force waiting for threads in a fixed order
else:
# Serialized (not parallel) test execution
for target, toolchains in self.test_spec['targets'].iteritems():
if target not in self.test_suite_properties_ext:
self.test_suite_properties_ext[target] = {}
self.execute_thread_slice(q, target, toolchains, clean, test_ids)
q.get()
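
Both branches above funnel through the same queue: the parallel branch spawns one daemon thread per target (toolchains for a given target still run sequentially inside their thread, so two compilers never race on the same build tree) and then collects one completion token per thread, while the serial branch runs the slice inline and consumes its own token. A condensed sketch of the spawn/collect pattern (illustrative names only, Python 3 spellings):

import queue
import threading
import time

def slice_for_target(q, target, toolchains):
    for toolchain in toolchains:    # same-target toolchains stay sequential
        time.sleep(0.01)            # stand-in for build + flash + run
    q.put(target)                   # completion token

q = queue.Queue()
targets = {'LPC1768': ['ARM', 'GCC_ARM'], 'K64F': ['GCC_ARM']}
workers = []
for target, toolchains in targets.items():
    t = threading.Thread(target=slice_for_target, args=(q, target, toolchains))
    t.daemon = True                 # workers die with the main process
    t.start()
    workers.append(t)
for _ in workers:
    print('%s finished' % q.get()) # unblocks in completion order, not start order
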
if self.db_logger:
self.db_logger.reconnect();
@@ -510,7 +547,7 @@ class SingleTestRunner(object):
self.db_logger.update_build_id_info(self.db_logger_build_id, _status_fk=self.db_logger.BUILD_ID_STATUS_COMPLETED)
self.db_logger.disconnect();
return self.test_summary, self.shuffle_random_seed, self.test_summary_ext, self.test_suite_properties_ext
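
The extended summary returned here is a three-level mapping keyed toolchain -> target -> test_id, with the detailed results as leaves. An equivalent construction with setdefault(), using made-up values:

# Shape of test_summary_ext, with made-up values
test_summary_ext = {}
toolchain, target, test_id = 'ARM', 'LPC1768', 'MBED_A1'
test_summary_ext.setdefault(toolchain, {}) \
                .setdefault(target, {})[test_id] = {'single_test_result': 'OK'}
print(test_summary_ext)
# {'ARM': {'LPC1768': {'MBED_A1': {'single_test_result': 'OK'}}}}
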
def generate_test_summary_by_target(self, test_summary, shuffle_seed=None):
""" Prints well-formed summary with results (SQL table like)
@@ -641,7 +678,8 @@ class SingleTestRunner(object):
def handle(self, test_spec, target_name, toolchain_name, test_loops=1):
""" Function determines MUT's mbed disk/port and copies binary to
target.
Test is being invoked afterwards.
"""
data = json.loads(test_spec)
# Get test information, image and test timeout
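
handle() owns the per-test hardware interaction: it parses the test specification JSON, finds the MUT's mass-storage disk and serial port, copies the image, and watches the run until the duration elapses. A generic, heavily simplified outline of that flow (this is not the mbed host-test implementation; the spec layout, MUT record fields, and the polling step are all assumptions):

# Generic flash-and-observe outline; spec/MUT field names are assumptions.
import json
import shutil
import time

def handle_sketch(test_spec_json, muts):
    spec = json.loads(test_spec_json)
    image = spec['image']                 # binary built by build_project()
    duration = spec['duration']           # acts as the test timeout
    mut = muts[spec['target']]            # e.g. {'disk': '/media/MBED', 'port': '/dev/ttyACM0'}
    shutil.copy(image, mut['disk'])       # "flash" via mass-storage copy
    deadline = time.time() + duration
    while time.time() < deadline:
        # A real host test would reset the board and poll mut['port']
        # for the test's success/failure markers here.
        time.sleep(0.1)
    return 'TIMEOUT'                      # no verdict observed in time
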
@@ -1527,7 +1565,7 @@ def get_default_test_options_parser():
parser.add_option('-p', '--peripheral-by-names',
dest='peripheral_by_names',
help='Forces discovery of particular peripherals. Use comma to separate peripheral names.'
copy_methods = host_tests_plugins.get_plugin_caps('CopyMethod')
copy_methods_str = "Plugin support: " + ', '.join(copy_methods)
@@ -1592,6 +1630,12 @@ def get_default_test_options_parser():
default=False,
help="Only build tests, skips actual test procedures (flashing etc.)")
parser.add_option('', '--parallel',
dest='parallel_test_exec',
default=False,
action="store_true",
help='Experimental: execute test runners in parallel for MUTs connected to your host (speeds up test result collection)'
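
With the flag in place, a run that previously exercised connected MUTs one target at a time is parallelized by appending --parallel. The invocation below is an assumed typical command line; the script name and the other options come from the surrounding tool, not from this diff:

python singletest.py -i test_spec.json -M muts_all.json --parallel
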
parser.add_option('', '--config',
dest='verbose_test_configuration_only',
default=False,