diff --git a/workspace_tools/host_tests/host_test.py b/workspace_tools/host_tests/host_test.py index c00c8ba2fe..4dd16505a2 100644 --- a/workspace_tools/host_tests/host_test.py +++ b/workspace_tools/host_tests/host_test.py @@ -126,21 +126,43 @@ class Mbed: serial_baud = serial_baud if serial_baud is not None else self.serial_baud serial_timeout = serial_timeout if serial_timeout is not None else self.serial_timeout + # Clear serial port if self.serial: self.serial.close() self.serial = None - result = True - try: - self.serial = Serial(self.port, baudrate=serial_baud, timeout=serial_timeout) - except Exception as e: - print "MBED: %s"% str(e) - result = False + # We will pool for serial to be re-mounted if it was unmounted after device reset + result = self.pool_for_serial_init(serial_baud, serial_timeout) # Blocking + # Port can be opened if result: self.flush() return result + def pool_for_serial_init(self, serial_baud, serial_timeout, pooling_loops=40, init_delay=0.5, loop_delay=0.25): + """ Functions pools for serial port readiness + """ + result = True + last_error = None + # This loop is used to check for serial port availability due to + # some delays and remounting when devices are being flashed with new software. + for i in range(pooling_loops): + sleep(loop_delay if i else init_delay) + try: + self.serial = Serial(self.port, baudrate=serial_baud, timeout=serial_timeout) + except Exception as e: + result = False + last_error = "MBED: %s"% str(e) + stdout.write('.') + stdout.flush() + else: + print "...port ready!" + result = True + break + if not result and last_error: + print last_error + return result + def set_serial_timeout(self, timeout): """ Wraps self.mbed.serial object timeout property """ diff --git a/workspace_tools/host_tests/host_tests_plugins/host_test_plugins.py b/workspace_tools/host_tests/host_tests_plugins/host_test_plugins.py index 27b949e4dc..8bc1da35d3 100644 --- a/workspace_tools/host_tests/host_tests_plugins/host_test_plugins.py +++ b/workspace_tools/host_tests/host_tests_plugins/host_test_plugins.py @@ -15,6 +15,9 @@ See the License for the specific language governing permissions and limitations under the License. """ +from os import access, F_OK +from sys import stdout +from time import sleep from subprocess import call @@ -58,6 +61,34 @@ class HostTestPluginBase: print "Plugin error: %s::%s: %s"% (self.name, self.type, text) return False + def print_plugin_info(self, text, NL=True): + """ Function prints notification in console and exits always with True + """ + if NL: + print "Plugin info: %s::%s: %s"% (self.name, self.type, text) + else: + print "Plugin info: %s::%s: %s"% (self.name, self.type, text), + return True + + def print_plugin_char(self, char): + """ Function prints char on stdout + """ + stdout.write(char) + stdout.flush() + return True + + def check_mount_point_ready(self, destination_disk, init_delay=0.2, loop_delay=0.25): + """ Checks if destination_disk is ready and can be accessed by e.g. copy commands + @init_delay - Initial delay time before first access check + @loop_delay - pooling delay for access check + """ + if not access(destination_disk, F_OK): + self.print_plugin_info("Waiting for mount point '%s' to be ready..."% destination_disk, NL=False) + sleep(init_delay) + while not access(destination_disk, F_OK): + sleep(loop_delay) + self.print_plugin_char('.') + def check_parameters(self, capabilitity, *args, **kwargs): """ This function should be ran each time we call execute() to check if none of the required parameters is missing. diff --git a/workspace_tools/host_tests/host_tests_plugins/module_copy_mbed.py b/workspace_tools/host_tests/host_tests_plugins/module_copy_mbed.py index dc5d8fd9dd..18fe0c42ae 100644 --- a/workspace_tools/host_tests/host_tests_plugins/module_copy_mbed.py +++ b/workspace_tools/host_tests/host_tests_plugins/module_copy_mbed.py @@ -59,6 +59,8 @@ class HostTestPluginCopyMethod_Mbed(HostTestPluginBase): if capabilitity == 'default': image_path = kwargs['image_path'] destination_disk = kwargs['destination_disk'] + # Wait for mount point to be ready + self.check_mount_point_ready(destination_disk) # Blocking result = self.generic_mbed_copy(image_path, destination_disk) return result diff --git a/workspace_tools/host_tests/host_tests_plugins/module_copy_shell.py b/workspace_tools/host_tests/host_tests_plugins/module_copy_shell.py index 00f79c0d87..f7fb23b0a7 100644 --- a/workspace_tools/host_tests/host_tests_plugins/module_copy_shell.py +++ b/workspace_tools/host_tests/host_tests_plugins/module_copy_shell.py @@ -43,6 +43,8 @@ class HostTestPluginCopyMethod_Shell(HostTestPluginBase): if self.check_parameters(capabilitity, *args, **kwargs) is True: image_path = kwargs['image_path'] destination_disk = kwargs['destination_disk'] + # Wait for mount point to be ready + self.check_mount_point_ready(destination_disk) # Blocking # Prepare correct command line parameter values image_base_name = basename(image_path) destination_path = join(destination_disk, image_base_name)