core/homeassistant/components/squeezebox/coordinator.py

139 lines
4.5 KiB
Python

"""DataUpdateCoordinator for the Squeezebox integration."""
from __future__ import annotations
from asyncio import timeout
from collections.abc import Callable
from datetime import timedelta
import logging
from typing import TYPE_CHECKING, Any
from pysqueezebox import Player, Server
from pysqueezebox.player import Alarm
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
if TYPE_CHECKING:
from . import SqueezeboxConfigEntry
from .const import (
DOMAIN,
PLAYER_UPDATE_INTERVAL,
SENSOR_UPDATE_INTERVAL,
SIGNAL_PLAYER_REDISCOVERED,
STATUS_API_TIMEOUT,
)
_LOGGER = logging.getLogger(__name__)
class LMSStatusDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"""LMS Status custom coordinator."""
config_entry: SqueezeboxConfigEntry
def __init__(
self, hass: HomeAssistant, config_entry: SqueezeboxConfigEntry, lms: Server
) -> None:
"""Initialize my coordinator."""
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name=lms.name,
update_interval=timedelta(seconds=SENSOR_UPDATE_INTERVAL),
always_update=False,
)
self.lms = lms
self.can_server_restart = False
async def _async_setup(self) -> None:
"""Query LMS capabilities."""
result = await self.lms.async_query("can", "restartserver", "?")
if result and "_can" in result and result["_can"] == 1:
_LOGGER.debug("Can restart %s", self.lms.name)
self.can_server_restart = True
else:
_LOGGER.warning("Can't query server capabilities %s", self.lms.name)
async def _async_update_data(self) -> dict[str, Any]:
"""Fetch data from LMS status call.
Then we process only a subset to make then nice for HA
"""
async with timeout(STATUS_API_TIMEOUT):
data: dict[str, Any] | None = await self.lms.async_prepared_status()
if not data:
raise UpdateFailed(
translation_domain=DOMAIN,
translation_key="coordinator_no_data",
)
_LOGGER.debug("Raw serverstatus %s=%s", self.lms.name, data)
return data
class SqueezeBoxPlayerUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"""Coordinator for Squeezebox players."""
config_entry: SqueezeboxConfigEntry
def __init__(
self,
hass: HomeAssistant,
config_entry: SqueezeboxConfigEntry,
player: Player,
server_uuid: str,
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name=player.name,
update_interval=timedelta(seconds=PLAYER_UPDATE_INTERVAL),
always_update=True,
)
self.player = player
self.available = True
self.known_alarms: set[str] = set()
self._remove_dispatcher: Callable | None = None
self.player_uuid = format_mac(player.player_id)
self.server_uuid = server_uuid
async def _async_update_data(self) -> dict[str, Any]:
"""Update the Player() object if available, or listen for rediscovery if not."""
if self.available:
# Only update players available at last update, unavailable players are rediscovered instead
await self.player.async_update()
if not self.player.connected:
_LOGGER.info("Player %s is not available", self.name)
self.available = False
# start listening for restored players
self._remove_dispatcher = async_dispatcher_connect(
self.hass, SIGNAL_PLAYER_REDISCOVERED, self.rediscovered
)
alarm_dict: dict[str, Alarm] = (
{alarm["id"]: alarm for alarm in self.player.alarms}
if self.player.alarms
else {}
)
return {"alarms": alarm_dict}
@callback
def rediscovered(self, unique_id: str, connected: bool) -> None:
"""Make a player available again."""
if unique_id == self.player.player_id and connected:
self.available = True
_LOGGER.info("Player %s is available again", self.name)
if self._remove_dispatcher:
self._remove_dispatcher()