core/homeassistant/components/eheimdigital/coordinator.py

99 lines
3.7 KiB
Python

"""Data update coordinator for the EHEIM Digital integration."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from aiohttp import ClientError
from eheimdigital.device import EheimDigitalDevice
from eheimdigital.hub import EheimDigitalHub
from eheimdigital.types import EheimDeviceType, EheimDigitalClientError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_HOST
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.entity_component import DEFAULT_SCAN_INTERVAL
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN, LOGGER
type AsyncSetupDeviceEntitiesCallback = Callable[[dict[str, EheimDigitalDevice]], None]
type EheimDigitalConfigEntry = ConfigEntry[EheimDigitalUpdateCoordinator]
class EheimDigitalUpdateCoordinator(
DataUpdateCoordinator[dict[str, EheimDigitalDevice]]
):
"""The EHEIM Digital data update coordinator."""
config_entry: EheimDigitalConfigEntry
def __init__(
self, hass: HomeAssistant, config_entry: EheimDigitalConfigEntry
) -> None:
"""Initialize the EHEIM Digital data update coordinator."""
super().__init__(
hass,
LOGGER,
config_entry=config_entry,
name=DOMAIN,
update_interval=DEFAULT_SCAN_INTERVAL,
)
self.main_device_added_event = asyncio.Event()
self.hub = EheimDigitalHub(
host=self.config_entry.data[CONF_HOST],
session=async_get_clientsession(hass),
loop=hass.loop,
receive_callback=self._async_receive_callback,
device_found_callback=self._async_device_found,
main_device_added_event=self.main_device_added_event,
)
self.known_devices: set[str] = set()
self.platform_callbacks: set[AsyncSetupDeviceEntitiesCallback] = set()
def add_platform_callback(
self,
async_setup_device_entities: AsyncSetupDeviceEntitiesCallback,
) -> None:
"""Add the setup callbacks from a specific platform."""
self.platform_callbacks.add(async_setup_device_entities)
async def _async_device_found(
self, device_address: str, device_type: EheimDeviceType
) -> None:
"""Set up a new device found.
This function is called from the library whenever a new device is added.
"""
if device_address not in self.known_devices:
for platform_callback in self.platform_callbacks:
platform_callback({device_address: self.hub.devices[device_address]})
async def _async_receive_callback(self) -> None:
self.async_set_updated_data(self.hub.devices)
async def _async_setup(self) -> None:
try:
await self.hub.connect()
async with asyncio.timeout(2):
# This event gets triggered when the first message is received from
# the device, it contains the data necessary to create the main device.
# This removes the race condition where the main device is accessed
# before the response from the device is parsed.
await self.main_device_added_event.wait()
await self.hub.update()
except (TimeoutError, EheimDigitalClientError) as err:
raise ConfigEntryNotReady from err
async def _async_update_data(self) -> dict[str, EheimDigitalDevice]:
try:
await self.hub.update()
except ClientError as ex:
raise UpdateFailed from ex
return self.data