83 lines
2.6 KiB
Python
83 lines
2.6 KiB
Python
"""Contains the shared Coordinator for Starlink systems."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from dataclasses import dataclass
|
|
from datetime import timedelta
|
|
import logging
|
|
|
|
from starlink_grpc import (
|
|
AlertDict,
|
|
ChannelContext,
|
|
GrpcError,
|
|
LocationDict,
|
|
ObstructionDict,
|
|
StatusDict,
|
|
location_data,
|
|
reboot,
|
|
set_stow_state,
|
|
status_data,
|
|
)
|
|
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.exceptions import HomeAssistantError
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class StarlinkData:
|
|
"""Contains data pulled from the Starlink system."""
|
|
|
|
location: LocationDict
|
|
status: StatusDict
|
|
obstruction: ObstructionDict
|
|
alert: AlertDict
|
|
|
|
|
|
class StarlinkUpdateCoordinator(DataUpdateCoordinator[StarlinkData]):
|
|
"""Coordinates updates between all Starlink sensors defined in this file."""
|
|
|
|
def __init__(self, hass: HomeAssistant, name: str, url: str) -> None:
|
|
"""Initialize an UpdateCoordinator for a group of sensors."""
|
|
self.channel_context = ChannelContext(target=url)
|
|
|
|
super().__init__(
|
|
hass,
|
|
_LOGGER,
|
|
name=name,
|
|
update_interval=timedelta(seconds=5),
|
|
)
|
|
|
|
async def _async_update_data(self) -> StarlinkData:
|
|
async with asyncio.timeout(4):
|
|
try:
|
|
status = await self.hass.async_add_executor_job(
|
|
status_data, self.channel_context
|
|
)
|
|
location = await self.hass.async_add_executor_job(
|
|
location_data, self.channel_context
|
|
)
|
|
return StarlinkData(location, *status)
|
|
except GrpcError as exc:
|
|
raise UpdateFailed from exc
|
|
|
|
async def async_stow_starlink(self, stow: bool) -> None:
|
|
"""Set whether Starlink system tied to this coordinator should be stowed."""
|
|
async with asyncio.timeout(4):
|
|
try:
|
|
await self.hass.async_add_executor_job(
|
|
set_stow_state, not stow, self.channel_context
|
|
)
|
|
except GrpcError as exc:
|
|
raise HomeAssistantError from exc
|
|
|
|
async def async_reboot_starlink(self) -> None:
|
|
"""Reboot the Starlink system tied to this coordinator."""
|
|
async with asyncio.timeout(4):
|
|
try:
|
|
await self.hass.async_add_executor_job(reboot, self.channel_context)
|
|
except GrpcError as exc:
|
|
raise HomeAssistantError from exc
|