core/homeassistant/components/tradfri/coordinator.py

101 lines
3.4 KiB
Python

"""Tradfri DataUpdateCoordinator."""
from __future__ import annotations
from collections.abc import Callable
from datetime import timedelta
from typing import Any
from pytradfri.command import Command
from pytradfri.device import Device
from pytradfri.error import RequestError
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import LOGGER
SCAN_INTERVAL = 60 # Interval for updating the coordinator
class TradfriDeviceDataUpdateCoordinator(DataUpdateCoordinator[Device]):
"""Coordinator to manage data for a specific Tradfri device."""
config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
config_entry: ConfigEntry,
api: Callable[[Command | list[Command]], Any],
device: Device,
) -> None:
"""Initialize device coordinator."""
self.api = api
self.device = device
self._exception: Exception | None = None
super().__init__(
hass,
LOGGER,
config_entry=config_entry,
name=f"Update coordinator for {device}",
update_interval=timedelta(seconds=SCAN_INTERVAL),
)
async def set_hub_available(self, available: bool) -> None:
"""Set status of hub."""
if available != self.last_update_success:
if not available:
self.last_update_success = False
await self.async_request_refresh()
@callback
def _observe_update(self, device: Device) -> None:
"""Update the coordinator for a device when a change is detected."""
self.async_set_updated_data(data=device)
@callback
def _exception_callback(self, exc: Exception) -> None:
"""Schedule handling exception.."""
self.hass.async_create_task(self._handle_exception(exc))
async def _handle_exception(self, exc: Exception) -> None:
"""Handle observe exceptions in a coroutine."""
# Store exception so that it gets raised in _async_update_data
self._exception = exc
LOGGER.debug(
"Observation failed for %s, trying again", self.device, exc_info=exc
)
# Change interval so we get a swift refresh
self.update_interval = timedelta(seconds=5)
await self.async_request_refresh()
async def _async_update_data(self) -> Device:
"""Fetch data from the gateway for a specific device."""
try:
if self._exception:
exc = self._exception
self._exception = None # Clear stored exception
raise exc
except RequestError as err:
raise UpdateFailed(f"Error communicating with API: {err}.") from err
if not self.data or not self.last_update_success: # Start subscription
try:
cmd = self.device.observe(
callback=self._observe_update,
err_callback=self._exception_callback,
duration=0,
)
await self.api(cmd)
except RequestError as err:
raise UpdateFailed(f"Error communicating with API: {err}.") from err
# Reset update interval
self.update_interval = timedelta(seconds=SCAN_INTERVAL)
return self.device