core/homeassistant/components/homeassistant_hardware/util.py

387 lines
12 KiB
Python

"""Utility functions for Home Assistant SkyConnect integration."""
from __future__ import annotations
import asyncio
from collections import defaultdict
from collections.abc import AsyncIterator, Callable, Iterable
from contextlib import AsyncExitStack, asynccontextmanager
from dataclasses import dataclass
from enum import StrEnum
import logging
from universal_silabs_flasher.const import ApplicationType as FlasherApplicationType
from universal_silabs_flasher.firmware import parse_firmware_image
from universal_silabs_flasher.flasher import Flasher
from homeassistant.components.hassio import AddonError, AddonManager, AddonState
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.hassio import is_hassio
from homeassistant.helpers.singleton import singleton
from . import DATA_COMPONENT
from .const import (
OTBR_ADDON_MANAGER_DATA,
OTBR_ADDON_NAME,
OTBR_ADDON_SLUG,
ZIGBEE_FLASHER_ADDON_MANAGER_DATA,
ZIGBEE_FLASHER_ADDON_NAME,
ZIGBEE_FLASHER_ADDON_SLUG,
)
from .silabs_multiprotocol_addon import (
WaitingAddonManager,
get_multiprotocol_addon_manager,
)
_LOGGER = logging.getLogger(__name__)
class ApplicationType(StrEnum):
"""Application type running on a device."""
GECKO_BOOTLOADER = "bootloader"
CPC = "cpc"
EZSP = "ezsp"
SPINEL = "spinel"
ROUTER = "router"
@classmethod
def from_flasher_application_type(
cls, app_type: FlasherApplicationType
) -> ApplicationType:
"""Convert a USF application type enum."""
return cls(app_type.value)
def as_flasher_application_type(self) -> FlasherApplicationType:
"""Convert the application type enum into one compatible with USF."""
return FlasherApplicationType(self.value)
@singleton(OTBR_ADDON_MANAGER_DATA)
@callback
def get_otbr_addon_manager(hass: HomeAssistant) -> WaitingAddonManager:
"""Get the OTBR add-on manager."""
return WaitingAddonManager(
hass,
_LOGGER,
OTBR_ADDON_NAME,
OTBR_ADDON_SLUG,
)
@singleton(ZIGBEE_FLASHER_ADDON_MANAGER_DATA)
@callback
def get_zigbee_flasher_addon_manager(hass: HomeAssistant) -> WaitingAddonManager:
"""Get the flasher add-on manager."""
return WaitingAddonManager(
hass,
_LOGGER,
ZIGBEE_FLASHER_ADDON_NAME,
ZIGBEE_FLASHER_ADDON_SLUG,
)
@dataclass(kw_only=True)
class OwningAddon:
"""Owning add-on."""
slug: str
def _get_addon_manager(self, hass: HomeAssistant) -> WaitingAddonManager:
return WaitingAddonManager(
hass,
_LOGGER,
f"Add-on {self.slug}",
self.slug,
)
async def is_running(self, hass: HomeAssistant) -> bool:
"""Check if the add-on is running."""
addon_manager = self._get_addon_manager(hass)
try:
addon_info = await addon_manager.async_get_addon_info()
except AddonError:
return False
else:
return addon_info.state == AddonState.RUNNING
@asynccontextmanager
async def temporarily_stop(self, hass: HomeAssistant) -> AsyncIterator[None]:
"""Temporarily stop the add-on, restarting it after completion."""
addon_manager = self._get_addon_manager(hass)
try:
addon_info = await addon_manager.async_get_addon_info()
except AddonError:
yield
return
if addon_info.state != AddonState.RUNNING:
yield
return
try:
await addon_manager.async_stop_addon()
await addon_manager.async_wait_until_addon_state(AddonState.NOT_RUNNING)
yield
finally:
await addon_manager.async_start_addon_waiting()
@dataclass(kw_only=True)
class OwningIntegration:
"""Owning integration."""
config_entry_id: str
async def is_running(self, hass: HomeAssistant) -> bool:
"""Check if the integration is running."""
if (entry := hass.config_entries.async_get_entry(self.config_entry_id)) is None:
return False
return entry.state in (
ConfigEntryState.LOADED,
ConfigEntryState.SETUP_RETRY,
ConfigEntryState.SETUP_IN_PROGRESS,
)
@asynccontextmanager
async def temporarily_stop(self, hass: HomeAssistant) -> AsyncIterator[None]:
"""Temporarily stop the integration, restarting it after completion."""
if (entry := hass.config_entries.async_get_entry(self.config_entry_id)) is None:
yield
return
if entry.state != ConfigEntryState.LOADED:
yield
return
try:
await hass.config_entries.async_unload(entry.entry_id)
yield
finally:
await hass.config_entries.async_setup(entry.entry_id)
@dataclass(kw_only=True)
class FirmwareInfo:
"""Firmware guess."""
device: str
firmware_type: ApplicationType
firmware_version: str | None
source: str
owners: list[OwningAddon | OwningIntegration]
async def is_running(self, hass: HomeAssistant) -> bool:
"""Check if the firmware owner is running."""
states = await asyncio.gather(*(o.is_running(hass) for o in self.owners))
if not states:
return False
return all(states)
async def get_otbr_addon_firmware_info(
hass: HomeAssistant, otbr_addon_manager: AddonManager
) -> FirmwareInfo | None:
"""Get firmware info from the OTBR add-on."""
try:
otbr_addon_info = await otbr_addon_manager.async_get_addon_info()
except AddonError:
return None
if otbr_addon_info.state == AddonState.NOT_INSTALLED:
return None
if (otbr_path := otbr_addon_info.options.get("device")) is None:
return None
# Only create a new entry if there are no existing OTBR ones
return FirmwareInfo(
device=otbr_path,
firmware_type=ApplicationType.SPINEL,
firmware_version=None,
source="otbr",
owners=[OwningAddon(slug=otbr_addon_manager.addon_slug)],
)
async def guess_hardware_owners(
hass: HomeAssistant, device_path: str
) -> list[FirmwareInfo]:
"""Guess the firmware info based on installed addons and other integrations."""
device_guesses: defaultdict[str, list[FirmwareInfo]] = defaultdict(list)
async for firmware_info in hass.data[DATA_COMPONENT].iter_firmware_info():
device_guesses[firmware_info.device].append(firmware_info)
# It may be possible for the OTBR addon to be present without the integration
if is_hassio(hass):
otbr_addon_manager = get_otbr_addon_manager(hass)
otbr_addon_fw_info = await get_otbr_addon_firmware_info(
hass, otbr_addon_manager
)
otbr_path = (
otbr_addon_fw_info.device if otbr_addon_fw_info is not None else None
)
# Only create a new entry if there are no existing OTBR ones
if otbr_path is not None and not any(
info.source == "otbr" for info in device_guesses[otbr_path]
):
assert otbr_addon_fw_info is not None
device_guesses[otbr_path].append(otbr_addon_fw_info)
if is_hassio(hass):
multipan_addon_manager = await get_multiprotocol_addon_manager(hass)
try:
multipan_addon_info = await multipan_addon_manager.async_get_addon_info()
except AddonError:
pass
else:
if multipan_addon_info.state != AddonState.NOT_INSTALLED:
multipan_path = multipan_addon_info.options.get("device")
if multipan_path is not None:
device_guesses[multipan_path].append(
FirmwareInfo(
device=multipan_path,
firmware_type=ApplicationType.CPC,
firmware_version=None,
source="multiprotocol",
owners=[
OwningAddon(slug=multipan_addon_manager.addon_slug)
],
)
)
return device_guesses.get(device_path, [])
async def guess_firmware_info(hass: HomeAssistant, device_path: str) -> FirmwareInfo:
"""Guess the firmware type based on installed addons and other integrations."""
hardware_owners = await guess_hardware_owners(hass, device_path)
# Fall back to EZSP if we have no way to guess
if not hardware_owners:
return FirmwareInfo(
device=device_path,
firmware_type=ApplicationType.EZSP,
firmware_version=None,
source="unknown",
owners=[],
)
# Prioritize guesses that are pulled from a real source
guesses = [
(guess, sum([await owner.is_running(hass) for owner in guess.owners]))
for guess in hardware_owners
]
guesses.sort(key=lambda p: p[1])
assert guesses
# Pick the best one. We use a stable sort so ZHA < OTBR < multi-PAN
return guesses[-1][0]
async def probe_silabs_firmware_info(
device: str, *, probe_methods: Iterable[ApplicationType] | None = None
) -> FirmwareInfo | None:
"""Probe the running firmware on a SiLabs device."""
flasher = Flasher(
device=device,
**(
{"probe_methods": [m.as_flasher_application_type() for m in probe_methods]}
if probe_methods
else {}
),
)
try:
await flasher.probe_app_type()
except Exception: # noqa: BLE001
_LOGGER.debug("Failed to probe application type", exc_info=True)
if flasher.app_type is None:
return None
return FirmwareInfo(
device=device,
firmware_type=ApplicationType.from_flasher_application_type(flasher.app_type),
firmware_version=(
flasher.app_version.orig_version
if flasher.app_version is not None
else None
),
source="probe",
owners=[],
)
async def probe_silabs_firmware_type(
device: str, *, probe_methods: Iterable[ApplicationType] | None = None
) -> ApplicationType | None:
"""Probe the running firmware type on a SiLabs device."""
fw_info = await probe_silabs_firmware_info(device, probe_methods=probe_methods)
if fw_info is None:
return None
return fw_info.firmware_type
async def async_flash_silabs_firmware(
hass: HomeAssistant,
device: str,
fw_data: bytes,
expected_installed_firmware_type: ApplicationType,
bootloader_reset_type: str | None = None,
progress_callback: Callable[[int, int], None] | None = None,
) -> FirmwareInfo:
"""Flash firmware to the SiLabs device."""
firmware_info = await guess_firmware_info(hass, device)
_LOGGER.debug("Identified firmware info: %s", firmware_info)
fw_image = await hass.async_add_executor_job(parse_firmware_image, fw_data)
flasher = Flasher(
device=device,
probe_methods=(
ApplicationType.GECKO_BOOTLOADER.as_flasher_application_type(),
ApplicationType.EZSP.as_flasher_application_type(),
ApplicationType.SPINEL.as_flasher_application_type(),
ApplicationType.CPC.as_flasher_application_type(),
),
bootloader_reset=bootloader_reset_type,
)
async with AsyncExitStack() as stack:
for owner in firmware_info.owners:
await stack.enter_async_context(owner.temporarily_stop(hass))
try:
# Enter the bootloader with indeterminate progress
await flasher.enter_bootloader()
# Flash the firmware, with progress
await flasher.flash_firmware(fw_image, progress_callback=progress_callback)
except Exception as err:
raise HomeAssistantError("Failed to flash firmware") from err
probed_firmware_info = await probe_silabs_firmware_info(
device,
probe_methods=(expected_installed_firmware_type,),
)
if probed_firmware_info is None:
raise HomeAssistantError("Failed to probe the firmware after flashing")
return probed_firmware_info