diff --git a/TESTS/host_tests/pyusb_basic.py b/TESTS/host_tests/pyusb_basic.py index 07a6cdf016..640d4a1791 100644 --- a/TESTS/host_tests/pyusb_basic.py +++ b/TESTS/host_tests/pyusb_basic.py @@ -85,13 +85,9 @@ REQUEST_SYNCH_FRAME = 12 FEATURE_ENDPOINT_HALT = 0 FEATURE_DEVICE_REMOTE_WAKEUP = 1 - - DEVICE_QUALIFIER_DESC_SIZE = 10 - DESC_TYPE_DEVICE_QUALIFIER = 0x06 - DEVICE_DESC_SIZE = 18 device_descriptor_parser = struct.Struct('BBHBBBBHHHBBBB') device_descriptor_keys = ['bLength', 'bDescriptorType', 'bcdUSB', 'bDeviceClass', @@ -352,11 +348,9 @@ class PyusbBasicTest(BaseHostTest): self.register_callback('reset_support', self._callback_reset_support) - def result(self): return self.__result - def teardown(self): pass @@ -415,10 +409,9 @@ def get_set_configuration_test(dev, log): # check if dafault(1) configuration set try: ret = usb.control.get_configuration(dev) - raise_if_different(1, ret, lineno(), "FAILED - expected first configuration set") + raise_if_different(1, ret, lineno(), 'Invalid configuration.') except usb.core.USBError as error: - print(error) - raise_unconditionally(lineno(), "FAILED - get_configuration !!!") + raise_unconditionally(lineno(), 'get_configuration request failed ({}).'.format(str(error).strip())) cfg = dev.get_active_configuration() for intf in cfg: @@ -428,17 +421,15 @@ def get_set_configuration_test(dev, log): try: ret = dev.set_configuration(0) except usb.core.USBError as error: - print(error) - raise_unconditionally(lineno(), "FAILED - set_configuration(0) !!!") + raise_unconditionally(lineno(), 'set_configuration request (deconfigure) failed ({}).'.format(str(error).strip())) # check if deconfigured try: ret = usb.control.get_configuration(dev) - raise_if_different(0, ret, lineno(), "FAILED - expected to be deconfigured") + raise_if_different(0, ret, lineno(), 'Invalid configuration.') print("device deconfigured - OK") except usb.core.USBError as error: - print(error) - raise_unconditionally(lineno(), "FAILED - get_active_configuration !!!") + raise_unconditionally(lineno(), 'get_configuration request failed ({}).'.format(str(error).strip())) # for every configuration for cfg in dev: @@ -446,17 +437,15 @@ def get_set_configuration_test(dev, log): # set configuration ret = cfg.set() except usb.core.USBError as error: - print(error) - raise_unconditionally(lineno(), "FAILED - set configuration") + raise_unconditionally(lineno(), 'set_configuration request failed ({}).'.format(str(error).strip())) # check if configured try: ret = usb.control.get_configuration(dev) - raise_if_different(cfg.bConfigurationValue, ret, lineno(), "FAILED - expected {} configuration set".format(cfg.bConfigurationValue)) + raise_if_different(cfg.bConfigurationValue, ret, lineno(), 'Invalid configuration.') print("configuration {} set - OK ".format(cfg.bConfigurationValue)) except usb.core.USBError as error: - print(error) - raise_unconditionally(lineno(), "FAILED - get_configuration !!!") + raise_unconditionally(lineno(), 'get_configuration request failed ({}).'.format(str(error).strip())) # test control data transfer after configuration set control_data_test(dev, [64, 256], log) print("") # new line @@ -477,7 +466,7 @@ def get_set_interface_test(dev, log): cfg.set() # for every interface for intf in cfg: - intf.set_altsetting(); + intf.set_altsetting() altsett = usb.control.get_interface(dev, intf.bInterfaceNumber) raise_if_different(intf.bAlternateSetting, altsett, lineno(), text='Wrong alternate setting for interface {}'.format(intf.bInterfaceNumber)) print("cfg({}) inteface {}.{} set - OK".format(cfg.bConfigurationValue, intf.bInterfaceNumber, intf.bAlternateSetting)) @@ -583,15 +572,13 @@ def set_clear_feature_test(dev, log): try: usb.control.set_feature(dev, FEATURE_ENDPOINT_HALT, ep) except usb.core.USBError as err: - print(err) - raise_unconditionally(lineno(), "endpoint {} halt failed".format(ep.bEndpointAddress)) + raise_unconditionally(lineno(), 'set_feature request (halt) failed for endpoint {} ({}).'.format(ep.bEndpointAddress, str(err).strip())) # check if endpoint was halted try: ret = usb.control.get_status(dev, ep) except usb.core.USBError as err: - print(err) - raise_unconditionally(lineno(), "endpoint status failed".format(ep.bEndpointAddress)) + raise_unconditionally(lineno(), 'get_status request failed for endpoint {} ({}).'.format(ep.bEndpointAddress, str(err).strip())) if(ret != 1): raise_unconditionally(lineno(), "endpoint {} was not halted".format(ep.bEndpointAddress)) print("cfg({}) intf({}.{}) ep {} halted - OK".format(cfg.bConfigurationValue, intf.bInterfaceNumber, intf.bAlternateSetting, ep.bEndpointAddress)) @@ -600,8 +587,7 @@ def set_clear_feature_test(dev, log): try: usb.control.clear_feature(dev, FEATURE_ENDPOINT_HALT, ep) except usb.core.USBError as err: - print(err) - raise_unconditionally(lineno(), "endpoint {} unhalt failed".format(ep.bEndpointAddress)) + raise_unconditionally(lineno(), "clear_feature request (unhalt) failed for endpoint {} ({})".format(ep.bEndpointAddress, str(err).strip())) # check if endpoint was unhalted ret = usb.control.get_status(dev, ep) @@ -635,9 +621,9 @@ def get_descriptor_test(dev, vendor_id, product_id, log): try: ret = get_descriptor(dev, (DESC_TYPE_DEVICE << 8) | (0 << 0), 0, DEVICE_DESC_SIZE) dev_desc = dict(zip(device_descriptor_keys, device_descriptor_parser.unpack(ret))) - raise_if_different(DEVICE_DESC_SIZE, dev_desc['bLength'], lineno(), text='Wrong device descriptor size !!!') - raise_if_different(vendor_id, dev_desc['idVendor'], lineno(), text='Wrong vendor id !!!') - raise_if_different(product_id, dev_desc['idProduct'], lineno(), text='Wrong product id !!!') + raise_if_different(DEVICE_DESC_SIZE, dev_desc['bLength'], lineno(), text='Wrong device descriptor size.') + raise_if_different(vendor_id, dev_desc['idVendor'], lineno(), text='Wrong vendor ID.') + raise_if_different(product_id, dev_desc['idProduct'], lineno(), text='Wrong product ID.') except usb.core.USBError: raise_unconditionally(lineno(), "Requesting device descriptor failed") @@ -645,21 +631,21 @@ def get_descriptor_test(dev, vendor_id, product_id, log): try: ret = get_descriptor(dev, (DESC_TYPE_CONFIG << 8) | (0 << 0), 0, CONFIGURATION_DESC_SIZE) conf_desc = dict(zip(configuration_descriptor_keys, configuration_descriptor_parser.unpack(ret))) - raise_if_different(CONFIGURATION_DESC_SIZE, conf_desc['bLength'], lineno(), text='Wrong configuration descriptor size !!!') + raise_if_different(CONFIGURATION_DESC_SIZE, conf_desc['bLength'], lineno(), text='Wrong configuration descriptor size.') except usb.core.USBError: raise_unconditionally(lineno(), "Requesting configuration descriptor failed") # interface descriptor try: ret = get_descriptor(dev, (DESC_TYPE_INTERFACE << 8) | (0 << 0), 0, INTERFACE_DESC_SIZE) - raise_unconditionally(lineno(), "Requesting interface descriptor should fail since it is not directly accessible") + raise_unconditionally(lineno(), "Requesting interface descriptor should fail since it is not directly accessible.") except usb.core.USBError: log("interface descriptor is not directly accessible - OK") # endpoint descriptor try: ret = get_descriptor(dev, (DESC_TYPE_ENDPOINT << 8) | (0 << 0), 0, ENDPOINT_DESC_SIZE) - raise_unconditionally(lineno(), "Requesting endpoint descriptor should fail since it is not directly accessible") + raise_unconditionally(lineno(), "Requesting endpoint descriptor should fail since it is not directly accessible.") except usb.core.USBError: log("endpoint descriptor is not directly accessible - OK") print("") # new line @@ -687,7 +673,7 @@ def set_descriptor_test(dev, log): data = bytearray(DEVICE_DESC_SIZE) # Descriptor data try: dev.ctrl_transfer(request_type, request, value, index, data) - raise_unconditionally(lineno(), "SET_DESCRIPTOR should fail since it is not implemented") + raise_unconditionally(lineno(), "set_descriptor request should fail since it is not implemented") except usb.core.USBError: log("SET_DESCRIPTOR is unsupported - OK") print("") # new line @@ -715,7 +701,7 @@ def synch_frame_test(dev, log): ret = ret[0] | (ret[1] << 8) log("synch frame ret: %d" % (ret)) except usb.core.USBError: - raise_unconditionally(lineno(), "SYNCH_FRAME failed") + raise_unconditionally(lineno(), "SYNCH_FRAME request failed") print("") # new line @@ -847,7 +833,7 @@ def control_data_test(dev, sizes_list, log): ret = dev.ctrl_transfer(request_type, request, value, index, length, 5000) raise_if_different(i, len(ret), lineno(), "send/receive data is the wrong size") for j in range(0, i): - raise_if_different(data[j], ret[j], lineno(), "send/receive data not match") + raise_if_different(data[j], ret[j], lineno(), "send/receive data mismatch") except usb.core.USBError: raise_unconditionally(lineno(), "VENDOR_TEST_CTRL_IN_SIZES failed") count += 1 @@ -1001,7 +987,7 @@ def halt_ep_test(dev, ep_out, ep_in, log): raise RuntimeError('Invalid endpoint status after halt operation') except Exception as err: ctrl_error.set() - log('Endpoint {:#04x} halt failed ({}).'.format(ep_to_halt.bEndpointAddress, err)) + log('Endpoint {:#04x} halt failed ({!r}).'.format(ep_to_halt.bEndpointAddress, err)) # Whether the halt operation was successful or not, # wait a bit so the main thread has a chance to run into a USBError # or report the failure of halt operation. @@ -1015,27 +1001,32 @@ def halt_ep_test(dev, ep_out, ep_in, log): try: while delayed_halt.is_alive(): if ctrl_error.is_set(): - raise_unconditionally(lineno(), 'Halting endpoint {0.bEndpointAddress:#04x} failed' - .format(ep_to_halt)) + break try: loopback_ep_test(ep_out, ep_in, ep_out.wMaxPacketSize) except usb.core.USBError as err: + if ctrl_error.is_set(): + break try: ep_status = usb.control.get_status(dev, ep_to_halt) except usb.core.USBError as err: + if ctrl_error.is_set(): + break raise_unconditionally(lineno(), 'Unable to get endpoint status ({!r}).'.format(err)) if ep_status == 1: # OK, got USBError because of endpoint halt return else: raise_unconditionally(lineno(), 'Unexpected error ({!r}).'.format(err)) - except: - raise + if ctrl_error.is_set(): + raise_unconditionally(lineno(), 'Halting endpoint {0.bEndpointAddress:#04x} failed' + .format(ep_to_halt)) finally: # Always wait for the Timer thread created above. delayed_halt.join() - ep_out.clear_halt() - ep_in.clear_halt() + if not ctrl_error.is_set(): + ep_out.clear_halt() + ep_in.clear_halt() raise_unconditionally(lineno(), 'Halting endpoint {0.bEndpointAddress:#04x}' ' during transmission did not raise USBError.' .format(ep_to_halt))