diff --git a/workspace_tools/host_tests/detect_auto.py b/workspace_tools/host_tests/detect_auto.py index 18a9d29fc7..03ff17fa9d 100644 --- a/workspace_tools/host_tests/detect_auto.py +++ b/workspace_tools/host_tests/detect_auto.py @@ -15,31 +15,27 @@ See the License for the specific language governing permissions and limitations under the License. """ -from host_test import DefaultTest -from sys import stdout import re +from host_test import DefaultTest class DetectPlatformTest(DefaultTest): PATTERN_MICRO_NAME = "Target '(\w+)'" re_detect_micro_name = re.compile(PATTERN_MICRO_NAME) - def run(self): + def test(self): result = True c = self.mbed.serial_readline() # {{start}} preamble - print c if c is None: - self.print_result(self.RESULT_IO_SERIAL) - return + return self.RESULT_IO_SERIAL self.notify(c.strip()) self.notify("HOST: Detecting target name...") c = self.mbed.serial_readline() if c is None: - self.print_result(self.RESULT_IO_SERIAL) - return + return self.RESULT_IO_SERIAL self.notify(c.strip()) # Check for target name @@ -53,11 +49,10 @@ class DetectPlatformTest(DefaultTest): for i in range(0, 2): c = self.mbed.serial_readline() if c is None: - self.print_result(self.RESULT_IO_SERIAL) - return + return self.RESULT_IO_SERIAL self.notify(c.strip()) - self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE) + return self.RESULT_SUCCESS if result else self.RESULT_FAILURE if __name__ == '__main__': diff --git a/workspace_tools/host_tests/dev_null_auto.py b/workspace_tools/host_tests/dev_null_auto.py index 3e6e0af6cb..6962b4f46e 100644 --- a/workspace_tools/host_tests/dev_null_auto.py +++ b/workspace_tools/host_tests/dev_null_auto.py @@ -16,7 +16,7 @@ limitations under the License. """ from host_test import DefaultTest -from sys import stdout + class DevNullTest(DefaultTest): @@ -25,18 +25,15 @@ class DevNullTest(DefaultTest): """ result = False c = self.mbed.serial_readline() - if c is None: - self.print_result(self.RESULT_IO_SERIAL) - return None - if text in c: + if c and text in c: result = True return result - def run(self): + def test(self): result = True - # Test should print some text and later stop printing - res = self.check_readline("re-routing stdout to /null") # 'MBED: re-routing stdout to /null' + # 'MBED: re-routing stdout to /null' + res = self.check_readline("re-routing stdout to /null") if not res: # We haven't read preamble line result = False @@ -46,16 +43,14 @@ class DevNullTest(DefaultTest): for i in range(3): c = self.mbed.serial_read(32) if c is None: - self.print_result(self.RESULT_IO_SERIAL) - return + return self.RESULT_IO_SERIAL else: str += c if len(str) > 0: result = False break self.notify("Received %d bytes: %s"% (len(str), str)) - - self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE) + return self.RESULT_SUCCESS if result else self.RESULT_FAILURE if __name__ == '__main__': diff --git a/workspace_tools/host_tests/echo.py b/workspace_tools/host_tests/echo.py index 1cfe3fbfd4..0e6856b186 100644 --- a/workspace_tools/host_tests/echo.py +++ b/workspace_tools/host_tests/echo.py @@ -14,6 +14,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ + import sys import uuid from sys import stdout @@ -21,42 +22,49 @@ from host_test import TestResults, Test class EchoTest(Test): + """ This host test will use mbed serial port with + baudrate 115200 to perform echo test on that port. + """ + def __init__(self): + # Constructors TestResults.__init__(self) Test.__init__(self) - serial_init_res = self.mbed.init_serial(115200) + + # Test parameters + self.TEST_SERIAL_BAUDRATE = 115200 + self.TEST_LOOP_COUNT = 50 + + # Initializations + serial_init_res = self.mbed.init_serial(self.TEST_SERIAL_BAUDRATE) if not serial_init_res: self.print_result(self.RESULT_IO_SERIAL) self.mbed.reset() - self.TEST_LOOP_COUNT = 50 def test(self): """ Test function, return True or False to get standard test notification on stdout """ c = self.mbed.serial_readline() # '{{start}}' if c is None: - self.print_result(self.RESULT_IO_SERIAL) - return - print c.strip() - stdout.flush() + return self.RESULT_IO_SERIAL + self.notify(c.strip()) self.mbed.flush() self.notify("HOST: Starting the ECHO test") result = True for i in range(0, self.TEST_LOOP_COUNT): - TEST = str(uuid.uuid4()) + "\n" - self.mbed.serial_write(TEST) + TEST_STRING = str(uuid.uuid4()) + "\n" + self.mbed.serial_write(TEST_STRING) c = self.mbed.serial_readline() if c is None: - self.print_result(self.RESULT_IO_SERIAL) - return - if c.strip() != TEST.strip(): - self.notify('HOST: "%s" != "%s"'% (c, TEST)) + return self.RESULT_IO_SERIAL + if c.strip() != TEST_STRING.strip(): + self.notify('HOST: "%s" != "%s"'% (c, TEST_STRING)) result = False else: sys.stdout.write('.') stdout.flush() - return result + return self.RESULT_SUCCESS if result else self.RESULT_FAILURE if __name__ == '__main__': diff --git a/workspace_tools/host_tests/hello_auto.py b/workspace_tools/host_tests/hello_auto.py index 9c4f7d3a2f..27ea7a8530 100644 --- a/workspace_tools/host_tests/hello_auto.py +++ b/workspace_tools/host_tests/hello_auto.py @@ -16,24 +16,21 @@ limitations under the License. """ from host_test import DefaultTest -from sys import stdout -import re + class HelloTest(DefaultTest): HELLO_WORLD = "Hello World" - def run(self): + def test(self): c = self.mbed.serial_readline() if c is None: - self.print_result(self.RESULT_IO_SERIAL) - return + return self.RESULT_IO_SERIAL self.notify(c.strip()) c = self.mbed.serial_readline() if c is None: - self.print_result(self.RESULT_IO_SERIAL) - return - self.notify("Read %d bytes"% len(c)) + return self.RESULT_IO_SERIAL + self.notify("Read %d bytes:"% len(c)) self.notify(c.strip()) result = True @@ -42,7 +39,7 @@ class HelloTest(DefaultTest): result = False else: result = self.HELLO_WORLD in c - self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE) + return self.RESULT_SUCCESS if result else self.RESULT_FAILURE if __name__ == '__main__': diff --git a/workspace_tools/host_tests/host_test.py b/workspace_tools/host_tests/host_test.py index 02fc52102c..5e9ce50f0e 100644 --- a/workspace_tools/host_tests/host_test.py +++ b/workspace_tools/host_tests/host_test.py @@ -22,10 +22,12 @@ except ImportError, e: print "Error: Can't import 'serial' module: %s"% e exit(-1) + import os -from optparse import OptionParser -from time import sleep, time from sys import stdout +from time import sleep, time +from optparse import OptionParser + # This is a little tricky. We need to add upper directory to path so # we can find packages we want from the same level as other files do @@ -273,17 +275,20 @@ class Test(TestResults): def run(self): """ Test runner for host test. This function will start executing - test and forward test result via serial port to test suite + test() function and forward test result via serial port to test suite """ try: - result = self.test() - self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE) + # We expect here output from test in one of possible statuses + # E.g. self.RESULT_SUCCESS, self.RESULT_FAILURE, self.RESULT_ERROR + result_status = self.test() + self.print_result(result_status) except Exception, e: - print str(e) + self.notify(str(e)) self.print_result(self.RESULT_ERROR) def setup(self): - """ Setup and check if configuration for test is correct. E.g. if serial port can be opened + """ Setup and check if configuration for test is correct. + E.g. if serial port can be opened """ result = True if not self.mbed.serial: @@ -320,17 +325,19 @@ class Simple(DefaultTest): output from MUT, no supervision over test running in MUT is executed. Just waiting for result """ - def run(self): + def test(self): try: while True: c = self.mbed.serial_read(512) if c is None: - self.print_result(self.RESULT_IO_SERIAL) - break + return self.RESULT_IO_SERIAL stdout.write(c) stdout.flush() except KeyboardInterrupt, _: self.notify("\r\n[CTRL+C] exit") + # If this function ends we assume user break or exception + # occured and error should be issued to test suite + return self.RESULT_ERROR if __name__ == '__main__': diff --git a/workspace_tools/host_tests/rtc_auto.py b/workspace_tools/host_tests/rtc_auto.py index 4c31c65e07..0ccdce5665 100644 --- a/workspace_tools/host_tests/rtc_auto.py +++ b/workspace_tools/host_tests/rtc_auto.py @@ -18,13 +18,13 @@ limitations under the License. import re from host_test import DefaultTest from time import time, strftime, gmtime -from sys import stdout + class RTCTest(DefaultTest): PATTERN_RTC_VALUE = "\[(\d+)\] \[(\d+-\d+-\d+ \d+:\d+:\d+ [AaPpMm]{2})\]" re_detect_rtc_value = re.compile(PATTERN_RTC_VALUE) - def run(self): + def test(self): test_result = True start = time() sec_prev = 0 @@ -32,8 +32,7 @@ class RTCTest(DefaultTest): # Timeout changed from default: we need to wait longer for some boards to start-up c = self.mbed.serial_readline(timeout=10) if c is None: - self.print_result(self.RESULT_IO_SERIAL) - return + return self.RESULT_IO_SERIAL self.notify(c.strip()) delta = time() - start m = self.re_detect_rtc_value.search(c) @@ -49,7 +48,7 @@ class RTCTest(DefaultTest): test_result = False break start = time() - self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE) + return self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE if __name__ == '__main__': diff --git a/workspace_tools/host_tests/stdio_auto.py b/workspace_tools/host_tests/stdio_auto.py index 3b30ea1a68..c0c7a58076 100644 --- a/workspace_tools/host_tests/stdio_auto.py +++ b/workspace_tools/host_tests/stdio_auto.py @@ -17,21 +17,20 @@ limitations under the License. import re import random -from sys import stdout from time import time from host_test import DefaultTest + class StdioTest(DefaultTest): PATTERN_INT_VALUE = "Your value was: (-?\d+)" re_detect_int_value = re.compile(PATTERN_INT_VALUE) - def run(self): + def test(self): test_result = True c = self.mbed.serial_readline() # {{start}} preamble if c is None: - self.print_result(self.RESULT_IO_SERIAL) - return + return self.RESULT_IO_SERIAL self.notify(c) for i in range(0, 10): @@ -42,8 +41,7 @@ class StdioTest(DefaultTest): serial_stdio_msg = self.mbed.serial_readline() if c is None: - self.print_result(self.RESULT_IO_SERIAL) - return + return self.RESULT_IO_SERIAL delay_time = time() - start self.notify(serial_stdio_msg.strip()) @@ -57,8 +55,7 @@ class StdioTest(DefaultTest): else: test_result = False break - - self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE) + return self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE if __name__ == '__main__': diff --git a/workspace_tools/host_tests/tcpecho_client_auto.py b/workspace_tools/host_tests/tcpecho_client_auto.py index 9d937a4c01..1007a68f62 100644 --- a/workspace_tools/host_tests/tcpecho_client_auto.py +++ b/workspace_tools/host_tests/tcpecho_client_auto.py @@ -18,7 +18,6 @@ limitations under the License. import sys import socket from sys import stdout -from time import sleep from host_test import Test from SocketServer import BaseRequestHandler, TCPServer diff --git a/workspace_tools/host_tests/tcpecho_server_auto.py b/workspace_tools/host_tests/tcpecho_server_auto.py index 22dacd75bb..3532325d3f 100644 --- a/workspace_tools/host_tests/tcpecho_server_auto.py +++ b/workspace_tools/host_tests/tcpecho_server_auto.py @@ -20,9 +20,9 @@ import sys import uuid import socket from sys import stdout -from time import time from host_test import DefaultTest + class TCPEchoServerTest(DefaultTest): ECHO_SERVER_ADDRESS = "" ECHO_PORT = 0 diff --git a/workspace_tools/host_tests/udpecho_server_auto.py b/workspace_tools/host_tests/udpecho_server_auto.py index 523c7d8beb..a71fdf17d4 100644 --- a/workspace_tools/host_tests/udpecho_server_auto.py +++ b/workspace_tools/host_tests/udpecho_server_auto.py @@ -15,12 +15,10 @@ See the License for the specific language governing permissions and limitations under the License. """ - import re import sys import uuid from sys import stdout -from time import time from host_test import DefaultTest from socket import socket, AF_INET, SOCK_DGRAM diff --git a/workspace_tools/host_tests/wait_us_auto.py b/workspace_tools/host_tests/wait_us_auto.py index a331fd4047..0e5d553cef 100644 --- a/workspace_tools/host_tests/wait_us_auto.py +++ b/workspace_tools/host_tests/wait_us_auto.py @@ -15,38 +15,36 @@ See the License for the specific language governing permissions and limitations under the License. """ -from host_test import DefaultTest from time import time -from sys import stdout +from host_test import DefaultTest + class WaitusTest(DefaultTest): - def run(self): + """ This test is reading single characters from stdio + and measures time between their occurrences. + """ + def test(self): test_result = True # First character to start test (to know after reset when test starts) if self.mbed.serial_timeout(None) is None: - self.print_result(self.RESULT_IO_SERIAL) - return + return self.RESULT_IO_SERIAL c = self.mbed.serial_read(1) if c is None: - self.print_result(self.RESULT_IO_SERIAL) - return + return self.RESULT_IO_SERIAL if c == '$': # target will printout TargetID e.g.: $$$$1040e649d5c09a09a3f6bc568adef61375c6 #Read additional 39 bytes of TargetID if self.mbed.serial_read(39) is None: - self.print_result(self.RESULT_IO_SERIAL) - return + return self.RESULT_IO_SERIAL c = self.mbed.serial_read(1) # Re-read first 'tick' if c is None: - self.print_result(self.RESULT_IO_SERIAL) - return + return self.RESULT_IO_SERIAL self.notify("Test started") start_serial_pool = time() start = time() for i in range(0, 10): c = self.mbed.serial_read(1) if c is None: - self.print_result(self.RESULT_IO_SERIAL) - return + return self.RESULT_IO_SERIAL if i > 2: # we will ignore first few measurements delta = time() - start deviation = abs(delta - 1) @@ -60,12 +58,11 @@ class WaitusTest(DefaultTest): self.notify(". in %.2f sec (%.2f) [%s]" % (delta, deviation, msg)) else: self.notify(". skipped") - stdout.flush() start = time() measurement_time = time() - start_serial_pool self.notify("Completed in %.2f sec" % (measurement_time)) - self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE) + return self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE + - if __name__ == '__main__': WaitusTest().run()