"""Axis conftest.""" from __future__ import annotations from collections.abc import Callable, Generator from copy import deepcopy from types import MappingProxyType from typing import Any from unittest.mock import AsyncMock, patch from axis.rtsp import Signal, State import pytest import respx from homeassistant.components.axis.const import DOMAIN as AXIS_DOMAIN from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( CONF_HOST, CONF_MODEL, CONF_NAME, CONF_PASSWORD, CONF_PORT, CONF_USERNAME, ) from homeassistant.core import HomeAssistant from .const import ( API_DISCOVERY_RESPONSE, APP_AOA_RESPONSE, APP_VMD4_RESPONSE, APPLICATIONS_LIST_RESPONSE, BASIC_DEVICE_INFO_RESPONSE, BRAND_RESPONSE, DEFAULT_HOST, FORMATTED_MAC, IMAGE_RESPONSE, MODEL, MQTT_CLIENT_RESPONSE, NAME, PORT_MANAGEMENT_RESPONSE, PORTS_RESPONSE, PROPERTIES_RESPONSE, PTZ_RESPONSE, STREAM_PROFILES_RESPONSE, VIEW_AREAS_RESPONSE, ) from tests.common import MockConfigEntry @pytest.fixture def mock_setup_entry() -> Generator[AsyncMock, None, None]: """Override async_setup_entry.""" with patch( "homeassistant.components.axis.async_setup_entry", return_value=True ) as mock_setup_entry: yield mock_setup_entry # Config entry fixtures @pytest.fixture(name="config_entry") def config_entry_fixture( hass: HomeAssistant, config_entry_data: MappingProxyType[str, Any], config_entry_options: MappingProxyType[str, Any], config_entry_version: int, ) -> ConfigEntry: """Define a config entry fixture.""" config_entry = MockConfigEntry( domain=AXIS_DOMAIN, entry_id="676abe5b73621446e6550a2e86ffe3dd", unique_id=FORMATTED_MAC, data=config_entry_data, options=config_entry_options, version=config_entry_version, ) config_entry.add_to_hass(hass) return config_entry @pytest.fixture(name="config_entry_version") def config_entry_version_fixture() -> int: """Define a config entry version fixture.""" return 3 @pytest.fixture(name="config_entry_data") def config_entry_data_fixture() -> MappingProxyType[str, Any]: """Define a config entry data fixture.""" return { CONF_HOST: DEFAULT_HOST, CONF_USERNAME: "root", CONF_PASSWORD: "pass", CONF_PORT: 80, CONF_MODEL: MODEL, CONF_NAME: NAME, } @pytest.fixture(name="config_entry_options") def config_entry_options_fixture() -> MappingProxyType[str, Any]: """Define a config entry options fixture.""" return {} # Axis API fixtures @pytest.fixture(name="mock_vapix_requests") def default_request_fixture( respx_mock: respx, port_management_payload: dict[str, Any], param_properties_payload: dict[str, Any], param_ports_payload: dict[str, Any], mqtt_status_code: int, ) -> Callable[[str], None]: """Mock default Vapix requests responses.""" def __mock_default_requests(host: str) -> None: respx_mock(base_url=f"http://{host}:80") if host != DEFAULT_HOST: respx.post("/axis-cgi/apidiscovery.cgi").respond( json=API_DISCOVERY_RESPONSE, ) respx.post("/axis-cgi/basicdeviceinfo.cgi").respond( json=BASIC_DEVICE_INFO_RESPONSE, ) respx.post("/axis-cgi/io/portmanagement.cgi").respond( json=port_management_payload, ) respx.post("/axis-cgi/mqtt/client.cgi").respond( json=MQTT_CLIENT_RESPONSE, status_code=mqtt_status_code ) respx.post("/axis-cgi/streamprofile.cgi").respond( json=STREAM_PROFILES_RESPONSE, ) respx.post("/axis-cgi/viewarea/info.cgi").respond(json=VIEW_AREAS_RESPONSE) respx.post( "/axis-cgi/param.cgi", data={"action": "list", "group": "root.Brand"}, ).respond( text=BRAND_RESPONSE, headers={"Content-Type": "text/plain"}, ) respx.post( "/axis-cgi/param.cgi", data={"action": "list", "group": "root.Image"}, ).respond( text=IMAGE_RESPONSE, headers={"Content-Type": "text/plain"}, ) respx.post( "/axis-cgi/param.cgi", data={"action": "list", "group": "root.Input"}, ).respond( text=PORTS_RESPONSE, headers={"Content-Type": "text/plain"}, ) respx.post( "/axis-cgi/param.cgi", data={"action": "list", "group": "root.IOPort"}, ).respond( text=param_ports_payload, headers={"Content-Type": "text/plain"}, ) respx.post( "/axis-cgi/param.cgi", data={"action": "list", "group": "root.Output"}, ).respond( text=PORTS_RESPONSE, headers={"Content-Type": "text/plain"}, ) respx.post( "/axis-cgi/param.cgi", data={"action": "list", "group": "root.Properties"}, ).respond( text=param_properties_payload, headers={"Content-Type": "text/plain"}, ) respx.post( "/axis-cgi/param.cgi", data={"action": "list", "group": "root.PTZ"}, ).respond( text=PTZ_RESPONSE, headers={"Content-Type": "text/plain"}, ) respx.post( "/axis-cgi/param.cgi", data={"action": "list", "group": "root.StreamProfile"}, ).respond( text=STREAM_PROFILES_RESPONSE, headers={"Content-Type": "text/plain"}, ) respx.post("/axis-cgi/applications/list.cgi").respond( text=APPLICATIONS_LIST_RESPONSE, headers={"Content-Type": "text/xml"}, ) respx.post("/local/fenceguard/control.cgi").respond(json=APP_VMD4_RESPONSE) respx.post("/local/loiteringguard/control.cgi").respond(json=APP_VMD4_RESPONSE) respx.post("/local/motionguard/control.cgi").respond(json=APP_VMD4_RESPONSE) respx.post("/local/vmd/control.cgi").respond(json=APP_VMD4_RESPONSE) respx.post("/local/objectanalytics/control.cgi").respond(json=APP_AOA_RESPONSE) return __mock_default_requests @pytest.fixture def api_discovery_items() -> dict[str, Any]: """Additional Apidiscovery items.""" return {} @pytest.fixture(autouse=True) def api_discovery_fixture(api_discovery_items: dict[str, Any]) -> None: """Apidiscovery mock response.""" data = deepcopy(API_DISCOVERY_RESPONSE) if api_discovery_items: data["data"]["apiList"].append(api_discovery_items) respx.post(f"http://{DEFAULT_HOST}:80/axis-cgi/apidiscovery.cgi").respond(json=data) @pytest.fixture(name="port_management_payload") def io_port_management_data_fixture() -> dict[str, Any]: """Property parameter data.""" return PORT_MANAGEMENT_RESPONSE @pytest.fixture(name="param_properties_payload") def param_properties_data_fixture() -> dict[str, Any]: """Property parameter data.""" return PROPERTIES_RESPONSE @pytest.fixture(name="param_ports_payload") def param_ports_data_fixture() -> dict[str, Any]: """Property parameter data.""" return PORTS_RESPONSE @pytest.fixture(name="mqtt_status_code") def mqtt_status_code_fixture(): """Property parameter data.""" return 200 @pytest.fixture(name="setup_default_vapix_requests") def default_vapix_requests_fixture(mock_vapix_requests: Callable[[str], None]) -> None: """Mock default Vapix requests responses.""" mock_vapix_requests(DEFAULT_HOST) @pytest.fixture(name="prepare_config_entry") async def prep_config_entry_fixture( hass: HomeAssistant, config_entry: ConfigEntry, setup_default_vapix_requests: None ) -> Callable[[], ConfigEntry]: """Fixture factory to set up Axis network device.""" async def __mock_setup_config_entry() -> ConfigEntry: assert await hass.config_entries.async_setup(config_entry.entry_id) await hass.async_block_till_done() return config_entry return __mock_setup_config_entry @pytest.fixture(name="setup_config_entry") async def setup_config_entry_fixture( hass: HomeAssistant, config_entry: ConfigEntry, setup_default_vapix_requests: None ) -> ConfigEntry: """Define a fixture to set up Axis network device.""" assert await hass.config_entries.async_setup(config_entry.entry_id) await hass.async_block_till_done() return config_entry # RTSP fixtures @pytest.fixture(autouse=True) def mock_axis_rtspclient() -> Generator[Callable[[dict | None, str], None], None, None]: """No real RTSP communication allowed.""" with patch("axis.stream_manager.RTSPClient") as rtsp_client_mock: rtsp_client_mock.return_value.session.state = State.STOPPED async def start_stream() -> None: """Set state to playing when calling RTSPClient.start.""" rtsp_client_mock.return_value.session.state = State.PLAYING rtsp_client_mock.return_value.start = start_stream def stop_stream() -> None: """Set state to stopped when calling RTSPClient.stop.""" rtsp_client_mock.return_value.session.state = State.STOPPED rtsp_client_mock.return_value.stop = stop_stream def make_rtsp_call(data: dict | None = None, state: str = "") -> None: """Generate a RTSP call.""" axis_streammanager_session_callback = rtsp_client_mock.call_args[0][4] if data: rtsp_client_mock.return_value.rtp.data = data axis_streammanager_session_callback(signal=Signal.DATA) elif state: axis_streammanager_session_callback(signal=state) else: raise NotImplementedError yield make_rtsp_call @pytest.fixture(autouse=True) def mock_rtsp_event( mock_axis_rtspclient: Callable[[dict | None, str], None], ) -> Callable[[str, str, str, str, str, str], None]: """Fixture to allow mocking received RTSP events.""" def send_event( topic: str, data_type: str, data_value: str, operation: str = "Initialized", source_name: str = "", source_idx: str = "", ) -> None: source = "" if source_name != "" and source_idx != "": source = f'' event = f""" {topic} uri://bf32a3b9-e5e7-4d57-a48d-1b5be9ae7b16/ProducerReference {source} """ mock_axis_rtspclient(data=event.encode("utf-8")) return send_event @pytest.fixture(autouse=True) def mock_rtsp_signal_state( mock_axis_rtspclient: Callable[[dict | None, str], None], ) -> Callable[[bool], None]: """Fixture to allow mocking RTSP state signalling.""" def send_signal(connected: bool) -> None: """Signal state change of RTSP connection.""" signal = Signal.PLAYING if connected else Signal.FAILED mock_axis_rtspclient(state=signal) return send_signal