parent
b99cdf3144
commit
8d6473061c
|
@ -52,6 +52,15 @@ def mock_pymodbus_fixture():
|
||||||
"""Mock pymodbus."""
|
"""Mock pymodbus."""
|
||||||
mock_pb = mock.AsyncMock()
|
mock_pb = mock.AsyncMock()
|
||||||
mock_pb.close = mock.MagicMock()
|
mock_pb.close = mock.MagicMock()
|
||||||
|
read_result = ReadResult([])
|
||||||
|
mock_pb.read_coils.return_value = read_result
|
||||||
|
mock_pb.read_discrete_inputs.return_value = read_result
|
||||||
|
mock_pb.read_input_registers.return_value = read_result
|
||||||
|
mock_pb.read_holding_registers.return_value = read_result
|
||||||
|
mock_pb.write_register.return_value = read_result
|
||||||
|
mock_pb.write_registers.return_value = read_result
|
||||||
|
mock_pb.write_coil.return_value = read_result
|
||||||
|
mock_pb.write_coils.return_value = read_result
|
||||||
with (
|
with (
|
||||||
mock.patch(
|
mock.patch(
|
||||||
"homeassistant.components.modbus.modbus.AsyncModbusTcpClient",
|
"homeassistant.components.modbus.modbus.AsyncModbusTcpClient",
|
||||||
|
@ -156,7 +165,7 @@ async def mock_pymodbus_exception_fixture(hass, do_exception, mock_modbus):
|
||||||
@pytest.fixture(name="mock_pymodbus_return")
|
@pytest.fixture(name="mock_pymodbus_return")
|
||||||
async def mock_pymodbus_return_fixture(hass, register_words, mock_modbus):
|
async def mock_pymodbus_return_fixture(hass, register_words, mock_modbus):
|
||||||
"""Trigger update call with time_changed event."""
|
"""Trigger update call with time_changed event."""
|
||||||
read_result = ReadResult(register_words) if register_words else None
|
read_result = ReadResult(register_words if register_words else [])
|
||||||
mock_modbus.read_coils.return_value = read_result
|
mock_modbus.read_coils.return_value = read_result
|
||||||
mock_modbus.read_discrete_inputs.return_value = read_result
|
mock_modbus.read_discrete_inputs.return_value = read_result
|
||||||
mock_modbus.read_input_registers.return_value = read_result
|
mock_modbus.read_input_registers.return_value = read_result
|
||||||
|
@ -165,6 +174,7 @@ async def mock_pymodbus_return_fixture(hass, register_words, mock_modbus):
|
||||||
mock_modbus.write_registers.return_value = read_result
|
mock_modbus.write_registers.return_value = read_result
|
||||||
mock_modbus.write_coil.return_value = read_result
|
mock_modbus.write_coil.return_value = read_result
|
||||||
mock_modbus.write_coils.return_value = read_result
|
mock_modbus.write_coils.return_value = read_result
|
||||||
|
return mock_modbus
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name="mock_do_cycle")
|
@pytest.fixture(name="mock_do_cycle")
|
||||||
|
|
|
@ -3,3 +3,7 @@ modbus:
|
||||||
host: "testHost"
|
host: "testHost"
|
||||||
port: 5001
|
port: 5001
|
||||||
name: "testModbus"
|
name: "testModbus"
|
||||||
|
sensors:
|
||||||
|
- name: "dummy"
|
||||||
|
address: 117
|
||||||
|
slave: 0
|
||||||
|
|
|
@ -1645,7 +1645,7 @@ async def test_shutdown(
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_stop_restart(
|
async def test_stop_restart(
|
||||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_modbus
|
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_pymodbus_return
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Run test for service stop."""
|
"""Run test for service stop."""
|
||||||
|
|
||||||
|
@ -1656,7 +1656,7 @@ async def test_stop_restart(
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert hass.states.get(entity_id).state == "17"
|
assert hass.states.get(entity_id).state == "17"
|
||||||
|
|
||||||
mock_modbus.reset_mock()
|
mock_pymodbus_return.reset_mock()
|
||||||
caplog.clear()
|
caplog.clear()
|
||||||
data = {
|
data = {
|
||||||
ATTR_HUB: TEST_MODBUS_NAME,
|
ATTR_HUB: TEST_MODBUS_NAME,
|
||||||
|
@ -1664,23 +1664,23 @@ async def test_stop_restart(
|
||||||
await hass.services.async_call(DOMAIN, SERVICE_STOP, data, blocking=True)
|
await hass.services.async_call(DOMAIN, SERVICE_STOP, data, blocking=True)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
|
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
|
||||||
assert mock_modbus.close.called
|
assert mock_pymodbus_return.close.called
|
||||||
assert f"modbus {TEST_MODBUS_NAME} communication closed" in caplog.text
|
assert f"modbus {TEST_MODBUS_NAME} communication closed" in caplog.text
|
||||||
|
|
||||||
mock_modbus.reset_mock()
|
mock_pymodbus_return.reset_mock()
|
||||||
caplog.clear()
|
caplog.clear()
|
||||||
await hass.services.async_call(DOMAIN, SERVICE_RESTART, data, blocking=True)
|
await hass.services.async_call(DOMAIN, SERVICE_RESTART, data, blocking=True)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert not mock_modbus.close.called
|
assert not mock_pymodbus_return.close.called
|
||||||
assert mock_modbus.connect.called
|
assert mock_pymodbus_return.connect.called
|
||||||
assert f"modbus {TEST_MODBUS_NAME} communication open" in caplog.text
|
assert f"modbus {TEST_MODBUS_NAME} communication open" in caplog.text
|
||||||
|
|
||||||
mock_modbus.reset_mock()
|
mock_pymodbus_return.reset_mock()
|
||||||
caplog.clear()
|
caplog.clear()
|
||||||
await hass.services.async_call(DOMAIN, SERVICE_RESTART, data, blocking=True)
|
await hass.services.async_call(DOMAIN, SERVICE_RESTART, data, blocking=True)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert mock_modbus.close.called
|
assert mock_pymodbus_return.close.called
|
||||||
assert mock_modbus.connect.called
|
assert mock_pymodbus_return.connect.called
|
||||||
assert f"modbus {TEST_MODBUS_NAME} communication closed" in caplog.text
|
assert f"modbus {TEST_MODBUS_NAME} communication closed" in caplog.text
|
||||||
assert f"modbus {TEST_MODBUS_NAME} communication open" in caplog.text
|
assert f"modbus {TEST_MODBUS_NAME} communication open" in caplog.text
|
||||||
|
|
||||||
|
@ -1710,7 +1710,7 @@ async def test_write_no_client(hass: HomeAssistant, mock_modbus) -> None:
|
||||||
async def test_integration_reload(
|
async def test_integration_reload(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
caplog: pytest.LogCaptureFixture,
|
caplog: pytest.LogCaptureFixture,
|
||||||
mock_modbus,
|
mock_pymodbus_return,
|
||||||
freezer: FrozenDateTimeFactory,
|
freezer: FrozenDateTimeFactory,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Run test for integration reload."""
|
"""Run test for integration reload."""
|
||||||
|
@ -1731,7 +1731,7 @@ async def test_integration_reload(
|
||||||
|
|
||||||
@pytest.mark.parametrize("do_config", [{}])
|
@pytest.mark.parametrize("do_config", [{}])
|
||||||
async def test_integration_reload_failed(
|
async def test_integration_reload_failed(
|
||||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_modbus
|
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_pymodbus_return
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Run test for integration connect failure on reload."""
|
"""Run test for integration connect failure on reload."""
|
||||||
caplog.set_level(logging.INFO)
|
caplog.set_level(logging.INFO)
|
||||||
|
@ -1740,7 +1740,9 @@ async def test_integration_reload_failed(
|
||||||
yaml_path = get_fixture_path("configuration.yaml", "modbus")
|
yaml_path = get_fixture_path("configuration.yaml", "modbus")
|
||||||
with (
|
with (
|
||||||
mock.patch.object(hass_config, "YAML_CONFIG_FILE", yaml_path),
|
mock.patch.object(hass_config, "YAML_CONFIG_FILE", yaml_path),
|
||||||
mock.patch.object(mock_modbus, "connect", side_effect=ModbusException("error")),
|
mock.patch.object(
|
||||||
|
mock_pymodbus_return, "connect", side_effect=ModbusException("error")
|
||||||
|
),
|
||||||
):
|
):
|
||||||
await hass.services.async_call(DOMAIN, SERVICE_RELOAD, blocking=True)
|
await hass.services.async_call(DOMAIN, SERVICE_RELOAD, blocking=True)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
@ -1751,7 +1753,7 @@ async def test_integration_reload_failed(
|
||||||
|
|
||||||
@pytest.mark.parametrize("do_config", [{}])
|
@pytest.mark.parametrize("do_config", [{}])
|
||||||
async def test_integration_setup_failed(
|
async def test_integration_setup_failed(
|
||||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_modbus
|
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_pymodbus_return
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Run test for integration setup on reload."""
|
"""Run test for integration setup on reload."""
|
||||||
with mock.patch.object(
|
with mock.patch.object(
|
||||||
|
|
Loading…
Reference in New Issue