Host test refactoring: cleaned a bit functionality of host tests. Forced test() function in DefaultTest to be method returning test result

pull/588/head
Przemek Wirkus 2014-10-21 13:12:32 +01:00
parent 9ace6d40a4
commit 3850ddc8c1
8 changed files with 80 additions and 81 deletions

View File

@ -15,31 +15,28 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from host_test import DefaultTest
from sys import stdout
import re
from sys import stdout
from host_test import DefaultTest
class DetectPlatformTest(DefaultTest):
PATTERN_MICRO_NAME = "Target '(\w+)'"
re_detect_micro_name = re.compile(PATTERN_MICRO_NAME)
def run(self):
def test(self):
result = True
c = self.mbed.serial_readline() # {{start}} preamble
print c
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
self.notify(c.strip())
self.notify("HOST: Detecting target name...")
c = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
self.notify(c.strip())
# Check for target name
@ -53,11 +50,10 @@ class DetectPlatformTest(DefaultTest):
for i in range(0, 2):
c = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
self.notify(c.strip())
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':

View File

@ -25,18 +25,15 @@ class DevNullTest(DefaultTest):
"""
result = False
c = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return None
if text in c:
if c and text in c:
result = True
return result
def run(self):
def test(self):
result = True
# Test should print some text and later stop printing
res = self.check_readline("re-routing stdout to /null") # 'MBED: re-routing stdout to /null'
# 'MBED: re-routing stdout to /null'
res = self.check_readline("re-routing stdout to /null")
if not res:
# We haven't read preamble line
result = False
@ -46,16 +43,14 @@ class DevNullTest(DefaultTest):
for i in range(3):
c = self.mbed.serial_read(32)
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
else:
str += c
if len(str) > 0:
result = False
break
self.notify("Received %d bytes: %s"% (len(str), str))
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':

View File

@ -21,42 +21,49 @@ from host_test import TestResults, Test
class EchoTest(Test):
""" This host test will use mbed serial port with
baudrate 115200 to perform echo test on that port.
"""
def __init__(self):
# Constructors
TestResults.__init__(self)
Test.__init__(self)
serial_init_res = self.mbed.init_serial(115200)
# Test parameters
self.TEST_SERIAL_BAUDRATE = 115200
self.TEST_LOOP_COUNT = 50
# Initializations
serial_init_res = self.mbed.init_serial(self.TEST_SERIAL_BAUDRATE)
if not serial_init_res:
self.print_result(self.RESULT_IO_SERIAL)
self.mbed.reset()
self.TEST_LOOP_COUNT = 50
def test(self):
""" Test function, return True or False to get standard test notification on stdout
"""
c = self.mbed.serial_readline() # '{{start}}'
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
print c.strip()
stdout.flush()
return self.RESULT_IO_SERIAL
self.notify(c.strip())
self.mbed.flush()
self.notify("HOST: Starting the ECHO test")
result = True
for i in range(0, self.TEST_LOOP_COUNT):
TEST = str(uuid.uuid4()) + "\n"
self.mbed.serial_write(TEST)
TEST_STRING = str(uuid.uuid4()) + "\n"
self.mbed.serial_write(TEST_STRING)
c = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
if c.strip() != TEST.strip():
self.notify('HOST: "%s" != "%s"'% (c, TEST))
return self.RESULT_IO_SERIAL
if c.strip() != TEST_STRING.strip():
self.notify('HOST: "%s" != "%s"'% (c, TEST_STRING))
result = False
else:
sys.stdout.write('.')
stdout.flush()
return result
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':

View File

@ -15,25 +15,24 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from host_test import DefaultTest
from sys import stdout
import re
from sys import stdout
from host_test import DefaultTest
class HelloTest(DefaultTest):
HELLO_WORLD = "Hello World"
def run(self):
def test(self):
c = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
self.notify(c.strip())
c = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
self.notify("Read %d bytes"% len(c))
return self.RESULT_IO_SERIAL
self.notify("Read %d bytes:"% len(c))
self.notify(c.strip())
result = True
@ -42,7 +41,7 @@ class HelloTest(DefaultTest):
result = False
else:
result = self.HELLO_WORLD in c
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':

View File

@ -22,10 +22,12 @@ except ImportError, e:
print "Error: Can't import 'serial' module: %s"% e
exit(-1)
import os
from optparse import OptionParser
from time import sleep, time
from sys import stdout
from time import sleep, time
from optparse import OptionParser
# This is a little tricky. We need to add upper directory to path so
# we can find packages we want from the same level as other files do
@ -273,17 +275,20 @@ class Test(TestResults):
def run(self):
""" Test runner for host test. This function will start executing
test and forward test result via serial port to test suite
test() function and forward test result via serial port to test suite
"""
try:
result = self.test()
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
# We expect here output from test in one of possible statuses
# E.g. self.RESULT_SUCCESS, self.RESULT_FAILURE, self.RESULT_ERROR
result_status = self.test()
self.print_result(result_status)
except Exception, e:
print str(e)
self.notify(str(e))
self.print_result(self.RESULT_ERROR)
def setup(self):
""" Setup and check if configuration for test is correct. E.g. if serial port can be opened
""" Setup and check if configuration for test is correct.
E.g. if serial port can be opened
"""
result = True
if not self.mbed.serial:
@ -320,17 +325,19 @@ class Simple(DefaultTest):
output from MUT, no supervision over test running in MUT is executed.
Just waiting for result
"""
def run(self):
def test(self):
try:
while True:
c = self.mbed.serial_read(512)
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
break
return self.RESULT_IO_SERIAL
stdout.write(c)
stdout.flush()
except KeyboardInterrupt, _:
self.notify("\r\n[CTRL+C] exit")
# If this function ends we assume user break or exception
# occured and error should be issued to test suite
return self.RESULT_ERROR
if __name__ == '__main__':

View File

@ -16,15 +16,16 @@ limitations under the License.
"""
import re
from sys import stdout
from host_test import DefaultTest
from time import time, strftime, gmtime
from sys import stdout
class RTCTest(DefaultTest):
PATTERN_RTC_VALUE = "\[(\d+)\] \[(\d+-\d+-\d+ \d+:\d+:\d+ [AaPpMm]{2})\]"
re_detect_rtc_value = re.compile(PATTERN_RTC_VALUE)
def run(self):
def test(self):
test_result = True
start = time()
sec_prev = 0
@ -32,8 +33,7 @@ class RTCTest(DefaultTest):
# Timeout changed from default: we need to wait longer for some boards to start-up
c = self.mbed.serial_readline(timeout=10)
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
self.notify(c.strip())
delta = time() - start
m = self.re_detect_rtc_value.search(c)
@ -49,7 +49,7 @@ class RTCTest(DefaultTest):
test_result = False
break
start = time()
self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE)
return self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE
if __name__ == '__main__':

View File

@ -25,13 +25,12 @@ class StdioTest(DefaultTest):
PATTERN_INT_VALUE = "Your value was: (-?\d+)"
re_detect_int_value = re.compile(PATTERN_INT_VALUE)
def run(self):
def test(self):
test_result = True
c = self.mbed.serial_readline() # {{start}} preamble
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
self.notify(c)
for i in range(0, 10):
@ -42,8 +41,7 @@ class StdioTest(DefaultTest):
serial_stdio_msg = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
delay_time = time() - start
self.notify(serial_stdio_msg.strip())
@ -57,8 +55,7 @@ class StdioTest(DefaultTest):
else:
test_result = False
break
self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE)
return self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE
if __name__ == '__main__':

View File

@ -15,38 +15,37 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from host_test import DefaultTest
from time import time
from sys import stdout
from host_test import DefaultTest
class WaitusTest(DefaultTest):
def run(self):
""" This test is reading single characters from stdio
and measures time between their occurrences.
"""
def test(self):
test_result = True
# First character to start test (to know after reset when test starts)
if self.mbed.serial_timeout(None) is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
c = self.mbed.serial_read(1)
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
if c == '$': # target will printout TargetID e.g.: $$$$1040e649d5c09a09a3f6bc568adef61375c6
#Read additional 39 bytes of TargetID
if self.mbed.serial_read(39) is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
c = self.mbed.serial_read(1) # Re-read first 'tick'
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
self.notify("Test started")
start_serial_pool = time()
start = time()
for i in range(0, 10):
c = self.mbed.serial_read(1)
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
if i > 2: # we will ignore first few measurements
delta = time() - start
deviation = abs(delta - 1)
@ -60,12 +59,11 @@ class WaitusTest(DefaultTest):
self.notify(". in %.2f sec (%.2f) [%s]" % (delta, deviation, msg))
else:
self.notify(". skipped")
stdout.flush()
start = time()
measurement_time = time() - start_serial_pool
self.notify("Completed in %.2f sec" % (measurement_time))
self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE)
return self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE
if __name__ == '__main__':
WaitusTest().run()