Merge pull request #588 from PrzemekWirkus/host_test_OO_refactoring_part2

Host test refactoring  - part2
pull/609/head^2
Bogdan Marinescu 2014-10-21 14:05:26 +01:00
commit a1b06cd939
11 changed files with 80 additions and 88 deletions

View File

@ -15,31 +15,27 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from host_test import DefaultTest
from sys import stdout
import re
from host_test import DefaultTest
class DetectPlatformTest(DefaultTest):
PATTERN_MICRO_NAME = "Target '(\w+)'"
re_detect_micro_name = re.compile(PATTERN_MICRO_NAME)
def run(self):
def test(self):
result = True
c = self.mbed.serial_readline() # {{start}} preamble
print c
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
self.notify(c.strip())
self.notify("HOST: Detecting target name...")
c = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
self.notify(c.strip())
# Check for target name
@ -53,11 +49,10 @@ class DetectPlatformTest(DefaultTest):
for i in range(0, 2):
c = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
self.notify(c.strip())
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':

View File

@ -16,7 +16,7 @@ limitations under the License.
"""
from host_test import DefaultTest
from sys import stdout
class DevNullTest(DefaultTest):
@ -25,18 +25,15 @@ class DevNullTest(DefaultTest):
"""
result = False
c = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return None
if text in c:
if c and text in c:
result = True
return result
def run(self):
def test(self):
result = True
# Test should print some text and later stop printing
res = self.check_readline("re-routing stdout to /null") # 'MBED: re-routing stdout to /null'
# 'MBED: re-routing stdout to /null'
res = self.check_readline("re-routing stdout to /null")
if not res:
# We haven't read preamble line
result = False
@ -46,16 +43,14 @@ class DevNullTest(DefaultTest):
for i in range(3):
c = self.mbed.serial_read(32)
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
else:
str += c
if len(str) > 0:
result = False
break
self.notify("Received %d bytes: %s"% (len(str), str))
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':

View File

@ -14,6 +14,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import sys
import uuid
from sys import stdout
@ -21,42 +22,49 @@ from host_test import TestResults, Test
class EchoTest(Test):
""" This host test will use mbed serial port with
baudrate 115200 to perform echo test on that port.
"""
def __init__(self):
# Constructors
TestResults.__init__(self)
Test.__init__(self)
serial_init_res = self.mbed.init_serial(115200)
# Test parameters
self.TEST_SERIAL_BAUDRATE = 115200
self.TEST_LOOP_COUNT = 50
# Initializations
serial_init_res = self.mbed.init_serial(self.TEST_SERIAL_BAUDRATE)
if not serial_init_res:
self.print_result(self.RESULT_IO_SERIAL)
self.mbed.reset()
self.TEST_LOOP_COUNT = 50
def test(self):
""" Test function, return True or False to get standard test notification on stdout
"""
c = self.mbed.serial_readline() # '{{start}}'
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
print c.strip()
stdout.flush()
return self.RESULT_IO_SERIAL
self.notify(c.strip())
self.mbed.flush()
self.notify("HOST: Starting the ECHO test")
result = True
for i in range(0, self.TEST_LOOP_COUNT):
TEST = str(uuid.uuid4()) + "\n"
self.mbed.serial_write(TEST)
TEST_STRING = str(uuid.uuid4()) + "\n"
self.mbed.serial_write(TEST_STRING)
c = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
if c.strip() != TEST.strip():
self.notify('HOST: "%s" != "%s"'% (c, TEST))
return self.RESULT_IO_SERIAL
if c.strip() != TEST_STRING.strip():
self.notify('HOST: "%s" != "%s"'% (c, TEST_STRING))
result = False
else:
sys.stdout.write('.')
stdout.flush()
return result
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':

View File

@ -16,24 +16,21 @@ limitations under the License.
"""
from host_test import DefaultTest
from sys import stdout
import re
class HelloTest(DefaultTest):
HELLO_WORLD = "Hello World"
def run(self):
def test(self):
c = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
self.notify(c.strip())
c = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
self.notify("Read %d bytes"% len(c))
return self.RESULT_IO_SERIAL
self.notify("Read %d bytes:"% len(c))
self.notify(c.strip())
result = True
@ -42,7 +39,7 @@ class HelloTest(DefaultTest):
result = False
else:
result = self.HELLO_WORLD in c
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
if __name__ == '__main__':

View File

@ -22,10 +22,12 @@ except ImportError, e:
print "Error: Can't import 'serial' module: %s"% e
exit(-1)
import os
from optparse import OptionParser
from time import sleep, time
from sys import stdout
from time import sleep, time
from optparse import OptionParser
# This is a little tricky. We need to add upper directory to path so
# we can find packages we want from the same level as other files do
@ -273,17 +275,20 @@ class Test(TestResults):
def run(self):
""" Test runner for host test. This function will start executing
test and forward test result via serial port to test suite
test() function and forward test result via serial port to test suite
"""
try:
result = self.test()
self.print_result(self.RESULT_SUCCESS if result else self.RESULT_FAILURE)
# We expect here output from test in one of possible statuses
# E.g. self.RESULT_SUCCESS, self.RESULT_FAILURE, self.RESULT_ERROR
result_status = self.test()
self.print_result(result_status)
except Exception, e:
print str(e)
self.notify(str(e))
self.print_result(self.RESULT_ERROR)
def setup(self):
""" Setup and check if configuration for test is correct. E.g. if serial port can be opened
""" Setup and check if configuration for test is correct.
E.g. if serial port can be opened
"""
result = True
if not self.mbed.serial:
@ -320,17 +325,19 @@ class Simple(DefaultTest):
output from MUT, no supervision over test running in MUT is executed.
Just waiting for result
"""
def run(self):
def test(self):
try:
while True:
c = self.mbed.serial_read(512)
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
break
return self.RESULT_IO_SERIAL
stdout.write(c)
stdout.flush()
except KeyboardInterrupt, _:
self.notify("\r\n[CTRL+C] exit")
# If this function ends we assume user break or exception
# occured and error should be issued to test suite
return self.RESULT_ERROR
if __name__ == '__main__':

View File

@ -18,13 +18,13 @@ limitations under the License.
import re
from host_test import DefaultTest
from time import time, strftime, gmtime
from sys import stdout
class RTCTest(DefaultTest):
PATTERN_RTC_VALUE = "\[(\d+)\] \[(\d+-\d+-\d+ \d+:\d+:\d+ [AaPpMm]{2})\]"
re_detect_rtc_value = re.compile(PATTERN_RTC_VALUE)
def run(self):
def test(self):
test_result = True
start = time()
sec_prev = 0
@ -32,8 +32,7 @@ class RTCTest(DefaultTest):
# Timeout changed from default: we need to wait longer for some boards to start-up
c = self.mbed.serial_readline(timeout=10)
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
self.notify(c.strip())
delta = time() - start
m = self.re_detect_rtc_value.search(c)
@ -49,7 +48,7 @@ class RTCTest(DefaultTest):
test_result = False
break
start = time()
self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE)
return self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE
if __name__ == '__main__':

View File

@ -17,21 +17,20 @@ limitations under the License.
import re
import random
from sys import stdout
from time import time
from host_test import DefaultTest
class StdioTest(DefaultTest):
PATTERN_INT_VALUE = "Your value was: (-?\d+)"
re_detect_int_value = re.compile(PATTERN_INT_VALUE)
def run(self):
def test(self):
test_result = True
c = self.mbed.serial_readline() # {{start}} preamble
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
self.notify(c)
for i in range(0, 10):
@ -42,8 +41,7 @@ class StdioTest(DefaultTest):
serial_stdio_msg = self.mbed.serial_readline()
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
delay_time = time() - start
self.notify(serial_stdio_msg.strip())
@ -57,8 +55,7 @@ class StdioTest(DefaultTest):
else:
test_result = False
break
self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE)
return self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE
if __name__ == '__main__':

View File

@ -18,7 +18,6 @@ limitations under the License.
import sys
import socket
from sys import stdout
from time import sleep
from host_test import Test
from SocketServer import BaseRequestHandler, TCPServer

View File

@ -20,9 +20,9 @@ import sys
import uuid
import socket
from sys import stdout
from time import time
from host_test import DefaultTest
class TCPEchoServerTest(DefaultTest):
ECHO_SERVER_ADDRESS = ""
ECHO_PORT = 0

View File

@ -15,12 +15,10 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import re
import sys
import uuid
from sys import stdout
from time import time
from host_test import DefaultTest
from socket import socket, AF_INET, SOCK_DGRAM

View File

@ -15,38 +15,36 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from host_test import DefaultTest
from time import time
from sys import stdout
from host_test import DefaultTest
class WaitusTest(DefaultTest):
def run(self):
""" This test is reading single characters from stdio
and measures time between their occurrences.
"""
def test(self):
test_result = True
# First character to start test (to know after reset when test starts)
if self.mbed.serial_timeout(None) is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
c = self.mbed.serial_read(1)
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
if c == '$': # target will printout TargetID e.g.: $$$$1040e649d5c09a09a3f6bc568adef61375c6
#Read additional 39 bytes of TargetID
if self.mbed.serial_read(39) is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
c = self.mbed.serial_read(1) # Re-read first 'tick'
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
self.notify("Test started")
start_serial_pool = time()
start = time()
for i in range(0, 10):
c = self.mbed.serial_read(1)
if c is None:
self.print_result(self.RESULT_IO_SERIAL)
return
return self.RESULT_IO_SERIAL
if i > 2: # we will ignore first few measurements
delta = time() - start
deviation = abs(delta - 1)
@ -60,12 +58,11 @@ class WaitusTest(DefaultTest):
self.notify(". in %.2f sec (%.2f) [%s]" % (delta, deviation, msg))
else:
self.notify(". skipped")
stdout.flush()
start = time()
measurement_time = time() - start_serial_pool
self.notify("Completed in %.2f sec" % (measurement_time))
self.print_result(self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE)
return self.RESULT_SUCCESS if test_result else self.RESULT_FAILURE
if __name__ == '__main__':
WaitusTest().run()