From 10dccc673437db223b94eff8a7b6f1990d100c85 Mon Sep 17 00:00:00 2001 From: jan iversen Date: Wed, 2 Jun 2021 00:00:44 +0200 Subject: [PATCH] Move pymodbus test fixtures to test_init (#51244) --- tests/components/modbus/conftest.py | 1 + tests/components/modbus/test_init.py | 297 +++++++++++++++++++-------- 2 files changed, 211 insertions(+), 87 deletions(-) diff --git a/tests/components/modbus/conftest.py b/tests/components/modbus/conftest.py index d066e33437c..7f67a0653fb 100644 --- a/tests/components/modbus/conftest.py +++ b/tests/components/modbus/conftest.py @@ -57,6 +57,7 @@ async def mock_modbus(hass, mock_pymodbus): yield mock_pymodbus +# dataclass class ReadResult: """Storage class for register read results.""" diff --git a/tests/components/modbus/test_init.py b/tests/components/modbus/test_init.py index 8e45ee06976..0e9bf43eec2 100644 --- a/tests/components/modbus/test_init.py +++ b/tests/components/modbus/test_init.py @@ -5,9 +5,12 @@ This file is responsible for testing: - Functionality of class ModbusHub - Coverage 100%: __init__.py - base_platform.py const.py modbus.py + validators.py + baseplatform.py (only BasePlatform) + +It uses binary_sensors/sensors to do black box testing of the read calls. """ from datetime import timedelta import logging @@ -35,19 +38,31 @@ from homeassistant.components.modbus.const import ( CALL_TYPE_WRITE_REGISTERS, CONF_BAUDRATE, CONF_BYTESIZE, + CONF_DATA_TYPE, CONF_INPUT_TYPE, CONF_PARITY, + CONF_REVERSE_ORDER, CONF_STOPBITS, + CONF_SWAP, + CONF_SWAP_BYTE, + CONF_SWAP_WORD, + DATA_TYPE_CUSTOM, + DATA_TYPE_INT, + DATA_TYPE_STRING, DEFAULT_SCAN_INTERVAL, MODBUS_DOMAIN as DOMAIN, SERVICE_WRITE_COIL, SERVICE_WRITE_REGISTER, ) -from homeassistant.components.modbus.validators import number_validator +from homeassistant.components.modbus.validators import ( + number_validator, + sensor_schema_validator, +) from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN from homeassistant.const import ( CONF_ADDRESS, CONF_BINARY_SENSORS, + CONF_COUNT, CONF_DELAY, CONF_HOST, CONF_METHOD, @@ -55,6 +70,7 @@ from homeassistant.const import ( CONF_PORT, CONF_SCAN_INTERVAL, CONF_SENSORS, + CONF_STRUCTURE, CONF_TIMEOUT, CONF_TYPE, STATE_ON, @@ -63,13 +79,27 @@ from homeassistant.const import ( from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util -from .conftest import TEST_MODBUS_NAME, ReadResult +from .conftest import ReadResult from tests.common import async_fire_time_changed TEST_SENSOR_NAME = "testSensor" TEST_ENTITY_ID = f"{SENSOR_DOMAIN}.{TEST_SENSOR_NAME}" TEST_HOST = "modbusTestHost" +TEST_MODBUS_NAME = "modbusTest" + + +@pytest.fixture +async def mock_modbus_with_pymodbus(hass, caplog, do_config, mock_pymodbus): + """Load integration modbus using mocked pymodbus.""" + caplog.clear() + caplog.set_level(logging.ERROR) + config = {DOMAIN: do_config} + assert await async_setup_component(hass, DOMAIN, config) is True + await hass.async_block_till_done() + assert DOMAIN in hass.config.components + assert caplog.text == "" + yield mock_pymodbus async def test_number_validator(): @@ -94,6 +124,84 @@ async def test_number_validator(): pytest.fail("Number_validator not throwing exception") +@pytest.mark.parametrize( + "do_config", + [ + { + CONF_NAME: TEST_SENSOR_NAME, + CONF_COUNT: 2, + CONF_DATA_TYPE: DATA_TYPE_STRING, + }, + { + CONF_NAME: TEST_SENSOR_NAME, + CONF_COUNT: 2, + CONF_DATA_TYPE: DATA_TYPE_INT, + CONF_REVERSE_ORDER: True, + }, + { + CONF_NAME: TEST_SENSOR_NAME, + CONF_COUNT: 2, + CONF_DATA_TYPE: DATA_TYPE_INT, + CONF_REVERSE_ORDER: False, + }, + { + CONF_NAME: TEST_SENSOR_NAME, + CONF_COUNT: 2, + CONF_DATA_TYPE: DATA_TYPE_INT, + CONF_SWAP: CONF_SWAP_BYTE, + }, + ], +) +async def test_ok_sensor_schema_validator(do_config): + """Test struct validator.""" + try: + sensor_schema_validator(do_config) + except vol.Invalid: + pytest.fail("Sensor_schema_validator unexpected exception") + + +@pytest.mark.parametrize( + "do_config", + [ + { + CONF_NAME: TEST_SENSOR_NAME, + CONF_COUNT: 8, + CONF_DATA_TYPE: DATA_TYPE_INT, + }, + { + CONF_NAME: TEST_SENSOR_NAME, + CONF_COUNT: 8, + CONF_DATA_TYPE: DATA_TYPE_CUSTOM, + }, + { + CONF_NAME: TEST_SENSOR_NAME, + CONF_COUNT: 8, + CONF_DATA_TYPE: DATA_TYPE_CUSTOM, + CONF_STRUCTURE: "no good", + }, + { + CONF_NAME: TEST_SENSOR_NAME, + CONF_COUNT: 20, + CONF_DATA_TYPE: DATA_TYPE_CUSTOM, + CONF_STRUCTURE: ">f", + }, + { + CONF_NAME: TEST_SENSOR_NAME, + CONF_COUNT: 1, + CONF_DATA_TYPE: DATA_TYPE_INT, + CONF_SWAP: CONF_SWAP_WORD, + }, + ], +) +async def test_exception_sensor_schema_validator(do_config): + """Test struct validator.""" + try: + sensor_schema_validator(do_config) + except vol.Invalid: + return + pytest.fail("Sensor_schema_validator missing exception") + + @pytest.mark.parametrize( "do_config", [ @@ -187,16 +295,23 @@ async def test_number_validator(): CONF_NAME: TEST_MODBUS_NAME + "3", }, ], + { + # Special test for scan_interval validator with scan_interval: 0 + CONF_TYPE: "tcp", + CONF_HOST: TEST_HOST, + CONF_PORT: 5501, + CONF_SENSORS: [ + { + CONF_NAME: TEST_SENSOR_NAME, + CONF_ADDRESS: 117, + CONF_SCAN_INTERVAL: 0, + } + ], + }, ], ) -async def test_config_modbus(hass, caplog, do_config, mock_pymodbus): +async def test_config_modbus(hass, caplog, mock_modbus_with_pymodbus): """Run configuration test for modbus.""" - config = {DOMAIN: do_config} - caplog.set_level(logging.ERROR) - assert await async_setup_component(hass, DOMAIN, config) is True - await hass.async_block_till_done() - assert DOMAIN in hass.config.components - assert len(caplog.records) == 0 VALUE = "value" @@ -205,6 +320,17 @@ DATA = "data" SERVICE = "service" +@pytest.mark.parametrize( + "do_config", + [ + { + CONF_NAME: TEST_MODBUS_NAME, + CONF_TYPE: "tcp", + CONF_HOST: TEST_HOST, + CONF_PORT: 5501, + }, + ], +) @pytest.mark.parametrize( "do_write", [ @@ -234,14 +360,25 @@ SERVICE = "service" }, ], ) -async def test_pb_service_write(hass, do_write, caplog, mock_modbus): +@pytest.mark.parametrize( + "do_return", + [ + {VALUE: ReadResult([0x0001]), DATA: ""}, + {VALUE: ExceptionResponse(0x06), DATA: "Pymodbus:"}, + {VALUE: IllegalFunctionRequest(0x06), DATA: "Pymodbus:"}, + {VALUE: ModbusException("fail write_"), DATA: "Pymodbus:"}, + ], +) +async def test_pb_service_write( + hass, do_write, do_return, caplog, mock_modbus_with_pymodbus +): """Run test for service write_register.""" func_name = { - CALL_TYPE_WRITE_COIL: mock_modbus.write_coil, - CALL_TYPE_WRITE_COILS: mock_modbus.write_coils, - CALL_TYPE_WRITE_REGISTER: mock_modbus.write_register, - CALL_TYPE_WRITE_REGISTERS: mock_modbus.write_registers, + CALL_TYPE_WRITE_COIL: mock_modbus_with_pymodbus.write_coil, + CALL_TYPE_WRITE_COILS: mock_modbus_with_pymodbus.write_coils, + CALL_TYPE_WRITE_REGISTER: mock_modbus_with_pymodbus.write_register, + CALL_TYPE_WRITE_REGISTERS: mock_modbus_with_pymodbus.write_registers, } data = { @@ -250,28 +387,42 @@ async def test_pb_service_write(hass, do_write, caplog, mock_modbus): ATTR_ADDRESS: 16, do_write[DATA]: do_write[VALUE], } + mock_modbus_with_pymodbus.reset_mock() + caplog.clear() + caplog.set_level(logging.DEBUG) + func_name[do_write[FUNC]].return_value = do_return[VALUE] await hass.services.async_call(DOMAIN, do_write[SERVICE], data, blocking=True) assert func_name[do_write[FUNC]].called assert func_name[do_write[FUNC]].call_args[0] == ( data[ATTR_ADDRESS], data[do_write[DATA]], ) - mock_modbus.reset_mock() - - for return_value in [ - ExceptionResponse(0x06), - IllegalFunctionRequest(0x06), - ModbusException("fail write_"), - ]: - caplog.set_level(logging.DEBUG) - func_name[do_write[FUNC]].return_value = return_value - await hass.services.async_call(DOMAIN, do_write[SERVICE], data, blocking=True) - assert func_name[do_write[FUNC]].called + if do_return[DATA]: assert caplog.messages[-1].startswith("Pymodbus:") - mock_modbus.reset_mock() -async def _read_helper(hass, do_group, do_type, do_return, do_exception, mock_pymodbus): +@pytest.fixture +async def mock_modbus_read_pymodbus( + hass, + do_group, + do_type, + do_scan_interval, + do_return, + do_exception, + caplog, + mock_pymodbus, +): + """Load integration modbus using mocked pymodbus.""" + caplog.clear() + caplog.set_level(logging.ERROR) + mock_pymodbus.read_coils.side_effect = do_exception + mock_pymodbus.read_discrete_inputs.side_effect = do_exception + mock_pymodbus.read_input_registers.side_effect = do_exception + mock_pymodbus.read_holding_registers.side_effect = do_exception + mock_pymodbus.read_coils.return_value = do_return + mock_pymodbus.read_discrete_inputs.return_value = do_return + mock_pymodbus.read_input_registers.return_value = do_return + mock_pymodbus.read_holding_registers.return_value = do_return config = { DOMAIN: [ { @@ -284,91 +435,63 @@ async def _read_helper(hass, do_group, do_type, do_return, do_exception, mock_py CONF_INPUT_TYPE: do_type, CONF_NAME: TEST_SENSOR_NAME, CONF_ADDRESS: 51, - CONF_SCAN_INTERVAL: 1, + CONF_SCAN_INTERVAL: do_scan_interval, } ], } - ] + ], } - mock_pymodbus.read_coils.side_effect = do_exception - mock_pymodbus.read_discrete_inputs.side_effect = do_exception - mock_pymodbus.read_input_registers.side_effect = do_exception - mock_pymodbus.read_holding_registers.side_effect = do_exception - mock_pymodbus.read_coils.return_value = do_return - mock_pymodbus.read_discrete_inputs.return_value = do_return - mock_pymodbus.read_input_registers.return_value = do_return - mock_pymodbus.read_holding_registers.return_value = do_return now = dt_util.utcnow() with mock.patch("homeassistant.helpers.event.dt_util.utcnow", return_value=now): assert await async_setup_component(hass, DOMAIN, config) is True await hass.async_block_till_done() + assert DOMAIN in hass.config.components + assert caplog.text == "" now = now + timedelta(seconds=DEFAULT_SCAN_INTERVAL + 60) with mock.patch("homeassistant.helpers.event.dt_util.utcnow", return_value=now): async_fire_time_changed(hass, now) await hass.async_block_till_done() + yield mock_pymodbus @pytest.mark.parametrize( - "do_return,do_exception,do_expect", + "do_domain, do_group,do_type,do_scan_interval", [ - [ReadResult([7]), None, "7"], - [IllegalFunctionRequest(0x99), None, STATE_UNAVAILABLE], - [ExceptionResponse(0x99), None, STATE_UNAVAILABLE], - [ReadResult([7]), ModbusException("fail read_"), STATE_UNAVAILABLE], + [SENSOR_DOMAIN, CONF_SENSORS, CALL_TYPE_REGISTER_HOLDING, 1], + [SENSOR_DOMAIN, CONF_SENSORS, CALL_TYPE_REGISTER_INPUT, 1], + [BINARY_SENSOR_DOMAIN, CONF_BINARY_SENSORS, CALL_TYPE_DISCRETE, 1], + [BINARY_SENSOR_DOMAIN, CONF_BINARY_SENSORS, CALL_TYPE_COIL, 1], ], ) @pytest.mark.parametrize( - "do_type", - [CALL_TYPE_REGISTER_HOLDING, CALL_TYPE_REGISTER_INPUT], + "do_return,do_exception,do_expect_state,do_expect_value", + [ + [ReadResult([1]), None, STATE_ON, "1"], + [IllegalFunctionRequest(0x99), None, STATE_UNAVAILABLE, STATE_UNAVAILABLE], + [ExceptionResponse(0x99), None, STATE_UNAVAILABLE, STATE_UNAVAILABLE], + [ + ReadResult([1]), + ModbusException("fail read_"), + STATE_UNAVAILABLE, + STATE_UNAVAILABLE, + ], + ], ) -async def test_pb_read_value( - hass, caplog, do_type, do_return, do_exception, do_expect, mock_pymodbus +async def test_pb_read( + hass, do_domain, do_expect_state, do_expect_value, caplog, mock_modbus_read_pymodbus ): """Run test for different read.""" - # the purpose of this test is to test the special - # return values from pymodbus: - # ExceptionResponse, IllegalResponse - # and exceptions. - # We "hijiack" binary_sensor and sensor in order - # to make a proper blackbox test. - await _read_helper( - hass, CONF_SENSORS, do_type, do_return, do_exception, mock_pymodbus - ) - # Check state - entity_id = f"{SENSOR_DOMAIN}.{TEST_SENSOR_NAME}" + entity_id = f"{do_domain}.{TEST_SENSOR_NAME}" + state = hass.states.get(entity_id).state assert hass.states.get(entity_id).state - -@pytest.mark.parametrize( - "do_return,do_exception,do_expect", - [ - [ReadResult([0x01]), None, STATE_ON], - [IllegalFunctionRequest(0x99), None, STATE_UNAVAILABLE], - [ExceptionResponse(0x99), None, STATE_UNAVAILABLE], - [ReadResult([7]), ModbusException("fail read_"), STATE_UNAVAILABLE], - ], -) -@pytest.mark.parametrize("do_type", [CALL_TYPE_DISCRETE, CALL_TYPE_COIL]) -async def test_pb_read_state( - hass, caplog, do_type, do_return, do_exception, do_expect, mock_pymodbus -): - """Run test for different read.""" - - # the purpose of this test is to test the special - # return values from pymodbus: - # ExceptionResponse, IllegalResponse - # and exceptions. - # We "hijiack" binary_sensor and sensor in order - # to make a proper blackbox test. - await _read_helper( - hass, CONF_BINARY_SENSORS, do_type, do_return, do_exception, mock_pymodbus - ) - - # Check state - entity_id = f"{BINARY_SENSOR_DOMAIN}.{TEST_SENSOR_NAME}" - state = hass.states.get(entity_id).state + # this if is needed to avoid explode the + if do_domain == SENSOR_DOMAIN: + do_expect = do_expect_value + else: + do_expect = do_expect_state assert state == do_expect