Host test refactoring

Host test refactoring: replaced magic values of test results returned by host test

Prints with stdout.flush() replaced with notify function

Host test refactoring: created separate enum like class for test result strings
pull/577/head
Przemek Wirkus 2014-10-17 15:14:53 +01:00
parent 35e65dae00
commit 755565d854
13 changed files with 106 additions and 139 deletions

View File

@ -19,6 +19,7 @@ from host_test import DefaultTest
from sys import stdout
import re
class DetectPlatformTest(DefaultTest):
PATTERN_MICRO_NAME = "Target '(\w+)'"
re_detect_micro_name = re.compile(PATTERN_MICRO_NAME)
@ -29,20 +30,17 @@ class DetectPlatformTest(DefaultTest):
c = self.mbed.serial_readline() # {{start}} preamble
print c
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
print c.strip()
stdout.flush()
print "HOST: Detecting target name..."
stdout.flush()
self.notify(c.strip())
self.notify("HOST: Detecting target name...")
c = self.mbed.serial_readline()
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
print c.strip()
stdout.flush()
self.notify(c.strip())
# Check for target name
m = self.re_detect_micro_name.search(c)
@ -50,22 +48,17 @@ class DetectPlatformTest(DefaultTest):
micro_name = m.groups()[0]
micro_cmp = self.mbed.options.micro == micro_name
result = result and micro_cmp
print "HOST: MUT Target name '%s', expected '%s'... [%s]"% (micro_name, self.mbed.options.micro, "OK" if micro_cmp else "FAIL")
stdout.flush()
self.notify("HOST: MUT Target name '%s', expected '%s'... [%s]"% (micro_name, self.mbed.options.micro, "OK" if micro_cmp else "FAIL"))
for i in range(0, 2):
c = self.mbed.serial_readline()
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
print c.strip()
stdout.flush()
self.notify(c.strip())
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
if result: # Hello World received
self.print_result('success')
else:
self.print_result('failure')
stdout.flush()
if __name__ == '__main__':
DetectPlatformTest().run()

View File

@ -26,7 +26,7 @@ class DevNullTest(DefaultTest):
result = False
c = self.mbed.serial_readline()
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return None
if text in c:
result = True
@ -46,19 +46,17 @@ class DevNullTest(DefaultTest):
for i in range(3):
c = self.mbed.serial_read(32)
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
else:
str += c
if len(str) > 0:
result = False
break
print "Received %d bytes: %s"% (len(str), str)
stdout.flush()
if result:
self.print_result('success')
else:
self.print_result('failure')
self.notify("Received %d bytes: %s"% (len(str), str))
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
if __name__ == '__main__':
DevNullTest().run()

View File

@ -17,15 +17,16 @@ limitations under the License.
import sys
import uuid
from sys import stdout
from host_test import Test
from host_test import TestResults, Test
class EchoTest(Test):
def __init__(self):
TestResults.__init__(self)
Test.__init__(self)
serial_init_res = self.mbed.init_serial(115200)
if not serial_init_res:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
self.mbed.reset()
self.TEST_LOOP_COUNT = 50
@ -34,7 +35,7 @@ class EchoTest(Test):
"""
c = self.mbed.serial_readline() # '{{start}}'
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
print c.strip()
stdout.flush()
@ -47,7 +48,7 @@ class EchoTest(Test):
self.mbed.serial_write(TEST)
c = self.mbed.serial_readline()
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
if c.strip() != TEST.strip():
self.notify('HOST: "%s" != "%s"'% (c, TEST))

View File

@ -25,30 +25,25 @@ class HelloTest(DefaultTest):
def run(self):
c = self.mbed.serial_readline()
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
print c.strip()
stdout.flush()
self.notify(c.strip())
c = self.mbed.serial_readline()
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
print "Read %d bytes"% len(c)
print c.strip()
stdout.flush()
self.notify("Read %d bytes"% len(c))
self.notify(c.strip())
result = True
# Because we can have targetID here let's try to decode
if len(c) < len(self.HELLO_WORLD):
result = False
else:
result = self.HELLO_WORLD in c
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
if result: # Hello World received
self.print_result('success')
else:
self.print_result('failure')
stdout.flush()
if __name__ == '__main__':
HelloTest().run()

View File

@ -255,8 +255,18 @@ class Mbed:
self.extra_serial.flushOutput()
class Test:
""" Baseclass for host test's test runner
class TestResults:
""" Test results set by host tests
"""
def __init__(self):
self.RESULT_SUCCESS = 'success'
self.RESULT_FAILURE = 'failure'
self.RESULT_ERROR = 'error'
self.RESULT_IO_SERIAL = 'ioerr_serial'
class Test(TestResults):
""" Base class for host test's test runner
"""
def __init__(self):
self.mbed = Mbed()
@ -267,10 +277,10 @@ class Test:
"""
try:
result = self.test()
self.print_result("success" if result else "failure")
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
except Exception, e:
print str(e)
self.print_result("error")
self.print_result(self.RESULT_ERROR)
def setup(self):
""" Setup and check if configuration for test is correct. E.g. if serial port can be opened
@ -278,7 +288,7 @@ class Test:
result = True
if not self.mbed.serial:
result = False
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return result
def notify(self, message):
@ -290,17 +300,18 @@ class Test:
def print_result(self, result):
""" Test result unified printing function
"""
self.notify("\n{%s}\n{end}" % result)
self.notify("\n{{%s}}\n{{end}}" % result)
class DefaultTest(Test):
""" Test class with serial port initialization
"""
def __init__(self):
TestResults.__init__(self)
Test.__init__(self)
serial_init_res = self.mbed.init_serial()
if not serial_init_res:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
self.mbed.reset()
@ -314,12 +325,12 @@ class Simple(DefaultTest):
while True:
c = self.mbed.serial_read(512)
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
break
stdout.write(c)
stdout.flush()
except KeyboardInterrupt, _:
print "\n[CTRL+c] exit"
self.notify("\r\n[CTRL+C] exit")
if __name__ == '__main__':

View File

@ -32,9 +32,9 @@ class RTCTest(DefaultTest):
# Timeout changed from default: we need to wait longer for some boards to start-up
c = self.mbed.serial_readline(timeout=10)
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
print c.strip()
self.notify(c.strip())
delta = time() - start
m = self.re_detect_rtc_value.search(c)
if m and len(m.groups()):
@ -43,17 +43,14 @@ class RTCTest(DefaultTest):
correct_time_str = strftime("%Y-%m-%d %H:%M:%S %p", gmtime(float(sec)))
test_result = test_result and (time_str == correct_time_str)
result_msg = "OK" if (time_str == correct_time_str and sec > 0 and sec > sec_prev) else "FAIL"
print "HOST: [%s] [%s] received time %+d sec after %.2f sec... %s"% (sec, time_str, sec - sec_prev, delta, result_msg)
self.notify("HOST: [%s] [%s] received time %+d sec after %.2f sec... %s"% (sec, time_str, sec - sec_prev, delta, result_msg))
sec_prev = sec
else:
test_result = False
break
start = time()
stdout.flush()
if test_result: # All numbers are the same
self.print_result('success')
else:
self.print_result('failure')
self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE)
if __name__ == '__main__':
RTCTest().run()

View File

@ -30,25 +30,22 @@ class StdioTest(DefaultTest):
c = self.mbed.serial_readline() # {{start}} preamble
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
print c
stdout.flush()
self.notify(c)
for i in range(0, 10):
random_integer = random.randint(-99999, 99999)
print "HOST: Generated number: " + str(random_integer)
stdout.flush()
self.notify("HOST: Generated number: " + str(random_integer))
start = time()
self.mbed.serial_write(str(random_integer) + "\n")
serial_stdio_msg = self.mbed.serial_readline()
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
delay_time = time() - start
print serial_stdio_msg.strip()
stdout.flush()
self.notify(serial_stdio_msg.strip())
# Searching for reply with scanned values
m = self.re_detect_int_value.search(serial_stdio_msg)
@ -56,17 +53,13 @@ class StdioTest(DefaultTest):
int_value = m.groups()[0]
int_value_cmp = random_integer == int(int_value)
test_result = test_result and int_value_cmp
print "HOST: Number %s read after %.3f sec ... [%s]"% (int_value, delay_time, "OK" if int_value_cmp else "FAIL")
print
stdout.flush()
self.notify("HOST: Number %s read after %.3f sec ... [%s]"% (int_value, delay_time, "OK" if int_value_cmp else "FAIL"))
else:
test_result = False
break
if test_result: # All numbers are the same
self.print_result('success')
else:
self.print_result('failure')
self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE)
if __name__ == '__main__':
StdioTest().run()

View File

@ -35,30 +35,28 @@ class TCPEchoClientTest(Test):
def send_server_ip_port(self, ip_address, port_no):
""" Set up network host. Reset target and and send server IP via serial to Mbed
"""
print "HOST: Resetting target..."
self.notify("HOST: Resetting target...")
self.mbed.reset()
c = self.mbed.serial_readline() # 'TCPCllient waiting for server IP and port...'
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
print c.strip()
print "HOST: Sending server IP Address to target...",
stdout.flush()
self.notify(c.strip())
self.notify("HOST: Sending server IP Address to target...")
connection_str = ip_address + ":" + str(port_no) + "\n"
self.mbed.serial_write(connection_str)
print connection_str
stdout.flush()
self.notify(connection_str)
# Two more strings about connection should be sent by MBED
for i in range(0, 2):
c = self.mbed.serial_readline()
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
print c.strip()
stdout.flush()
self.notify(c.strip())
class TCPEchoClient_Handler(BaseRequestHandler):

View File

@ -36,17 +36,15 @@ class TCPEchoServerTest(DefaultTest):
result = False
c = self.mbed.serial_readline()
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
print c
stdout.flush()
self.notify(c)
m = self.re_detect_server_ip.search(c)
if m and len(m.groups()):
self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
print "HOST: TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT)
stdout.flush()
self.notify("HOST: TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
# We assume this test fails so can't send 'error' message to server
try:
@ -54,8 +52,8 @@ class TCPEchoServerTest(DefaultTest):
self.s.connect((self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
except Exception, e:
self.s = None
print "HOST: Error: %s" % e
self.print_result('error')
self.notify("HOST: Error: %s"% e)
self.print_result(self.RESULT_ERROR)
exit(-1)
print 'HOST: Sending %d echo strings...'% self.ECHO_LOOPs,
@ -82,17 +80,14 @@ class TCPEchoServerTest(DefaultTest):
print "HOST: TCP Server not found"
result = False
if result:
self.print_result('success')
else:
self.print_result('failure')
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
# Receiving
try:
while True:
c = self.mbed.serial_read(512)
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
break
stdout.write(c)
stdout.flush()

View File

@ -84,7 +84,7 @@ class UDPEchoServerTest(DefaultTest):
def run(self):
serial_ip_msg = self.mbed.serial_readline()
if serial_ip_msg is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
stdout.write(serial_ip_msg)
stdout.flush()
@ -93,16 +93,15 @@ class UDPEchoServerTest(DefaultTest):
if m and len(m.groups()):
self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
print "HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT)
stdout.flush()
self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
# Open client socket to burst datagrams to UDP server in mbed
try:
self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
except Exception, e:
self.s = None
print "HOST: Error: %s" % e
self.print_result('error')
self.notify("HOST: Error: %s"% e)
self.print_result(self.RESULT_ERROR)
return
# UDP replied receiver works in background to get echoed datagrams
@ -129,8 +128,7 @@ class UDPEchoServerTest(DefaultTest):
for d in range(5):
sleep(1.0)
summary_datagram_success = (float(len(dict_udp_recv_datagrams)) / float(self.TEST_PACKET_COUNT)) * 100.0
# print dict_udp_recv_datagrams
print "HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms" % (d, summary_datagram_success, len(dict_udp_recv_datagrams), self.TEST_PACKET_COUNT, self.TEST_STRESS_FACTOR)
self.notify("HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms"% (d, summary_datagram_success, len(dict_udp_recv_datagrams), self.TEST_PACKET_COUNT, self.TEST_STRESS_FACTOR))
result = result and (summary_datagram_success >= self.PACKET_SATURATION_RATIO)
stdout.flush()
@ -142,10 +140,7 @@ class UDPEchoServerTest(DefaultTest):
print
stdout.flush()
if result:
self.print_result('success')
else:
self.print_result('failure')
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
# Receiving serial data from mbed
print
@ -154,7 +149,7 @@ class UDPEchoServerTest(DefaultTest):
while True:
c = self.mbed.serial_read(512)
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
break
stdout.write(c)
stdout.flush()

View File

@ -37,10 +37,9 @@ class UDPEchoClientTest(Test):
c = self.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
print c.strip()
stdout.flush()
self.notify(c.strip())
print "HOST: Sending server IP Address to target..."
connection_str = ip_address + ":" + str(port_no) + "\n"
@ -48,10 +47,10 @@ class UDPEchoClientTest(Test):
c = self.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
print c.strip()
stdout.flush()
self.notify(c.strip())
class UDPEchoClient_Handler(BaseRequestHandler):
def handle(self):

View File

@ -37,17 +37,15 @@ class UDPEchoServerTest(DefaultTest):
result = True
serial_ip_msg = self.mbed.serial_readline()
if serial_ip_msg is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
stdout.write(serial_ip_msg)
stdout.flush()
self.notify(serial_ip_msg)
# Searching for IP address and port prompted by server
m = self.re_detect_server_ip.search(serial_ip_msg)
if m and len(m.groups()):
self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
print "HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT)
stdout.flush()
self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
# We assume this test fails so can't send 'error' message to server
try:
@ -55,7 +53,7 @@ class UDPEchoServerTest(DefaultTest):
except Exception, e:
self.s = None
print "HOST: Error: %s" % e
self.print_result('error')
self.print_result(self.RESULT_ERROR)
exit(-1)
for i in range(0, 100):
@ -74,17 +72,14 @@ class UDPEchoServerTest(DefaultTest):
if self.s is not None:
self.s.close()
if result:
self.print_result('success')
else:
self.print_result('failure')
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
# Receiving
try:
while True:
c = self.mbed.serial_read(512)
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
break
stdout.write(c)
stdout.flush()

View File

@ -24,28 +24,28 @@ class WaitusTest(DefaultTest):
test_result = True
# First character to start test (to know after reset when test starts)
if self.mbed.serial_timeout(None) is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
c = self.mbed.serial_read(1)
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
if c == '$': # target will printout TargetID e.g.: $$$$1040e649d5c09a09a3f6bc568adef61375c6
#Read additional 39 bytes of TargetID
if self.mbed.serial_read(39) is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
c = self.mbed.serial_read(1) # Re-read first 'tick'
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
print "Test started"
self.notify("Test started")
start_serial_pool = time()
start = time()
for i in range(0, 10):
c = self.mbed.serial_read(1)
if c is None:
self.print_result("ioerr_serial")
self.print_result(self.RESULT_IO_SERIAL)
return
if i > 2: # we will ignore first few measurements
delta = time() - start
@ -57,18 +57,15 @@ class WaitusTest(DefaultTest):
deviation_ok = True if delta > 0 and deviation <= 0.10 else False # +/-10%
test_result = test_result and deviation_ok
msg = "OK" if deviation_ok else "FAIL"
print ". in %.2f sec (%.2f) [%s]" % (delta, deviation, msg)
self.notify(". in %.2f sec (%.2f) [%s]" % (delta, deviation, msg))
else:
print ". skipped"
self.notify(". skipped")
stdout.flush()
start = time()
measurement_time = time() - start_serial_pool
print "Completed in %.2f sec" % (measurement_time)
if test_result: # All numbers are the same
self.print_result('success')
else:
self.print_result('failure')
self.notify("Completed in %.2f sec" % (measurement_time))
self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE)
if __name__ == '__main__':
WaitusTest().run()