core/homeassistant/components/prusalink/__init__.py

147 lines
4.4 KiB
Python

"""The PrusaLink integration."""
from __future__ import annotations
from abc import ABC, abstractmethod
import asyncio
from datetime import timedelta
import logging
from time import monotonic
from typing import Generic, TypeVar
from pyprusalink import InvalidAuth, JobInfo, PrinterInfo, PrusaLink, PrusaLinkError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
UpdateFailed,
)
from .const import DOMAIN
PLATFORMS: list[Platform] = [Platform.BUTTON, Platform.CAMERA, Platform.SENSOR]
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up PrusaLink from a config entry."""
api = PrusaLink(
async_get_clientsession(hass),
entry.data["host"],
entry.data["api_key"],
)
coordinators = {
"printer": PrinterUpdateCoordinator(hass, api),
"job": JobUpdateCoordinator(hass, api),
}
for coordinator in coordinators.values():
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinators
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
T = TypeVar("T", PrinterInfo, JobInfo)
class PrusaLinkUpdateCoordinator(DataUpdateCoordinator, Generic[T], ABC):
"""Update coordinator for the printer."""
config_entry: ConfigEntry
expect_change_until = 0.0
def __init__(self, hass: HomeAssistant, api: PrusaLink) -> None:
"""Initialize the update coordinator."""
self.api = api
super().__init__(
hass, _LOGGER, name=DOMAIN, update_interval=self._get_update_interval(None)
)
async def _async_update_data(self) -> T:
"""Update the data."""
try:
async with asyncio.timeout(5):
data = await self._fetch_data()
except InvalidAuth:
raise UpdateFailed("Invalid authentication") from None
except PrusaLinkError as err:
raise UpdateFailed(str(err)) from err
self.update_interval = self._get_update_interval(data)
return data
@abstractmethod
async def _fetch_data(self) -> T:
"""Fetch the actual data."""
raise NotImplementedError
@callback
def expect_change(self) -> None:
"""Expect a change."""
self.expect_change_until = monotonic() + 30
def _get_update_interval(self, data: T) -> timedelta:
"""Get new update interval."""
if self.expect_change_until > monotonic():
return timedelta(seconds=5)
return timedelta(seconds=30)
class PrinterUpdateCoordinator(PrusaLinkUpdateCoordinator[PrinterInfo]):
"""Printer update coordinator."""
async def _fetch_data(self) -> PrinterInfo:
"""Fetch the printer data."""
return await self.api.get_printer()
def _get_update_interval(self, data: T) -> timedelta:
"""Get new update interval."""
if data and any(
data["state"]["flags"][key] for key in ("pausing", "cancelling")
):
return timedelta(seconds=5)
return super()._get_update_interval(data)
class JobUpdateCoordinator(PrusaLinkUpdateCoordinator[JobInfo]):
"""Job update coordinator."""
async def _fetch_data(self) -> JobInfo:
"""Fetch the printer data."""
return await self.api.get_job()
class PrusaLinkEntity(CoordinatorEntity[PrusaLinkUpdateCoordinator]):
"""Defines a base PrusaLink entity."""
_attr_has_entity_name = True
@property
def device_info(self) -> DeviceInfo:
"""Return device information about this PrusaLink device."""
return DeviceInfo(
identifiers={(DOMAIN, self.coordinator.config_entry.entry_id)},
name=self.coordinator.config_entry.title,
manufacturer="Prusa",
configuration_url=self.coordinator.api.host,
)