core/homeassistant/components/unifi/hub/hub.py

217 lines
7.4 KiB
Python

"""UniFi Network abstraction."""
from __future__ import annotations
from datetime import datetime, timedelta
import aiounifi
from aiounifi.models.device import DeviceSetPoePortModeRequest
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.device_registry import (
DeviceEntry,
DeviceEntryType,
DeviceInfo,
)
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_call_later, async_track_time_interval
import homeassistant.util.dt as dt_util
from ..const import ATTR_MANUFACTURER, CONF_SITE_ID, DOMAIN as UNIFI_DOMAIN, PLATFORMS
from .config import UnifiConfig
from .entity_loader import UnifiEntityLoader
from .websocket import UnifiWebsocket
CHECK_HEARTBEAT_INTERVAL = timedelta(seconds=1)
class UnifiHub:
"""Manages a single UniFi Network instance."""
def __init__(
self, hass: HomeAssistant, config_entry: ConfigEntry, api: aiounifi.Controller
) -> None:
"""Initialize the system."""
self.hass = hass
self.api = api
self.config = UnifiConfig.from_config_entry(config_entry)
self.entity_loader = UnifiEntityLoader(self)
self.websocket = UnifiWebsocket(hass, api, self.signal_reachable)
self.site = config_entry.data[CONF_SITE_ID]
self.is_admin = False
self._cancel_heartbeat_check: CALLBACK_TYPE | None = None
self._heartbeat_time: dict[str, datetime] = {}
self.poe_command_queue: dict[str, dict[int, str]] = {}
self._cancel_poe_command: CALLBACK_TYPE | None = None
@callback
@staticmethod
def get_hub(hass: HomeAssistant, config_entry: ConfigEntry) -> UnifiHub:
"""Get UniFi hub from config entry."""
hub: UnifiHub = hass.data[UNIFI_DOMAIN][config_entry.entry_id]
return hub
@property
def available(self) -> bool:
"""Websocket connection state."""
return self.websocket.available
@property
def signal_reachable(self) -> str:
"""Integration specific event to signal a change in connection status."""
return f"unifi-reachable-{self.config.entry.entry_id}"
@property
def signal_options_update(self) -> str:
"""Event specific per UniFi entry to signal new options."""
return f"unifi-options-{self.config.entry.entry_id}"
@property
def signal_heartbeat_missed(self) -> str:
"""Event specific per UniFi device tracker to signal new heartbeat missed."""
return "unifi-heartbeat-missed"
async def initialize(self) -> None:
"""Set up a UniFi Network instance."""
await self.entity_loader.initialize()
assert self.config.entry.unique_id is not None
self.is_admin = self.api.sites[self.config.entry.unique_id].role == "admin"
self.config.entry.add_update_listener(self.async_config_entry_updated)
self._cancel_heartbeat_check = async_track_time_interval(
self.hass, self._async_check_for_stale, CHECK_HEARTBEAT_INTERVAL
)
@callback
def async_heartbeat(
self, unique_id: str, heartbeat_expire_time: datetime | None = None
) -> None:
"""Signal when a device has fresh home state."""
if heartbeat_expire_time is not None:
self._heartbeat_time[unique_id] = heartbeat_expire_time
return
if unique_id in self._heartbeat_time:
del self._heartbeat_time[unique_id]
@callback
def _async_check_for_stale(self, *_: datetime) -> None:
"""Check for any devices scheduled to be marked disconnected."""
now = dt_util.utcnow()
unique_ids_to_remove = []
for unique_id, heartbeat_expire_time in self._heartbeat_time.items():
if now > heartbeat_expire_time:
async_dispatcher_send(
self.hass, f"{self.signal_heartbeat_missed}_{unique_id}"
)
unique_ids_to_remove.append(unique_id)
for unique_id in unique_ids_to_remove:
del self._heartbeat_time[unique_id]
@callback
def async_queue_poe_port_command(
self, device_id: str, port_idx: int, poe_mode: str
) -> None:
"""Queue commands to execute them together per device."""
if self._cancel_poe_command:
self._cancel_poe_command()
self._cancel_poe_command = None
device_queue = self.poe_command_queue.setdefault(device_id, {})
device_queue[port_idx] = poe_mode
async def async_execute_command(now: datetime) -> None:
"""Execute previously queued commands."""
queue = self.poe_command_queue.copy()
self.poe_command_queue.clear()
for device_id, device_commands in queue.items():
device = self.api.devices[device_id]
commands = list(device_commands.items())
await self.api.request(
DeviceSetPoePortModeRequest.create(device, targets=commands)
)
self._cancel_poe_command = async_call_later(self.hass, 5, async_execute_command)
@property
def device_info(self) -> DeviceInfo:
"""UniFi Network device info."""
assert self.config.entry.unique_id is not None
version: str | None = None
if sysinfo := next(iter(self.api.system_information.values()), None):
version = sysinfo.version
return DeviceInfo(
entry_type=DeviceEntryType.SERVICE,
identifiers={(UNIFI_DOMAIN, self.config.entry.unique_id)},
manufacturer=ATTR_MANUFACTURER,
model="UniFi Network Application",
name="UniFi Network",
sw_version=version,
)
@callback
def async_update_device_registry(self) -> DeviceEntry:
"""Update device registry."""
device_registry = dr.async_get(self.hass)
return device_registry.async_get_or_create(
config_entry_id=self.config.entry.entry_id, **self.device_info
)
@staticmethod
async def async_config_entry_updated(
hass: HomeAssistant, config_entry: ConfigEntry
) -> None:
"""Handle signals of config entry being updated.
If config entry is updated due to reauth flow
the entry might already have been reset and thus is not available.
"""
if not (hub := hass.data[UNIFI_DOMAIN].get(config_entry.entry_id)):
return
hub.config = UnifiConfig.from_config_entry(config_entry)
async_dispatcher_send(hass, hub.signal_options_update)
@callback
def shutdown(self, event: Event) -> None:
"""Wrap the call to unifi.close.
Used as an argument to EventBus.async_listen_once.
"""
self.websocket.stop()
async def async_reset(self) -> bool:
"""Reset this hub to default state.
Will cancel any scheduled setup retry and will unload
the config entry.
"""
await self.websocket.stop_and_wait()
unload_ok = await self.hass.config_entries.async_unload_platforms(
self.config.entry, PLATFORMS
)
if not unload_ok:
return False
if self._cancel_heartbeat_check:
self._cancel_heartbeat_check()
self._cancel_heartbeat_check = None
if self._cancel_poe_command:
self._cancel_poe_command()
self._cancel_poe_command = None
return True