diff --git a/TESTS/host_tests/pyusb_basic.py b/TESTS/host_tests/pyusb_basic.py index 24c7e95145..ae6c164a32 100644 --- a/TESTS/host_tests/pyusb_basic.py +++ b/TESTS/host_tests/pyusb_basic.py @@ -20,6 +20,7 @@ from mbed_host_tests import BaseHostTest from argparse import ArgumentParser import time import sys +import inspect from threading import Thread import usb.core @@ -28,6 +29,11 @@ from usb.util import CTRL_OUT, CTRL_IN from usb.util import CTRL_TYPE_STANDARD, CTRL_TYPE_CLASS, CTRL_TYPE_VENDOR from usb.util import (CTRL_RECIPIENT_DEVICE, CTRL_RECIPIENT_INTERFACE, CTRL_RECIPIENT_ENDPOINT, CTRL_RECIPIENT_OTHER) +from usb.util import (DESC_TYPE_DEVICE, DESC_TYPE_CONFIG, DESC_TYPE_STRING, + DESC_TYPE_INTERFACE, DESC_TYPE_ENDPOINT) + +import struct +from collections import namedtuple def get_interface(dev, interface, alternate=0): intf = None @@ -45,22 +51,218 @@ VENDOR_TEST_CTRL_OUT_DELAY = 5 VENDOR_TEST_CTRL_NONE_DELAY = 6 VENDOR_TEST_CTRL_IN_STATUS_DELAY = 7 VENDOR_TEST_CTRL_OUT_STATUS_DELAY = 8 +VENDOR_TEST_CTRL_IN_SIZES = 9 +VENDOR_TEST_CTRL_OUT_SIZES = 10 VENDOR_TEST_UNSUPPORTED_REQUEST = 32 +REQUEST_GET_STATUS = 0 # done +REQUEST_CLEAR_FEATURE = 1 # done +REQUEST_SET_FEATURE = 3 # done +REQUEST_SET_ADDRESS = 5 # ??? +REQUEST_GET_DESCRIPTOR = 6 +REQUEST_SET_DESCRIPTOR = 7 # done +REQUEST_GET_CONFIGURATION = 8 # done +REQUEST_SET_CONFIGURATION = 9 # done +REQUEST_GET_INTERFACE = 10 # done +REQUEST_SET_INTERFACE = 11 # done +REQUEST_SYNCH_FRAME = 12 # almost done + +FEATURE_ENDPOINT_HALT = 0 +FEATURE_DEVICE_REMOTE_WAKEUP = 1 + + + +DEVICE_QUALIFIER_DESC_SIZE = 10 + +DESC_TYPE_DEVICE_QUALIFIER = 0x06 + + +DEVICE_DESC_SIZE = 18 +device_descriptor_parser = struct.Struct('BBHBBBBHHHBBBB') +device_descriptor_keys = ['bLength', 'bDescriptorType', 'bcdUSB', 'bDeviceClass', + 'bDeviceSubClass', 'bDeviceProtocol', 'bMaxPacketSize0', + 'idVendor', 'idProduct', 'bcdDevice', 'iManufacturer', + 'iProduct', 'iSerialNumber', 'bNumConfigurations'] + +CONFIGURATION_DESC_SIZE = 9 +configuration_descriptor_parser = struct.Struct('BBHBBBBB') +configuration_descriptor_keys = ['bLength', 'bDescriptorType', 'wTotalLength', + 'bNumInterfaces', 'bConfigurationValue', + 'iConfiguration', 'bmAttributes', 'bMaxPower'] + +INTERFACE_DESC_SIZE = 9 +interface_descriptor_parser = struct.Struct('BBBBBBBBB') +interface_descriptor_keys = ['bLength', 'bDescriptorType', 'bInterfaceNumber', + 'bAlternateSetting', 'bNumEndpoints', + 'bInterfaceClass', 'bInterfaceSubClass', + 'bInterfaceProtocol', 'iInterface'] + +ENDPOINT_DESC_SIZE = 7 +interface_descriptor_parser = struct.Struct('BBBBBHB') +interface_descriptor_keys = ['bLength', 'bDescriptorType', 'bEndpointAddress', + 'bmAttributes', 'wMaxPacketSize', 'bInterval'] + class PyusbBasicTest(BaseHostTest): - """ - """ - def _callback_usb_enumeration_done(self, key, value, timestamp): - print("Received key %s = %s" % (key, value)) - self.log("Received key %s = %s" % (key, value)) - test_device(value, self.log) - passed = True - results = "pass" if passed else "fail" - self.send_kv(results, "0") + + def _callback_control_basic_test(self, key, value, timestamp): + serial_number, vendor_id, product_id = value.split(' ') + self.log("Received serial %s" % (serial_number)) + self.log("Received vendor_id %s" % (vendor_id)) + self.log("Received product_id %s" % (product_id)) + + dev = self.find_device(serial_number) + if(dev == None): + return + + try: + control_basic_test(dev, vendor_id, product_id, log=print) + self.report_success() + except (RuntimeError) as exc: + self.report_error(exc) + + + def _callback_control_stall_test(self, key, value, timestamp): + self.log("Received serial %s" % (value)) + + dev = self.find_device(value) + if(dev == None): + return + + try: + control_stall_test(dev, log=print) + self.report_success() + except (RuntimeError) as exc: + self.report_error(exc) + + + def _callback_control_sizes_test(self, key, value, timestamp): + self.log("Received serial %s" % (value)) + + dev = self.find_device(value) + if(dev == None): + return + + try: + control_sizes_test(dev, log=print) + self.report_success() + except (RuntimeError) as exc: + self.report_error(exc) + + + def _callback_control_stress_test(self, key, value, timestamp): + self.log("Received serial %s" % (value)) + + dev = self.find_device(value) + if(dev == None): + return + + try: + control_stress_test(dev, log=print) + self.report_success() + except (RuntimeError) as exc: + self.report_error(exc) + + + def _callback_device_reset_test(self, key, value, timestamp): + self.log("Received serial %s" % (value)) + + dev = self.find_device(value) + if(dev == None): + return + + try: + self.device_reset_test.send(dev) + self.report_success() + except (RuntimeError) as exc: + self.report_error(exc) + + def _callback_device_soft_reconnection_test(self, key, value, timestamp): + self.log("Received serial %s" % (value)) + + dev = self.find_device(value) + if(dev == None): + return + + try: + self.device_soft_reconnection_test.send(dev) + self.report_success() + except (RuntimeError) as exc: + self.report_error(exc) + + + def _callback_device_suspend_resume_test(self, key, value, timestamp): + self.log("Received serial %s" % (value)) + + dev = self.find_device(value) + if(dev == None): + return + + try: + self.device_suspend_resume_test.send(dev) + self.report_success() + except (RuntimeError) as exc: + self.report_error(exc) + + + def _callback_repeated_construction_destruction_test(self, key, value, timestamp): + self.log("Received serial %s" % (value)) + + dev = self.find_device(value) + if(dev == None): + return + + try: + self.repeated_construction_destruction_test.send(dev) + self.report_success() + except (RuntimeError) as exc: + self.report_error(exc) + + + def find_device(self, serial_number): + # to make it more reliable, 20 retries in 2[s] + for _ in range(20): + dev = usb.core.find(custom_match=TestMatch(serial_number)) + if dev is not None: + break + time.sleep(0.1) + + if dev is None: + self.log("Device not found") + self.send_kv("failed", "0") + return dev + + + def report_success(self): + self.send_kv("pass", "0") + + def report_error(self, msg): + self.log('TEST FAILED: {}'.format(msg)) + self.send_kv("failed", "0") def setup(self): self.__result = False - self.register_callback('usb_enumeration_done', self._callback_usb_enumeration_done) + + self.device_reset_test = device_reset_test(log=print) + self.device_reset_test.send(None) + + self.device_soft_reconnection_test = device_soft_reconnection_test(log=print) + self.device_soft_reconnection_test.send(None) + + self.device_suspend_resume_test = device_suspend_resume_test(log=print) + self.device_suspend_resume_test.send(None) + + self.repeated_construction_destruction_test = repeated_construction_destruction_test(log=print) + self.repeated_construction_destruction_test.send(None) + + self.register_callback('control_basic_test', self._callback_control_basic_test) + self.register_callback('control_stall_test', self._callback_control_stall_test) + self.register_callback('control_sizes_test', self._callback_control_sizes_test) + self.register_callback('control_stress_test', self._callback_control_stress_test) + self.register_callback('device_reset_test', self._callback_device_reset_test) + self.register_callback('device_soft_reconnection_test', self._callback_device_soft_reconnection_test) + self.register_callback('device_suspend_resume_test', self._callback_device_suspend_resume_test) + self.register_callback('repeated_construction_destruction_test', self._callback_repeated_construction_destruction_test) + def result(self): return self.__result @@ -81,6 +283,20 @@ class TestMatch(object): return False +def lineno(): + """Returns the current line number in our program.""" + return inspect.currentframe().f_back.f_lineno + +def raise_if_different(expected, actual, line, text=''): + """Raise a RuntimeError if actual is different than expected.""" + if expected != actual: + raise RuntimeError('[{}]:{}, {} Got {!r}, expected {!r}'.format(__file__, line, text, actual, expected)) + +def raise_unconditionally(line, text=''): + """Raise a RuntimeError unconditionally.""" + raise RuntimeError('[{}]:{}, {}'.format(__file__, line, text)) + + def test_device(serial_number, log=print): dev = usb.core.find(custom_match=TestMatch(serial_number)) if dev is None: @@ -123,7 +339,6 @@ def test_device(serial_number, log=print): #-report throughput for in/out of control, bulk, interrupt and iso transfers #-verify that construction/destruction repeatedly works gracefully - intf = get_interface(dev, 0, 0) # Find endpoints @@ -205,7 +420,6 @@ def test_device(serial_number, log=print): t.join() - return True def write_data(pipe): print("Write data running") @@ -215,8 +429,417 @@ def write_data(pipe): count += 1 print("Count %s" % count) time.sleep(0.5) - - + + +def control_basic_test(dev, vendor_id, product_id, log): + get_status_test(dev, log) + set_clear_feature_test(dev, log) + get_set_interface_test(dev, log) + get_set_configuration_test(dev, log) + get_descriptor_test(dev, vendor_id, product_id, log) + set_descriptor_test(dev, log) + #synch_frame_test(dev, log) wait for isochronous endpoint + + +def get_status_test(dev, log): + # Control IN GET_STATUS on DEVICE + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_DEVICE) + request = REQUEST_GET_STATUS + value = 0 # Always 0 for this request + index = 0 # 0 if recipient is device + length = 2 # Always 2 for this request (size of return data) + ret = dev.ctrl_transfer(request_type, request, value, index, length) + ret = ret[0] | (ret[1] << 8) + # Status bits + # ret == 0b01 (D0)Self Powered + # ret == 0b10 (D1)Remote Wakeup + # (D2 - D15 reserved) Must be set to 0 + if(ret < 0 or ret > 3): + raise_unconditionally(lineno(), "GET_STATUS on DEVICE failed") + + # Control IN GET_STATUS on INTERFACE + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_INTERFACE) + request = REQUEST_GET_STATUS + value = 0 # Always 0 for this request + index = 0 # interface index + length = 2 # Always 2 for this request (size of return data) + ret = dev.ctrl_transfer(request_type, request, value, index, length) + ret = ret[0] | (ret[1] << 8) + # Status bits + # ret == 0b0 + # (D0 - D15 reserved) Must be set to 0 + if(ret != 0): + raise_unconditionally(lineno(), "GET_STATUS on INTERFACE failed") + + # Control IN GET_STATUS on ENDPOINT 0 + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_ENDPOINT) + request = REQUEST_GET_STATUS + value = 0 # Always 0 for this request + index = 0 # endpoint index + length = 2 # Always 2 for this request (size of return data) + ret = dev.ctrl_transfer(request_type, request, value, index, length) + ret = ret[0] | (ret[1] << 8) + # Status bits + # ret == 0b1 (D0)endpoint Halt + # (D1 - D15 reserved) Must be set to 0 + # endpoint 0 can't be halted ret == 0 + if(ret != 0): + raise_unconditionally(lineno(), "GET_STATUS on ENDPOINT failed") + + +def set_clear_feature_test(dev, log): + # The state of the Direction bit is ignored if the wLength field is zero, + # signifying there is no Data stage - see USB spec 9.3.1 + # according to this SET/CLEAR_FEATURE ignores direction bits and should + # work for both CTRL_OUT and CTRL_IN + + # Control OUT SET_FEATURE on endpoint - halt + request_type = build_request_type(CTRL_OUT, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_ENDPOINT) + request = REQUEST_SET_FEATURE + value = FEATURE_ENDPOINT_HALT + index = 1 # Endpoint index + length = 0 # Always 0 for this request + try: + dev.ctrl_transfer(request_type, request, value, index, length) + except usb.core.USBError: + raise_unconditionally(lineno(), "endpoint halt failed") + + # check if endpoint was halted + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_ENDPOINT) + request = REQUEST_GET_STATUS + value = 0 # Always 0 for this request + index = 1 # Endpoint index + length = 2 # Always 2 for this request (size of return data) + ret = dev.ctrl_transfer(request_type, request, value, index, length) + ret = ret[0] | (ret[1] << 8) + if(ret != 1): + raise_unconditionally(lineno(), "endpoint was not halted") + + # Control OUT CLEAR_FEATURE on endpoint - unhalt + request_type = build_request_type(CTRL_OUT, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_ENDPOINT) + request = REQUEST_CLEAR_FEATURE + value = FEATURE_ENDPOINT_HALT + index = 1 # Endpoint index + length = 0 # Always 0 for this request + try: + dev.ctrl_transfer(request_type, request, value, index, length) + except usb.core.USBError: + raise_unconditionally(lineno(), "endpoint was not unhalted") + + # check if endpoint was unhalted + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_ENDPOINT) + request = REQUEST_GET_STATUS + value = 0 # Always 0 for this request + index = 1 # Endpoint index + length = 2 # Always 2 for this request (size of return data) + ret = dev.ctrl_transfer(request_type, request, value, index, length) + ret = ret[0] + if(ret != 0): + raise_unconditionally(lineno(), "endpoint unhalthalt failed") + + # retest for CTRL_IN + # Control IN SET_FEATURE on endpoint - halt + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_ENDPOINT) + request = REQUEST_SET_FEATURE + value = FEATURE_ENDPOINT_HALT + index = 1 # Endpoint index + length = 0 # Always 0 for this request + try: + dev.ctrl_transfer(request_type, request, value, index, length) + except usb.core.USBError: + raise_unconditionally(lineno(), "endpoint halt failed") + + # check if endpoint was halted + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_ENDPOINT) + request = REQUEST_GET_STATUS + value = 0 # Always 0 for this request + index = 1 # Endpoint index + length = 2 # Always 2 for this request (size of return data) + ret = dev.ctrl_transfer(request_type, request, value, index, length) + ret = ret[0] | (ret[1] << 8) + if(ret != 1): + raise_unconditionally(lineno(), "endpoint was not halted") + + # Control IN CLEAR_FEATURE on endpoint - unhalt + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_ENDPOINT) + request = REQUEST_CLEAR_FEATURE + value = FEATURE_ENDPOINT_HALT + index = 1 # Endpoint index + length = 0 # Always 0 for this request + try: + dev.ctrl_transfer(request_type, request, value, index, length) + except usb.core.USBError: + raise_unconditionally(lineno(), "endpoint was not unhalted") + + # check if endpoint was unhalted + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_ENDPOINT) + request = REQUEST_GET_STATUS + value = 0 # Always 0 for this request + index = 1 # Endpoint index + length = 2 # Always 2 for this request (size of return data) + ret = dev.ctrl_transfer(request_type, request, value, index, length) + ret = ret[0] + if(ret != 0): + raise_unconditionally(lineno(), "endpoint unhalthalt failed") + + +def get_set_interface_test(dev, log): + # Control IN GET_INTERFACE + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_INTERFACE) + request = REQUEST_GET_INTERFACE + value = 0 # Always 0 for this request + index = 0 # Interface index + length = 1 # Always 1 for this request (size of return data) + try: + ret = dev.ctrl_transfer(request_type, request, value, index, length) + print("GET_INTERFACE ret: %d" % (ret[0])) + if(ret[0] != 0): + raise_unconditionally(lineno(), "Wrong interface was set expected: 0") + except usb.core.USBError: + raise_unconditionally(lineno(), "GET_INTERFACE failed") + # test control data transfer + control_data_test(dev, [64, 256], log) + + # Control IN SET_INTERFACE + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_INTERFACE) + request = REQUEST_SET_INTERFACE + value = 1 # Alternative interface setting index + index = 0 # Interface index + length = 0 # Always 0 for this request + try: + dev.ctrl_transfer(request_type, request, value, index, length) + except usb.core.USBError: + raise_unconditionally(lineno(), "SET_INTERFACE failed") + # test control data transfer after alternative interface set + control_data_test(dev, [64, 256], log) + + # Control IN GET_INTERFACE - check if alternative interface setting was set + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_INTERFACE) + request = REQUEST_GET_INTERFACE + value = 0 # Always 0 for this request + index = 0 # Interface index + length = 1 # Always 1 for this request (size of return data) + try: + ret = dev.ctrl_transfer(request_type, request, value, index, length) + if(ret[0] != 1): + raise_unconditionally(lineno(), "Alternative interface setting was not set properly") + except usb.core.USBError: + raise_unconditionally(lineno(), "GET_INTERFACE failed") + + # Control IN SET_INTERFACE restore interfejs settings + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_INTERFACE) + request = REQUEST_SET_INTERFACE + value = 0 # Interface setting index + index = 0 # Interface index + length = 0 # Always 0 for this request + try: + dev.ctrl_transfer(request_type, request, value, index, length) + except usb.core.USBError: + raise_unconditionally(lineno(), "SET_INTERFACE request failed") + # test control data transfer after interface restoring + control_data_test(dev, [64, 256], log) + + # Control IN GET_INTERFACE - check if alternative interface setting was restored properly + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_INTERFACE) + request = REQUEST_GET_INTERFACE + value = 0 # Always 0 for this request + index = 0 # Interface index + length = 1 # Always 1 for this request (size of return data) + try: + ret = dev.ctrl_transfer(request_type, request, value, index, length) + if(ret[0] != 0): + raise_unconditionally(lineno(), "Alternative interface setting was not restored properly") + except usb.core.USBError: + raise_unconditionally(lineno(), "GET_INTERFACE failed") + + +def get_set_configuration_test(dev, log): + # Set Configuration can also be used, with wValue set to 0, to deconfigure the device + # Control IN GET_CONFIGURATION + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_INTERFACE) + request = REQUEST_GET_CONFIGURATION + value = 0 # Always 0 for this request + index = 0 # Always 0 for this request + length = 1 # Always 1 for this request (size of return data) + try: + ret = dev.ctrl_transfer(request_type, request, value, index, length) + if(ret[0] != 1): + raise_unconditionally(lineno(), "Expected first configuration set") + except usb.core.USBError: + raise_unconditionally(lineno(), "GET_CONFIGURATION failed") + # test control data transfer + control_data_test(dev, [64, 256], log) + + # Control OUT SET_CONFIGURATION 0 - deconfigure the device + request_type = build_request_type(CTRL_OUT, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_INTERFACE) + request = REQUEST_SET_CONFIGURATION + value = 0 # Configuration Value (0 - deconfigure the device) + index = 0 # Always 0 for this request + length = 0 # Always 0 for this request + try: + ret = dev.ctrl_transfer(request_type, request, value, index, length) + except usb.core.USBError: + raise_unconditionally(lineno(), "SET_CONFIGURATION failed") + + # Control IN GET_CONFIGURATION - check if deconfigured + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_INTERFACE) + request = REQUEST_GET_CONFIGURATION + value = 0 # Always 0 for this request + index = 0 # Always 0 for this request + length = 1 # Always 1 for this request (size of return data) + try: + ret = dev.ctrl_transfer(request_type, request, value, index, length) + if(ret[0] != 0): + raise_unconditionally(lineno(), "Expected to be deconfigured") + except usb.core.USBError: + raise_unconditionally(lineno(), "GET_CONFIGURATION failed") + + # Control OUT SET_CONFIGURATION 1 - restore first configuration + request_type = build_request_type(CTRL_OUT, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_INTERFACE) + request = REQUEST_SET_CONFIGURATION + value = 1 # Configuration Value + index = 0 # Always 0 for this request + length = 0 # Always 0 for this request + try: + ret = dev.ctrl_transfer(request_type, request, value, index, length) + except usb.core.USBError: + raise_unconditionally(lineno(), "SET_CONFIGURATION failed") + # test control data transfer after configured back + control_data_test(dev, [64, 256], log) + + # Control IN GET_CONFIGURATION - check if configured back + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_INTERFACE) + request = REQUEST_GET_CONFIGURATION + value = 0 # Always 0 for this request + index = 0 # Always 0 for this request + length = 1 # Always 1 for this request (size of return data) + try: + ret = dev.ctrl_transfer(request_type, request, value, index, length) + if(ret[0] != 1): + raise_unconditionally(lineno(), "Expected to be deconfigured: 1") + except usb.core.USBError: + raise_unconditionally(lineno(), "GET_CONFIGURATION failed") + control_data_test(dev, [64, 256], log) + + +def get_descriptor_test(dev, vendor_id, product_id, log): + # Control IN GET_DESCRIPTOR - device + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_DEVICE) + request = REQUEST_GET_DESCRIPTOR + value = (DESC_TYPE_DEVICE << 8) | (0 << 0) # Descriptor Type (H) and Descriptor Index (L) + index = 0 # 0 or Language ID for this request + length = DEVICE_DESC_SIZE # Descriptor Length + try: + ret = dev.ctrl_transfer(request_type, request, value, index, length) + #print("### DEVICE_DESC ####################################################") + #dev_desc = dict(zip(device_descriptor_keys, device_descriptor_parser.unpack(ret))) + #for key in dev_desc: + # print("%s: %d" % (key, dev_desc[key])) + #assert vendor_id != dev_desc['idVendor'] + #assert product_id != dev_desc['idProduct'] + except usb.core.USBError: + raise_unconditionally(lineno(), "Requesting device descriptor failed") + + # Control IN GET_DESCRIPTOR - configuration + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_DEVICE) + request = REQUEST_GET_DESCRIPTOR + value = (DESC_TYPE_CONFIG << 8) | (0 << 0) # Descriptor Type (H) and Descriptor Index (L) + index = 0 # 0 or Language ID for this request + length = CONFIGURATION_DESC_SIZE # Descriptor Length + try: + ret = dev.ctrl_transfer(request_type, request, value, index, length) + #print("### CONFIGURATION_DESC ####################################################") + #conf_desc = dict(zip(configuration_descriptor_keys, configuration_descriptor_parser.unpack(ret))) + #for key in conf_desc: + # print("%s: %d" % (key, conf_desc[key])) + #print("#######################################################") + except usb.core.USBError: + raise_unconditionally(lineno(), "Requesting configuration descriptor failed") + + # Control IN GET_DESCRIPTOR - interface + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_DEVICE) + request = REQUEST_GET_DESCRIPTOR + value = (DESC_TYPE_INTERFACE << 8) | (0 << 0) # Descriptor Type (H) and Descriptor Index (L) + index = 0 # 0 or Language ID for this request + length = INTERFACE_DESC_SIZE # Descriptor Length + try: + ret = dev.ctrl_transfer(request_type, request, value, index, length) + raise_unconditionally(lineno(), "Requesting interface descriptor should fail since it is not directly accessible") + except usb.core.USBError: + log("interface descriptor is not directly accessible") + + # Control IN GET_DESCRIPTOR - interface + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_DEVICE) + request = REQUEST_GET_DESCRIPTOR + value = (DESC_TYPE_ENDPOINT << 8) | (0 << 0) # Descriptor Type (H) and Descriptor Index (L) + index = 0 # 0 or Language ID for this request + length = INTERFACE_DESC_SIZE # Descriptor Length + try: + ret = dev.ctrl_transfer(request_type, request, value, index, length) + raise_unconditionally(lineno(), "Requesting endpoint descriptor should fail since it is not directly accessible") + except usb.core.USBError: + log("endpoint descriptor is not directly accessible") + + +def set_descriptor_test(dev, log): + # SET_DESCRIPTOR is optional and not implemented in Mbed + # command should fail with no action on device side + + # Control OUT SET_DESCRIPTOR + request_type = build_request_type(CTRL_OUT, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_DEVICE) + request = REQUEST_SET_DESCRIPTOR + value = (DESC_TYPE_DEVICE << 8) | (0 << 0) # Descriptor Type (H) and Descriptor Index (L) + index = 0 # 0 or Language ID for this request + data = bytearray(DEVICE_DESC_SIZE) # Descriptor data + try: + dev.ctrl_transfer(request_type, request, value, index, data) + raise_unconditionally(lineno(), "SET_DESCRIPTOR should fail since it is not implemented") + except usb.core.USBError: + log("SET_DESCRIPTOR is unsupported") + + +def synch_frame_test(dev, log): + # only for isochronous endpoints + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_ENDPOINT) + request = REQUEST_SYNCH_FRAME + value = 0 # Always 0 for this request + index = 1 # Endpoint index + length = 2 # Always 2 for this request (size of return data) + try: + ret = dev.ctrl_transfer(request_type, request, value, index, length) + ret = ret[0] | (ret[1] << 8) + log("synch frame ret: %d" % (ret)) + except usb.core.USBError: + raise_unconditionally(lineno(), "SYNCH_FRAME failed") + + def control_stall_test(dev, log): # Control OUT stall @@ -228,7 +851,7 @@ def control_stall_test(dev, log): index = 0 # Communication interface data = bytearray(64) # Dummy data dev.ctrl_transfer(request_type, request, value, index, data, 5000) - raise Exception("Invalid request not stalled") + raise_unconditionally(lineno(), "Invalid request not stalled") except usb.core.USBError: log("Invalid request stalled") @@ -241,7 +864,7 @@ def control_stall_test(dev, log): index = 0 # Communication interface length = 0 dev.ctrl_transfer(request_type, request, value, index, length, 5000) - raise Exception("Invalid request not stalled") + raise_unconditionally(lineno(), "Invalid request not stalled") except usb.core.USBError: log("Invalid request stalled") @@ -254,7 +877,7 @@ def control_stall_test(dev, log): index = 0 # Communication interface length = 0 dev.ctrl_transfer(request_type, request, value, index, length, 5000) - raise Exception("Invalid request not stalled") + raise_unconditionally(lineno(), "Invalid request not stalled") except usb.core.USBError: log("Invalid request stalled") @@ -267,11 +890,11 @@ def control_stall_test(dev, log): index = 0 # Communication interface length = 255 dev.ctrl_transfer(request_type, request, value, index, length, 5000) - raise Exception("Invalid request not stalled") + raise_unconditionally(lineno(), "Invalid request not stalled") except usb.core.USBError: log("Invalid request stalled") - for i in (6, 7, 5): + for i in (3, 4, 5): try: request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, CTRL_RECIPIENT_DEVICE) @@ -280,11 +903,66 @@ def control_stall_test(dev, log): index = 0 # Communication interface length = 255 resp = dev.ctrl_transfer(request_type, request, value, index, length, 5000) - log("Requesting string %s passed" % i) + except usb.core.USBError: + raise_unconditionally(lineno(), "Requesting string failed i: " + str(i)) + + for i in (6, 7): + try: + request_type = build_request_type(CTRL_IN, CTRL_TYPE_STANDARD, + CTRL_RECIPIENT_DEVICE) + request = 0x6 # GET_DESCRIPTOR + value = (0x03 << 8) | (i << 0) # String descriptor index + index = 0 # Communication interface + length = 255 + resp = dev.ctrl_transfer(request_type, request, value, index, length, 5000) + raise_unconditionally(lineno(), "Requesting string passed i: " + str(i)) except usb.core.USBError: log("Requesting string %s failed" % i) - - + + +def control_sizes_test(dev, log): + list = [1, 2, 3, 7, 8, 9, 15, 16, 17, 31, 32, 33, 63, 64, 65, 127, 128, 129, 255, 256, 257, 511, 512, 513, 1023, 1024, 1025, 2047, 2048] + control_data_test(dev, list, log) + + +def control_data_test(dev, sizes_list, log): + # Test control requests of various data stage sizes (1,8,16,32,64,255,256,...) + count = 1 + for i in sizes_list: + request_type = build_request_type(CTRL_OUT, CTRL_TYPE_VENDOR, + CTRL_RECIPIENT_DEVICE) + request = VENDOR_TEST_CTRL_OUT_SIZES + value = i # Size of data the device should actually read + index = 0 # Unused - set for debugging only + data = bytearray(i) # Dummy data + if i == 1: + data[0] = count + else: + data[0] = count - 1 + data[i - 1] = count + 1 + try: + dev.ctrl_transfer(request_type, request, value, index, data, 5000) + except usb.core.USBError: + raise_unconditionally(lineno(), "VENDOR_TEST_CTRL_OUT_SIZES failed ") + + request_type = build_request_type(CTRL_IN, CTRL_TYPE_VENDOR, + CTRL_RECIPIENT_DEVICE) + request = VENDOR_TEST_CTRL_IN_SIZES + value = 0 # Size of data the device should actually send + index = 0 # Unused - set for debugging only + length = i + try: + ret = dev.ctrl_transfer(request_type, request, value, index, length, 5000) + if i == 1: + raise_if_different(count, ret[0], lineno(), "send/receive data not match") + else: + raise_if_different(count - 1, ret[0], lineno(), "send/receive data not match") + raise_if_different(count + 1, ret[i - 1], lineno(), "send/receive data not match") + except usb.core.USBError: + raise_unconditionally(lineno(), "VENDOR_TEST_CTRL_IN_SIZES failed") + count += 1 + + def control_stress_test(dev, log): # Test various patterns of control transfers @@ -337,6 +1015,65 @@ def control_stress_test(dev, log): count += 1 +def device_reset_test(log): + dev = yield + dev.reset(); + dev = yield + dev.reset(); + dev = yield + dev.reset(); + dev = yield + # run other test to check if USB works fine after reset + control_data_test(dev, [64, 256], log) + yield + + +def device_soft_reconnection_test(log): + list = [64, 256] + dev = yield + # run other test to check if USB works fine before reconnection + control_data_test(dev, list, log) + dev = yield + # run other test to check if USB works fine after reconnection + control_data_test(dev, list, log) + dev = yield + # run other test to check if USB works fine after reconnection + control_data_test(dev, list, log) + dev = yield + # run other test to check if USB works fine after reconnection + control_data_test(dev, list, log) + dev = yield + # run other test to check if USB works fine after reconnection + control_data_test(dev, list, log) + yield + + +def device_suspend_resume_test(log): + dev = yield + time.sleep(0.1) + control_data_test(dev, [64, 256], log) + time.sleep(0.1) + control_data_test(dev, [64, 256], log) + time.sleep(0.1) + control_data_test(dev, [64, 256], log) + time.sleep(0.1) + control_data_test(dev, [64, 256], log) + time.sleep(0.1) + yield + + +def repeated_construction_destruction_test(log): + # run other test to check if USB works fine after repeated construction/destruction + list = [64, 256] + dev = yield + control_data_test(dev, list, log) + dev = yield + control_data_test(dev, list, log) + dev = yield + control_data_test(dev, list, log) + yield + + def main(): parser = ArgumentParser(description="USB basic test") parser.add_argument('serial', help='USB serial number of DUT') diff --git a/TESTS/usb_device/basic/USBTester.cpp b/TESTS/usb_device/basic/USBTester.cpp index 3235a3f855..a30a819dd7 100644 --- a/TESTS/usb_device/basic/USBTester.cpp +++ b/TESTS/usb_device/basic/USBTester.cpp @@ -30,12 +30,15 @@ #define VENDOR_TEST_CTRL_NONE_DELAY 6 #define VENDOR_TEST_CTRL_IN_STATUS_DELAY 7 #define VENDOR_TEST_CTRL_OUT_STATUS_DELAY 8 +#define VENDOR_TEST_CTRL_IN_SIZES 9 +#define VENDOR_TEST_CTRL_OUT_SIZES 10 #define MAX_EP_SIZE 64 #define MIN_EP_SIZE 8 -USBTester::USBTester(uint16_t vendor_id, uint16_t product_id, uint16_t product_release, bool connect_blocking): USBDevice(vendor_id, product_id, product_release) +USBTester::USBTester(uint16_t vendor_id, uint16_t product_id, uint16_t product_release, bool connect_blocking): USBDevice(vendor_id, product_id, product_release), + reset_count(0), suspend_count(0), resume_count(0) { EndpointResolver resolver(endpoint_table()); @@ -60,6 +63,46 @@ USBTester::~USBTester() deinit(); } + +const char *USBTester::get_desc_string(const uint8_t *desc) +{ + static char ret_string[128] = {}; + const uint8_t desc_size = desc[0] - 2; + const uint8_t *desc_str = &desc[2]; + uint32_t j = 0; + for(uint32_t i = 0; i < desc_size; i+=2, j++) { + ret_string[j] = desc_str[i]; + } + ret_string[j] = '\0'; + return ret_string; +} + +void USBTester::suspend(bool suspended) +{ + if(suspended) { + ++suspend_count; + } else { + ++resume_count; + } +} + +const char *USBTester::get_serial_desc_string() +{ + return get_desc_string(string_iserial_desc()); +} + +const char *USBTester::get_iinterface_desc_string() +{ + return get_desc_string(string_iserial_desc()); +} + +const char *USBTester::get_iproduct_desc_string() +{ + return get_desc_string(string_iserial_desc()); +} + + + void USBTester::callback_state_change(DeviceState new_state) { // Nothing to do @@ -93,6 +136,16 @@ void USBTester::callback_request(const setup_packet_t *setup) result = Success; delay = 2000; break; + case VENDOR_TEST_CTRL_IN_SIZES: + result = Send; + data = ctrl_buf; + size = setup->wLength; + break; + case VENDOR_TEST_CTRL_OUT_SIZES: + result = Receive; + data = ctrl_buf; + size = setup->wValue; + break; default: result = PassThrough; break; @@ -124,6 +177,12 @@ void USBTester::callback_request_xfer_done(const setup_packet_t *setup, bool abo case VENDOR_TEST_CTRL_OUT: result = true; break; + case VENDOR_TEST_CTRL_OUT_SIZES: + result = true; + break; + case VENDOR_TEST_CTRL_IN_SIZES: + result = true; + break; default: result = false; break; diff --git a/TESTS/usb_device/basic/USBTester.h b/TESTS/usb_device/basic/USBTester.h index 7b56d3171e..b7005c98c0 100644 --- a/TESTS/usb_device/basic/USBTester.h +++ b/TESTS/usb_device/basic/USBTester.h @@ -40,6 +40,24 @@ public: ~USBTester(); + /* + * + * @returns descriptor string in ASCII + */ + const char *get_serial_desc_string(); + const char *get_iproduct_desc_string(); + const char *get_iinterface_desc_string(); + uint32_t get_reset_count() const { return reset_count; } + uint32_t get_suspend_count() const { return suspend_count; } + uint32_t get_resume_count() const { return resume_count; } + void clear_reset_count() { reset_count = 0; } + void clear_suspend_count() { suspend_count = 0; } + void clear_resume_count() { resume_count = 0; } + +private: + const char *get_desc_string(const uint8_t *desc); + virtual void suspend(bool suspended); + protected: /* @@ -78,6 +96,9 @@ protected: uint8_t int_out; uint8_t int_buf[64]; EventQueue *queue; + volatile uint32_t reset_count; + volatile uint32_t suspend_count; + volatile uint32_t resume_count; virtual void callback_state_change(DeviceState new_state); virtual void callback_request(const setup_packet_t *setup); @@ -86,6 +107,7 @@ protected: virtual void callback_set_interface(uint16_t interface, uint8_t alternate); virtual void epbulk_out_callback(usb_ep_t endpoint); virtual void epint_out_callback(usb_ep_t endpoint); + virtual void callback_reset() { ++reset_count; } uint8_t ctrl_buf[2048]; }; diff --git a/TESTS/usb_device/basic/main.cpp b/TESTS/usb_device/basic/main.cpp index 141bb473e2..1577a87148 100644 --- a/TESTS/usb_device/basic/main.cpp +++ b/TESTS/usb_device/basic/main.cpp @@ -29,23 +29,263 @@ using namespace utest::v1; -// Echo server (echo payload to host) -void test_case_basic() +void control_basic_test() { + uint16_t vendor_id = 0x0d28; + uint16_t product_id = 0x0205; + uint16_t product_release = 0x0001; + char _key[11] = {}; + char _value[128] = {}; + char str[128] = {}; + + { + USBTester serial(vendor_id, product_id, product_release, true); + sprintf (str, "%s %d %d", serial.get_serial_desc_string(), vendor_id, product_id); + greentea_send_kv("control_basic_test", str); + // Wait for host before terminating + greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value)); + TEST_ASSERT_EQUAL_STRING("pass", _key); + } +} + +void control_stall_test() +{ + uint16_t vendor_id = 0x0d28; + uint16_t product_id = 0x0205; + uint16_t product_release = 0x0001; char _key[11] = {}; char _value[128] = {}; { - USBTester serial(0x0d28, 0x0205, 0x0001, true); - - greentea_send_kv("usb_enumeration_done", "0123456789"); + USBTester serial(vendor_id, product_id, product_release, true); + greentea_send_kv("control_stall_test", serial.get_serial_desc_string()); // Wait for host before terminating greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value)); + TEST_ASSERT_EQUAL_STRING("pass", _key); + } +} + +void control_sizes_test() +{ + uint16_t vendor_id = 0x0d28; + uint16_t product_id = 0x0205; + uint16_t product_release = 0x0001; + char _key[11] = {}; + char _value[128] = {}; + + { + USBTester serial(vendor_id, product_id, product_release, true); + greentea_send_kv("control_sizes_test", serial.get_serial_desc_string()); + // Wait for host before terminating + greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value)); + TEST_ASSERT_EQUAL_STRING("pass", _key); + } +} + +void control_stress_test() +{ + uint16_t vendor_id = 0x0d28; + uint16_t product_id = 0x0205; + uint16_t product_release = 0x0001; + char _key[11] = {}; + char _value[128] = {}; + + { + USBTester serial(vendor_id, product_id, product_release, true); + greentea_send_kv("control_stress_test", serial.get_serial_desc_string()); + // Wait for host before terminating + greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value)); + TEST_ASSERT_EQUAL_STRING("pass", _key); + } +} + +void device_reset_test() +{ + uint16_t vendor_id = 0x0d28; + uint16_t product_id = 0x0205; + uint16_t product_release = 0x0001; + char _key[11] = {}; + char _value[128] = {}; + + { + USBTester serial(vendor_id, product_id, product_release, true); + + greentea_send_kv("device_reset_test", serial.get_serial_desc_string()); + serial.clear_reset_count(); + // Wait for host before terminating + while(serial.get_reset_count() == 0); + greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value)); + TEST_ASSERT_EQUAL_STRING("pass", _key); + + while(!serial.configured()); + + greentea_send_kv("device_reset_test", serial.get_serial_desc_string()); + serial.clear_reset_count(); + // Wait for host before terminating + while(serial.get_reset_count() == 0); + greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value)); + TEST_ASSERT_EQUAL_STRING("pass", _key); + + while(!serial.configured()); + + greentea_send_kv("device_reset_test", serial.get_serial_desc_string()); + serial.clear_reset_count(); + // Wait for host before terminating + while(serial.get_reset_count() == 0); + greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value)); + TEST_ASSERT_EQUAL_STRING("pass", _key); + + while(!serial.configured()); + + greentea_send_kv("device_reset_test", serial.get_serial_desc_string()); + // Wait for host before terminating + greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value)); + TEST_ASSERT_EQUAL_STRING("pass", _key); + } +} + + +void device_soft_reconnection_test() +{ + uint16_t vendor_id = 0x0d28; + uint16_t product_id = 0x0205; + uint16_t product_release = 0x0001; + char _key[11] = {}; + char _value[128] = {}; + const uint32_t reconnect_try_count = 3; + + { + USBTester serial(vendor_id, product_id, product_release, true); + + greentea_send_kv("device_soft_reconnection_test", serial.get_serial_desc_string()); + // Wait for host before terminating + greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value)); + TEST_ASSERT_EQUAL_STRING("pass", _key); + + for(int i = 0; i < reconnect_try_count; i++) { + serial.disconnect(); + // If disconnect() + connect() occur too fast the reset event will be dropped. + // At a minimum there should be a 200us delay between disconnect and connect. + // To be on the safe side I would recommend a 1ms delay, so the host controller + // has an entire USB frame to detect the disconnect. + wait_ms(1); + serial.connect(); + greentea_send_kv("device_soft_reconnection_test", serial.get_serial_desc_string()); + // Wait for host before terminating + greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value)); + TEST_ASSERT_EQUAL_STRING("pass", _key); + } + + serial.disconnect(); + wait_ms(1); + serial.connect(); + serial.disconnect(); + wait_ms(1); + serial.connect(); + serial.disconnect(); + wait_ms(1); + serial.connect(); + greentea_send_kv("device_soft_reconnection_test", serial.get_serial_desc_string()); + // Wait for host before terminating + greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value)); + TEST_ASSERT_EQUAL_STRING("pass", _key); + } +} + +void device_suspend_resume_test() +{ + uint16_t vendor_id = 0x0d28; + uint16_t product_id = 0x0205; + uint16_t product_release = 0x0001; + char _key[11] = {}; + char _value[128] = {}; + + { + USBTester serial(vendor_id, product_id, product_release, true); + greentea_send_kv("device_suspend_resume_test", serial.get_serial_desc_string()); + printf("[1] suspend_count: %d resume_count: %d\n", serial.get_suspend_count(), serial.get_resume_count()); + serial.clear_suspend_count(); + serial.clear_resume_count(); + // Wait for host before terminating + greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value)); + printf("[2] suspend_count: %d resume_count: %d\n", serial.get_suspend_count(), serial.get_resume_count()); + TEST_ASSERT_EQUAL_STRING("pass", _key); + wait_ms(5000); + printf("[3] suspend_count: %d resume_count: %d\n", serial.get_suspend_count(), serial.get_resume_count()); + } +} + +void repeated_construction_destruction_test() +{ + uint16_t vendor_id = 0x0d28; + uint16_t product_id = 0x0205; + uint16_t product_release = 0x0001; + char _key[11] = {}; + char _value[128] = {}; + + { + USBTester serial(vendor_id, product_id, product_release, true); + TEST_ASSERT_EQUAL(true, serial.configured()); + wait_ms(1); + } + + wait_ms(1); + { + USBTester serial(vendor_id, product_id, product_release, true); + TEST_ASSERT_EQUAL(true, serial.configured()); + wait_ms(1); + } + + wait_ms(1); + { + USBTester serial(vendor_id, product_id, product_release, true); + TEST_ASSERT_EQUAL(true, serial.configured()); + wait_ms(1); + } + + wait_ms(1); + { + USBTester serial(vendor_id, product_id, product_release, true); + TEST_ASSERT_EQUAL(true, serial.configured()); + wait_ms(1); + greentea_send_kv("repeated_construction_destruction_test", serial.get_serial_desc_string()); + // Wait for host before terminating + greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value)); + TEST_ASSERT_EQUAL_STRING("pass", _key); + } + + wait_ms(1); + { + USBTester serial(vendor_id, product_id, product_release, true); + TEST_ASSERT_EQUAL(true, serial.configured()); + wait_ms(1); + greentea_send_kv("repeated_construction_destruction_test", serial.get_serial_desc_string()); + // Wait for host before terminating + greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value)); + TEST_ASSERT_EQUAL_STRING("pass", _key); + } + + wait_ms(1); + { + USBTester serial(vendor_id, product_id, product_release, true); + TEST_ASSERT_EQUAL(true, serial.configured()); + wait_ms(1); + greentea_send_kv("repeated_construction_destruction_test", serial.get_serial_desc_string()); + // Wait for host before terminating + greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value)); + TEST_ASSERT_EQUAL_STRING("pass", _key); } } Case cases[] = { - Case("pyusb basic test", test_case_basic), + Case("usb control basic test", control_basic_test), + Case("usb control stall test", control_stall_test), + Case("usb control sizes test", control_sizes_test), + Case("usb control stress test", control_stress_test), + Case("usb device reset test", device_reset_test), + Case("usb soft reconnection test", device_soft_reconnection_test), + Case("usb device suspend/resume test", device_suspend_resume_test), + Case("usb repeated construction destruction test", repeated_construction_destruction_test) }; utest::v1::status_t greentea_test_setup(const size_t number_of_cases)