Merge pull request #577 from PrzemekWirkus/host_test-OO-refactoring

Tools: Host test refactoring - prepare to clean up host test implementation
pull/583/head
Martin Kojtal 2014-10-20 08:50:13 +01:00
commit ea390931b7
13 changed files with 106 additions and 139 deletions

View File

@ -19,6 +19,7 @@ from host_test import DefaultTest
from sys import stdout from sys import stdout
import re import re
class DetectPlatformTest(DefaultTest): class DetectPlatformTest(DefaultTest):
PATTERN_MICRO_NAME = "Target '(\w+)'" PATTERN_MICRO_NAME = "Target '(\w+)'"
re_detect_micro_name = re.compile(PATTERN_MICRO_NAME) re_detect_micro_name = re.compile(PATTERN_MICRO_NAME)
@ -29,20 +30,17 @@ class DetectPlatformTest(DefaultTest):
c = self.mbed.serial_readline() # {{start}} preamble c = self.mbed.serial_readline() # {{start}} preamble
print c print c
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
print c.strip()
stdout.flush()
print "HOST: Detecting target name..." self.notify(c.strip())
stdout.flush() self.notify("HOST: Detecting target name...")
c = self.mbed.serial_readline() c = self.mbed.serial_readline()
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
print c.strip() self.notify(c.strip())
stdout.flush()
# Check for target name # Check for target name
m = self.re_detect_micro_name.search(c) m = self.re_detect_micro_name.search(c)
@ -50,22 +48,17 @@ class DetectPlatformTest(DefaultTest):
micro_name = m.groups()[0] micro_name = m.groups()[0]
micro_cmp = self.mbed.options.micro == micro_name micro_cmp = self.mbed.options.micro == micro_name
result = result and micro_cmp result = result and micro_cmp
print "HOST: MUT Target name '%s', expected '%s'... [%s]"% (micro_name, self.mbed.options.micro, "OK" if micro_cmp else "FAIL") self.notify("HOST: MUT Target name '%s', expected '%s'... [%s]"% (micro_name, self.mbed.options.micro, "OK" if micro_cmp else "FAIL"))
stdout.flush()
for i in range(0, 2): for i in range(0, 2):
c = self.mbed.serial_readline() c = self.mbed.serial_readline()
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
print c.strip() self.notify(c.strip())
stdout.flush()
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
if result: # Hello World received
self.print_result('success')
else:
self.print_result('failure')
stdout.flush()
if __name__ == '__main__': if __name__ == '__main__':
DetectPlatformTest().run() DetectPlatformTest().run()

View File

@ -26,7 +26,7 @@ class DevNullTest(DefaultTest):
result = False result = False
c = self.mbed.serial_readline() c = self.mbed.serial_readline()
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return None return None
if text in c: if text in c:
result = True result = True
@ -46,19 +46,17 @@ class DevNullTest(DefaultTest):
for i in range(3): for i in range(3):
c = self.mbed.serial_read(32) c = self.mbed.serial_read(32)
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
else: else:
str += c str += c
if len(str) > 0: if len(str) > 0:
result = False result = False
break break
print "Received %d bytes: %s"% (len(str), str) self.notify("Received %d bytes: %s"% (len(str), str))
stdout.flush()
if result: self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
self.print_result('success')
else:
self.print_result('failure')
if __name__ == '__main__': if __name__ == '__main__':
DevNullTest().run() DevNullTest().run()

View File

@ -17,15 +17,16 @@ limitations under the License.
import sys import sys
import uuid import uuid
from sys import stdout from sys import stdout
from host_test import Test from host_test import TestResults, Test
class EchoTest(Test): class EchoTest(Test):
def __init__(self): def __init__(self):
TestResults.__init__(self)
Test.__init__(self) Test.__init__(self)
serial_init_res = self.mbed.init_serial(115200) serial_init_res = self.mbed.init_serial(115200)
if not serial_init_res: if not serial_init_res:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
self.mbed.reset() self.mbed.reset()
self.TEST_LOOP_COUNT = 50 self.TEST_LOOP_COUNT = 50
@ -34,7 +35,7 @@ class EchoTest(Test):
""" """
c = self.mbed.serial_readline() # '{{start}}' c = self.mbed.serial_readline() # '{{start}}'
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
print c.strip() print c.strip()
stdout.flush() stdout.flush()
@ -47,7 +48,7 @@ class EchoTest(Test):
self.mbed.serial_write(TEST) self.mbed.serial_write(TEST)
c = self.mbed.serial_readline() c = self.mbed.serial_readline()
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
if c.strip() != TEST.strip(): if c.strip() != TEST.strip():
self.notify('HOST: "%s" != "%s"'% (c, TEST)) self.notify('HOST: "%s" != "%s"'% (c, TEST))

View File

@ -25,30 +25,25 @@ class HelloTest(DefaultTest):
def run(self): def run(self):
c = self.mbed.serial_readline() c = self.mbed.serial_readline()
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
print c.strip() self.notify(c.strip())
stdout.flush()
c = self.mbed.serial_readline() c = self.mbed.serial_readline()
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
print "Read %d bytes"% len(c) self.notify("Read %d bytes"% len(c))
print c.strip() self.notify(c.strip())
stdout.flush()
result = True result = True
# Because we can have targetID here let's try to decode # Because we can have targetID here let's try to decode
if len(c) < len(self.HELLO_WORLD): if len(c) < len(self.HELLO_WORLD):
result = False result = False
else: else:
result = self.HELLO_WORLD in c result = self.HELLO_WORLD in c
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
if result: # Hello World received
self.print_result('success')
else:
self.print_result('failure')
stdout.flush()
if __name__ == '__main__': if __name__ == '__main__':
HelloTest().run() HelloTest().run()

View File

@ -255,8 +255,18 @@ class Mbed:
self.extra_serial.flushOutput() self.extra_serial.flushOutput()
class Test: class TestResults:
""" Baseclass for host test's test runner """ Test results set by host tests
"""
def __init__(self):
self.RESULT_SUCCESS = 'success'
self.RESULT_FAILURE = 'failure'
self.RESULT_ERROR = 'error'
self.RESULT_IO_SERIAL = 'ioerr_serial'
class Test(TestResults):
""" Base class for host test's test runner
""" """
def __init__(self): def __init__(self):
self.mbed = Mbed() self.mbed = Mbed()
@ -267,10 +277,10 @@ class Test:
""" """
try: try:
result = self.test() result = self.test()
self.print_result("success" if result else "failure") self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
except Exception, e: except Exception, e:
print str(e) print str(e)
self.print_result("error") self.print_result(self.RESULT_ERROR)
def setup(self): def setup(self):
""" Setup and check if configuration for test is correct. E.g. if serial port can be opened """ Setup and check if configuration for test is correct. E.g. if serial port can be opened
@ -278,7 +288,7 @@ class Test:
result = True result = True
if not self.mbed.serial: if not self.mbed.serial:
result = False result = False
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return result return result
def notify(self, message): def notify(self, message):
@ -290,17 +300,18 @@ class Test:
def print_result(self, result): def print_result(self, result):
""" Test result unified printing function """ Test result unified printing function
""" """
self.notify("\n{%s}\n{end}" % result) self.notify("\n{{%s}}\n{{end}}" % result)
class DefaultTest(Test): class DefaultTest(Test):
""" Test class with serial port initialization """ Test class with serial port initialization
""" """
def __init__(self): def __init__(self):
TestResults.__init__(self)
Test.__init__(self) Test.__init__(self)
serial_init_res = self.mbed.init_serial() serial_init_res = self.mbed.init_serial()
if not serial_init_res: if not serial_init_res:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
self.mbed.reset() self.mbed.reset()
@ -314,12 +325,12 @@ class Simple(DefaultTest):
while True: while True:
c = self.mbed.serial_read(512) c = self.mbed.serial_read(512)
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
break break
stdout.write(c) stdout.write(c)
stdout.flush() stdout.flush()
except KeyboardInterrupt, _: except KeyboardInterrupt, _:
print "\n[CTRL+c] exit" self.notify("\r\n[CTRL+C] exit")
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -32,9 +32,9 @@ class RTCTest(DefaultTest):
# Timeout changed from default: we need to wait longer for some boards to start-up # Timeout changed from default: we need to wait longer for some boards to start-up
c = self.mbed.serial_readline(timeout=10) c = self.mbed.serial_readline(timeout=10)
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
print c.strip() self.notify(c.strip())
delta = time() - start delta = time() - start
m = self.re_detect_rtc_value.search(c) m = self.re_detect_rtc_value.search(c)
if m and len(m.groups()): if m and len(m.groups()):
@ -43,17 +43,14 @@ class RTCTest(DefaultTest):
correct_time_str = strftime("%Y-%m-%d %H:%M:%S %p", gmtime(float(sec))) correct_time_str = strftime("%Y-%m-%d %H:%M:%S %p", gmtime(float(sec)))
test_result = test_result and (time_str == correct_time_str) test_result = test_result and (time_str == correct_time_str)
result_msg = "OK" if (time_str == correct_time_str and sec > 0 and sec > sec_prev) else "FAIL" result_msg = "OK" if (time_str == correct_time_str and sec > 0 and sec > sec_prev) else "FAIL"
print "HOST: [%s] [%s] received time %+d sec after %.2f sec... %s"% (sec, time_str, sec - sec_prev, delta, result_msg) self.notify("HOST: [%s] [%s] received time %+d sec after %.2f sec... %s"% (sec, time_str, sec - sec_prev, delta, result_msg))
sec_prev = sec sec_prev = sec
else: else:
test_result = False test_result = False
break break
start = time() start = time()
stdout.flush() self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE)
if test_result: # All numbers are the same
self.print_result('success')
else:
self.print_result('failure')
if __name__ == '__main__': if __name__ == '__main__':
RTCTest().run() RTCTest().run()

View File

@ -30,25 +30,22 @@ class StdioTest(DefaultTest):
c = self.mbed.serial_readline() # {{start}} preamble c = self.mbed.serial_readline() # {{start}} preamble
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
print c self.notify(c)
stdout.flush()
for i in range(0, 10): for i in range(0, 10):
random_integer = random.randint(-99999, 99999) random_integer = random.randint(-99999, 99999)
print "HOST: Generated number: " + str(random_integer) self.notify("HOST: Generated number: " + str(random_integer))
stdout.flush()
start = time() start = time()
self.mbed.serial_write(str(random_integer) + "\n") self.mbed.serial_write(str(random_integer) + "\n")
serial_stdio_msg = self.mbed.serial_readline() serial_stdio_msg = self.mbed.serial_readline()
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
delay_time = time() - start delay_time = time() - start
print serial_stdio_msg.strip() self.notify(serial_stdio_msg.strip())
stdout.flush()
# Searching for reply with scanned values # Searching for reply with scanned values
m = self.re_detect_int_value.search(serial_stdio_msg) m = self.re_detect_int_value.search(serial_stdio_msg)
@ -56,17 +53,13 @@ class StdioTest(DefaultTest):
int_value = m.groups()[0] int_value = m.groups()[0]
int_value_cmp = random_integer == int(int_value) int_value_cmp = random_integer == int(int_value)
test_result = test_result and int_value_cmp test_result = test_result and int_value_cmp
print "HOST: Number %s read after %.3f sec ... [%s]"% (int_value, delay_time, "OK" if int_value_cmp else "FAIL") self.notify("HOST: Number %s read after %.3f sec ... [%s]"% (int_value, delay_time, "OK" if int_value_cmp else "FAIL"))
print
stdout.flush()
else: else:
test_result = False test_result = False
break break
if test_result: # All numbers are the same self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE)
self.print_result('success')
else:
self.print_result('failure')
if __name__ == '__main__': if __name__ == '__main__':
StdioTest().run() StdioTest().run()

View File

@ -35,30 +35,28 @@ class TCPEchoClientTest(Test):
def send_server_ip_port(self, ip_address, port_no): def send_server_ip_port(self, ip_address, port_no):
""" Set up network host. Reset target and and send server IP via serial to Mbed """ Set up network host. Reset target and and send server IP via serial to Mbed
""" """
print "HOST: Resetting target..." self.notify("HOST: Resetting target...")
self.mbed.reset() self.mbed.reset()
c = self.mbed.serial_readline() # 'TCPCllient waiting for server IP and port...' c = self.mbed.serial_readline() # 'TCPCllient waiting for server IP and port...'
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
print c.strip() self.notify(c.strip())
print "HOST: Sending server IP Address to target...", self.notify("HOST: Sending server IP Address to target...")
stdout.flush()
connection_str = ip_address + ":" + str(port_no) + "\n" connection_str = ip_address + ":" + str(port_no) + "\n"
self.mbed.serial_write(connection_str) self.mbed.serial_write(connection_str)
print connection_str self.notify(connection_str)
stdout.flush()
# Two more strings about connection should be sent by MBED # Two more strings about connection should be sent by MBED
for i in range(0, 2): for i in range(0, 2):
c = self.mbed.serial_readline() c = self.mbed.serial_readline()
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
print c.strip() self.notify(c.strip())
stdout.flush()
class TCPEchoClient_Handler(BaseRequestHandler): class TCPEchoClient_Handler(BaseRequestHandler):

View File

@ -36,17 +36,15 @@ class TCPEchoServerTest(DefaultTest):
result = False result = False
c = self.mbed.serial_readline() c = self.mbed.serial_readline()
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
print c self.notify(c)
stdout.flush()
m = self.re_detect_server_ip.search(c) m = self.re_detect_server_ip.search(c)
if m and len(m.groups()): if m and len(m.groups()):
self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4]) self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
print "HOST: TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT) self.notify("HOST: TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
stdout.flush()
# We assume this test fails so can't send 'error' message to server # We assume this test fails so can't send 'error' message to server
try: try:
@ -54,8 +52,8 @@ class TCPEchoServerTest(DefaultTest):
self.s.connect((self.ECHO_SERVER_ADDRESS, self.ECHO_PORT)) self.s.connect((self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
except Exception, e: except Exception, e:
self.s = None self.s = None
print "HOST: Error: %s" % e self.notify("HOST: Error: %s"% e)
self.print_result('error') self.print_result(self.RESULT_ERROR)
exit(-1) exit(-1)
print 'HOST: Sending %d echo strings...'% self.ECHO_LOOPs, print 'HOST: Sending %d echo strings...'% self.ECHO_LOOPs,
@ -82,17 +80,14 @@ class TCPEchoServerTest(DefaultTest):
print "HOST: TCP Server not found" print "HOST: TCP Server not found"
result = False result = False
if result: self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
self.print_result('success')
else:
self.print_result('failure')
# Receiving # Receiving
try: try:
while True: while True:
c = self.mbed.serial_read(512) c = self.mbed.serial_read(512)
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
break break
stdout.write(c) stdout.write(c)
stdout.flush() stdout.flush()

View File

@ -84,7 +84,7 @@ class UDPEchoServerTest(DefaultTest):
def run(self): def run(self):
serial_ip_msg = self.mbed.serial_readline() serial_ip_msg = self.mbed.serial_readline()
if serial_ip_msg is None: if serial_ip_msg is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
stdout.write(serial_ip_msg) stdout.write(serial_ip_msg)
stdout.flush() stdout.flush()
@ -93,16 +93,15 @@ class UDPEchoServerTest(DefaultTest):
if m and len(m.groups()): if m and len(m.groups()):
self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4]) self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
print "HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT) self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
stdout.flush()
# Open client socket to burst datagrams to UDP server in mbed # Open client socket to burst datagrams to UDP server in mbed
try: try:
self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
except Exception, e: except Exception, e:
self.s = None self.s = None
print "HOST: Error: %s" % e self.notify("HOST: Error: %s"% e)
self.print_result('error') self.print_result(self.RESULT_ERROR)
return return
# UDP replied receiver works in background to get echoed datagrams # UDP replied receiver works in background to get echoed datagrams
@ -129,8 +128,7 @@ class UDPEchoServerTest(DefaultTest):
for d in range(5): for d in range(5):
sleep(1.0) sleep(1.0)
summary_datagram_success = (float(len(dict_udp_recv_datagrams)) / float(self.TEST_PACKET_COUNT)) * 100.0 summary_datagram_success = (float(len(dict_udp_recv_datagrams)) / float(self.TEST_PACKET_COUNT)) * 100.0
# print dict_udp_recv_datagrams self.notify("HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms"% (d, summary_datagram_success, len(dict_udp_recv_datagrams), self.TEST_PACKET_COUNT, self.TEST_STRESS_FACTOR))
print "HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms" % (d, summary_datagram_success, len(dict_udp_recv_datagrams), self.TEST_PACKET_COUNT, self.TEST_STRESS_FACTOR)
result = result and (summary_datagram_success >= self.PACKET_SATURATION_RATIO) result = result and (summary_datagram_success >= self.PACKET_SATURATION_RATIO)
stdout.flush() stdout.flush()
@ -142,10 +140,7 @@ class UDPEchoServerTest(DefaultTest):
print print
stdout.flush() stdout.flush()
if result: self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
self.print_result('success')
else:
self.print_result('failure')
# Receiving serial data from mbed # Receiving serial data from mbed
print print
@ -154,7 +149,7 @@ class UDPEchoServerTest(DefaultTest):
while True: while True:
c = self.mbed.serial_read(512) c = self.mbed.serial_read(512)
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
break break
stdout.write(c) stdout.write(c)
stdout.flush() stdout.flush()

View File

@ -37,10 +37,9 @@ class UDPEchoClientTest(Test):
c = self.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...' c = self.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
print c.strip() self.notify(c.strip())
stdout.flush()
print "HOST: Sending server IP Address to target..." print "HOST: Sending server IP Address to target..."
connection_str = ip_address + ":" + str(port_no) + "\n" connection_str = ip_address + ":" + str(port_no) + "\n"
@ -48,10 +47,10 @@ class UDPEchoClientTest(Test):
c = self.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...' c = self.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...'
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
print c.strip() self.notify(c.strip())
stdout.flush()
class UDPEchoClient_Handler(BaseRequestHandler): class UDPEchoClient_Handler(BaseRequestHandler):
def handle(self): def handle(self):

View File

@ -37,17 +37,15 @@ class UDPEchoServerTest(DefaultTest):
result = True result = True
serial_ip_msg = self.mbed.serial_readline() serial_ip_msg = self.mbed.serial_readline()
if serial_ip_msg is None: if serial_ip_msg is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
stdout.write(serial_ip_msg) self.notify(serial_ip_msg)
stdout.flush()
# Searching for IP address and port prompted by server # Searching for IP address and port prompted by server
m = self.re_detect_server_ip.search(serial_ip_msg) m = self.re_detect_server_ip.search(serial_ip_msg)
if m and len(m.groups()): if m and len(m.groups()):
self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4]) self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4])
self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method
print "HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT) self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))
stdout.flush()
# We assume this test fails so can't send 'error' message to server # We assume this test fails so can't send 'error' message to server
try: try:
@ -55,7 +53,7 @@ class UDPEchoServerTest(DefaultTest):
except Exception, e: except Exception, e:
self.s = None self.s = None
print "HOST: Error: %s" % e print "HOST: Error: %s" % e
self.print_result('error') self.print_result(self.RESULT_ERROR)
exit(-1) exit(-1)
for i in range(0, 100): for i in range(0, 100):
@ -74,17 +72,14 @@ class UDPEchoServerTest(DefaultTest):
if self.s is not None: if self.s is not None:
self.s.close() self.s.close()
if result: self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
self.print_result('success')
else:
self.print_result('failure')
# Receiving # Receiving
try: try:
while True: while True:
c = self.mbed.serial_read(512) c = self.mbed.serial_read(512)
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
break break
stdout.write(c) stdout.write(c)
stdout.flush() stdout.flush()

View File

@ -24,28 +24,28 @@ class WaitusTest(DefaultTest):
test_result = True test_result = True
# First character to start test (to know after reset when test starts) # First character to start test (to know after reset when test starts)
if self.mbed.serial_timeout(None) is None: if self.mbed.serial_timeout(None) is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
c = self.mbed.serial_read(1) c = self.mbed.serial_read(1)
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
if c == '$': # target will printout TargetID e.g.: $$$$1040e649d5c09a09a3f6bc568adef61375c6 if c == '$': # target will printout TargetID e.g.: $$$$1040e649d5c09a09a3f6bc568adef61375c6
#Read additional 39 bytes of TargetID #Read additional 39 bytes of TargetID
if self.mbed.serial_read(39) is None: if self.mbed.serial_read(39) is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
c = self.mbed.serial_read(1) # Re-read first 'tick' c = self.mbed.serial_read(1) # Re-read first 'tick'
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
print "Test started" self.notify("Test started")
start_serial_pool = time() start_serial_pool = time()
start = time() start = time()
for i in range(0, 10): for i in range(0, 10):
c = self.mbed.serial_read(1) c = self.mbed.serial_read(1)
if c is None: if c is None:
self.print_result("ioerr_serial") self.print_result(self.RESULT_IO_SERIAL)
return return
if i > 2: # we will ignore first few measurements if i > 2: # we will ignore first few measurements
delta = time() - start delta = time() - start
@ -57,18 +57,15 @@ class WaitusTest(DefaultTest):
deviation_ok = True if delta > 0 and deviation <= 0.10 else False # +/-10% deviation_ok = True if delta > 0 and deviation <= 0.10 else False # +/-10%
test_result = test_result and deviation_ok test_result = test_result and deviation_ok
msg = "OK" if deviation_ok else "FAIL" msg = "OK" if deviation_ok else "FAIL"
print ". in %.2f sec (%.2f) [%s]" % (delta, deviation, msg) self.notify(". in %.2f sec (%.2f) [%s]" % (delta, deviation, msg))
else: else:
print ". skipped" self.notify(". skipped")
stdout.flush() stdout.flush()
start = time() start = time()
measurement_time = time() - start_serial_pool measurement_time = time() - start_serial_pool
print "Completed in %.2f sec" % (measurement_time) self.notify("Completed in %.2f sec" % (measurement_time))
self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE)
if test_result: # All numbers are the same
self.print_result('success')
else:
self.print_result('failure')
if __name__ == '__main__': if __name__ == '__main__':
WaitusTest().run() WaitusTest().run()