mirror of https://github.com/ARMmbed/mbed-os.git
				
				
				
			
		
			
				
	
	
		
			348 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			348 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
"""
 | 
						|
mbed SDK
 | 
						|
Copyright (c) 2018 ARM Limited
 | 
						|
SPDX-License-Identifier: Apache-2.0
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
"""
 | 
						|
from __future__ import print_function
 | 
						|
import functools
 | 
						|
import itertools
 | 
						|
import time
 | 
						|
import threading
 | 
						|
import uuid
 | 
						|
import sys
 | 
						|
import serial
 | 
						|
import serial.tools.list_ports as stlp
 | 
						|
import mbed_host_tests
 | 
						|
 | 
						|
 | 
						|
MSG_KEY_DEVICE_READY = 'ready'
 | 
						|
MSG_KEY_SERIAL_NUMBER = 'usb_dev_sn'
 | 
						|
MSG_KEY_PORT_OPEN_WAIT = 'port_open_wait'
 | 
						|
MSG_KEY_PORT_OPEN_CLOSE = 'port_open_close'
 | 
						|
MSG_KEY_SEND_BYTES_SINGLE = 'send_single'
 | 
						|
MSG_KEY_SEND_BYTES_MULTIPLE = 'send_multiple'
 | 
						|
MSG_KEY_LOOPBACK = 'loopback'
 | 
						|
MSG_KEY_CHANGE_LINE_CODING = 'change_lc'
 | 
						|
 | 
						|
RX_BUFF_SIZE = 32
 | 
						|
 | 
						|
# This delay eliminates the possibility of the device detecting
 | 
						|
# the port being closed when still waiting for data.
 | 
						|
TERM_CLOSE_DELAY = 0.01
 | 
						|
 | 
						|
# A duration the serial terminal is open on the host side
 | 
						|
# during terminal reopen test.
 | 
						|
TERM_REOPEN_DELAY = 0.1
 | 
						|
 | 
						|
# 6 (baud) + 2 (bits) + 1 (parity) + 1 (stop) + 3 * comma
 | 
						|
LINE_CODING_STRLEN = 13
 | 
						|
 | 
						|
 | 
						|
def usb_serial_name(serial_number):
 | 
						|
    """Get USB serial device name based on the device serial number."""
 | 
						|
    if sys.platform.startswith('win'):
 | 
						|
        # The USB spec defines all USB string descriptors to be
 | 
						|
        # UNICODE UTF-16LE. Windows however, decodes the USB serial
 | 
						|
        # number string descriptor as uppercase characters only.
 | 
						|
        # To solve this issue, convert the pattern to uppercase.
 | 
						|
        serial_number = str(serial_number).upper()
 | 
						|
    for port_info in stlp.comports():
 | 
						|
        if port_info.serial_number == serial_number:
 | 
						|
            return port_info.device
 | 
						|
    return None
 | 
						|
 | 
						|
 | 
						|
class RetryError(Exception):
 | 
						|
    """Exception raised by retry_fun_call()."""
 | 
						|
 | 
						|
 | 
						|
def retry_fun_call(fun, num_retries=3, retry_delay=0.0):
 | 
						|
    """Call fun and retry if any exception was raised.
 | 
						|
 | 
						|
    fun is called at most num_retries with a retry_dalay in between calls.
 | 
						|
    Raises RetryError if the retry limit is exhausted.
 | 
						|
    """
 | 
						|
    verbose = False
 | 
						|
    final_err = None
 | 
						|
    for retry in range(1, num_retries + 1):
 | 
						|
        try:
 | 
						|
            return fun()  # pylint: disable=not-callable
 | 
						|
        except Exception as exc:  # pylint: disable=broad-except
 | 
						|
            final_err = exc
 | 
						|
            if verbose:
 | 
						|
                print('Retry {}/{} failed ({})'
 | 
						|
                      .format(retry, num_retries, str(fun)))
 | 
						|
            time.sleep(retry_delay)
 | 
						|
    err_msg = 'Failed with "{}". Tried {} times.'
 | 
						|
    raise RetryError(err_msg.format(final_err, num_retries))
 | 
						|
 | 
						|
 | 
						|
class USBSerialTest(mbed_host_tests.BaseHostTest):
 | 
						|
    """Host side test for USB CDC & Serial classes."""
 | 
						|
 | 
						|
    _BYTESIZES = {
 | 
						|
        5: serial.FIVEBITS,
 | 
						|
        6: serial.SIXBITS,
 | 
						|
        7: serial.SEVENBITS,
 | 
						|
        8: serial.EIGHTBITS}
 | 
						|
    _PARITIES = {
 | 
						|
        0: serial.PARITY_NONE,
 | 
						|
        1: serial.PARITY_ODD,
 | 
						|
        2: serial.PARITY_EVEN,
 | 
						|
        3: serial.PARITY_MARK,
 | 
						|
        4: serial.PARITY_SPACE}
 | 
						|
    _STOPBITS = {
 | 
						|
        0: serial.STOPBITS_ONE,
 | 
						|
        1: serial.STOPBITS_ONE_POINT_FIVE,
 | 
						|
        2: serial.STOPBITS_TWO}
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def get_usb_serial_name(usb_id_str):
 | 
						|
        """Get USB serial device name as registered in the system.
 | 
						|
 | 
						|
        Search is based on the unique USB SN generated by the host
 | 
						|
        during test suite setup.
 | 
						|
        Raises RuntimeError if the device is not found.
 | 
						|
        """
 | 
						|
        port_name = usb_serial_name(usb_id_str)
 | 
						|
        if port_name is None:
 | 
						|
            err_msg = 'USB serial device (SN={}) not found.'
 | 
						|
            raise RuntimeError(err_msg.format(usb_id_str))
 | 
						|
        return port_name
 | 
						|
 | 
						|
    def __init__(self):
 | 
						|
        super(USBSerialTest, self).__init__()
 | 
						|
        self.__bg_task = None
 | 
						|
        self.dut_usb_dev_sn = uuid.uuid4().hex  # 32 hex digit string
 | 
						|
 | 
						|
    def port_open_wait(self):
 | 
						|
        """Open the serial and wait until it's closed by the device."""
 | 
						|
        mbed_serial = serial.Serial(dsrdtr=False)
 | 
						|
        mbed_serial.dtr = False
 | 
						|
        try:
 | 
						|
            mbed_serial.port = retry_fun_call(
 | 
						|
                fun=functools.partial(self.get_usb_serial_name, self.dut_usb_dev_sn),  # pylint: disable=not-callable
 | 
						|
                num_retries=20,
 | 
						|
                retry_delay=0.05)
 | 
						|
            retry_fun_call(
 | 
						|
                fun=mbed_serial.open,
 | 
						|
                num_retries=20,
 | 
						|
                retry_delay=0.05)
 | 
						|
        except RetryError as exc:
 | 
						|
            self.log('TEST ERROR: {}'.format(exc))
 | 
						|
            self.notify_complete(False)
 | 
						|
            return
 | 
						|
        mbed_serial.dtr = True
 | 
						|
        try:
 | 
						|
            mbed_serial.read()  # wait until closed
 | 
						|
        except (serial.portNotOpenError, serial.SerialException):
 | 
						|
            pass
 | 
						|
 | 
						|
    def port_open_close(self):
 | 
						|
        """Open the serial and close it with a delay."""
 | 
						|
        mbed_serial = serial.Serial(timeout=0.5, write_timeout=0.1, dsrdtr=False)
 | 
						|
        mbed_serial.dtr = False
 | 
						|
        try:
 | 
						|
            mbed_serial.port = retry_fun_call(
 | 
						|
                fun=functools.partial(self.get_usb_serial_name, self.dut_usb_dev_sn),  # pylint: disable=not-callable
 | 
						|
                num_retries=20,
 | 
						|
                retry_delay=0.05)
 | 
						|
            retry_fun_call(
 | 
						|
                fun=mbed_serial.open,
 | 
						|
                num_retries=20,
 | 
						|
                retry_delay=0.05)
 | 
						|
        except RetryError as exc:
 | 
						|
            self.log('TEST ERROR: {}'.format(exc))
 | 
						|
            self.notify_complete(False)
 | 
						|
            return
 | 
						|
        mbed_serial.reset_output_buffer()
 | 
						|
        mbed_serial.dtr = True
 | 
						|
        time.sleep(TERM_REOPEN_DELAY)
 | 
						|
        mbed_serial.close()
 | 
						|
 | 
						|
    def send_data_sequence(self, chunk_size=1):
 | 
						|
        """Open the serial and send a sequence of values.
 | 
						|
 | 
						|
        chunk_size defines the size of data sent in each write operation.
 | 
						|
        The input buffer content is discarded.
 | 
						|
        """
 | 
						|
        mbed_serial = serial.Serial(write_timeout=0.1, dsrdtr=False)
 | 
						|
        try:
 | 
						|
            mbed_serial.port = retry_fun_call(
 | 
						|
                fun=functools.partial(self.get_usb_serial_name, self.dut_usb_dev_sn),  # pylint: disable=not-callable
 | 
						|
                num_retries=20,
 | 
						|
                retry_delay=0.05)
 | 
						|
            retry_fun_call(
 | 
						|
                fun=mbed_serial.open,
 | 
						|
                num_retries=20,
 | 
						|
                retry_delay=0.05)
 | 
						|
        except RetryError as exc:
 | 
						|
            self.log('TEST ERROR: {}'.format(exc))
 | 
						|
            self.notify_complete(False)
 | 
						|
            return
 | 
						|
        mbed_serial.reset_output_buffer()
 | 
						|
        mbed_serial.dtr = True
 | 
						|
        for byteval in itertools.chain(reversed(range(0x100)), range(0x100)):
 | 
						|
            try:
 | 
						|
                payload = bytearray(chunk_size * (byteval,))
 | 
						|
                mbed_serial.write(payload)
 | 
						|
#                 self.log('SENT: {!r}'.format(payload))
 | 
						|
                # Discard input buffer content. The data received from the
 | 
						|
                # device during the concurrent rx/tx test is irrelevant.
 | 
						|
                mbed_serial.reset_input_buffer()
 | 
						|
            except serial.SerialException as exc:
 | 
						|
                self.log('TEST ERROR: {}'.format(exc))
 | 
						|
                self.notify_complete(False)
 | 
						|
                return
 | 
						|
        while mbed_serial.out_waiting > 0:
 | 
						|
            time.sleep(0.001)
 | 
						|
        time.sleep(TERM_CLOSE_DELAY)
 | 
						|
        mbed_serial.close()
 | 
						|
 | 
						|
    def loopback(self):
 | 
						|
        """Open the serial and send back every byte received."""
 | 
						|
        mbed_serial = serial.Serial(timeout=0.5, write_timeout=0.1, dsrdtr=False)
 | 
						|
        mbed_serial.dtr = False
 | 
						|
        try:
 | 
						|
            mbed_serial.port = retry_fun_call(
 | 
						|
                fun=functools.partial(self.get_usb_serial_name, self.dut_usb_dev_sn),  # pylint: disable=not-callable
 | 
						|
                num_retries=20,
 | 
						|
                retry_delay=0.05)
 | 
						|
            retry_fun_call(
 | 
						|
                fun=mbed_serial.open,
 | 
						|
                num_retries=20,
 | 
						|
                retry_delay=0.05)
 | 
						|
        except RetryError as exc:
 | 
						|
            self.log('TEST ERROR: {}'.format(exc))
 | 
						|
            self.notify_complete(False)
 | 
						|
            return
 | 
						|
        mbed_serial.reset_output_buffer()
 | 
						|
        mbed_serial.dtr = True
 | 
						|
        try:
 | 
						|
            payload = mbed_serial.read(1)
 | 
						|
            while len(payload) == 1:
 | 
						|
                mbed_serial.write(payload)
 | 
						|
#                 self.log('SENT: {!r}'.format(payload))
 | 
						|
                payload = mbed_serial.read(1)
 | 
						|
        except serial.SerialException as exc:
 | 
						|
            self.log('TEST ERROR: {}'.format(exc))
 | 
						|
            self.notify_complete(False)
 | 
						|
            return
 | 
						|
        while mbed_serial.out_waiting > 0:
 | 
						|
            time.sleep(0.001)
 | 
						|
        time.sleep(TERM_CLOSE_DELAY)
 | 
						|
        mbed_serial.close()
 | 
						|
 | 
						|
    def change_line_coding(self):
 | 
						|
        """Open the serial and change serial params according to device request.
 | 
						|
 | 
						|
        New line coding params are read from the device serial data.
 | 
						|
        """
 | 
						|
        mbed_serial = serial.Serial(timeout=0.5, dsrdtr=False)
 | 
						|
        mbed_serial.dtr = False
 | 
						|
        try:
 | 
						|
            mbed_serial.port = retry_fun_call(
 | 
						|
                fun=functools.partial(self.get_usb_serial_name, self.dut_usb_dev_sn),  # pylint: disable=not-callable
 | 
						|
                num_retries=20,
 | 
						|
                retry_delay=0.05)
 | 
						|
            retry_fun_call(
 | 
						|
                fun=mbed_serial.open,
 | 
						|
                num_retries=20,
 | 
						|
                retry_delay=0.05)
 | 
						|
        except RetryError as exc:
 | 
						|
            self.log('TEST ERROR: {}'.format(exc))
 | 
						|
            self.notify_complete(False)
 | 
						|
            return
 | 
						|
        mbed_serial.reset_output_buffer()
 | 
						|
        mbed_serial.dtr = True
 | 
						|
        try:
 | 
						|
            payload = mbed_serial.read(LINE_CODING_STRLEN)
 | 
						|
            while len(payload) == LINE_CODING_STRLEN:
 | 
						|
                baud, bits, parity, stop = (int(i) for i in payload.split(','))
 | 
						|
                new_line_coding = {
 | 
						|
                    'baudrate': baud,
 | 
						|
                    'bytesize': self._BYTESIZES[bits],
 | 
						|
                    'parity': self._PARITIES[parity],
 | 
						|
                    'stopbits': self._STOPBITS[stop]}
 | 
						|
                mbed_serial.apply_settings(new_line_coding)
 | 
						|
                payload = mbed_serial.read(LINE_CODING_STRLEN)
 | 
						|
        except serial.SerialException as exc:
 | 
						|
            self.log('TEST ERROR: {}'.format(exc))
 | 
						|
            self.notify_complete(False)
 | 
						|
            return
 | 
						|
        time.sleep(TERM_CLOSE_DELAY)
 | 
						|
        mbed_serial.close()
 | 
						|
 | 
						|
    def setup(self):
 | 
						|
        self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
 | 
						|
        self.register_callback(MSG_KEY_PORT_OPEN_WAIT, self.cb_port_open_wait)
 | 
						|
        self.register_callback(MSG_KEY_PORT_OPEN_CLOSE, self.cb_port_open_close)
 | 
						|
        self.register_callback(MSG_KEY_SEND_BYTES_SINGLE, self.cb_send_bytes_single)
 | 
						|
        self.register_callback(MSG_KEY_SEND_BYTES_MULTIPLE, self.cb_send_bytes_multiple)
 | 
						|
        self.register_callback(MSG_KEY_LOOPBACK, self.cb_loopback)
 | 
						|
        self.register_callback(MSG_KEY_CHANGE_LINE_CODING, self.cb_change_line_coding)
 | 
						|
 | 
						|
    def cb_device_ready(self, key, value, timestamp):
 | 
						|
        """Send a unique USB SN to the device.
 | 
						|
 | 
						|
        DUT uses this SN every time it connects to host as a USB device.
 | 
						|
        """
 | 
						|
        self.send_kv(MSG_KEY_SERIAL_NUMBER, self.dut_usb_dev_sn)
 | 
						|
 | 
						|
    def start_bg_task(self, **thread_kwargs):
 | 
						|
        """Start a new daemon thread.
 | 
						|
 | 
						|
        The callbacks delegate serial handling to a background task to
 | 
						|
        prevent any delays in the device side assert handling. Only one
 | 
						|
        background task is kept running to prevent multiple access
 | 
						|
        to serial.
 | 
						|
        """
 | 
						|
        try:
 | 
						|
            self.__bg_task.join()
 | 
						|
        except (AttributeError, RuntimeError):
 | 
						|
            pass
 | 
						|
        self.__bg_task = threading.Thread(**thread_kwargs)
 | 
						|
        self.__bg_task.daemon = True
 | 
						|
        self.__bg_task.start()
 | 
						|
 | 
						|
    def cb_port_open_wait(self, key, value, timestamp):
 | 
						|
        """Open the serial and wait until it's closed by the device."""
 | 
						|
        self.start_bg_task(target=self.port_open_wait)
 | 
						|
 | 
						|
    def cb_port_open_close(self, key, value, timestamp):
 | 
						|
        """Open the serial and close it with a delay."""
 | 
						|
        self.start_bg_task(target=self.port_open_close)
 | 
						|
 | 
						|
    def cb_send_bytes_single(self, key, value, timestamp):
 | 
						|
        """Open the serial and send a sequence of values."""
 | 
						|
        self.start_bg_task(
 | 
						|
            target=self.send_data_sequence,
 | 
						|
            args=(1, ))
 | 
						|
 | 
						|
    def cb_send_bytes_multiple(self, key, value, timestamp):
 | 
						|
        """Open the serial and send a sequence of one byte values."""
 | 
						|
        chunk_size = RX_BUFF_SIZE * int(value)
 | 
						|
        self.start_bg_task(
 | 
						|
            target=self.send_data_sequence,
 | 
						|
            args=(chunk_size, ))
 | 
						|
 | 
						|
    def cb_loopback(self, key, value, timestamp):
 | 
						|
        """Open the serial and send a sequence of multibyte values."""
 | 
						|
        self.start_bg_task(target=self.loopback)
 | 
						|
 | 
						|
    def cb_change_line_coding(self, key, value, timestamp):
 | 
						|
        """Open the serial and change the line coding."""
 | 
						|
        self.start_bg_task(target=self.change_line_coding)
 |