pyusb_basic test formatting fixes

pull/9768/head
Maciej Bocianski 2018-04-26 15:18:57 +02:00 committed by Russ Butler
parent 9b4970342b
commit 972fee6143
1 changed files with 154 additions and 146 deletions

View File

@ -55,17 +55,17 @@ VENDOR_TEST_CTRL_IN_SIZES = 9
VENDOR_TEST_CTRL_OUT_SIZES = 10
VENDOR_TEST_UNSUPPORTED_REQUEST = 32
REQUEST_GET_STATUS = 0 # done
REQUEST_CLEAR_FEATURE = 1 # done
REQUEST_SET_FEATURE = 3 # done
REQUEST_SET_ADDRESS = 5 # ???
REQUEST_GET_STATUS = 0
REQUEST_CLEAR_FEATURE = 1
REQUEST_SET_FEATURE = 3
REQUEST_SET_ADDRESS = 5
REQUEST_GET_DESCRIPTOR = 6
REQUEST_SET_DESCRIPTOR = 7 # done
REQUEST_GET_CONFIGURATION = 8 # done
REQUEST_SET_CONFIGURATION = 9 # done
REQUEST_GET_INTERFACE = 10 # done
REQUEST_SET_INTERFACE = 11 # done
REQUEST_SYNCH_FRAME = 12 # almost done
REQUEST_SET_DESCRIPTOR = 7
REQUEST_GET_CONFIGURATION = 8
REQUEST_SET_CONFIGURATION = 9
REQUEST_GET_INTERFACE = 10
REQUEST_SET_INTERFACE = 11
REQUEST_SYNCH_FRAME = 12
FEATURE_ENDPOINT_HALT = 0
FEATURE_DEVICE_REMOTE_WAKEUP = 1
@ -269,9 +269,11 @@ class PyusbBasicTest(BaseHostTest):
self.register_callback('reset_support', self._callback_reset_support)
def result(self):
return self.__result
def teardown(self):
pass
@ -292,150 +294,18 @@ def lineno():
"""Returns the current line number in our program."""
return inspect.currentframe().f_back.f_lineno
def raise_if_different(expected, actual, line, text=''):
"""Raise a RuntimeError if actual is different than expected."""
if expected != actual:
raise RuntimeError('[{}]:{}, {} Got {!r}, expected {!r}'.format(__file__, line, text, actual, expected))
def raise_unconditionally(line, text=''):
"""Raise a RuntimeError unconditionally."""
raise RuntimeError('[{}]:{}, {}'.format(__file__, line, text))
def test_device(serial_number, log=print):
dev = usb.core.find(custom_match=TestMatch(serial_number))
if dev is None:
log("Device not found")
return
## --Control Tests-- ##
#control_basic_test(dev, log)
# Test control IN/OUT/NODATA
control_stall_test(dev, log)
# Invalid control in/out/nodata requests are stalled
# Stall during different points in the control transfer
#control_sizes_test(dev, log)
# Test control requests of various data stage sizes (1,8,16,32,64,255,256,...)
control_stress_test(dev, log)
# normal and delay mode
## --Endpoint test-- ##
#for each endpoint
#-test all allowed wMaxPacketSize sizes and transfer types
#-stall tests
#-set/clear stall control request
#-stall at random points of sending/receiveing data
#-test aborting an in progress transfer
#test as many endpoints at once as possible
#test biggest configuration possible
## --physical test-- ##
#-reset notification/handling
#-connect/disconnect tests - have device disconnect and then reconnect
#-disconnect during various phases of control transfers and endpoint transfers
#-suspend/resume tests (may not be possible to test with current framework)
#-suspend/resume notifications
## -- Stress tests-- ##
#-concurrent tests (all endpoints at once including control)
#-concurrent tests + reset + delay
## -- other tests-- ##
#-report throughput for in/out of control, bulk, interrupt and iso transfers
#-verify that construction/destruction repeatedly works gracefully
intf = get_interface(dev, 0, 0)
# Find endpoints
bulk_in = None
bulk_out = None
int_in = None
int_out = None
for endpoint in intf:
log("Processing endpoint %s" % endpoint)
ep_type = endpoint.bmAttributes & 0x3
if ep_type == 2:
if endpoint.bEndpointAddress & 0x80:
assert bulk_in is None
bulk_in = endpoint
else:
assert bulk_out is None
bulk_out = endpoint
elif ep_type == 3:
if endpoint.bEndpointAddress & 0x80:
assert int_in is None
int_in = endpoint
else:
assert int_out is None
int_out = endpoint
assert bulk_in is not None
assert bulk_out is not None
assert int_in is not None
assert int_out is not None
bulk_out.write("hello" + "x" *256);
int_out.write("world" + "x" *256);
dev.set_interface_altsetting(0, 1)
intf = get_interface(dev, 0, 0)
# Find endpoints
bulk_in = None
bulk_out = None
int_in = None
int_out = None
for endpoint in intf:
log("Processing endpoint %s" % endpoint)
ep_type = endpoint.bmAttributes & 0x3
if ep_type == 2:
if endpoint.bEndpointAddress & 0x80:
assert bulk_in is None
bulk_in = endpoint
else:
assert bulk_out is None
bulk_out = endpoint
elif ep_type == 3:
if endpoint.bEndpointAddress & 0x80:
assert int_in is None
int_in = endpoint
else:
assert int_out is None
int_out = endpoint
assert bulk_in is not None
assert bulk_out is not None
assert int_in is not None
assert int_out is not None
bulk_out.write("hello2" + "x" *256);
int_out.write("world2" + "x" *256);
t = Thread(target=write_data, args=(bulk_out,))
t.start()
for _ in range(10):
request_type = build_request_type(CTRL_OUT, CTRL_TYPE_VENDOR,
CTRL_RECIPIENT_DEVICE)
request = VENDOR_TEST_CTRL_NONE_DELAY
value = 0 # Always 0 for this request
index = 0 # Communication interface
length = 0 # No data
dev.ctrl_transfer(request_type, request, value, index, length, 5000)
t.join()
def write_data(pipe):
print("Write data running")
count = 0
for _ in range(40):
pipe.write("Value is %s" % count)
count += 1
print("Count %s" % count)
time.sleep(0.5)
def control_basic_test(dev, vendor_id, product_id, log):
get_status_test(dev, log)
set_clear_feature_test(dev, log)
@ -915,7 +785,6 @@ def control_data_test(dev, sizes_list, log):
def control_stress_test(dev, log):
# Test various patterns of control transfers
#
# Some devices have had problems with back-to-back
@ -1025,6 +894,145 @@ def repeated_construction_destruction_test(log):
yield
"""
For documentation purpose until test writing finished
TODO: remove this if not needed anymore
def test_device(serial_number, log=print):
dev = usb.core.find(custom_match=TestMatch(serial_number))
if dev is None:
log("Device not found")
return
## --Control Tests-- ##
#control_basic_test(dev, log)
# Test control IN/OUT/NODATA
control_stall_test(dev, log)
# Invalid control in/out/nodata requests are stalled
# Stall during different points in the control transfer
#control_sizes_test(dev, log)
# Test control requests of various data stage sizes (1,8,16,32,64,255,256,...)
control_stress_test(dev, log)
# normal and delay mode
## --Endpoint test-- ##
#for each endpoint
#-test all allowed wMaxPacketSize sizes and transfer types
#-stall tests
#-set/clear stall control request
#-stall at random points of sending/receiveing data
#-test aborting an in progress transfer
#test as many endpoints at once as possible
#test biggest configuration possible
## --physical test-- ##
#-reset notification/handling
#-connect/disconnect tests - have device disconnect and then reconnect
#-disconnect during various phases of control transfers and endpoint transfers
#-suspend/resume tests (may not be possible to test with current framework)
#-suspend/resume notifications
## -- Stress tests-- ##
#-concurrent tests (all endpoints at once including control)
#-concurrent tests + reset + delay
## -- other tests-- ##
#-report throughput for in/out of control, bulk, interrupt and iso transfers
#-verify that construction/destruction repeatedly works gracefully
intf = get_interface(dev, 0, 0)
# Find endpoints
bulk_in = None
bulk_out = None
int_in = None
int_out = None
for endpoint in intf:
log("Processing endpoint %s" % endpoint)
ep_type = endpoint.bmAttributes & 0x3
if ep_type == 2:
if endpoint.bEndpointAddress & 0x80:
assert bulk_in is None
bulk_in = endpoint
else:
assert bulk_out is None
bulk_out = endpoint
elif ep_type == 3:
if endpoint.bEndpointAddress & 0x80:
assert int_in is None
int_in = endpoint
else:
assert int_out is None
int_out = endpoint
assert bulk_in is not None
assert bulk_out is not None
assert int_in is not None
assert int_out is not None
bulk_out.write("hello" + "x" *256);
int_out.write("world" + "x" *256);
dev.set_interface_altsetting(0, 1)
intf = get_interface(dev, 0, 0)
# Find endpoints
bulk_in = None
bulk_out = None
int_in = None
int_out = None
for endpoint in intf:
log("Processing endpoint %s" % endpoint)
ep_type = endpoint.bmAttributes & 0x3
if ep_type == 2:
if endpoint.bEndpointAddress & 0x80:
assert bulk_in is None
bulk_in = endpoint
else:
assert bulk_out is None
bulk_out = endpoint
elif ep_type == 3:
if endpoint.bEndpointAddress & 0x80:
assert int_in is None
int_in = endpoint
else:
assert int_out is None
int_out = endpoint
assert bulk_in is not None
assert bulk_out is not None
assert int_in is not None
assert int_out is not None
bulk_out.write("hello2" + "x" *256);
int_out.write("world2" + "x" *256);
t = Thread(target=write_data, args=(bulk_out,))
t.start()
for _ in range(10):
request_type = build_request_type(CTRL_OUT, CTRL_TYPE_VENDOR,
CTRL_RECIPIENT_DEVICE)
request = VENDOR_TEST_CTRL_NONE_DELAY
value = 0 # Always 0 for this request
index = 0 # Communication interface
length = 0 # No data
dev.ctrl_transfer(request_type, request, value, index, length, 5000)
t.join()
def write_data(pipe):
print("Write data running")
count = 0
for _ in range(40):
pipe.write("Value is %s" % count)
count += 1
print("Count %s" % count)
time.sleep(0.5)
def main():
parser = ArgumentParser(description="USB basic test")
parser.add_argument('serial', help='USB serial number of DUT')
@ -1032,6 +1040,6 @@ def main():
ret = test_device(args.serial)
print("Test %s" % "passed" if ret else "failed")
if __name__ == "__main__":
main()
"""