core/homeassistant/components/lamarzocco/coordinator.py

159 lines
5.2 KiB
Python

"""Coordinator for La Marzocco API."""
from __future__ import annotations
from abc import abstractmethod
from dataclasses import dataclass
from datetime import timedelta
import logging
from typing import Any
from pylamarzocco import LaMarzoccoMachine
from pylamarzocco.exceptions import AuthFail, RequestNotSuccessful
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN
SCAN_INTERVAL = timedelta(seconds=15)
SETTINGS_UPDATE_INTERVAL = timedelta(hours=8)
SCHEDULE_UPDATE_INTERVAL = timedelta(minutes=30)
STATISTICS_UPDATE_INTERVAL = timedelta(minutes=15)
_LOGGER = logging.getLogger(__name__)
@dataclass
class LaMarzoccoRuntimeData:
"""Runtime data for La Marzocco."""
config_coordinator: LaMarzoccoConfigUpdateCoordinator
settings_coordinator: LaMarzoccoSettingsUpdateCoordinator
schedule_coordinator: LaMarzoccoScheduleUpdateCoordinator
statistics_coordinator: LaMarzoccoStatisticsUpdateCoordinator
type LaMarzoccoConfigEntry = ConfigEntry[LaMarzoccoRuntimeData]
class LaMarzoccoUpdateCoordinator(DataUpdateCoordinator[None]):
"""Base class for La Marzocco coordinators."""
_default_update_interval = SCAN_INTERVAL
config_entry: LaMarzoccoConfigEntry
websocket_terminated = True
def __init__(
self,
hass: HomeAssistant,
entry: LaMarzoccoConfigEntry,
device: LaMarzoccoMachine,
) -> None:
"""Initialize coordinator."""
super().__init__(
hass,
_LOGGER,
config_entry=entry,
name=DOMAIN,
update_interval=self._default_update_interval,
)
self.device = device
async def _async_update_data(self) -> None:
"""Do the data update."""
try:
await self._internal_async_update_data()
except AuthFail as ex:
_LOGGER.debug("Authentication failed", exc_info=True)
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN, translation_key="authentication_failed"
) from ex
except RequestNotSuccessful as ex:
_LOGGER.debug(ex, exc_info=True)
raise UpdateFailed(
translation_domain=DOMAIN, translation_key="api_error"
) from ex
@abstractmethod
async def _internal_async_update_data(self) -> None:
"""Actual data update logic."""
class LaMarzoccoConfigUpdateCoordinator(LaMarzoccoUpdateCoordinator):
"""Class to handle fetching data from the La Marzocco API centrally."""
async def _internal_async_update_data(self) -> None:
"""Fetch data from API endpoint."""
if self.device.websocket.connected:
return
await self.device.get_dashboard()
_LOGGER.debug("Current status: %s", self.device.dashboard.to_dict())
self.config_entry.async_create_background_task(
hass=self.hass,
target=self.connect_websocket(),
name="lm_websocket_task",
)
async def websocket_close(_: Any | None = None) -> None:
await self.device.websocket.disconnect()
self.config_entry.async_on_unload(
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, websocket_close)
)
self.config_entry.async_on_unload(websocket_close)
async def connect_websocket(self) -> None:
"""Connect to the websocket."""
_LOGGER.debug("Init WebSocket in background task")
self.websocket_terminated = False
self.async_update_listeners()
await self.device.connect_dashboard_websocket(
update_callback=lambda _: self.async_set_updated_data(None),
connect_callback=self.async_update_listeners,
disconnect_callback=self.async_update_listeners,
)
self.websocket_terminated = True
self.async_update_listeners()
class LaMarzoccoSettingsUpdateCoordinator(LaMarzoccoUpdateCoordinator):
"""Coordinator for La Marzocco settings."""
_default_update_interval = SETTINGS_UPDATE_INTERVAL
async def _internal_async_update_data(self) -> None:
"""Fetch data from API endpoint."""
await self.device.get_settings()
_LOGGER.debug("Current settings: %s", self.device.settings.to_dict())
class LaMarzoccoScheduleUpdateCoordinator(LaMarzoccoUpdateCoordinator):
"""Coordinator for La Marzocco schedule."""
_default_update_interval = SCHEDULE_UPDATE_INTERVAL
async def _internal_async_update_data(self) -> None:
"""Fetch data from API endpoint."""
await self.device.get_schedule()
_LOGGER.debug("Current schedule: %s", self.device.schedule.to_dict())
class LaMarzoccoStatisticsUpdateCoordinator(LaMarzoccoUpdateCoordinator):
"""Coordinator for La Marzocco statistics."""
_default_update_interval = STATISTICS_UPDATE_INTERVAL
async def _internal_async_update_data(self) -> None:
"""Fetch data from API endpoint."""
await self.device.get_coffee_and_flush_counter()
_LOGGER.debug("Current statistics: %s", self.device.statistics.to_dict())