core/homeassistant/components/ping/coordinator.py

61 lines
1.6 KiB
Python

"""DataUpdateCoordinator for the ping integration."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import timedelta
import logging
from typing import Any
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .helpers import PingDataICMPLib, PingDataSubProcess
_LOGGER = logging.getLogger(__name__)
type PingConfigEntry = ConfigEntry[PingUpdateCoordinator]
@dataclass(slots=True, frozen=True)
class PingResult:
"""Dataclass returned by the coordinator."""
ip_address: str
is_alive: bool
data: dict[str, Any]
class PingUpdateCoordinator(DataUpdateCoordinator[PingResult]):
"""The Ping update coordinator."""
config_entry: PingConfigEntry
ping: PingDataSubProcess | PingDataICMPLib
def __init__(
self,
hass: HomeAssistant,
config_entry: PingConfigEntry,
ping: PingDataSubProcess | PingDataICMPLib,
) -> None:
"""Initialize the Ping coordinator."""
self.ping = ping
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name=f"Ping {ping.ip_address}",
update_interval=timedelta(seconds=30),
)
async def _async_update_data(self) -> PingResult:
"""Trigger ping check."""
await self.ping.async_update()
return PingResult(
ip_address=self.ping.ip_address,
is_alive=self.ping.is_alive,
data=self.ping.data or {},
)