USB generic tests

pull/9768/head
Maciej Bocianski 2018-03-13 16:18:20 +01:00 committed by Russ Butler
parent 7673ccd1c9
commit 33bef331e6
4 changed files with 1087 additions and 29 deletions

View File

@ -20,6 +20,7 @@ from mbed_host_tests import BaseHostTest
from argparse import ArgumentParser
import time
import sys
import inspect
from threading import Thread
import usb.core
@ -28,6 +29,11 @@ from usb.util import CTRL_OUT, CTRL_IN
from usb.util import CTRL_TYPE_STANDARD, CTRL_TYPE_CLASS, CTRL_TYPE_VENDOR
from usb.util import (CTRL_RECIPIENT_DEVICE, CTRL_RECIPIENT_INTERFACE,
CTRL_RECIPIENT_ENDPOINT, CTRL_RECIPIENT_OTHER)
from usb.util import (DESC_TYPE_DEVICE, DESC_TYPE_CONFIG, DESC_TYPE_STRING,
DESC_TYPE_INTERFACE, DESC_TYPE_ENDPOINT)
import struct
from collections import namedtuple
def get_interface(dev, interface, alternate=0):
intf = None
@ -45,22 +51,218 @@ VENDOR_TEST_CTRL_OUT_DELAY = 5
VENDOR_TEST_CTRL_NONE_DELAY = 6
VENDOR_TEST_CTRL_IN_STATUS_DELAY = 7
VENDOR_TEST_CTRL_OUT_STATUS_DELAY = 8
VENDOR_TEST_CTRL_IN_SIZES = 9
VENDOR_TEST_CTRL_OUT_SIZES = 10
VENDOR_TEST_UNSUPPORTED_REQUEST = 32
REQUEST_GET_STATUS = 0 # done
REQUEST_CLEAR_FEATURE = 1 # done
REQUEST_SET_FEATURE = 3 # done
REQUEST_SET_ADDRESS = 5 # ???
REQUEST_GET_DESCRIPTOR = 6
REQUEST_SET_DESCRIPTOR = 7 # done
REQUEST_GET_CONFIGURATION = 8 # done
REQUEST_SET_CONFIGURATION = 9 # done
REQUEST_GET_INTERFACE = 10 # done
REQUEST_SET_INTERFACE = 11 # done
REQUEST_SYNCH_FRAME = 12 # almost done
FEATURE_ENDPOINT_HALT = 0
FEATURE_DEVICE_REMOTE_WAKEUP = 1
DEVICE_QUALIFIER_DESC_SIZE = 10
DESC_TYPE_DEVICE_QUALIFIER = 0x06
DEVICE_DESC_SIZE = 18
device_descriptor_parser = struct.Struct('BBHBBBBHHHBBBB')
device_descriptor_keys = ['bLength', 'bDescriptorType', 'bcdUSB', 'bDeviceClass',
'bDeviceSubClass', 'bDeviceProtocol', 'bMaxPacketSize0',
'idVendor', 'idProduct', 'bcdDevice', 'iManufacturer',
'iProduct', 'iSerialNumber', 'bNumConfigurations']
CONFIGURATION_DESC_SIZE = 9
configuration_descriptor_parser = struct.Struct('BBHBBBBB')
configuration_descriptor_keys = ['bLength', 'bDescriptorType', 'wTotalLength',
'bNumInterfaces', 'bConfigurationValue',
'iConfiguration', 'bmAttributes', 'bMaxPower']
INTERFACE_DESC_SIZE = 9
interface_descriptor_parser = struct.Struct('BBBBBBBBB')
interface_descriptor_keys = ['bLength', 'bDescriptorType', 'bInterfaceNumber',
'bAlternateSetting', 'bNumEndpoints',
'bInterfaceClass', 'bInterfaceSubClass',
'bInterfaceProtocol', 'iInterface']
ENDPOINT_DESC_SIZE = 7
interface_descriptor_parser = struct.Struct('BBBBBHB')
interface_descriptor_keys = ['bLength', 'bDescriptorType', 'bEndpointAddress',
'bmAttributes', 'wMaxPacketSize', 'bInterval']
class PyusbBasicTest(BaseHostTest):
"""
"""
def _callback_usb_enumeration_done(self, key, value, timestamp):
print("Received key %s = %s" % (key, value))
self.log("Received key %s = %s" % (key, value))
test_device(value, self.log)
passed = True
results = "pass" if passed else "fail"
self.send_kv(results, "0")
def _callback_control_basic_test(self, key, value, timestamp):
serial_number, vendor_id, product_id = value.split(' ')
self.log("Received serial %s" % (serial_number))
self.log("Received vendor_id %s" % (vendor_id))
self.log("Received product_id %s" % (product_id))
dev = self.find_device(serial_number)
if(dev == None):
return
try:
control_basic_test(dev, vendor_id, product_id, log=print)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
def _callback_control_stall_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
control_stall_test(dev, log=print)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
def _callback_control_sizes_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
control_sizes_test(dev, log=print)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
def _callback_control_stress_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
control_stress_test(dev, log=print)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
def _callback_device_reset_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
self.device_reset_test.send(dev)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
def _callback_device_soft_reconnection_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
self.device_soft_reconnection_test.send(dev)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
def _callback_device_suspend_resume_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
self.device_suspend_resume_test.send(dev)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
def _callback_repeated_construction_destruction_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
self.repeated_construction_destruction_test.send(dev)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
def find_device(self, serial_number):
# to make it more reliable, 20 retries in 2[s]
for _ in range(20):
dev = usb.core.find(custom_match=TestMatch(serial_number))
if dev is not None:
break
time.sleep(0.1)
if dev is None:
self.log("Device not found")
self.send_kv("failed", "0")
return dev
def report_success(self):
self.send_kv("pass", "0")
def report_error(self, msg):
self.log('TEST FAILED: {}'.format(msg))
self.send_kv("failed", "0")
def setup(self):
self.__result = False
self.register_callback('usb_enumeration_done', self._callback_usb_enumeration_done)
self.device_reset_test = device_reset_test(log=print)
self.device_reset_test.send(None)
self.device_soft_reconnection_test = device_soft_reconnection_test(log=print)
self.device_soft_reconnection_test.send(None)
self.device_suspend_resume_test = device_suspend_resume_test(log=print)
self.device_suspend_resume_test.send(None)
self.repeated_construction_destruction_test = repeated_construction_destruction_test(log=print)
self.repeated_construction_destruction_test.send(None)
self.register_callback('control_basic_test', self._callback_control_basic_test)
self.register_callback('control_stall_test', self._callback_control_stall_test)
self.register_callback('control_sizes_test', self._callback_control_sizes_test)
self.register_callback('control_stress_test', self._callback_control_stress_test)
self.register_callback('device_reset_test', self._callback_device_reset_test)
self.register_callback('device_soft_reconnection_test', self._callback_device_soft_reconnection_test)
self.register_callback('device_suspend_resume_test', self._callback_device_suspend_resume_test)
self.register_callback('repeated_construction_destruction_test', self._callback_repeated_construction_destruction_test)
def result(self):
return self.__result
@ -81,6 +283,20 @@ class TestMatch(object):
return False
def lineno():
"""Returns the current line number in our program."""
return inspect.currentframe().f_back.f_lineno
def raise_if_different(expected, actual, line, text=''):
"""Raise a RuntimeError if actual is different than expected."""
if expected != actual:
raise RuntimeError('[{}]:{}, {} Got {!r}, expected {!r}'.format(__file__, line, text, actual, expected))
def raise_unconditionally(line, text=''):
"""Raise a RuntimeError unconditionally."""
raise RuntimeError('[{}]:{}, {}'.format(__file__, line, text))
def test_device(serial_number, log=print):
dev = usb.core.find(custom_match=TestMatch(serial_number))
if dev is None:
@ -123,7 +339,6 @@ def test_device(serial_number, log=print):
#-report throughput for in/out of control, bulk, interrupt and iso transfers
#-verify that construction/destruction repeatedly works gracefully
intf = get_interface(dev, 0, 0)
# Find endpoints
@ -205,7 +420,6 @@ def test_device(serial_number, log=print):
t.join()
return True
def write_data(pipe):
print("Write data running")
@ -215,8 +429,417 @@ def write_data(pipe):
count += 1
print("Count %s" % count)
time.sleep(0.5)
def control_basic_test(dev, vendor_id, product_id, log):
get_status_test(dev, log)
set_clear_feature_test(dev, log)
get_set_interface_test(dev, log)
get_set_configuration_test(dev, log)
get_descriptor_test(dev, vendor_id, product_id, log)
set_descriptor_test(dev, log)
#synch_frame_test(dev, log) wait for isochronous endpoint
def get_status_test(dev, log):
# Control IN GET_STATUS on DEVICE
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_DEVICE)
request = REQUEST_GET_STATUS
value = 0 # Always 0 for this request
index = 0 # 0 if recipient is device
length = 2 # Always 2 for this request (size of return data)
ret = dev.ctrl_transfer(request_type, request, value, index, length)
ret = ret[0] | (ret[1] << 8)
# Status bits
# ret == 0b01 (D0)Self Powered
# ret == 0b10 (D1)Remote Wakeup
# (D2 - D15 reserved) Must be set to 0
if(ret < 0 or ret > 3):
raise_unconditionally(lineno(), "GET_STATUS on DEVICE failed")
# Control IN GET_STATUS on INTERFACE
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_INTERFACE)
request = REQUEST_GET_STATUS
value = 0 # Always 0 for this request
index = 0 # interface index
length = 2 # Always 2 for this request (size of return data)
ret = dev.ctrl_transfer(request_type, request, value, index, length)
ret = ret[0] | (ret[1] << 8)
# Status bits
# ret == 0b0
# (D0 - D15 reserved) Must be set to 0
if(ret != 0):
raise_unconditionally(lineno(), "GET_STATUS on INTERFACE failed")
# Control IN GET_STATUS on ENDPOINT 0
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_ENDPOINT)
request = REQUEST_GET_STATUS
value = 0 # Always 0 for this request
index = 0 # endpoint index
length = 2 # Always 2 for this request (size of return data)
ret = dev.ctrl_transfer(request_type, request, value, index, length)
ret = ret[0] | (ret[1] << 8)
# Status bits
# ret == 0b1 (D0)endpoint Halt
# (D1 - D15 reserved) Must be set to 0
# endpoint 0 can't be halted ret == 0
if(ret != 0):
raise_unconditionally(lineno(), "GET_STATUS on ENDPOINT failed")
def set_clear_feature_test(dev, log):
# The state of the Direction bit is ignored if the wLength field is zero,
# signifying there is no Data stage - see USB spec 9.3.1
# according to this SET/CLEAR_FEATURE ignores direction bits and should
# work for both CTRL_OUT and CTRL_IN
# Control OUT SET_FEATURE on endpoint - halt
request_type = build_request_type(CTRL_OUT, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_ENDPOINT)
request = REQUEST_SET_FEATURE
value = FEATURE_ENDPOINT_HALT
index = 1 # Endpoint index
length = 0 # Always 0 for this request
try:
dev.ctrl_transfer(request_type, request, value, index, length)
except usb.core.USBError:
raise_unconditionally(lineno(), "endpoint halt failed")
# check if endpoint was halted
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_ENDPOINT)
request = REQUEST_GET_STATUS
value = 0 # Always 0 for this request
index = 1 # Endpoint index
length = 2 # Always 2 for this request (size of return data)
ret = dev.ctrl_transfer(request_type, request, value, index, length)
ret = ret[0] | (ret[1] << 8)
if(ret != 1):
raise_unconditionally(lineno(), "endpoint was not halted")
# Control OUT CLEAR_FEATURE on endpoint - unhalt
request_type = build_request_type(CTRL_OUT, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_ENDPOINT)
request = REQUEST_CLEAR_FEATURE
value = FEATURE_ENDPOINT_HALT
index = 1 # Endpoint index
length = 0 # Always 0 for this request
try:
dev.ctrl_transfer(request_type, request, value, index, length)
except usb.core.USBError:
raise_unconditionally(lineno(), "endpoint was not unhalted")
# check if endpoint was unhalted
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_ENDPOINT)
request = REQUEST_GET_STATUS
value = 0 # Always 0 for this request
index = 1 # Endpoint index
length = 2 # Always 2 for this request (size of return data)
ret = dev.ctrl_transfer(request_type, request, value, index, length)
ret = ret[0]
if(ret != 0):
raise_unconditionally(lineno(), "endpoint unhalthalt failed")
# retest for CTRL_IN
# Control IN SET_FEATURE on endpoint - halt
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_ENDPOINT)
request = REQUEST_SET_FEATURE
value = FEATURE_ENDPOINT_HALT
index = 1 # Endpoint index
length = 0 # Always 0 for this request
try:
dev.ctrl_transfer(request_type, request, value, index, length)
except usb.core.USBError:
raise_unconditionally(lineno(), "endpoint halt failed")
# check if endpoint was halted
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_ENDPOINT)
request = REQUEST_GET_STATUS
value = 0 # Always 0 for this request
index = 1 # Endpoint index
length = 2 # Always 2 for this request (size of return data)
ret = dev.ctrl_transfer(request_type, request, value, index, length)
ret = ret[0] | (ret[1] << 8)
if(ret != 1):
raise_unconditionally(lineno(), "endpoint was not halted")
# Control IN CLEAR_FEATURE on endpoint - unhalt
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_ENDPOINT)
request = REQUEST_CLEAR_FEATURE
value = FEATURE_ENDPOINT_HALT
index = 1 # Endpoint index
length = 0 # Always 0 for this request
try:
dev.ctrl_transfer(request_type, request, value, index, length)
except usb.core.USBError:
raise_unconditionally(lineno(), "endpoint was not unhalted")
# check if endpoint was unhalted
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_ENDPOINT)
request = REQUEST_GET_STATUS
value = 0 # Always 0 for this request
index = 1 # Endpoint index
length = 2 # Always 2 for this request (size of return data)
ret = dev.ctrl_transfer(request_type, request, value, index, length)
ret = ret[0]
if(ret != 0):
raise_unconditionally(lineno(), "endpoint unhalthalt failed")
def get_set_interface_test(dev, log):
# Control IN GET_INTERFACE
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_INTERFACE)
request = REQUEST_GET_INTERFACE
value = 0 # Always 0 for this request
index = 0 # Interface index
length = 1 # Always 1 for this request (size of return data)
try:
ret = dev.ctrl_transfer(request_type, request, value, index, length)
print("GET_INTERFACE ret: %d" % (ret[0]))
if(ret[0] != 0):
raise_unconditionally(lineno(), "Wrong interface was set expected: 0")
except usb.core.USBError:
raise_unconditionally(lineno(), "GET_INTERFACE failed")
# test control data transfer
control_data_test(dev, [64, 256], log)
# Control IN SET_INTERFACE
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_INTERFACE)
request = REQUEST_SET_INTERFACE
value = 1 # Alternative interface setting index
index = 0 # Interface index
length = 0 # Always 0 for this request
try:
dev.ctrl_transfer(request_type, request, value, index, length)
except usb.core.USBError:
raise_unconditionally(lineno(), "SET_INTERFACE failed")
# test control data transfer after alternative interface set
control_data_test(dev, [64, 256], log)
# Control IN GET_INTERFACE - check if alternative interface setting was set
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_INTERFACE)
request = REQUEST_GET_INTERFACE
value = 0 # Always 0 for this request
index = 0 # Interface index
length = 1 # Always 1 for this request (size of return data)
try:
ret = dev.ctrl_transfer(request_type, request, value, index, length)
if(ret[0] != 1):
raise_unconditionally(lineno(), "Alternative interface setting was not set properly")
except usb.core.USBError:
raise_unconditionally(lineno(), "GET_INTERFACE failed")
# Control IN SET_INTERFACE restore interfejs settings
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_INTERFACE)
request = REQUEST_SET_INTERFACE
value = 0 # Interface setting index
index = 0 # Interface index
length = 0 # Always 0 for this request
try:
dev.ctrl_transfer(request_type, request, value, index, length)
except usb.core.USBError:
raise_unconditionally(lineno(), "SET_INTERFACE request failed")
# test control data transfer after interface restoring
control_data_test(dev, [64, 256], log)
# Control IN GET_INTERFACE - check if alternative interface setting was restored properly
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_INTERFACE)
request = REQUEST_GET_INTERFACE
value = 0 # Always 0 for this request
index = 0 # Interface index
length = 1 # Always 1 for this request (size of return data)
try:
ret = dev.ctrl_transfer(request_type, request, value, index, length)
if(ret[0] != 0):
raise_unconditionally(lineno(), "Alternative interface setting was not restored properly")
except usb.core.USBError:
raise_unconditionally(lineno(), "GET_INTERFACE failed")
def get_set_configuration_test(dev, log):
# Set Configuration can also be used, with wValue set to 0, to deconfigure the device
# Control IN GET_CONFIGURATION
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_INTERFACE)
request = REQUEST_GET_CONFIGURATION
value = 0 # Always 0 for this request
index = 0 # Always 0 for this request
length = 1 # Always 1 for this request (size of return data)
try:
ret = dev.ctrl_transfer(request_type, request, value, index, length)
if(ret[0] != 1):
raise_unconditionally(lineno(), "Expected first configuration set")
except usb.core.USBError:
raise_unconditionally(lineno(), "GET_CONFIGURATION failed")
# test control data transfer
control_data_test(dev, [64, 256], log)
# Control OUT SET_CONFIGURATION 0 - deconfigure the device
request_type = build_request_type(CTRL_OUT, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_INTERFACE)
request = REQUEST_SET_CONFIGURATION
value = 0 # Configuration Value (0 - deconfigure the device)
index = 0 # Always 0 for this request
length = 0 # Always 0 for this request
try:
ret = dev.ctrl_transfer(request_type, request, value, index, length)
except usb.core.USBError:
raise_unconditionally(lineno(), "SET_CONFIGURATION failed")
# Control IN GET_CONFIGURATION - check if deconfigured
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_INTERFACE)
request = REQUEST_GET_CONFIGURATION
value = 0 # Always 0 for this request
index = 0 # Always 0 for this request
length = 1 # Always 1 for this request (size of return data)
try:
ret = dev.ctrl_transfer(request_type, request, value, index, length)
if(ret[0] != 0):
raise_unconditionally(lineno(), "Expected to be deconfigured")
except usb.core.USBError:
raise_unconditionally(lineno(), "GET_CONFIGURATION failed")
# Control OUT SET_CONFIGURATION 1 - restore first configuration
request_type = build_request_type(CTRL_OUT, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_INTERFACE)
request = REQUEST_SET_CONFIGURATION
value = 1 # Configuration Value
index = 0 # Always 0 for this request
length = 0 # Always 0 for this request
try:
ret = dev.ctrl_transfer(request_type, request, value, index, length)
except usb.core.USBError:
raise_unconditionally(lineno(), "SET_CONFIGURATION failed")
# test control data transfer after configured back
control_data_test(dev, [64, 256], log)
# Control IN GET_CONFIGURATION - check if configured back
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_INTERFACE)
request = REQUEST_GET_CONFIGURATION
value = 0 # Always 0 for this request
index = 0 # Always 0 for this request
length = 1 # Always 1 for this request (size of return data)
try:
ret = dev.ctrl_transfer(request_type, request, value, index, length)
if(ret[0] != 1):
raise_unconditionally(lineno(), "Expected to be deconfigured: 1")
except usb.core.USBError:
raise_unconditionally(lineno(), "GET_CONFIGURATION failed")
control_data_test(dev, [64, 256], log)
def get_descriptor_test(dev, vendor_id, product_id, log):
# Control IN GET_DESCRIPTOR - device
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_DEVICE)
request = REQUEST_GET_DESCRIPTOR
value = (DESC_TYPE_DEVICE << 8) | (0 << 0) # Descriptor Type (H) and Descriptor Index (L)
index = 0 # 0 or Language ID for this request
length = DEVICE_DESC_SIZE # Descriptor Length
try:
ret = dev.ctrl_transfer(request_type, request, value, index, length)
#print("### DEVICE_DESC ####################################################")
#dev_desc = dict(zip(device_descriptor_keys, device_descriptor_parser.unpack(ret)))
#for key in dev_desc:
# print("%s: %d" % (key, dev_desc[key]))
#assert vendor_id != dev_desc['idVendor']
#assert product_id != dev_desc['idProduct']
except usb.core.USBError:
raise_unconditionally(lineno(), "Requesting device descriptor failed")
# Control IN GET_DESCRIPTOR - configuration
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_DEVICE)
request = REQUEST_GET_DESCRIPTOR
value = (DESC_TYPE_CONFIG << 8) | (0 << 0) # Descriptor Type (H) and Descriptor Index (L)
index = 0 # 0 or Language ID for this request
length = CONFIGURATION_DESC_SIZE # Descriptor Length
try:
ret = dev.ctrl_transfer(request_type, request, value, index, length)
#print("### CONFIGURATION_DESC ####################################################")
#conf_desc = dict(zip(configuration_descriptor_keys, configuration_descriptor_parser.unpack(ret)))
#for key in conf_desc:
# print("%s: %d" % (key, conf_desc[key]))
#print("#######################################################")
except usb.core.USBError:
raise_unconditionally(lineno(), "Requesting configuration descriptor failed")
# Control IN GET_DESCRIPTOR - interface
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_DEVICE)
request = REQUEST_GET_DESCRIPTOR
value = (DESC_TYPE_INTERFACE << 8) | (0 << 0) # Descriptor Type (H) and Descriptor Index (L)
index = 0 # 0 or Language ID for this request
length = INTERFACE_DESC_SIZE # Descriptor Length
try:
ret = dev.ctrl_transfer(request_type, request, value, index, length)
raise_unconditionally(lineno(), "Requesting interface descriptor should fail since it is not directly accessible")
except usb.core.USBError:
log("interface descriptor is not directly accessible")
# Control IN GET_DESCRIPTOR - interface
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_DEVICE)
request = REQUEST_GET_DESCRIPTOR
value = (DESC_TYPE_ENDPOINT << 8) | (0 << 0) # Descriptor Type (H) and Descriptor Index (L)
index = 0 # 0 or Language ID for this request
length = INTERFACE_DESC_SIZE # Descriptor Length
try:
ret = dev.ctrl_transfer(request_type, request, value, index, length)
raise_unconditionally(lineno(), "Requesting endpoint descriptor should fail since it is not directly accessible")
except usb.core.USBError:
log("endpoint descriptor is not directly accessible")
def set_descriptor_test(dev, log):
# SET_DESCRIPTOR is optional and not implemented in Mbed
# command should fail with no action on device side
# Control OUT SET_DESCRIPTOR
request_type = build_request_type(CTRL_OUT, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_DEVICE)
request = REQUEST_SET_DESCRIPTOR
value = (DESC_TYPE_DEVICE << 8) | (0 << 0) # Descriptor Type (H) and Descriptor Index (L)
index = 0 # 0 or Language ID for this request
data = bytearray(DEVICE_DESC_SIZE) # Descriptor data
try:
dev.ctrl_transfer(request_type, request, value, index, data)
raise_unconditionally(lineno(), "SET_DESCRIPTOR should fail since it is not implemented")
except usb.core.USBError:
log("SET_DESCRIPTOR is unsupported")
def synch_frame_test(dev, log):
# only for isochronous endpoints
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_ENDPOINT)
request = REQUEST_SYNCH_FRAME
value = 0 # Always 0 for this request
index = 1 # Endpoint index
length = 2 # Always 2 for this request (size of return data)
try:
ret = dev.ctrl_transfer(request_type, request, value, index, length)
ret = ret[0] | (ret[1] << 8)
log("synch frame ret: %d" % (ret))
except usb.core.USBError:
raise_unconditionally(lineno(), "SYNCH_FRAME failed")
def control_stall_test(dev, log):
# Control OUT stall
@ -228,7 +851,7 @@ def control_stall_test(dev, log):
index = 0 # Communication interface
data = bytearray(64) # Dummy data
dev.ctrl_transfer(request_type, request, value, index, data, 5000)
raise Exception("Invalid request not stalled")
raise_unconditionally(lineno(), "Invalid request not stalled")
except usb.core.USBError:
log("Invalid request stalled")
@ -241,7 +864,7 @@ def control_stall_test(dev, log):
index = 0 # Communication interface
length = 0
dev.ctrl_transfer(request_type, request, value, index, length, 5000)
raise Exception("Invalid request not stalled")
raise_unconditionally(lineno(), "Invalid request not stalled")
except usb.core.USBError:
log("Invalid request stalled")
@ -254,7 +877,7 @@ def control_stall_test(dev, log):
index = 0 # Communication interface
length = 0
dev.ctrl_transfer(request_type, request, value, index, length, 5000)
raise Exception("Invalid request not stalled")
raise_unconditionally(lineno(), "Invalid request not stalled")
except usb.core.USBError:
log("Invalid request stalled")
@ -267,11 +890,11 @@ def control_stall_test(dev, log):
index = 0 # Communication interface
length = 255
dev.ctrl_transfer(request_type, request, value, index, length, 5000)
raise Exception("Invalid request not stalled")
raise_unconditionally(lineno(), "Invalid request not stalled")
except usb.core.USBError:
log("Invalid request stalled")
for i in (6, 7, 5):
for i in (3, 4, 5):
try:
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_DEVICE)
@ -280,11 +903,66 @@ def control_stall_test(dev, log):
index = 0 # Communication interface
length = 255
resp = dev.ctrl_transfer(request_type, request, value, index, length, 5000)
log("Requesting string %s passed" % i)
except usb.core.USBError:
raise_unconditionally(lineno(), "Requesting string failed i: " + str(i))
for i in (6, 7):
try:
request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD,
CTRL_RECIPIENT_DEVICE)
request = 0x6 # GET_DESCRIPTOR
value = (0x03 << 8) | (i << 0) # String descriptor index
index = 0 # Communication interface
length = 255
resp = dev.ctrl_transfer(request_type, request, value, index, length, 5000)
raise_unconditionally(lineno(), "Requesting string passed i: " + str(i))
except usb.core.USBError:
log("Requesting string %s failed" % i)
def control_sizes_test(dev, log):
list = [1, 2, 3, 7, 8, 9, 15, 16, 17, 31, 32, 33, 63, 64, 65, 127, 128, 129, 255, 256, 257, 511, 512, 513, 1023, 1024, 1025, 2047, 2048]
control_data_test(dev, list, log)
def control_data_test(dev, sizes_list, log):
# Test control requests of various data stage sizes (1,8,16,32,64,255,256,...)
count = 1
for i in sizes_list:
request_type = build_request_type(CTRL_OUT, CTRL_TYPE_VENDOR,
CTRL_RECIPIENT_DEVICE)
request = VENDOR_TEST_CTRL_OUT_SIZES
value = i # Size of data the device should actually read
index = 0 # Unused - set for debugging only
data = bytearray(i) # Dummy data
if i == 1:
data[0] = count
else:
data[0] = count - 1
data[i - 1] = count + 1
try:
dev.ctrl_transfer(request_type, request, value, index, data, 5000)
except usb.core.USBError:
raise_unconditionally(lineno(), "VENDOR_TEST_CTRL_OUT_SIZES failed ")
request_type = build_request_type(CTRL_IN, CTRL_TYPE_VENDOR,
CTRL_RECIPIENT_DEVICE)
request = VENDOR_TEST_CTRL_IN_SIZES
value = 0 # Size of data the device should actually send
index = 0 # Unused - set for debugging only
length = i
try:
ret = dev.ctrl_transfer(request_type, request, value, index, length, 5000)
if i == 1:
raise_if_different(count, ret[0], lineno(), "send/receive data not match")
else:
raise_if_different(count - 1, ret[0], lineno(), "send/receive data not match")
raise_if_different(count + 1, ret[i - 1], lineno(), "send/receive data not match")
except usb.core.USBError:
raise_unconditionally(lineno(), "VENDOR_TEST_CTRL_IN_SIZES failed")
count += 1
def control_stress_test(dev, log):
# Test various patterns of control transfers
@ -337,6 +1015,65 @@ def control_stress_test(dev, log):
count += 1
def device_reset_test(log):
dev = yield
dev.reset();
dev = yield
dev.reset();
dev = yield
dev.reset();
dev = yield
# run other test to check if USB works fine after reset
control_data_test(dev, [64, 256], log)
yield
def device_soft_reconnection_test(log):
list = [64, 256]
dev = yield
# run other test to check if USB works fine before reconnection
control_data_test(dev, list, log)
dev = yield
# run other test to check if USB works fine after reconnection
control_data_test(dev, list, log)
dev = yield
# run other test to check if USB works fine after reconnection
control_data_test(dev, list, log)
dev = yield
# run other test to check if USB works fine after reconnection
control_data_test(dev, list, log)
dev = yield
# run other test to check if USB works fine after reconnection
control_data_test(dev, list, log)
yield
def device_suspend_resume_test(log):
dev = yield
time.sleep(0.1)
control_data_test(dev, [64, 256], log)
time.sleep(0.1)
control_data_test(dev, [64, 256], log)
time.sleep(0.1)
control_data_test(dev, [64, 256], log)
time.sleep(0.1)
control_data_test(dev, [64, 256], log)
time.sleep(0.1)
yield
def repeated_construction_destruction_test(log):
# run other test to check if USB works fine after repeated construction/destruction
list = [64, 256]
dev = yield
control_data_test(dev, list, log)
dev = yield
control_data_test(dev, list, log)
dev = yield
control_data_test(dev, list, log)
yield
def main():
parser = ArgumentParser(description="USB basic test")
parser.add_argument('serial', help='USB serial number of DUT')

View File

@ -30,12 +30,15 @@
#define VENDOR_TEST_CTRL_NONE_DELAY 6
#define VENDOR_TEST_CTRL_IN_STATUS_DELAY 7
#define VENDOR_TEST_CTRL_OUT_STATUS_DELAY 8
#define VENDOR_TEST_CTRL_IN_SIZES 9
#define VENDOR_TEST_CTRL_OUT_SIZES 10
#define MAX_EP_SIZE 64
#define MIN_EP_SIZE 8
USBTester::USBTester(uint16_t vendor_id, uint16_t product_id, uint16_t product_release, bool connect_blocking): USBDevice(vendor_id, product_id, product_release)
USBTester::USBTester(uint16_t vendor_id, uint16_t product_id, uint16_t product_release, bool connect_blocking): USBDevice(vendor_id, product_id, product_release),
reset_count(0), suspend_count(0), resume_count(0)
{
EndpointResolver resolver(endpoint_table());
@ -60,6 +63,46 @@ USBTester::~USBTester()
deinit();
}
const char *USBTester::get_desc_string(const uint8_t *desc)
{
static char ret_string[128] = {};
const uint8_t desc_size = desc[0] - 2;
const uint8_t *desc_str = &desc[2];
uint32_t j = 0;
for(uint32_t i = 0; i < desc_size; i+=2, j++) {
ret_string[j] = desc_str[i];
}
ret_string[j] = '\0';
return ret_string;
}
void USBTester::suspend(bool suspended)
{
if(suspended) {
++suspend_count;
} else {
++resume_count;
}
}
const char *USBTester::get_serial_desc_string()
{
return get_desc_string(string_iserial_desc());
}
const char *USBTester::get_iinterface_desc_string()
{
return get_desc_string(string_iserial_desc());
}
const char *USBTester::get_iproduct_desc_string()
{
return get_desc_string(string_iserial_desc());
}
void USBTester::callback_state_change(DeviceState new_state)
{
// Nothing to do
@ -93,6 +136,16 @@ void USBTester::callback_request(const setup_packet_t *setup)
result = Success;
delay = 2000;
break;
case VENDOR_TEST_CTRL_IN_SIZES:
result = Send;
data = ctrl_buf;
size = setup->wLength;
break;
case VENDOR_TEST_CTRL_OUT_SIZES:
result = Receive;
data = ctrl_buf;
size = setup->wValue;
break;
default:
result = PassThrough;
break;
@ -124,6 +177,12 @@ void USBTester::callback_request_xfer_done(const setup_packet_t *setup, bool abo
case VENDOR_TEST_CTRL_OUT:
result = true;
break;
case VENDOR_TEST_CTRL_OUT_SIZES:
result = true;
break;
case VENDOR_TEST_CTRL_IN_SIZES:
result = true;
break;
default:
result = false;
break;

View File

@ -40,6 +40,24 @@ public:
~USBTester();
/*
*
* @returns descriptor string in ASCII
*/
const char *get_serial_desc_string();
const char *get_iproduct_desc_string();
const char *get_iinterface_desc_string();
uint32_t get_reset_count() const { return reset_count; }
uint32_t get_suspend_count() const { return suspend_count; }
uint32_t get_resume_count() const { return resume_count; }
void clear_reset_count() { reset_count = 0; }
void clear_suspend_count() { suspend_count = 0; }
void clear_resume_count() { resume_count = 0; }
private:
const char *get_desc_string(const uint8_t *desc);
virtual void suspend(bool suspended);
protected:
/*
@ -78,6 +96,9 @@ protected:
uint8_t int_out;
uint8_t int_buf[64];
EventQueue *queue;
volatile uint32_t reset_count;
volatile uint32_t suspend_count;
volatile uint32_t resume_count;
virtual void callback_state_change(DeviceState new_state);
virtual void callback_request(const setup_packet_t *setup);
@ -86,6 +107,7 @@ protected:
virtual void callback_set_interface(uint16_t interface, uint8_t alternate);
virtual void epbulk_out_callback(usb_ep_t endpoint);
virtual void epint_out_callback(usb_ep_t endpoint);
virtual void callback_reset() { ++reset_count; }
uint8_t ctrl_buf[2048];
};

View File

@ -29,23 +29,263 @@
using namespace utest::v1;
// Echo server (echo payload to host)
void test_case_basic()
void control_basic_test()
{
uint16_t vendor_id = 0x0d28;
uint16_t product_id = 0x0205;
uint16_t product_release = 0x0001;
char _key[11] = {};
char _value[128] = {};
char str[128] = {};
{
USBTester serial(vendor_id, product_id, product_release, true);
sprintf (str, "%s %d %d", serial.get_serial_desc_string(), vendor_id, product_id);
greentea_send_kv("control_basic_test", str);
// Wait for host before terminating
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("pass", _key);
}
}
void control_stall_test()
{
uint16_t vendor_id = 0x0d28;
uint16_t product_id = 0x0205;
uint16_t product_release = 0x0001;
char _key[11] = {};
char _value[128] = {};
{
USBTester serial(0x0d28, 0x0205, 0x0001, true);
greentea_send_kv("usb_enumeration_done", "0123456789");
USBTester serial(vendor_id, product_id, product_release, true);
greentea_send_kv("control_stall_test", serial.get_serial_desc_string());
// Wait for host before terminating
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("pass", _key);
}
}
void control_sizes_test()
{
uint16_t vendor_id = 0x0d28;
uint16_t product_id = 0x0205;
uint16_t product_release = 0x0001;
char _key[11] = {};
char _value[128] = {};
{
USBTester serial(vendor_id, product_id, product_release, true);
greentea_send_kv("control_sizes_test", serial.get_serial_desc_string());
// Wait for host before terminating
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("pass", _key);
}
}
void control_stress_test()
{
uint16_t vendor_id = 0x0d28;
uint16_t product_id = 0x0205;
uint16_t product_release = 0x0001;
char _key[11] = {};
char _value[128] = {};
{
USBTester serial(vendor_id, product_id, product_release, true);
greentea_send_kv("control_stress_test", serial.get_serial_desc_string());
// Wait for host before terminating
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("pass", _key);
}
}
void device_reset_test()
{
uint16_t vendor_id = 0x0d28;
uint16_t product_id = 0x0205;
uint16_t product_release = 0x0001;
char _key[11] = {};
char _value[128] = {};
{
USBTester serial(vendor_id, product_id, product_release, true);
greentea_send_kv("device_reset_test", serial.get_serial_desc_string());
serial.clear_reset_count();
// Wait for host before terminating
while(serial.get_reset_count() == 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("pass", _key);
while(!serial.configured());
greentea_send_kv("device_reset_test", serial.get_serial_desc_string());
serial.clear_reset_count();
// Wait for host before terminating
while(serial.get_reset_count() == 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("pass", _key);
while(!serial.configured());
greentea_send_kv("device_reset_test", serial.get_serial_desc_string());
serial.clear_reset_count();
// Wait for host before terminating
while(serial.get_reset_count() == 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("pass", _key);
while(!serial.configured());
greentea_send_kv("device_reset_test", serial.get_serial_desc_string());
// Wait for host before terminating
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("pass", _key);
}
}
void device_soft_reconnection_test()
{
uint16_t vendor_id = 0x0d28;
uint16_t product_id = 0x0205;
uint16_t product_release = 0x0001;
char _key[11] = {};
char _value[128] = {};
const uint32_t reconnect_try_count = 3;
{
USBTester serial(vendor_id, product_id, product_release, true);
greentea_send_kv("device_soft_reconnection_test", serial.get_serial_desc_string());
// Wait for host before terminating
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("pass", _key);
for(int i = 0; i < reconnect_try_count; i++) {
serial.disconnect();
// If disconnect() + connect() occur too fast the reset event will be dropped.
// At a minimum there should be a 200us delay between disconnect and connect.
// To be on the safe side I would recommend a 1ms delay, so the host controller
// has an entire USB frame to detect the disconnect.
wait_ms(1);
serial.connect();
greentea_send_kv("device_soft_reconnection_test", serial.get_serial_desc_string());
// Wait for host before terminating
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("pass", _key);
}
serial.disconnect();
wait_ms(1);
serial.connect();
serial.disconnect();
wait_ms(1);
serial.connect();
serial.disconnect();
wait_ms(1);
serial.connect();
greentea_send_kv("device_soft_reconnection_test", serial.get_serial_desc_string());
// Wait for host before terminating
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("pass", _key);
}
}
void device_suspend_resume_test()
{
uint16_t vendor_id = 0x0d28;
uint16_t product_id = 0x0205;
uint16_t product_release = 0x0001;
char _key[11] = {};
char _value[128] = {};
{
USBTester serial(vendor_id, product_id, product_release, true);
greentea_send_kv("device_suspend_resume_test", serial.get_serial_desc_string());
printf("[1] suspend_count: %d resume_count: %d\n", serial.get_suspend_count(), serial.get_resume_count());
serial.clear_suspend_count();
serial.clear_resume_count();
// Wait for host before terminating
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
printf("[2] suspend_count: %d resume_count: %d\n", serial.get_suspend_count(), serial.get_resume_count());
TEST_ASSERT_EQUAL_STRING("pass", _key);
wait_ms(5000);
printf("[3] suspend_count: %d resume_count: %d\n", serial.get_suspend_count(), serial.get_resume_count());
}
}
void repeated_construction_destruction_test()
{
uint16_t vendor_id = 0x0d28;
uint16_t product_id = 0x0205;
uint16_t product_release = 0x0001;
char _key[11] = {};
char _value[128] = {};
{
USBTester serial(vendor_id, product_id, product_release, true);
TEST_ASSERT_EQUAL(true, serial.configured());
wait_ms(1);
}
wait_ms(1);
{
USBTester serial(vendor_id, product_id, product_release, true);
TEST_ASSERT_EQUAL(true, serial.configured());
wait_ms(1);
}
wait_ms(1);
{
USBTester serial(vendor_id, product_id, product_release, true);
TEST_ASSERT_EQUAL(true, serial.configured());
wait_ms(1);
}
wait_ms(1);
{
USBTester serial(vendor_id, product_id, product_release, true);
TEST_ASSERT_EQUAL(true, serial.configured());
wait_ms(1);
greentea_send_kv("repeated_construction_destruction_test", serial.get_serial_desc_string());
// Wait for host before terminating
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("pass", _key);
}
wait_ms(1);
{
USBTester serial(vendor_id, product_id, product_release, true);
TEST_ASSERT_EQUAL(true, serial.configured());
wait_ms(1);
greentea_send_kv("repeated_construction_destruction_test", serial.get_serial_desc_string());
// Wait for host before terminating
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("pass", _key);
}
wait_ms(1);
{
USBTester serial(vendor_id, product_id, product_release, true);
TEST_ASSERT_EQUAL(true, serial.configured());
wait_ms(1);
greentea_send_kv("repeated_construction_destruction_test", serial.get_serial_desc_string());
// Wait for host before terminating
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("pass", _key);
}
}
Case cases[] = {
Case("pyusb basic test", test_case_basic),
Case("usb control basic test", control_basic_test),
Case("usb control stall test", control_stall_test),
Case("usb control sizes test", control_sizes_test),
Case("usb control stress test", control_stress_test),
Case("usb device reset test", device_reset_test),
Case("usb soft reconnection test", device_soft_reconnection_test),
Case("usb device suspend/resume test", device_suspend_resume_test),
Case("usb repeated construction destruction test", repeated_construction_destruction_test)
};
utest::v1::status_t greentea_test_setup(const size_t number_of_cases)