mirror of https://github.com/ARMmbed/mbed-os.git
Added seral port read/write wrappers to handle serial port communication issues. Added IOERR_SERIAL handler for basic test. Other tests will be refactored in separate commits.
parent
f1db89ea0f
commit
82e4a672ca
|
@ -57,12 +57,33 @@ class Mbed:
|
||||||
print 'Mbed: "%s" "%s"' % (self.port, self.disk)
|
print 'Mbed: "%s" "%s"' % (self.port, self.disk)
|
||||||
|
|
||||||
def init_serial(self, baud=9600, extra_baud=9600):
|
def init_serial(self, baud=9600, extra_baud=9600):
|
||||||
self.serial = Serial(self.port, timeout = 1)
|
result = True
|
||||||
|
try:
|
||||||
|
self.serial = Serial(self.port, timeout=1)
|
||||||
|
except Exception as e:
|
||||||
|
result = False
|
||||||
|
# Port can be opened
|
||||||
|
if result:
|
||||||
self.serial.setBaudrate(baud)
|
self.serial.setBaudrate(baud)
|
||||||
if self.extra_port:
|
if self.extra_port:
|
||||||
self.extra_serial = Serial(self.extra_port, timeout = 1)
|
self.extra_serial = Serial(self.extra_port, timeout = 1)
|
||||||
self.extra_serial.setBaudrate(extra_baud)
|
self.extra_serial.setBaudrate(extra_baud)
|
||||||
self.flush()
|
self.flush()
|
||||||
|
return result
|
||||||
|
|
||||||
|
def serial_read(self, count=1):
|
||||||
|
""" Wraps self.mbed.serial object read method """
|
||||||
|
result = None
|
||||||
|
if self.serial:
|
||||||
|
result = self.serial.read(count)
|
||||||
|
return result
|
||||||
|
|
||||||
|
def serial_write(self, write_buffer):
|
||||||
|
""" Wraps self.mbed.serial object write method """
|
||||||
|
result = -1
|
||||||
|
if self.serial:
|
||||||
|
result = self.serial.write(write_buffer)
|
||||||
|
return result
|
||||||
|
|
||||||
def safe_sendBreak(self, serial):
|
def safe_sendBreak(self, serial):
|
||||||
""" Wraps serial.sendBreak() to avoid serial::serialposix.py exception on Linux
|
""" Wraps serial.sendBreak() to avoid serial::serialposix.py exception on Linux
|
||||||
|
@ -110,18 +131,28 @@ class Test:
|
||||||
print str(e)
|
print str(e)
|
||||||
self.print_result("error")
|
self.print_result("error")
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
""" Setup and check if configuration for test is correct. E.g. if serial port can be opened """
|
||||||
|
result = True
|
||||||
|
if not self.mbed.serial:
|
||||||
|
result = False
|
||||||
|
self.print_result("ioerr_serial")
|
||||||
|
return result
|
||||||
|
|
||||||
def notify(self, message):
|
def notify(self, message):
|
||||||
|
""" On screen notification function """
|
||||||
print message
|
print message
|
||||||
stdout.flush()
|
stdout.flush()
|
||||||
|
|
||||||
def print_result(self, result):
|
def print_result(self, result):
|
||||||
|
""" Test result unified printing function """
|
||||||
self.notify("\n{%s}\n{end}" % result)
|
self.notify("\n{%s}\n{end}" % result)
|
||||||
|
|
||||||
|
|
||||||
class DefaultTest(Test):
|
class DefaultTest(Test):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
Test.__init__(self)
|
Test.__init__(self)
|
||||||
self.mbed.init_serial()
|
serial_init_res = self.mbed.init_serial()
|
||||||
self.mbed.reset()
|
self.mbed.reset()
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
@ -140,7 +171,10 @@ class Simple(DefaultTest):
|
||||||
def run(self):
|
def run(self):
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
c = self.mbed.serial.read(512)
|
c = self.mbed.serial_read(512)
|
||||||
|
if c is None:
|
||||||
|
self.print_result("ioerr_serial")
|
||||||
|
break
|
||||||
stdout.write(c)
|
stdout.write(c)
|
||||||
stdout.flush()
|
stdout.flush()
|
||||||
except KeyboardInterrupt, _:
|
except KeyboardInterrupt, _:
|
||||||
|
|
Loading…
Reference in New Issue