pull/395/head
Mihail Stoyanov 2014-07-11 11:19:02 +03:00
commit 8c8b3eb3b9
19 changed files with 554 additions and 299 deletions

View File

@ -30,21 +30,27 @@ static gpio_irq_handler irq_handler;
#define IRQ_FALLING_EDGE PORT_PCR_IRQC(10) #define IRQ_FALLING_EDGE PORT_PCR_IRQC(10)
#define IRQ_EITHER_EDGE PORT_PCR_IRQC(11) #define IRQ_EITHER_EDGE PORT_PCR_IRQC(11)
static void handle_interrupt_in(PORT_Type *port, int ch_base) { const uint32_t search_bits[] = {0x0000FFFF, 0x000000FF, 0x0000000F, 0x00000003, 0x00000001};
uint32_t mask = 0, i;
for (i = 0; i < 32; i++) { static void handle_interrupt_in(PORT_Type *port, int ch_base) {
uint32_t pmask = (1 << i); uint32_t isfr;
if (port->ISFR & pmask) { uint8_t location;
mask |= pmask;
uint32_t id = channel_ids[ch_base + i]; while((isfr = port->ISFR) != 0) {
location = 0;
for (int i = 0; i < 5; i++) {
if (!(isfr & (search_bits[i] << location)))
location += 1 << (4 - i);
}
uint32_t id = channel_ids[ch_base + location];
if (id == 0) { if (id == 0) {
continue; continue;
} }
FGPIO_Type *gpio; FGPIO_Type *gpio;
gpio_irq_event event = IRQ_NONE; gpio_irq_event event = IRQ_NONE;
switch (port->PCR[i] & PORT_PCR_IRQC_MASK) { switch (port->PCR[location] & PORT_PCR_IRQC_MASK) {
case IRQ_RAISING_EDGE: case IRQ_RAISING_EDGE:
event = IRQ_RISE; event = IRQ_RISE;
break; break;
@ -55,16 +61,15 @@ static void handle_interrupt_in(PORT_Type *port, int ch_base) {
case IRQ_EITHER_EDGE: case IRQ_EITHER_EDGE:
gpio = (port == PORTA) ? (FPTA) : (FPTB); gpio = (port == PORTA) ? (FPTA) : (FPTB);
event = (gpio->PDIR & pmask) ? (IRQ_RISE) : (IRQ_FALL); event = (gpio->PDIR & (1 << location)) ? (IRQ_RISE) : (IRQ_FALL);
break; break;
} }
if (event != IRQ_NONE) { if (event != IRQ_NONE) {
irq_handler(id, event); irq_handler(id, event);
} }
port->ISFR = 1 << location;
} }
} }
port->ISFR = mask;
}
/* IRQ only on PORTA and PORTB */ /* IRQ only on PORTA and PORTB */
void gpio_irqA(void) { void gpio_irqA(void) {

View File

@ -30,21 +30,27 @@ static gpio_irq_handler irq_handler;
#define IRQ_FALLING_EDGE PORT_PCR_IRQC(10) #define IRQ_FALLING_EDGE PORT_PCR_IRQC(10)
#define IRQ_EITHER_EDGE PORT_PCR_IRQC(11) #define IRQ_EITHER_EDGE PORT_PCR_IRQC(11)
static void handle_interrupt_in(PORT_Type *port, int ch_base) { const uint32_t search_bits[] = {0x0000FFFF, 0x000000FF, 0x0000000F, 0x00000003, 0x00000001};
uint32_t mask = 0, i;
for (i = 0; i < 32; i++) { static void handle_interrupt_in(PORT_Type *port, int ch_base) {
uint32_t pmask = (1 << i); uint32_t isfr;
if (port->ISFR & pmask) { uint8_t location;
mask |= pmask;
uint32_t id = channel_ids[ch_base + i]; while((isfr = port->ISFR) != 0) {
location = 0;
for (int i = 0; i < 5; i++) {
if (!(isfr & (search_bits[i] << location)))
location += 1 << (4 - i);
}
uint32_t id = channel_ids[ch_base + location];
if (id == 0) { if (id == 0) {
continue; continue;
} }
FGPIO_Type *gpio; FGPIO_Type *gpio;
gpio_irq_event event = IRQ_NONE; gpio_irq_event event = IRQ_NONE;
switch (port->PCR[i] & PORT_PCR_IRQC_MASK) { switch (port->PCR[location] & PORT_PCR_IRQC_MASK) {
case IRQ_RAISING_EDGE: case IRQ_RAISING_EDGE:
event = IRQ_RISE; event = IRQ_RISE;
break; break;
@ -55,16 +61,15 @@ static void handle_interrupt_in(PORT_Type *port, int ch_base) {
case IRQ_EITHER_EDGE: case IRQ_EITHER_EDGE:
gpio = (port == PORTA) ? (FPTA) : (FPTD); gpio = (port == PORTA) ? (FPTA) : (FPTD);
event = (gpio->PDIR & pmask) ? (IRQ_RISE) : (IRQ_FALL); event = (gpio->PDIR & (1 << location)) ? (IRQ_RISE) : (IRQ_FALL);
break; break;
} }
if (event != IRQ_NONE) { if (event != IRQ_NONE) {
irq_handler(id, event); irq_handler(id, event);
} }
port->ISFR = 1 << location;
} }
} }
port->ISFR = mask;
}
void gpio_irqA(void) {handle_interrupt_in(PORTA, 0);} void gpio_irqA(void) {handle_interrupt_in(PORTA, 0);}
void gpio_irqD(void) {handle_interrupt_in(PORTD, 32);} void gpio_irqD(void) {handle_interrupt_in(PORTD, 32);}

View File

@ -30,21 +30,27 @@ static gpio_irq_handler irq_handler;
#define IRQ_FALLING_EDGE PORT_PCR_IRQC(10) #define IRQ_FALLING_EDGE PORT_PCR_IRQC(10)
#define IRQ_EITHER_EDGE PORT_PCR_IRQC(11) #define IRQ_EITHER_EDGE PORT_PCR_IRQC(11)
static void handle_interrupt_in(PORT_Type *port, int ch_base) { const uint32_t search_bits[] = {0x0000FFFF, 0x000000FF, 0x0000000F, 0x00000003, 0x00000001};
uint32_t mask = 0, i;
for (i = 0; i < 32; i++) { static void handle_interrupt_in(PORT_Type *port, int ch_base) {
uint32_t pmask = (1 << i); uint32_t isfr;
if (port->ISFR & pmask) { uint8_t location;
mask |= pmask;
uint32_t id = channel_ids[ch_base + i]; while((isfr = port->ISFR) != 0) {
location = 0;
for (int i = 0; i < 5; i++) {
if (!(isfr & (search_bits[i] << location)))
location += 1 << (4 - i);
}
uint32_t id = channel_ids[ch_base + location];
if (id == 0) { if (id == 0) {
continue; continue;
} }
FGPIO_Type *gpio; FGPIO_Type *gpio;
gpio_irq_event event = IRQ_NONE; gpio_irq_event event = IRQ_NONE;
switch (port->PCR[i] & PORT_PCR_IRQC_MASK) { switch (port->PCR[location] & PORT_PCR_IRQC_MASK) {
case IRQ_RAISING_EDGE: case IRQ_RAISING_EDGE:
event = IRQ_RISE; event = IRQ_RISE;
break; break;
@ -61,14 +67,14 @@ static void handle_interrupt_in(PORT_Type *port, int ch_base) {
} else { } else {
gpio = FPTD; gpio = FPTD;
} }
event = (gpio->PDIR & pmask) ? (IRQ_RISE) : (IRQ_FALL); event = (gpio->PDIR & (1<<location)) ? (IRQ_RISE) : (IRQ_FALL);
break; break;
} }
if (event != IRQ_NONE) if (event != IRQ_NONE) {
irq_handler(id, event); irq_handler(id, event);
} }
port->ISFR = 1 << location;
} }
port->ISFR = mask;
} }
void gpio_irqA(void) { void gpio_irqA(void) {

View File

@ -241,6 +241,9 @@ osThreadDef_t os_thread_def_main = {(os_pthread)main, osPriorityNormal, 0, NULL}
#elif defined(TARGET_STM32F407) || defined(TARGET_F407VG) #elif defined(TARGET_STM32F407) || defined(TARGET_F407VG)
#define INITIAL_SP (0x20020000UL) #define INITIAL_SP (0x20020000UL)
#elif defined(TARGET_STM32F401RE)
#define INITIAL_SP (0x20018000UL)
#elif defined(TARGET_LPC1549) #elif defined(TARGET_LPC1549)
#define INITIAL_SP (0x02009000UL) #define INITIAL_SP (0x02009000UL)

View File

@ -49,7 +49,7 @@
// counting "main", but not counting "osTimerThread" // counting "main", but not counting "osTimerThread"
// <i> Default: 6 // <i> Default: 6
#ifndef OS_TASKCNT #ifndef OS_TASKCNT
# if defined(TARGET_LPC1768) || defined(TARGET_LPC2368) || defined(TARGET_LPC4088) || defined(TARGET_LPC1347) || defined(TARGET_K64F) \ # if defined(TARGET_LPC1768) || defined(TARGET_LPC2368) || defined(TARGET_LPC4088) || defined(TARGET_LPC1347) || defined(TARGET_K64F) || defined(TARGET_STM32F401RE)\
|| defined(TARGET_KL46Z) || defined(TARGET_STM32F407) || defined(TARGET_F407VG) || defined(TARGET_STM32F303VC) || defined(TARGET_LPC1549) || defined(TARGET_LPC11U68) || defined(TARGET_NRF51822) || defined(TARGET_KL46Z) || defined(TARGET_STM32F407) || defined(TARGET_F407VG) || defined(TARGET_STM32F303VC) || defined(TARGET_LPC1549) || defined(TARGET_LPC11U68) || defined(TARGET_NRF51822)
# define OS_TASKCNT 14 # define OS_TASKCNT 14
# elif defined(TARGET_LPC11U24) || defined(TARGET_LPC11U35_401) || defined(TARGET_LPC11U35_501) || defined(TARGET_LPCCAPPUCCINO) || defined(TARGET_LPC1114) \ # elif defined(TARGET_LPC11U24) || defined(TARGET_LPC11U35_401) || defined(TARGET_LPC11U35_501) || defined(TARGET_LPCCAPPUCCINO) || defined(TARGET_LPC1114) \
@ -62,7 +62,7 @@
// <o>Scheduler (+ interrupts) stack size [bytes] <64-4096:8><#/4> // <o>Scheduler (+ interrupts) stack size [bytes] <64-4096:8><#/4>
#ifndef OS_SCHEDULERSTKSIZE #ifndef OS_SCHEDULERSTKSIZE
# if defined(TARGET_LPC1768) || defined(TARGET_LPC2368) || defined(TARGET_LPC4088) || defined(TARGET_LPC1347) || defined(TARGET_K64F) \ # if defined(TARGET_LPC1768) || defined(TARGET_LPC2368) || defined(TARGET_LPC4088) || defined(TARGET_LPC1347) || defined(TARGET_K64F) || defined(TARGET_STM32F401RE)\
|| defined(TARGET_KL46Z) || defined(TARGET_STM32F407) || defined(TARGET_F407VG) || defined(TARGET_STM32F303VC) || defined(TARGET_LPC1549) || defined(TARGET_LPC11U68) || defined(TARGET_NRF51822) || defined(TARGET_KL46Z) || defined(TARGET_STM32F407) || defined(TARGET_F407VG) || defined(TARGET_STM32F303VC) || defined(TARGET_LPC1549) || defined(TARGET_LPC11U68) || defined(TARGET_NRF51822)
# define OS_SCHEDULERSTKSIZE 256 # define OS_SCHEDULERSTKSIZE 256
# elif defined(TARGET_LPC11U24) || defined(TARGET_LPC11U35_401) || defined(TARGET_LPC11U35_501) || defined(TARGET_LPCCAPPUCCINO) || defined(TARGET_LPC1114) \ # elif defined(TARGET_LPC11U24) || defined(TARGET_LPC11U35_401) || defined(TARGET_LPC11U35_501) || defined(TARGET_LPCCAPPUCCINO) || defined(TARGET_LPC1114) \
@ -132,7 +132,8 @@
# elif defined(TARGET_NRF51822) # elif defined(TARGET_NRF51822)
# define OS_CLOCK 16000000 # define OS_CLOCK 16000000
# elif defined(TARGET_STM32F401RE)
# define OS_CLOCK 84000000
# else # else
# error "no target defined" # error "no target defined"
# endif # endif

View File

@ -1,19 +1,24 @@
#include "mbed.h" #include "mbed.h"
#include "test_env.h" #include "test_env.h"
class DevNull : public Stream { class DevNull : public Stream
{
public: public:
DevNull(const char *name = NULL) : Stream(name) {} DevNull(const char *name = NULL) : Stream(name) {}
protected: protected:
virtual int _getc() {return 0;} virtual int _getc() {
virtual int _putc(int c) {return c;} return 0;
}
virtual int _putc(int c) {
return c;
}
}; };
DevNull null("null"); DevNull null("null");
int main() { int main()
{
printf("MBED: re-routing stdout to /null\n"); printf("MBED: re-routing stdout to /null\n");
freopen("/null", "w", stdout); freopen("/null", "w", stdout);
printf("MBED: printf redirected to /null\n"); // This shouldn't appear printf("MBED: printf redirected to /null\n"); // This shouldn't appear

View File

@ -45,7 +45,7 @@ def build_project(src_path, build_path, target, toolchain_name,
if name is None: if name is None:
name = basename(src_paths[0]) name = basename(src_paths[0])
toolchain.info("\n>>> BUILD PROJECT: %s (%s, %s)" % (name.upper(), target.name, toolchain_name)) toolchain.info("Building project %s (%s, %s)" % (name.upper(), target.name, toolchain_name))
# Scan src_path and libraries_paths for resources # Scan src_path and libraries_paths for resources
resources = toolchain.scan_resources(src_paths[0]) resources = toolchain.scan_resources(src_paths[0])
@ -109,7 +109,7 @@ def build_library(src_paths, build_path, target, toolchain_name,
# The first path will give the name to the library # The first path will give the name to the library
name = basename(src_paths[0]) name = basename(src_paths[0])
toolchain.info("\n>>> BUILD LIBRARY %s (%s, %s)" % (name.upper(), target.name, toolchain_name)) toolchain.info("Building library %s (%s, %s)" % (name.upper(), target.name, toolchain_name))
# Scan Resources # Scan Resources
resources = [] resources = []
@ -165,7 +165,7 @@ def build_mbed_libs(target, toolchain_name, options=None, verbose=False, clean=F
""" Function returns True is library was built and false if building was skipped """ """ Function returns True is library was built and false if building was skipped """
# Check toolchain support # Check toolchain support
if toolchain_name not in target.supported_toolchains: if toolchain_name not in target.supported_toolchains:
print '\n%s target is not yet supported by toolchain %s' % (target.name, toolchain_name) print '%s target is not yet supported by toolchain %s' % (target.name, toolchain_name)
return False return False
# Toolchain # Toolchain
@ -183,7 +183,7 @@ def build_mbed_libs(target, toolchain_name, options=None, verbose=False, clean=F
mkdir(TMP_PATH) mkdir(TMP_PATH)
# CMSIS # CMSIS
toolchain.info("\n>>> BUILD LIBRARY %s (%s, %s)" % ('CMSIS', target.name, toolchain_name)) toolchain.info("Building library %s (%s, %s)"% ('CMSIS', target.name, toolchain_name))
cmsis_src = join(MBED_TARGETS_PATH, "cmsis") cmsis_src = join(MBED_TARGETS_PATH, "cmsis")
resources = toolchain.scan_resources(cmsis_src) resources = toolchain.scan_resources(cmsis_src)
@ -194,7 +194,7 @@ def build_mbed_libs(target, toolchain_name, options=None, verbose=False, clean=F
toolchain.copy_files(objects, BUILD_TOOLCHAIN) toolchain.copy_files(objects, BUILD_TOOLCHAIN)
# mbed # mbed
toolchain.info("\n>>> BUILD LIBRARY %s (%s, %s)" % ('MBED', target.name, toolchain_name)) toolchain.info("Building library %s (%s, %s)" % ('MBED', target.name, toolchain_name))
# Common Headers # Common Headers
toolchain.copy_files(toolchain.scan_resources(MBED_API).headers, MBED_LIBRARIES) toolchain.copy_files(toolchain.scan_resources(MBED_API).headers, MBED_LIBRARIES)
@ -242,7 +242,6 @@ def get_unique_supported_toolchains():
def mcu_toolchain_matrix(verbose_html=False): def mcu_toolchain_matrix(verbose_html=False):
""" Shows target map using prettytable """ """ Shows target map using prettytable """
unique_supported_toolchains = get_unique_supported_toolchains() unique_supported_toolchains = get_unique_supported_toolchains()
from prettytable import PrettyTable # Only use it in this function so building works without extra modules from prettytable import PrettyTable # Only use it in this function so building works without extra modules
# All tests status table print # All tests status table print
@ -277,6 +276,11 @@ def mcu_toolchain_matrix(verbose_html=False):
return result return result
def get_target_supported_toolchains(target):
""" Returns target supported toolchains list """
return TARGET_MAP[target].supported_toolchains if target in TARGET_MAP else None
def static_analysis_scan(target, toolchain_name, CPPCHECK_CMD, CPPCHECK_MSG_FORMAT, options=None, verbose=False, clean=False, macros=None, notify=None, jobs=1): def static_analysis_scan(target, toolchain_name, CPPCHECK_CMD, CPPCHECK_MSG_FORMAT, options=None, verbose=False, clean=False, macros=None, notify=None, jobs=1):
# Toolchain # Toolchain
toolchain = TOOLCHAIN_CLASSES[toolchain_name](target, options, macros=macros, notify=notify) toolchain = TOOLCHAIN_CLASSES[toolchain_name](target, options, macros=macros, notify=notify)
@ -293,7 +297,7 @@ def static_analysis_scan(target, toolchain_name, CPPCHECK_CMD, CPPCHECK_MSG_FORM
mkdir(TMP_PATH) mkdir(TMP_PATH)
# CMSIS # CMSIS
toolchain.info(">>>> STATIC ANALYSIS FOR %s (%s, %s)" % ('CMSIS', target.name, toolchain_name)) toolchain.info("Static analysis for %s (%s, %s)" % ('CMSIS', target.name, toolchain_name))
cmsis_src = join(MBED_TARGETS_PATH, "cmsis") cmsis_src = join(MBED_TARGETS_PATH, "cmsis")
resources = toolchain.scan_resources(cmsis_src) resources = toolchain.scan_resources(cmsis_src)
@ -331,7 +335,7 @@ def static_analysis_scan(target, toolchain_name, CPPCHECK_CMD, CPPCHECK_MSG_FORM
# ========================================================================= # =========================================================================
# MBED # MBED
toolchain.info(">>> STATIC ANALYSIS FOR %s (%s, %s)" % ('MBED', target.name, toolchain_name)) toolchain.info("Static analysis for %s (%s, %s)" % ('MBED', target.name, toolchain_name))
# Common Headers # Common Headers
toolchain.copy_files(toolchain.scan_resources(MBED_API).headers, MBED_LIBRARIES) toolchain.copy_files(toolchain.scan_resources(MBED_API).headers, MBED_LIBRARIES)
@ -418,7 +422,7 @@ def static_analysis_scan_library(src_paths, build_path, target, toolchain_name,
# The first path will give the name to the library # The first path will give the name to the library
name = basename(src_paths[0]) name = basename(src_paths[0])
toolchain.info(">>> STATIC ANALYSIS FOR LIBRARY %s (%s, %s)" % (name.upper(), target.name, toolchain_name)) toolchain.info("Static analysis for library %s (%s, %s)" % (name.upper(), target.name, toolchain_name))
# Scan Resources # Scan Resources
resources = [] resources = []

View File

@ -15,17 +15,16 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
from host_test import Test, DefaultTest from host_test import DefaultTest
from sys import stdout from sys import stdout
class DevNullTest(DefaultTest): class DevNullTest(DefaultTest):
def print_result(self, result):
print "\n{%s}\n{end}" % result
def run(self): def run(self):
test_result = True c = self.mbed.serial_read(512)
c = self.mbed.serial.read(512) if c is None:
self.print_result("ioerr_serial")
return
# Data from serial received correctly
print "Received %d bytes" % len(c) print "Received %d bytes" % len(c)
if "{failure}" not in c: if "{failure}" not in c:
self.print_result('success') self.print_result('success')

View File

@ -15,17 +15,17 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
from host_test import Test, DefaultTest from host_test import DefaultTest
from sys import stdout from sys import stdout
class HelloTest(DefaultTest): class HelloTest(DefaultTest):
HELLO_WORLD = "Hello World\n" HELLO_WORLD = "Hello World\n"
def print_result(self, result):
print "\n{%s}\n{end}" % result
def run(self): def run(self):
c = self.mbed.serial.read(len(self.HELLO_WORLD)) c = self.mbed.serial_read(len(self.HELLO_WORLD))
if c is None:
self.print_result("ioerr_serial")
return
stdout.write(c) stdout.write(c)
if c == self.HELLO_WORLD: # Hello World received if c == self.HELLO_WORLD: # Hello World received
self.print_result('success') self.print_result('success')

View File

@ -57,12 +57,33 @@ class Mbed:
print 'Mbed: "%s" "%s"' % (self.port, self.disk) print 'Mbed: "%s" "%s"' % (self.port, self.disk)
def init_serial(self, baud=9600, extra_baud=9600): def init_serial(self, baud=9600, extra_baud=9600):
result = True
try:
self.serial = Serial(self.port, timeout=1) self.serial = Serial(self.port, timeout=1)
except Exception as e:
result = False
# Port can be opened
if result:
self.serial.setBaudrate(baud) self.serial.setBaudrate(baud)
if self.extra_port: if self.extra_port:
self.extra_serial = Serial(self.extra_port, timeout = 1) self.extra_serial = Serial(self.extra_port, timeout = 1)
self.extra_serial.setBaudrate(extra_baud) self.extra_serial.setBaudrate(extra_baud)
self.flush() self.flush()
return result
def serial_read(self, count=1):
""" Wraps self.mbed.serial object read method """
result = None
if self.serial:
result = self.serial.read(count)
return result
def serial_write(self, write_buffer):
""" Wraps self.mbed.serial object write method """
result = -1
if self.serial:
result = self.serial.write(write_buffer)
return result
def safe_sendBreak(self, serial): def safe_sendBreak(self, serial):
""" Wraps serial.sendBreak() to avoid serial::serialposix.py exception on Linux """ Wraps serial.sendBreak() to avoid serial::serialposix.py exception on Linux
@ -83,7 +104,6 @@ class Mbed:
serial.setBreak(False) serial.setBreak(False)
except: except:
result = False result = False
pass
return result return result
def reset(self): def reset(self):
@ -98,6 +118,7 @@ class Mbed:
self.extra_serial.flushInput() self.extra_serial.flushInput()
self.extra_serial.flushOutput() self.extra_serial.flushOutput()
class Test: class Test:
def __init__(self): def __init__(self):
self.mbed = Mbed() self.mbed = Mbed()
@ -110,18 +131,28 @@ class Test:
print str(e) print str(e)
self.print_result("error") self.print_result("error")
def setup(self):
""" Setup and check if configuration for test is correct. E.g. if serial port can be opened """
result = True
if not self.mbed.serial:
result = False
self.print_result("ioerr_serial")
return result
def notify(self, message): def notify(self, message):
""" On screen notification function """
print message print message
stdout.flush() stdout.flush()
def print_result(self, result): def print_result(self, result):
""" Test result unified printing function """
self.notify("\n{%s}\n{end}" % result) self.notify("\n{%s}\n{end}" % result)
class DefaultTest(Test): class DefaultTest(Test):
def __init__(self): def __init__(self):
Test.__init__(self) Test.__init__(self)
self.mbed.init_serial() serial_init_res = self.mbed.init_serial()
self.mbed.reset() self.mbed.reset()
""" """
@ -140,7 +171,10 @@ class Simple(DefaultTest):
def run(self): def run(self):
try: try:
while True: while True:
c = self.mbed.serial.read(512) c = self.mbed.serial_read(512)
if c is None:
self.print_result("ioerr_serial")
break
stdout.write(c) stdout.write(c)
stdout.flush() stdout.flush()
except KeyboardInterrupt, _: except KeyboardInterrupt, _:

View File

@ -15,24 +15,23 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
import random
import re import re
from host_test import Test, DefaultTest from host_test import DefaultTest
from time import time, strftime, gmtime from time import strftime, gmtime
from sys import stdout from sys import stdout
class RTCTest(DefaultTest): class RTCTest(DefaultTest):
pattern_rtc_value = "^\[(\d+)\] \[(\d+-\d+-\d+ \d+:\d+:\d+ [AaPpMm]{2})\]\\n" PATTERN_RTC_VALUE = "^\[(\d+)\] \[(\d+-\d+-\d+ \d+:\d+:\d+ [AaPpMm]{2})\]\\n"
re_detect_rtc_value = re.compile(pattern_rtc_value) re_detect_rtc_value = re.compile(PATTERN_RTC_VALUE)
def print_result(self, result):
print "\n{%s}\n{end}" % result
def run(self): def run(self):
test_result = True test_result = True
c = self.mbed.serial.timeout = None c = self.mbed.serial.timeout = None
for i in range(0, 5): for i in range(0, 5):
c = self.mbed.serial.read(38) # 38 len("[1256729742] [2009-10-28 11:35:42 AM]\n" c = self.mbed.serial_read(38) # 38 len("[1256729742] [2009-10-28 11:35:42 AM]\n"
if c is None:
self.print_result("ioerr_serial")
return
stdout.flush() stdout.flush()
m = self.re_detect_rtc_value.search(c) m = self.re_detect_rtc_value.search(c)
if m and len(m.groups()): if m and len(m.groups()):

View File

@ -17,17 +17,13 @@ limitations under the License.
import random import random
import re import re
from host_test import Test, DefaultTest from host_test import DefaultTest
from time import time from time import time
from sys import stdout from sys import stdout
class StdioTest(DefaultTest): class StdioTest(DefaultTest):
PATTERN_INT_VALUE = "^Your value was: (-?\d+)"
pattern_int_value = "^Your value was: (-?\d+)" re_detect_int_value = re.compile(PATTERN_INT_VALUE)
re_detect_int_value = re.compile(pattern_int_value)
def print_result(self, result):
print "\n{%s}\n{end}" % result
def run(self): def run(self):
test_result = True test_result = True
@ -41,7 +37,10 @@ class StdioTest(DefaultTest):
ip_msg_timeout = self.mbed.options.timeout ip_msg_timeout = self.mbed.options.timeout
start_serial_pool = time(); start_serial_pool = time();
while (time() - start_serial_pool) < ip_msg_timeout: while (time() - start_serial_pool) < ip_msg_timeout:
c = self.mbed.serial.read(512) c = self.mbed.serial_read(512)
if c is None:
self.print_result("ioerr_serial")
return
stdout.write(c) stdout.write(c)
stdout.flush() stdout.flush()
serial_stdio_msg += c serial_stdio_msg += c

View File

@ -17,7 +17,7 @@ limitations under the License.
from SocketServer import BaseRequestHandler, TCPServer from SocketServer import BaseRequestHandler, TCPServer
import socket import socket
from host_test import Test, DefaultTest from host_test import Test
from sys import stdout from sys import stdout
SERVER_IP = str(socket.gethostbyname(socket.getfqdn())) SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))

View File

@ -17,7 +17,7 @@ limitations under the License.
import socket import socket
import re import re
from host_test import Test, DefaultTest from host_test import DefaultTest
from time import time from time import time
from sys import stdout from sys import stdout
@ -26,18 +26,18 @@ class TCPEchoServerTest(DefaultTest):
ECHO_PORT = 0 ECHO_PORT = 0
s = None # Socket s = None # Socket
pattern_server_ip = "^Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)" PATTERN_SERVER_IP = "^Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
re_detect_server_ip = re.compile(pattern_server_ip) re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
def print_result(self, result):
print "\n{%s}\n{end}" % result
def run(self): def run(self):
ip_msg_timeout = self.mbed.options.timeout ip_msg_timeout = self.mbed.options.timeout
serial_ip_msg = "" serial_ip_msg = ""
start_serial_pool = time(); start_serial_pool = time();
while (time() - start_serial_pool) < ip_msg_timeout: while (time() - start_serial_pool) < ip_msg_timeout:
c = self.mbed.serial.read(512) c = self.mbed.serial_read(512)
if c is None:
self.print_result("ioerr_serial")
return
stdout.write(c) stdout.write(c)
stdout.flush() stdout.flush()
serial_ip_msg += c serial_ip_msg += c
@ -80,7 +80,10 @@ class TCPEchoServerTest(DefaultTest):
# Receiving # Receiving
try: try:
while True: while True:
c = self.mbed.serial.read(512) c = self.mbed.serial_read(512)
if c is None:
self.print_result("ioerr_serial")
break
stdout.write(c) stdout.write(c)
stdout.flush() stdout.flush()
except KeyboardInterrupt, _: except KeyboardInterrupt, _:

View File

@ -23,10 +23,9 @@ udp_link_layer_auto.py -p COM20 -d E:\ -t 10
import thread import thread
from SocketServer import BaseRequestHandler, UDPServer from SocketServer import BaseRequestHandler, UDPServer
# from socket import socket, AF_INET, SOCK_DGRAM
import socket import socket
import re import re
from host_test import Test, DefaultTest from host_test import DefaultTest
from time import time, sleep from time import time, sleep
from sys import stdout from sys import stdout
@ -41,13 +40,10 @@ dict_udp_sent_datagrams = dict()
class UDPEchoClient_Handler(BaseRequestHandler): class UDPEchoClient_Handler(BaseRequestHandler):
def handle(self): def handle(self):
""" One handle per connection """ """ One handle per connection """
data, socket = self.request _data, _socket = self.request
# Process received datagram # Process received datagram
data_str = repr(data)[1:-1] data_str = repr(_data)[1:-1]
dict_udp_recv_datagrams[data_str] = time() dict_udp_recv_datagrams[data_str] = time()
# print "[UDP_COUNTER] [" + data_str + "]"
# print ".",
def udp_packet_recv(threadName, server_ip, server_port): def udp_packet_recv(threadName, server_ip, server_port):
@ -66,11 +62,8 @@ class UDPEchoServerTest(DefaultTest):
TEST_PACKET_COUNT = 1000 # how many packets should be send TEST_PACKET_COUNT = 1000 # how many packets should be send
TEST_STRESS_FACTOR = 0.001 # stress factor: 10 ms TEST_STRESS_FACTOR = 0.001 # stress factor: 10 ms
pattern_server_ip = "^Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)" PATTERN_SERVER_IP = "^Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
re_detect_server_ip = re.compile(pattern_server_ip) re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
def print_result(self, result):
print "\n{%s}\n{end}" % result
def get_control_data(self, command="stat\n"): def get_control_data(self, command="stat\n"):
BUFFER_SIZE = 256 BUFFER_SIZE = 256
@ -84,9 +77,12 @@ class UDPEchoServerTest(DefaultTest):
def run(self): def run(self):
ip_msg_timeout = self.mbed.options.timeout ip_msg_timeout = self.mbed.options.timeout
serial_ip_msg = "" serial_ip_msg = ""
start_serial_pool = time(); start_serial_pool = time()
while (time() - start_serial_pool) < ip_msg_timeout: while (time() - start_serial_pool) < ip_msg_timeout:
c = self.mbed.serial.read(512) c = self.mbed.serial_read(512)
if c is None:
self.print_result("ioerr_serial")
return
stdout.write(c) stdout.write(c)
stdout.flush() stdout.flush()
serial_ip_msg += c serial_ip_msg += c
@ -148,7 +144,10 @@ class UDPEchoServerTest(DefaultTest):
print "Remaining mbed serial port data:" print "Remaining mbed serial port data:"
try: try:
while True: while True:
c = self.mbed.serial.read(512) c = self.mbed.serial_read(512)
if c is None:
self.print_result("ioerr_serial")
break
stdout.write(c) stdout.write(c)
stdout.flush() stdout.flush()
except KeyboardInterrupt, _: except KeyboardInterrupt, _:

View File

@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
from SocketServer import BaseRequestHandler, UDPServer from SocketServer import BaseRequestHandler, UDPServer
from host_test import Test, DefaultTest from host_test import Test
import socket import socket
from sys import stdout from sys import stdout

View File

@ -17,7 +17,7 @@ limitations under the License.
from socket import socket, AF_INET, SOCK_DGRAM from socket import socket, AF_INET, SOCK_DGRAM
import re import re
from host_test import Test, DefaultTest from host_test import DefaultTest
from time import time from time import time
from sys import stdout from sys import stdout
@ -26,18 +26,18 @@ class UDPEchoServerTest(DefaultTest):
ECHO_PORT = 0 ECHO_PORT = 0
s = None # Socket s = None # Socket
pattern_server_ip = "^Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)" PATTERN_SERVER_IP = "^Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)"
re_detect_server_ip = re.compile(pattern_server_ip) re_detect_server_ip = re.compile(PATTERN_SERVER_IP)
def print_result(self, result):
print "\n{%s}\n{end}" % result
def run(self): def run(self):
ip_msg_timeout = self.mbed.options.timeout ip_msg_timeout = self.mbed.options.timeout
serial_ip_msg = "" serial_ip_msg = ""
start_serial_pool = time(); start_serial_pool = time();
while (time() - start_serial_pool) < ip_msg_timeout: while (time() - start_serial_pool) < ip_msg_timeout:
c = self.mbed.serial.read(512) c = self.mbed.serial_read(512)
if c is None:
self.print_result("ioerr_serial")
return
stdout.write(c) stdout.write(c)
stdout.flush() stdout.flush()
serial_ip_msg += c serial_ip_msg += c
@ -79,7 +79,10 @@ class UDPEchoServerTest(DefaultTest):
# Receiving # Receiving
try: try:
while True: while True:
c = self.mbed.serial.read(512) c = self.mbed.serial_read(512)
if c is None:
self.print_result("ioerr_serial")
break
stdout.write(c) stdout.write(c)
stdout.flush() stdout.flush()
except KeyboardInterrupt, _: except KeyboardInterrupt, _:

View File

@ -15,28 +15,35 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
from host_test import Test, DefaultTest from host_test import DefaultTest
from time import time from time import time
from sys import stdout from sys import stdout
class WaitusTest(DefaultTest): class WaitusTest(DefaultTest):
def print_result(self, result):
print "\n{%s}\n{end}" % result
def run(self): def run(self):
test_result = True test_result = True
# First character to start test (to know after reset when test starts) # First character to start test (to know after reset when test starts)
self.mbed.serial.timeout = None self.mbed.serial.timeout = None
c = self.mbed.serial.read(1) c = self.mbed.serial_read(1)
if c is None:
self.print_result("ioerr_serial")
return
if c == '$': # target will printout TargetID e.g.: $$$$1040e649d5c09a09a3f6bc568adef61375c6 if c == '$': # target will printout TargetID e.g.: $$$$1040e649d5c09a09a3f6bc568adef61375c6
#Read additional 39 bytes of TargetID #Read additional 39 bytes of TargetID
self.mbed.serial.read(39) if not self.mbed.serial_read(39):
c = self.mbed.serial.read(1) # Re-read first 'tick' self.print_result("ioerr_serial")
return
c = self.mbed.serial_read(1) # Re-read first 'tick'
if c is None:
self.print_result("ioerr_serial")
return
print "Test started" print "Test started"
start_serial_pool = start = time(); start_serial_pool = start = time();
for i in range(0, 10): for i in range(0, 10):
c = self.mbed.serial.read(1) c = self.mbed.serial_read(1)
if c is None:
self.print_result("ioerr_serial")
return
if i > 2: # we will ignore first few measurements if i > 2: # we will ignore first few measurements
delta = time() - start delta = time() - start
deviation = abs(delta - 1) deviation = abs(delta - 1)

View File

@ -75,9 +75,9 @@ import json
import optparse import optparse
import pprint import pprint
import re import re
import os
from types import ListType from types import ListType
from prettytable import PrettyTable from prettytable import PrettyTable
from serial import Serial
from os.path import join, abspath, dirname, exists, basename from os.path import join, abspath, dirname, exists, basename
from shutil import copy from shutil import copy
@ -94,6 +94,8 @@ sys.path.insert(0, ROOT)
# Imports related to mbed build pi # Imports related to mbed build pi
from workspace_tools.build_api import build_project, build_mbed_libs, build_lib from workspace_tools.build_api import build_project, build_mbed_libs, build_lib
from workspace_tools.build_api import mcu_toolchain_matrix from workspace_tools.build_api import mcu_toolchain_matrix
from workspace_tools.build_api import get_unique_supported_toolchains
from workspace_tools.build_api import get_target_supported_toolchains
from workspace_tools.paths import BUILD_DIR from workspace_tools.paths import BUILD_DIR
from workspace_tools.paths import HOST_TESTS from workspace_tools.paths import HOST_TESTS
from workspace_tools.targets import TARGET_MAP from workspace_tools.targets import TARGET_MAP
@ -106,7 +108,6 @@ ROOT = abspath(join(dirname(__file__), ".."))
sys.path.insert(0, ROOT) sys.path.insert(0, ROOT)
# Imports related to mbed build pi # Imports related to mbed build pi
from workspace_tools.utils import delete_dir_files, copy_file
from workspace_tools.settings import MUTs from workspace_tools.settings import MUTs
@ -135,63 +136,67 @@ class ProcessObserver(Thread):
class SingleTestRunner(object): class SingleTestRunner(object):
""" Object wrapper for single test run which may involve multiple MUTs.""" """ Object wrapper for single test run which may involve multiple MUTs."""
re_detect_testcase_result = None RE_DETECT_TESTCASE_RESULT = None
# Return codes for test script
TEST_RESULT_OK = "OK" TEST_RESULT_OK = "OK"
TEST_RESULT_FAIL = "FAIL" TEST_RESULT_FAIL = "FAIL"
TEST_RESULT_ERROR = "ERROR" TEST_RESULT_ERROR = "ERROR"
TEST_RESULT_UNDEF = "UNDEF" TEST_RESULT_UNDEF = "UNDEF"
TEST_RESULT_IOERR_COPY = "IOERR_COPY"
TEST_RESULT_IOERR_DISK = "IOERR_DISK"
TEST_RESULT_IOERR_SERIAL = "IOERR_SERIAL"
TEST_RESULT_TIMEOUT = "TIMEOUT"
GLOBAL_LOOPS_COUNT = 1 # How many times each test should be repeated
TEST_LOOPS_LIST = [] # We redefine no.of loops per test_id
TEST_LOOPS_DICT = {} # TEST_LOOPS_LIST in dict format: { test_id : test_loop_count}
# mbed test suite -> SingleTestRunner # mbed test suite -> SingleTestRunner
TEST_RESULT_MAPPING = {"success" : TEST_RESULT_OK, TEST_RESULT_MAPPING = {"success" : TEST_RESULT_OK,
"failure" : TEST_RESULT_FAIL, "failure" : TEST_RESULT_FAIL,
"error" : TEST_RESULT_ERROR, "error" : TEST_RESULT_ERROR,
"ioerr_copy" : TEST_RESULT_IOERR_COPY,
"ioerr_disk" : TEST_RESULT_IOERR_DISK,
"ioerr_serial" : TEST_RESULT_IOERR_SERIAL,
"timeout" : TEST_RESULT_TIMEOUT,
"end" : TEST_RESULT_UNDEF} "end" : TEST_RESULT_UNDEF}
def __init__(self): def __init__(self, _global_loops_count=1, _test_loops_list=""):
pattern = "\\{(" + "|".join(self.TEST_RESULT_MAPPING.keys()) + ")\\}" pattern = "\\{(" + "|".join(self.TEST_RESULT_MAPPING.keys()) + ")\\}"
self.re_detect_testcase_result = re.compile(pattern) self.RE_DETECT_TESTCASE_RESULT = re.compile(pattern)
def run_simple_test(self, target_name, port,
duration, verbose=False):
"""
Functions resets target and grabs by timeouted pooling test log
via serial port.
Function assumes target is already flashed with proper 'test' binary.
"""
output = ""
# Prepare serial for receiving data from target
baud = 9600
serial = Serial(port, timeout=1)
serial.setBaudrate(baud)
flush_serial(serial)
# Resetting target and pooling
reset(target_name, serial, verbose=verbose)
start_serial_timeour = time()
try: try:
while (time() - start_serial_timeour) < duration: _global_loops_count = int(_global_loops_count)
test_output = serial.read(512) except:
output += test_output _global_loops_count = 1
flush_serial(serial) if _global_loops_count < 1:
if '{end}' in output: _global_loops_count = 1
break self.GLOBAL_LOOPS_COUNT = _global_loops_count
except KeyboardInterrupt, _: self.TEST_LOOPS_LIST = _test_loops_list if _test_loops_list else []
print "CTRL+C break" self.TEST_LOOPS_DICT = self.test_loop_list_to_dict(_test_loops_list)
flush_serial(serial)
serial.close()
# Handle verbose mode def test_loop_list_to_dict(self, test_loops_str):
if verbose: """ Transforms test_id=X,test_id=X,test_id=X into dictionary {test_id : test_id_loops_count} """
print "Test::Output::Start" result = {}
print output if test_loops_str:
print "Test::Output::Finish" test_loops = test_loops_str.split(',')
for test_loop in test_loops:
test_loop_count = test_loop.split('=')
if len(test_loop_count) == 2:
_test_id, _test_loops = test_loop_count
try:
_test_loops = int(_test_loops)
except:
continue
result[_test_id] = _test_loops
return result
# Parse test 'output' data def get_test_loop_count(self, test_id):
result = self.TEST_RESULT_UNDEF """ This function returns no. of loops per test (deducted by test_id_.
for line in output.splitlines(): If test is not in list of redefined loop counts it will use default value. """
search_result = self.re_detect_testcase_result.search(line) result = self.GLOBAL_LOOPS_COUNT
if search_result and len(search_result.groups()): if test_id in self.TEST_LOOPS_DICT:
result = self.TEST_RESULT_MAPPING[search_result.groups(0)[0]] result = self.TEST_LOOPS_DICT[test_id]
break
return result return result
def file_copy_method_selector(self, image_path, disk, copy_method): def file_copy_method_selector(self, image_path, disk, copy_method):
@ -213,18 +218,44 @@ class SingleTestRunner(object):
fdst.write(buf) fdst.write(buf)
IOError: [Errno 28] No space left on device IOError: [Errno 28] No space left on device
""" """
result = True
resutl_msg = ""
if copy_method == "cp" or copy_method == "copy" or copy_method == "xcopy": if copy_method == "cp" or copy_method == "copy" or copy_method == "xcopy":
cmd = [copy_method, image_path.encode('ascii', 'ignore'), disk.encode('ascii', 'ignore') + basename(image_path).encode('ascii', 'ignore')] cmd = [copy_method,
call(cmd, shell=True) image_path.encode('ascii', 'ignore'),
disk.encode('ascii', 'ignore') + basename(image_path).encode('ascii', 'ignore')]
try:
ret = call(cmd, shell=True)
if ret:
resutl_msg = "Return code: %d. Command: "% ret + " ".join(cmd)
result = False
except Exception, e:
resutl_msg = e
result = False
else: else:
copy_method = "shutils.copy()"
# Default python method # Default python method
try:
copy(image_path, disk) copy(image_path, disk)
except Exception, e:
resutl_msg = e
result = False
return result, resutl_msg, copy_method
def handle(self, test_spec, target_name, toolchain_name): def delete_file(file_path):
""" """ Remove file from the system """
Function determines MUT's mbed disk/port and copies binary to result = True
target. Test is being invoked afterwards. resutl_msg = ""
""" try:
os.remove(file_path)
except Exception, e:
resutl_msg = e
result = False
return result, resutl_msg
def handle(self, test_spec, target_name, toolchain_name, test_loops=1):
""" Function determines MUT's mbed disk/port and copies binary to
target. Test is being invoked afterwards. """
data = json.loads(test_spec) data = json.loads(test_spec)
# Get test information, image and test timeout # Get test information, image and test timeout
test_id = data['test_id'] test_id = data['test_id']
@ -259,31 +290,55 @@ class SingleTestRunner(object):
return (test_result, target_name, toolchain_name, return (test_result, target_name, toolchain_name,
test_id, test_description, round(elapsed_time, 2), duration) test_id, test_description, round(elapsed_time, 2), duration)
#if not target_by_mcu.is_disk_virtual:
# delete_dir_files(disk)
# Program MUT with proper image file # Program MUT with proper image file
if not disk.endswith('/') and not disk.endswith('\\'): if not disk.endswith('/') and not disk.endswith('\\'):
disk += '/' disk += '/'
# Tests can be looped so test results must be stored for the same test
test_all_result = []
for test_index in range(test_loops):
# Choose one method of copy files to mbed virtual drive # Choose one method of copy files to mbed virtual drive
self.file_copy_method_selector(image_path, disk, opts.copy_method) _copy_res, _err_msg, _copy_method = self.file_copy_method_selector(image_path, disk, opts.copy_method)
# Host test execution
start_host_exec_time = time()
if not _copy_res: # Serial port copy error
test_result = "IOERR_COPY"
print "Error: Copy method '%s'. %s"% (_copy_method, _err_msg)
else:
# Copy Extra Files # Copy Extra Files
if not target_by_mcu.is_disk_virtual and test.extra_files: if not target_by_mcu.is_disk_virtual and test.extra_files:
for f in test.extra_files: for f in test.extra_files:
copy(f, disk) copy(f, disk)
sleep(target_by_mcu.program_cycle_s()) sleep(target_by_mcu.program_cycle_s())
# Host test execution # Host test execution
start_host_exec_time = time() start_host_exec_time = time()
test_result = self.run_host_test(test.host_test, disk, port, duration, opts.verbose) test_result = self.run_host_test(test.host_test, disk, port, duration, opts.verbose)
test_all_result.append(test_result)
elapsed_time = time() - start_host_exec_time elapsed_time = time() - start_host_exec_time
print print_test_result(test_result, target_name, toolchain_name, print print_test_result(test_result, target_name, toolchain_name,
test_id, test_description, elapsed_time, duration) test_id, test_description, elapsed_time, duration)
return (test_result, target_name, toolchain_name, return (self.shape_global_test_loop_result(test_all_result), target_name, toolchain_name,
test_id, test_description, round(elapsed_time, 2), duration) test_id, test_description, round(elapsed_time, 2),
duration, self.shape_test_loop_ok_result_count(test_all_result))
def shape_test_loop_ok_result_count(self, test_all_result):
""" Reformats list of results to simple string """
test_loop_count = len(test_all_result)
test_loop_ok_result = test_all_result.count(self.TEST_RESULT_OK)
return "%d/%d"% (test_loop_ok_result, test_loop_count)
def shape_global_test_loop_result(self, test_all_result):
""" Reformats list of results to simple string """
test_loop_count = len(test_all_result)
test_loop_ok_result = test_all_result.count(self.TEST_RESULT_OK)
result = self.TEST_RESULT_FAIL
if all(test_all_result[0] == res for res in test_all_result):
result = test_all_result[0]
return result
def run_host_test(self, name, disk, port, duration, verbose=False, extra_serial=""): def run_host_test(self, name, disk, port, duration, verbose=False, extra_serial=""):
# print "{%s} port:%s disk:%s" % (name, port, disk), # print "{%s} port:%s disk:%s" % (name, port, disk),
@ -320,19 +375,13 @@ class SingleTestRunner(object):
# Parse test 'output' data # Parse test 'output' data
result = self.TEST_RESULT_UNDEF result = self.TEST_RESULT_UNDEF
for line in "".join(output).splitlines(): for line in "".join(output).splitlines():
search_result = self.re_detect_testcase_result.search(line) search_result = self.RE_DETECT_TESTCASE_RESULT.search(line)
if search_result and len(search_result.groups()): if search_result and len(search_result.groups()):
result = self.TEST_RESULT_MAPPING[search_result.groups(0)[0]] result = self.TEST_RESULT_MAPPING[search_result.groups(0)[0]]
break break
return result return result
def flush_serial(serial):
""" Flushing serial in/out. """
serial.flushInput()
serial.flushOutput()
def is_peripherals_available(target_mcu_name, peripherals=None): def is_peripherals_available(target_mcu_name, peripherals=None):
""" Checks if specified target should run specific peripheral test case.""" """ Checks if specified target should run specific peripheral test case."""
if peripherals is not None: if peripherals is not None:
@ -386,7 +435,7 @@ def get_json_data_from_file(json_spec_filename, verbose=False):
result = json.load(data_file) result = json.load(data_file)
except ValueError as json_error_msg: except ValueError as json_error_msg:
result = None result = None
print "Error: %s" % (json_error_msg) print "Error in '%s' file: %s" % (json_spec_filename, json_error_msg)
except IOError as fileopen_error_msg: except IOError as fileopen_error_msg:
print "Error: %s" % (fileopen_error_msg) print "Error: %s" % (fileopen_error_msg)
if verbose and result: if verbose and result:
@ -395,6 +444,96 @@ def get_json_data_from_file(json_spec_filename, verbose=False):
return result return result
def print_muts_configuration_from_json(json_data, join_delim=", "):
""" Prints MUTs configuration passed to test script for verboseness. """
muts_info_cols = []
# We need to check all unique properties for each defined MUT
for k in json_data:
mut_info = json_data[k]
for property in mut_info:
if property not in muts_info_cols:
muts_info_cols.append(property)
# Prepare pretty table object to display all MUTs
pt_cols = ["index"] + muts_info_cols
pt = PrettyTable(pt_cols)
for col in pt_cols:
pt.align[col] = "l"
# Add rows to pretty print object
for k in json_data:
row = [k]
mut_info = json_data[k]
for col in muts_info_cols:
cell_val = mut_info[col] if col in mut_info else None
if type(cell_val) == ListType:
cell_val = join_delim.join(cell_val)
row.append(cell_val)
pt.add_row(row)
return pt.get_string()
def print_test_configuration_from_json(json_data, join_delim=", "):
""" Prints test specification configuration passed to test script for verboseness. """
toolchains_info_cols = []
# We need to check all toolchains for each device
for k in json_data:
# k should be 'targets'
targets = json_data[k]
for target in targets:
toolchains = targets[target]
for toolchain in toolchains:
if toolchain not in toolchains_info_cols:
toolchains_info_cols.append(toolchain)
# Prepare pretty table object to display test specification
pt_cols = ["mcu"] + sorted(toolchains_info_cols)
pt = PrettyTable(pt_cols)
for col in pt_cols:
pt.align[col] = "l"
# { target : [conflicted toolchains] }
toolchain_conflicts = {}
for k in json_data:
# k should be 'targets'
targets = json_data[k]
for target in targets:
target_supported_toolchains = get_target_supported_toolchains(target)
if not target_supported_toolchains:
target_supported_toolchains = []
target_name = target if target in TARGET_MAP else "%s*"% target
row = [target_name]
toolchains = targets[target]
for toolchain in toolchains_info_cols:
# Check for conflicts
conflict = False
if toolchain in toolchains:
if toolchain not in target_supported_toolchains:
conflict = True
if target not in toolchain_conflicts:
toolchain_conflicts[target] = []
toolchain_conflicts[target].append(toolchain)
# Add marker inside table about target usage / conflict
cell_val = 'Yes' if toolchain in toolchains else '-'
if conflict:
cell_val += '*'
row.append(cell_val)
pt.add_row(row)
# generate result string
result = pt.get_string() # Test specification table
if toolchain_conflicts: # Print conflicts if the exist
result += "\n"
result += "Toolchain conflicts:\n"
for target in toolchain_conflicts:
if target not in TARGET_MAP:
result += "\t* Target %s unknown\n"% (target)
conflict_target_list = ", ".join(toolchain_conflicts[target])
sufix = 's' if len(toolchain_conflicts[target]) > 1 else ''
result += "\t* Target %s does not support %s toolchain%s\n"% (target, conflict_target_list, sufix)
return result
def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=','): def get_avail_tests_summary_table(cols=None, result_summary=True, join_delim=','):
# get all unique test ID prefixes # get all unique test ID prefixes
unique_test_id = [] unique_test_id = []
@ -534,7 +673,6 @@ def generate_test_summary_by_target(test_summary):
if test[TEST_INDEX] not in result_dict: if test[TEST_INDEX] not in result_dict:
result_dict[test[TEST_INDEX]] = { } result_dict[test[TEST_INDEX]] = { }
result_dict[test[TEST_INDEX]][test[TOOLCHAIN_INDEX]] = test[RESULT_INDEX] result_dict[test[TEST_INDEX]][test[TOOLCHAIN_INDEX]] = test[RESULT_INDEX]
pass
pt_cols = ["Target", "Test ID", "Test Description"] + unique_toolchains pt_cols = ["Target", "Test ID", "Test Description"] + unique_toolchains
pt = PrettyTable(pt_cols) pt = PrettyTable(pt_cols)
@ -559,7 +697,7 @@ def generate_test_summary(test_summary):
result = "Test summary:\n" result = "Test summary:\n"
# Pretty table package is used to print results # Pretty table package is used to print results
pt = PrettyTable(["Result", "Target", "Toolchain", "Test ID", "Test Description", pt = PrettyTable(["Result", "Target", "Toolchain", "Test ID", "Test Description",
"Elapsed Time (sec)", "Timeout (sec)"]) "Elapsed Time (sec)", "Timeout (sec)", "Loops"])
pt.align["Result"] = "l" # Left align pt.align["Result"] = "l" # Left align
pt.align["Target"] = "l" # Left align pt.align["Target"] = "l" # Left align
pt.align["Toolchain"] = "l" # Left align pt.align["Toolchain"] = "l" # Left align
@ -570,7 +708,12 @@ def generate_test_summary(test_summary):
result_dict = {single_test.TEST_RESULT_OK : 0, result_dict = {single_test.TEST_RESULT_OK : 0,
single_test.TEST_RESULT_FAIL : 0, single_test.TEST_RESULT_FAIL : 0,
single_test.TEST_RESULT_ERROR : 0, single_test.TEST_RESULT_ERROR : 0,
single_test.TEST_RESULT_UNDEF : 0 } single_test.TEST_RESULT_UNDEF : 0,
single_test.TEST_RESULT_UNDEF : 0,
single_test.TEST_RESULT_UNDEF : 0,
single_test.TEST_RESULT_IOERR_COPY : 0,
single_test.TEST_RESULT_IOERR_DISK : 0,
single_test.TEST_RESULT_TIMEOUT : 0 }
for test in test_summary: for test in test_summary:
if test[0] in result_dict: if test[0] in result_dict:
@ -666,6 +809,20 @@ if __name__ == '__main__':
default=False, default=False,
help="Only build tests, skips actual test procedures (flashing etc.)") help="Only build tests, skips actual test procedures (flashing etc.)")
parser.add_option('', '--config',
dest='verbose_test_configuration_only',
default=False,
action="store_true",
help='Displays full test specification and MUTs configration and exits')
parser.add_option('', '--loops',
dest='test_loops_list',
help='Set no. of loops per test. Format: TEST_1=1,TEST_2=2,TEST_3=3')
parser.add_option('', '--global-loops',
dest='test_global_loops_value',
help='Set global number of test loops per test. Default value is set 1')
parser.add_option('-v', '--verbose', parser.add_option('-v', '--verbose',
dest='verbose', dest='verbose',
default=False, default=False,
@ -696,21 +853,37 @@ if __name__ == '__main__':
# Open file with test specification # Open file with test specification
# test_spec_filename tells script which targets and their toolchain(s) # test_spec_filename tells script which targets and their toolchain(s)
# should be covered by the test scenario # should be covered by the test scenario
test_spec = get_json_data_from_file(opts.test_spec_filename, opts.verbose) if opts.test_spec_filename else None test_spec = get_json_data_from_file(opts.test_spec_filename) if opts.test_spec_filename else None
if test_spec is None: if test_spec is None:
parser.print_help() parser.print_help()
exit(-1) exit(-1)
# Get extra MUTs if applicable # Get extra MUTs if applicable
if opts.muts_spec_filename: if opts.muts_spec_filename:
MUTs = get_json_data_from_file(opts.muts_spec_filename, opts.verbose) MUTs = get_json_data_from_file(opts.muts_spec_filename)
if MUTs is None: if MUTs is None:
parser.print_help() parser.print_help()
exit(-1) exit(-1)
# Only prints read MUTs configuration
if MUTs and opts.verbose_test_configuration_only:
print "MUTs configuration in %s:"% opts.muts_spec_filename
print print_muts_configuration_from_json(MUTs)
print
print "Test specification in %s:"% opts.test_spec_filename
print print_test_configuration_from_json(test_spec)
exit(0)
# Verbose test specification and MUTs configuration
if MUTs and opts.verbose:
print print_muts_configuration_from_json(MUTs)
if test_spec and opts.verbose:
print print_test_configuration_from_json(test_spec)
# Magic happens here... ;) # Magic happens here... ;)
start = time() start = time()
single_test = SingleTestRunner() single_test = SingleTestRunner(_global_loops_count=opts.test_global_loops_value, _test_loops_list=opts.test_loops_list)
clean = test_spec.get('clean', False) clean = test_spec.get('clean', False)
test_ids = test_spec.get('test_ids', []) test_ids = test_spec.get('test_ids', [])
@ -723,9 +896,17 @@ if __name__ == '__main__':
for toolchain in toolchains: for toolchain in toolchains:
# print '=== %s::%s ===' % (target, toolchain) # print '=== %s::%s ===' % (target, toolchain)
# Let's build our test # Let's build our test
if target not in TARGET_MAP:
print 'Skipped tests for %s target. Target platform not found' % (target)
continue
T = TARGET_MAP[target] T = TARGET_MAP[target]
build_mbed_libs_options = ["analyze"] if opts.goanna_for_mbed_sdk else None build_mbed_libs_options = ["analyze"] if opts.goanna_for_mbed_sdk else None
build_mbed_libs(T, toolchain, options=build_mbed_libs_options) build_mbed_libs_result = build_mbed_libs(T, toolchain, options=build_mbed_libs_options)
if not build_mbed_libs_result:
print 'Skipped tests for %s target. Toolchain %s is not yet supported for this target' % (T.name, toolchain)
continue
build_dir = join(BUILD_DIR, "test", target, toolchain) build_dir = join(BUILD_DIR, "test", target, toolchain)
for test_id, test in TEST_MAP.iteritems(): for test_id, test in TEST_MAP.iteritems():
@ -752,6 +933,7 @@ if __name__ == '__main__':
print "TargetTest::%s::TestSkipped(%s)" % (target, ",".join(test_peripherals)) print "TargetTest::%s::TestSkipped(%s)" % (target, ",".join(test_peripherals))
continue continue
# This is basic structure storing test results
test_result = { test_result = {
'target': target, 'target': target,
'toolchain': toolchain, 'toolchain': toolchain,
@ -798,7 +980,8 @@ if __name__ == '__main__':
# For an automated test the duration act as a timeout after # For an automated test the duration act as a timeout after
# which the test gets interrupted # which the test gets interrupted
test_spec = shape_test_request(target, path, test_id, test.duration) test_spec = shape_test_request(target, path, test_id, test.duration)
single_test_result = single_test.handle(test_spec, target, toolchain) test_loops = single_test.get_test_loop_count(test_id)
single_test_result = single_test.handle(test_spec, target, toolchain, test_loops=test_loops)
test_summary.append(single_test_result) test_summary.append(single_test_result)
# print test_spec, target, toolchain # print test_spec, target, toolchain