"""Support for Modbus.""" import logging import threading from pymodbus.client.sync import ModbusSerialClient, ModbusTcpClient, ModbusUdpClient from pymodbus.constants import Defaults from pymodbus.exceptions import ModbusException from pymodbus.transaction import ModbusRtuFramer from homeassistant.const import ( CONF_BINARY_SENSORS, CONF_COVERS, CONF_DELAY, CONF_HOST, CONF_METHOD, CONF_NAME, CONF_PORT, CONF_SENSORS, CONF_SWITCHES, CONF_TIMEOUT, CONF_TYPE, EVENT_HOMEASSISTANT_STOP, ) from homeassistant.helpers.discovery import load_platform from homeassistant.helpers.event import async_call_later from .const import ( ATTR_ADDRESS, ATTR_HUB, ATTR_STATE, ATTR_UNIT, ATTR_VALUE, CONF_BAUDRATE, CONF_BINARY_SENSOR, CONF_BYTESIZE, CONF_CLIMATE, CONF_CLIMATES, CONF_COVER, CONF_PARITY, CONF_SENSOR, CONF_STOPBITS, CONF_SWITCH, DEFAULT_HUB, MODBUS_DOMAIN as DOMAIN, SERVICE_WRITE_COIL, SERVICE_WRITE_REGISTER, ) _LOGGER = logging.getLogger(__name__) def modbus_setup( hass, config, service_write_register_schema, service_write_coil_schema ): """Set up Modbus component.""" hass.data[DOMAIN] = hub_collect = {} for conf_hub in config[DOMAIN]: hub_collect[conf_hub[CONF_NAME]] = ModbusHub(conf_hub) # modbus needs to be activated before components are loaded # to avoid a racing problem hub_collect[conf_hub[CONF_NAME]].setup(hass) # load platforms for component, conf_key in ( (CONF_CLIMATE, CONF_CLIMATES), (CONF_COVER, CONF_COVERS), (CONF_BINARY_SENSOR, CONF_BINARY_SENSORS), (CONF_SENSOR, CONF_SENSORS), (CONF_SWITCH, CONF_SWITCHES), ): if conf_key in conf_hub: load_platform(hass, component, DOMAIN, conf_hub, config) def stop_modbus(event): """Stop Modbus service.""" for client in hub_collect.values(): client.close() del client def write_register(service): """Write Modbus registers.""" unit = int(float(service.data[ATTR_UNIT])) address = int(float(service.data[ATTR_ADDRESS])) value = service.data[ATTR_VALUE] client_name = ( service.data[ATTR_HUB] if ATTR_HUB in service.data else DEFAULT_HUB ) if isinstance(value, list): hub_collect[client_name].write_registers( unit, address, [int(float(i)) for i in value] ) else: hub_collect[client_name].write_register(unit, address, int(float(value))) def write_coil(service): """Write Modbus coil.""" unit = service.data[ATTR_UNIT] address = service.data[ATTR_ADDRESS] state = service.data[ATTR_STATE] client_name = ( service.data[ATTR_HUB] if ATTR_HUB in service.data else DEFAULT_HUB ) if isinstance(state, list): hub_collect[client_name].write_coils(unit, address, state) else: hub_collect[client_name].write_coil(unit, address, state) # register function to gracefully stop modbus hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop_modbus) # Register services for modbus hass.services.register( DOMAIN, SERVICE_WRITE_REGISTER, write_register, schema=service_write_register_schema, ) hass.services.register( DOMAIN, SERVICE_WRITE_COIL, write_coil, schema=service_write_coil_schema ) return True class ModbusHub: """Thread safe wrapper class for pymodbus.""" def __init__(self, client_config): """Initialize the Modbus hub.""" # generic configuration self._client = None self._cancel_listener = None self._in_error = False self._lock = threading.Lock() self._config_name = client_config[CONF_NAME] self._config_type = client_config[CONF_TYPE] self._config_port = client_config[CONF_PORT] self._config_timeout = client_config[CONF_TIMEOUT] self._config_delay = client_config[CONF_DELAY] Defaults.Timeout = 10 if self._config_type == "serial": # serial configuration self._config_method = client_config[CONF_METHOD] self._config_baudrate = client_config[CONF_BAUDRATE] self._config_stopbits = client_config[CONF_STOPBITS] self._config_bytesize = client_config[CONF_BYTESIZE] self._config_parity = client_config[CONF_PARITY] else: # network configuration self._config_host = client_config[CONF_HOST] @property def name(self): """Return the name of this hub.""" return self._config_name def _log_error(self, exception_error: ModbusException, error_state=True): log_text = "Pymodbus: " + str(exception_error) if self._in_error: _LOGGER.debug(log_text) else: _LOGGER.error(log_text) self._in_error = error_state def setup(self, hass): """Set up pymodbus client.""" try: if self._config_type == "serial": self._client = ModbusSerialClient( method=self._config_method, port=self._config_port, baudrate=self._config_baudrate, stopbits=self._config_stopbits, bytesize=self._config_bytesize, parity=self._config_parity, timeout=self._config_timeout, retry_on_empty=True, ) elif self._config_type == "rtuovertcp": self._client = ModbusTcpClient( host=self._config_host, port=self._config_port, framer=ModbusRtuFramer, timeout=self._config_timeout, ) elif self._config_type == "tcp": self._client = ModbusTcpClient( host=self._config_host, port=self._config_port, timeout=self._config_timeout, ) elif self._config_type == "udp": self._client = ModbusUdpClient( host=self._config_host, port=self._config_port, timeout=self._config_timeout, ) except ModbusException as exception_error: self._log_error(exception_error, error_state=False) return # Connect device self.connect() # Start counting down to allow modbus requests. if self._config_delay: self._cancel_listener = async_call_later( hass, self._config_delay, self.end_delay ) def end_delay(self, args): """End startup delay.""" self._cancel_listener = None self._config_delay = 0 def close(self): """Disconnect client.""" if self._cancel_listener: self._cancel_listener() self._cancel_listener = None with self._lock: try: if self._client: self._client.close() self._client = None except ModbusException as exception_error: self._log_error(exception_error) return def connect(self): """Connect client.""" with self._lock: try: self._client.connect() except ModbusException as exception_error: self._log_error(exception_error, error_state=False) return def read_coils(self, unit, address, count): """Read coils.""" if self._config_delay: return None with self._lock: kwargs = {"unit": unit} if unit else {} try: result = self._client.read_coils(address, count, **kwargs) except ModbusException as exception_error: self._log_error(exception_error) result = exception_error if not hasattr(result, "bits"): self._log_error(result) return None self._in_error = False return result def read_discrete_inputs(self, unit, address, count): """Read discrete inputs.""" if self._config_delay: return None with self._lock: kwargs = {"unit": unit} if unit else {} try: result = self._client.read_discrete_inputs(address, count, **kwargs) except ModbusException as exception_error: result = exception_error if not hasattr(result, "bits"): self._log_error(result) return None self._in_error = False return result def read_input_registers(self, unit, address, count): """Read input registers.""" if self._config_delay: return None with self._lock: kwargs = {"unit": unit} if unit else {} try: result = self._client.read_input_registers(address, count, **kwargs) except ModbusException as exception_error: result = exception_error if not hasattr(result, "registers"): self._log_error(result) return None self._in_error = False return result def read_holding_registers(self, unit, address, count): """Read holding registers.""" if self._config_delay: return None with self._lock: kwargs = {"unit": unit} if unit else {} try: result = self._client.read_holding_registers(address, count, **kwargs) except ModbusException as exception_error: result = exception_error if not hasattr(result, "registers"): self._log_error(result) return None self._in_error = False return result def write_coil(self, unit, address, value) -> bool: """Write coil.""" if self._config_delay: return False with self._lock: kwargs = {"unit": unit} if unit else {} try: result = self._client.write_coil(address, value, **kwargs) except ModbusException as exception_error: result = exception_error if not hasattr(result, "value"): self._log_error(result) return False self._in_error = False return True def write_coils(self, unit, address, values) -> bool: """Write coil.""" if self._config_delay: return False with self._lock: kwargs = {"unit": unit} if unit else {} try: result = self._client.write_coils(address, values, **kwargs) except ModbusException as exception_error: result = exception_error if not hasattr(result, "count"): self._log_error(result) return False self._in_error = False return True def write_register(self, unit, address, value) -> bool: """Write register.""" if self._config_delay: return False with self._lock: kwargs = {"unit": unit} if unit else {} try: result = self._client.write_register(address, value, **kwargs) except ModbusException as exception_error: result = exception_error if not hasattr(result, "value"): self._log_error(result) return False self._in_error = False return True def write_registers(self, unit, address, values) -> bool: """Write registers.""" if self._config_delay: return False with self._lock: kwargs = {"unit": unit} if unit else {} try: result = self._client.write_registers(address, values, **kwargs) except ModbusException as exception_error: result = exception_error if not hasattr(result, "count"): self._log_error(result) return False self._in_error = False return True