core/homeassistant/components/starlink/coordinator.py

137 lines
4.6 KiB
Python

"""Contains the shared Coordinator for Starlink systems."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from datetime import timedelta
import logging
from zoneinfo import ZoneInfo
from starlink_grpc import (
AlertDict,
ChannelContext,
GrpcError,
LocationDict,
ObstructionDict,
StatusDict,
get_sleep_config,
location_data,
reboot,
set_sleep_config,
set_stow_state,
status_data,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
_LOGGER = logging.getLogger(__name__)
@dataclass
class StarlinkData:
"""Contains data pulled from the Starlink system."""
location: LocationDict
sleep: tuple[int, int, bool]
status: StatusDict
obstruction: ObstructionDict
alert: AlertDict
class StarlinkUpdateCoordinator(DataUpdateCoordinator[StarlinkData]):
"""Coordinates updates between all Starlink sensors defined in this file."""
def __init__(self, hass: HomeAssistant, name: str, url: str) -> None:
"""Initialize an UpdateCoordinator for a group of sensors."""
self.channel_context = ChannelContext(target=url)
self.timezone = ZoneInfo(hass.config.time_zone)
super().__init__(
hass,
_LOGGER,
name=name,
update_interval=timedelta(seconds=5),
)
def _get_starlink_data(self) -> StarlinkData:
"""Retrieve Starlink data."""
channel_context = self.channel_context
status = status_data(channel_context)
location = location_data(channel_context)
sleep = get_sleep_config(channel_context)
return StarlinkData(location, sleep, *status)
async def _async_update_data(self) -> StarlinkData:
async with asyncio.timeout(4):
try:
result = await self.hass.async_add_executor_job(self._get_starlink_data)
except GrpcError as exc:
raise UpdateFailed from exc
return result
async def async_stow_starlink(self, stow: bool) -> None:
"""Set whether Starlink system tied to this coordinator should be stowed."""
async with asyncio.timeout(4):
try:
await self.hass.async_add_executor_job(
set_stow_state, not stow, self.channel_context
)
except GrpcError as exc:
raise HomeAssistantError from exc
async def async_reboot_starlink(self) -> None:
"""Reboot the Starlink system tied to this coordinator."""
async with asyncio.timeout(4):
try:
await self.hass.async_add_executor_job(reboot, self.channel_context)
except GrpcError as exc:
raise HomeAssistantError from exc
async def async_set_sleep_schedule_enabled(self, sleep_schedule: bool) -> None:
"""Set whether Starlink system uses the configured sleep schedule."""
async with asyncio.timeout(4):
try:
await self.hass.async_add_executor_job(
set_sleep_config,
self.data.sleep[0],
self.data.sleep[1],
sleep_schedule,
self.channel_context,
)
except GrpcError as exc:
raise HomeAssistantError from exc
async def async_set_sleep_start(self, start: int) -> None:
"""Set Starlink system sleep schedule start time."""
async with asyncio.timeout(4):
try:
await self.hass.async_add_executor_job(
set_sleep_config,
start,
self.data.sleep[1],
self.data.sleep[2],
self.channel_context,
)
except GrpcError as exc:
raise HomeAssistantError from exc
async def async_set_sleep_duration(self, end: int) -> None:
"""Set Starlink system sleep schedule end time."""
duration = end - self.data.sleep[0]
if duration < 0:
# If the duration pushed us into the next day, add one days worth to correct that.
duration += 1440
async with asyncio.timeout(4):
try:
await self.hass.async_add_executor_job(
set_sleep_config,
self.data.sleep[0],
duration,
self.data.sleep[2],
self.channel_context,
)
except GrpcError as exc:
raise HomeAssistantError from exc