# core/homeassistant/components/airzone/coordinator.py
# (44 lines, 1.3 KiB, Python)

"""The Airzone integration."""
from __future__ import annotations
from datetime import timedelta
import logging
from typing import Any
from aioairzone.exceptions import AirzoneError
from aioairzone.localapi import AirzoneLocalApi
import async_timeout
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import AIOAIRZONE_DEVICE_TIMEOUT_SEC, DOMAIN
# Polling period handed to the coordinator as its update interval.
SCAN_INTERVAL = timedelta(minutes=1)

# Module-level logger; also passed to the coordinator base class.
_LOGGER = logging.getLogger(__name__)
# (history-view timestamp: 2022-05-09 17:56:59 +00:00)
class AirzoneUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
    """Coordinator that refreshes Airzone data through the local API client.

    The base class is configured with ``SCAN_INTERVAL`` as its update
    interval; each refresh asks the ``AirzoneLocalApi`` client to update and
    then exposes the client's data dictionary.
    """

    def __init__(self, hass: HomeAssistant, airzone: AirzoneLocalApi) -> None:
        """Store the API client and configure the base coordinator.

        Args:
            hass: Home Assistant instance forwarded to the base class.
            airzone: Local API client used for every refresh.
        """
        # Keep a handle on the client before the base class is initialised,
        # mirroring the original statement order.
        self.airzone = airzone
        super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=SCAN_INTERVAL)

    async def _async_update_data(self) -> dict[str, Any]:
        """Refresh the client and return its current data.

        The whole refresh runs under a timeout of
        ``AIOAIRZONE_DEVICE_TIMEOUT_SEC`` seconds.

        Returns:
            The dictionary produced by ``self.airzone.data()``.

        Raises:
            UpdateFailed: If the client raises ``AirzoneError`` while updating.
        """
        async with async_timeout.timeout(AIOAIRZONE_DEVICE_TIMEOUT_SEC):
            try:
                await self.airzone.update()
            except AirzoneError as err:
                # Translate the library error into the coordinator's own
                # failure type, keeping the original as the cause.
                raise UpdateFailed(err) from err
            else:
                return self.airzone.data()