core/homeassistant/components/openuv/coordinator.py

74 lines
2.3 KiB
Python
Raw Normal View History

"""Define an update coordinator for OpenUV."""
from __future__ import annotations
from collections.abc import Awaitable, Callable
from typing import Any, cast
2022-11-08 14:41:09 +00:00
from pyopenuv.errors import InvalidApiKeyError, OpenUvError
from homeassistant.config_entries import SOURCE_REAUTH, ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import LOGGER
DEFAULT_DEBOUNCER_COOLDOWN_SECONDS = 15 * 60
class OpenUvCoordinator(DataUpdateCoordinator):
"""Define an OpenUV data coordinator."""
2022-11-08 14:41:09 +00:00
config_entry: ConfigEntry
update_method: Callable[[], Awaitable[dict[str, Any]]]
def __init__(
self,
hass: HomeAssistant,
*,
entry: ConfigEntry,
name: str,
latitude: str,
longitude: str,
update_method: Callable[[], Awaitable[dict[str, Any]]],
) -> None:
"""Initialize."""
super().__init__(
hass,
LOGGER,
name=name,
update_method=update_method,
request_refresh_debouncer=Debouncer(
hass,
LOGGER,
cooldown=DEFAULT_DEBOUNCER_COOLDOWN_SECONDS,
immediate=True,
),
)
self._entry = entry
self.latitude = latitude
self.longitude = longitude
async def _async_update_data(self) -> dict[str, Any]:
"""Fetch data from OpenUV."""
try:
data = await self.update_method()
except InvalidApiKeyError as err:
raise ConfigEntryAuthFailed("Invalid API key") from err
except OpenUvError as err:
raise UpdateFailed(str(err)) from err
2022-11-08 14:41:09 +00:00
# OpenUV uses HTTP 403 to indicate both an invalid API key and an API key that
# has hit its daily/monthly limit; both cases will result in a reauth flow. If
# coordinator update succeeds after a reauth flow has been started, terminate
# it:
if reauth_flow := next(
iter(self._entry.async_get_active_flows(self.hass, {SOURCE_REAUTH})),
None,
):
self.hass.config_entries.flow.async_abort(reauth_flow["flow_id"])
return cast(dict[str, Any], data["result"])