Merge pull request #986 from PrzemekWirkus/plugin_pooling

Added timeouts (delays) on serial port and mounting point
pull/995/head
Martin Kojtal 2015-03-24 08:45:47 +00:00
commit 9f429e66eb
4 changed files with 63 additions and 6 deletions

View File

@ -126,21 +126,43 @@ class Mbed:
serial_baud = serial_baud if serial_baud is not None else self.serial_baud
serial_timeout = serial_timeout if serial_timeout is not None else self.serial_timeout
# Clear serial port
if self.serial:
self.serial.close()
self.serial = None
result = True
try:
self.serial = Serial(self.port, baudrate=serial_baud, timeout=serial_timeout)
except Exception as e:
print "MBED: %s"% str(e)
result = False
# We will pool for serial to be re-mounted if it was unmounted after device reset
result = self.pool_for_serial_init(serial_baud, serial_timeout) # Blocking
# Port can be opened
if result:
self.flush()
return result
def pool_for_serial_init(self, serial_baud, serial_timeout, pooling_loops=40, init_delay=0.5, loop_delay=0.25):
""" Functions pools for serial port readiness
"""
result = True
last_error = None
# This loop is used to check for serial port availability due to
# some delays and remounting when devices are being flashed with new software.
for i in range(pooling_loops):
sleep(loop_delay if i else init_delay)
try:
self.serial = Serial(self.port, baudrate=serial_baud, timeout=serial_timeout)
except Exception as e:
result = False
last_error = "MBED: %s"% str(e)
stdout.write('.')
stdout.flush()
else:
print "...port ready!"
result = True
break
if not result and last_error:
print last_error
return result
def set_serial_timeout(self, timeout):
""" Wraps self.mbed.serial object timeout property
"""

View File

@ -15,6 +15,9 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from os import access, F_OK
from sys import stdout
from time import sleep
from subprocess import call
@ -58,6 +61,34 @@ class HostTestPluginBase:
print "Plugin error: %s::%s: %s"% (self.name, self.type, text)
return False
def print_plugin_info(self, text, NL=True):
""" Function prints notification in console and exits always with True
"""
if NL:
print "Plugin info: %s::%s: %s"% (self.name, self.type, text)
else:
print "Plugin info: %s::%s: %s"% (self.name, self.type, text),
return True
def print_plugin_char(self, char):
""" Function prints char on stdout
"""
stdout.write(char)
stdout.flush()
return True
def check_mount_point_ready(self, destination_disk, init_delay=0.2, loop_delay=0.25):
""" Checks if destination_disk is ready and can be accessed by e.g. copy commands
@init_delay - Initial delay time before first access check
@loop_delay - pooling delay for access check
"""
if not access(destination_disk, F_OK):
self.print_plugin_info("Waiting for mount point '%s' to be ready..."% destination_disk, NL=False)
sleep(init_delay)
while not access(destination_disk, F_OK):
sleep(loop_delay)
self.print_plugin_char('.')
def check_parameters(self, capabilitity, *args, **kwargs):
""" This function should be ran each time we call execute()
to check if none of the required parameters is missing.

View File

@ -59,6 +59,8 @@ class HostTestPluginCopyMethod_Mbed(HostTestPluginBase):
if capabilitity == 'default':
image_path = kwargs['image_path']
destination_disk = kwargs['destination_disk']
# Wait for mount point to be ready
self.check_mount_point_ready(destination_disk) # Blocking
result = self.generic_mbed_copy(image_path, destination_disk)
return result

View File

@ -43,6 +43,8 @@ class HostTestPluginCopyMethod_Shell(HostTestPluginBase):
if self.check_parameters(capabilitity, *args, **kwargs) is True:
image_path = kwargs['image_path']
destination_disk = kwargs['destination_disk']
# Wait for mount point to be ready
self.check_mount_point_ready(destination_disk) # Blocking
# Prepare correct command line parameter values
image_base_name = basename(image_path)
destination_path = join(destination_disk, image_base_name)