99 lines
3.7 KiB
Python
99 lines
3.7 KiB
Python
"""Data update coordinator for the EHEIM Digital integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import Callable
|
|
|
|
from aiohttp import ClientError
|
|
from eheimdigital.device import EheimDigitalDevice
|
|
from eheimdigital.hub import EheimDigitalHub
|
|
from eheimdigital.types import EheimDeviceType, EheimDigitalClientError
|
|
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import CONF_HOST
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.exceptions import ConfigEntryNotReady
|
|
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
|
from homeassistant.helpers.entity_component import DEFAULT_SCAN_INTERVAL
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
|
|
|
from .const import DOMAIN, LOGGER
|
|
|
|
# Signature of the callbacks that platforms register through
# EheimDigitalUpdateCoordinator.add_platform_callback: each one is called
# synchronously (its result is not awaited) with a mapping of device address
# to device object and returns nothing.
type AsyncSetupDeviceEntitiesCallback = Callable[[dict[str, EheimDigitalDevice]], None]
|
|
|
|
# ConfigEntry specialised with this integration's coordinator class. The
# `type` statement evaluates its value lazily, so referring to the class
# before its definition further down in this module is fine.
type EheimDigitalConfigEntry = ConfigEntry[EheimDigitalUpdateCoordinator]
|
|
|
|
|
|
class EheimDigitalUpdateCoordinator(
    DataUpdateCoordinator[dict[str, EheimDigitalDevice]]
):
    """The EHEIM Digital data update coordinator.

    Owns the ``EheimDigitalHub`` for one config entry, publishes the hub's
    device mapping (device address -> device) as the coordinator data, and
    informs the registered platform callbacks when the hub reports a device
    that has not been announced before.
    """

    config_entry: EheimDigitalConfigEntry

    def __init__(
        self, hass: HomeAssistant, config_entry: EheimDigitalConfigEntry
    ) -> None:
        """Initialize the EHEIM Digital data update coordinator.

        Creates the hub for the host stored in the config entry; no connection
        is opened here (see ``_async_setup``).
        """
        super().__init__(
            hass,
            LOGGER,
            config_entry=config_entry,
            name=DOMAIN,
            update_interval=DEFAULT_SCAN_INTERVAL,
        )
        # Handed to the hub below and awaited in _async_setup before the
        # first update is requested.
        self.main_device_added_event = asyncio.Event()
        self.hub = EheimDigitalHub(
            host=self.config_entry.data[CONF_HOST],
            session=async_get_clientsession(hass),
            loop=hass.loop,
            receive_callback=self._async_receive_callback,
            device_found_callback=self._async_device_found,
            main_device_added_event=self.main_device_added_event,
        )
        # Addresses of devices that have already been announced to the
        # platform callbacks.
        self.known_devices: set[str] = set()
        self.platform_callbacks: set[AsyncSetupDeviceEntitiesCallback] = set()

    def add_platform_callback(
        self,
        async_setup_device_entities: AsyncSetupDeviceEntitiesCallback,
    ) -> None:
        """Add the setup callbacks from a specific platform."""
        self.platform_callbacks.add(async_setup_device_entities)

    async def _async_device_found(
        self, device_address: str, device_type: EheimDeviceType
    ) -> None:
        """Set up a new device found.

        This function is called from the library whenever a new device is added.
        Every registered platform callback is invoked with a single-entry
        mapping for the device. A device address is announced only once;
        repeated notifications for an already known address are ignored.
        """
        if device_address in self.known_devices:
            return

        for platform_callback in self.platform_callbacks:
            platform_callback({device_address: self.hub.devices[device_address]})

        # Remember the address so the guard above actually takes effect;
        # previously nothing ever populated known_devices, so every repeated
        # notification re-ran all platform callbacks for the same device.
        self.known_devices.add(device_address)

    async def _async_receive_callback(self) -> None:
        """Publish the hub's current device mapping as the coordinator data.

        Passed to the hub as its ``receive_callback``.
        """
        self.async_set_updated_data(self.hub.devices)

    async def _async_setup(self) -> None:
        """Connect to the hub and request the initial data.

        Raises:
            ConfigEntryNotReady: If the main device is not reported within
                two seconds, or the library raises EheimDigitalClientError
                while connecting or updating.
        """
        try:
            await self.hub.connect()
            async with asyncio.timeout(2):
                # This event gets triggered when the first message is received from
                # the device, it contains the data necessary to create the main device.
                # This removes the race condition where the main device is accessed
                # before the response from the device is parsed.
                await self.main_device_added_event.wait()
            await self.hub.update()
        except (TimeoutError, EheimDigitalClientError) as err:
            raise ConfigEntryNotReady from err

    async def _async_update_data(self) -> dict[str, EheimDigitalDevice]:
        """Ask the hub to refresh and return the coordinator's current data.

        Raises:
            UpdateFailed: If the update request fails with an aiohttp
                ClientError or the library's EheimDigitalClientError.
        """
        try:
            await self.hub.update()
        # _async_setup already treats EheimDigitalClientError as the failure
        # mode of connect()/update(); handle it here as well so a failed poll
        # is reported as UpdateFailed instead of an unexpected exception.
        except (ClientError, EheimDigitalClientError) as ex:
            raise UpdateFailed from ex
        return self.data
|