"""Support for UPnP/IGD Sensors."""
from datetime import timedelta
from typing import Any, Mapping, Optional

from homeassistant.config_entries import ConfigEntry
from homeassistant.const import DATA_BYTES, DATA_RATE_KIBIBYTES_PER_SECOND
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.helpers.update_coordinator import (
    CoordinatorEntity,
    DataUpdateCoordinator,
)

from .const import (
    BYTES_RECEIVED,
    BYTES_SENT,
    CONFIG_ENTRY_SCAN_INTERVAL,
    CONFIG_ENTRY_UDN,
    DATA_PACKETS,
    DATA_RATE_PACKETS_PER_SECOND,
    DEFAULT_SCAN_INTERVAL,
    DOMAIN,
    DOMAIN_COORDINATORS,
    DOMAIN_DEVICES,
    KIBIBYTE,
    LOGGER as _LOGGER,
    PACKETS_RECEIVED,
    PACKETS_SENT,
    TIMESTAMP,
)
from .device import Device

# Per-counter description. Each entry carries the metadata for two entities:
# the raw counter ("name"/"unit"/"unique_id") and the rate derived from it
# ("derived_name"/"derived_unit"/"derived_unique_id"). "device_value_key" is
# the key looked up in the coordinator data.
SENSOR_TYPES = {
    BYTES_RECEIVED: {
        "device_value_key": BYTES_RECEIVED,
        "name": f"{DATA_BYTES} received",
        "unit": DATA_BYTES,
        "unique_id": BYTES_RECEIVED,
        "derived_name": f"{DATA_RATE_KIBIBYTES_PER_SECOND} received",
        "derived_unit": DATA_RATE_KIBIBYTES_PER_SECOND,
        "derived_unique_id": "KiB/sec_received",
    },
    BYTES_SENT: {
        "device_value_key": BYTES_SENT,
        "name": f"{DATA_BYTES} sent",
        "unit": DATA_BYTES,
        "unique_id": BYTES_SENT,
        "derived_name": f"{DATA_RATE_KIBIBYTES_PER_SECOND} sent",
        "derived_unit": DATA_RATE_KIBIBYTES_PER_SECOND,
        "derived_unique_id": "KiB/sec_sent",
    },
    PACKETS_RECEIVED: {
        "device_value_key": PACKETS_RECEIVED,
        "name": f"{DATA_PACKETS} received",
        "unit": DATA_PACKETS,
        "unique_id": PACKETS_RECEIVED,
        "derived_name": f"{DATA_RATE_PACKETS_PER_SECOND} received",
        "derived_unit": DATA_RATE_PACKETS_PER_SECOND,
        "derived_unique_id": "packets/sec_received",
    },
    PACKETS_SENT: {
        "device_value_key": PACKETS_SENT,
        "name": f"{DATA_PACKETS} sent",
        "unit": DATA_PACKETS,
        "unique_id": PACKETS_SENT,
        "derived_name": f"{DATA_RATE_PACKETS_PER_SECOND} sent",
        "derived_unit": DATA_RATE_PACKETS_PER_SECOND,
        "derived_unique_id": "packets/sec_sent",
    },
}


async def async_setup_platform(
    hass: HomeAssistantType, config, async_add_entities, discovery_info=None
) -> None:
    """Old way of setting up UPnP/IGD sensors.

    Only logs the received configuration; no entities are created here.
    """
    _LOGGER.debug(
        "async_setup_platform: config: %s, discovery: %s", config, discovery_info
    )


async def async_setup_entry(
    hass: HomeAssistantType, config_entry: ConfigEntry, async_add_entities
) -> None:
    """Set up the UPnP/IGD sensors.

    Looks up the device registered under the config entry's UDN, creates a
    coordinator polling ``device.async_get_traffic_data`` at the configured
    scan interval, stores it in ``hass.data`` and adds one raw and one
    derived sensor for every entry in ``SENSOR_TYPES``.
    """
    udn = config_entry.data[CONFIG_ENTRY_UDN]
    device: Device = hass.data[DOMAIN][DOMAIN_DEVICES][udn]

    update_interval_sec = config_entry.options.get(
        CONFIG_ENTRY_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
    )
    update_interval = timedelta(seconds=update_interval_sec)
    _LOGGER.debug("update_interval: %s", update_interval)

    _LOGGER.debug("Adding sensors")
    coordinator = DataUpdateCoordinator[Mapping[str, Any]](
        hass,
        _LOGGER,
        name=device.name,
        update_method=device.async_get_traffic_data,
        update_interval=update_interval,
    )
    await coordinator.async_refresh()
    hass.data[DOMAIN][DOMAIN_COORDINATORS][udn] = coordinator

    # All raw sensors first, then all derived sensors, each in SENSOR_TYPES
    # declaration order (bytes received/sent, packets received/sent).
    sensor_types = list(SENSOR_TYPES.values())
    sensors = [
        RawUpnpSensor(coordinator, device, sensor_type)
        for sensor_type in sensor_types
    ] + [
        DerivedUpnpSensor(coordinator, device, sensor_type)
        for sensor_type in sensor_types
    ]
    async_add_entities(sensors, True)


class UpnpSensor(CoordinatorEntity):
    """Base class for UPnP/IGD sensors."""

    def __init__(
        self,
        coordinator: DataUpdateCoordinator[Mapping[str, Any]],
        device: Device,
        sensor_type: Mapping[str, str],
        update_multiplier: int = 2,
    ) -> None:
        """Initialize the base sensor.

        ``sensor_type`` is one of the ``SENSOR_TYPES`` entries describing
        this sensor.
        """
        super().__init__(coordinator)
        self._device = device
        self._sensor_type = sensor_type
        # NOTE(review): the two counters below are stored but never read
        # anywhere in this module.
        self._update_counter_max = update_multiplier
        self._update_counter = 0

    @property
    def icon(self) -> str:
        """Icon to use in the frontend, if any."""
        return "mdi:server-network"

    @property
    def available(self) -> bool:
        """Return if entity is available.

        The entity is available when the last coordinator update succeeded
        and the coordinator data contains this sensor's value key.
        """
        device_value_key = self._sensor_type["device_value_key"]
        return (
            self.coordinator.last_update_success
            and device_value_key in self.coordinator.data
        )

    @property
    def name(self) -> str:
        """Return the name of the sensor."""
        return f"{self._device.name} {self._sensor_type['name']}"

    @property
    def unique_id(self) -> str:
        """Return an unique ID."""
        return f"{self._device.udn}_{self._sensor_type['unique_id']}"

    @property
    def unit_of_measurement(self) -> str:
        """Return the unit of measurement of this entity, if any."""
        return self._sensor_type["unit"]

    @property
    def device_info(self) -> Mapping[str, Any]:
        """Get device info."""
        return {
            "connections": {(dr.CONNECTION_UPNP, self._device.udn)},
            "name": self._device.name,
            "manufacturer": self._device.manufacturer,
            "model": self._device.model_name,
        }


class RawUpnpSensor(UpnpSensor):
    """Representation of a UPnP/IGD sensor."""

    @property
    def state(self) -> Optional[str]:
        """Return the state of the device.

        The raw counter is formatted as a decimal integer string; ``None``
        is returned when the counter is missing or ``None``.
        """
        device_value_key = self._sensor_type["device_value_key"]
        # ``available`` allows for the key being absent from the data, so a
        # missing key is treated like a ``None`` value instead of raising.
        value = self.coordinator.data.get(device_value_key)
        if value is None:
            return None
        return format(value, "d")


class DerivedUpnpSensor(UpnpSensor):
    """Representation of a UNIT Sent/Received per second sensor."""

    def __init__(
        self,
        coordinator: DataUpdateCoordinator[Mapping[str, Any]],
        device: Device,
        sensor_type: Mapping[str, str],
    ) -> None:
        """Initialize sensor."""
        super().__init__(coordinator, device, sensor_type)
        # Previous sample used as the baseline for the rate calculation.
        self._last_value = None
        self._last_timestamp = None

    @property
    def name(self) -> str:
        """Return the name of the sensor."""
        return f"{self._device.name} {self._sensor_type['derived_name']}"

    @property
    def unique_id(self) -> str:
        """Return an unique ID."""
        return f"{self._device.udn}_{self._sensor_type['derived_unique_id']}"

    @property
    def unit_of_measurement(self) -> str:
        """Return the unit of measurement of this entity, if any."""
        return self._sensor_type["derived_unit"]

    def _has_overflowed(self, current_value) -> bool:
        """Check if value has overflowed.

        A counter lower than the previously stored one is treated as an
        overflow.
        """
        return current_value < self._last_value

    def _store_sample(self, value, timestamp) -> None:
        """Remember a sample as the baseline for the next calculation."""
        self._last_value = value
        self._last_timestamp = timestamp

    @property
    def state(self) -> Optional[str]:
        """Return the state of the device.

        The state is the change of the counter since the previously stored
        sample, divided by the elapsed time in seconds, formatted with one
        decimal. Byte counters are converted to KiB first. ``None`` is
        returned when no rate can be calculated: the counter is missing,
        there is no previous sample, the counter overflowed, or no time has
        elapsed.

        Reading this property updates the stored baseline sample.
        """
        device_value_key = self._sensor_type["device_value_key"]
        current_value = self.coordinator.data.get(device_value_key)
        if current_value is None:
            return None
        current_timestamp = self.coordinator.data[TIMESTAMP]

        # Can't calculate any derivative if we have only one value.
        if self._last_value is None or self._has_overflowed(current_value):
            self._store_sample(current_value, current_timestamp)
            return None

        # Use total_seconds(): timedelta.seconds is only the seconds
        # component (0..86399) and drops both days and microseconds.
        elapsed = (current_timestamp - self._last_timestamp).total_seconds()
        if elapsed <= 0:
            # Prevent division by 0 (or a negative interval); restart from
            # the current sample.
            self._store_sample(current_value, current_timestamp)
            return None

        # Calculate derivative.
        delta_value = current_value - self._last_value
        if self._sensor_type["unit"] == DATA_BYTES:
            delta_value /= KIBIBYTE
        derived = delta_value / elapsed

        # Store current values for future use.
        self._store_sample(current_value, current_timestamp)

        return format(derived, ".1f")