159 lines
5.4 KiB
Python
159 lines
5.4 KiB
Python
"""Support for Modbus Register sensors."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import struct
|
|
from typing import Any
|
|
|
|
from homeassistant.components.sensor import SensorEntity
|
|
from homeassistant.const import (
|
|
CONF_COUNT,
|
|
CONF_NAME,
|
|
CONF_OFFSET,
|
|
CONF_SENSORS,
|
|
CONF_STRUCTURE,
|
|
CONF_UNIT_OF_MEASUREMENT,
|
|
)
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.restore_state import RestoreEntity
|
|
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
|
|
|
from .base_platform import BasePlatform
|
|
from .const import (
|
|
CONF_DATA_TYPE,
|
|
CONF_PRECISION,
|
|
CONF_SCALE,
|
|
CONF_SWAP,
|
|
CONF_SWAP_BYTE,
|
|
CONF_SWAP_WORD,
|
|
CONF_SWAP_WORD_BYTE,
|
|
DATA_TYPE_STRING,
|
|
MODBUS_DOMAIN,
|
|
)
|
|
from .modbus import ModbusHub
|
|
|
|
PARALLEL_UPDATES = 1
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
async def async_setup_platform(
|
|
hass: HomeAssistant,
|
|
config: ConfigType,
|
|
async_add_entities,
|
|
discovery_info: DiscoveryInfoType | None = None,
|
|
):
|
|
"""Set up the Modbus sensors."""
|
|
sensors = []
|
|
|
|
if discovery_info is None: # pragma: no cover
|
|
return
|
|
|
|
for entry in discovery_info[CONF_SENSORS]:
|
|
hub = hass.data[MODBUS_DOMAIN][discovery_info[CONF_NAME]]
|
|
sensors.append(ModbusRegisterSensor(hub, entry))
|
|
|
|
async_add_entities(sensors)
|
|
|
|
|
|
class ModbusRegisterSensor(BasePlatform, RestoreEntity, SensorEntity):
|
|
"""Modbus register sensor."""
|
|
|
|
def __init__(
|
|
self,
|
|
hub: ModbusHub,
|
|
entry: dict[str, Any],
|
|
) -> None:
|
|
"""Initialize the modbus register sensor."""
|
|
super().__init__(hub, entry)
|
|
self._unit_of_measurement = entry.get(CONF_UNIT_OF_MEASUREMENT)
|
|
self._count = int(entry[CONF_COUNT])
|
|
self._swap = entry[CONF_SWAP]
|
|
self._scale = entry[CONF_SCALE]
|
|
self._offset = entry[CONF_OFFSET]
|
|
self._precision = entry[CONF_PRECISION]
|
|
self._structure = entry.get(CONF_STRUCTURE)
|
|
self._data_type = entry[CONF_DATA_TYPE]
|
|
|
|
async def async_added_to_hass(self):
|
|
"""Handle entity which will be added."""
|
|
await self.async_base_added_to_hass()
|
|
state = await self.async_get_last_state()
|
|
if state:
|
|
self._value = state.state
|
|
|
|
@property
|
|
def state(self):
|
|
"""Return the state of the sensor."""
|
|
return self._value
|
|
|
|
@property
|
|
def unit_of_measurement(self):
|
|
"""Return the unit of measurement."""
|
|
return self._unit_of_measurement
|
|
|
|
def _swap_registers(self, registers):
|
|
"""Do swap as needed."""
|
|
if self._swap in [CONF_SWAP_BYTE, CONF_SWAP_WORD_BYTE]:
|
|
# convert [12][34] --> [21][43]
|
|
for i, register in enumerate(registers):
|
|
registers[i] = int.from_bytes(
|
|
register.to_bytes(2, byteorder="little"),
|
|
byteorder="big",
|
|
signed=False,
|
|
)
|
|
if self._swap in [CONF_SWAP_WORD, CONF_SWAP_WORD_BYTE]:
|
|
# convert [12][34] ==> [34][12]
|
|
registers.reverse()
|
|
return registers
|
|
|
|
async def async_update(self, now=None):
|
|
"""Update the state of the sensor."""
|
|
# remark "now" is a dummy parameter to avoid problems with
|
|
# async_track_time_interval
|
|
result = await self._hub.async_pymodbus_call(
|
|
self._slave, self._address, self._count, self._input_type
|
|
)
|
|
if result is None:
|
|
self._available = False
|
|
self.async_write_ha_state()
|
|
return
|
|
|
|
registers = self._swap_registers(result.registers)
|
|
byte_string = b"".join([x.to_bytes(2, byteorder="big") for x in registers])
|
|
if self._data_type == DATA_TYPE_STRING:
|
|
self._value = byte_string.decode()
|
|
else:
|
|
val = struct.unpack(self._structure, byte_string)
|
|
|
|
# Issue: https://github.com/home-assistant/core/issues/41944
|
|
# If unpack() returns a tuple greater than 1, don't try to process the value.
|
|
# Instead, return the values of unpack(...) separated by commas.
|
|
if len(val) > 1:
|
|
# Apply scale and precision to floats and ints
|
|
v_result = []
|
|
for entry in val:
|
|
v_temp = self._scale * entry + self._offset
|
|
|
|
# We could convert int to float, and the code would still work; however
|
|
# we lose some precision, and unit tests will fail. Therefore, we do
|
|
# the conversion only when it's absolutely necessary.
|
|
if isinstance(v_temp, int) and self._precision == 0:
|
|
v_result.append(str(v_temp))
|
|
else:
|
|
v_result.append(f"{float(v_temp):.{self._precision}f}")
|
|
self._value = ",".join(map(str, v_result))
|
|
else:
|
|
# Apply scale and precision to floats and ints
|
|
val = self._scale * val[0] + self._offset
|
|
|
|
# We could convert int to float, and the code would still work; however
|
|
# we lose some precision, and unit tests will fail. Therefore, we do
|
|
# the conversion only when it's absolutely necessary.
|
|
if isinstance(val, int) and self._precision == 0:
|
|
self._value = str(val)
|
|
else:
|
|
self._value = f"{float(val):.{self._precision}f}"
|
|
|
|
self._available = True
|
|
self.async_write_ha_state()
|