Streamline modbus test_init (#50990)

* Streamline test_init.

* Review comments.

* Remove hub name.
pull/51031/head
jan iversen 2021-05-24 14:53:54 +02:00 committed by GitHub
parent 1546dbbf25
commit c74e65ac2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 120 additions and 236 deletions

View File

@ -158,8 +158,7 @@ class ModbusFan(BasePlatform, FanEntity, RestoreEntity):
self._is_on = False
elif value is not None:
_LOGGER.error(
"Unexpected response from hub %s, slave %s register %s, got 0x%2x",
self._hub.name,
"Unexpected response from modbus device slave %s register %s, got 0x%2x",
self._slave,
self._verify_address,
value,

View File

@ -149,8 +149,7 @@ class ModbusLight(BasePlatform, LightEntity, RestoreEntity):
self._is_on = False
elif value is not None:
_LOGGER.error(
"Unexpected response from hub %s, slave %s register %s, got 0x%2x",
self._hub.name,
"Unexpected response from modbus device slave %s register %s, got 0x%2x",
self._slave,
self._verify_address,
value,

View File

@ -195,11 +195,6 @@ class ModbusHub:
},
}
@property
def name(self):
"""Return the name of this hub."""
return self._config_name
def _log_error(self, exception_error: ModbusException, error_state=True):
log_text = "Pymodbus: " + str(exception_error)
if self._in_error:

View File

@ -147,8 +147,7 @@ class ModbusSwitch(BasePlatform, SwitchEntity, RestoreEntity):
self._is_on = False
elif value is not None:
_LOGGER.error(
"Unexpected response from hub %s, slave %s register %s, got 0x%2x",
self._hub.name,
"Unexpected response from modbus device slave %s register %s, got 0x%2x",
self._slave,
self._verify_address,
value,

View File

@ -1,4 +1,14 @@
"""The tests for the Modbus init."""
"""The tests for the Modbus init.
This file is responsible for testing:
- pymodbus API
- Functionality of class ModbusHub
- Coverage 100%:
__init__.py
base_platform.py
const.py
modbus.py
"""
from datetime import timedelta
import logging
from unittest import mock
@ -20,6 +30,10 @@ from homeassistant.components.modbus.const import (
CALL_TYPE_DISCRETE,
CALL_TYPE_REGISTER_HOLDING,
CALL_TYPE_REGISTER_INPUT,
CALL_TYPE_WRITE_COIL,
CALL_TYPE_WRITE_COILS,
CALL_TYPE_WRITE_REGISTER,
CALL_TYPE_WRITE_REGISTERS,
CONF_BAUDRATE,
CONF_BYTESIZE,
CONF_INPUT_TYPE,
@ -54,11 +68,14 @@ from .conftest import TEST_MODBUS_NAME, ReadResult
from tests.common import async_fire_time_changed
TEST_SENSOR_NAME = "testSensor"
TEST_ENTITY_ID = f"{SENSOR_DOMAIN}.{TEST_SENSOR_NAME}"
TEST_HOST = "modbusTestHost"
@pytest.mark.parametrize(
"value,value_type",
[
async def test_number_validator():
"""Test number validator."""
for value, value_type in [
(15, int),
(15.1, float),
("15", int),
@ -67,48 +84,27 @@ TEST_SENSOR_NAME = "testSensor"
(-15.1, float),
("-15", int),
("-15.1", float),
],
)
async def test_number_validator(value, value_type):
"""Test number validator."""
assert isinstance(number(value), value_type)
async def test_number_exception():
"""Test number exception."""
]:
assert isinstance(number(value), value_type)
try:
number("x15.1")
except (vol.Invalid):
return
pytest.fail("Number not throwing exception")
async def _config_helper(hass, do_config, caplog):
"""Run test for modbus."""
config = {DOMAIN: do_config}
caplog.set_level(logging.ERROR)
assert await async_setup_component(hass, DOMAIN, config) is True
await hass.async_block_till_done()
assert DOMAIN in hass.config.components
assert len(caplog.records) == 0
@pytest.mark.parametrize(
"do_config",
[
{
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_HOST: TEST_HOST,
CONF_PORT: 5501,
},
{
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_HOST: TEST_HOST,
CONF_PORT: 5501,
CONF_NAME: TEST_MODBUS_NAME,
CONF_TIMEOUT: 30,
@ -116,12 +112,12 @@ async def _config_helper(hass, do_config, caplog):
},
{
CONF_TYPE: "udp",
CONF_HOST: "modbusTestHost",
CONF_HOST: TEST_HOST,
CONF_PORT: 5501,
},
{
CONF_TYPE: "udp",
CONF_HOST: "modbusTestHost",
CONF_HOST: TEST_HOST,
CONF_PORT: 5501,
CONF_NAME: TEST_MODBUS_NAME,
CONF_TIMEOUT: 30,
@ -129,12 +125,12 @@ async def _config_helper(hass, do_config, caplog):
},
{
CONF_TYPE: "rtuovertcp",
CONF_HOST: "modbusTestHost",
CONF_HOST: TEST_HOST,
CONF_PORT: 5501,
},
{
CONF_TYPE: "rtuovertcp",
CONF_HOST: "modbusTestHost",
CONF_HOST: TEST_HOST,
CONF_PORT: 5501,
CONF_NAME: TEST_MODBUS_NAME,
CONF_TIMEOUT: 30,
@ -163,176 +159,116 @@ async def _config_helper(hass, do_config, caplog):
},
{
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_HOST: TEST_HOST,
CONF_PORT: 5501,
CONF_DELAY: 5,
},
[
{
CONF_TYPE: "tcp",
CONF_HOST: TEST_HOST,
CONF_PORT: 5501,
CONF_NAME: TEST_MODBUS_NAME,
},
{
CONF_TYPE: "tcp",
CONF_HOST: TEST_HOST,
CONF_PORT: 5501,
CONF_NAME: TEST_MODBUS_NAME + "2",
},
{
CONF_TYPE: "serial",
CONF_BAUDRATE: 9600,
CONF_BYTESIZE: 8,
CONF_METHOD: "rtu",
CONF_PORT: "usb01",
CONF_PARITY: "E",
CONF_STOPBITS: 1,
CONF_NAME: TEST_MODBUS_NAME + "3",
},
],
],
)
async def test_config_modbus(hass, caplog, do_config, mock_pymodbus):
"""Run test for modbus."""
await _config_helper(hass, do_config, caplog)
"""Run configuration test for modbus."""
config = {DOMAIN: do_config}
caplog.set_level(logging.ERROR)
assert await async_setup_component(hass, DOMAIN, config) is True
await hass.async_block_till_done()
assert DOMAIN in hass.config.components
assert len(caplog.records) == 0
async def test_config_multiple_modbus(hass, caplog, mock_pymodbus):
"""Run test for multiple modbus."""
do_config = [
VALUE = "value"
FUNC = "func"
DATA = "data"
SERVICE = "service"
@pytest.mark.parametrize(
"do_write",
[
{
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_PORT: 5501,
CONF_NAME: TEST_MODBUS_NAME,
DATA: ATTR_VALUE,
VALUE: 15,
SERVICE: SERVICE_WRITE_REGISTER,
FUNC: CALL_TYPE_WRITE_REGISTER,
},
{
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_PORT: 5501,
CONF_NAME: TEST_MODBUS_NAME + "2",
DATA: ATTR_VALUE,
VALUE: [1, 2, 3],
SERVICE: SERVICE_WRITE_REGISTER,
FUNC: CALL_TYPE_WRITE_REGISTERS,
},
{
CONF_TYPE: "serial",
CONF_BAUDRATE: 9600,
CONF_BYTESIZE: 8,
CONF_METHOD: "rtu",
CONF_PORT: "usb01",
CONF_PARITY: "E",
CONF_STOPBITS: 1,
CONF_NAME: TEST_MODBUS_NAME + "3",
DATA: ATTR_STATE,
VALUE: False,
SERVICE: SERVICE_WRITE_COIL,
FUNC: CALL_TYPE_WRITE_COIL,
},
]
await _config_helper(hass, do_config, caplog)
async def test_pb_service_write_register(hass, caplog, mock_modbus):
{
DATA: ATTR_STATE,
VALUE: [True, False, True],
SERVICE: SERVICE_WRITE_COIL,
FUNC: CALL_TYPE_WRITE_COILS,
},
],
)
async def test_pb_service_write(hass, do_write, caplog, mock_modbus):
"""Run test for service write_register."""
# Pymodbus write single, response OK.
data = {ATTR_HUB: TEST_MODBUS_NAME, ATTR_UNIT: 17, ATTR_ADDRESS: 16, ATTR_VALUE: 15}
await hass.services.async_call(DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True)
assert mock_modbus.write_register.called
assert mock_modbus.write_register.call_args[0] == (
data[ATTR_ADDRESS],
data[ATTR_VALUE],
)
mock_modbus.reset_mock()
func_name = {
CALL_TYPE_WRITE_COIL: mock_modbus.write_coil,
CALL_TYPE_WRITE_COILS: mock_modbus.write_coils,
CALL_TYPE_WRITE_REGISTER: mock_modbus.write_register,
CALL_TYPE_WRITE_REGISTERS: mock_modbus.write_registers,
}
# Pymodbus write single, response error or exception
caplog.set_level(logging.DEBUG)
mock_modbus.write_register.return_value = ExceptionResponse(0x06)
await hass.services.async_call(DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True)
assert mock_modbus.write_register.called
assert caplog.messages[-1].startswith("Pymodbus:")
mock_modbus.reset_mock()
mock_modbus.write_register.return_value = IllegalFunctionRequest(0x06)
await hass.services.async_call(DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True)
assert mock_modbus.write_register.called
assert caplog.messages[-1].startswith("Pymodbus:")
mock_modbus.reset_mock()
mock_modbus.write_register.side_effect = ModbusException("fail write_")
await hass.services.async_call(DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True)
assert mock_modbus.write_register.called
assert caplog.messages[-1].startswith("Pymodbus:")
mock_modbus.reset_mock()
# Pymodbus write multiple, response OK.
data[ATTR_VALUE] = [1, 2, 3]
await hass.services.async_call(DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True)
assert mock_modbus.write_registers.called
assert mock_modbus.write_registers.call_args[0] == (
data[ATTR_ADDRESS],
data[ATTR_VALUE],
)
mock_modbus.reset_mock()
# Pymodbus write multiple, response error or exception
mock_modbus.write_registers.return_value = ExceptionResponse(0x06)
await hass.services.async_call(DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True)
assert mock_modbus.write_registers.called
assert caplog.messages[-1].startswith("Pymodbus:")
mock_modbus.reset_mock()
mock_modbus.write_registers.return_value = IllegalFunctionRequest(0x06)
await hass.services.async_call(DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True)
assert mock_modbus.write_registers.called
assert caplog.messages[-1].startswith("Pymodbus:")
mock_modbus.reset_mock()
mock_modbus.write_registers.side_effect = ModbusException("fail write_")
await hass.services.async_call(DOMAIN, SERVICE_WRITE_REGISTER, data, blocking=True)
assert mock_modbus.write_registers.called
assert caplog.messages[-1].startswith("Pymodbus:")
mock_modbus.reset_mock()
async def test_pb_service_write_coil(hass, caplog, mock_modbus):
"""Run test for service write_coil."""
# Pymodbus write single, response OK.
data = {
ATTR_HUB: TEST_MODBUS_NAME,
ATTR_UNIT: 17,
ATTR_ADDRESS: 16,
ATTR_STATE: False,
do_write[DATA]: do_write[VALUE],
}
await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True)
assert mock_modbus.write_coil.called
assert mock_modbus.write_coil.call_args[0] == (
await hass.services.async_call(DOMAIN, do_write[SERVICE], data, blocking=True)
assert func_name[do_write[FUNC]].called
assert func_name[do_write[FUNC]].call_args[0] == (
data[ATTR_ADDRESS],
data[ATTR_STATE],
data[do_write[DATA]],
)
mock_modbus.reset_mock()
# Pymodbus write single, response error or exception
caplog.set_level(logging.DEBUG)
mock_modbus.write_coil.return_value = ExceptionResponse(0x06)
await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True)
assert mock_modbus.write_coil.called
assert caplog.messages[-1].startswith("Pymodbus:")
mock_modbus.reset_mock()
mock_modbus.write_coil.return_value = IllegalFunctionRequest(0x06)
await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True)
assert mock_modbus.write_coil.called
assert caplog.messages[-1].startswith("Pymodbus:")
mock_modbus.reset_mock()
mock_modbus.write_coil.side_effect = ModbusException("fail write_")
await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True)
assert mock_modbus.write_coil.called
assert caplog.messages[-1].startswith("Pymodbus:")
mock_modbus.reset_mock()
# Pymodbus write multiple, response OK.
data[ATTR_STATE] = [True, False, True]
await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True)
assert mock_modbus.write_coils.called
assert mock_modbus.write_coils.call_args[0] == (
data[ATTR_ADDRESS],
data[ATTR_STATE],
)
mock_modbus.reset_mock()
# Pymodbus write multiple, response error or exception
mock_modbus.write_coils.return_value = ExceptionResponse(0x06)
await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True)
assert mock_modbus.write_coils.called
assert caplog.messages[-1].startswith("Pymodbus:")
mock_modbus.reset_mock()
mock_modbus.write_coils.return_value = IllegalFunctionRequest(0x06)
await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True)
assert mock_modbus.write_coils.called
assert caplog.messages[-1].startswith("Pymodbus:")
mock_modbus.reset_mock()
mock_modbus.write_coils.side_effect = ModbusException("fail write_")
await hass.services.async_call(DOMAIN, SERVICE_WRITE_COIL, data, blocking=True)
assert mock_modbus.write_coils.called
assert caplog.messages[-1].startswith("Pymodbus:")
mock_modbus.reset_mock()
for return_value in [
ExceptionResponse(0x06),
IllegalFunctionRequest(0x06),
ModbusException("fail write_"),
]:
caplog.set_level(logging.DEBUG)
func_name[do_write[FUNC]].return_value = return_value
await hass.services.async_call(DOMAIN, do_write[SERVICE], data, blocking=True)
assert func_name[do_write[FUNC]].called
assert caplog.messages[-1].startswith("Pymodbus:")
mock_modbus.reset_mock()
async def _read_helper(hass, do_group, do_type, do_return, do_exception, mock_pymodbus):
@ -340,7 +276,7 @@ async def _read_helper(hass, do_group, do_type, do_return, do_exception, mock_py
DOMAIN: [
{
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_HOST: TEST_HOST,
CONF_PORT: 5501,
CONF_NAME: TEST_MODBUS_NAME,
do_group: [
@ -442,7 +378,7 @@ async def test_pymodbus_constructor_fail(hass, caplog):
DOMAIN: [
{
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_HOST: TEST_HOST,
CONF_PORT: 5501,
}
]
@ -465,7 +401,7 @@ async def test_pymodbus_connect_fail(hass, caplog, mock_pymodbus):
DOMAIN: [
{
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_HOST: TEST_HOST,
CONF_PORT: 5501,
}
]
@ -491,7 +427,7 @@ async def test_delay(hass, mock_pymodbus):
DOMAIN: [
{
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_HOST: TEST_HOST,
CONF_PORT: 5501,
CONF_NAME: TEST_MODBUS_NAME,
CONF_DELAY: test_delay,
@ -533,47 +469,3 @@ async def test_delay(hass, mock_pymodbus):
async_fire_time_changed(hass, now)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_ON
async def test_thread_lock(hass, mock_pymodbus):
"""Run test for block of threads."""
# the purpose of this test is to test the threads are not being blocked
# We "hijiack" a binary_sensor to make a proper blackbox test.
test_scan_interval = 5
sensors = []
for i in range(200):
sensors.append(
{
CONF_INPUT_TYPE: CALL_TYPE_COIL,
CONF_NAME: f"{TEST_SENSOR_NAME}_{i}",
CONF_ADDRESS: 52 + i,
CONF_SCAN_INTERVAL: test_scan_interval,
}
)
config = {
DOMAIN: [
{
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_PORT: 5501,
CONF_NAME: TEST_MODBUS_NAME,
CONF_BINARY_SENSORS: sensors,
}
]
}
mock_pymodbus.read_coils.return_value = ReadResult([0x01])
now = dt_util.utcnow()
with mock.patch("homeassistant.helpers.event.dt_util.utcnow", return_value=now):
assert await async_setup_component(hass, DOMAIN, config) is True
await hass.async_block_till_done()
stop_time = now + timedelta(seconds=10)
step_timedelta = timedelta(seconds=1)
while now < stop_time:
now = now + step_timedelta
with mock.patch("homeassistant.helpers.event.dt_util.utcnow", return_value=now):
async_fire_time_changed(hass, now)
await hass.async_block_till_done()
for i in range(200):
entity_id = f"{BINARY_SENSOR_DOMAIN}.{TEST_SENSOR_NAME}_{i}"
assert hass.states.get(entity_id).state == STATE_ON