core/homeassistant/components/ping/coordinator.py

55 lines
1.4 KiB
Python
Raw Normal View History

"""DataUpdateCoordinator for the ping integration."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import timedelta
import logging
from typing import Any
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .helpers import PingDataICMPLib, PingDataSubProcess
# Module-level logger; handed to the coordinator base class in
# PingUpdateCoordinator.__init__.
_LOGGER = logging.getLogger(__name__)
@dataclass(slots=True, frozen=True)
class PingResult:
"""Dataclass returned by the coordinator."""
ip_address: str
is_alive: bool
data: dict[str, Any]
class PingUpdateCoordinator(DataUpdateCoordinator[PingResult]):
    """The Ping update coordinator.

    Wraps a single ping helper and exposes its state as a ``PingResult``.
    The coordinator is registered with a fixed 30 second ``update_interval``.
    """

    # Helper that performs the actual ping (subprocess or icmplib variant).
    ping: PingDataSubProcess | PingDataICMPLib

    def __init__(
        self,
        hass: HomeAssistant,
        ping: PingDataSubProcess | PingDataICMPLib,
    ) -> None:
        """Initialize the Ping coordinator.

        Args:
            hass: Home Assistant instance passed through to the base class.
            ping: Ping helper whose state this coordinator publishes.
        """
        # Stored before calling the base initializer, as in the original
        # statement order.
        self.ping = ping
        super().__init__(
            hass,
            _LOGGER,
            name=f"Ping {ping.ip_address}",
            update_interval=timedelta(seconds=30),
        )

    async def _async_update_data(self) -> PingResult:
        """Trigger ping check.

        Awaits the helper's ``async_update()`` and then snapshots its
        ``ip_address``, ``is_alive`` and ``data`` into a new ``PingResult``.
        """
        await self.ping.async_update()
        return PingResult(
            ip_address=self.ping.ip_address,
            is_alive=self.ping.is_alive,
            # A falsy ``data`` (e.g. None) is normalised to an empty dict so
            # consumers always receive a mapping.
            data=self.ping.data or {},
        )