Move pymodbus test fixtures to test_init (#51244)
parent
783e545a67
commit
10dccc6734
|
@ -57,6 +57,7 @@ async def mock_modbus(hass, mock_pymodbus):
|
|||
yield mock_pymodbus
|
||||
|
||||
|
||||
# dataclass
|
||||
class ReadResult:
|
||||
"""Storage class for register read results."""
|
||||
|
||||
|
|
|
@ -5,9 +5,12 @@ This file is responsible for testing:
|
|||
- Functionality of class ModbusHub
|
||||
- Coverage 100%:
|
||||
__init__.py
|
||||
base_platform.py
|
||||
const.py
|
||||
modbus.py
|
||||
validators.py
|
||||
baseplatform.py (only BasePlatform)
|
||||
|
||||
It uses binary_sensors/sensors to do black box testing of the read calls.
|
||||
"""
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
|
@ -35,19 +38,31 @@ from homeassistant.components.modbus.const import (
|
|||
CALL_TYPE_WRITE_REGISTERS,
|
||||
CONF_BAUDRATE,
|
||||
CONF_BYTESIZE,
|
||||
CONF_DATA_TYPE,
|
||||
CONF_INPUT_TYPE,
|
||||
CONF_PARITY,
|
||||
CONF_REVERSE_ORDER,
|
||||
CONF_STOPBITS,
|
||||
CONF_SWAP,
|
||||
CONF_SWAP_BYTE,
|
||||
CONF_SWAP_WORD,
|
||||
DATA_TYPE_CUSTOM,
|
||||
DATA_TYPE_INT,
|
||||
DATA_TYPE_STRING,
|
||||
DEFAULT_SCAN_INTERVAL,
|
||||
MODBUS_DOMAIN as DOMAIN,
|
||||
SERVICE_WRITE_COIL,
|
||||
SERVICE_WRITE_REGISTER,
|
||||
)
|
||||
from homeassistant.components.modbus.validators import number_validator
|
||||
from homeassistant.components.modbus.validators import (
|
||||
number_validator,
|
||||
sensor_schema_validator,
|
||||
)
|
||||
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
|
||||
from homeassistant.const import (
|
||||
CONF_ADDRESS,
|
||||
CONF_BINARY_SENSORS,
|
||||
CONF_COUNT,
|
||||
CONF_DELAY,
|
||||
CONF_HOST,
|
||||
CONF_METHOD,
|
||||
|
@ -55,6 +70,7 @@ from homeassistant.const import (
|
|||
CONF_PORT,
|
||||
CONF_SCAN_INTERVAL,
|
||||
CONF_SENSORS,
|
||||
CONF_STRUCTURE,
|
||||
CONF_TIMEOUT,
|
||||
CONF_TYPE,
|
||||
STATE_ON,
|
||||
|
@ -63,13 +79,27 @@ from homeassistant.const import (
|
|||
from homeassistant.setup import async_setup_component
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from .conftest import TEST_MODBUS_NAME, ReadResult
|
||||
from .conftest import ReadResult
|
||||
|
||||
from tests.common import async_fire_time_changed
|
||||
|
||||
TEST_SENSOR_NAME = "testSensor"
|
||||
TEST_ENTITY_ID = f"{SENSOR_DOMAIN}.{TEST_SENSOR_NAME}"
|
||||
TEST_HOST = "modbusTestHost"
|
||||
TEST_MODBUS_NAME = "modbusTest"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def mock_modbus_with_pymodbus(hass, caplog, do_config, mock_pymodbus):
|
||||
"""Load integration modbus using mocked pymodbus."""
|
||||
caplog.clear()
|
||||
caplog.set_level(logging.ERROR)
|
||||
config = {DOMAIN: do_config}
|
||||
assert await async_setup_component(hass, DOMAIN, config) is True
|
||||
await hass.async_block_till_done()
|
||||
assert DOMAIN in hass.config.components
|
||||
assert caplog.text == ""
|
||||
yield mock_pymodbus
|
||||
|
||||
|
||||
async def test_number_validator():
|
||||
|
@ -94,6 +124,84 @@ async def test_number_validator():
|
|||
pytest.fail("Number_validator not throwing exception")
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"do_config",
|
||||
[
|
||||
{
|
||||
CONF_NAME: TEST_SENSOR_NAME,
|
||||
CONF_COUNT: 2,
|
||||
CONF_DATA_TYPE: DATA_TYPE_STRING,
|
||||
},
|
||||
{
|
||||
CONF_NAME: TEST_SENSOR_NAME,
|
||||
CONF_COUNT: 2,
|
||||
CONF_DATA_TYPE: DATA_TYPE_INT,
|
||||
CONF_REVERSE_ORDER: True,
|
||||
},
|
||||
{
|
||||
CONF_NAME: TEST_SENSOR_NAME,
|
||||
CONF_COUNT: 2,
|
||||
CONF_DATA_TYPE: DATA_TYPE_INT,
|
||||
CONF_REVERSE_ORDER: False,
|
||||
},
|
||||
{
|
||||
CONF_NAME: TEST_SENSOR_NAME,
|
||||
CONF_COUNT: 2,
|
||||
CONF_DATA_TYPE: DATA_TYPE_INT,
|
||||
CONF_SWAP: CONF_SWAP_BYTE,
|
||||
},
|
||||
],
|
||||
)
|
||||
async def test_ok_sensor_schema_validator(do_config):
|
||||
"""Test struct validator."""
|
||||
try:
|
||||
sensor_schema_validator(do_config)
|
||||
except vol.Invalid:
|
||||
pytest.fail("Sensor_schema_validator unexpected exception")
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"do_config",
|
||||
[
|
||||
{
|
||||
CONF_NAME: TEST_SENSOR_NAME,
|
||||
CONF_COUNT: 8,
|
||||
CONF_DATA_TYPE: DATA_TYPE_INT,
|
||||
},
|
||||
{
|
||||
CONF_NAME: TEST_SENSOR_NAME,
|
||||
CONF_COUNT: 8,
|
||||
CONF_DATA_TYPE: DATA_TYPE_CUSTOM,
|
||||
},
|
||||
{
|
||||
CONF_NAME: TEST_SENSOR_NAME,
|
||||
CONF_COUNT: 8,
|
||||
CONF_DATA_TYPE: DATA_TYPE_CUSTOM,
|
||||
CONF_STRUCTURE: "no good",
|
||||
},
|
||||
{
|
||||
CONF_NAME: TEST_SENSOR_NAME,
|
||||
CONF_COUNT: 20,
|
||||
CONF_DATA_TYPE: DATA_TYPE_CUSTOM,
|
||||
CONF_STRUCTURE: ">f",
|
||||
},
|
||||
{
|
||||
CONF_NAME: TEST_SENSOR_NAME,
|
||||
CONF_COUNT: 1,
|
||||
CONF_DATA_TYPE: DATA_TYPE_INT,
|
||||
CONF_SWAP: CONF_SWAP_WORD,
|
||||
},
|
||||
],
|
||||
)
|
||||
async def test_exception_sensor_schema_validator(do_config):
|
||||
"""Test struct validator."""
|
||||
try:
|
||||
sensor_schema_validator(do_config)
|
||||
except vol.Invalid:
|
||||
return
|
||||
pytest.fail("Sensor_schema_validator missing exception")
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"do_config",
|
||||
[
|
||||
|
@ -187,16 +295,23 @@ async def test_number_validator():
|
|||
CONF_NAME: TEST_MODBUS_NAME + "3",
|
||||
},
|
||||
],
|
||||
{
|
||||
# Special test for scan_interval validator with scan_interval: 0
|
||||
CONF_TYPE: "tcp",
|
||||
CONF_HOST: TEST_HOST,
|
||||
CONF_PORT: 5501,
|
||||
CONF_SENSORS: [
|
||||
{
|
||||
CONF_NAME: TEST_SENSOR_NAME,
|
||||
CONF_ADDRESS: 117,
|
||||
CONF_SCAN_INTERVAL: 0,
|
||||
}
|
||||
],
|
||||
},
|
||||
],
|
||||
)
|
||||
async def test_config_modbus(hass, caplog, do_config, mock_pymodbus):
|
||||
async def test_config_modbus(hass, caplog, mock_modbus_with_pymodbus):
|
||||
"""Run configuration test for modbus."""
|
||||
config = {DOMAIN: do_config}
|
||||
caplog.set_level(logging.ERROR)
|
||||
assert await async_setup_component(hass, DOMAIN, config) is True
|
||||
await hass.async_block_till_done()
|
||||
assert DOMAIN in hass.config.components
|
||||
assert len(caplog.records) == 0
|
||||
|
||||
|
||||
VALUE = "value"
|
||||
|
@ -205,6 +320,17 @@ DATA = "data"
|
|||
SERVICE = "service"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"do_config",
|
||||
[
|
||||
{
|
||||
CONF_NAME: TEST_MODBUS_NAME,
|
||||
CONF_TYPE: "tcp",
|
||||
CONF_HOST: TEST_HOST,
|
||||
CONF_PORT: 5501,
|
||||
},
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
"do_write",
|
||||
[
|
||||
|
@ -234,14 +360,25 @@ SERVICE = "service"
|
|||
},
|
||||
],
|
||||
)
|
||||
async def test_pb_service_write(hass, do_write, caplog, mock_modbus):
|
||||
@pytest.mark.parametrize(
|
||||
"do_return",
|
||||
[
|
||||
{VALUE: ReadResult([0x0001]), DATA: ""},
|
||||
{VALUE: ExceptionResponse(0x06), DATA: "Pymodbus:"},
|
||||
{VALUE: IllegalFunctionRequest(0x06), DATA: "Pymodbus:"},
|
||||
{VALUE: ModbusException("fail write_"), DATA: "Pymodbus:"},
|
||||
],
|
||||
)
|
||||
async def test_pb_service_write(
|
||||
hass, do_write, do_return, caplog, mock_modbus_with_pymodbus
|
||||
):
|
||||
"""Run test for service write_register."""
|
||||
|
||||
func_name = {
|
||||
CALL_TYPE_WRITE_COIL: mock_modbus.write_coil,
|
||||
CALL_TYPE_WRITE_COILS: mock_modbus.write_coils,
|
||||
CALL_TYPE_WRITE_REGISTER: mock_modbus.write_register,
|
||||
CALL_TYPE_WRITE_REGISTERS: mock_modbus.write_registers,
|
||||
CALL_TYPE_WRITE_COIL: mock_modbus_with_pymodbus.write_coil,
|
||||
CALL_TYPE_WRITE_COILS: mock_modbus_with_pymodbus.write_coils,
|
||||
CALL_TYPE_WRITE_REGISTER: mock_modbus_with_pymodbus.write_register,
|
||||
CALL_TYPE_WRITE_REGISTERS: mock_modbus_with_pymodbus.write_registers,
|
||||
}
|
||||
|
||||
data = {
|
||||
|
@ -250,28 +387,42 @@ async def test_pb_service_write(hass, do_write, caplog, mock_modbus):
|
|||
ATTR_ADDRESS: 16,
|
||||
do_write[DATA]: do_write[VALUE],
|
||||
}
|
||||
mock_modbus_with_pymodbus.reset_mock()
|
||||
caplog.clear()
|
||||
caplog.set_level(logging.DEBUG)
|
||||
func_name[do_write[FUNC]].return_value = do_return[VALUE]
|
||||
await hass.services.async_call(DOMAIN, do_write[SERVICE], data, blocking=True)
|
||||
assert func_name[do_write[FUNC]].called
|
||||
assert func_name[do_write[FUNC]].call_args[0] == (
|
||||
data[ATTR_ADDRESS],
|
||||
data[do_write[DATA]],
|
||||
)
|
||||
mock_modbus.reset_mock()
|
||||
|
||||
for return_value in [
|
||||
ExceptionResponse(0x06),
|
||||
IllegalFunctionRequest(0x06),
|
||||
ModbusException("fail write_"),
|
||||
]:
|
||||
caplog.set_level(logging.DEBUG)
|
||||
func_name[do_write[FUNC]].return_value = return_value
|
||||
await hass.services.async_call(DOMAIN, do_write[SERVICE], data, blocking=True)
|
||||
assert func_name[do_write[FUNC]].called
|
||||
if do_return[DATA]:
|
||||
assert caplog.messages[-1].startswith("Pymodbus:")
|
||||
mock_modbus.reset_mock()
|
||||
|
||||
|
||||
async def _read_helper(hass, do_group, do_type, do_return, do_exception, mock_pymodbus):
|
||||
@pytest.fixture
|
||||
async def mock_modbus_read_pymodbus(
|
||||
hass,
|
||||
do_group,
|
||||
do_type,
|
||||
do_scan_interval,
|
||||
do_return,
|
||||
do_exception,
|
||||
caplog,
|
||||
mock_pymodbus,
|
||||
):
|
||||
"""Load integration modbus using mocked pymodbus."""
|
||||
caplog.clear()
|
||||
caplog.set_level(logging.ERROR)
|
||||
mock_pymodbus.read_coils.side_effect = do_exception
|
||||
mock_pymodbus.read_discrete_inputs.side_effect = do_exception
|
||||
mock_pymodbus.read_input_registers.side_effect = do_exception
|
||||
mock_pymodbus.read_holding_registers.side_effect = do_exception
|
||||
mock_pymodbus.read_coils.return_value = do_return
|
||||
mock_pymodbus.read_discrete_inputs.return_value = do_return
|
||||
mock_pymodbus.read_input_registers.return_value = do_return
|
||||
mock_pymodbus.read_holding_registers.return_value = do_return
|
||||
config = {
|
||||
DOMAIN: [
|
||||
{
|
||||
|
@ -284,91 +435,63 @@ async def _read_helper(hass, do_group, do_type, do_return, do_exception, mock_py
|
|||
CONF_INPUT_TYPE: do_type,
|
||||
CONF_NAME: TEST_SENSOR_NAME,
|
||||
CONF_ADDRESS: 51,
|
||||
CONF_SCAN_INTERVAL: 1,
|
||||
CONF_SCAN_INTERVAL: do_scan_interval,
|
||||
}
|
||||
],
|
||||
}
|
||||
]
|
||||
],
|
||||
}
|
||||
mock_pymodbus.read_coils.side_effect = do_exception
|
||||
mock_pymodbus.read_discrete_inputs.side_effect = do_exception
|
||||
mock_pymodbus.read_input_registers.side_effect = do_exception
|
||||
mock_pymodbus.read_holding_registers.side_effect = do_exception
|
||||
mock_pymodbus.read_coils.return_value = do_return
|
||||
mock_pymodbus.read_discrete_inputs.return_value = do_return
|
||||
mock_pymodbus.read_input_registers.return_value = do_return
|
||||
mock_pymodbus.read_holding_registers.return_value = do_return
|
||||
now = dt_util.utcnow()
|
||||
with mock.patch("homeassistant.helpers.event.dt_util.utcnow", return_value=now):
|
||||
assert await async_setup_component(hass, DOMAIN, config) is True
|
||||
await hass.async_block_till_done()
|
||||
assert DOMAIN in hass.config.components
|
||||
assert caplog.text == ""
|
||||
now = now + timedelta(seconds=DEFAULT_SCAN_INTERVAL + 60)
|
||||
with mock.patch("homeassistant.helpers.event.dt_util.utcnow", return_value=now):
|
||||
async_fire_time_changed(hass, now)
|
||||
await hass.async_block_till_done()
|
||||
yield mock_pymodbus
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"do_return,do_exception,do_expect",
|
||||
"do_domain, do_group,do_type,do_scan_interval",
|
||||
[
|
||||
[ReadResult([7]), None, "7"],
|
||||
[IllegalFunctionRequest(0x99), None, STATE_UNAVAILABLE],
|
||||
[ExceptionResponse(0x99), None, STATE_UNAVAILABLE],
|
||||
[ReadResult([7]), ModbusException("fail read_"), STATE_UNAVAILABLE],
|
||||
[SENSOR_DOMAIN, CONF_SENSORS, CALL_TYPE_REGISTER_HOLDING, 1],
|
||||
[SENSOR_DOMAIN, CONF_SENSORS, CALL_TYPE_REGISTER_INPUT, 1],
|
||||
[BINARY_SENSOR_DOMAIN, CONF_BINARY_SENSORS, CALL_TYPE_DISCRETE, 1],
|
||||
[BINARY_SENSOR_DOMAIN, CONF_BINARY_SENSORS, CALL_TYPE_COIL, 1],
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
"do_type",
|
||||
[CALL_TYPE_REGISTER_HOLDING, CALL_TYPE_REGISTER_INPUT],
|
||||
"do_return,do_exception,do_expect_state,do_expect_value",
|
||||
[
|
||||
[ReadResult([1]), None, STATE_ON, "1"],
|
||||
[IllegalFunctionRequest(0x99), None, STATE_UNAVAILABLE, STATE_UNAVAILABLE],
|
||||
[ExceptionResponse(0x99), None, STATE_UNAVAILABLE, STATE_UNAVAILABLE],
|
||||
[
|
||||
ReadResult([1]),
|
||||
ModbusException("fail read_"),
|
||||
STATE_UNAVAILABLE,
|
||||
STATE_UNAVAILABLE,
|
||||
],
|
||||
],
|
||||
)
|
||||
async def test_pb_read_value(
|
||||
hass, caplog, do_type, do_return, do_exception, do_expect, mock_pymodbus
|
||||
async def test_pb_read(
|
||||
hass, do_domain, do_expect_state, do_expect_value, caplog, mock_modbus_read_pymodbus
|
||||
):
|
||||
"""Run test for different read."""
|
||||
|
||||
# the purpose of this test is to test the special
|
||||
# return values from pymodbus:
|
||||
# ExceptionResponse, IllegalResponse
|
||||
# and exceptions.
|
||||
# We "hijiack" binary_sensor and sensor in order
|
||||
# to make a proper blackbox test.
|
||||
await _read_helper(
|
||||
hass, CONF_SENSORS, do_type, do_return, do_exception, mock_pymodbus
|
||||
)
|
||||
|
||||
# Check state
|
||||
entity_id = f"{SENSOR_DOMAIN}.{TEST_SENSOR_NAME}"
|
||||
entity_id = f"{do_domain}.{TEST_SENSOR_NAME}"
|
||||
state = hass.states.get(entity_id).state
|
||||
assert hass.states.get(entity_id).state
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"do_return,do_exception,do_expect",
|
||||
[
|
||||
[ReadResult([0x01]), None, STATE_ON],
|
||||
[IllegalFunctionRequest(0x99), None, STATE_UNAVAILABLE],
|
||||
[ExceptionResponse(0x99), None, STATE_UNAVAILABLE],
|
||||
[ReadResult([7]), ModbusException("fail read_"), STATE_UNAVAILABLE],
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize("do_type", [CALL_TYPE_DISCRETE, CALL_TYPE_COIL])
|
||||
async def test_pb_read_state(
|
||||
hass, caplog, do_type, do_return, do_exception, do_expect, mock_pymodbus
|
||||
):
|
||||
"""Run test for different read."""
|
||||
|
||||
# the purpose of this test is to test the special
|
||||
# return values from pymodbus:
|
||||
# ExceptionResponse, IllegalResponse
|
||||
# and exceptions.
|
||||
# We "hijiack" binary_sensor and sensor in order
|
||||
# to make a proper blackbox test.
|
||||
await _read_helper(
|
||||
hass, CONF_BINARY_SENSORS, do_type, do_return, do_exception, mock_pymodbus
|
||||
)
|
||||
|
||||
# Check state
|
||||
entity_id = f"{BINARY_SENSOR_DOMAIN}.{TEST_SENSOR_NAME}"
|
||||
state = hass.states.get(entity_id).state
|
||||
# this if is needed to avoid explode the
|
||||
if do_domain == SENSOR_DOMAIN:
|
||||
do_expect = do_expect_value
|
||||
else:
|
||||
do_expect = do_expect_state
|
||||
assert state == do_expect
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue