core/homeassistant/components/recollect_waste/sensor.py

131 lines
4.3 KiB
Python
Raw Normal View History

"""Support for ReCollect Waste sensors."""
2021-03-18 13:31:38 +00:00
from __future__ import annotations
from aiorecollect.client import PickupType
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA, SensorEntity
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import (
ATTR_ATTRIBUTION,
CONF_FRIENDLY_NAME,
CONF_NAME,
DEVICE_CLASS_TIMESTAMP,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from homeassistant.util.dt import as_utc
from .const import CONF_PLACE_ID, CONF_SERVICE_ID, DATA_COORDINATOR, DOMAIN, LOGGER
2019-07-31 19:25:30 +00:00
# Keys of the extra state attributes describing the pickup the sensor reports.
ATTR_AREA_NAME = "area_name"
ATTR_PICKUP_TYPES = "pickup_types"

# Keys of the extra state attributes describing the pickup after that one.
ATTR_NEXT_PICKUP_DATE = "next_pickup_date"
ATTR_NEXT_PICKUP_TYPES = "next_pickup_types"

# Attribution text stored under ATTR_ATTRIBUTION in the sensor's attributes.
DEFAULT_ATTRIBUTION = "Pickup data provided by ReCollect Waste"
2019-07-31 19:25:30 +00:00
# Default for CONF_NAME in the YAML platform schema; also used as the
# sensor entity's name.
DEFAULT_NAME = "recollect_waste"
2019-07-31 19:25:30 +00:00
# Schema for the YAML platform configuration (a setup path that
# async_setup_platform reports as deprecated): the place and service IDs are
# required strings, the name is optional and falls back to DEFAULT_NAME.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
    {
        vol.Required(CONF_PLACE_ID): cv.string,
        vol.Required(CONF_SERVICE_ID): cv.string,
        vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
    }
)
@callback
def async_get_pickup_type_names(
    entry: ConfigEntry, pickup_types: list[PickupType]
) -> list[str]:
    """Translate pickup type objects into the names to display.

    For each pickup type the friendly name is used when the config entry's
    CONF_FRIENDLY_NAME option is truthy and the type has a non-empty friendly
    name; in every other case the type's plain name is used.
    """
    names = []
    for pickup_type in pickup_types:
        if entry.options.get(CONF_FRIENDLY_NAME) and pickup_type.friendly_name:
            names.append(pickup_type.friendly_name)
        else:
            names.append(pickup_type.name)
    return names
async def async_setup_platform(
    hass: HomeAssistant,
    config: ConfigType,
    async_add_entities: AddEntitiesCallback,
    discovery_info: DiscoveryInfoType | None = None,
) -> None:
    """Hand a YAML platform configuration over to the config flow.

    Logs a deprecation warning, then schedules an import-sourced config flow
    for this integration with the YAML config as its data. No entities are
    added here.
    """
    LOGGER.warning(
        "Loading ReCollect Waste via platform setup is deprecated; "
        "Please remove it from your configuration"
    )

    # Start the import flow as a task so platform setup does not wait on it.
    import_flow = hass.config_entries.flow.async_init(
        DOMAIN, context={"source": SOURCE_IMPORT}, data=config
    )
    hass.async_create_task(import_flow)
async def async_setup_entry(
    hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
    """Create the ReCollect Waste sensor for a config entry.

    The coordinator stored for this entry in ``hass.data`` is looked up and
    wrapped in a single sensor entity.
    """
    coordinators = hass.data[DOMAIN][DATA_COORDINATOR]
    sensor = ReCollectWasteSensor(coordinators[entry.entry_id], entry)
    async_add_entities([sensor])
class ReCollectWasteSensor(CoordinatorEntity, SensorEntity):
    """ReCollect Waste Sensor.

    The state is the date of the first pickup event held by the coordinator;
    the event after it is exposed through the extra state attributes.
    """

    _attr_device_class = DEVICE_CLASS_TIMESTAMP

    def __init__(self, coordinator: DataUpdateCoordinator, entry: ConfigEntry) -> None:
        """Initialize the sensor.

        The unique ID is the entry's place ID immediately followed by its
        service ID.
        """
        super().__init__(coordinator)
        self._attr_extra_state_attributes = {ATTR_ATTRIBUTION: DEFAULT_ATTRIBUTION}
        self._attr_name = DEFAULT_NAME
        self._attr_unique_id = (
            f"{entry.data[CONF_PLACE_ID]}{entry.data[CONF_SERVICE_ID]}"
        )
        self._entry = entry

    @callback
    def _handle_coordinator_update(self) -> None:
        """Respond to a DataUpdateCoordinator update."""
        self.update_from_latest_data()
        self.async_write_ha_state()

    async def async_added_to_hass(self) -> None:
        """Handle entity which will be added."""
        await super().async_added_to_hass()
        # Populate state and attributes from the data the coordinator already
        # holds instead of waiting for its next update.
        self.update_from_latest_data()

    @callback
    def update_from_latest_data(self) -> None:
        """Update the state and attributes from the coordinator's data.

        The first event supplies the state, pickup types and area name; the
        second supplies the "next pickup" attributes. When either event is
        missing, the values derived from it are set to None instead of
        raising IndexError.
        """
        # NOTE(review): assumes coordinator.data is an indexable sequence of
        # pickup events (possibly empty or None) -- confirm against the
        # coordinator's update method.
        events = self.coordinator.data or []
        pickup_event = events[0] if events else None
        next_pickup_event = events[1] if len(events) > 1 else None

        # Start from None for every key so values from an earlier update do
        # not linger when an event is no longer available.
        attributes = {
            ATTR_PICKUP_TYPES: None,
            ATTR_AREA_NAME: None,
            ATTR_NEXT_PICKUP_TYPES: None,
            ATTR_NEXT_PICKUP_DATE: None,
        }
        native_value = None

        if pickup_event is not None:
            attributes[ATTR_PICKUP_TYPES] = async_get_pickup_type_names(
                self._entry, pickup_event.pickup_types
            )
            attributes[ATTR_AREA_NAME] = pickup_event.area_name
            native_value = as_utc(pickup_event.date).isoformat()

        if next_pickup_event is not None:
            attributes[ATTR_NEXT_PICKUP_TYPES] = async_get_pickup_type_names(
                self._entry, next_pickup_event.pickup_types
            )
            attributes[ATTR_NEXT_PICKUP_DATE] = as_utc(
                next_pickup_event.date
            ).isoformat()

        self._attr_extra_state_attributes.update(attributes)
        self._attr_native_value = native_value