# core/homeassistant/components/tibber/sensor.py
#
# 445 lines
# 15 KiB
# Python

"""Support for Tibber sensors."""
from __future__ import annotations
import asyncio
from datetime import timedelta
import logging
from random import randrange
import aiohttp
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.const import (
ELECTRIC_CURRENT_AMPERE,
ELECTRIC_POTENTIAL_VOLT,
ENERGY_KILO_WATT_HOUR,
ENTITY_CATEGORY_DIAGNOSTIC,
EVENT_HOMEASSISTANT_STOP,
PERCENTAGE,
POWER_WATT,
SIGNAL_STRENGTH_DECIBELS,
)
from homeassistant.core import callback
from homeassistant.exceptions import PlatformNotReady
from homeassistant.helpers import update_coordinator
from homeassistant.helpers.device_registry import async_get as async_get_dev_reg
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_registry import async_get as async_get_entity_reg
from homeassistant.util import Throttle, dt as dt_util
from .const import DOMAIN as TIBBER_DOMAIN, MANUFACTURER
# Module-level logger used by the setup function and the classes below.
_LOGGER = logging.getLogger(__name__)
# Icon assigned to the electricity price sensor (TibberSensorElPrice).
ICON = "mdi:currency-usd"
# NOTE(review): not referenced elsewhere in this file; presumably picked up by
# the Home Assistant entity platform as the polling interval -- confirm.
SCAN_INTERVAL = timedelta(minutes=1)
# Minimum spacing passed to the Throttle decorator on
# TibberSensorElPrice._fetch_data.
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)
# NOTE(review): not referenced elsewhere in this file; presumably the
# platform-level limit on concurrent entity updates (0 = unlimited) -- confirm.
PARALLEL_UPDATES = 0
# Descriptions of the real-time sensors.
#
# TibberRtDataCoordinator._add_sensors walks this tuple in order and creates a
# TibberSensorRT for every entry whose `key` has a non-None value in the live
# measurement payload, so each `key` must match the payload field name.
# `name` is combined with the home name to build the entity name and is also
# part of the entity's unique id (see TibberSensorRT.__init__).
RT_SENSORS: tuple[SensorEntityDescription, ...] = (
    # --- Power readings ---
    SensorEntityDescription(
        key="averagePower",
        name="average power",
        device_class=SensorDeviceClass.POWER,
        native_unit_of_measurement=POWER_WATT,
    ),
    SensorEntityDescription(
        key="power",
        name="power",
        device_class=SensorDeviceClass.POWER,
        state_class=SensorStateClass.MEASUREMENT,
        native_unit_of_measurement=POWER_WATT,
    ),
    SensorEntityDescription(
        key="powerProduction",
        name="power production",
        device_class=SensorDeviceClass.POWER,
        state_class=SensorStateClass.MEASUREMENT,
        native_unit_of_measurement=POWER_WATT,
    ),
    SensorEntityDescription(
        key="minPower",
        name="min power",
        device_class=SensorDeviceClass.POWER,
        native_unit_of_measurement=POWER_WATT,
    ),
    SensorEntityDescription(
        key="maxPower",
        name="max power",
        device_class=SensorDeviceClass.POWER,
        native_unit_of_measurement=POWER_WATT,
    ),
    # --- Energy counters ---
    SensorEntityDescription(
        key="accumulatedConsumption",
        name="accumulated consumption",
        device_class=SensorDeviceClass.ENERGY,
        native_unit_of_measurement=ENERGY_KILO_WATT_HOUR,
        state_class=SensorStateClass.TOTAL,
    ),
    SensorEntityDescription(
        key="accumulatedConsumptionLastHour",
        name="accumulated consumption current hour",
        device_class=SensorDeviceClass.ENERGY,
        native_unit_of_measurement=ENERGY_KILO_WATT_HOUR,
        state_class=SensorStateClass.TOTAL_INCREASING,
    ),
    SensorEntityDescription(
        key="accumulatedProduction",
        name="accumulated production",
        device_class=SensorDeviceClass.ENERGY,
        native_unit_of_measurement=ENERGY_KILO_WATT_HOUR,
        state_class=SensorStateClass.TOTAL,
    ),
    SensorEntityDescription(
        key="accumulatedProductionLastHour",
        name="accumulated production current hour",
        device_class=SensorDeviceClass.ENERGY,
        native_unit_of_measurement=ENERGY_KILO_WATT_HOUR,
        state_class=SensorStateClass.TOTAL_INCREASING,
    ),
    SensorEntityDescription(
        key="lastMeterConsumption",
        name="last meter consumption",
        device_class=SensorDeviceClass.ENERGY,
        native_unit_of_measurement=ENERGY_KILO_WATT_HOUR,
        state_class=SensorStateClass.TOTAL_INCREASING,
    ),
    SensorEntityDescription(
        key="lastMeterProduction",
        name="last meter production",
        device_class=SensorDeviceClass.ENERGY,
        native_unit_of_measurement=ENERGY_KILO_WATT_HOUR,
        state_class=SensorStateClass.TOTAL_INCREASING,
    ),
    # --- Voltage, one entry per phase ---
    SensorEntityDescription(
        key="voltagePhase1",
        name="voltage phase1",
        device_class=SensorDeviceClass.VOLTAGE,
        native_unit_of_measurement=ELECTRIC_POTENTIAL_VOLT,
        state_class=SensorStateClass.MEASUREMENT,
    ),
    SensorEntityDescription(
        key="voltagePhase2",
        name="voltage phase2",
        device_class=SensorDeviceClass.VOLTAGE,
        native_unit_of_measurement=ELECTRIC_POTENTIAL_VOLT,
        state_class=SensorStateClass.MEASUREMENT,
    ),
    SensorEntityDescription(
        key="voltagePhase3",
        name="voltage phase3",
        device_class=SensorDeviceClass.VOLTAGE,
        native_unit_of_measurement=ELECTRIC_POTENTIAL_VOLT,
        state_class=SensorStateClass.MEASUREMENT,
    ),
    # --- Current, one entry per line ---
    SensorEntityDescription(
        key="currentL1",
        name="current L1",
        device_class=SensorDeviceClass.CURRENT,
        native_unit_of_measurement=ELECTRIC_CURRENT_AMPERE,
        state_class=SensorStateClass.MEASUREMENT,
    ),
    SensorEntityDescription(
        key="currentL2",
        name="current L2",
        device_class=SensorDeviceClass.CURRENT,
        native_unit_of_measurement=ELECTRIC_CURRENT_AMPERE,
        state_class=SensorStateClass.MEASUREMENT,
    ),
    SensorEntityDescription(
        key="currentL3",
        name="current L3",
        device_class=SensorDeviceClass.CURRENT,
        native_unit_of_measurement=ELECTRIC_CURRENT_AMPERE,
        state_class=SensorStateClass.MEASUREMENT,
    ),
    # --- Diagnostics ---
    SensorEntityDescription(
        key="signalStrength",
        name="signal strength",
        device_class=SensorDeviceClass.SIGNAL_STRENGTH,
        native_unit_of_measurement=SIGNAL_STRENGTH_DECIBELS,
        state_class=SensorStateClass.MEASUREMENT,
        entity_category=ENTITY_CATEGORY_DIAGNOSTIC,
    ),
    # --- Monetary values ---
    # No unit is declared here: TibberSensorRT.__init__ assigns the home's
    # currency as the unit for exactly these two keys.
    SensorEntityDescription(
        key="accumulatedReward",
        name="accumulated reward",
        device_class=SensorDeviceClass.MONETARY,
        state_class=SensorStateClass.MEASUREMENT,
    ),
    SensorEntityDescription(
        key="accumulatedCost",
        name="accumulated cost",
        device_class=SensorDeviceClass.MONETARY,
        state_class=SensorStateClass.MEASUREMENT,
    ),
    # --- Power factor ---
    # TibberSensorRT._handle_coordinator_update multiplies incoming values for
    # this key by 100 before storing them, in line with the percentage unit.
    SensorEntityDescription(
        key="powerFactor",
        name="power factor",
        device_class=SensorDeviceClass.POWER_FACTOR,
        native_unit_of_measurement=PERCENTAGE,
        state_class=SensorStateClass.MEASUREMENT,
    ),
)
async def async_setup_entry(hass, entry, async_add_entities):
    """Create the Tibber sensors belonging to a config entry.

    For every home known to the Tibber connection (active or not) this
    refreshes the home info, adds a price sensor when the home has an active
    subscription, starts a real-time subscription when the home supports it,
    and re-keys registry entries that still use the metering point EAN as id.

    Raises:
        PlatformNotReady: if refreshing a home times out or fails with an
            aiohttp client error.
    """
    connection = hass.data.get(TIBBER_DOMAIN)
    ent_reg = async_get_entity_reg(hass)
    dev_reg = async_get_dev_reg(hass)

    price_sensors = []
    for home in connection.get_homes(only_active=False):
        try:
            await home.update_info()
        except asyncio.TimeoutError as err:
            _LOGGER.error("Timeout connecting to Tibber home: %s ", err)
            raise PlatformNotReady() from err
        except aiohttp.ClientError as err:
            _LOGGER.error("Error connecting to Tibber home: %s ", err)
            raise PlatformNotReady() from err

        if home.has_active_subscription:
            price_sensors.append(TibberSensorElPrice(home))

        if home.has_real_time_consumption:
            # The coordinator registers its own entities as data arrives; the
            # subscription feeds each payload into it.
            rt_coordinator = TibberRtDataCoordinator(async_add_entities, home, hass)
            await home.rt_subscribe(rt_coordinator.async_set_updated_data)

        # Migration: older registry entries were keyed by the consumption EAN;
        # move them over to the home id. Nothing to do when no EAN is reported.
        legacy_id = home.info["viewer"]["home"]["meteringPointData"]["consumptionEan"]
        if legacy_id is not None:
            legacy_entity_id = ent_reg.async_get_entity_id(
                "sensor", TIBBER_DOMAIN, legacy_id
            )
            if legacy_entity_id is not None:
                ent_reg.async_update_entity(
                    legacy_entity_id, new_unique_id=home.home_id
                )

            legacy_device = dev_reg.async_get_device({(TIBBER_DOMAIN, legacy_id)})
            # Only touch devices that belong to this config entry.
            if legacy_device and entry.entry_id in legacy_device.config_entries:
                dev_reg.async_update_device(
                    legacy_device.id,
                    new_identifiers={(TIBBER_DOMAIN, home.home_id)},
                )

    async_add_entities(price_sensors, True)
class TibberSensor(SensorEntity):
    """Shared base for the Tibber sensor entities defined in this module."""

    def __init__(self, *args, tibber_home, **kwargs):
        """Store the home and derive a display name for it.

        ``tibber_home`` is keyword-only; every other argument is forwarded
        unchanged to the next ``__init__`` in the MRO.
        """
        super().__init__(*args, **kwargs)
        self._tibber_home = tibber_home

        home_info = tibber_home.info["viewer"]["home"]
        display_name = home_info["appNickname"]
        if display_name is None:
            # No nickname set: fall back to the first address line, or "".
            display_name = home_info["address"].get("address1", "")
        self._home_name = display_name

        # Subclasses fill these in; device_info reads both.
        self._device_name = None
        self._model = None

    @property
    def device_info(self):
        """Return the device info, including the model only when one is set."""
        info = DeviceInfo(
            identifiers={(TIBBER_DOMAIN, self._tibber_home.home_id)},
            name=self._device_name,
            manufacturer=MANUFACTURER,
        )
        if self._model is None:
            return info
        info["model"] = self._model
        return info
class TibberSensorElPrice(TibberSensor):
    """Sensor exposing the current electricity price of a Tibber home."""

    def __init__(self, tibber_home):
        """Set up the price sensor for ``tibber_home``; it starts unavailable."""
        super().__init__(tibber_home=tibber_home)
        sensor_name = f"Electricity price {self._home_name}"

        self._last_updated = None
        # Random number of seconds (0-4999) added to the refresh threshold in
        # async_update. NOTE(review): presumably there so installations do not
        # all refresh at the same moment -- confirm.
        self._spread_load_constant = randrange(5000)

        self._attr_available = False
        self._attr_extra_state_attributes = {
            "app_nickname": None,
            "grid_company": None,
            "estimated_annual_consumption": None,
            "price_level": None,
            "max_price": None,
            "avg_price": None,
            "min_price": None,
            "off_peak_1": None,
            "peak": None,
            "off_peak_2": None,
        }
        self._attr_icon = ICON
        self._attr_name = sensor_name
        self._attr_unique_id = self._tibber_home.home_id

        self._model = "Price Sensor"
        self._device_name = sensor_name

    async def async_update(self):
        """Refresh the price state, fetching new data first when needed.

        New data is requested when the home has no data timestamp, when that
        timestamp lies less than five hours (plus the per-instance random
        offset) ahead of now, or when the entity is currently unavailable.
        Otherwise, if a price is known and the state was already refreshed
        within the current clock hour, nothing is done.
        """
        home = self._tibber_home
        now = dt_util.now()
        data_until = home.last_data_timestamp

        # Evaluated left to right with short-circuiting: the subtraction is
        # only reached when a timestamp exists.
        refresh_needed = (
            not data_until
            or (data_until - now).total_seconds()
            < 5 * 3600 + self._spread_load_constant
            or not self.available
        )
        if refresh_needed:
            _LOGGER.debug("Asking for new data")
            await self._fetch_data()
        elif (
            home.current_price_total
            and self._last_updated
            and self._last_updated.hour == now.hour
            and data_until
        ):
            return

        price, level, self._last_updated = home.current_price_data()
        self._attr_native_value = price
        self._attr_extra_state_attributes["price_level"] = level
        self._attr_extra_state_attributes.update(home.current_attributes())
        self._attr_available = price is not None
        self._attr_native_unit_of_measurement = home.price_unit

    @Throttle(MIN_TIME_BETWEEN_UPDATES)
    async def _fetch_data(self):
        """Pull home and price info and copy the home details to attributes.

        Timeouts and aiohttp client errors are swallowed; the attributes are
        then left as they were.
        """
        _LOGGER.debug("Fetching data")
        try:
            await self._tibber_home.update_info_and_price_info()
        except (asyncio.TimeoutError, aiohttp.ClientError):
            return
        else:
            home_info = self._tibber_home.info["viewer"]["home"]
            attributes = self._attr_extra_state_attributes
            attributes["app_nickname"] = home_info["appNickname"]
            metering = home_info["meteringPointData"]
            attributes["grid_company"] = metering["gridCompany"]
            attributes["estimated_annual_consumption"] = metering[
                "estimatedAnnualConsumption"
            ]
class TibberSensorRT(TibberSensor, update_coordinator.CoordinatorEntity):
    """Sensor for one real-time measurement value of a Tibber home."""

    def __init__(
        self,
        tibber_home,
        description: SensorEntityDescription,
        initial_state,
        coordinator: TibberRtDataCoordinator,
    ):
        """Set up the sensor from its description and first observed value."""
        super().__init__(coordinator=coordinator, tibber_home=tibber_home)
        self.entity_description = description

        home_name = self._home_name
        sensor_label = description.name

        self._model = "Tibber Pulse"
        self._device_name = f"{self._model} {home_name}"
        self._attr_name = f"{sensor_label} {home_name}"
        self._attr_native_value = initial_state
        # The unique id is built from the description *name*, not its key.
        self._attr_unique_id = f"{self._tibber_home.home_id}_rt_{sensor_label}"

        # Monetary sensors carry no static unit; use the home's currency.
        if description.key in ("accumulatedCost", "accumulatedReward"):
            self._attr_native_unit_of_measurement = tibber_home.currency

    @property
    def available(self):
        """Return whether the home's real-time subscription is running."""
        return self._tibber_home.rt_subscription_running

    @callback
    def _handle_coordinator_update(self) -> None:
        """Store the new value for this sensor's key and write the state.

        Does nothing when there is no live measurement or when the
        measurement has no value for this sensor's key.
        """
        measurement = self.coordinator.get_live_measurement()  # type: ignore[attr-defined]
        if not measurement:
            return

        key = self.entity_description.key
        value = measurement.get(key)
        if value is None:
            return

        if key == "powerFactor":
            # Scale to match the percentage unit declared for this sensor.
            value *= 100.0

        self._attr_native_value = value
        self.async_write_ha_state()
class TibberRtDataCoordinator(update_coordinator.DataUpdateCoordinator):
    """Handle Tibber realtime data.

    Holds the most recent real-time payload for one home and creates a
    TibberSensorRT for each measurement key the first time a value for it
    shows up.
    """

    def __init__(self, async_add_entities, tibber_home, hass):
        """Initialize the data handler.

        Args:
            async_add_entities: callback used to register newly created
                real-time sensors.
            tibber_home: the home whose real-time data is handled.
            hass: the Home Assistant instance.
        """
        self._async_add_entities = async_add_entities
        self._tibber_home = tibber_home
        self.hass = hass
        # Keys from RT_SENSORS for which an entity has already been created.
        self._added_sensors = set()
        super().__init__(
            hass,
            _LOGGER,
            name=tibber_home.info["viewer"]["home"]["address"].get(
                "address1", "Tibber"
            ),
        )

        # Run _add_sensors on every update; keep the returned remover so the
        # listener can be detached when Home Assistant stops.
        self._async_remove_device_updates_handler = self.async_add_listener(
            self._add_sensors
        )
        hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self._handle_ha_stop)

    @callback
    def _handle_ha_stop(self, _event) -> None:
        """Handle Home Assistant stopping by detaching the sensor listener."""
        self._async_remove_device_updates_handler()

    @callback
    def _add_sensors(self):
        """Create entities for measurement keys seen for the first time."""
        if not (live_measurement := self.get_live_measurement()):
            return

        new_entities = []
        for sensor_description in RT_SENSORS:
            if sensor_description.key in self._added_sensors:
                continue
            state = live_measurement.get(sensor_description.key)
            if state is None:
                # No value reported for this key yet; retry on a later update.
                continue
            new_entities.append(
                TibberSensorRT(
                    self._tibber_home,
                    sensor_description,
                    state,
                    self,
                )
            )
            self._added_sensors.add(sensor_description.key)

        if new_entities:
            self._async_add_entities(new_entities)

    def get_live_measurement(self):
        """Get live measurement data.

        Returns:
            The ``liveMeasurement`` mapping of the current payload, or None
            when there is no payload, when the payload reports errors (the
            first error is logged), or when it carries no measurement.
        """
        payload = self.data
        # NOTE(review): the base coordinator presumably leaves `data` unset
        # (None) until the first payload is stored; guard so an early call
        # returns None instead of raising AttributeError.
        if not payload:
            return None
        if errors := payload.get("errors"):
            _LOGGER.error(errors[0])
            return None
        # `.get("data", {})` would only default on a missing key; an explicit
        # `"data": None` must also be treated as "no measurement".
        return (payload.get("data") or {}).get("liveMeasurement")