Add Modbus light integration (#42120)

* Add  Modbus Light and add unit tests

* Update to original PR.

* Review comments.

* Review 2.

Co-authored-by: jan Iversen <jancasacondor@gmail.com>
pull/48558/head
Vladimír Záhradník 2021-05-21 08:57:17 +02:00 committed by GitHub
parent 19aee19efd
commit 80d172140f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 479 additions and 1 deletions

View File

@ -28,6 +28,7 @@ from homeassistant.const import (
CONF_DELAY,
CONF_DEVICE_CLASS,
CONF_HOST,
CONF_LIGHTS,
CONF_METHOD,
CONF_NAME,
CONF_OFFSET,
@ -239,6 +240,32 @@ SWITCH_SCHEMA = BASE_COMPONENT_SCHEMA.extend(
}
)
LIGHT_SCHEMA = BASE_COMPONENT_SCHEMA.extend(
{
vol.Required(CONF_ADDRESS): cv.positive_int,
vol.Optional(CONF_WRITE_TYPE, default=CALL_TYPE_REGISTER_HOLDING): vol.In(
[CALL_TYPE_REGISTER_HOLDING, CALL_TYPE_COIL]
),
vol.Optional(CONF_COMMAND_OFF, default=0x00): cv.positive_int,
vol.Optional(CONF_COMMAND_ON, default=0x01): cv.positive_int,
vol.Optional(CONF_VERIFY): vol.Maybe(
{
vol.Optional(CONF_ADDRESS): cv.positive_int,
vol.Optional(CONF_INPUT_TYPE): vol.In(
[
CALL_TYPE_REGISTER_HOLDING,
CALL_TYPE_DISCRETE,
CALL_TYPE_REGISTER_INPUT,
CALL_TYPE_COIL,
]
),
vol.Optional(CONF_STATE_OFF): cv.positive_int,
vol.Optional(CONF_STATE_ON): cv.positive_int,
}
),
}
)
SENSOR_SCHEMA = BASE_COMPONENT_SCHEMA.extend(
{
vol.Required(CONF_ADDRESS): cv.positive_int,
@ -289,6 +316,7 @@ MODBUS_SCHEMA = vol.Schema(
),
vol.Optional(CONF_CLIMATES): vol.All(cv.ensure_list, [CLIMATE_SCHEMA]),
vol.Optional(CONF_COVERS): vol.All(cv.ensure_list, [COVERS_SCHEMA]),
vol.Optional(CONF_LIGHTS): vol.All(cv.ensure_list, [LIGHT_SCHEMA]),
vol.Optional(CONF_SENSORS): vol.All(cv.ensure_list, [SENSOR_SCHEMA]),
vol.Optional(CONF_SWITCHES): vol.All(cv.ensure_list, [SWITCH_SCHEMA]),
}

View File

@ -3,11 +3,13 @@
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN
from homeassistant.components.climate.const import DOMAIN as CLIMATE_DOMAIN
from homeassistant.components.cover import DOMAIN as COVER_DOMAIN
from homeassistant.components.light import DOMAIN as LIGHT_DOMAIN
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN
from homeassistant.const import (
CONF_BINARY_SENSORS,
CONF_COVERS,
CONF_LIGHTS,
CONF_SENSORS,
CONF_SWITCHES,
)
@ -99,9 +101,10 @@ DEFAULT_TEMP_UNIT = "C"
MODBUS_DOMAIN = "modbus"
PLATFORMS = (
(BINARY_SENSOR_DOMAIN, CONF_BINARY_SENSORS),
(CLIMATE_DOMAIN, CONF_CLIMATES),
(COVER_DOMAIN, CONF_COVERS),
(BINARY_SENSOR_DOMAIN, CONF_BINARY_SENSORS),
(LIGHT_DOMAIN, CONF_LIGHTS),
(SENSOR_DOMAIN, CONF_SENSORS),
(SWITCH_DOMAIN, CONF_SWITCHES),
)

View File

@ -0,0 +1,158 @@
"""Support for Modbus lights."""
from __future__ import annotations
import logging
from homeassistant.components.light import LightEntity
from homeassistant.const import (
CONF_ADDRESS,
CONF_COMMAND_OFF,
CONF_COMMAND_ON,
CONF_LIGHTS,
CONF_NAME,
STATE_ON,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers.typing import ConfigType
from .base_platform import BasePlatform
from .const import (
CALL_TYPE_COIL,
CALL_TYPE_WRITE_COIL,
CALL_TYPE_WRITE_REGISTER,
CONF_INPUT_TYPE,
CONF_STATE_OFF,
CONF_STATE_ON,
CONF_VERIFY,
CONF_WRITE_TYPE,
MODBUS_DOMAIN,
)
from .modbus import ModbusHub
PARALLEL_UPDATES = 1
_LOGGER = logging.getLogger(__name__)
async def async_setup_platform(
hass: HomeAssistant, config: ConfigType, async_add_entities, discovery_info=None
):
"""Read configuration and create Modbus lights."""
if discovery_info is None:
return
lights = []
for entry in discovery_info[CONF_LIGHTS]:
hub: ModbusHub = hass.data[MODBUS_DOMAIN][discovery_info[CONF_NAME]]
lights.append(ModbusLight(hub, entry))
async_add_entities(lights)
class ModbusLight(BasePlatform, LightEntity, RestoreEntity):
"""Base class representing a Modbus light."""
def __init__(self, hub: ModbusHub, config: dict) -> None:
"""Initialize the light."""
config[CONF_INPUT_TYPE] = ""
super().__init__(hub, config)
self._is_on = None
if config[CONF_WRITE_TYPE] == CALL_TYPE_COIL:
self._write_type = CALL_TYPE_WRITE_COIL
else:
self._write_type = CALL_TYPE_WRITE_REGISTER
self._command_on = config[CONF_COMMAND_ON]
self._command_off = config[CONF_COMMAND_OFF]
if CONF_VERIFY in config:
if config[CONF_VERIFY] is None:
config[CONF_VERIFY] = {}
self._verify_active = True
self._verify_address = config[CONF_VERIFY].get(
CONF_ADDRESS, config[CONF_ADDRESS]
)
self._verify_type = config[CONF_VERIFY].get(
CONF_INPUT_TYPE, config[CONF_WRITE_TYPE]
)
self._state_on = config[CONF_VERIFY].get(CONF_STATE_ON, self._command_on)
self._state_off = config[CONF_VERIFY].get(CONF_STATE_OFF, self._command_off)
else:
self._verify_active = False
async def async_added_to_hass(self):
"""Handle entity which will be added."""
await self.async_base_added_to_hass()
state = await self.async_get_last_state()
if state:
self._is_on = state.state == STATE_ON
@property
def is_on(self):
"""Return true if light is on."""
return self._is_on
async def async_turn_on(self, **kwargs):
"""Set light on."""
result = await self._hub.async_pymodbus_call(
self._slave, self._address, self._command_on, self._write_type
)
if result is None:
self._available = False
self.async_write_ha_state()
else:
self._available = True
if self._verify_active:
await self.async_update()
else:
self._is_on = True
self.async_write_ha_state()
async def async_turn_off(self, **kwargs):
"""Set light off."""
result = await self._hub.async_pymodbus_call(
self._slave, self._address, self._command_off, self._write_type
)
if result is None:
self._available = False
self.async_write_ha_state()
else:
self._available = True
if self._verify_active:
await self.async_update()
else:
self._is_on = False
self.async_write_ha_state()
async def async_update(self, now=None):
"""Update the entity state."""
# remark "now" is a dummy parameter to avoid problems with
# async_track_time_interval
if not self._verify_active:
self._available = True
self.async_write_ha_state()
return
result = await self._hub.async_pymodbus_call(
self._slave, self._verify_address, 1, self._verify_type
)
if result is None:
self._available = False
self.async_write_ha_state()
return
self._available = True
if self._verify_type == CALL_TYPE_COIL:
self._is_on = bool(result.bits[0] & 1)
else:
value = int(result.registers[0])
if value == self._state_on:
self._is_on = True
elif value == self._state_off:
self._is_on = False
elif value is not None:
_LOGGER.error(
"Unexpected response from hub %s, slave %s register %s, got 0x%2x",
self._hub.name,
self._slave,
self._verify_address,
value,
)
self.async_write_ha_state()

View File

@ -0,0 +1,289 @@
"""The tests for the Modbus light component."""
from pymodbus.exceptions import ModbusException
import pytest
from homeassistant.components.light import DOMAIN as LIGHT_DOMAIN
from homeassistant.components.modbus.const import (
CALL_TYPE_COIL,
CALL_TYPE_DISCRETE,
CALL_TYPE_REGISTER_HOLDING,
CALL_TYPE_REGISTER_INPUT,
CONF_INPUT_TYPE,
CONF_STATE_OFF,
CONF_STATE_ON,
CONF_VERIFY,
CONF_WRITE_TYPE,
MODBUS_DOMAIN,
)
from homeassistant.const import (
CONF_ADDRESS,
CONF_COMMAND_OFF,
CONF_COMMAND_ON,
CONF_HOST,
CONF_LIGHTS,
CONF_NAME,
CONF_PORT,
CONF_SLAVE,
CONF_TYPE,
STATE_OFF,
STATE_ON,
STATE_UNAVAILABLE,
)
from homeassistant.core import State
from homeassistant.setup import async_setup_component
from .conftest import ReadResult, base_config_test, base_test, prepare_service_update
from tests.common import mock_restore_cache
@pytest.mark.parametrize(
"do_config",
[
{
CONF_ADDRESS: 1234,
},
{
CONF_ADDRESS: 1234,
CONF_WRITE_TYPE: CALL_TYPE_COIL,
},
{
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
CONF_COMMAND_ON: 0x01,
CONF_VERIFY: {
CONF_INPUT_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_ADDRESS: 1235,
CONF_STATE_OFF: 0,
CONF_STATE_ON: 1,
},
},
{
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
CONF_COMMAND_ON: 0x01,
CONF_VERIFY: {
CONF_INPUT_TYPE: CALL_TYPE_REGISTER_INPUT,
CONF_ADDRESS: 1235,
CONF_STATE_OFF: 0,
CONF_STATE_ON: 1,
},
},
{
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
CONF_COMMAND_ON: 0x01,
CONF_VERIFY: {
CONF_INPUT_TYPE: CALL_TYPE_DISCRETE,
CONF_ADDRESS: 1235,
CONF_STATE_OFF: 0,
CONF_STATE_ON: 1,
},
},
{
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_COMMAND_OFF: 0x00,
CONF_COMMAND_ON: 0x01,
CONF_VERIFY: None,
},
],
)
async def test_config_light(hass, do_config):
"""Run test for light."""
device_name = "test_light"
device_config = {
CONF_NAME: device_name,
**do_config,
}
await base_config_test(
hass,
device_config,
device_name,
LIGHT_DOMAIN,
CONF_LIGHTS,
None,
method_discovery=True,
)
@pytest.mark.parametrize("call_type", [CALL_TYPE_COIL, CALL_TYPE_REGISTER_HOLDING])
@pytest.mark.parametrize(
"regs,verify,expected",
[
(
[0x00],
{CONF_VERIFY: {}},
STATE_OFF,
),
(
[0x01],
{CONF_VERIFY: {}},
STATE_ON,
),
(
[0xFE],
{CONF_VERIFY: {}},
STATE_OFF,
),
(
None,
{CONF_VERIFY: {}},
STATE_UNAVAILABLE,
),
(
None,
{},
STATE_OFF,
),
],
)
async def test_all_light(hass, call_type, regs, verify, expected):
"""Run test for given config."""
light_name = "modbus_test_light"
state = await base_test(
hass,
{
CONF_NAME: light_name,
CONF_ADDRESS: 1234,
CONF_SLAVE: 1,
CONF_WRITE_TYPE: call_type,
**verify,
},
light_name,
LIGHT_DOMAIN,
CONF_LIGHTS,
None,
regs,
expected,
method_discovery=True,
scan_interval=5,
)
assert state == expected
async def test_restore_state_light(hass):
"""Run test for sensor restore state."""
light_name = "test_light"
entity_id = f"{LIGHT_DOMAIN}.{light_name}"
test_value = STATE_ON
config_light = {CONF_NAME: light_name, CONF_ADDRESS: 17}
mock_restore_cache(
hass,
(State(f"{entity_id}", test_value),),
)
await base_config_test(
hass,
config_light,
light_name,
LIGHT_DOMAIN,
CONF_LIGHTS,
None,
method_discovery=True,
)
assert hass.states.get(entity_id).state == test_value
async def test_light_service_turn(hass, caplog, mock_pymodbus):
"""Run test for service turn_on/turn_off."""
entity_id1 = f"{LIGHT_DOMAIN}.light1"
entity_id2 = f"{LIGHT_DOMAIN}.light2"
config = {
MODBUS_DOMAIN: {
CONF_TYPE: "tcp",
CONF_HOST: "modbusTestHost",
CONF_PORT: 5501,
CONF_LIGHTS: [
{
CONF_NAME: "light1",
CONF_ADDRESS: 17,
CONF_WRITE_TYPE: CALL_TYPE_REGISTER_HOLDING,
},
{
CONF_NAME: "light2",
CONF_ADDRESS: 17,
CONF_WRITE_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_VERIFY: {},
},
],
},
}
assert await async_setup_component(hass, MODBUS_DOMAIN, config) is True
await hass.async_block_till_done()
assert MODBUS_DOMAIN in hass.config.components
assert hass.states.get(entity_id1).state == STATE_OFF
await hass.services.async_call(
"light", "turn_on", service_data={"entity_id": entity_id1}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id1).state == STATE_ON
await hass.services.async_call(
"light", "turn_off", service_data={"entity_id": entity_id1}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id1).state == STATE_OFF
mock_pymodbus.read_holding_registers.return_value = ReadResult([0x01])
assert hass.states.get(entity_id2).state == STATE_OFF
await hass.services.async_call(
"light", "turn_on", service_data={"entity_id": entity_id2}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id2).state == STATE_ON
mock_pymodbus.read_holding_registers.return_value = ReadResult([0x00])
await hass.services.async_call(
"light", "turn_off", service_data={"entity_id": entity_id2}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id2).state == STATE_OFF
mock_pymodbus.write_register.side_effect = ModbusException("fail write_")
await hass.services.async_call(
"light", "turn_on", service_data={"entity_id": entity_id2}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id2).state == STATE_UNAVAILABLE
mock_pymodbus.write_coil.side_effect = ModbusException("fail write_")
await hass.services.async_call(
"light", "turn_off", service_data={"entity_id": entity_id1}
)
await hass.async_block_till_done()
assert hass.states.get(entity_id1).state == STATE_UNAVAILABLE
async def test_service_light_update(hass, mock_pymodbus):
"""Run test for service homeassistant.update_entity."""
entity_id = "light.test"
config = {
CONF_LIGHTS: [
{
CONF_NAME: "test",
CONF_ADDRESS: 1234,
CONF_WRITE_TYPE: CALL_TYPE_COIL,
CONF_VERIFY: {},
}
]
}
mock_pymodbus.read_discrete_inputs.return_value = ReadResult([0x01])
await prepare_service_update(
hass,
config,
)
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
)
assert hass.states.get(entity_id).state == STATE_ON
mock_pymodbus.read_coils.return_value = ReadResult([0x00])
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
)
assert hass.states.get(entity_id).state == STATE_OFF