Add coordinator to Rova (#114288)
* Add coordinator to Rova * Add coordinator to Rova * Fix * Update homeassistant/components/rova/sensor.py Co-authored-by: Jan-Philipp Benecke <github@bnck.me> * Fix --------- Co-authored-by: Jan-Philipp Benecke <github@bnck.me>pull/114303/head
parent
aaecbbdc40
commit
c222cfd692
|
@ -1155,8 +1155,6 @@ omit =
|
|||
homeassistant/components/roon/media_player.py
|
||||
homeassistant/components/roon/server.py
|
||||
homeassistant/components/route53/*
|
||||
homeassistant/components/rova/__init__.py
|
||||
homeassistant/components/rova/sensor.py
|
||||
homeassistant/components/rpi_camera/*
|
||||
homeassistant/components/rtorrent/sensor.py
|
||||
homeassistant/components/ruuvi_gateway/__init__.py
|
||||
|
|
|
@ -11,6 +11,7 @@ from homeassistant.core import HomeAssistant
|
|||
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
|
||||
|
||||
from .const import CONF_HOUSE_NUMBER, CONF_HOUSE_NUMBER_SUFFIX, CONF_ZIP_CODE, DOMAIN
|
||||
from .coordinator import RovaCoordinator
|
||||
|
||||
PLATFORMS: list[Platform] = [Platform.SENSOR]
|
||||
|
||||
|
@ -32,7 +33,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
if not rova_area:
|
||||
raise ConfigEntryError("Rova does not collect garbage in this area")
|
||||
|
||||
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = api
|
||||
coordinator = RovaCoordinator(hass, api)
|
||||
|
||||
await coordinator.async_config_entry_first_refresh()
|
||||
|
||||
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
|
||||
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
||||
return True
|
||||
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
"""Coordinator for Rova."""
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from rova.rova import Rova
|
||||
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
|
||||
from homeassistant.util.dt import get_time_zone
|
||||
|
||||
from .const import DOMAIN, LOGGER
|
||||
|
||||
|
||||
class RovaCoordinator(DataUpdateCoordinator[dict[str, datetime]]):
|
||||
"""Class to manage fetching Rova data."""
|
||||
|
||||
def __init__(self, hass: HomeAssistant, api: Rova) -> None:
|
||||
"""Initialize."""
|
||||
super().__init__(
|
||||
hass,
|
||||
LOGGER,
|
||||
name=DOMAIN,
|
||||
update_interval=timedelta(hours=12),
|
||||
)
|
||||
self.api = api
|
||||
|
||||
async def _async_update_data(self) -> dict[str, datetime]:
|
||||
"""Fetch data from Rova API."""
|
||||
|
||||
items = await self.hass.async_add_executor_job(self.api.get_calendar_items)
|
||||
|
||||
data = {}
|
||||
|
||||
for item in items:
|
||||
date = datetime.strptime(item["Date"], "%Y-%m-%dT%H:%M:%S").replace(
|
||||
tzinfo=get_time_zone("Europe/Amsterdam")
|
||||
)
|
||||
code = item["GarbageTypeCode"].lower()
|
||||
if code not in data:
|
||||
data[code] = date
|
||||
|
||||
return data
|
|
@ -2,11 +2,8 @@
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any
|
||||
from datetime import datetime
|
||||
|
||||
from requests.exceptions import ConnectTimeout, HTTPError
|
||||
from rova.rova import Rova
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.sensor import (
|
||||
|
@ -23,22 +20,13 @@ import homeassistant.helpers.config_validation as cv
|
|||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
|
||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||
from homeassistant.util import Throttle
|
||||
from homeassistant.util.dt import get_time_zone
|
||||
from homeassistant.helpers.update_coordinator import CoordinatorEntity
|
||||
|
||||
from .const import (
|
||||
CONF_HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX,
|
||||
CONF_ZIP_CODE,
|
||||
DOMAIN,
|
||||
LOGGER,
|
||||
)
|
||||
from .const import CONF_HOUSE_NUMBER, CONF_HOUSE_NUMBER_SUFFIX, CONF_ZIP_CODE, DOMAIN
|
||||
from .coordinator import RovaCoordinator
|
||||
|
||||
ISSUE_PLACEHOLDER = {"url": "/config/integrations/dashboard/add?domain=rova"}
|
||||
|
||||
UPDATE_DELAY = timedelta(hours=12)
|
||||
SCAN_INTERVAL = timedelta(hours=12)
|
||||
|
||||
SENSOR_TYPES = {
|
||||
"bio": SensorEntityDescription(
|
||||
key="gft",
|
||||
|
@ -125,71 +113,35 @@ async def async_setup_entry(
|
|||
async_add_entities: AddEntitiesCallback,
|
||||
) -> None:
|
||||
"""Add Rova entry."""
|
||||
# get api from hass
|
||||
api: Rova = hass.data[DOMAIN][entry.entry_id]
|
||||
coordinator: RovaCoordinator = hass.data[DOMAIN][entry.entry_id]
|
||||
|
||||
# Create rova data service which will retrieve and update the data.
|
||||
data_service = RovaData(api)
|
||||
assert entry.unique_id
|
||||
unique_id = entry.unique_id
|
||||
|
||||
# generate unique name for rova integration
|
||||
name = f"{entry.data[CONF_ZIP_CODE]}{entry.data[CONF_HOUSE_NUMBER]}{entry.data[CONF_HOUSE_NUMBER_SUFFIX]}"
|
||||
|
||||
# Create a new sensor for each garbage type.
|
||||
entities = [
|
||||
RovaSensor(name, description, data_service)
|
||||
async_add_entities(
|
||||
RovaSensor(unique_id, description, coordinator)
|
||||
for key, description in SENSOR_TYPES.items()
|
||||
]
|
||||
async_add_entities(entities, True)
|
||||
)
|
||||
|
||||
|
||||
class RovaSensor(SensorEntity):
|
||||
class RovaSensor(CoordinatorEntity[RovaCoordinator], SensorEntity):
|
||||
"""Representation of a Rova sensor."""
|
||||
|
||||
def __init__(
|
||||
self, platform_name, description: SensorEntityDescription, data_service
|
||||
self,
|
||||
unique_id: str,
|
||||
description: SensorEntityDescription,
|
||||
coordinator: RovaCoordinator,
|
||||
) -> None:
|
||||
"""Initialize the sensor."""
|
||||
super().__init__(coordinator)
|
||||
self.entity_description = description
|
||||
self.data_service = data_service
|
||||
|
||||
self._attr_name = f"{platform_name}_{description.key}"
|
||||
self._attr_unique_id = f"{platform_name}_{description.key}"
|
||||
self._attr_name = f"{unique_id}_{description.key}"
|
||||
self._attr_unique_id = f"{unique_id}_{description.key}"
|
||||
self._attr_device_class = SensorDeviceClass.TIMESTAMP
|
||||
|
||||
def update(self) -> None:
|
||||
"""Get the latest data from the sensor and update the state."""
|
||||
self.data_service.update()
|
||||
pickup_date = self.data_service.data.get(self.entity_description.key)
|
||||
if pickup_date is not None:
|
||||
self._attr_native_value = pickup_date
|
||||
|
||||
|
||||
class RovaData:
|
||||
"""Get and update the latest data from the Rova API."""
|
||||
|
||||
def __init__(self, api) -> None:
|
||||
"""Initialize the data object."""
|
||||
self.api = api
|
||||
self.data: dict[str, Any] = {}
|
||||
|
||||
@Throttle(UPDATE_DELAY)
|
||||
def update(self):
|
||||
"""Update the data from the Rova API."""
|
||||
|
||||
try:
|
||||
items = self.api.get_calendar_items()
|
||||
except (ConnectTimeout, HTTPError):
|
||||
LOGGER.error("Could not retrieve data, retry again later")
|
||||
return
|
||||
|
||||
self.data = {}
|
||||
|
||||
for item in items:
|
||||
date = datetime.strptime(item["Date"], "%Y-%m-%dT%H:%M:%S").replace(
|
||||
tzinfo=get_time_zone("Europe/Amsterdam")
|
||||
)
|
||||
code = item["GarbageTypeCode"].lower()
|
||||
if code not in self.data:
|
||||
self.data[code] = date
|
||||
|
||||
LOGGER.debug("Updated Rova calendar: %s", self.data)
|
||||
@property
|
||||
def native_value(self) -> datetime | None:
|
||||
"""Return the state of the sensor."""
|
||||
return self.coordinator.data.get(self.entity_description.key)
|
||||
|
|
Loading…
Reference in New Issue