"""Test helpers for UniFi Protect.""" from __future__ import annotations from collections.abc import Callable, Sequence from dataclasses import dataclass from datetime import timedelta from typing import Any from unittest.mock import Mock from pyunifiprotect import ProtectApiClient from pyunifiprotect.data import ( Bootstrap, Camera, Event, EventType, ModelType, ProtectAdoptableDeviceModel, WSSubscriptionMessage, ) from pyunifiprotect.data.bootstrap import ProtectDeviceRef from pyunifiprotect.test_util.anonymize import random_hex from homeassistant.const import Platform from homeassistant.core import HomeAssistant, split_entity_id from homeassistant.helpers import entity_registry as er from homeassistant.helpers.entity import EntityDescription import homeassistant.util.dt as dt_util from tests.common import MockConfigEntry, async_fire_time_changed @dataclass class MockUFPFixture: """Mock for NVR.""" entry: MockConfigEntry api: ProtectApiClient ws_subscription: Callable[[WSSubscriptionMessage], None] | None = None def ws_msg(self, msg: WSSubscriptionMessage) -> Any: """Emit WS message for testing.""" if self.ws_subscription is not None: return self.ws_subscription(msg) def reset_objects(bootstrap: Bootstrap): """Reset bootstrap objects.""" bootstrap.cameras = {} bootstrap.lights = {} bootstrap.sensors = {} bootstrap.viewers = {} bootstrap.events = {} bootstrap.doorlocks = {} bootstrap.chimes = {} async def time_changed(hass: HomeAssistant, seconds: int) -> None: """Trigger time changed.""" next_update = dt_util.utcnow() + timedelta(seconds) async_fire_time_changed(hass, next_update) await hass.async_block_till_done() async def enable_entity( hass: HomeAssistant, entry_id: str, entity_id: str ) -> er.RegistryEntry: """Enable a disabled entity.""" entity_registry = er.async_get(hass) updated_entity = entity_registry.async_update_entity(entity_id, disabled_by=None) assert not updated_entity.disabled await hass.config_entries.async_reload(entry_id) await hass.async_block_till_done() return updated_entity def assert_entity_counts( hass: HomeAssistant, platform: Platform, total: int, enabled: int ) -> None: """Assert entity counts for a given platform.""" entity_registry = er.async_get(hass) entities = [ e for e in entity_registry.entities if split_entity_id(e)[0] == platform.value ] assert len(entities) == total assert len(hass.states.async_all(platform.value)) == enabled def normalize_name(name: str) -> str: """Normalize name.""" return name.lower().replace(":", "").replace(" ", "_").replace("-", "_") def ids_from_device_description( platform: Platform, device: ProtectAdoptableDeviceModel, description: EntityDescription, ) -> tuple[str, str]: """Return expected unique_id and entity_id for a give platform/device/description combination.""" entity_name = normalize_name(device.display_name) description_entity_name = normalize_name(str(description.name)) unique_id = f"{device.mac}_{description.key}" entity_id = f"{platform.value}.{entity_name}_{description_entity_name}" return unique_id, entity_id def generate_random_ids() -> tuple[str, str]: """Generate random IDs for device.""" return random_hex(24).lower(), random_hex(12).upper() def regenerate_device_ids(device: ProtectAdoptableDeviceModel) -> None: """Regenerate the IDs on UFP device.""" device.id, device.mac = generate_random_ids() def add_device_ref(bootstrap: Bootstrap, device: ProtectAdoptableDeviceModel) -> None: """Manually add device ref to bootstrap for lookup.""" ref = ProtectDeviceRef(id=device.id, model=device.model) bootstrap.id_lookup[device.id] = ref bootstrap.mac_lookup[device.mac.lower()] = ref def add_device( bootstrap: Bootstrap, device: ProtectAdoptableDeviceModel, regenerate_ids: bool ) -> None: """Add test device to bootstrap.""" if device.model is None: return device._api = bootstrap.api if isinstance(device, Camera): for channel in device.channels: channel._api = bootstrap.api if regenerate_ids: regenerate_device_ids(device) device._initial_data = device.dict() devices = getattr(bootstrap, f"{device.model.value}s") devices[device.id] = device add_device_ref(bootstrap, device) async def init_entry( hass: HomeAssistant, ufp: MockUFPFixture, devices: Sequence[ProtectAdoptableDeviceModel], regenerate_ids: bool = True, ) -> None: """Initialize Protect entry with given devices.""" reset_objects(ufp.api.bootstrap) for device in devices: add_device(ufp.api.bootstrap, device, regenerate_ids) await hass.config_entries.async_setup(ufp.entry.entry_id) await hass.async_block_till_done() async def remove_entities( hass: HomeAssistant, ufp: MockUFPFixture, ufp_devices: list[ProtectAdoptableDeviceModel], ) -> None: """Remove all entities for given Protect devices.""" for ufp_device in ufp_devices: if not ufp_device.is_adopted_by_us: continue devices = getattr(ufp.api.bootstrap, f"{ufp_device.model.value}s") del devices[ufp_device.id] mock_msg = Mock() mock_msg.changed_data = {} mock_msg.old_obj = ufp_device mock_msg.new_obj = None ufp.ws_msg(mock_msg) await time_changed(hass, 30) async def adopt_devices( hass: HomeAssistant, ufp: MockUFPFixture, ufp_devices: list[ProtectAdoptableDeviceModel], fully_adopt: bool = False, ): """Emit WS to re-adopt give Protect devices.""" for ufp_device in ufp_devices: if fully_adopt: ufp_device.is_adopted = True ufp_device.is_adopted_by_other = False ufp_device.can_adopt = False devices = getattr(ufp.api.bootstrap, f"{ufp_device.model.value}s") devices[ufp_device.id] = ufp_device mock_msg = Mock() mock_msg.changed_data = {} mock_msg.new_obj = Event( api=ufp_device.api, id=random_hex(24), smart_detect_types=[], smart_detect_event_ids=[], type=EventType.DEVICE_ADOPTED, start=dt_util.utcnow(), score=100, metadata={"device_id": ufp_device.id}, model=ModelType.EVENT, ) ufp.ws_msg(mock_msg) await hass.async_block_till_done()