Refactored generic tests with timer host test supervision

Reactored RTOS tests
pull/900/head
Przemek Wirkus 2015-01-27 12:10:16 +00:00
parent 06848611d3
commit 6903b54b9e
23 changed files with 272 additions and 86 deletions

View File

@ -29,25 +29,25 @@ void notify_test_id(const char *test_id);
void notify_test_description(const char *description);
// Host test auto-detection API
#define TEST_START(TESTID) notify_start(); notify_test_id(TESTID)
#define TEST_START(TESTID) notify_test_id(TESTID); notify_start()
#define TEST_HOSTTEST(NAME) notify_host_test_name(#NAME)
#define TEST_TIMEOUT(SECONDS) notify_timeout(SECONDS)
#define TEST_DESCRIPTION(DESC) notify_test_description(#DESC)
#define TEST_RESULT(RESULT) notify_completion(RESULT)
/**
Test auto-detection preamble example:
main() {
TEST_START("MBED_10");
TEST_TIMEOUT(10);
TEST_HOSTTEST( host_test );
TEST_DESCRIPTION(Hello World);
TEST_START("MBED_10");
// Proper 'host_test.py' should take over supervising of this test
// Test code
bool result = ...;
notify_completion(result);
TEST_RESULT(result);
}
*/

View File

@ -2,7 +2,12 @@
int main()
{
notify_start();
TEST_TIMEOUT(10);
TEST_HOSTTEST(hello_auto);
TEST_DESCRIPTION(Hello World);
TEST_START("MBED_10");
printf("Hello World\r\n");
while(1);
}

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
void print_char(char c = '*')
{
@ -32,6 +33,11 @@ void flip_2() {
}
int main() {
TEST_TIMEOUT(15);
TEST_HOSTTEST(wait_us_auto);
TEST_DESCRIPTION(Ticker Int);
TEST_START("MBED_11");
led1 = 0;
led2 = 0;
flipper_1.attach(&flip_1, 1.0); // the address of the function to be attached (flip) and the interval (1 second)

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
Ticker tick;
DigitalOut led(LED1);
@ -26,6 +27,12 @@ void togglePin(void)
int main()
{
TEST_TIMEOUT(15);
TEST_HOSTTEST(wait_us_auto);
TEST_DESCRIPTION(Ticker Int us);
TEST_START("MBED_23");
tick.attach_us(togglePin, 1000);
while (1);
}

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
void ticker_callback_1(void);
void ticker_callback_2(void);
@ -31,6 +32,12 @@ void ticker_callback_1(void)
int main(void)
{
TEST_TIMEOUT(15);
TEST_HOSTTEST(wait_us_auto);
TEST_DESCRIPTION(Ticker Two callbacks);
TEST_START("MBED_34");
ticker.attach(ticker_callback_1, 1.0);
while(1);
}

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
DigitalOut led(LED1);
@ -14,6 +15,11 @@ void print_char(char c = '*')
int main()
{
TEST_TIMEOUT(15);
TEST_HOSTTEST(wait_us_auto);
TEST_DESCRIPTION(Time us);
TEST_START("MBED_25");
while (true) {
for (int i = 0; i < MS_INTERVALS; i++) {
wait_us(1000);

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
Timeout timer;
DigitalOut led(LED1);
@ -7,16 +8,14 @@ namespace {
const int MS_INTERVALS = 1000;
}
void print_char(char c = '*')
{
void print_char(char c = '*') {
printf("%c", c);
fflush(stdout);
}
void toggleOff(void);
void toggleOn(void)
{
void toggleOn(void) {
static int toggle_counter = 0;
if (toggle_counter == MS_INTERVALS) {
led = !led;
@ -27,13 +26,17 @@ void toggleOn(void)
timer.attach_us(toggleOff, 500);
}
void toggleOff(void)
{
void toggleOff(void) {
timer.attach_us(toggleOn, 500);
}
int main()
{
int main() {
TEST_TIMEOUT(15);
TEST_HOSTTEST(wait_us_auto);
TEST_DESCRIPTION(Timeout Int us);
TEST_START("MBED_24");
toggleOn();
while (1);
}

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
#include "rtos.h"
/*
@ -12,8 +13,7 @@
#define STACK_SIZE DEFAULT_STACK_SIZE
#endif
void print_char(char c = '*')
{
void print_char(char c = '*') {
printf("%c", c);
fflush(stdout);
}
@ -30,6 +30,11 @@ void led2_thread(void const *argument) {
}
int main() {
TEST_TIMEOUT(15);
TEST_HOSTTEST(wait_us_auto);
TEST_DESCRIPTION(Basic thread);
TEST_START("RTOS_1");
Thread thread(led2_thread, NULL, osPriorityNormal, STACK_SIZE);
while (true) {

View File

@ -46,7 +46,7 @@ void sd_thread(void const *argument)
}
fclose(f);
} else {
notify_completion(false);
TEST_RESULT(false);
return;
}
}
@ -65,7 +65,7 @@ void sd_thread(void const *argument)
}
fclose(f);
} else {
notify_completion(false);
TEST_RESULT(false);
return;
}
}
@ -74,15 +74,19 @@ void sd_thread(void const *argument)
// check that the data written == data read
for (int i = 0; i < SIZE; i++) {
if (data_written[i] != data_read[i]) {
notify_completion(false);
TEST_RESULT(false);
return;
}
}
notify_completion(true);
TEST_RESULT(true);
}
int main()
{
int main() {
TEST_TIMEOUT(20);
TEST_HOSTTEST(default_auto);
TEST_DESCRIPTION(SD File write-read);
TEST_START("RTOS_9");
Thread t(sd_thread, NULL, osPriorityNormal, (DEFAULT_STACK_SIZE * 2.25));
while (true) {

View File

@ -36,6 +36,11 @@ void queue_thread(void const *argument) {
}
int main (void) {
TEST_TIMEOUT(20);
TEST_HOSTTEST(default_auto);
TEST_DESCRIPTION(ISR (Queue));
TEST_START("RTOS_8");
Thread thread(queue_thread, NULL, osPriorityNormal, STACK_SIZE);
Ticker ticker;
ticker.attach(queue_isr, 1.0);
@ -59,6 +64,6 @@ int main (void) {
}
}
notify_completion(result);
TEST_RESULT(result);
return 0;
}

View File

@ -40,6 +40,11 @@ void send_thread (void const *argument) {
}
int main (void) {
TEST_TIMEOUT(20);
TEST_HOSTTEST(default_auto);
TEST_DESCRIPTION(Mail messaging);
TEST_START("RTOS_6");
Thread thread(send_thread, NULL, osPriorityNormal, STACK_SIZE);
bool result = true;
int result_counter = 0;
@ -65,6 +70,6 @@ int main (void) {
}
}
}
notify_completion(result);
TEST_RESULT(result);
return 0;
}

View File

@ -16,8 +16,7 @@
#define STACK_SIZE DEFAULT_STACK_SIZE
#endif
void print_char(char c = '*')
{
void print_char(char c = '*') {
printf("%c", c);
fflush(stdout);
}
@ -60,6 +59,11 @@ void test_thread(void const *args) {
}
int main() {
TEST_TIMEOUT(20);
TEST_HOSTTEST(default);
TEST_DESCRIPTION(Mutex resource lock);
TEST_START("RTOS_2");
const int t1_delay = THREAD_DELAY * 1;
const int t2_delay = THREAD_DELAY * 2;
const int t3_delay = THREAD_DELAY * 3;
@ -78,6 +82,6 @@ int main() {
}
fflush(stdout);
notify_completion(!mutex_defect);
TEST_RESULT(!mutex_defect);
return 0;
}

View File

@ -42,6 +42,11 @@ void send_thread (void const *argument) {
}
int main (void) {
TEST_TIMEOUT(20);
TEST_HOSTTEST(default_auto);
TEST_DESCRIPTION(Queue messaging);
TEST_START("RTOS_5");
Thread thread(send_thread, NULL, osPriorityNormal, STACK_SIZE);
bool result = true;
int result_counter = 0;
@ -67,6 +72,6 @@ int main (void) {
}
}
}
notify_completion(result);
TEST_RESULT(result);
return 0;
}

View File

@ -17,8 +17,7 @@
#define STACK_SIZE DEFAULT_STACK_SIZE
#endif
void print_char(char c = '*')
{
void print_char(char c = '*') {
printf("%c", c);
fflush(stdout);
}
@ -49,6 +48,11 @@ void test_thread(void const *delay) {
}
int main (void) {
TEST_TIMEOUT(20);
TEST_HOSTTEST(default_auto);
TEST_DESCRIPTION(Semaphore resource lock);
TEST_START("RTOS_3");
const int t1_delay = THREAD_DELAY * 1;
const int t2_delay = THREAD_DELAY * 2;
const int t3_delay = THREAD_DELAY * 3;
@ -66,6 +70,6 @@ int main (void) {
}
fflush(stdout);
notify_completion(!sem_defect);
TEST_RESULT(!sem_defect);
return 0;
}

View File

@ -30,6 +30,11 @@ void led_thread(void const *argument) {
}
int main (void) {
TEST_TIMEOUT(20);
TEST_HOSTTEST(default_auto);
TEST_DESCRIPTION(Signals messaging);
TEST_START("RTOS_4");
Thread thread(led_thread, NULL, osPriorityNormal, STACK_SIZE);
bool result = true;
@ -41,6 +46,6 @@ int main (void) {
break;
}
}
notify_completion(result);
TEST_RESULT(result);
return 0;
}

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
#include "rtos.h"
DigitalOut LEDs[4] = {
@ -22,6 +23,11 @@ void blink(void const *n) {
}
int main(void) {
TEST_TIMEOUT(15);
TEST_HOSTTEST(wait_us_auto);
TEST_DESCRIPTION(Timer);
TEST_START("RTOS_7");
RtosTimer led_1_timer(blink, osTimerPeriodic, (void *)0);
RtosTimer led_2_timer(blink, osTimerPeriodic, (void *)1);
RtosTimer led_3_timer(blink, osTimerPeriodic, (void *)2);

View File

@ -13,4 +13,27 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
"""
from host_registry import HostRegistry
from default_auto import DefaultAuto
from hello_auto import HelloTest
from wait_us_auto import WaitusTest
HOSTREGISTRY = HostRegistry()
HOSTREGISTRY.register_host_test("default", DefaultAuto())
HOSTREGISTRY.register_host_test("default_auto", DefaultAuto())
HOSTREGISTRY.register_host_test("hello_auto", HelloTest())
HOSTREGISTRY.register_host_test("wait_us_auto", WaitusTest())
###############################################################################
# Functional interface for test supervisor registry
###############################################################################
def get_host_test(ht_name):
return HOSTREGISTRY.get_host_test(ht_name)
def is_host_test(ht_name):
return HOSTREGISTRY.is_host_test(ht_name)

View File

@ -0,0 +1,36 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from sys import stdout
class DefaultAuto():
""" Simple, basic host test's test runner waiting for serial port
output from MUT, no supervision over test running in MUT is executed.
"""
def test(self, selftest):
result = selftest.RESULT_SUCCESS
try:
while True:
c = selftest.mbed.serial_read(512)
if c is None:
return selftest.RESULT_IO_SERIAL
stdout.write(c)
stdout.flush()
except KeyboardInterrupt, _:
selftest.notify("\r\n[CTRL+C] exit")
result = selftest.RESULT_ERROR
return result

View File

@ -15,23 +15,15 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from host_test import DefaultTest
class HelloTest(DefaultTest):
class HelloTest():
HELLO_WORLD = "Hello World"
def test(self):
c = self.mbed.serial_readline()
def test(self, selftest):
c = selftest.mbed.serial_readline()
if c is None:
return self.RESULT_IO_SERIAL
self.notify(c.strip())
c = self.mbed.serial_readline()
if c is None:
return self.RESULT_IO_SERIAL
self.notify("Read %d bytes:"% len(c))
self.notify(c.strip())
return selftest.RESULT_IO_SERIAL
selftest.notify("Read %d bytes:"% len(c))
selftest.notify(c.strip())
result = True
# Because we can have targetID here let's try to decode
@ -39,8 +31,4 @@ class HelloTest(DefaultTest):
result = False
else:
result = self.HELLO_WORLD in c
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':
HelloTest().run()
return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE

View File

@ -0,0 +1,36 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
class HostRegistry:
""" Class stores registry with host tests and objects representing them
"""
HOST_TESTS = {} # host_test_name -> host_test_ojbect
def register_host_test(self, ht_name, ht_object):
if ht_name not in self.HOST_TESTS:
self.HOST_TESTS[ht_name] = ht_object
def unregister_host_test(self):
if ht_name in HOST_TESTS:
self.HOST_TESTS[ht_name] = None
def get_host_test(self, ht_name):
return self.HOST_TESTS[ht_name] if ht_name in self.HOST_TESTS else None
def is_host_test(self, ht_name):
return ht_name in self.HOST_TESTS

View File

@ -23,6 +23,8 @@ except ImportError, e:
exit(-1)
import os
import re
import types
from sys import stdout
from time import sleep, time
from optparse import OptionParser
@ -254,12 +256,37 @@ class HostTestResults:
self.RESULT_PASSIVE = "passive"
import workspace_tools.host_tests as host_tests
class Test(HostTestResults):
""" Base class for host test's test runner
"""
# Select default host_test supervision (replaced after autodetection)
test_supervisor = host_tests.get_host_test("default")
def __init__(self):
self.mbed = Mbed()
def detect_test_config(self, verbose=False):
""" Detects test case configuration
"""
result = {}
while True:
line = self.mbed.serial_readline()
if "{start}" in line:
self.notify("HOST: Start test...")
break
else:
m = re.search('{([\w_]+);([\w\d ]+)}}', line[:-1])
if m and len(m.groups()) == 2:
result[m.group(1)] = m.group(2)
if verbose:
self.notify("HOST: Property '%s' = '%s'"% (m.group(1), m.group(2)))
else:
self.notify("HOST: Unknown property: %s"% line.strip())
return result
def run(self):
""" Test runner for host test. This function will start executing
test and forward test result via serial port to test suite
@ -284,7 +311,13 @@ class Test(HostTestResults):
# Run test
try:
result = self.test()
CONFIG = self.detect_test_config(verbose=True) # print CONFIG
if "host_test_name" in CONFIG:
if host_tests.is_host_test(CONFIG["host_test_name"]):
self.test_supervisor = host_tests.get_host_test(CONFIG["host_test_name"])
result = self.test_supervisor.test(self) #result = self.test()
if result is not None:
self.print_result(result)
else:
@ -341,6 +374,5 @@ class Simple(DefaultTest):
result = self.RESULT_ERROR
return result
if __name__ == '__main__':
Simple().run()

View File

@ -16,10 +16,8 @@ limitations under the License.
"""
from time import time
from host_test import DefaultTest
class WaitusTest(DefaultTest):
class WaitusTest():
""" This test is reading single characters from stdio
and measures time between their occurrences.
"""
@ -27,30 +25,30 @@ class WaitusTest(DefaultTest):
TICK_LOOP_SUCCESSFUL_COUNTS = 10
DEVIATION = 0.10 # +/-10%
def test(self):
def test(self, selftest):
test_result = True
# First character to start test (to know after reset when test starts)
if self.mbed.set_serial_timeout(None) is None:
return self.RESULT_IO_SERIAL
c = self.mbed.serial_read(1)
if selftest.mbed.set_serial_timeout(None) is None:
return selftest.RESULT_IO_SERIAL
c = selftest.mbed.serial_read(1)
if c is None:
return self.RESULT_IO_SERIAL
return selftest.RESULT_IO_SERIAL
if c == '$': # target will printout TargetID e.g.: $$$$1040e649d5c09a09a3f6bc568adef61375c6
#Read additional 39 bytes of TargetID
if self.mbed.serial_read(39) is None:
return self.RESULT_IO_SERIAL
c = self.mbed.serial_read(1) # Re-read first 'tick'
if selftest.mbed.serial_read(39) is None:
return selftest.RESULT_IO_SERIAL
c = selftest.mbed.serial_read(1) # Re-read first 'tick'
if c is None:
return self.RESULT_IO_SERIAL
return selftest.RESULT_IO_SERIAL
start_serial_pool = time()
start = time()
success_counter = 0
for i in range(0, self.TICK_LOOP_COUNTER):
c = self.mbed.serial_read(1)
c = selftest.mbed.serial_read(1)
if c is None:
return self.RESULT_IO_SERIAL
return selftest.RESULT_IO_SERIAL
delta = time() - start
deviation = abs(delta - 1)
# Round values
@ -60,16 +58,12 @@ class WaitusTest(DefaultTest):
deviation_ok = True if delta > 0 and deviation <= self.DEVIATION else False
success_counter = success_counter+1 if deviation_ok else 0
msg = "OK" if deviation_ok else "FAIL"
self.notify("%s in %.2f sec (%.2f) [%s]"% (c, delta, deviation, msg))
selftest.notify("%s in %.2f sec (%.2f) [%s]"% (c, delta, deviation, msg))
start = time()
if success_counter >= self.TICK_LOOP_SUCCESSFUL_COUNTS:
break
measurement_time = time() - start_serial_pool
self.notify("Consecutive OK timer reads: %d"% success_counter)
self.notify("Completed in %.2f sec" % (measurement_time))
selftest.notify("Consecutive OK timer reads: %d"% success_counter)
selftest.notify("Completed in %.2f sec" % (measurement_time))
test_result = True if success_counter >= self.TICK_LOOP_SUCCESSFUL_COUNTS else False
return self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE
if __name__ == '__main__':
WaitusTest().run()
return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE

View File

@ -375,14 +375,14 @@ TESTS = [
"source_dir": join(TEST_DIR, "mbed", "hello"),
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"automated": True,
"host_test": "hello_auto",
#"host_test": "hello_auto",
},
{
"id": "MBED_11", "description": "Ticker Int",
"source_dir": join(TEST_DIR, "mbed", "ticker"),
"dependencies": [MBED_LIBRARIES],
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"automated": True,
"host_test": "wait_us_auto",
#"host_test": "wait_us_auto",
"duration": 20,
},
{
@ -455,7 +455,7 @@ TESTS = [
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"duration": 15,
"automated": True,
"host_test": "wait_us_auto"
#"host_test": "wait_us_auto"
},
{
"id": "MBED_24", "description": "Timeout Int us",
@ -463,7 +463,7 @@ TESTS = [
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"duration": 15,
"automated": True,
"host_test": "wait_us_auto"
#"host_test": "wait_us_auto"
},
{
"id": "MBED_25", "description": "Time us",
@ -471,7 +471,7 @@ TESTS = [
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"duration": 15,
"automated": True,
"host_test": "wait_us_auto"
#"host_test": "wait_us_auto"
},
{
"id": "MBED_26", "description": "Integer constant division",
@ -525,7 +525,7 @@ TESTS = [
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"duration": 15,
"automated": True,
"host_test": "wait_us_auto"
#"host_test": "wait_us_auto"
},
@ -579,10 +579,10 @@ TESTS = [
{
"id": "RTOS_1", "description": "Basic thread",
"source_dir": join(TEST_DIR, "rtos", "mbed", "basic"),
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES],
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, TEST_MBED_LIB],
"duration": 15,
"automated": True,
"host_test": "wait_us_auto",
#"host_test": "wait_us_auto",
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812", "KL25Z", "KL05Z", "K64F", "KL46Z", "RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE", "NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8", "NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC", "DISCO_F401VC"],
},
{
@ -625,10 +625,10 @@ TESTS = [
{
"id": "RTOS_7", "description": "Timer",
"source_dir": join(TEST_DIR, "rtos", "mbed", "timer"),
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES],
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, TEST_MBED_LIB],
"duration": 15,
"automated": True,
"host_test": "wait_us_auto",
#"host_test": "wait_us_auto",
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812", "KL25Z", "KL05Z", "K64F", "KL46Z", "RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE", "NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8", "NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC", "DISCO_F401VC"],
},
{