Merge pull request #11540 from fkjagodzinski/test_update-usb_device-basic

Tests: USB: Update error handling in basic tests
pull/11614/head
Martin Kojtal 2019-09-30 15:28:05 +02:00 committed by GitHub
commit cd8753a1bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 183 additions and 363 deletions

View File

@ -1,6 +1,6 @@
"""
mbed SDK
Copyright (c) 2018-2018 ARM Limited
Copyright (c) 2018-2019 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
@ -26,6 +26,7 @@ from threading import Thread, Event, Timer
import array
import random
import os
import traceback
import usb.core
from usb.util import build_request_type
@ -84,13 +85,9 @@ REQUEST_SYNCH_FRAME = 12
FEATURE_ENDPOINT_HALT = 0
FEATURE_DEVICE_REMOTE_WAKEUP = 1
DEVICE_QUALIFIER_DESC_SIZE = 10
DESC_TYPE_DEVICE_QUALIFIER = 0x06
DEVICE_DESC_SIZE = 18
device_descriptor_parser = struct.Struct('BBHBBBBHHHBBBB')
device_descriptor_keys = ['bLength', 'bDescriptorType', 'bcdUSB', 'bDeviceClass',
@ -122,197 +119,169 @@ ENDPOINT_TYPE_NAMES = {
usb.ENDPOINT_TYPE_INTERRUPT: 'INTERRUPT',
usb.ENDPOINT_TYPE_ISOCHRONOUS: 'ISOCHRONOUS'}
# Greentea message keys used to notify DUT of test status
MSG_KEY_TEST_CASE_FAILED = 'fail'
MSG_KEY_TEST_CASE_PASSED = 'pass'
MSG_VALUE_DUMMY = '0'
def format_local_error_msg(fmt):
"""Return an error message formatted with the last traceback entry from this file.
The message is formatted according to fmt with data from the last traceback
enrty internal to this file. There are 4 arguments supplied to the format
function: filename, line_number, exc_type and exc_value.
Returns None if formatting fails.
"""
try:
exc_type, exc_value, exc_traceback = sys.exc_info()
# A list of 4-tuples (filename, line_number, function_name, text).
tb_entries = traceback.extract_tb(exc_traceback)
# Reuse the filename from the first tuple instead of relying on __file__:
# 1. No need for path handling.
# 2. No need for file extension handling (i.e. .py vs .pyc).
name_of_this_file = tb_entries[0][0]
last_internal_tb_entry = [tb for tb in tb_entries if tb[0] == name_of_this_file][-1]
msg = fmt.format(
filename=last_internal_tb_entry[0],
line_number=last_internal_tb_entry[1],
exc_type=str(exc_type).strip(),
exc_value=str(exc_value).strip(),
)
except (IndexError, KeyError):
msg = None
return msg
class PyusbBasicTest(BaseHostTest):
def test_usb_device(self, usb_dev_serial_number, test_fun, **test_fun_kwargs):
"""Find a USB device and execute a testing function.
Search is based on usb_dev_serial_number. If the device is found, the
test_fun is executed with its dev argument set to the device found and
all other kwargs set as specified by test_fun_kwargs.
The DUT is notified with either success, failure or error status.
"""
usb_device = self.find_device(usb_dev_serial_number)
if usb_device is None:
self.notify_error('USB device (SN={}) not found.'.format(usb_dev_serial_number))
return
try:
test_fun(usb_device, **test_fun_kwargs)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
def _callback_control_basic_test(self, key, value, timestamp):
serial_number, vendor_id, product_id = value.split(' ')
self.log("Received serial %s" % (serial_number))
self.log("Received vendor_id %s" % (vendor_id))
self.log("Received product_id %s" % (product_id))
dev = self.find_device(serial_number)
if(dev == None):
return
try:
control_basic_test(dev, int(vendor_id), int(product_id), log=print)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
self.test_usb_device(
usb_dev_serial_number=serial_number,
test_fun=control_basic_test,
log=print,
vendor_id=int(vendor_id),
product_id=int(product_id)
)
def _callback_control_stall_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
control_stall_test(dev, log=print)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=control_stall_test,
log=print
)
def _callback_control_sizes_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
control_sizes_test(dev, log=print)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=control_sizes_test,
log=print
)
def _callback_control_stress_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
control_stress_test(dev, log=print)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=control_stress_test,
log=print
)
def _callback_device_reset_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
self.device_reset_test.send(dev)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
self.test_usb_device(
usb_dev_serial_number=value,
# Advance the coroutine to the next yield statement
# and send the usb_device to use.
test_fun=self.device_reset_test.send
)
def _callback_device_soft_reconnection_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
self.device_soft_reconnection_test.send(dev)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
self.test_usb_device(
usb_dev_serial_number=value,
# Advance the coroutine to the next yield statement
# and send the usb_device to use.
test_fun=self.device_soft_reconnection_test.send
)
def _callback_device_suspend_resume_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
self.device_suspend_resume_test.send(dev)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
self.test_usb_device(
usb_dev_serial_number=value,
# Advance the coroutine to the next yield statement
# and send the usb_device to use.
test_fun=self.device_suspend_resume_test.send
)
def _callback_repeated_construction_destruction_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
self.repeated_construction_destruction_test.send(dev)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
self.test_usb_device(
usb_dev_serial_number=value,
# Advance the coroutine to the next yield statement
# and send the usb_device to use.
test_fun=self.repeated_construction_destruction_test.send
)
def _callback_ep_test_data_correctness(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
ep_test_data_correctness(dev, log=print)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=ep_test_data_correctness,
log=print
)
def _callback_ep_test_halt(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
ep_test_halt(dev, log=print)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=ep_test_halt,
log=print
)
def _callback_ep_test_parallel_transfers(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
ep_test_parallel_transfers(dev, log=print)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=ep_test_parallel_transfers,
log=print
)
def _callback_ep_test_parallel_transfers_ctrl(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
ep_test_parallel_transfers_ctrl(dev, log=print)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=ep_test_parallel_transfers_ctrl,
log=print
)
def _callback_ep_test_abort(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
ep_test_abort(dev, log=print)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=ep_test_abort,
log=print
)
def _callback_ep_test_data_toggle(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if(dev == None):
return
try:
ep_test_data_toggle(dev, log=print)
self.report_success()
except (RuntimeError) as exc:
self.report_error(exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=ep_test_data_toggle,
log=print
)
def _callback_reset_support(self, key, value, timestamp):
status = "false" if sys.platform == "darwin" else "true"
@ -326,18 +295,25 @@ class PyusbBasicTest(BaseHostTest):
if dev is not None:
break
time.sleep(0.1)
if dev is None:
self.log("Device not found")
self.send_kv("failed", "0")
return dev
def notify_success(self, value=None, msg=''):
"""Report a host side test success to the DUT."""
if msg:
self.log('TEST PASSED: {}'.format(msg))
if value is None:
value = MSG_VALUE_DUMMY
self.send_kv(MSG_KEY_TEST_CASE_PASSED, value)
def report_success(self):
self.send_kv("pass", "0")
def report_error(self, msg):
def notify_failure(self, msg):
"""Report a host side test failure to the DUT."""
self.log('TEST FAILED: {}'.format(msg))
self.send_kv("failed", "0")
self.send_kv(MSG_KEY_TEST_CASE_FAILED, MSG_VALUE_DUMMY)
def notify_error(self, msg):
"""Terminate the test with an error msg."""
self.log('TEST ERROR: {}'.format(msg))
self.notify_complete(None)
def setup(self):
self.__result = False
@ -372,11 +348,9 @@ class PyusbBasicTest(BaseHostTest):
self.register_callback('reset_support', self._callback_reset_support)
def result(self):
return self.__result
def teardown(self):
pass
@ -435,10 +409,9 @@ def get_set_configuration_test(dev, log):
# check if dafault(1) configuration set
try:
ret = usb.control.get_configuration(dev)
raise_if_different(1, ret, lineno(), "FAILED - expected first configuration set")
raise_if_different(1, ret, lineno(), 'Invalid configuration.')
except usb.core.USBError as error:
print(error)
raise_unconditionally(lineno(), "FAILED - get_configuration !!!")
raise_unconditionally(lineno(), 'get_configuration request failed ({}).'.format(str(error).strip()))
cfg = dev.get_active_configuration()
for intf in cfg:
@ -448,17 +421,15 @@ def get_set_configuration_test(dev, log):
try:
ret = dev.set_configuration(0)
except usb.core.USBError as error:
print(error)
raise_unconditionally(lineno(), "FAILED - set_configuration(0) !!!")
raise_unconditionally(lineno(), 'set_configuration request (deconfigure) failed ({}).'.format(str(error).strip()))
# check if deconfigured
try:
ret = usb.control.get_configuration(dev)
raise_if_different(0, ret, lineno(), "FAILED - expected to be deconfigured")
raise_if_different(0, ret, lineno(), 'Invalid configuration.')
print("device deconfigured - OK")
except usb.core.USBError as error:
print(error)
raise_unconditionally(lineno(), "FAILED - get_active_configuration !!!")
raise_unconditionally(lineno(), 'get_configuration request failed ({}).'.format(str(error).strip()))
# for every configuration
for cfg in dev:
@ -466,17 +437,15 @@ def get_set_configuration_test(dev, log):
# set configuration
ret = cfg.set()
except usb.core.USBError as error:
print(error)
raise_unconditionally(lineno(), "FAILED - set configuration")
raise_unconditionally(lineno(), 'set_configuration request failed ({}).'.format(str(error).strip()))
# check if configured
try:
ret = usb.control.get_configuration(dev)
raise_if_different(cfg.bConfigurationValue, ret, lineno(), "FAILED - expected {} configuration set".format(cfg.bConfigurationValue))
raise_if_different(cfg.bConfigurationValue, ret, lineno(), 'Invalid configuration.')
print("configuration {} set - OK ".format(cfg.bConfigurationValue))
except usb.core.USBError as error:
print(error)
raise_unconditionally(lineno(), "FAILED - get_configuration !!!")
raise_unconditionally(lineno(), 'get_configuration request failed ({}).'.format(str(error).strip()))
# test control data transfer after configuration set
control_data_test(dev, [64, 256], log)
print("") # new line
@ -497,7 +466,7 @@ def get_set_interface_test(dev, log):
cfg.set()
# for every interface
for intf in cfg:
intf.set_altsetting();
intf.set_altsetting()
altsett = usb.control.get_interface(dev, intf.bInterfaceNumber)
raise_if_different(intf.bAlternateSetting, altsett, lineno(), text='Wrong alternate setting for interface {}'.format(intf.bInterfaceNumber))
print("cfg({}) inteface {}.{} set - OK".format(cfg.bConfigurationValue, intf.bInterfaceNumber, intf.bAlternateSetting))
@ -603,15 +572,13 @@ def set_clear_feature_test(dev, log):
try:
usb.control.set_feature(dev, FEATURE_ENDPOINT_HALT, ep)
except usb.core.USBError as err:
print(err)
raise_unconditionally(lineno(), "endpoint {} halt failed".format(ep.bEndpointAddress))
raise_unconditionally(lineno(), 'set_feature request (halt) failed for endpoint {} ({}).'.format(ep.bEndpointAddress, str(err).strip()))
# check if endpoint was halted
try:
ret = usb.control.get_status(dev, ep)
except usb.core.USBError as err:
print(err)
raise_unconditionally(lineno(), "endpoint status failed".format(ep.bEndpointAddress))
raise_unconditionally(lineno(), 'get_status request failed for endpoint {} ({}).'.format(ep.bEndpointAddress, str(err).strip()))
if(ret != 1):
raise_unconditionally(lineno(), "endpoint {} was not halted".format(ep.bEndpointAddress))
print("cfg({}) intf({}.{}) ep {} halted - OK".format(cfg.bConfigurationValue, intf.bInterfaceNumber, intf.bAlternateSetting, ep.bEndpointAddress))
@ -620,8 +587,7 @@ def set_clear_feature_test(dev, log):
try:
usb.control.clear_feature(dev, FEATURE_ENDPOINT_HALT, ep)
except usb.core.USBError as err:
print(err)
raise_unconditionally(lineno(), "endpoint {} unhalt failed".format(ep.bEndpointAddress))
raise_unconditionally(lineno(), "clear_feature request (unhalt) failed for endpoint {} ({})".format(ep.bEndpointAddress, str(err).strip()))
# check if endpoint was unhalted
ret = usb.control.get_status(dev, ep)
@ -655,9 +621,9 @@ def get_descriptor_test(dev, vendor_id, product_id, log):
try:
ret = get_descriptor(dev, (DESC_TYPE_DEVICE << 8) | (0 << 0), 0, DEVICE_DESC_SIZE)
dev_desc = dict(zip(device_descriptor_keys, device_descriptor_parser.unpack(ret)))
raise_if_different(DEVICE_DESC_SIZE, dev_desc['bLength'], lineno(), text='Wrong device descriptor size !!!')
raise_if_different(vendor_id, dev_desc['idVendor'], lineno(), text='Wrong vendor id !!!')
raise_if_different(product_id, dev_desc['idProduct'], lineno(), text='Wrong product id !!!')
raise_if_different(DEVICE_DESC_SIZE, dev_desc['bLength'], lineno(), text='Wrong device descriptor size.')
raise_if_different(vendor_id, dev_desc['idVendor'], lineno(), text='Wrong vendor ID.')
raise_if_different(product_id, dev_desc['idProduct'], lineno(), text='Wrong product ID.')
except usb.core.USBError:
raise_unconditionally(lineno(), "Requesting device descriptor failed")
@ -665,21 +631,21 @@ def get_descriptor_test(dev, vendor_id, product_id, log):
try:
ret = get_descriptor(dev, (DESC_TYPE_CONFIG << 8) | (0 << 0), 0, CONFIGURATION_DESC_SIZE)
conf_desc = dict(zip(configuration_descriptor_keys, configuration_descriptor_parser.unpack(ret)))
raise_if_different(CONFIGURATION_DESC_SIZE, conf_desc['bLength'], lineno(), text='Wrong configuration descriptor size !!!')
raise_if_different(CONFIGURATION_DESC_SIZE, conf_desc['bLength'], lineno(), text='Wrong configuration descriptor size.')
except usb.core.USBError:
raise_unconditionally(lineno(), "Requesting configuration descriptor failed")
# interface descriptor
try:
ret = get_descriptor(dev, (DESC_TYPE_INTERFACE << 8) | (0 << 0), 0, INTERFACE_DESC_SIZE)
raise_unconditionally(lineno(), "Requesting interface descriptor should fail since it is not directly accessible")
raise_unconditionally(lineno(), "Requesting interface descriptor should fail since it is not directly accessible.")
except usb.core.USBError:
log("interface descriptor is not directly accessible - OK")
# endpoint descriptor
try:
ret = get_descriptor(dev, (DESC_TYPE_ENDPOINT << 8) | (0 << 0), 0, ENDPOINT_DESC_SIZE)
raise_unconditionally(lineno(), "Requesting endpoint descriptor should fail since it is not directly accessible")
raise_unconditionally(lineno(), "Requesting endpoint descriptor should fail since it is not directly accessible.")
except usb.core.USBError:
log("endpoint descriptor is not directly accessible - OK")
print("") # new line
@ -707,7 +673,7 @@ def set_descriptor_test(dev, log):
data = bytearray(DEVICE_DESC_SIZE) # Descriptor data
try:
dev.ctrl_transfer(request_type, request, value, index, data)
raise_unconditionally(lineno(), "SET_DESCRIPTOR should fail since it is not implemented")
raise_unconditionally(lineno(), "set_descriptor request should fail since it is not implemented")
except usb.core.USBError:
log("SET_DESCRIPTOR is unsupported - OK")
print("") # new line
@ -735,7 +701,7 @@ def synch_frame_test(dev, log):
ret = ret[0] | (ret[1] << 8)
log("synch frame ret: %d" % (ret))
except usb.core.USBError:
raise_unconditionally(lineno(), "SYNCH_FRAME failed")
raise_unconditionally(lineno(), "SYNCH_FRAME request failed")
print("") # new line
@ -867,7 +833,7 @@ def control_data_test(dev, sizes_list, log):
ret = dev.ctrl_transfer(request_type, request, value, index, length, 5000)
raise_if_different(i, len(ret), lineno(), "send/receive data is the wrong size")
for j in range(0, i):
raise_if_different(data[j], ret[j], lineno(), "send/receive data not match")
raise_if_different(data[j], ret[j], lineno(), "send/receive data mismatch")
except usb.core.USBError:
raise_unconditionally(lineno(), "VENDOR_TEST_CTRL_IN_SIZES failed")
count += 1
@ -1021,7 +987,7 @@ def halt_ep_test(dev, ep_out, ep_in, log):
raise RuntimeError('Invalid endpoint status after halt operation')
except Exception as err:
ctrl_error.set()
log('Endpoint {:#04x} halt failed ({}).'.format(ep_to_halt.bEndpointAddress, err))
log('Endpoint {:#04x} halt failed ({!r}).'.format(ep_to_halt.bEndpointAddress, err))
# Whether the halt operation was successful or not,
# wait a bit so the main thread has a chance to run into a USBError
# or report the failure of halt operation.
@ -1035,27 +1001,32 @@ def halt_ep_test(dev, ep_out, ep_in, log):
try:
while delayed_halt.is_alive():
if ctrl_error.is_set():
raise_unconditionally(lineno(), 'Halting endpoint {0.bEndpointAddress:#04x} failed'
.format(ep_to_halt))
break
try:
loopback_ep_test(ep_out, ep_in, ep_out.wMaxPacketSize)
except usb.core.USBError as err:
if ctrl_error.is_set():
break
try:
ep_status = usb.control.get_status(dev, ep_to_halt)
except usb.core.USBError as err:
if ctrl_error.is_set():
break
raise_unconditionally(lineno(), 'Unable to get endpoint status ({!r}).'.format(err))
if ep_status == 1:
# OK, got USBError because of endpoint halt
return
else:
raise_unconditionally(lineno(), 'Unexpected error ({!r}).'.format(err))
except:
raise
if ctrl_error.is_set():
raise_unconditionally(lineno(), 'Halting endpoint {0.bEndpointAddress:#04x} failed'
.format(ep_to_halt))
finally:
# Always wait for the Timer thread created above.
delayed_halt.join()
ep_out.clear_halt()
ep_in.clear_halt()
if not ctrl_error.is_set():
ep_out.clear_halt()
ep_in.clear_halt()
raise_unconditionally(lineno(), 'Halting endpoint {0.bEndpointAddress:#04x}'
' during transmission did not raise USBError.'
.format(ep_to_halt))
@ -1668,154 +1639,3 @@ def get_descriptor(dev, type_index, lang_id, length):
ret = dev.ctrl_transfer(request_type, request, value, index, length)
return ret
"""
For documentation purpose until test writing finished
TODO: remove this if not needed anymore
def test_device(serial_number, log=print):
dev = usb.core.find(custom_match=TestMatch(serial_number))
if dev is None:
log("Device not found")
return
## --Control Tests-- ##
#control_basic_test(dev, log)
# Test control IN/OUT/NODATA
control_stall_test(dev, log)
# Invalid control in/out/nodata requests are stalled
# Stall during different points in the control transfer
#control_sizes_test(dev, log)
# Test control requests of various data stage sizes (1,8,16,32,64,255,256,...)
control_stress_test(dev, log)
# normal and delay mode
## --Endpoint test-- ##
#for each endpoint
#-test all allowed wMaxPacketSize sizes and transfer types
#-stall tests
#-set/clear stall control request
#-stall at random points of sending/receiveing data
#-test aborting an in progress transfer
#test as many endpoints at once as possible
#test biggest configuration possible
## --physical test-- ##
#-reset notification/handling
#-connect/disconnect tests - have device disconnect and then reconnect
#-disconnect during various phases of control transfers and endpoint transfers
#-suspend/resume tests (may not be possible to test with current framework)
#-suspend/resume notifications
## -- Stress tests-- ##
#-concurrent tests (all endpoints at once including control)
#-concurrent tests + reset + delay
## -- other tests-- ##
#-report throughput for in/out of control, bulk, interrupt and iso transfers
#-verify that construction/destruction repeatedly works gracefully
intf = get_interface(dev, 0, 0)
# Find endpoints
bulk_in = None
bulk_out = None
int_in = None
int_out = None
for endpoint in intf:
log("Processing endpoint %s" % endpoint)
ep_type = endpoint.bmAttributes & 0x3
if ep_type == 2:
if endpoint.bEndpointAddress & 0x80:
assert bulk_in is None
bulk_in = endpoint
else:
assert bulk_out is None
bulk_out = endpoint
elif ep_type == 3:
if endpoint.bEndpointAddress & 0x80:
assert int_in is None
int_in = endpoint
else:
assert int_out is None
int_out = endpoint
assert bulk_in is not None
assert bulk_out is not None
assert int_in is not None
assert int_out is not None
bulk_out.write("hello" + "x" *256);
int_out.write("world" + "x" *256);
dev.set_interface_altsetting(0, 1)
intf = get_interface(dev, 0, 0)
# Find endpoints
bulk_in = None
bulk_out = None
int_in = None
int_out = None
for endpoint in intf:
log("Processing endpoint %s" % endpoint)
ep_type = endpoint.bmAttributes & 0x3
if ep_type == 2:
if endpoint.bEndpointAddress & 0x80:
assert bulk_in is None
bulk_in = endpoint
else:
assert bulk_out is None
bulk_out = endpoint
elif ep_type == 3:
if endpoint.bEndpointAddress & 0x80:
assert int_in is None
int_in = endpoint
else:
assert int_out is None
int_out = endpoint
assert bulk_in is not None
assert bulk_out is not None
assert int_in is not None
assert int_out is not None
bulk_out.write("hello2" + "x" *256);
int_out.write("world2" + "x" *256);
t = Thread(target=write_data, args=(bulk_out,))
t.start()
for _ in range(10):
request_type = build_request_type(CTRL_OUT, CTRL_TYPE_VENDOR,
CTRL_RECIPIENT_DEVICE)
request = VENDOR_TEST_CTRL_NONE_DELAY
value = 0 # Always 0 for this request
index = 0 # Communication interface
length = 0 # No data
dev.ctrl_transfer(request_type, request, value, index, length, 5000)
t.join()
def write_data(pipe):
print("Write data running")
count = 0
for _ in range(40):
pipe.write("Value is %s" % count)
count += 1
print("Count %s" % count)
time.sleep(0.5)
def main():
parser = ArgumentParser(description="USB basic test")
parser.add_argument('serial', help='USB serial number of DUT')
args = parser.parse_args()
ret = test_device(args.serial)
print("Test %s" % "passed" if ret else "failed")
if __name__ == "__main__":
main()
"""