Tests: USB: Extract common code in basic test

pull/11540/head
Filip Jagodzinski 2019-09-16 13:39:22 +02:00
parent d4ef5f72e7
commit 3c37831521
1 changed files with 98 additions and 217 deletions

View File

@ -160,251 +160,132 @@ def format_local_error_msg(fmt):
class PyusbBasicTest(BaseHostTest):
def test_usb_device(self, usb_dev_serial_number, test_fun, **test_fun_kwargs):
"""Find a USB device and execute a testing function.
Search is based on usb_dev_serial_number. If the device is found, the
test_fun is executed with its dev argument set to the device found and
all other kwargs set as specified by test_fun_kwargs.
The DUT is notified with either success, failure or error status.
"""
usb_device = self.find_device(usb_dev_serial_number)
if usb_device is None:
self.notify_error('USB device (SN={}) not found.'.format(usb_dev_serial_number))
return
try:
test_fun(usb_device, **test_fun_kwargs)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
def _callback_control_basic_test(self, key, value, timestamp):
serial_number, vendor_id, product_id = value.split(' ')
self.log("Received serial %s" % (serial_number))
self.log("Received vendor_id %s" % (vendor_id))
self.log("Received product_id %s" % (product_id))
dev = self.find_device(serial_number)
if dev is None:
self.notify_error('USB device (SN={}) not found.'.format(serial_number))
return
try:
control_basic_test(dev, int(vendor_id), int(product_id), log=print)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
self.test_usb_device(
usb_dev_serial_number=serial_number,
test_fun=control_basic_test,
log=print,
vendor_id=int(vendor_id),
product_id=int(product_id)
)
def _callback_control_stall_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if dev is None:
self.notify_error('USB device (SN={}) not found.'.format(value))
return
try:
control_stall_test(dev, log=print)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=control_stall_test,
log=print
)
def _callback_control_sizes_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if dev is None:
self.notify_error('USB device (SN={}) not found.'.format(value))
return
try:
control_sizes_test(dev, log=print)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=control_sizes_test,
log=print
)
def _callback_control_stress_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if dev is None:
self.notify_error('USB device (SN={}) not found.'.format(value))
return
try:
control_stress_test(dev, log=print)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=control_stress_test,
log=print
)
def _callback_device_reset_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if dev is None:
self.notify_error('USB device (SN={}) not found.'.format(value))
return
try:
self.device_reset_test.send(dev)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
self.test_usb_device(
usb_dev_serial_number=value,
# Advance the coroutine to the next yield statement
# and send the usb_device to use.
test_fun=self.device_reset_test.send
)
def _callback_device_soft_reconnection_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if dev is None:
self.notify_error('USB device (SN={}) not found.'.format(value))
return
try:
self.device_soft_reconnection_test.send(dev)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
self.test_usb_device(
usb_dev_serial_number=value,
# Advance the coroutine to the next yield statement
# and send the usb_device to use.
test_fun=self.device_soft_reconnection_test.send
)
def _callback_device_suspend_resume_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if dev is None:
self.notify_error('USB device (SN={}) not found.'.format(value))
return
try:
self.device_suspend_resume_test.send(dev)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
self.test_usb_device(
usb_dev_serial_number=value,
# Advance the coroutine to the next yield statement
# and send the usb_device to use.
test_fun=self.device_suspend_resume_test.send
)
def _callback_repeated_construction_destruction_test(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if dev is None:
self.notify_error('USB device (SN={}) not found.'.format(value))
return
try:
self.repeated_construction_destruction_test.send(dev)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
self.test_usb_device(
usb_dev_serial_number=value,
# Advance the coroutine to the next yield statement
# and send the usb_device to use.
test_fun=self.repeated_construction_destruction_test.send
)
def _callback_ep_test_data_correctness(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if dev is None:
self.notify_error('USB device (SN={}) not found.'.format(value))
return
try:
ep_test_data_correctness(dev, log=print)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=ep_test_data_correctness,
log=print
)
def _callback_ep_test_halt(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if dev is None:
self.notify_error('USB device (SN={}) not found.'.format(value))
return
try:
ep_test_halt(dev, log=print)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=ep_test_halt,
log=print
)
def _callback_ep_test_parallel_transfers(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if dev is None:
self.notify_error('USB device (SN={}) not found.'.format(value))
return
try:
ep_test_parallel_transfers(dev, log=print)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=ep_test_parallel_transfers,
log=print
)
def _callback_ep_test_parallel_transfers_ctrl(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if dev is None:
self.notify_error('USB device (SN={}) not found.'.format(value))
return
try:
ep_test_parallel_transfers_ctrl(dev, log=print)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=ep_test_parallel_transfers_ctrl,
log=print
)
def _callback_ep_test_abort(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if dev is None:
self.notify_error('USB device (SN={}) not found.'.format(value))
return
try:
ep_test_abort(dev, log=print)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=ep_test_abort,
log=print
)
def _callback_ep_test_data_toggle(self, key, value, timestamp):
self.log("Received serial %s" % (value))
dev = self.find_device(value)
if dev is None:
self.notify_error('USB device (SN={}) not found.'.format(value))
return
try:
ep_test_data_toggle(dev, log=print)
self.notify_success()
except RuntimeError as exc:
self.notify_failure(exc)
except usb.core.USBError as exc:
error_msg = format_local_error_msg('[{filename}]:{line_number}, Dev-host transfer error ({exc_value}).')
self.notify_failure(error_msg if error_msg is not None else exc)
self.test_usb_device(
usb_dev_serial_number=value,
test_fun=ep_test_data_toggle,
log=print
)
def _callback_reset_support(self, key, value, timestamp):
status = "false" if sys.platform == "darwin" else "true"