"""Utility meter from sensors providing raw data.""" from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timedelta from decimal import Decimal, DecimalException, InvalidOperation import logging from typing import Any from croniter import croniter from typing_extensions import Self import voluptuous as vol from homeassistant.components.sensor import ( ATTR_LAST_RESET, RestoreSensor, SensorDeviceClass, SensorExtraStoredData, SensorStateClass, ) from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( ATTR_UNIT_OF_MEASUREMENT, CONF_NAME, CONF_UNIQUE_ID, STATE_UNAVAILABLE, STATE_UNKNOWN, UnitOfEnergy, ) from homeassistant.core import HomeAssistant, callback from homeassistant.helpers import entity_platform, entity_registry as er from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.event import ( async_track_point_in_time, async_track_state_change_event, ) from homeassistant.helpers.start import async_at_start from homeassistant.helpers.template import is_number from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from homeassistant.util import slugify import homeassistant.util.dt as dt_util from .const import ( ATTR_CRON_PATTERN, ATTR_VALUE, BIMONTHLY, CONF_CRON_PATTERN, CONF_METER, CONF_METER_DELTA_VALUES, CONF_METER_NET_CONSUMPTION, CONF_METER_OFFSET, CONF_METER_TYPE, CONF_SOURCE_SENSOR, CONF_TARIFF, CONF_TARIFF_ENTITY, CONF_TARIFFS, DAILY, DATA_TARIFF_SENSORS, DATA_UTILITY, HOURLY, MONTHLY, QUARTER_HOURLY, QUARTERLY, SERVICE_CALIBRATE_METER, SIGNAL_RESET_METER, WEEKLY, YEARLY, ) PERIOD2CRON = { QUARTER_HOURLY: "{minute}/15 * * * *", HOURLY: "{minute} * * * *", DAILY: "{minute} {hour} * * *", WEEKLY: "{minute} {hour} * * {day}", MONTHLY: "{minute} {hour} {day} * *", BIMONTHLY: "{minute} {hour} {day} */2 *", QUARTERLY: "{minute} {hour} {day} */3 *", YEARLY: "{minute} {hour} {day} 1/12 *", } _LOGGER = logging.getLogger(__name__) ATTR_SOURCE_ID = "source" ATTR_STATUS = "status" ATTR_PERIOD = "meter_period" ATTR_LAST_PERIOD = "last_period" ATTR_TARIFF = "tariff" DEVICE_CLASS_MAP = { UnitOfEnergy.WATT_HOUR: SensorDeviceClass.ENERGY, UnitOfEnergy.KILO_WATT_HOUR: SensorDeviceClass.ENERGY, } ICON = "mdi:counter" PRECISION = 3 PAUSED = "paused" COLLECTING = "collecting" def validate_is_number(value): """Validate value is a number.""" if is_number(value): return value raise vol.Invalid("Value is not a number") async def async_setup_entry( hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities: AddEntitiesCallback, ) -> None: """Initialize Utility Meter config entry.""" entry_id = config_entry.entry_id registry = er.async_get(hass) # Validate + resolve entity registry id to entity_id source_entity_id = er.async_validate_entity_id( registry, config_entry.options[CONF_SOURCE_SENSOR] ) cron_pattern = None delta_values = config_entry.options[CONF_METER_DELTA_VALUES] meter_offset = timedelta(days=config_entry.options[CONF_METER_OFFSET]) meter_type = config_entry.options[CONF_METER_TYPE] if meter_type == "none": meter_type = None name = config_entry.title net_consumption = config_entry.options[CONF_METER_NET_CONSUMPTION] tariff_entity = hass.data[DATA_UTILITY][entry_id][CONF_TARIFF_ENTITY] meters = [] tariffs = config_entry.options[CONF_TARIFFS] if not tariffs: # Add single sensor, not gated by a tariff selector meter_sensor = UtilityMeterSensor( cron_pattern=cron_pattern, delta_values=delta_values, meter_offset=meter_offset, meter_type=meter_type, name=name, net_consumption=net_consumption, parent_meter=entry_id, source_entity=source_entity_id, tariff_entity=tariff_entity, tariff=None, unique_id=entry_id, ) meters.append(meter_sensor) hass.data[DATA_UTILITY][entry_id][DATA_TARIFF_SENSORS].append(meter_sensor) else: # Add sensors for each tariff for tariff in tariffs: meter_sensor = UtilityMeterSensor( cron_pattern=cron_pattern, delta_values=delta_values, meter_offset=meter_offset, meter_type=meter_type, name=f"{name} {tariff}", net_consumption=net_consumption, parent_meter=entry_id, source_entity=source_entity_id, tariff_entity=tariff_entity, tariff=tariff, unique_id=f"{entry_id}_{tariff}", ) meters.append(meter_sensor) hass.data[DATA_UTILITY][entry_id][DATA_TARIFF_SENSORS].append(meter_sensor) async_add_entities(meters) platform = entity_platform.async_get_current_platform() platform.async_register_entity_service( SERVICE_CALIBRATE_METER, {vol.Required(ATTR_VALUE): validate_is_number}, "async_calibrate", ) async def async_setup_platform( hass: HomeAssistant, config: ConfigType, async_add_entities: AddEntitiesCallback, discovery_info: DiscoveryInfoType | None = None, ) -> None: """Set up the utility meter sensor.""" if discovery_info is None: _LOGGER.error( "This platform is not available to configure " "from 'sensor:' in configuration.yaml" ) return meters = [] for conf in discovery_info.values(): meter = conf[CONF_METER] conf_meter_source = hass.data[DATA_UTILITY][meter][CONF_SOURCE_SENSOR] conf_meter_unique_id = hass.data[DATA_UTILITY][meter].get(CONF_UNIQUE_ID) conf_sensor_tariff = conf.get(CONF_TARIFF, "single_tariff") conf_sensor_unique_id = ( f"{conf_meter_unique_id}_{conf_sensor_tariff}" if conf_meter_unique_id else None ) conf_meter_name = hass.data[DATA_UTILITY][meter].get(CONF_NAME, meter) conf_sensor_tariff = conf.get(CONF_TARIFF) suggested_entity_id = None if conf_sensor_tariff: conf_sensor_name = f"{conf_meter_name} {conf_sensor_tariff}" slug = slugify(f"{meter} {conf_sensor_tariff}") suggested_entity_id = f"sensor.{slug}" else: conf_sensor_name = conf_meter_name conf_meter_type = hass.data[DATA_UTILITY][meter].get(CONF_METER_TYPE) conf_meter_offset = hass.data[DATA_UTILITY][meter][CONF_METER_OFFSET] conf_meter_delta_values = hass.data[DATA_UTILITY][meter][ CONF_METER_DELTA_VALUES ] conf_meter_net_consumption = hass.data[DATA_UTILITY][meter][ CONF_METER_NET_CONSUMPTION ] conf_meter_tariff_entity = hass.data[DATA_UTILITY][meter].get( CONF_TARIFF_ENTITY ) conf_cron_pattern = hass.data[DATA_UTILITY][meter].get(CONF_CRON_PATTERN) meter_sensor = UtilityMeterSensor( cron_pattern=conf_cron_pattern, delta_values=conf_meter_delta_values, meter_offset=conf_meter_offset, meter_type=conf_meter_type, name=conf_sensor_name, net_consumption=conf_meter_net_consumption, parent_meter=meter, source_entity=conf_meter_source, tariff_entity=conf_meter_tariff_entity, tariff=conf_sensor_tariff, unique_id=conf_sensor_unique_id, suggested_entity_id=suggested_entity_id, ) meters.append(meter_sensor) hass.data[DATA_UTILITY][meter][DATA_TARIFF_SENSORS].append(meter_sensor) async_add_entities(meters) platform = entity_platform.async_get_current_platform() platform.async_register_entity_service( SERVICE_CALIBRATE_METER, {vol.Required(ATTR_VALUE): validate_is_number}, "async_calibrate", ) @dataclass class UtilitySensorExtraStoredData(SensorExtraStoredData): """Object to hold extra stored data.""" last_period: Decimal last_reset: datetime | None status: str def as_dict(self) -> dict[str, Any]: """Return a dict representation of the utility sensor data.""" data = super().as_dict() data["last_period"] = str(self.last_period) if isinstance(self.last_reset, (datetime)): data["last_reset"] = self.last_reset.isoformat() data["status"] = self.status return data @classmethod def from_dict(cls, restored: dict[str, Any]) -> Self | None: """Initialize a stored sensor state from a dict.""" extra = SensorExtraStoredData.from_dict(restored) if extra is None: return None try: last_period: Decimal = Decimal(restored["last_period"]) last_reset: datetime | None = dt_util.parse_datetime(restored["last_reset"]) status: str = restored["status"] except KeyError: # restored is a dict, but does not have all values return None except InvalidOperation: # last_period is corrupted return None return cls( extra.native_value, extra.native_unit_of_measurement, last_period, last_reset, status, ) class UtilityMeterSensor(RestoreSensor): """Representation of an utility meter sensor.""" _attr_should_poll = False def __init__( self, *, cron_pattern, delta_values, meter_offset, meter_type, name, net_consumption, parent_meter, source_entity, tariff_entity, tariff, unique_id, suggested_entity_id=None, ): """Initialize the Utility Meter sensor.""" self._attr_unique_id = unique_id self.entity_id = suggested_entity_id self._parent_meter = parent_meter self._sensor_source_id = source_entity self._state = None self._last_period = Decimal(0) self._last_reset = dt_util.utcnow() self._collecting = None self._name = name self._unit_of_measurement = None self._period = meter_type if meter_type is not None: # For backwards compatibility reasons we convert the period and offset into a cron pattern self._cron_pattern = PERIOD2CRON[meter_type].format( minute=meter_offset.seconds % 3600 // 60, hour=meter_offset.seconds // 3600, day=meter_offset.days + 1, ) _LOGGER.debug("CRON pattern: %s", self._cron_pattern) else: self._cron_pattern = cron_pattern self._sensor_delta_values = delta_values self._sensor_net_consumption = net_consumption self._tariff = tariff self._tariff_entity = tariff_entity def start(self, unit): """Initialize unit and state upon source initial update.""" self._unit_of_measurement = unit self._state = 0 self.async_write_ha_state() @callback def async_reading(self, event): """Handle the sensor state changes.""" old_state = event.data.get("old_state") new_state = event.data.get("new_state") if self._state is None and new_state.state: # First state update initializes the utility_meter sensors source_state = self.hass.states.get(self._sensor_source_id) for sensor in self.hass.data[DATA_UTILITY][self._parent_meter][ DATA_TARIFF_SENSORS ]: sensor.start(source_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)) if ( new_state is None or new_state.state in [STATE_UNKNOWN, STATE_UNAVAILABLE] or ( not self._sensor_delta_values and ( old_state is None or old_state.state in [STATE_UNKNOWN, STATE_UNAVAILABLE] ) ) ): return self._unit_of_measurement = new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) try: if self._sensor_delta_values: adjustment = Decimal(new_state.state) else: adjustment = Decimal(new_state.state) - Decimal(old_state.state) if (not self._sensor_net_consumption) and adjustment < 0: # Source sensor just rolled over for unknown reasons, return self._state += adjustment except DecimalException as err: if self._sensor_delta_values: _LOGGER.warning("Invalid adjustment of %s: %s", new_state.state, err) else: _LOGGER.warning( "Invalid state (%s > %s): %s", old_state.state, new_state.state, err ) self.async_write_ha_state() @callback def async_tariff_change(self, event): """Handle tariff changes.""" if (new_state := event.data.get("new_state")) is None: return self._change_status(new_state.state) def _change_status(self, tariff): if self._tariff == tariff: self._collecting = async_track_state_change_event( self.hass, [self._sensor_source_id], self.async_reading ) else: if self._collecting: self._collecting() self._collecting = None _LOGGER.debug( "%s - %s - source <%s>", self._name, COLLECTING if self._collecting is not None else PAUSED, self._sensor_source_id, ) self.async_write_ha_state() async def _async_reset_meter(self, event): """Determine cycle - Helper function for larger than daily cycles.""" if self._cron_pattern is not None: self.async_on_remove( async_track_point_in_time( self.hass, self._async_reset_meter, croniter(self._cron_pattern, dt_util.now()).get_next(datetime), ) ) await self.async_reset_meter(self._tariff_entity) async def async_reset_meter(self, entity_id): """Reset meter.""" if self._tariff_entity != entity_id: return _LOGGER.debug("Reset utility meter <%s>", self.entity_id) self._last_reset = dt_util.utcnow() self._last_period = Decimal(self._state) if self._state else Decimal(0) self._state = 0 self.async_write_ha_state() async def async_calibrate(self, value): """Calibrate the Utility Meter with a given value.""" _LOGGER.debug("Calibrate %s = %s type(%s)", self._name, value, type(value)) self._state = Decimal(str(value)) self.async_write_ha_state() async def async_added_to_hass(self): """Handle entity which will be added.""" await super().async_added_to_hass() if self._cron_pattern is not None: self.async_on_remove( async_track_point_in_time( self.hass, self._async_reset_meter, croniter(self._cron_pattern, dt_util.now()).get_next(datetime), ) ) self.async_on_remove( async_dispatcher_connect( self.hass, SIGNAL_RESET_METER, self.async_reset_meter ) ) if (last_sensor_data := await self.async_get_last_sensor_data()) is not None: # new introduced in 2022.04 self._state = last_sensor_data.native_value self._unit_of_measurement = last_sensor_data.native_unit_of_measurement self._last_period = last_sensor_data.last_period self._last_reset = last_sensor_data.last_reset if last_sensor_data.status == COLLECTING: # Null lambda to allow cancelling the collection on tariff change self._collecting = lambda: None elif state := await self.async_get_last_state(): # legacy to be removed on 2022.10 (we are keeping this to avoid utility_meter counter losses) try: self._state = Decimal(state.state) except InvalidOperation: _LOGGER.error( "Could not restore state <%s>. Resetting utility_meter.%s", state.state, self.name, ) else: self._unit_of_measurement = state.attributes.get( ATTR_UNIT_OF_MEASUREMENT ) self._last_period = ( Decimal(state.attributes[ATTR_LAST_PERIOD]) if state.attributes.get(ATTR_LAST_PERIOD) and is_number(state.attributes[ATTR_LAST_PERIOD]) else Decimal(0) ) self._last_reset = dt_util.as_utc( dt_util.parse_datetime(state.attributes.get(ATTR_LAST_RESET)) ) if state.attributes.get(ATTR_STATUS) == COLLECTING: # Null lambda to allow cancelling the collection on tariff change self._collecting = lambda: None @callback def async_source_tracking(event): """Wait for source to be ready, then start meter.""" if self._tariff_entity is not None: _LOGGER.debug( "<%s> tracks utility meter %s", self.name, self._tariff_entity ) self.async_on_remove( async_track_state_change_event( self.hass, [self._tariff_entity], self.async_tariff_change ) ) tariff_entity_state = self.hass.states.get(self._tariff_entity) if not tariff_entity_state: # The utility meter is not yet added return self._change_status(tariff_entity_state.state) return _LOGGER.debug( "<%s> collecting %s from %s", self.name, self._unit_of_measurement, self._sensor_source_id, ) self._collecting = async_track_state_change_event( self.hass, [self._sensor_source_id], self.async_reading ) self.async_on_remove(async_at_start(self.hass, async_source_tracking)) async def async_will_remove_from_hass(self) -> None: """Run when entity will be removed from hass.""" if self._collecting: self._collecting() self._collecting = None @property def name(self): """Return the name of the sensor.""" return self._name @property def native_value(self): """Return the state of the sensor.""" return self._state @property def device_class(self): """Return the device class of the sensor.""" return DEVICE_CLASS_MAP.get(self._unit_of_measurement) @property def state_class(self): """Return the device class of the sensor.""" return ( SensorStateClass.TOTAL if self._sensor_net_consumption else SensorStateClass.TOTAL_INCREASING ) @property def native_unit_of_measurement(self): """Return the unit the value is expressed in.""" return self._unit_of_measurement @property def extra_state_attributes(self): """Return the state attributes of the sensor.""" state_attr = { ATTR_SOURCE_ID: self._sensor_source_id, ATTR_STATUS: PAUSED if self._collecting is None else COLLECTING, ATTR_LAST_PERIOD: str(self._last_period), } if self._period is not None: state_attr[ATTR_PERIOD] = self._period if self._cron_pattern is not None: state_attr[ATTR_CRON_PATTERN] = self._cron_pattern if self._tariff is not None: state_attr[ATTR_TARIFF] = self._tariff # last_reset in utility meter was used before last_reset was added for long term # statistics in base sensor. base sensor only supports last reset # sensors with state_class set to total. # To avoid a breaking change we set last_reset directly # in extra state attributes. if last_reset := self._last_reset: state_attr[ATTR_LAST_RESET] = last_reset.isoformat() return state_attr @property def icon(self): """Return the icon to use in the frontend, if any.""" return ICON @property def extra_restore_state_data(self) -> UtilitySensorExtraStoredData: """Return sensor specific state data to be restored.""" return UtilitySensorExtraStoredData( self.native_value, self.native_unit_of_measurement, self._last_period, self._last_reset, PAUSED if self._collecting is None else COLLECTING, ) async def async_get_last_sensor_data(self) -> UtilitySensorExtraStoredData | None: """Restore Utility Meter Sensor Extra Stored Data.""" if (restored_last_extra_data := await self.async_get_last_extra_data()) is None: return None return UtilitySensorExtraStoredData.from_dict( restored_last_extra_data.as_dict() )