"""Support for Modbus switches.""" from __future__ import annotations from abc import ABC, abstractmethod from datetime import timedelta import logging from typing import Any import voluptuous as vol from homeassistant.components.switch import PLATFORM_SCHEMA, SwitchEntity from homeassistant.const import ( CONF_ADDRESS, CONF_COMMAND_OFF, CONF_COMMAND_ON, CONF_NAME, CONF_SCAN_INTERVAL, CONF_SLAVE, CONF_SWITCHES, STATE_ON, ) from homeassistant.core import HomeAssistant from homeassistant.helpers import config_validation as cv from homeassistant.helpers.event import async_track_time_interval from homeassistant.helpers.restore_state import RestoreEntity from homeassistant.helpers.typing import ConfigType from .const import ( CALL_TYPE_COIL, CALL_TYPE_REGISTER_HOLDING, CALL_TYPE_REGISTER_INPUT, CONF_COILS, CONF_HUB, CONF_INPUT_TYPE, CONF_REGISTER, CONF_REGISTER_TYPE, CONF_REGISTERS, CONF_STATE_OFF, CONF_STATE_ON, CONF_VERIFY_REGISTER, CONF_VERIFY_STATE, DEFAULT_HUB, DEFAULT_SCAN_INTERVAL, MODBUS_DOMAIN, ) from .modbus import ModbusHub _LOGGER = logging.getLogger(__name__) REGISTERS_SCHEMA = vol.Schema( { vol.Required(CONF_COMMAND_OFF): cv.positive_int, vol.Required(CONF_COMMAND_ON): cv.positive_int, vol.Required(CONF_NAME): cv.string, vol.Required(CONF_REGISTER): cv.positive_int, vol.Optional(CONF_HUB, default=DEFAULT_HUB): cv.string, vol.Optional(CONF_REGISTER_TYPE, default=CALL_TYPE_REGISTER_HOLDING): vol.In( [CALL_TYPE_REGISTER_HOLDING, CALL_TYPE_REGISTER_INPUT] ), vol.Optional(CONF_SLAVE): cv.positive_int, vol.Optional(CONF_STATE_OFF): cv.positive_int, vol.Optional(CONF_STATE_ON): cv.positive_int, vol.Optional(CONF_VERIFY_REGISTER): cv.positive_int, vol.Optional(CONF_VERIFY_STATE, default=True): cv.boolean, } ) COILS_SCHEMA = vol.Schema( { vol.Required(CALL_TYPE_COIL): cv.positive_int, vol.Required(CONF_NAME): cv.string, vol.Required(CONF_SLAVE): cv.positive_int, vol.Optional(CONF_HUB, default=DEFAULT_HUB): cv.string, } ) PLATFORM_SCHEMA = vol.All( cv.has_at_least_one_key(CONF_COILS, CONF_REGISTERS), PLATFORM_SCHEMA.extend( { vol.Optional(CONF_COILS): [COILS_SCHEMA], vol.Optional(CONF_REGISTERS): [REGISTERS_SCHEMA], } ), ) async def async_setup_platform( hass: HomeAssistant, config: ConfigType, async_add_entities, discovery_info=None ): """Read configuration and create Modbus switches.""" switches = [] #  check for old config: if discovery_info is None: _LOGGER.warning( "Switch configuration is deprecated, will be removed in a future release" ) discovery_info = { CONF_NAME: "no name", CONF_SWITCHES: [], } if CONF_COILS in config: discovery_info[CONF_SWITCHES].extend(config[CONF_COILS]) if CONF_REGISTERS in config: discovery_info[CONF_SWITCHES].extend(config[CONF_REGISTERS]) for entry in discovery_info[CONF_SWITCHES]: if CALL_TYPE_COIL in entry: entry[CONF_ADDRESS] = entry[CALL_TYPE_COIL] entry[CONF_INPUT_TYPE] = CALL_TYPE_COIL del entry[CALL_TYPE_COIL] if CONF_REGISTER in entry: entry[CONF_ADDRESS] = entry[CONF_REGISTER] del entry[CONF_REGISTER] if CONF_REGISTER_TYPE in entry: entry[CONF_INPUT_TYPE] = entry[CONF_REGISTER_TYPE] del entry[CONF_REGISTER_TYPE] if CONF_SCAN_INTERVAL not in entry: entry[CONF_SCAN_INTERVAL] = DEFAULT_SCAN_INTERVAL config = None for entry in discovery_info[CONF_SWITCHES]: if CONF_HUB in entry: # from old config! hub: ModbusHub = hass.data[MODBUS_DOMAIN][entry[CONF_HUB]] else: hub: ModbusHub = hass.data[MODBUS_DOMAIN][discovery_info[CONF_NAME]] if entry[CONF_INPUT_TYPE] == CALL_TYPE_COIL: switches.append(ModbusCoilSwitch(hub, entry)) else: switches.append(ModbusRegisterSwitch(hub, entry)) async_add_entities(switches) class ModbusBaseSwitch(SwitchEntity, RestoreEntity, ABC): """Base class representing a Modbus switch.""" def __init__(self, hub: ModbusHub, config: dict[str, Any]): """Initialize the switch.""" self._hub: ModbusHub = hub self._name = config[CONF_NAME] self._slave = config.get(CONF_SLAVE) self._is_on = None self._available = True self._scan_interval = timedelta(seconds=config[CONF_SCAN_INTERVAL]) async def async_added_to_hass(self): """Handle entity which will be added.""" state = await self.async_get_last_state() if state: self._is_on = state.state == STATE_ON async_track_time_interval( self.hass, lambda arg: self._update(), self._scan_interval ) @abstractmethod def _update(self): """Update the entity state.""" @property def is_on(self): """Return true if switch is on.""" return self._is_on @property def name(self): """Return the name of the switch.""" return self._name @property def should_poll(self): """Return True if entity has to be polled for state. False if entity pushes its state to HA. """ # Handle polling directly in this entity return False @property def available(self) -> bool: """Return True if entity is available.""" return self._available class ModbusCoilSwitch(ModbusBaseSwitch, SwitchEntity): """Representation of a Modbus coil switch.""" def __init__(self, hub: ModbusHub, config: dict[str, Any]): """Initialize the coil switch.""" super().__init__(hub, config) self._coil = config[CONF_ADDRESS] def turn_on(self, **kwargs): """Set switch on.""" self._write_coil(self._coil, True) self._is_on = True self.schedule_update_ha_state() def turn_off(self, **kwargs): """Set switch off.""" self._write_coil(self._coil, False) self._is_on = False self.schedule_update_ha_state() def _update(self): """Update the state of the switch.""" self._is_on = self._read_coil(self._coil) self.schedule_update_ha_state() def _read_coil(self, coil) -> bool: """Read coil using the Modbus hub slave.""" result = self._hub.read_coils(self._slave, coil, 1) if result is None: self._available = False return False self._available = True # bits[0] select the lowest bit in result, # is_on for a binary_sensor is true if the bit is 1 # The other bits are not considered. return bool(result.bits[0] & 1) def _write_coil(self, coil, value): """Write coil using the Modbus hub slave.""" self._available = self._hub.write_coil(self._slave, coil, value) class ModbusRegisterSwitch(ModbusBaseSwitch, SwitchEntity): """Representation of a Modbus register switch.""" def __init__(self, hub: ModbusHub, config: dict[str, Any]): """Initialize the register switch.""" super().__init__(hub, config) self._register = config[CONF_ADDRESS] self._command_on = config[CONF_COMMAND_ON] self._command_off = config[CONF_COMMAND_OFF] self._state_on = config.get(CONF_STATE_ON, self._command_on) self._state_off = config.get(CONF_STATE_OFF, self._command_off) self._verify_state = config[CONF_VERIFY_STATE] self._verify_register = config.get(CONF_VERIFY_REGISTER, self._register) self._register_type = config[CONF_INPUT_TYPE] self._available = True self._is_on = None def turn_on(self, **kwargs): """Set switch on.""" # Only holding register is writable if self._register_type == CALL_TYPE_REGISTER_HOLDING: self._write_register(self._command_on) if not self._verify_state: self._is_on = True self.schedule_update_ha_state() def turn_off(self, **kwargs): """Set switch off.""" # Only holding register is writable if self._register_type == CALL_TYPE_REGISTER_HOLDING: self._write_register(self._command_off) if not self._verify_state: self._is_on = False self.schedule_update_ha_state() @property def available(self) -> bool: """Return True if entity is available.""" return self._available def _update(self): """Update the state of the switch.""" if not self._verify_state: return value = self._read_register() if value == self._state_on: self._is_on = True elif value == self._state_off: self._is_on = False elif value is not None: _LOGGER.error( "Unexpected response from hub %s, slave %s register %s, got 0x%2x", self._hub.name, self._slave, self._register, value, ) self.schedule_update_ha_state() def _read_register(self) -> int | None: if self._register_type == CALL_TYPE_REGISTER_INPUT: result = self._hub.read_input_registers( self._slave, self._verify_register, 1 ) else: result = self._hub.read_holding_registers( self._slave, self._verify_register, 1 ) if result is None: self._available = False return self._available = True return int(result.registers[0]) def _write_register(self, value): """Write holding register using the Modbus hub slave.""" self._available = self._hub.write_register(self._slave, self._register, value)