"""Test MQTT utils.""" from collections.abc import Callable from random import getrandbits from unittest.mock import patch import pytest from homeassistant.components import mqtt from homeassistant.config_entries import ConfigEntryDisabler, ConfigEntryState from homeassistant.core import CoreState, HomeAssistant from tests.common import MockConfigEntry from tests.typing import MqttMockHAClient, MqttMockPahoClient @pytest.fixture(autouse=True) def mock_temp_dir(): """Mock the certificate temp directory.""" with patch( # Patch temp dir name to avoid tests fail running in parallel "homeassistant.components.mqtt.util.TEMP_DIR_NAME", "home-assistant-mqtt" + f"-{getrandbits(10):03x}", ) as mocked_temp_dir: yield mocked_temp_dir @pytest.mark.parametrize( ("option", "content", "file_created"), [ (mqtt.CONF_CERTIFICATE, "auto", False), (mqtt.CONF_CERTIFICATE, "### CA CERTIFICATE ###", True), (mqtt.CONF_CLIENT_CERT, "### CLIENT CERTIFICATE ###", True), (mqtt.CONF_CLIENT_KEY, "### PRIVATE KEY ###", True), ], ) async def test_async_create_certificate_temp_files( hass: HomeAssistant, mock_temp_dir, option, content, file_created ) -> None: """Test creating and reading certificate files.""" config = {option: content} await mqtt.util.async_create_certificate_temp_files(hass, config) file_path = mqtt.util.get_file_path(option) assert bool(file_path) is file_created assert ( mqtt.util.migrate_certificate_file_to_content(file_path or content) == content ) async def test_reading_non_exitisting_certificate_file() -> None: """Test reading a non existing certificate file.""" assert ( mqtt.util.migrate_certificate_file_to_content("/home/file_not_exists") is None ) @patch("homeassistant.components.mqtt.PLATFORMS", []) async def test_waiting_for_client_not_loaded( hass: HomeAssistant, mqtt_client_mock: MqttMockPahoClient, ) -> None: """Test waiting for client while mqtt entry is not yet loaded.""" hass.state = CoreState.starting await hass.async_block_till_done() entry = MockConfigEntry( domain=mqtt.DOMAIN, data={"broker": "test-broker"}, state=ConfigEntryState.NOT_LOADED, ) entry.add_to_hass(hass) unsubs: list[Callable[[], None]] = [] async def _async_just_in_time_subscribe() -> Callable[[], None]: nonlocal unsub assert await mqtt.async_wait_for_mqtt_client(hass) # Awaiting a second time should work too and return True assert await mqtt.async_wait_for_mqtt_client(hass) unsubs.append(await mqtt.async_subscribe(hass, "test_topic", lambda msg: None)) # Simulate some integration waiting for the client to become available hass.async_add_job(_async_just_in_time_subscribe) hass.async_add_job(_async_just_in_time_subscribe) hass.async_add_job(_async_just_in_time_subscribe) hass.async_add_job(_async_just_in_time_subscribe) assert entry.state == ConfigEntryState.NOT_LOADED assert await hass.config_entries.async_setup(entry.entry_id) assert len(unsubs) == 4 for unsub in unsubs: unsub() @patch("homeassistant.components.mqtt.PLATFORMS", []) async def test_waiting_for_client_loaded( hass: HomeAssistant, mqtt_mock: MqttMockHAClient, ) -> None: """Test waiting for client where mqtt entry is loaded.""" unsub: Callable[[], None] | None = None async def _async_just_in_time_subscribe() -> Callable[[], None]: nonlocal unsub assert await mqtt.async_wait_for_mqtt_client(hass) unsub = await mqtt.async_subscribe(hass, "test_topic", lambda msg: None) entry = hass.config_entries.async_entries(mqtt.DATA_MQTT)[0] assert entry.state == ConfigEntryState.LOADED await _async_just_in_time_subscribe() assert unsub is not None unsub() async def test_waiting_for_client_entry_fails( hass: HomeAssistant, mqtt_client_mock: MqttMockPahoClient, ) -> None: """Test waiting for client where mqtt entry is failing.""" hass.state = CoreState.starting await hass.async_block_till_done() entry = MockConfigEntry( domain=mqtt.DOMAIN, data={"broker": "test-broker"}, state=ConfigEntryState.NOT_LOADED, ) entry.add_to_hass(hass) async def _async_just_in_time_subscribe() -> Callable[[], None]: assert not await mqtt.async_wait_for_mqtt_client(hass) hass.async_add_job(_async_just_in_time_subscribe) assert entry.state == ConfigEntryState.NOT_LOADED with patch( "homeassistant.components.mqtt.async_setup_entry", side_effect=Exception, ): await hass.config_entries.async_setup(entry.entry_id) assert entry.state == ConfigEntryState.SETUP_ERROR async def test_waiting_for_client_setup_fails( hass: HomeAssistant, mqtt_client_mock: MqttMockPahoClient, ) -> None: """Test waiting for client where mqtt entry is failing during setup.""" hass.state = CoreState.starting await hass.async_block_till_done() entry = MockConfigEntry( domain=mqtt.DOMAIN, data={"broker": "test-broker"}, state=ConfigEntryState.NOT_LOADED, ) entry.add_to_hass(hass) async def _async_just_in_time_subscribe() -> Callable[[], None]: assert not await mqtt.async_wait_for_mqtt_client(hass) hass.async_add_job(_async_just_in_time_subscribe) assert entry.state == ConfigEntryState.NOT_LOADED # Simulate MQTT setup fails before the client would become available mqtt_client_mock.connect.side_effect = Exception assert not await hass.config_entries.async_setup(entry.entry_id) assert entry.state == ConfigEntryState.SETUP_ERROR @patch("homeassistant.components.mqtt.util.AVAILABILITY_TIMEOUT", 0.01) async def test_waiting_for_client_timeout( hass: HomeAssistant, ) -> None: """Test waiting for client with timeout.""" hass.state = CoreState.starting await hass.async_block_till_done() entry = MockConfigEntry( domain=mqtt.DOMAIN, data={"broker": "test-broker"}, state=ConfigEntryState.NOT_LOADED, ) entry.add_to_hass(hass) assert entry.state == ConfigEntryState.NOT_LOADED # returns False after timeout assert not await mqtt.async_wait_for_mqtt_client(hass) async def test_waiting_for_client_with_disabled_entry( hass: HomeAssistant, ) -> None: """Test waiting for client with timeout.""" hass.state = CoreState.starting await hass.async_block_till_done() entry = MockConfigEntry( domain=mqtt.DOMAIN, data={"broker": "test-broker"}, state=ConfigEntryState.NOT_LOADED, ) entry.add_to_hass(hass) # Disable MQTT config entry await hass.config_entries.async_set_disabled_by( entry.entry_id, ConfigEntryDisabler.USER ) assert entry.state == ConfigEntryState.NOT_LOADED # returns False because entry is disabled assert not await mqtt.async_wait_for_mqtt_client(hass)