Merge pull request #900 from PrzemekWirkus/host_test_autodetection

Host test autodetection improvements
pull/917/head
Martin Kojtal 2015-02-16 09:37:56 +00:00
commit c9e7f409af
74 changed files with 1080 additions and 549 deletions

View File

@ -1,5 +1,8 @@
---
install: "sudo $TRAVIS_BUILD_DIR/travis/install_dependencies.sh > /dev/null"
python:
- "2.7"
script: "python workspace_tools/build_travis.py"
install:
- "sudo $TRAVIS_BUILD_DIR/travis/install_dependencies.sh > /dev/null"
- sudo pip install colorama
- sudo pip install prettytable

View File

@ -1,4 +1,9 @@
#include "test_env.h"
int main() {
notify_completion(true);
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(Basic);
MBED_HOSTTEST_START("MBED_A1");
MBED_HOSTTEST_RESULT(true);
}

View File

@ -10,6 +10,12 @@ extern "C" void mbed_main() {
}
int main() {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(Call function mbed_main before main);
MBED_HOSTTEST_START("MBED_A21");
printf("MBED: main() starts now!\r\n");
notify_completion(mbed_main_called);
MBED_HOSTTEST_RESULT(mbed_main_called);
}

View File

@ -54,6 +54,11 @@ Heap::hello
Heap::destroy
*******************/
int main (void) {
MBED_HOSTTEST_TIMEOUT(10);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(C++);
MBED_HOSTTEST_START("MBED_12");
bool result = true;
for (;;)
{
@ -77,6 +82,5 @@ int main (void) {
break;
}
notify_completion(result);
return 0;
MBED_HOSTTEST_RESULT(result);
}

View File

@ -2,9 +2,14 @@
#include "test_env.h"
int main() {
MBED_HOSTTEST_TIMEOUT(10);
MBED_HOSTTEST_SELECT(detect_auto);
MBED_HOSTTEST_DESCRIPTION(Simple detect test);
MBED_HOSTTEST_START("DTCT_1");
notify_start();
printf("MBED: Target '%s'\r\n", TEST_SUITE_TARGET_NAME);
printf("MBED: Test ID '%s'\r\n", TEST_SUITE_TEST_ID);
printf("MBED: UUID '%s'\r\n", TEST_SUITE_UUID);
notify_completion(true);
MBED_HOSTTEST_RESULT(true);
}

View File

@ -1,8 +1,7 @@
#include "mbed.h"
#include "test_env.h"
class DevNull : public Stream
{
class DevNull : public Stream {
public:
DevNull(const char *name = NULL) : Stream(name) {}
@ -17,12 +16,15 @@ protected:
DevNull null("null");
int main()
{
int main() {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(dev_null_auto);
MBED_HOSTTEST_DESCRIPTION(stdout redirected to dev null);
MBED_HOSTTEST_START("EXAMPLE_1");
printf("MBED: re-routing stdout to /null\r\n");
freopen("/null", "w", stdout);
printf("MBED: printf redirected to /null\r\n"); // This shouldn't appear
// If failure message can be seen test should fail :)
notify_completion(false); // This is 'false' on purpose
return 0;
MBED_HOSTTEST_RESULT(false); // This is 'false' on purpose
}

View File

@ -41,20 +41,24 @@ DigitalIn in(p25);
#endif
int main()
{
int main() {
MBED_HOSTTEST_TIMEOUT(10);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(DigitalIn DigitalOut);
MBED_HOSTTEST_START("MBED_A5");
out = 0;
wait(0.1);
if (in != 0) {
printf("ERROR: in != 0\n");
notify_completion(false);
MBED_HOSTTEST_RESULT(false);
}
out = 1;
wait(0.1);
if (in != 1) {
printf("ERROR: in != 1\n");
notify_completion(false);
MBED_HOSTTEST_RESULT(false);
}
notify_completion(true);
MBED_HOSTTEST_RESULT(true);
}

View File

@ -44,6 +44,11 @@ DigitalInOut d2(p25);
int main()
{
MBED_HOSTTEST_TIMEOUT(10);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(DigitalInOut);
MBED_HOSTTEST_START("MBED_A6");
bool check = true;
d1.output();
@ -76,5 +81,5 @@ int main()
check = false;
}
notify_completion(check);
MBED_HOSTTEST_RESULT(check);
}

View File

@ -17,6 +17,11 @@ const char *result_str(bool result) {
}
int main() {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(Integer constant division);
MBED_HOSTTEST_START("MBED_26");
bool result = true;
{ // 0xFFFFFFFF * 8 = 0x7fffffff8
@ -35,6 +40,5 @@ int main() {
printf("64bit: 0x17FFFFFFE8: expected 0x%lX got 0x%lX ... %s\r\n", values.first, test_ret, result_str(test_res));
}
notify_completion(result);
return 0;
MBED_HOSTTEST_RESULT(result);
}

View File

@ -7,18 +7,18 @@
namespace {
const int BUFFER_SIZE = 48;
char buffer[BUFFER_SIZE] = {0};
}
int main() {
char buffer[BUFFER_SIZE] = {0};
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(echo);
MBED_HOSTTEST_DESCRIPTION(Serial Echo at 115200);
MBED_HOSTTEST_START("MBED_A9");
Serial pc(TXPIN, RXPIN);
pc.baud(115200);
pc.puts("{{");
pc.puts(TEST_ENV_START); // Host test is expecting preamble
pc.puts("}}");
while (1) {
pc.gets(buffer, BUFFER_SIZE - 1);
pc.printf("%s", buffer);

View File

@ -56,6 +56,29 @@ bool notify_completion_str(bool success, char* buffer)
return result;
}
// Host test auto-detection API
void notify_host_test_name(const char *host_test) {
if (host_test) {
printf("{{host_test_name;%s}}" NL, host_test);
}
}
void notify_timeout(int timeout) {
printf("{{timeout;%d}}" NL, timeout);
}
void notify_test_id(const char *test_id) {
if (test_id) {
printf("{{test_id;%s}}" NL, test_id);
}
}
void notify_test_description(const char *description) {
if (description) {
printf("{{description;%s}}" NL, description);
}
}
// -DMBED_BUILD_TIMESTAMP=1406208182.13
unsigned int testenv_randseed()

View File

@ -22,6 +22,36 @@ void notify_performance_coefficient(const char* measurement_name, const int valu
void notify_performance_coefficient(const char* measurement_name, const unsigned int value);
void notify_performance_coefficient(const char* measurement_name, const double value);
// Host test auto-detection API
void notify_host_test_name(const char *host_test);
void notify_timeout(int timeout);
void notify_test_id(const char *test_id);
void notify_test_description(const char *description);
// Host test auto-detection API
#define MBED_HOSTTEST_START(TESTID) notify_test_id(TESTID); notify_start()
#define MBED_HOSTTEST_SELECT(NAME) notify_host_test_name(#NAME)
#define MBED_HOSTTEST_TIMEOUT(SECONDS) notify_timeout(SECONDS)
#define MBED_HOSTTEST_DESCRIPTION(DESC) notify_test_description(#DESC)
#define MBED_HOSTTEST_RESULT(RESULT) notify_completion(RESULT)
/**
Test auto-detection preamble example:
main() {
MBED_HOSTTEST_TIMEOUT(10);
MBED_HOSTTEST_SELECT( host_test );
MBED_HOSTTEST_DESCRIPTION(Hello World);
MBED_HOSTTEST_START("MBED_10");
// Proper 'host_test.py' should take over supervising of this test
// Test code
bool result = ...;
MBED_HOSTTEST_RESULT(result);
}
*/
// Test functionality useful during testing
unsigned int testenv_randseed();

View File

@ -6,8 +6,7 @@ Serial pc(USBTX, USBRX);
#define FILENAME "/local/out.txt"
#define TEST_STRING "Hello World!"
FILE *test_open(const char *mode)
{
FILE *test_open(const char *mode) {
FILE *f = fopen(FILENAME, mode);
if (f == NULL) {
printf("Error opening file"NL);
@ -16,8 +15,7 @@ FILE *test_open(const char *mode)
return f;
}
void test_write(FILE *f, char *str, int str_len)
{
void test_write(FILE *f, char *str, int str_len) {
int n = fprintf(f, str);
if (n != str_len) {
@ -26,8 +24,7 @@ void test_write(FILE *f, char *str, int str_len)
}
}
void test_read(FILE *f, char *str, int str_len)
{
void test_read(FILE *f, char *str, int str_len) {
int n = fread(str, sizeof(unsigned char), str_len, f);
if (n != str_len) {
@ -36,8 +33,7 @@ void test_read(FILE *f, char *str, int str_len)
}
}
void test_close(FILE *f)
{
void test_close(FILE *f) {
int rc = fclose(f);
if (rc != 0) {
@ -46,8 +42,12 @@ void test_close(FILE *f)
}
}
int main()
{
int main() {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(Semihost file system);
MBED_HOSTTEST_START("MBED_A2");
pc.printf("Test the Stream class\n");
printf("connected: %s\n", (semihost_connected()) ? ("Yes") : ("No"));
@ -74,5 +74,5 @@ int main()
test_close(f);
// Check the two strings are equal
notify_completion((strncmp(buffer, str, str_len) == 0));
MBED_HOSTTEST_RESULT((strncmp(buffer, str, str_len) == 0));
}

View File

@ -2,7 +2,12 @@
int main()
{
notify_start();
MBED_HOSTTEST_TIMEOUT(5);
MBED_HOSTTEST_SELECT(hello_auto);
MBED_HOSTTEST_DESCRIPTION(Hello World);
MBED_HOSTTEST_START("MBED_10");
printf("Hello World\r\n");
while(1);
}

View File

@ -29,6 +29,11 @@ float calc_3d_vector_len(float x, float y, float z) {
#define MEASURE_DEVIATION_TOLERANCE 0.025 // 2.5%
int main(void) {
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(MMA8451Q accelerometer);
MBED_HOSTTEST_START("KL25Z_5");
DigitalOut led(LED_GREEN);
MMA8451Q acc(SDA, SCL, MMA8451_I2C_ADDRESS);
bool result = true;
@ -47,5 +52,5 @@ int main(void) {
wait(0.5);
led = !led;
}
notify_completion(result);
MBED_HOSTTEST_RESULT(result);
}

View File

@ -32,13 +32,17 @@ TMP102 temperature(I2C_SDA, I2C_SCL, 0x90);
TMP102 temperature(p28, p27, 0x90);
#endif
int main()
{
int main() {
MBED_HOSTTEST_TIMEOUT(10);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(DigitalIn DigitalOut);
MBED_HOSTTEST_START("MBED_A4");
float t = temperature.read();
printf("TMP102: Temperature: %f\n\r", t);
// In our test environment (ARM office) we should get a temperature within
// the range ]15, 30[C
bool result = (t > 15.0) && (t < 30.0);
notify_completion(result);
MBED_HOSTTEST_RESULT(result);
}

View File

@ -73,8 +73,12 @@ const int i2c_freq_hz = 400000;
const int i2c_delay_us = 0;
}
int main()
{
int main() {
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(I2C EEPROM read write test);
MBED_HOSTTEST_START("MBED_A19");
const int EEPROM_MEM_ADDR = 0xA0;
const char MARK = 0x66;
int fw = 0;
@ -146,5 +150,5 @@ int main()
printf("\tTotal failures: %d\r\n", fw + fr + fc);
}
notify_completion(result);
MBED_HOSTTEST_RESULT(result);
}

View File

@ -22,13 +22,13 @@
******************************************************************************/
// Test configuration block
namespace
{
namespace {
const int ntests = 1000;
const int i2c_freq_hz = 400000;
const int i2c_delay_us = 0;
// const int EEPROM_24LC256_SIZE = (256 * 1024 / 8); // 256 kbit memory
}
// End of test configuration block
#if defined(TARGET_KL25Z)
@ -78,8 +78,12 @@ I2C i2c(p28, p27);
#define PATTERN_MASK 0x66, ~0x66, 0x00, 0xFF, 0xA5, 0x5A, 0xF0, 0x0F
int main()
{
int main() {
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(I2C EEPROM line read write test);
MBED_HOSTTEST_START("MBED_A25");
const int EEPROM_MEM_ADDR = 0xA0;
bool result = true;
@ -137,5 +141,5 @@ int main()
printf("EEPROM: Pattern match errors: %d/%d ... [%s]\r\n", pattern_errors, ntests, pattern_errors ? "FAIL" : "OK");
result = write_errors == 0 && read_errors == 0;
notify_completion(result);
MBED_HOSTTEST_RESULT(result);
}

View File

@ -87,6 +87,11 @@ void flipper() {
}
int main() {
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(InterruptIn);
MBED_HOSTTEST_START("MBED_A7");
IN_OUT_CLEAR;
//Test falling edges first
in.rise(NULL);
@ -95,7 +100,7 @@ int main() {
if(checks != 5) {
printf("MBED: falling edges test failed: %d\r\n",checks);
notify_completion(false);
MBED_HOSTTEST_RESULT(false);
}
//Now test rising edges
@ -105,7 +110,7 @@ int main() {
if (checks != 10) {
printf("MBED: raising edges test failed: %d\r\n", checks);
notify_completion(false);
MBED_HOSTTEST_RESULT(false);
}
//Now test switch off edge detection
@ -115,7 +120,7 @@ int main() {
if (checks != 10) {
printf("MBED: edge detection switch off test failed: %d\r\n", checks);
notify_completion(false);
MBED_HOSTTEST_RESULT(false);
}
//Finally test both
@ -125,9 +130,8 @@ int main() {
if (checks != 20) {
printf("MBED: Simultaneous rising and falling edges failed: %d\r\n", checks);
notify_completion(false);
MBED_HOSTTEST_RESULT(false);
}
notify_completion(true);
return 0;
MBED_HOSTTEST_RESULT(true);
}

View File

@ -91,6 +91,11 @@ PortInOut port1(PORT_1, MASK_1);
PortInOut port2(PORT_2, MASK_2);
int main() {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(PortInOut);
MBED_HOSTTEST_START("MBED_A11");
bool check = true;
port1.output();
@ -111,5 +116,5 @@ int main() {
port2 = 0; wait(0.1);
if (port1 != 0) check = false;
notify_completion(check);
MBED_HOSTTEST_RESULT(check);
}

View File

@ -91,6 +91,11 @@ PortOut port_out(PORT_1, MASK_1);
PortIn port_in (PORT_2, MASK_2);
int main() {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(PortOut PortIn);
MBED_HOSTTEST_START("MBED_A10");
port_out = MASK_1;
wait(0.1);
int value = port_in.read();

View File

@ -1,8 +1,14 @@
#include "mbed.h"
#include "test_env.h"
#define CUSTOM_TIME 1256729737
int main() {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(rtc_auto);
MBED_HOSTTEST_DESCRIPTION(RTC);
MBED_HOSTTEST_START("MBED_16");
char buffer[32] = {0};
set_time(CUSTOM_TIME); // Set RTC time to Wed, 28 Oct 2009 11:35:37
while(1) {

View File

@ -61,8 +61,12 @@ const char *sd_file_path = "/sd/out.txt";
const int DATA_SIZE = 256;
}
int main()
{
int main() {
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(SD File System);
MBED_HOSTTEST_START("MBED_A12");
uint8_t data_written[DATA_SIZE] = { 0 };
bool result = false;
@ -103,5 +107,5 @@ int main()
}
result = write_result && read_result;
notify_completion(result);
MBED_HOSTTEST_RESULT(result);
}

View File

@ -55,16 +55,14 @@ SDFileSystem sd(SDMOSI, SDMISO, SDSCLK, SDSSEL, "sd");
SDFileSystem sd(p11, p12, p13, p14, "sd");
#endif
namespace
{
namespace {
char buffer[1024];
const int KIB_RW = 128;
Timer timer;
const char *bin_filename = "0:testfile.bin";
}
bool test_sf_file_write_fatfs(const char *filename, const int kib_rw)
{
bool test_sf_file_write_fatfs(const char *filename, const int kib_rw) {
FIL file;
bool result = true;
FRESULT res = f_open(&file, filename, FA_WRITE | FA_CREATE_ALWAYS);
@ -96,8 +94,7 @@ bool test_sf_file_write_fatfs(const char *filename, const int kib_rw)
return result;
}
bool test_sf_file_read_fatfs(const char *filename, const int kib_rw)
{
bool test_sf_file_read_fatfs(const char *filename, const int kib_rw) {
FIL file;
bool result = true;
FRESULT res = f_open(&file, filename, FA_READ | FA_OPEN_EXISTING);
@ -123,13 +120,16 @@ bool test_sf_file_read_fatfs(const char *filename, const int kib_rw)
return result;
}
char RandomChar()
{
char RandomChar() {
return rand() % 100;
}
int main()
{
int main() {
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(SD FatFS RW Speed);
MBED_HOSTTEST_START("PERF_3");
// Test header
printf("\r\n");
printf("SD Card FatFS Performance Test\r\n");
@ -156,5 +156,5 @@ int main()
}
break;
}
notify_completion(result);
MBED_HOSTTEST_RESULT(result);
}

View File

@ -55,16 +55,14 @@ SDFileSystem sd(SDMOSI, SDMISO, SDSCLK, SDSSEL, "sd");
SDFileSystem sd(p11, p12, p13, p14, "sd");
#endif
namespace
{
namespace {
char buffer[1024];
const int KIB_RW = 128;
Timer timer;
const char *bin_filename = "testfile.bin";
}
bool test_sf_file_write_fhandle(const char *filename, const int kib_rw)
{
bool test_sf_file_write_fhandle(const char *filename, const int kib_rw) {
bool result = true;
FileHandle* file = sd.open(filename, O_WRONLY | O_CREAT | O_TRUNC);
if (file != NULL) {
@ -94,8 +92,7 @@ bool test_sf_file_write_fhandle(const char *filename, const int kib_rw)
return result;
}
bool test_sf_file_read_fhandle(const char *filename, const int kib_rw)
{
bool test_sf_file_read_fhandle(const char *filename, const int kib_rw) {
bool result = true;
FileHandle* file = sd.open(filename, O_RDONLY);
if (file) {
@ -118,13 +115,16 @@ bool test_sf_file_read_fhandle(const char *filename, const int kib_rw)
return result;
}
char RandomChar()
{
char RandomChar() {
return rand() % 100;
}
int main()
{
int main() {
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(SD FileHandle RW Speed);
MBED_HOSTTEST_START("PERF_2");
// Test header
printf("\r\n");
printf("SD Card FileHandle Performance Test\r\n");
@ -151,5 +151,5 @@ int main()
}
break;
}
notify_completion(result);
MBED_HOSTTEST_RESULT(result);
}

View File

@ -55,16 +55,14 @@ SDFileSystem sd(SDMOSI, SDMISO, SDSCLK, SDSSEL, "sd");
SDFileSystem sd(p11, p12, p13, p14, "sd");
#endif
namespace
{
namespace {
char buffer[1024];
const int KIB_RW = 128;
Timer timer;
const char *bin_filename = "/sd/testfile.bin";
}
bool test_sf_file_write_stdio(const char *filename, const int kib_rw)
{
bool test_sf_file_write_stdio(const char *filename, const int kib_rw) {
bool result = true;
FILE* file = fopen(filename, "w");
if (file != NULL) {
@ -94,8 +92,7 @@ bool test_sf_file_write_stdio(const char *filename, const int kib_rw)
return result;
}
bool test_sf_file_read_stdio(const char *filename, const int kib_rw)
{
bool test_sf_file_read_stdio(const char *filename, const int kib_rw) {
bool result = true;
FILE* file = fopen(filename, "r");
if (file) {
@ -118,13 +115,16 @@ bool test_sf_file_read_stdio(const char *filename, const int kib_rw)
return result;
}
char RandomChar()
{
char RandomChar() {
return rand() % 100;
}
int main()
{
int main() {
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(SD stdio RW Speed);
MBED_HOSTTEST_START("PERF_1");
// Test header
printf("\r\n");
printf("SD Card Stdio Performance Test\r\n");
@ -151,5 +151,5 @@ int main()
}
break;
}
notify_completion(result);
MBED_HOSTTEST_RESULT(result);
}

View File

@ -6,6 +6,10 @@
#define MAC_VENDOR_ARM_2 0xF7
int main() {
MBED_HOSTTEST_TIMEOUT(10);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(Semihost);
MBED_HOSTTEST_START("MBED_22");
printf("Semihost connected: %s\n", (semihost_connected()) ? ("Yes") : ("No"));
@ -30,6 +34,5 @@ int main() {
printf("MAC Address Prefix: 00:02:F7, Vendor: ARM\r\n");
}
notify_completion(result);
return 0;
MBED_HOSTTEST_RESULT(result);
}

View File

@ -7,19 +7,23 @@
*/
int main() {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(stdio_auto);
MBED_HOSTTEST_DESCRIPTION(stdio);
MBED_HOSTTEST_START("MBED_2");
DigitalOut led1(LED1);
DigitalOut led2(LED2);
union {
int value_int;
};
notify_start();
notify_start(); // Just to sync with host test supervisor
const char* PRINT_PATTERN = "MBED: Your value was: %d\r\n";
while (true)
{
while (true) {
// SCANF PRINTF family
value_int = 0;
led1 = 1;

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
void print_char(char c = '*')
{
@ -32,6 +33,11 @@ void flip_2() {
}
int main() {
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(wait_us_auto);
MBED_HOSTTEST_DESCRIPTION(Ticker Int);
MBED_HOSTTEST_START("MBED_11");
led1 = 0;
led2 = 0;
flipper_1.attach(&flip_1, 1.0); // the address of the function to be attached (flip) and the interval (1 second)

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
Ticker tick;
DigitalOut led(LED1);
@ -26,6 +27,12 @@ void togglePin(void)
int main()
{
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(wait_us_auto);
MBED_HOSTTEST_DESCRIPTION(Ticker Int us);
MBED_HOSTTEST_START("MBED_23");
tick.attach_us(togglePin, 1000);
while (1);
}

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
void ticker_callback_1(void);
void ticker_callback_2(void);
@ -31,6 +32,12 @@ void ticker_callback_1(void)
int main(void)
{
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(wait_us_auto);
MBED_HOSTTEST_DESCRIPTION(Ticker Two callbacks);
MBED_HOSTTEST_START("MBED_34");
ticker.attach(ticker_callback_1, 1.0);
while(1);
}

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
DigitalOut led(LED1);
@ -14,6 +15,11 @@ void print_char(char c = '*')
int main()
{
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(wait_us_auto);
MBED_HOSTTEST_DESCRIPTION(Time us);
MBED_HOSTTEST_START("MBED_25");
while (true) {
for (int i = 0; i < MS_INTERVALS; i++) {
wait_us(1000);

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
Timeout timer;
DigitalOut led(LED1);
@ -7,16 +8,14 @@ namespace {
const int MS_INTERVALS = 1000;
}
void print_char(char c = '*')
{
void print_char(char c = '*') {
printf("%c", c);
fflush(stdout);
}
void toggleOff(void);
void toggleOn(void)
{
void toggleOn(void) {
static int toggle_counter = 0;
if (toggle_counter == MS_INTERVALS) {
led = !led;
@ -27,13 +26,17 @@ void toggleOn(void)
timer.attach_us(toggleOff, 500);
}
void toggleOff(void)
{
void toggleOff(void) {
timer.attach_us(toggleOn, 500);
}
int main()
{
int main() {
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(wait_us_auto);
MBED_HOSTTEST_DESCRIPTION(Timeout Int us);
MBED_HOSTTEST_START("MBED_24");
toggleOn();
while (1);
}

View File

@ -46,14 +46,17 @@ static bool test_once() {
}
int main() {
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(Interrupt vector relocation);
MBED_HOSTTEST_START("MBED_A18");
// First test, no table reallocation
{
printf("Starting first test (interrupts not relocated).\r\n");
bool ret = test_once();
if (ret == false) {
notify_completion(false);
return 1;
MBED_HOSTTEST_RESULT(false);
}
}
@ -65,11 +68,9 @@ int main() {
bool ret = test_once();
if (ret == false) {
notify_completion(false);
return 1;
MBED_HOSTTEST_RESULT(false);
}
}
notify_completion(true);
return 0;
MBED_HOSTTEST_RESULT(true);
}

View File

@ -1,8 +1,8 @@
#include "mbed.h"
#include "test_env.h"
#include "EthernetInterface.h"
struct s_ip_address
{
struct s_ip_address {
int ip_1;
int ip_2;
int ip_3;
@ -10,6 +10,11 @@ struct s_ip_address
};
int main() {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(tcpecho_client_auto);
MBED_HOSTTEST_DESCRIPTION(TCP echo client);
MBED_HOSTTEST_START("NET_4");
char buffer[256] = {0};
char out_buffer[] = "Hello World\n";
char out_success[] = "{{success}}\n{{end}}\n";
@ -17,20 +22,20 @@ int main() {
s_ip_address ip_addr = {0, 0, 0, 0};
int port = 0;
printf("TCPCllient waiting for server IP and port...\r\n");
printf("TCPCllient waiting for server IP and port..." NL);
scanf("%d.%d.%d.%d:%d", &ip_addr.ip_1, &ip_addr.ip_2, &ip_addr.ip_3, &ip_addr.ip_4, &port);
printf("Address received:%d.%d.%d.%d:%d\r\n", ip_addr.ip_1, ip_addr.ip_2, ip_addr.ip_3, ip_addr.ip_4, port);
printf("Address received:%d.%d.%d.%d:%d" NL, ip_addr.ip_1, ip_addr.ip_2, ip_addr.ip_3, ip_addr.ip_4, port);
EthernetInterface eth;
eth.init(); //Use DHCP
eth.connect();
printf("TCPClient IP Address is %s\r\n", eth.getIPAddress());
printf("TCPClient IP Address is %s" NL, eth.getIPAddress());
sprintf(buffer, "%d.%d.%d.%d", ip_addr.ip_1, ip_addr.ip_2, ip_addr.ip_3, ip_addr.ip_4);
TCPSocketConnection socket;
while (socket.connect(buffer, port) < 0) {
printf("TCPCllient unable to connect to %s:%d\r\n", buffer, port);
printf("TCPCllient unable to connect to %s:%d" NL, buffer, port);
wait(1);
}

View File

@ -21,8 +21,12 @@ char char_rand() {
return (rand() % ASCII_MAX) + ' ';
}
int main() {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(tcpecho_client_auto);
MBED_HOSTTEST_DESCRIPTION(TCP client echo loop);
MBED_HOSTTEST_START("NET_13");
char buffer[BUFFER_SIZE] = {0};
char out_buffer[BUFFER_SIZE] = {0};
s_ip_address ip_addr = {0, 0, 0, 0};
@ -70,6 +74,5 @@ int main() {
}
socket.close();
eth.disconnect();
notify_completion(result);
return 0;
MBED_HOSTTEST_RESULT(result);
}

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
#include "EthernetInterface.h"
namespace {
@ -7,22 +8,27 @@ namespace {
}
int main (void) {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(tcpecho_server_auto);
MBED_HOSTTEST_DESCRIPTION(TCP echo server);
MBED_HOSTTEST_START("NET_3");
char buffer[BUFFER_SIZE] = {0};
EthernetInterface eth;
eth.init(); //Use DHCP
eth.connect();
printf("MBED: Server IP Address is %s:%d\r\n", eth.getIPAddress(), ECHO_SERVER_PORT);
printf("MBED: Server IP Address is %s:%d" NL, eth.getIPAddress(), ECHO_SERVER_PORT);
TCPSocketServer server;
server.bind(ECHO_SERVER_PORT);
server.listen();
while (true) {
printf("MBED: Wait for new connection...\n");
printf("MBED: Wait for new connection..." NL);
TCPSocketConnection client;
server.accept(client);
client.set_blocking(false, 1500); // Timeout after (1.5)s
printf("MBED: Connection from: %s\r\n", client.get_address());
printf("MBED: Connection from: %s" NL, client.get_address());
while (true) {
const int n = client.receive(buffer, sizeof(buffer));

View File

@ -11,8 +11,7 @@ namespace {
const int MAX_ECHO_LOOPS = 100;
const char ASCII_MAX = '~' - ' ';
struct s_ip_address
{
struct s_ip_address {
int ip_1;
int ip_2;
int ip_3;
@ -24,8 +23,12 @@ char char_rand() {
return (rand() % ASCII_MAX) + ' ';
}
int main() {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(udpecho_client_auto);
MBED_HOSTTEST_DESCRIPTION(UDP echo client);
MBED_HOSTTEST_START("NET_6");
char buffer[BUFFER_SIZE] = {0};
char out_buffer[BUFFER_SIZE] = {0};
s_ip_address ip_addr = {0, 0, 0, 0};
@ -75,6 +78,5 @@ int main() {
socket.close();
eth.disconnect();
notify_completion(result);
return 0;
MBED_HOSTTEST_RESULT(result);
}

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
#include "EthernetInterface.h"
namespace {
@ -7,6 +8,11 @@ namespace {
}
int main (void) {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(udpecho_server_auto);
MBED_HOSTTEST_DESCRIPTION(UDP echo server);
MBED_HOSTTEST_START("NET_5");
EthernetInterface eth;
eth.init(); //Use DHCP
eth.connect();

View File

@ -24,6 +24,11 @@ bool find_substring(const char *first, const char *last, const char *s_first, co
}
int main() {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(TCP client hello world);
MBED_HOSTTEST_START("NET_1");
bool result = false;
EthernetInterface eth;
eth.init(); //Use DHCP
@ -76,6 +81,5 @@ int main() {
sock.close();
eth.disconnect();
notify_completion(result);
return 0;
MBED_HOSTTEST_RESULT(result);
}

View File

@ -10,6 +10,11 @@ namespace {
int main() {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(NIST Internet Time Service);
MBED_HOSTTEST_START("NET_2");
bool result = false;
EthernetInterface eth;
eth.init(); //Use DHCP
@ -34,9 +39,11 @@ int main() {
if (n > 0) {
result = true;
const unsigned int timeRes = ntohl(in_buffer_uint);
const float years = timeRes / 60.0 / 60.0 / 24.0 / 365;
const float years = timeRes / 60.0 / 60.0 / 24.0 / 365.0;
const float days = timeRes / 24.0 / 60.0 / 60.0;
printf("UDP: Received %d bytes from server %s on port %d\r\n", n, nist.get_address(), nist.get_port());
printf("UDP: %u seconds since 01/01/1900 00:00 GMT ... %s\r\n", timeRes, timeRes > 0 ? "[OK]" : "[FAIL]");
printf("UDP: %.2f days since 01/01/1900 00:00 GMT ... %s\r\n", days, timeRes > 0 ? "[OK]" : "[FAIL]");
printf("UDP: %.2f years since 01/01/1900 00:00 GMT ... %s\r\n", years, timeRes > YEARS_TO_PASS ? "[OK]" : "[FAIL]");
if (years < YEARS_TO_PASS) {
@ -45,6 +52,5 @@ int main() {
}
sock.close();
eth.disconnect();
notify_completion(result);
return 0;
MBED_HOSTTEST_RESULT(result);
}

View File

@ -8,8 +8,12 @@ namespace {
const int BUFFER_SIZE = 512;
}
int main()
{
int main() {
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(HTTP client hello world);
MBED_HOSTTEST_START("NET_7");
char http_request_buffer[BUFFER_SIZE + 1] = {0};
HTTPClient http;
EthernetInterface eth;
@ -31,8 +35,7 @@ int main()
if (result == false) {
eth.disconnect();
notify_completion(false);
exit(ret);
MBED_HOSTTEST_RESULT(false);
}
}
@ -56,11 +59,9 @@ int main()
if (result == false) {
eth.disconnect();
notify_completion(false);
exit(ret);
MBED_HOSTTEST_RESULT(false);
}
}
eth.disconnect();
notify_completion(true);
return 0;
MBED_HOSTTEST_RESULT(true);
}

View File

@ -3,8 +3,12 @@
#include "EthernetInterface.h"
#include "NTPClient.h"
int main()
{
int main() {
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(NTP client);
MBED_HOSTTEST_START("NET_8");
EthernetInterface eth;
NTPClient ntp;
eth.init(); //Use DHCP
@ -27,11 +31,9 @@ int main()
}
if (result == false) {
notify_completion(false);
exit(ret);
MBED_HOSTTEST_RESULT(false);
}
}
eth.disconnect();
notify_completion(true);
return 0;
MBED_HOSTTEST_RESULT(true);
}

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
#include "rtos.h"
/*
@ -12,8 +13,7 @@
#define STACK_SIZE DEFAULT_STACK_SIZE
#endif
void print_char(char c = '*')
{
void print_char(char c = '*') {
printf("%c", c);
fflush(stdout);
}
@ -30,6 +30,11 @@ void led2_thread(void const *argument) {
}
int main() {
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(wait_us_auto);
MBED_HOSTTEST_DESCRIPTION(Basic thread);
MBED_HOSTTEST_START("RTOS_1");
Thread thread(led2_thread, NULL, osPriorityNormal, STACK_SIZE);
while (true) {

View File

@ -5,11 +5,17 @@
DigitalOut led2(LED2);
#define SIZE 120
#define SIZE 100
namespace {
// Allocate data buffers
uint8_t data_written[SIZE] = { 0 };
uint8_t data_read[SIZE] = { 0 };
}
void sd_thread(void const *argument)
{
const char *FILE_NAME = "/sd/out.txt";
const char *FILE_NAME = "/sd/rtos9_test.txt";
#if defined(TARGET_KL25Z)
SDFileSystem sd(PTD2, PTD3, PTD1, PTD0, "sd");
@ -27,62 +33,64 @@ void sd_thread(void const *argument)
SDFileSystem sd(p11, p12, p13, p14, "sd");
#endif
// Allocate data buffers
uint8_t data_written[SIZE] = { 0 };
uint8_t data_read[SIZE] = { 0 };
{
// fill data_written buffer with random data
FILE *f = fopen(FILE_NAME, "w");
FILE *f = fopen(FILE_NAME, "w+");
if (f) {
// write these data into the file
printf("Writing %d bytes to file:\r\n", SIZE);
printf("Writing %d bytes to file:" NL, SIZE);
for (int i = 0; i < SIZE; i++) {
data_written[i] = rand() % 0xff;
fprintf(f, "%c", data_written[i]);
printf("%02X ", data_written[i]);
if (i && ((i % 20) == 19))
printf("\r\n");
printf(NL);
}
fclose(f);
printf("MBED: Done" NL);
} else {
notify_completion(false);
return;
printf("MBED: Can't open '%s'" NL, FILE_NAME);
MBED_HOSTTEST_RESULT(false);
}
}
printf("\r\n\r\n");
printf(NL);
{
// read back the data from the file and store them in data_read
FILE *f = fopen(FILE_NAME, "r");
if (f) {
printf("Reading %d bytes from file:\r\n", SIZE);
printf("MBED: Reading %d bytes from file:" NL, SIZE);
for (int i = 0; i < SIZE; i++) {
data_read[i] = fgetc(f);
printf("%02X ", data_read[i]);
if (i && ((i % 20) == 19))
printf("\r\n");
printf(NL);
}
fclose(f);
printf("MBED: Done\r\n");
} else {
notify_completion(false);
return;
printf("MBED: Can't open '%s'" NL, FILE_NAME);
MBED_HOSTTEST_RESULT(false);
}
}
printf("\r\nDone.\r\n");
// check that the data written == data read
for (int i = 0; i < SIZE; i++) {
if (data_written[i] != data_read[i]) {
notify_completion(false);
return;
printf("MBED: Data index=%d: w[0x%02X] != r[0x%02X]" NL, i, data_written[i], data_read[i]);
MBED_HOSTTEST_RESULT(false);
}
}
notify_completion(true);
MBED_HOSTTEST_RESULT(true);
}
int main()
{
int main() {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(SD File write read);
MBED_HOSTTEST_START("RTOS_9");
Thread t(sd_thread, NULL, osPriorityNormal, (DEFAULT_STACK_SIZE * 2.25));
while (true) {

View File

@ -36,6 +36,11 @@ void queue_thread(void const *argument) {
}
int main (void) {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(ISR (Queue));
MBED_HOSTTEST_START("RTOS_8");
Thread thread(queue_thread, NULL, osPriorityNormal, STACK_SIZE);
Ticker ticker;
ticker.attach(queue_isr, 1.0);
@ -59,6 +64,6 @@ int main (void) {
}
}
notify_completion(result);
MBED_HOSTTEST_RESULT(result);
return 0;
}

View File

@ -40,6 +40,11 @@ void send_thread (void const *argument) {
}
int main (void) {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(Mail messaging);
MBED_HOSTTEST_START("RTOS_6");
Thread thread(send_thread, NULL, osPriorityNormal, STACK_SIZE);
bool result = true;
int result_counter = 0;
@ -65,6 +70,6 @@ int main (void) {
}
}
}
notify_completion(result);
MBED_HOSTTEST_RESULT(result);
return 0;
}

View File

@ -16,8 +16,7 @@
#define STACK_SIZE DEFAULT_STACK_SIZE
#endif
void print_char(char c = '*')
{
void print_char(char c = '*') {
printf("%c", c);
fflush(stdout);
}
@ -60,6 +59,11 @@ void test_thread(void const *args) {
}
int main() {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default);
MBED_HOSTTEST_DESCRIPTION(Mutex resource lock);
MBED_HOSTTEST_START("RTOS_2");
const int t1_delay = THREAD_DELAY * 1;
const int t2_delay = THREAD_DELAY * 2;
const int t3_delay = THREAD_DELAY * 3;
@ -78,6 +82,6 @@ int main() {
}
fflush(stdout);
notify_completion(!mutex_defect);
MBED_HOSTTEST_RESULT(!mutex_defect);
return 0;
}

View File

@ -42,6 +42,11 @@ void send_thread (void const *argument) {
}
int main (void) {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(Queue messaging);
MBED_HOSTTEST_START("RTOS_5");
Thread thread(send_thread, NULL, osPriorityNormal, STACK_SIZE);
bool result = true;
int result_counter = 0;
@ -67,6 +72,6 @@ int main (void) {
}
}
}
notify_completion(result);
MBED_HOSTTEST_RESULT(result);
return 0;
}

View File

@ -17,8 +17,7 @@
#define STACK_SIZE DEFAULT_STACK_SIZE
#endif
void print_char(char c = '*')
{
void print_char(char c = '*') {
printf("%c", c);
fflush(stdout);
}
@ -49,6 +48,11 @@ void test_thread(void const *delay) {
}
int main (void) {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(Semaphore resource lock);
MBED_HOSTTEST_START("RTOS_3");
const int t1_delay = THREAD_DELAY * 1;
const int t2_delay = THREAD_DELAY * 2;
const int t3_delay = THREAD_DELAY * 3;
@ -66,6 +70,6 @@ int main (void) {
}
fflush(stdout);
notify_completion(!sem_defect);
MBED_HOSTTEST_RESULT(!sem_defect);
return 0;
}

View File

@ -30,6 +30,11 @@ void led_thread(void const *argument) {
}
int main (void) {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(Signals messaging);
MBED_HOSTTEST_START("RTOS_4");
Thread thread(led_thread, NULL, osPriorityNormal, STACK_SIZE);
bool result = true;
@ -41,6 +46,6 @@ int main (void) {
break;
}
}
notify_completion(result);
MBED_HOSTTEST_RESULT(result);
return 0;
}

View File

@ -1,4 +1,5 @@
#include "mbed.h"
#include "test_env.h"
#include "rtos.h"
DigitalOut LEDs[4] = {
@ -22,6 +23,11 @@ void blink(void const *n) {
}
int main(void) {
MBED_HOSTTEST_TIMEOUT(15);
MBED_HOSTTEST_SELECT(wait_us_auto);
MBED_HOSTTEST_DESCRIPTION(Timer);
MBED_HOSTTEST_START("RTOS_7");
RtosTimer led_1_timer(blink, osTimerPeriodic, (void *)0);
RtosTimer led_2_timer(blink, osTimerPeriodic, (void *)1);
RtosTimer led_3_timer(blink, osTimerPeriodic, (void *)2);

View File

@ -10,8 +10,12 @@ It is declared in \cpputest\src\Platforms\armcc\UtestPlatform.cpp
*/
Serial mbed_cpputest_console(STDIO_UART_TX, STDIO_UART_RX);
int main(int ac, char** av)
{
int main(int ac, char** av) {
MBED_HOSTTEST_TIMEOUT(20);
MBED_HOSTTEST_SELECT(default_auto);
MBED_HOSTTEST_DESCRIPTION(Unit test);
MBED_HOSTTEST_START("UT");
unsigned failureCount = 0;
{
// Some compilers may not pass ac, av so we need to supply them ourselves
@ -20,6 +24,6 @@ int main(int ac, char** av)
failureCount = CommandLineTestRunner::RunAllTests(ac, av);
}
notify_completion(failureCount == 0);
MBED_HOSTTEST_RESULT(failureCount == 0);
return failureCount;
}

View File

@ -3,11 +3,11 @@ This module defines the attributes of the
PyPI package for the Mbed SDK
"""
from distutils.core import setup
from setuptools import find_packages
from shutil import copyfileobj
from os.path import isfile, join
from tempfile import TemporaryFile
from shutil import copyfileobj
from setuptools import find_packages
from distutils.core import setup
LICENSE = open('LICENSE').read()
DESCRIPTION = """A set of Python scripts that can be used to compile programs written on top of the `mbed framework`_. It can also be used to export mbed projects to other build systems and IDEs (uVision, IAR, makefiles).
@ -40,7 +40,7 @@ setup(name='mbed-tools',
url='https://github.com/mbedmicro/mbed',
packages=find_packages(),
license=LICENSE,
install_requires=["PrettyTable>=0.7.2", "PySerial>=2.7", "IntelHex>=1.3"])
install_requires=["PrettyTable>=0.7.2", "PySerial>=2.7", "IntelHex>=1.3", "colorama>=0.3.3"])
# Restore previous private_settings if needed
if backup:

View File

@ -17,6 +17,8 @@ limitations under the License.
import re
import tempfile
import colorama
from types import ListType
from shutil import rmtree

View File

@ -13,4 +13,47 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
"""
from host_registry import HostRegistry
# Host test supervisors
from echo import EchoTest
from rtc_auto import RTCTest
from stdio_auto import StdioTest
from hello_auto import HelloTest
from detect_auto import DetectPlatformTest
from default_auto import DefaultAuto
from dev_null_auto import DevNullTest
from wait_us_auto import WaitusTest
from tcpecho_server_auto import TCPEchoServerTest
from udpecho_server_auto import UDPEchoServerTest
from tcpecho_client_auto import TCPEchoClientTest
from udpecho_client_auto import UDPEchoClientTest
# Populate registry with supervising objects
HOSTREGISTRY = HostRegistry()
HOSTREGISTRY.register_host_test("echo", EchoTest())
HOSTREGISTRY.register_host_test("default", DefaultAuto())
HOSTREGISTRY.register_host_test("rtc_auto", RTCTest())
HOSTREGISTRY.register_host_test("hello_auto", HelloTest())
HOSTREGISTRY.register_host_test("stdio_auto", StdioTest())
HOSTREGISTRY.register_host_test("detect_auto", DetectPlatformTest())
HOSTREGISTRY.register_host_test("default_auto", DefaultAuto())
HOSTREGISTRY.register_host_test("wait_us_auto", WaitusTest())
HOSTREGISTRY.register_host_test("dev_null_auto", DevNullTest())
HOSTREGISTRY.register_host_test("tcpecho_server_auto", TCPEchoServerTest())
HOSTREGISTRY.register_host_test("udpecho_server_auto", UDPEchoServerTest())
HOSTREGISTRY.register_host_test("tcpecho_client_auto", TCPEchoClientTest())
HOSTREGISTRY.register_host_test("udpecho_client_auto", UDPEchoClientTest())
###############################################################################
# Functional interface for test supervisor registry
###############################################################################
def get_host_test(ht_name):
return HOSTREGISTRY.get_host_test(ht_name)
def is_host_test(ht_name):
return HOSTREGISTRY.is_host_test(ht_name)

View File

@ -0,0 +1,36 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from sys import stdout
class DefaultAuto():
""" Simple, basic host test's test runner waiting for serial port
output from MUT, no supervision over test running in MUT is executed.
"""
def test(self, selftest):
result = selftest.RESULT_SUCCESS
try:
while True:
c = selftest.mbed.serial_read(512)
if c is None:
return selftest.RESULT_IO_SERIAL
stdout.write(c)
stdout.flush()
except KeyboardInterrupt, _:
selftest.notify("\r\n[CTRL+C] exit")
result = selftest.RESULT_ERROR
return result

View File

@ -16,44 +16,40 @@ limitations under the License.
"""
import re
from host_test import DefaultTest
class DetectPlatformTest(DefaultTest):
class DetectPlatformTest():
PATTERN_MICRO_NAME = "Target '(\w+)'"
re_detect_micro_name = re.compile(PATTERN_MICRO_NAME)
def test(self):
def test(self, selftest):
result = True
c = self.mbed.serial_readline() # {{start}} preamble
c = selftest.mbed.serial_readline() # {{start}} preamble
if c is None:
return self.RESULT_IO_SERIAL
return selftest.RESULT_IO_SERIAL
self.notify(c.strip())
self.notify("HOST: Detecting target name...")
selftest.notify(c.strip())
selftest.notify("HOST: Detecting target name...")
c = self.mbed.serial_readline()
c = selftest.mbed.serial_readline()
if c is None:
return self.RESULT_IO_SERIAL
self.notify(c.strip())
return selftest.RESULT_IO_SERIAL
selftest.notify(c.strip())
# Check for target name
m = self.re_detect_micro_name.search(c)
if m and len(m.groups()):
micro_name = m.groups()[0]
micro_cmp = self.mbed.options.micro == micro_name
micro_cmp = selftest.mbed.options.micro == micro_name
result = result and micro_cmp
self.notify("HOST: MUT Target name '%s', expected '%s'... [%s]"% (micro_name, self.mbed.options.micro, "OK" if micro_cmp else "FAIL"))
selftest.notify("HOST: MUT Target name '%s', expected '%s'... [%s]"% (micro_name,
selftest.mbed.options.micro,
"OK" if micro_cmp else "FAIL"))
for i in range(0, 2):
c = self.mbed.serial_readline()
c = selftest.mbed.serial_readline()
if c is None:
return self.RESULT_IO_SERIAL
self.notify(c.strip())
return selftest.RESULT_IO_SERIAL
selftest.notify(c.strip())
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':
DetectPlatformTest().run()
return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE

View File

@ -15,25 +15,22 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from host_test import DefaultTest
class DevNullTest():
class DevNullTest(DefaultTest):
def check_readline(self, text):
def check_readline(self, selftest, text):
""" Reads line from serial port and checks if text was part of read string
"""
result = False
c = self.mbed.serial_readline()
c = selftest.mbed.serial_readline()
if c and text in c:
result = True
return result
def test(self):
def test(self, selftest):
result = True
# Test should print some text and later stop printing
# 'MBED: re-routing stdout to /null'
res = self.check_readline("re-routing stdout to /null")
res = self.check_readline(selftest, "re-routing stdout to /null")
if not res:
# We haven't read preamble line
result = False
@ -41,17 +38,13 @@ class DevNullTest(DefaultTest):
# Check if there are printed characters
str = ''
for i in range(3):
c = self.mbed.serial_read(32)
c = selftest.mbed.serial_read(32)
if c is None:
return self.RESULT_IO_SERIAL
return selftest.RESULT_IO_SERIAL
else:
str += c
if len(str) > 0:
result = False
break
self.notify("Received %d bytes: %s"% (len(str), str))
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':
DevNullTest().run()
selftest.notify("Received %d bytes: %s"% (len(str), str))
return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE

View File

@ -18,50 +18,35 @@ limitations under the License.
import sys
import uuid
from sys import stdout
from host_test import HostTestResults, Test
class EchoTest():
class EchoTest(Test):
""" This host test will use mbed serial port with
baudrate 115200 to perform echo test on that port.
"""
# Test parameters
TEST_SERIAL_BAUDRATE = 115200
TEST_LOOP_COUNT = 50
def __init__(self):
# Constructors
HostTestResults.__init__(self)
Test.__init__(self)
# Test parameters
self.TEST_SERIAL_BAUDRATE = 115200
self.TEST_LOOP_COUNT = 50
# Custom initialization for echo test
self.mbed.init_serial_params(serial_baud=self.TEST_SERIAL_BAUDRATE)
def test(self):
""" Test function, return True or False to get standard test notification on stdout
def test(self, selftest):
""" This host test will use mbed serial port with
baudrate 115200 to perform echo test on that port.
"""
c = self.mbed.serial_readline() # '{{start}}'
if c is None:
return self.RESULT_IO_SERIAL
# Custom initialization for echo test
selftest.mbed.init_serial_params(serial_baud=self.TEST_SERIAL_BAUDRATE)
selftest.mbed.init_serial()
self.mbed.flush()
self.notify("HOST: Starting the ECHO test")
# Test function, return True or False to get standard test notification on stdout
selftest.mbed.flush()
selftest.notify("HOST: Starting the ECHO test")
result = True
for i in range(0, self.TEST_LOOP_COUNT):
TEST_STRING = str(uuid.uuid4()) + "\n"
self.mbed.serial_write(TEST_STRING)
c = self.mbed.serial_readline()
selftest.mbed.serial_write(TEST_STRING)
c = selftest.mbed.serial_readline()
if c is None:
return self.RESULT_IO_SERIAL
return selftest.RESULT_IO_SERIAL
if c.strip() != TEST_STRING.strip():
self.notify('HOST: "%s" != "%s"'% (c, TEST_STRING))
selftest.notify('HOST: "%s" != "%s"'% (c, TEST_STRING))
result = False
else:
sys.stdout.write('.')
stdout.flush()
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':
EchoTest().run()
return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE

View File

@ -15,23 +15,15 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from host_test import DefaultTest
class HelloTest(DefaultTest):
class HelloTest():
HELLO_WORLD = "Hello World"
def test(self):
c = self.mbed.serial_readline()
def test(self, selftest):
c = selftest.mbed.serial_readline()
if c is None:
return self.RESULT_IO_SERIAL
self.notify(c.strip())
c = self.mbed.serial_readline()
if c is None:
return self.RESULT_IO_SERIAL
self.notify("Read %d bytes:"% len(c))
self.notify(c.strip())
return selftest.RESULT_IO_SERIAL
selftest.notify("Read %d bytes:"% len(c))
selftest.notify(c.strip())
result = True
# Because we can have targetID here let's try to decode
@ -39,8 +31,4 @@ class HelloTest(DefaultTest):
result = False
else:
result = self.HELLO_WORLD in c
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':
HelloTest().run()
return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE

View File

@ -0,0 +1,36 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
class HostRegistry:
""" Class stores registry with host tests and objects representing them
"""
HOST_TESTS = {} # host_test_name -> host_test_ojbect
def register_host_test(self, ht_name, ht_object):
if ht_name not in self.HOST_TESTS:
self.HOST_TESTS[ht_name] = ht_object
def unregister_host_test(self):
if ht_name in HOST_TESTS:
self.HOST_TESTS[ht_name] = None
def get_host_test(self, ht_name):
return self.HOST_TESTS[ht_name] if ht_name in self.HOST_TESTS else None
def is_host_test(self, ht_name):
return ht_name in self.HOST_TESTS

View File

@ -23,6 +23,8 @@ except ImportError, e:
exit(-1)
import os
import re
import types
from sys import stdout
from time import sleep, time
from optparse import OptionParser
@ -124,6 +126,10 @@ class Mbed:
serial_baud = serial_baud if serial_baud is not None else self.serial_baud
serial_timeout = serial_timeout if serial_timeout is not None else self.serial_timeout
if self.serial:
self.serial.close()
self.serial = None
result = True
try:
self.serial = Serial(self.port, baudrate=serial_baud, timeout=serial_timeout)
@ -252,14 +258,50 @@ class HostTestResults:
self.RESULT_NO_IMAGE = 'no_image'
self.RESULT_IOERR_COPY = "ioerr_copy"
self.RESULT_PASSIVE = "passive"
self.RESULT_NOT_DETECTED = "not_detected"
import workspace_tools.host_tests as host_tests
class Test(HostTestResults):
""" Base class for host test's test runner
"""
# Select default host_test supervision (replaced after autodetection)
test_supervisor = host_tests.get_host_test("default")
def __init__(self):
self.mbed = Mbed()
def detect_test_config(self, verbose=False):
""" Detects test case configuration
"""
result = {}
while True:
line = self.mbed.serial_readline()
if "{start}" in line:
self.notify("HOST: Start test...")
break
else:
# Detect if this is property from TEST_ENV print
m = re.search('{([\w_]+);([\w\d\+ ]+)}}', line[:-1])
if m and len(m.groups()) == 2:
# This is most likely auto-detection property
result[m.group(1)] = m.group(2)
if verbose:
self.notify("HOST: Property '%s' = '%s'"% (m.group(1), m.group(2)))
else:
# We can check if this is TArget Id in mbed specific format
m2 = re.search('^([\$]+)([a-fA-F0-9]+)', line[:-1])
if m2 and len(m2.groups()) == 2:
if verbose:
target_id = m2.group(1) + m2.group(2)
self.notify("HOST: TargetID '%s'"% target_id)
self.notify(line[len(target_id):-1])
else:
self.notify("HOST: Unknown property: %s"% line.strip())
return result
def run(self):
""" Test runner for host test. This function will start executing
test and forward test result via serial port to test suite
@ -284,7 +326,13 @@ class Test(HostTestResults):
# Run test
try:
result = self.test()
CONFIG = self.detect_test_config(verbose=True) # print CONFIG
if "host_test_name" in CONFIG:
if host_tests.is_host_test(CONFIG["host_test_name"]):
self.test_supervisor = host_tests.get_host_test(CONFIG["host_test_name"])
result = self.test_supervisor.test(self) #result = self.test()
if result is not None:
self.print_result(result)
else:
@ -312,35 +360,15 @@ class Test(HostTestResults):
def print_result(self, result):
""" Test result unified printing function
"""
self.notify("\n{{%s}}\n{{end}}" % result)
self.notify("\r\n{{%s}}\r\n{{end}}" % result)
class DefaultTest(Test):
class DefaultTestSelector(Test):
""" Test class with serial port initialization
"""
def __init__(self):
HostTestResults.__init__(self)
Test.__init__(self)
class Simple(DefaultTest):
""" Simple, basic host test's test runner waiting for serial port
output from MUT, no supervision over test running in MUT is executed.
"""
def test(self):
result = self.RESULT_SUCCESS
try:
while True:
c = self.mbed.serial_read(512)
if c is None:
return self.RESULT_IO_SERIAL
stdout.write(c)
stdout.flush()
except KeyboardInterrupt, _:
self.notify("\r\n[CTRL+C] exit")
result = self.RESULT_ERROR
return result
if __name__ == '__main__':
Simple().run()
DefaultTestSelector().run()

View File

@ -16,24 +16,22 @@ limitations under the License.
"""
import re
from host_test import DefaultTest
from time import time, strftime, gmtime
class RTCTest(DefaultTest):
class RTCTest():
PATTERN_RTC_VALUE = "\[(\d+)\] \[(\d+-\d+-\d+ \d+:\d+:\d+ [AaPpMm]{2})\]"
re_detect_rtc_value = re.compile(PATTERN_RTC_VALUE)
def test(self):
def test(self, selftest):
test_result = True
start = time()
sec_prev = 0
for i in range(0, 5):
# Timeout changed from default: we need to wait longer for some boards to start-up
c = self.mbed.serial_readline(timeout=10)
c = selftest.mbed.serial_readline(timeout=10)
if c is None:
return self.RESULT_IO_SERIAL
self.notify(c.strip())
return selftest.RESULT_IO_SERIAL
selftest.notify(c.strip())
delta = time() - start
m = self.re_detect_rtc_value.search(c)
if m and len(m.groups()):
@ -42,14 +40,10 @@ class RTCTest(DefaultTest):
correct_time_str = strftime("%Y-%m-%d %H:%M:%S %p", gmtime(float(sec)))
test_result = test_result and (time_str == correct_time_str)
result_msg = "OK" if (time_str == correct_time_str and sec > 0 and sec > sec_prev) else "FAIL"
self.notify("HOST: [%s] [%s] received time %+d sec after %.2f sec... %s"% (sec, time_str, sec - sec_prev, delta, result_msg))
selftest.notify("HOST: [%s] [%s] received time %+d sec after %.2f sec... %s"% (sec, time_str, sec - sec_prev, delta, result_msg))
sec_prev = sec
else:
test_result = False
break
start = time()
return self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE
if __name__ == '__main__':
RTCTest().run()
return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE

View File

@ -18,32 +18,30 @@ limitations under the License.
import re
import random
from time import time
from host_test import DefaultTest
class StdioTest(DefaultTest):
class StdioTest():
PATTERN_INT_VALUE = "Your value was: (-?\d+)"
re_detect_int_value = re.compile(PATTERN_INT_VALUE)
def test(self):
def test(self, selftest):
test_result = True
c = self.mbed.serial_readline() # {{start}} preamble
c = selftest.mbed.serial_readline() # {{start}} preamble
if c is None:
return self.RESULT_IO_SERIAL
self.notify(c)
return selftest.RESULT_IO_SERIAL
selftest.notify(c)
for i in range(0, 10):
random_integer = random.randint(-99999, 99999)
self.notify("HOST: Generated number: " + str(random_integer))
selftest.notify("HOST: Generated number: " + str(random_integer))
start = time()
self.mbed.serial_write(str(random_integer) + "\n")
selftest.mbed.serial_write(str(random_integer) + "\n")
serial_stdio_msg = self.mbed.serial_readline()
serial_stdio_msg = selftest.mbed.serial_readline()
if serial_stdio_msg is None:
return self.RESULT_IO_SERIAL
return selftest.RESULT_IO_SERIAL
delay_time = time() - start
self.notify(serial_stdio_msg.strip())
selftest.notify(serial_stdio_msg.strip())
# Searching for reply with scanned values
m = self.re_detect_int_value.search(serial_stdio_msg)
@ -51,12 +49,8 @@ class StdioTest(DefaultTest):
int_value = m.groups()[0]
int_value_cmp = random_integer == int(int_value)
test_result = test_result and int_value_cmp
self.notify("HOST: Number %s read after %.3f sec ... [%s]"% (int_value, delay_time, "OK" if int_value_cmp else "FAIL"))
selftest.notify("HOST: Number %s read after %.3f sec ... [%s]"% (int_value, delay_time, "OK" if int_value_cmp else "FAIL"))
else:
test_result = False
break
return self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE
if __name__ == '__main__':
StdioTest().run()
return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE

View File

@ -18,47 +18,11 @@ limitations under the License.
import sys
import socket
from sys import stdout
from host_test import HostTestResults, Test
from SocketServer import BaseRequestHandler, TCPServer
SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
SERVER_PORT = 7
class TCPEchoClientTest(Test):
def __init__(self):
HostTestResults.__init__(self)
Test.__init__(self)
def send_server_ip_port(self, ip_address, port_no):
""" Set up network host. Reset target and and send server IP via serial to Mbed
"""
c = self.mbed.serial_readline() # 'TCPCllient waiting for server IP and port...'
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
self.notify(c.strip())
self.notify("HOST: Sending server IP Address to target...")
connection_str = ip_address + ":" + str(port_no) + "\n"
self.mbed.serial_write(connection_str)
self.notify(connection_str)
# Two more strings about connection should be sent by MBED
for i in range(0, 2):
c = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
self.notify(c.strip())
def test(self):
# Returning none will suppress host test from printing success code
return None
class TCPEchoClient_Handler(BaseRequestHandler):
def handle(self):
""" One handle per connection
@ -78,12 +42,33 @@ class TCPEchoClient_Handler(BaseRequestHandler):
count += 1
stdout.flush()
class TCPEchoClientTest():
def send_server_ip_port(self, selftest, ip_address, port_no):
""" Set up network host. Reset target and and send server IP via serial to Mbed
"""
c = selftest.mbed.serial_readline() # 'TCPCllient waiting for server IP and port...'
if c is None:
self.print_result(selftest.RESULT_IO_SERIAL)
return
server = TCPServer((SERVER_IP, SERVER_PORT), TCPEchoClient_Handler)
print "HOST: Listening for TCP connections: " + SERVER_IP + ":" + str(SERVER_PORT)
selftest.notify(c.strip())
selftest.notify("HOST: Sending server IP Address to target...")
mbed_test = TCPEchoClientTest();
mbed_test.run()
mbed_test.send_server_ip_port(SERVER_IP, SERVER_PORT)
connection_str = ip_address + ":" + str(port_no) + "\n"
selftest.mbed.serial_write(connection_str)
selftest.notify(connection_str)
server.serve_forever()
# Two more strings about connection should be sent by MBED
for i in range(0, 2):
c = selftest.mbed.serial_readline()
if c is None:
selftest.print_result(self.RESULT_IO_SERIAL)
return
selftest.notify(c.strip())
def test(self, selftest):
# Returning none will suppress host test from printing success code
server = TCPServer((SERVER_IP, SERVER_PORT), TCPEchoClient_Handler)
print "HOST: Listening for TCP connections: " + SERVER_IP + ":" + str(SERVER_PORT)
self.send_server_ip_port(selftest, SERVER_IP, SERVER_PORT)
server.serve_forever()

View File

@ -20,10 +20,8 @@ import sys
import uuid
import socket
from sys import stdout
from host_test import DefaultTest
class TCPEchoServerTest(DefaultTest):
class TCPEchoServerTest():
ECHO_SERVER_ADDRESS = ""
ECHO_PORT = 0
ECHO_LOOPs = 100
@ -32,18 +30,18 @@ class TCPEchoServerTest(DefaultTest):
PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
def test(self):
def test(self, selftest):
result = False
c = self.mbed.serial_readline()
c = selftest.mbed.serial_readline()
if c is None:
return self.RESULT_IO_SERIAL
self.notify(c)
return selftest.RESULT_IO_SERIAL
selftest.notify(c)
m = self.re_detect_server_ip.search(c)
if m and len(m.groups()):
self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
self.notify("HOST: TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
selftest.notify("HOST: TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
# We assume this test fails so can't send 'error' message to server
try:
@ -51,8 +49,8 @@ class TCPEchoServerTest(DefaultTest):
self.s.connect((self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
except Exception, e:
self.s = None
self.notify("HOST: Socket error: %s"% e)
return self.RESULT_ERROR
selftest.notify("HOST: Socket error: %s"% e)
return selftest.RESULT_ERROR
print 'HOST: Sending %d echo strings...'% self.ECHO_LOOPs,
for i in range(0, self.ECHO_LOOPs):
@ -62,8 +60,8 @@ class TCPEchoServerTest(DefaultTest):
data = self.s.recv(128)
except Exception, e:
self.s = None
self.notify("HOST: Socket error: %s"% e)
return self.RESULT_ERROR
selftest.notify("HOST: Socket error: %s"% e)
return selftest.RESULT_ERROR
received_str = repr(data)[1:-1]
if TEST_STRING == received_str: # We need to cut not needed single quotes from the string
@ -81,10 +79,6 @@ class TCPEchoServerTest(DefaultTest):
if self.s is not None:
self.s.close()
else:
self.notify("HOST: TCP Server not found")
selftest.notify("HOST: TCP Server not found")
result = False
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':
TCPEchoServerTest().run()
return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE

View File

@ -18,42 +18,11 @@ limitations under the License.
import sys
import socket
from sys import stdout
from host_test import HostTestResults, Test
from SocketServer import BaseRequestHandler, UDPServer
SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
SERVER_PORT = 7
class UDPEchoClientTest(Test):
def __init__(self):
HostTestResults.__init__(self)
Test.__init__(self)
def send_server_ip_port(self, ip_address, port_no):
c = self.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
self.notify(c.strip())
self.notify("HOST: Sending server IP Address to target...")
connection_str = ip_address + ":" + str(port_no) + "\n"
self.mbed.serial_write(connection_str)
c = self.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
self.notify(c.strip())
return self.RESULT_PASSIVE
def test(self):
# Returning none will suppress host test from printing success code
return None
class UDPEchoClient_Handler(BaseRequestHandler):
def handle(self):
""" One handle per connection
@ -67,12 +36,29 @@ class UDPEchoClient_Handler(BaseRequestHandler):
sys.stdout.write('.')
stdout.flush()
class UDPEchoClientTest():
server = UDPServer((SERVER_IP, SERVER_PORT), UDPEchoClient_Handler)
print "HOST: Listening for UDP connections..."
def send_server_ip_port(self, selftest, ip_address, port_no):
c = selftest.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
if c is None:
selftest.print_result(selftest.RESULT_IO_SERIAL)
return
selftest.notify(c.strip())
mbed_test = UDPEchoClientTest();
mbed_test.run()
mbed_test.send_server_ip_port(SERVER_IP, SERVER_PORT)
selftest.notify("HOST: Sending server IP Address to target...")
connection_str = ip_address + ":" + str(port_no) + "\n"
selftest.mbed.serial_write(connection_str)
server.serve_forever()
c = selftest.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
if c is None:
self.print_result(selftest.RESULT_IO_SERIAL)
return
selftest.notify(c.strip())
return selftest.RESULT_PASSIVE
def test(self, selftest):
# Returning none will suppress host test from printing success code
server = UDPServer((SERVER_IP, SERVER_PORT), UDPEchoClient_Handler)
print "HOST: Listening for UDP connections..."
self.send_server_ip_port(selftest, SERVER_IP, SERVER_PORT)
server.serve_forever()

View File

@ -19,11 +19,9 @@ import re
import sys
import uuid
from sys import stdout
from host_test import DefaultTest
from socket import socket, AF_INET, SOCK_DGRAM
class UDPEchoServerTest(DefaultTest):
class UDPEchoServerTest():
ECHO_SERVER_ADDRESS = ""
ECHO_PORT = 0
s = None # Socket
@ -31,26 +29,26 @@ class UDPEchoServerTest(DefaultTest):
PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
def test(self):
def test(self, selftest):
result = True
serial_ip_msg = self.mbed.serial_readline()
serial_ip_msg = selftest.mbed.serial_readline()
if serial_ip_msg is None:
return self.RESULT_IO_SERIAL
self.notify(serial_ip_msg)
return selftest.RESULT_IO_SERIAL
selftest.notify(serial_ip_msg)
# Searching for IP address and port prompted by server
m = self.re_detect_server_ip.search(serial_ip_msg)
if m and len(m.groups()):
self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
selftest.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
# We assume this test fails so can't send 'error' message to server
try:
self.s = socket(AF_INET, SOCK_DGRAM)
except Exception, e:
self.s = None
self.notify("HOST: Socket error: %s"% e)
return self.RESULT_ERROR
selftest.notify("HOST: Socket error: %s"% e)
return selftest.RESULT_ERROR
for i in range(0, 100):
TEST_STRING = str(uuid.uuid4())
@ -67,8 +65,4 @@ class UDPEchoServerTest(DefaultTest):
if self.s is not None:
self.s.close()
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':
UDPEchoServerTest().run()
return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE

View File

@ -16,10 +16,8 @@ limitations under the License.
"""
from time import time
from host_test import DefaultTest
class WaitusTest(DefaultTest):
class WaitusTest():
""" This test is reading single characters from stdio
and measures time between their occurrences.
"""
@ -27,30 +25,30 @@ class WaitusTest(DefaultTest):
TICK_LOOP_SUCCESSFUL_COUNTS = 10
DEVIATION = 0.10 # +/-10%
def test(self):
def test(self, selftest):
test_result = True
# First character to start test (to know after reset when test starts)
if self.mbed.set_serial_timeout(None) is None:
return self.RESULT_IO_SERIAL
c = self.mbed.serial_read(1)
if selftest.mbed.set_serial_timeout(None) is None:
return selftest.RESULT_IO_SERIAL
c = selftest.mbed.serial_read(1)
if c is None:
return self.RESULT_IO_SERIAL
return selftest.RESULT_IO_SERIAL
if c == '$': # target will printout TargetID e.g.: $$$$1040e649d5c09a09a3f6bc568adef61375c6
#Read additional 39 bytes of TargetID
if self.mbed.serial_read(39) is None:
return self.RESULT_IO_SERIAL
c = self.mbed.serial_read(1) # Re-read first 'tick'
if selftest.mbed.serial_read(39) is None:
return selftest.RESULT_IO_SERIAL
c = selftest.mbed.serial_read(1) # Re-read first 'tick'
if c is None:
return self.RESULT_IO_SERIAL
return selftest.RESULT_IO_SERIAL
start_serial_pool = time()
start = time()
success_counter = 0
for i in range(0, self.TICK_LOOP_COUNTER):
c = self.mbed.serial_read(1)
c = selftest.mbed.serial_read(1)
if c is None:
return self.RESULT_IO_SERIAL
return selftest.RESULT_IO_SERIAL
delta = time() - start
deviation = abs(delta - 1)
# Round values
@ -60,16 +58,12 @@ class WaitusTest(DefaultTest):
deviation_ok = True if delta > 0 and deviation <= self.DEVIATION else False
success_counter = success_counter+1 if deviation_ok else 0
msg = "OK" if deviation_ok else "FAIL"
self.notify("%s in %.2f sec (%.2f) [%s]"% (c, delta, deviation, msg))
selftest.notify("%s in %.2f sec (%.2f) [%s]"% (c, delta, deviation, msg))
start = time()
if success_counter >= self.TICK_LOOP_SUCCESSFUL_COUNTS:
break
measurement_time = time() - start_serial_pool
self.notify("Consecutive OK timer reads: %d"% success_counter)
self.notify("Completed in %.2f sec" % (measurement_time))
selftest.notify("Consecutive OK timer reads: %d"% success_counter)
selftest.notify("Completed in %.2f sec" % (measurement_time))
test_result = True if success_counter >= self.TICK_LOOP_SUCCESSFUL_COUNTS else False
return self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE
if __name__ == '__main__':
WaitusTest().run()
return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE

View File

@ -71,7 +71,15 @@ from workspace_tools.test_api import get_avail_tests_summary_table
from workspace_tools.test_api import get_default_test_options_parser
from workspace_tools.test_api import print_muts_configuration_from_json
from workspace_tools.test_api import print_test_configuration_from_json
from workspace_tools.test_api import get_autodetected_MUTS
from workspace_tools.test_api import get_autodetected_TEST_SPEC
from workspace_tools.test_api import get_module_avail
# Importing extra modules which can be not installed but if available they can extend test suite functionality
try:
import mbed_lstools
except:
pass
def get_version():
""" Returns test script version
@ -126,29 +134,55 @@ if __name__ == '__main__':
print mcu_toolchain_matrix(platform_filter=opts.general_filter_regex)
exit(0)
# Open file with test specification
# test_spec_filename tells script which targets and their toolchain(s)
# should be covered by the test scenario
test_spec = get_json_data_from_file(opts.test_spec_filename) if opts.test_spec_filename else None
if test_spec is None:
if not opts.test_spec_filename:
parser.print_help()
exit(-1)
test_spec = None
MUTs = None
# Get extra MUTs if applicable
MUTs = get_json_data_from_file(opts.muts_spec_filename) if opts.muts_spec_filename else None
if opts.auto_detect:
print "MBEDLS: Detecting connected mbed-enabled devices... "
if MUTs is None:
if not opts.muts_spec_filename:
parser.print_help()
exit(-1)
if get_module_avail('mbed_lstools'):
mbeds = mbed_lstools.create()
muts_list = mbeds.list_mbeds()
for mut in muts_list:
print "MBEDLS: Detected %s, port: %s, mounted: %s"% (mut['platform_name'],
mut['serial_port'],
mut['mount_point'])
# Set up parameters for test specification filter function (we need to set toolchains per target here)
use_default_toolchain = 'default' in opts.toolchains_filter.split(',') if opts.toolchains_filter is not None else True
use_supported_toolchains = 'all' in opts.toolchains_filter.split(',') if opts.toolchains_filter is not None else False
toolchain_filter = opts.toolchains_filter
# Test specification with information about each target and associated toolchain
test_spec = get_autodetected_TEST_SPEC(muts_list,
use_default_toolchain=use_default_toolchain,
use_supported_toolchains=use_supported_toolchains,
toolchain_filter=toolchain_filter)
# MUTs configuration auto-detection
MUTs = get_autodetected_MUTS(muts_list)
else:
# Open file with test specification
# test_spec_filename tells script which targets and their toolchain(s)
# should be covered by the test scenario
test_spec = get_json_data_from_file(opts.test_spec_filename) if opts.test_spec_filename else None
if test_spec is None:
if not opts.test_spec_filename:
parser.print_help()
exit(-1)
# Get extra MUTs if applicable
MUTs = get_json_data_from_file(opts.muts_spec_filename) if opts.muts_spec_filename else None
if MUTs is None:
if not opts.muts_spec_filename:
parser.print_help()
exit(-1)
if opts.verbose_test_configuration_only:
print "MUTs configuration in %s:"% opts.muts_spec_filename
print "MUTs configuration in %s:"% ('auto-detected' if opts.auto_detect else opts.muts_spec_filename)
if MUTs:
print print_muts_configuration_from_json(MUTs)
print print_muts_configuration_from_json(MUTs, platform_filter=opts.general_filter_regex)
print
print "Test specification in %s:"% opts.test_spec_filename
print "Test specification in %s:"% ('auto-detected' if opts.auto_detect else opts.test_spec_filename)
if test_spec:
print print_test_configuration_from_json(test_spec)
exit(0)

View File

@ -28,6 +28,7 @@ import optparse
import datetime
import threading
from types import ListType
from colorama import Fore, Back, Style
from prettytable import PrettyTable
from time import sleep, time
@ -54,6 +55,11 @@ from workspace_tools.test_exporters import ReportExporter, ResultExporterType
import workspace_tools.host_tests.host_tests_plugins as host_tests_plugins
try:
import mbed_lstools
except:
pass
class ProcessObserver(Thread):
def __init__(self, proc):
@ -167,6 +173,9 @@ class SingleTestRunner(object):
_opts_extend_test_timeout=None):
""" Let's try hard to init this object
"""
from colorama import init
init()
PATTERN = "\\{(" + "|".join(self.TEST_RESULT_MAPPING.keys()) + ")\\}"
self.RE_DETECT_TESTCASE_RESULT = re.compile(PATTERN)
# Settings related to test loops counters
@ -681,18 +690,20 @@ class SingleTestRunner(object):
host_test_verbose = self.opts_verbose_test_result_only or self.opts_verbose
host_test_reset = self.opts_mut_reset_type if reset_type is None else reset_type
single_test_result, single_test_output = self.run_host_test(test.host_test,
image_path, disk, port, duration,
micro=target_name,
verbose=host_test_verbose,
reset=host_test_reset,
reset_tout=reset_tout,
copy_method=selected_copy_method,
program_cycle_s=target_by_mcu.program_cycle_s())
host_test_result = self.run_host_test(test.host_test,
image_path, disk, port, duration,
micro=target_name,
verbose=host_test_verbose,
reset=host_test_reset,
reset_tout=reset_tout,
copy_method=selected_copy_method,
program_cycle_s=target_by_mcu.program_cycle_s())
single_test_result, single_test_output, single_testduration, single_timeout = host_test_result
# Store test result
test_all_result.append(single_test_result)
elapsed_time = time() - start_host_exec_time
total_elapsed_time = time() - start_host_exec_time # Test time with copy (flashing) / reset
elapsed_time = single_testduration # TIme of single test case execution after reset
detailed_test_results[test_index] = {
'single_test_result' : single_test_result,
@ -702,12 +713,12 @@ class SingleTestRunner(object):
'test_id' : test_id,
'test_description' : test_description,
'elapsed_time' : round(elapsed_time, 2),
'duration' : duration,
'duration' : single_timeout,
'copy_method' : _copy_method,
}
print self.print_test_result(single_test_result, target_name, toolchain_name,
test_id, test_description, elapsed_time, duration)
test_id, test_description, elapsed_time, single_timeout)
# Update database entries for ongoing test
if self.db_logger and self.db_logger.is_connected():
@ -720,7 +731,7 @@ class SingleTestRunner(object):
single_test_result,
single_test_output,
elapsed_time,
duration,
single_timeout,
test_index)
# If we perform waterfall test we test until we get OK and we stop testing
@ -730,9 +741,14 @@ class SingleTestRunner(object):
if self.db_logger:
self.db_logger.disconnect()
return (self.shape_global_test_loop_result(test_all_result), target_name, toolchain_name,
test_id, test_description, round(elapsed_time, 2),
duration, self.shape_test_loop_ok_result_count(test_all_result)), detailed_test_results
return (self.shape_global_test_loop_result(test_all_result),
target_name,
toolchain_name,
test_id,
test_description,
round(elapsed_time, 2),
single_timeout,
self.shape_test_loop_ok_result_count(test_all_result)), detailed_test_results
def print_test_result(self, test_result, target_name, toolchain_name,
test_id, test_description, elapsed_time, duration):
@ -747,7 +763,7 @@ class SingleTestRunner(object):
separator = "::"
time_info = " in %.2f of %d sec" % (round(elapsed_time, 2), duration)
result = separator.join(tokens) + " [" + test_result +"]" + time_info
return result
return Fore.MAGENTA + result + Fore.RESET
def shape_test_loop_ok_result_count(self, test_all_result):
""" Reformats list of results to simple string
@ -799,6 +815,17 @@ class SingleTestRunner(object):
break
return result
def get_auto_property_value(property_name, line):
""" Scans auto detection line from MUT and returns scanned parameter 'property_name'
Returns string
"""
result = None
if re.search("HOST: Property '%s'"% property_name, line) is not None:
property = re.search("HOST: Property '%s' = '([\w\d _]+)'"% property_name, line)
if property is not None and len(property.groups()) == 1:
result = property.groups()[0]
return result
# print "{%s} port:%s disk:%s" % (name, port, disk),
cmd = ["python",
'%s.py'% name,
@ -819,17 +846,17 @@ class SingleTestRunner(object):
cmd += ["-R", str(reset_tout)]
if verbose:
print "Executing '" + " ".join(cmd) + "'"
print Fore.MAGENTA + "Executing '" + " ".join(cmd) + "'" + Fore.RESET
print "Test::Output::Start"
proc = Popen(cmd, stdout=PIPE, cwd=HOST_TESTS)
obs = ProcessObserver(proc)
start_time = time()
update_once_flag = {} # Stores flags checking if some auto-parameter was already set
line = ''
output = []
while (time() - start_time) < (2 * duration):
start_time = time()
while (time() - start_time) < (duration):
c = get_char_from_queue(obs)
if c:
if verbose:
sys.stdout.write(c)
@ -837,11 +864,28 @@ class SingleTestRunner(object):
output.append(c)
# Give the mbed under test a way to communicate the end of the test
if c in ['\n', '\r']:
# Checking for auto-detection information from the test about MUT reset moment
if 'reset_target' not in update_once_flag and "HOST: Reset target..." in line:
# We will update this marker only once to prevent multiple time resets
update_once_flag['reset_target'] = True
start_time = time()
# Checking for auto-detection information from the test about timeout
auto_timeout_val = get_auto_property_value('timeout', line)
if 'timeout' not in update_once_flag and auto_timeout_val is not None:
# We will update this marker only once to prevent multiple time resets
update_once_flag['timeout'] = True
duration = int(auto_timeout_val)
# Check for test end
if '{end}' in line:
break
line = ''
else:
line += c
end_time = time()
testcase_duration = end_time - start_time # Test case duration from reset to {end}
c = get_char_from_queue(obs)
@ -857,7 +901,7 @@ class SingleTestRunner(object):
obs.stop()
result = get_test_result(output)
return result, "".join(output)
return (result, "".join(output), testcase_duration, duration)
def is_peripherals_available(self, target_mcu_name, peripherals=None):
""" Checks if specified target should run specific peripheral test case
@ -976,7 +1020,7 @@ def get_json_data_from_file(json_spec_filename, verbose=False):
return result
def print_muts_configuration_from_json(json_data, join_delim=", "):
def print_muts_configuration_from_json(json_data, join_delim=", ", platform_filter=None):
""" Prints MUTs configuration passed to test script for verboseness
"""
muts_info_cols = []
@ -997,12 +1041,17 @@ def print_muts_configuration_from_json(json_data, join_delim=", "):
for k in json_data:
row = [k]
mut_info = json_data[k]
for col in muts_info_cols:
cell_val = mut_info[col] if col in mut_info else None
if type(cell_val) == ListType:
cell_val = join_delim.join(cell_val)
row.append(cell_val)
pt.add_row(row)
add_row = True
if platform_filter and 'mcu' in mut_info:
add_row = re.search(platform_filter, mut_info['mcu']) is not None
if add_row:
for col in muts_info_cols:
cell_val = mut_info[col] if col in mut_info else None
if type(cell_val) == ListType:
cell_val = join_delim.join(cell_val)
row.append(cell_val)
pt.add_row(row)
return pt.get_string()
@ -1039,6 +1088,7 @@ def print_test_configuration_from_json(json_data, join_delim=", "):
target_name = target if target in TARGET_MAP else "%s*"% target
row = [target_name]
toolchains = targets[target]
for toolchain in sorted(toolchains_info_cols):
# Check for conflicts: target vs toolchain
conflict = False
@ -1333,8 +1383,71 @@ def detect_database_verbose(db_url):
print "Parse error: '%s' - DB Url error"% (db_url)
def get_module_avail(module_name):
""" This function returns True if module_name is already impored module
"""
return module_name in sys.modules.keys()
def get_autodetected_MUTS(mbeds_list):
""" Function detects all connected to host mbed-enabled devices and generates artificial MUTS file.
If function fails to auto-detect devices it will return empty dictionary.
if get_module_avail('mbed_lstools'):
mbeds = mbed_lstools.create()
mbeds_list = mbeds.list_mbeds()
"""
result = {} # Should be in muts_all.json format
# Align mbeds_list from mbed_lstools to MUT file format (JSON dictionary with muts)
# mbeds_list = [{'platform_name': 'NUCLEO_F302R8', 'mount_point': 'E:', 'target_id': '07050200623B61125D5EF72A', 'serial_port': u'COM34'}]
index = 1
for mut in mbeds_list:
m = {'mcu' : mut['platform_name'],
'port' : mut['serial_port'],
'disk' : mut['mount_point'],
'peripherals' : [] # No peripheral detection
}
if index not in result:
result[index] = {}
result[index] = m
index += 1
return result
def get_autodetected_TEST_SPEC(mbeds_list, use_default_toolchain=True, use_supported_toolchains=False, toolchain_filter=None):
""" Function detects all connected to host mbed-enabled devices and generates artificial test_spec file.
If function fails to auto-detect devices it will return empty 'targets' test_spec description.
use_default_toolchain - if True add default toolchain to test_spec
use_supported_toolchains - if True add all supported toolchains to test_spec
toolchain_filter - if [...list of toolchains...] add from all toolchains only those in filter to test_spec
"""
result = {'targets': {} }
for mut in mbeds_list:
mcu = mut['platform_name']
if mcu in TARGET_MAP:
default_toolchain = TARGET_MAP[mcu].default_toolchain
supported_toolchains = TARGET_MAP[mcu].supported_toolchains
# Decide which toolchains should be added to test specification toolchain pool for each target
toolchains = []
if use_default_toolchain:
toolchains.append(default_toolchain)
if use_supported_toolchains:
toolchains += supported_toolchains
if toolchain_filter is not None:
all_toolchains = supported_toolchains + [default_toolchain]
for toolchain in toolchain_filter.split(','):
if toolchain in all_toolchains:
toolchains.append(toolchain)
result['targets'][mcu] = list(set(toolchains))
return result
def get_default_test_options_parser():
""" Get common test script options used by CLI, webservices etc.
""" Get common test script options used by CLI, web services etc.
"""
parser = optparse.OptionParser()
parser.add_option('-i', '--tests',
@ -1353,6 +1466,19 @@ def get_default_test_options_parser():
type="int",
help="Define number of compilation jobs. Default value is 1")
if get_module_avail('mbed_lstools'):
# Additional features available when mbed_lstools is installed on host and imported
# mbed_lstools allow users to detect connected to host mbed-enabled devices
parser.add_option('', '--auto',
dest='auto_detect',
metavar=False,
action="store_true",
help='Use mbed-ls module to detect all connected mbed devices')
parser.add_option('', '--tc',
dest='toolchains_filter',
help="Toolchain filter for --auto option. Use toolcahins names separated by comma, 'default' or 'all' to select toolchains")
parser.add_option('', '--clean',
dest='clean',
metavar=False,

View File

@ -119,6 +119,7 @@ TESTS = [
"id": "MBED_A7", "description": "InterruptIn",
"source_dir": join(TEST_DIR, "mbed", "interruptin"),
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"duration": 15,
"automated": True,
"peripherals": ["digital_loop"]
},
@ -137,7 +138,7 @@ TESTS = [
"source_dir": join(TEST_DIR, "mbed", "echo"),
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"automated": True,
"host_test": "echo"
#"host_test": "echo"
},
{
"id": "MBED_A10", "description": "PortOut PortIn",
@ -339,7 +340,7 @@ TESTS = [
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"duration": 20,
"automated": True,
"host_test": "stdio_auto"
#"host_test": "stdio_auto"
},
{
"id": "MBED_3", "description": "PortOut",
@ -385,14 +386,14 @@ TESTS = [
"source_dir": join(TEST_DIR, "mbed", "hello"),
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"automated": True,
"host_test": "hello_auto",
#"host_test": "hello_auto",
},
{
"id": "MBED_11", "description": "Ticker Int",
"source_dir": join(TEST_DIR, "mbed", "ticker"),
"dependencies": [MBED_LIBRARIES],
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"automated": True,
"host_test": "wait_us_auto",
#"host_test": "wait_us_auto",
"duration": 20,
},
{
@ -421,9 +422,9 @@ TESTS = [
{
"id": "MBED_16", "description": "RTC",
"source_dir": join(TEST_DIR, "mbed", "rtc"),
"dependencies": [MBED_LIBRARIES],
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"automated": True,
"host_test": "rtc_auto",
#"host_test": "rtc_auto",
"duration": 15
},
{
@ -465,7 +466,7 @@ TESTS = [
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"duration": 15,
"automated": True,
"host_test": "wait_us_auto"
#"host_test": "wait_us_auto"
},
{
"id": "MBED_24", "description": "Timeout Int us",
@ -473,7 +474,7 @@ TESTS = [
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"duration": 15,
"automated": True,
"host_test": "wait_us_auto"
#"host_test": "wait_us_auto"
},
{
"id": "MBED_25", "description": "Time us",
@ -481,7 +482,7 @@ TESTS = [
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"duration": 15,
"automated": True,
"host_test": "wait_us_auto"
#"host_test": "wait_us_auto"
},
{
"id": "MBED_26", "description": "Integer constant division",
@ -535,7 +536,7 @@ TESTS = [
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"duration": 15,
"automated": True,
"host_test": "wait_us_auto"
#"host_test": "wait_us_auto"
},
@ -589,11 +590,16 @@ TESTS = [
{
"id": "RTOS_1", "description": "Basic thread",
"source_dir": join(TEST_DIR, "rtos", "mbed", "basic"),
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES],
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, TEST_MBED_LIB],
"duration": 15,
"automated": True,
"host_test": "wait_us_auto",
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812", "KL25Z", "KL05Z", "K64F", "KL46Z", "RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE", "NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8", "NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC", "DISCO_F401VC"],
#"host_test": "wait_us_auto",
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812",
"KL25Z", "KL05Z", "K64F", "KL46Z",
"RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE",
"NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8",
"NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC",
"DISCO_F401VC"],
},
{
"id": "RTOS_2", "description": "Mutex resource lock",
@ -601,7 +607,12 @@ TESTS = [
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, TEST_MBED_LIB],
"duration": 20,
"automated": True,
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812", "KL25Z", "KL05Z", "K64F", "KL46Z", "RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE", "NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8", "NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC", "DISCO_F401VC"],
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812",
"KL25Z", "KL05Z", "K64F", "KL46Z",
"RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE",
"NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8",
"NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC",
"DISCO_F401VC"],
},
{
"id": "RTOS_3", "description": "Semaphore resource lock",
@ -609,44 +620,74 @@ TESTS = [
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, TEST_MBED_LIB],
"duration": 20,
"automated": True,
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812", "KL25Z", "KL05Z", "K64F", "KL46Z", "RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE", "NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8", "NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC", "DISCO_F401VC"],
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812",
"KL25Z", "KL05Z", "K64F", "KL46Z",
"RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE",
"NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8",
"NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC",
"DISCO_F401VC"],
},
{
"id": "RTOS_4", "description": "Signals messaging",
"source_dir": join(TEST_DIR, "rtos", "mbed", "signals"),
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, TEST_MBED_LIB],
"automated": True,
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812", "KL25Z", "KL05Z", "K64F", "KL46Z", "RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE", "NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8", "NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC", "DISCO_F401VC"],
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812",
"KL25Z", "KL05Z", "K64F", "KL46Z",
"RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE",
"NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8",
"NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC",
"DISCO_F401VC"],
},
{
"id": "RTOS_5", "description": "Queue messaging",
"source_dir": join(TEST_DIR, "rtos", "mbed", "queue"),
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, TEST_MBED_LIB],
"automated": True,
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812", "KL25Z", "KL05Z", "K64F", "KL46Z", "RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE", "NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8", "NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC", "DISCO_F401VC"],
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812",
"KL25Z", "KL05Z", "K64F", "KL46Z",
"RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE",
"NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8",
"NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC",
"DISCO_F401VC"],
},
{
"id": "RTOS_6", "description": "Mail messaging",
"source_dir": join(TEST_DIR, "rtos", "mbed", "mail"),
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, TEST_MBED_LIB],
"automated": True,
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812", "KL25Z", "KL05Z", "K64F", "KL46Z", "RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE", "NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8", "NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC", "DISCO_F401VC"],
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812",
"KL25Z", "KL05Z", "K64F", "KL46Z",
"RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE",
"NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8",
"NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC",
"DISCO_F401VC"],
},
{
"id": "RTOS_7", "description": "Timer",
"source_dir": join(TEST_DIR, "rtos", "mbed", "timer"),
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES],
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, TEST_MBED_LIB],
"duration": 15,
"automated": True,
"host_test": "wait_us_auto",
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812", "KL25Z", "KL05Z", "K64F", "KL46Z", "RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE", "NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8", "NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC", "DISCO_F401VC"],
#"host_test": "wait_us_auto",
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812",
"KL25Z", "KL05Z", "K64F", "KL46Z",
"RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE",
"NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8",
"NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC",
"DISCO_F401VC"],
},
{
"id": "RTOS_8", "description": "ISR (Queue)",
"source_dir": join(TEST_DIR, "rtos", "mbed", "isr"),
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, TEST_MBED_LIB],
"automated": True,
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812", "KL25Z", "KL05Z", "K64F", "KL46Z", "RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE", "NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8", "NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC", "DISCO_F401VC"],
"mcu": ["LPC1768", "LPC1549", "LPC11U24", "LPC812",
"KL25Z", "KL05Z", "K64F", "KL46Z",
"RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE",
"NUCLEO_F401RE", "NUCLEO_F334R8", "DISCO_F334C8", "NUCLEO_F302R8",
"NUCLEO_L053R8", "DISCO_L053C8", "NUCLEO_F072RB", "NUCLEO_F091RC",
"DISCO_F401VC"],
},
{
"id": "RTOS_9", "description": "SD File write-read",
@ -654,7 +695,9 @@ TESTS = [
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, TEST_MBED_LIB, FS_LIBRARY],
"automated": True,
"peripherals": ["SD"],
"mcu": ["LPC1768", "LPC11U24", "LPC812", "KL25Z", "KL05Z", "K64F", "KL46Z", "RZ_A1H", "DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE", "NUCLEO_F401RE"],
"mcu": ["LPC1768", "LPC11U24", "LPC812", "KL25Z",
"KL05Z", "K64F", "KL46Z", "RZ_A1H",
"DISCO_F407VG", "DISCO_F429ZI", "NUCLEO_F411RE", "NUCLEO_F401RE"],
},
# Networking Tests
@ -677,9 +720,9 @@ TESTS = [
{
"id": "NET_3", "description": "TCP echo server",
"source_dir": join(TEST_DIR, "net", "echo", "tcp_server"),
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY],
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY, TEST_MBED_LIB],
"automated": True,
"host_test" : "tcpecho_server_auto",
#"host_test" : "tcpecho_server_auto",
"peripherals": ["ethernet"],
},
{
@ -687,15 +730,15 @@ TESTS = [
"source_dir": join(TEST_DIR, "net", "echo", "tcp_client"),
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY, TEST_MBED_LIB],
"automated": True,
"host_test": "tcpecho_client_auto",
#"host_test": "tcpecho_client_auto",
"peripherals": ["ethernet"]
},
{
"id": "NET_5", "description": "UDP echo server",
"source_dir": join(TEST_DIR, "net", "echo", "udp_server"),
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY],
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY, TEST_MBED_LIB],
"automated": True,
"host_test" : "udpecho_server_auto",
#"host_test" : "udpecho_server_auto",
"peripherals": ["ethernet"]
},
{
@ -703,7 +746,7 @@ TESTS = [
"source_dir": join(TEST_DIR, "net", "echo", "udp_client"),
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY, TEST_MBED_LIB],
"automated": True,
"host_test" : "udpecho_client_auto",
#"host_test" : "udpecho_client_auto",
"peripherals": ["ethernet"],
},
{
@ -751,7 +794,7 @@ TESTS = [
"dependencies": [MBED_LIBRARIES, RTOS_LIBRARIES, ETH_LIBRARY, TEST_MBED_LIB],
"automated": True,
"duration": 15,
"host_test": "tcpecho_client_auto",
#"host_test": "tcpecho_client_auto",
"peripherals": ["ethernet"],
},
{
@ -872,7 +915,7 @@ TESTS = [
"source_dir": join(TEST_DIR, "mbed", "dev_null"),
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"automated": True,
"host_test" : "dev_null_auto",
#"host_test" : "dev_null_auto",
},
{
"id": "EXAMPLE_2", "description": "FS + RTOS",
@ -922,7 +965,7 @@ TESTS = [
"source_dir": join(TEST_DIR, "mbed", "detect"),
"dependencies": [MBED_LIBRARIES, TEST_MBED_LIB],
"automated": True,
"host_test" : "detect_auto",
#"host_test" : "detect_auto",
},
]

View File

@ -17,6 +17,7 @@ limitations under the License.
import re
import sys
import colorama
from os import stat, walk
from copy import copy
from time import time, sleep
@ -49,6 +50,23 @@ def print_notify(event, silent=False):
if not silent:
print '%s: %s' % (event['action'].title(), basename(event['file']))
def print_notify_color(event, silent=False):
""" Default command line notification with colors
"""
from colorama import Fore, Back, Style
if event['type'] in ['info', 'debug']:
print Fore.GREEN + event['message'] + Fore.RESET
elif event['type'] == 'cc':
event['severity'] = event['severity'].title()
event['file'] = basename(event['file'])
print Fore.YELLOW + '[%(severity)s] %(file)s@%(line)s: %(message)s'% event + Fore.RESET
elif event['type'] == 'progress':
if not silent:
print '%s: %s' % (event['action'].title(), basename(event['file']))
def print_notify_verbose(event, silent=False):
""" Default command line notification with more verbose mode
"""
@ -221,7 +239,7 @@ class mbedToolchain:
self.legacy_ignore_dirs = LEGACY_IGNORE_DIRS - set([target.name, LEGACY_TOOLCHAIN_NAMES[self.name]])
self.notify_fun = notify if notify is not None else print_notify
self.notify_fun = notify if notify is not None else print_notify_color
self.options = options if options is not None else []
self.macros = macros or []
@ -713,6 +731,8 @@ class mbedToolchain:
def var(self, key, value):
self.notify({'type': 'var', 'key': key, 'val': value})
from colorama import init
init()
from workspace_tools.settings import ARM_BIN
from workspace_tools.settings import GCC_ARM_PATH, GCC_CR_PATH, GCC_CS_PATH, CW_EWL_PATH, CW_GCC_PATH