diff --git a/workspace_tools/host_tests/detect_auto.py b/workspace_tools/host_tests/detect_auto.py index 7c755c9676..18a9d29fc7 100644 --- a/workspace_tools/host_tests/detect_auto.py +++ b/workspace_tools/host_tests/detect_auto.py @@ -19,6 +19,7 @@ from host_test import DefaultTest from sys import stdout import re + class DetectPlatformTest(DefaultTest): PATTERN_MICRO_NAME = "Target '(\w+)'" re_detect_micro_name = re.compile(PATTERN_MICRO_NAME) @@ -29,20 +30,17 @@ class DetectPlatformTest(DefaultTest): c = self.mbed.serial_readline() # {{start}} preamble print c if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return - print c.strip() - stdout.flush() - print "HOST: Detecting target name..." - stdout.flush() + self.notify(c.strip()) + self.notify("HOST: Detecting target name...") c = self.mbed.serial_readline() if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return - print c.strip() - stdout.flush() + self.notify(c.strip()) # Check for target name m = self.re_detect_micro_name.search(c) @@ -50,22 +48,17 @@ class DetectPlatformTest(DefaultTest): micro_name = m.groups()[0] micro_cmp = self.mbed.options.micro == micro_name result = result and micro_cmp - print "HOST: MUT Target name '%s', expected '%s'... [%s]"% (micro_name, self.mbed.options.micro, "OK" if micro_cmp else "FAIL") - stdout.flush() + self.notify("HOST: MUT Target name '%s', expected '%s'... [%s]"% (micro_name, self.mbed.options.micro, "OK" if micro_cmp else "FAIL")) for i in range(0, 2): c = self.mbed.serial_readline() if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return - print c.strip() - stdout.flush() + self.notify(c.strip()) + + self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE) - if result: # Hello World received - self.print_result('success') - else: - self.print_result('failure') - stdout.flush() if __name__ == '__main__': DetectPlatformTest().run() diff --git a/workspace_tools/host_tests/dev_null_auto.py b/workspace_tools/host_tests/dev_null_auto.py index 2eb422d66d..3e6e0af6cb 100644 --- a/workspace_tools/host_tests/dev_null_auto.py +++ b/workspace_tools/host_tests/dev_null_auto.py @@ -26,7 +26,7 @@ class DevNullTest(DefaultTest): result = False c = self.mbed.serial_readline() if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return None if text in c: result = True @@ -46,19 +46,17 @@ class DevNullTest(DefaultTest): for i in range(3): c = self.mbed.serial_read(32) if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return else: str += c if len(str) > 0: result = False break - print "Received %d bytes: %s"% (len(str), str) - stdout.flush() - if result: - self.print_result('success') - else: - self.print_result('failure') + self.notify("Received %d bytes: %s"% (len(str), str)) + + self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE) + if __name__ == '__main__': DevNullTest().run() diff --git a/workspace_tools/host_tests/echo.py b/workspace_tools/host_tests/echo.py index a2275aedce..1cfe3fbfd4 100644 --- a/workspace_tools/host_tests/echo.py +++ b/workspace_tools/host_tests/echo.py @@ -17,15 +17,16 @@ limitations under the License. import sys import uuid from sys import stdout -from host_test import Test +from host_test import TestResults, Test class EchoTest(Test): def __init__(self): + TestResults.__init__(self) Test.__init__(self) serial_init_res = self.mbed.init_serial(115200) if not serial_init_res: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) self.mbed.reset() self.TEST_LOOP_COUNT = 50 @@ -34,7 +35,7 @@ class EchoTest(Test): """ c = self.mbed.serial_readline() # '{{start}}' if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return print c.strip() stdout.flush() @@ -47,7 +48,7 @@ class EchoTest(Test): self.mbed.serial_write(TEST) c = self.mbed.serial_readline() if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return if c.strip() != TEST.strip(): self.notify('HOST: "%s" != "%s"'% (c, TEST)) diff --git a/workspace_tools/host_tests/hello_auto.py b/workspace_tools/host_tests/hello_auto.py index 64cf379845..9c4f7d3a2f 100644 --- a/workspace_tools/host_tests/hello_auto.py +++ b/workspace_tools/host_tests/hello_auto.py @@ -25,30 +25,25 @@ class HelloTest(DefaultTest): def run(self): c = self.mbed.serial_readline() if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return - print c.strip() - stdout.flush() + self.notify(c.strip()) c = self.mbed.serial_readline() if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return - print "Read %d bytes"% len(c) - print c.strip() - stdout.flush() + self.notify("Read %d bytes"% len(c)) + self.notify(c.strip()) + result = True # Because we can have targetID here let's try to decode if len(c) < len(self.HELLO_WORLD): result = False else: result = self.HELLO_WORLD in c + self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE) - if result: # Hello World received - self.print_result('success') - else: - self.print_result('failure') - stdout.flush() if __name__ == '__main__': HelloTest().run() diff --git a/workspace_tools/host_tests/host_test.py b/workspace_tools/host_tests/host_test.py index a517517c3a..02fc52102c 100644 --- a/workspace_tools/host_tests/host_test.py +++ b/workspace_tools/host_tests/host_test.py @@ -255,8 +255,18 @@ class Mbed: self.extra_serial.flushOutput() -class Test: - """ Baseclass for host test's test runner +class TestResults: + """ Test results set by host tests + """ + def __init__(self): + self.RESULT_SUCCESS = 'success' + self.RESULT_FAILURE = 'failure' + self.RESULT_ERROR = 'error' + self.RESULT_IO_SERIAL = 'ioerr_serial' + + +class Test(TestResults): + """ Base class for host test's test runner """ def __init__(self): self.mbed = Mbed() @@ -267,10 +277,10 @@ class Test: """ try: result = self.test() - self.print_result("success" if result else "failure") + self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE) except Exception, e: print str(e) - self.print_result("error") + self.print_result(self.RESULT_ERROR) def setup(self): """ Setup and check if configuration for test is correct. E.g. if serial port can be opened @@ -278,7 +288,7 @@ class Test: result = True if not self.mbed.serial: result = False - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return result def notify(self, message): @@ -290,17 +300,18 @@ class Test: def print_result(self, result): """ Test result unified printing function """ - self.notify("\n{%s}\n{end}" % result) + self.notify("\n{{%s}}\n{{end}}" % result) class DefaultTest(Test): """ Test class with serial port initialization """ def __init__(self): + TestResults.__init__(self) Test.__init__(self) serial_init_res = self.mbed.init_serial() if not serial_init_res: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) self.mbed.reset() @@ -314,12 +325,12 @@ class Simple(DefaultTest): while True: c = self.mbed.serial_read(512) if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) break stdout.write(c) stdout.flush() except KeyboardInterrupt, _: - print "\n[CTRL+c] exit" + self.notify("\r\n[CTRL+C] exit") if __name__ == '__main__': diff --git a/workspace_tools/host_tests/rtc_auto.py b/workspace_tools/host_tests/rtc_auto.py index a96eaa8d39..4c31c65e07 100644 --- a/workspace_tools/host_tests/rtc_auto.py +++ b/workspace_tools/host_tests/rtc_auto.py @@ -32,9 +32,9 @@ class RTCTest(DefaultTest): # Timeout changed from default: we need to wait longer for some boards to start-up c = self.mbed.serial_readline(timeout=10) if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return - print c.strip() + self.notify(c.strip()) delta = time() - start m = self.re_detect_rtc_value.search(c) if m and len(m.groups()): @@ -43,17 +43,14 @@ class RTCTest(DefaultTest): correct_time_str = strftime("%Y-%m-%d %H:%M:%S %p", gmtime(float(sec))) test_result = test_result and (time_str == correct_time_str) result_msg = "OK" if (time_str == correct_time_str and sec > 0 and sec > sec_prev) else "FAIL" - print "HOST: [%s] [%s] received time %+d sec after %.2f sec... %s"% (sec, time_str, sec - sec_prev, delta, result_msg) + self.notify("HOST: [%s] [%s] received time %+d sec after %.2f sec... %s"% (sec, time_str, sec - sec_prev, delta, result_msg)) sec_prev = sec else: test_result = False break start = time() - stdout.flush() - if test_result: # All numbers are the same - self.print_result('success') - else: - self.print_result('failure') + self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE) + if __name__ == '__main__': RTCTest().run() diff --git a/workspace_tools/host_tests/stdio_auto.py b/workspace_tools/host_tests/stdio_auto.py index c91d056909..3b30ea1a68 100644 --- a/workspace_tools/host_tests/stdio_auto.py +++ b/workspace_tools/host_tests/stdio_auto.py @@ -30,25 +30,22 @@ class StdioTest(DefaultTest): c = self.mbed.serial_readline() # {{start}} preamble if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return - print c - stdout.flush() + self.notify(c) for i in range(0, 10): random_integer = random.randint(-99999, 99999) - print "HOST: Generated number: " + str(random_integer) - stdout.flush() + self.notify("HOST: Generated number: " + str(random_integer)) start = time() self.mbed.serial_write(str(random_integer) + "\n") serial_stdio_msg = self.mbed.serial_readline() if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return delay_time = time() - start - print serial_stdio_msg.strip() - stdout.flush() + self.notify(serial_stdio_msg.strip()) # Searching for reply with scanned values m = self.re_detect_int_value.search(serial_stdio_msg) @@ -56,17 +53,13 @@ class StdioTest(DefaultTest): int_value = m.groups()[0] int_value_cmp = random_integer == int(int_value) test_result = test_result and int_value_cmp - print "HOST: Number %s read after %.3f sec ... [%s]"% (int_value, delay_time, "OK" if int_value_cmp else "FAIL") - print - stdout.flush() + self.notify("HOST: Number %s read after %.3f sec ... [%s]"% (int_value, delay_time, "OK" if int_value_cmp else "FAIL")) else: test_result = False break - if test_result: # All numbers are the same - self.print_result('success') - else: - self.print_result('failure') + self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE) + if __name__ == '__main__': StdioTest().run() diff --git a/workspace_tools/host_tests/tcpecho_client_auto.py b/workspace_tools/host_tests/tcpecho_client_auto.py index ba4ce13839..9d937a4c01 100644 --- a/workspace_tools/host_tests/tcpecho_client_auto.py +++ b/workspace_tools/host_tests/tcpecho_client_auto.py @@ -35,30 +35,28 @@ class TCPEchoClientTest(Test): def send_server_ip_port(self, ip_address, port_no): """ Set up network host. Reset target and and send server IP via serial to Mbed """ - print "HOST: Resetting target..." + self.notify("HOST: Resetting target...") self.mbed.reset() c = self.mbed.serial_readline() # 'TCPCllient waiting for server IP and port...' if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return - print c.strip() - print "HOST: Sending server IP Address to target...", - stdout.flush() + self.notify(c.strip()) + self.notify("HOST: Sending server IP Address to target...") + connection_str = ip_address + ":" + str(port_no) + "\n" self.mbed.serial_write(connection_str) - print connection_str - stdout.flush() + self.notify(connection_str) # Two more strings about connection should be sent by MBED for i in range(0, 2): c = self.mbed.serial_readline() if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return - print c.strip() - stdout.flush() + self.notify(c.strip()) class TCPEchoClient_Handler(BaseRequestHandler): diff --git a/workspace_tools/host_tests/tcpecho_server_auto.py b/workspace_tools/host_tests/tcpecho_server_auto.py index f98f0264ff..22dacd75bb 100644 --- a/workspace_tools/host_tests/tcpecho_server_auto.py +++ b/workspace_tools/host_tests/tcpecho_server_auto.py @@ -36,17 +36,15 @@ class TCPEchoServerTest(DefaultTest): result = False c = self.mbed.serial_readline() if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return - print c - stdout.flush() + self.notify(c) m = self.re_detect_server_ip.search(c) if m and len(m.groups()): self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4]) self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method - print "HOST: TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT) - stdout.flush() + self.notify("HOST: TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT)) # We assume this test fails so can't send 'error' message to server try: @@ -54,8 +52,8 @@ class TCPEchoServerTest(DefaultTest): self.s.connect((self.ECHO_SERVER_ADDRESS, self.ECHO_PORT)) except Exception, e: self.s = None - print "HOST: Error: %s" % e - self.print_result('error') + self.notify("HOST: Error: %s"% e) + self.print_result(self.RESULT_ERROR) exit(-1) print 'HOST: Sending %d echo strings...'% self.ECHO_LOOPs, @@ -82,17 +80,14 @@ class TCPEchoServerTest(DefaultTest): print "HOST: TCP Server not found" result = False - if result: - self.print_result('success') - else: - self.print_result('failure') + self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE) # Receiving try: while True: c = self.mbed.serial_read(512) if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) break stdout.write(c) stdout.flush() diff --git a/workspace_tools/host_tests/udp_link_layer_auto.py b/workspace_tools/host_tests/udp_link_layer_auto.py index 156b3a143d..8c8282b060 100644 --- a/workspace_tools/host_tests/udp_link_layer_auto.py +++ b/workspace_tools/host_tests/udp_link_layer_auto.py @@ -84,7 +84,7 @@ class UDPEchoServerTest(DefaultTest): def run(self): serial_ip_msg = self.mbed.serial_readline() if serial_ip_msg is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return stdout.write(serial_ip_msg) stdout.flush() @@ -93,16 +93,15 @@ class UDPEchoServerTest(DefaultTest): if m and len(m.groups()): self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4]) self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method - print "HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT) - stdout.flush() + self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT)) # Open client socket to burst datagrams to UDP server in mbed try: self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) except Exception, e: self.s = None - print "HOST: Error: %s" % e - self.print_result('error') + self.notify("HOST: Error: %s"% e) + self.print_result(self.RESULT_ERROR) return # UDP replied receiver works in background to get echoed datagrams @@ -129,8 +128,7 @@ class UDPEchoServerTest(DefaultTest): for d in range(5): sleep(1.0) summary_datagram_success = (float(len(dict_udp_recv_datagrams)) / float(self.TEST_PACKET_COUNT)) * 100.0 - # print dict_udp_recv_datagrams - print "HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms" % (d, summary_datagram_success, len(dict_udp_recv_datagrams), self.TEST_PACKET_COUNT, self.TEST_STRESS_FACTOR) + self.notify("HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms"% (d, summary_datagram_success, len(dict_udp_recv_datagrams), self.TEST_PACKET_COUNT, self.TEST_STRESS_FACTOR)) result = result and (summary_datagram_success >= self.PACKET_SATURATION_RATIO) stdout.flush() @@ -142,10 +140,7 @@ class UDPEchoServerTest(DefaultTest): print stdout.flush() - if result: - self.print_result('success') - else: - self.print_result('failure') + self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE) # Receiving serial data from mbed print @@ -154,7 +149,7 @@ class UDPEchoServerTest(DefaultTest): while True: c = self.mbed.serial_read(512) if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) break stdout.write(c) stdout.flush() diff --git a/workspace_tools/host_tests/udpecho_client_auto.py b/workspace_tools/host_tests/udpecho_client_auto.py index c7bce37209..ea6dc86288 100644 --- a/workspace_tools/host_tests/udpecho_client_auto.py +++ b/workspace_tools/host_tests/udpecho_client_auto.py @@ -37,10 +37,9 @@ class UDPEchoClientTest(Test): c = self.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...' if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return - print c.strip() - stdout.flush() + self.notify(c.strip()) print "HOST: Sending server IP Address to target..." connection_str = ip_address + ":" + str(port_no) + "\n" @@ -48,10 +47,10 @@ class UDPEchoClientTest(Test): c = self.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...' if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return - print c.strip() - stdout.flush() + self.notify(c.strip()) + class UDPEchoClient_Handler(BaseRequestHandler): def handle(self): diff --git a/workspace_tools/host_tests/udpecho_server_auto.py b/workspace_tools/host_tests/udpecho_server_auto.py index 5697a42821..523c7d8beb 100644 --- a/workspace_tools/host_tests/udpecho_server_auto.py +++ b/workspace_tools/host_tests/udpecho_server_auto.py @@ -37,17 +37,15 @@ class UDPEchoServerTest(DefaultTest): result = True serial_ip_msg = self.mbed.serial_readline() if serial_ip_msg is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return - stdout.write(serial_ip_msg) - stdout.flush() + self.notify(serial_ip_msg) # Searching for IP address and port prompted by server m = self.re_detect_server_ip.search(serial_ip_msg) if m and len(m.groups()): self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4]) self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method - print "HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT) - stdout.flush() + self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT)) # We assume this test fails so can't send 'error' message to server try: @@ -55,7 +53,7 @@ class UDPEchoServerTest(DefaultTest): except Exception, e: self.s = None print "HOST: Error: %s" % e - self.print_result('error') + self.print_result(self.RESULT_ERROR) exit(-1) for i in range(0, 100): @@ -74,17 +72,14 @@ class UDPEchoServerTest(DefaultTest): if self.s is not None: self.s.close() - if result: - self.print_result('success') - else: - self.print_result('failure') + self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE) # Receiving try: while True: c = self.mbed.serial_read(512) if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) break stdout.write(c) stdout.flush() diff --git a/workspace_tools/host_tests/wait_us_auto.py b/workspace_tools/host_tests/wait_us_auto.py index 23971d99c0..a331fd4047 100644 --- a/workspace_tools/host_tests/wait_us_auto.py +++ b/workspace_tools/host_tests/wait_us_auto.py @@ -24,28 +24,28 @@ class WaitusTest(DefaultTest): test_result = True # First character to start test (to know after reset when test starts) if self.mbed.serial_timeout(None) is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return c = self.mbed.serial_read(1) if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return if c == '$': # target will printout TargetID e.g.: $$$$1040e649d5c09a09a3f6bc568adef61375c6 #Read additional 39 bytes of TargetID if self.mbed.serial_read(39) is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return c = self.mbed.serial_read(1) # Re-read first 'tick' if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return - print "Test started" + self.notify("Test started") start_serial_pool = time() start = time() for i in range(0, 10): c = self.mbed.serial_read(1) if c is None: - self.print_result("ioerr_serial") + self.print_result(self.RESULT_IO_SERIAL) return if i > 2: # we will ignore first few measurements delta = time() - start @@ -57,18 +57,15 @@ class WaitusTest(DefaultTest): deviation_ok = True if delta > 0 and deviation <= 0.10 else False # +/-10% test_result = test_result and deviation_ok msg = "OK" if deviation_ok else "FAIL" - print ". in %.2f sec (%.2f) [%s]" % (delta, deviation, msg) + self.notify(". in %.2f sec (%.2f) [%s]" % (delta, deviation, msg)) else: - print ". skipped" + self.notify(". skipped") stdout.flush() start = time() measurement_time = time() - start_serial_pool - print "Completed in %.2f sec" % (measurement_time) - - if test_result: # All numbers are the same - self.print_result('success') - else: - self.print_result('failure') + self.notify("Completed in %.2f sec" % (measurement_time)) + self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE) + if __name__ == '__main__': WaitusTest().run()