"""Common test objects.""" from contextlib import suppress import copy from datetime import datetime import json from pathlib import Path from typing import Any from unittest.mock import ANY, MagicMock, patch import pytest import yaml from homeassistant import config as hass_config from homeassistant.components import mqtt from homeassistant.components.mqtt import debug_info from homeassistant.components.mqtt.const import MQTT_DISCONNECTED from homeassistant.components.mqtt.mixins import MQTT_ATTRIBUTES_BLOCKED from homeassistant.config_entries import ConfigEntryState from homeassistant.const import ( ATTR_ASSUMED_STATE, ATTR_ENTITY_ID, SERVICE_RELOAD, STATE_UNAVAILABLE, ) from homeassistant.core import HomeAssistant from homeassistant.generated.mqtt import MQTT from homeassistant.helpers import device_registry as dr, entity_registry as er from homeassistant.helpers.dispatcher import async_dispatcher_send from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from homeassistant.setup import async_setup_component from tests.common import MockConfigEntry, async_fire_mqtt_message from tests.typing import MqttMockHAClientGenerator DEFAULT_CONFIG_DEVICE_INFO_ID = { "identifiers": ["helloworld"], "manufacturer": "Whatever", "name": "Beer", "model": "Glass", "hw_version": "rev1", "sw_version": "0.1-beta", "suggested_area": "default_area", "configuration_url": "http://example.com", } DEFAULT_CONFIG_DEVICE_INFO_MAC = { "connections": [[dr.CONNECTION_NETWORK_MAC, "02:5b:26:a8:dc:12"]], "manufacturer": "Whatever", "name": "Beer", "model": "Glass", "hw_version": "rev1", "sw_version": "0.1-beta", "suggested_area": "default_area", "configuration_url": "http://example.com", } _SENTINEL = object() DISCOVERY_COUNT = len(MQTT) _MqttMessageType = list[tuple[str, str]] _AttributesType = list[tuple[str, Any]] _StateDataType = list[tuple[_MqttMessageType, str | None, _AttributesType | None]] async def help_test_availability_when_connection_lost( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test availability after MQTT disconnection.""" assert await async_setup_component(hass, mqtt.DOMAIN, config) await hass.async_block_till_done() mqtt_mock = await mqtt_mock_entry_with_yaml_config() state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE mqtt_mock.connected = False async_dispatcher_send(hass, MQTT_DISCONNECTED) await hass.async_block_till_done() state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE async def help_test_availability_without_topic( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test availability without defined availability topic.""" assert "availability_topic" not in config[mqtt.DOMAIN][domain] assert await async_setup_component(hass, mqtt.DOMAIN, config) await hass.async_block_till_done() await mqtt_mock_entry_with_yaml_config() state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE async def help_test_default_availability_payload( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, no_assumed_state: bool = False, state_topic: str | None = None, state_message: str | None = None, ) -> None: """Test availability by default payload with defined topic. This is a test helper for the MqttAvailability mixin. """ # Add availability settings to config config = copy.deepcopy(config) config[mqtt.DOMAIN][domain]["availability_topic"] = "availability-topic" assert await async_setup_component( hass, mqtt.DOMAIN, config, ) await hass.async_block_till_done() await mqtt_mock_entry_with_yaml_config() state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE async_fire_mqtt_message(hass, "availability-topic", "online") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE if no_assumed_state: assert not state.attributes.get(ATTR_ASSUMED_STATE) async_fire_mqtt_message(hass, "availability-topic", "offline") state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE if state_topic is not None and state_message is not None: async_fire_mqtt_message(hass, state_topic, state_message) state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE async_fire_mqtt_message(hass, "availability-topic", "online") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE async def help_test_default_availability_list_payload( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, no_assumed_state: bool = False, state_topic: str | None = None, state_message: str | None = None, ) -> None: """Test availability by default payload with defined topic. This is a test helper for the MqttAvailability mixin. """ # Add availability settings to config config = copy.deepcopy(config) config[mqtt.DOMAIN][domain]["availability"] = [ {"topic": "availability-topic1"}, {"topic": "availability-topic2"}, ] assert await async_setup_component( hass, mqtt.DOMAIN, config, ) await hass.async_block_till_done() await mqtt_mock_entry_with_yaml_config() state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE async_fire_mqtt_message(hass, "availability-topic1", "online") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE if no_assumed_state: assert not state.attributes.get(ATTR_ASSUMED_STATE) async_fire_mqtt_message(hass, "availability-topic1", "offline") state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE async_fire_mqtt_message(hass, "availability-topic2", "online") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE if no_assumed_state: assert not state.attributes.get(ATTR_ASSUMED_STATE) async_fire_mqtt_message(hass, "availability-topic2", "offline") state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE if state_topic is not None and state_message is not None: async_fire_mqtt_message(hass, state_topic, state_message) state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE async_fire_mqtt_message(hass, "availability-topic1", "online") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE async def help_test_default_availability_list_payload_all( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, no_assumed_state: bool = False, state_topic: str | None = None, state_message: str | None = None, ) -> None: """Test availability by default payload with defined topic. This is a test helper for the MqttAvailability mixin. """ # Add availability settings to config config = copy.deepcopy(config) config[mqtt.DOMAIN][domain]["availability_mode"] = "all" config[mqtt.DOMAIN][domain]["availability"] = [ {"topic": "availability-topic1"}, {"topic": "availability-topic2"}, ] assert await async_setup_component( hass, mqtt.DOMAIN, config, ) await hass.async_block_till_done() await mqtt_mock_entry_with_yaml_config() state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE async_fire_mqtt_message(hass, "availability-topic1", "online") state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE if no_assumed_state: assert not state.attributes.get(ATTR_ASSUMED_STATE) async_fire_mqtt_message(hass, "availability-topic2", "online") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE async_fire_mqtt_message(hass, "availability-topic2", "offline") state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE if no_assumed_state: assert not state.attributes.get(ATTR_ASSUMED_STATE) async_fire_mqtt_message(hass, "availability-topic2", "online") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE async_fire_mqtt_message(hass, "availability-topic1", "offline") state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE if no_assumed_state: assert not state.attributes.get(ATTR_ASSUMED_STATE) async_fire_mqtt_message(hass, "availability-topic1", "online") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE async def help_test_default_availability_list_payload_any( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, no_assumed_state: bool = False, state_topic: str | None = None, state_message: str | None = None, ) -> None: """Test availability by default payload with defined topic. This is a test helper for the MqttAvailability mixin. """ # Add availability settings to config config = copy.deepcopy(config) config[mqtt.DOMAIN][domain]["availability_mode"] = "any" config[mqtt.DOMAIN][domain]["availability"] = [ {"topic": "availability-topic1"}, {"topic": "availability-topic2"}, ] assert await async_setup_component( hass, mqtt.DOMAIN, config, ) await hass.async_block_till_done() await mqtt_mock_entry_with_yaml_config() state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE async_fire_mqtt_message(hass, "availability-topic1", "online") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE if no_assumed_state: assert not state.attributes.get(ATTR_ASSUMED_STATE) async_fire_mqtt_message(hass, "availability-topic2", "online") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE async_fire_mqtt_message(hass, "availability-topic2", "offline") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE if no_assumed_state: assert not state.attributes.get(ATTR_ASSUMED_STATE) async_fire_mqtt_message(hass, "availability-topic1", "offline") state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE async_fire_mqtt_message(hass, "availability-topic1", "online") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE if no_assumed_state: assert not state.attributes.get(ATTR_ASSUMED_STATE) async def help_test_default_availability_list_single( hass: HomeAssistant, caplog: pytest.LogCaptureFixture, domain: str, config: ConfigType, ) -> None: """Test availability list and availability_topic are mutually exclusive. This is a test helper for the MqttAvailability mixin. """ # Add availability settings to config config = copy.deepcopy(config) config[mqtt.DOMAIN][domain]["availability"] = [ {"topic": "availability-topic1"}, ] config[mqtt.DOMAIN][domain]["availability_topic"] = "availability-topic" assert not await async_setup_component( hass, mqtt.DOMAIN, config, ) assert ( "Invalid config for [mqtt]: two or more values in the same group of exclusion 'availability'" in caplog.text ) async def help_test_custom_availability_payload( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, no_assumed_state: bool = False, state_topic: str | None = None, state_message: str | None = None, ) -> None: """Test availability by custom payload with defined topic. This is a test helper for the MqttAvailability mixin. """ # Add availability settings to config config = copy.deepcopy(config) config[mqtt.DOMAIN][domain]["availability_topic"] = "availability-topic" config[mqtt.DOMAIN][domain]["payload_available"] = "good" config[mqtt.DOMAIN][domain]["payload_not_available"] = "nogood" assert await async_setup_component( hass, mqtt.DOMAIN, config, ) await hass.async_block_till_done() await mqtt_mock_entry_with_yaml_config() state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE async_fire_mqtt_message(hass, "availability-topic", "good") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE if no_assumed_state: assert not state.attributes.get(ATTR_ASSUMED_STATE) async_fire_mqtt_message(hass, "availability-topic", "nogood") state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE if state_topic is not None and state_message is not None: async_fire_mqtt_message(hass, state_topic, state_message) state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE async_fire_mqtt_message(hass, "availability-topic", "good") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE async def help_test_discovery_update_availability( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test update of discovered MQTTAvailability. This is a test helper for the MQTTAvailability mixin. """ await mqtt_mock_entry_no_yaml_config() # Add availability settings to config config1 = copy.deepcopy(config) config1[mqtt.DOMAIN][domain]["availability_topic"] = "availability-topic1" config2 = copy.deepcopy(config) config2[mqtt.DOMAIN][domain]["availability"] = [ {"topic": "availability-topic2"}, {"topic": "availability-topic3"}, ] config3 = copy.deepcopy(config) config3[mqtt.DOMAIN][domain]["availability_topic"] = "availability-topic4" data1 = json.dumps(config1[mqtt.DOMAIN][domain]) data2 = json.dumps(config2[mqtt.DOMAIN][domain]) data3 = json.dumps(config3[mqtt.DOMAIN][domain]) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data1) await hass.async_block_till_done() state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE async_fire_mqtt_message(hass, "availability-topic1", "online") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE async_fire_mqtt_message(hass, "availability-topic1", "offline") state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE # Change availability_topic async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data2) await hass.async_block_till_done() # Verify we are no longer subscribing to the old topic async_fire_mqtt_message(hass, "availability-topic1", "online") state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE # Verify we are subscribing to the new topic async_fire_mqtt_message(hass, "availability-topic2", "online") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE # Verify we are subscribing to the new topic async_fire_mqtt_message(hass, "availability-topic3", "offline") state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE # Change availability_topic async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data3) await hass.async_block_till_done() # Verify we are no longer subscribing to the old topic async_fire_mqtt_message(hass, "availability-topic2", "online") state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE # Verify we are no longer subscribing to the old topic async_fire_mqtt_message(hass, "availability-topic3", "online") state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE # Verify we are subscribing to the new topic async_fire_mqtt_message(hass, "availability-topic4", "online") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE async def help_test_setting_attribute_via_mqtt_json_message( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test the setting of attribute via MQTT with JSON payload. This is a test helper for the MqttAttributes mixin. """ # Add JSON attributes settings to config config = copy.deepcopy(config) config[mqtt.DOMAIN][domain]["json_attributes_topic"] = "attr-topic" assert await async_setup_component( hass, mqtt.DOMAIN, config, ) await hass.async_block_till_done() await mqtt_mock_entry_with_yaml_config() async_fire_mqtt_message(hass, "attr-topic", '{ "val": "100" }') state = hass.states.get(f"{domain}.test") assert state and state.attributes.get("val") == "100" async def help_test_setting_blocked_attribute_via_mqtt_json_message( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, extra_blocked_attributes: frozenset[str] | None, ) -> None: """Test the setting of blocked attribute via MQTT with JSON payload. This is a test helper for the MqttAttributes mixin. """ await mqtt_mock_entry_no_yaml_config() extra_blocked_attribute_list = list(extra_blocked_attributes or []) # Add JSON attributes settings to config config = copy.deepcopy(config) config[mqtt.DOMAIN][domain]["json_attributes_topic"] = "attr-topic" data = json.dumps(config[mqtt.DOMAIN][domain]) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data) await hass.async_block_till_done() val = "abc123" for attr in MQTT_ATTRIBUTES_BLOCKED: async_fire_mqtt_message(hass, "attr-topic", json.dumps({attr: val})) state = hass.states.get(f"{domain}.test") assert state and state.attributes.get(attr) != val for attr in extra_blocked_attribute_list: async_fire_mqtt_message(hass, "attr-topic", json.dumps({attr: val})) state = hass.states.get(f"{domain}.test") assert state and state.attributes.get(attr) != val async def help_test_setting_attribute_with_template( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test the setting of attribute via MQTT with JSON payload. This is a test helper for the MqttAttributes mixin. """ # Add JSON attributes settings to config config = copy.deepcopy(config) config[mqtt.DOMAIN][domain]["json_attributes_topic"] = "attr-topic" config[mqtt.DOMAIN][domain][ "json_attributes_template" ] = "{{ value_json['Timer1'] | tojson }}" assert await async_setup_component( hass, mqtt.DOMAIN, config, ) await hass.async_block_till_done() await mqtt_mock_entry_with_yaml_config() async_fire_mqtt_message( hass, "attr-topic", json.dumps({"Timer1": {"Arm": 0, "Time": "22:18"}}) ) state = hass.states.get(f"{domain}.test") assert state is not None assert state.attributes.get("Arm") == 0 assert state.attributes.get("Time") == "22:18" async def help_test_update_with_json_attrs_not_dict( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, caplog: pytest.LogCaptureFixture, domain: str, config: ConfigType, ) -> None: """Test attributes get extracted from a JSON result. This is a test helper for the MqttAttributes mixin. """ # Add JSON attributes settings to config config = copy.deepcopy(config) config[mqtt.DOMAIN][domain]["json_attributes_topic"] = "attr-topic" assert await async_setup_component( hass, mqtt.DOMAIN, config, ) await hass.async_block_till_done() await mqtt_mock_entry_with_yaml_config() async_fire_mqtt_message(hass, "attr-topic", '[ "list", "of", "things"]') state = hass.states.get(f"{domain}.test") assert state and state.attributes.get("val") is None assert "JSON result was not a dictionary" in caplog.text async def help_test_update_with_json_attrs_bad_json( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, caplog: pytest.LogCaptureFixture, domain: str, config: ConfigType, ) -> None: """Test JSON validation of attributes. This is a test helper for the MqttAttributes mixin. """ # Add JSON attributes settings to config config = copy.deepcopy(config) config[mqtt.DOMAIN][domain]["json_attributes_topic"] = "attr-topic" assert await async_setup_component( hass, mqtt.DOMAIN, config, ) await hass.async_block_till_done() await mqtt_mock_entry_with_yaml_config() async_fire_mqtt_message(hass, "attr-topic", "This is not JSON") state = hass.states.get(f"{domain}.test") assert state and state.attributes.get("val") is None assert "Erroneous JSON: This is not JSON" in caplog.text async def help_test_discovery_update_attr( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, caplog: pytest.LogCaptureFixture, domain: str, config: ConfigType, ) -> None: """Test update of discovered MQTTAttributes. This is a test helper for the MqttAttributes mixin. """ await mqtt_mock_entry_no_yaml_config() # Add JSON attributes settings to config config1 = copy.deepcopy(config) config1[mqtt.DOMAIN][domain]["json_attributes_topic"] = "attr-topic1" config2 = copy.deepcopy(config) config2[mqtt.DOMAIN][domain]["json_attributes_topic"] = "attr-topic2" data1 = json.dumps(config1[mqtt.DOMAIN][domain]) data2 = json.dumps(config2[mqtt.DOMAIN][domain]) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data1) await hass.async_block_till_done() async_fire_mqtt_message(hass, "attr-topic1", '{ "val": "100" }') state = hass.states.get(f"{domain}.test") assert state and state.attributes.get("val") == "100" # Change json_attributes_topic async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data2) await hass.async_block_till_done() # Verify we are no longer subscribing to the old topic async_fire_mqtt_message(hass, "attr-topic1", '{ "val": "50" }') state = hass.states.get(f"{domain}.test") assert state and state.attributes.get("val") == "100" # Verify we are subscribing to the new topic async_fire_mqtt_message(hass, "attr-topic2", '{ "val": "75" }') state = hass.states.get(f"{domain}.test") assert state and state.attributes.get("val") == "75" async def help_test_unique_id( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test unique id option only creates one entity per unique_id.""" assert await async_setup_component(hass, mqtt.DOMAIN, config) await hass.async_block_till_done() await mqtt_mock_entry_with_yaml_config() assert len(hass.states.async_entity_ids(domain)) == 1 async def help_test_discovery_removal( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, caplog: pytest.LogCaptureFixture, domain: str, data: str, ) -> None: """Test removal of discovered component. This is a test helper for the MqttDiscoveryUpdate mixin. """ await mqtt_mock_entry_no_yaml_config() async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data) await hass.async_block_till_done() state = hass.states.get(f"{domain}.test") assert state is not None assert state.name == "test" async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", "") await hass.async_block_till_done() state = hass.states.get(f"{domain}.test") assert state is None async def help_test_discovery_update( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, caplog, domain, discovery_config1: DiscoveryInfoType, discovery_config2: DiscoveryInfoType, state_data1: _StateDataType | None = None, state_data2: _StateDataType | None = None, ) -> None: """Test update of discovered component. This is a test helper for the MqttDiscoveryUpdate mixin. """ await mqtt_mock_entry_no_yaml_config() # Add some future configuration to the configurations config1 = copy.deepcopy(discovery_config1) config1["some_future_option_1"] = "future_option_1" config2 = copy.deepcopy(discovery_config2) config2["some_future_option_2"] = "future_option_2" discovery_data1 = json.dumps(config1) discovery_data2 = json.dumps(config2) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", discovery_data1) await hass.async_block_till_done() state = hass.states.get(f"{domain}.beer") assert state is not None assert state.name == "Beer" if state_data1: for mqtt_messages, expected_state, attributes in state_data1: for topic, data in mqtt_messages: async_fire_mqtt_message(hass, topic, data) state = hass.states.get(f"{domain}.beer") assert state is not None if expected_state: assert state.state == expected_state if attributes: for attr, value in attributes: assert state.attributes.get(attr) == value async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", discovery_data2) await hass.async_block_till_done() state = hass.states.get(f"{domain}.beer") assert state is not None assert state.name == "Milk" if state_data2: for mqtt_messages, expected_state, attributes in state_data2: for topic, data in mqtt_messages: async_fire_mqtt_message(hass, topic, data) state = hass.states.get(f"{domain}.beer") assert state is not None if expected_state: assert state.state == expected_state if attributes: for attr, value in attributes: assert state.attributes.get(attr) == value state = hass.states.get(f"{domain}.milk") assert state is None async def help_test_discovery_update_unchanged( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, caplog: pytest.LogCaptureFixture, domain: str, data1: str, discovery_update: MagicMock, ) -> None: """Test update of discovered component without changes. This is a test helper for the MqttDiscoveryUpdate mixin. """ await mqtt_mock_entry_no_yaml_config() async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data1) await hass.async_block_till_done() state = hass.states.get(f"{domain}.beer") assert state is not None assert state.name == "Beer" async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data1) await hass.async_block_till_done() assert not discovery_update.called async def help_test_discovery_broken( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, caplog: pytest.LogCaptureFixture, domain: str, data1: str, data2: str, ) -> None: """Test handling of bad discovery message.""" await mqtt_mock_entry_no_yaml_config() async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data1) await hass.async_block_till_done() state = hass.states.get(f"{domain}.beer") assert state is None async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data2) await hass.async_block_till_done() state = hass.states.get(f"{domain}.milk") assert state is not None assert state.name == "Milk" state = hass.states.get(f"{domain}.beer") assert state is None async def help_test_encoding_subscribable_topics( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, caplog: pytest.LogCaptureFixture, domain: str, config: ConfigType, topic: str, value: Any, attribute: str | None = None, attribute_value: Any = None, init_payload: str | None = None, skip_raw_test: bool = False, ) -> None: """Test handling of incoming encoded payload.""" async def _test_encoding( hass: HomeAssistant, entity_id, topic, encoded_value, attribute, init_payload_topic, init_payload_value, ) -> Any: state = hass.states.get(entity_id) if init_payload_value: # Sometimes a device needs to have an initialization pay load, e.g. to switch the device on. async_fire_mqtt_message(hass, init_payload_topic, init_payload_value) await hass.async_block_till_done() state = hass.states.get(entity_id) async_fire_mqtt_message(hass, topic, encoded_value) await hass.async_block_till_done() state = hass.states.get(entity_id) assert state is not None if attribute: return state.attributes.get(attribute) return state.state if state else None init_payload_value_utf8 = None init_payload_value_utf16 = None # setup test1 default encoding config1 = copy.deepcopy(config) if domain == "device_tracker": config1["unique_id"] = "test1" else: config1["name"] = "test1" config1[topic] = "topic/test1" # setup test2 alternate encoding config2 = copy.deepcopy(config) if domain == "device_tracker": config2["unique_id"] = "test2" else: config2["name"] = "test2" config2["encoding"] = "utf-16" config2[topic] = "topic/test2" # setup test3 raw encoding config3 = copy.deepcopy(config) if domain == "device_tracker": config3["unique_id"] = "test3" else: config3["name"] = "test3" config3["encoding"] = "" config3[topic] = "topic/test3" if init_payload: config1[init_payload[0]] = "topic/init_payload1" config2[init_payload[0]] = "topic/init_payload2" config3[init_payload[0]] = "topic/init_payload3" init_payload_value_utf8 = init_payload[1].encode("utf-8") init_payload_value_utf16 = init_payload[1].encode("utf-16") await hass.async_block_till_done() assert await async_setup_component( hass, mqtt.DOMAIN, {mqtt.DOMAIN: {domain: [config1, config2, config3]}} ) await hass.async_block_till_done() await mqtt_mock_entry_with_yaml_config() expected_result = attribute_value or value # test1 default encoding assert ( await _test_encoding( hass, f"{domain}.test1", "topic/test1", value.encode("utf-8"), attribute, "topic/init_payload1", init_payload_value_utf8, ) == expected_result ) # test2 alternate encoding assert ( await _test_encoding( hass, f"{domain}.test2", "topic/test2", value.encode("utf-16"), attribute, "topic/init_payload2", init_payload_value_utf16, ) == expected_result ) # test3 raw encoded input if skip_raw_test: return with suppress(AttributeError, TypeError, ValueError): result = await _test_encoding( hass, f"{domain}.test3", "topic/test3", value.encode("utf-16"), attribute, "topic/init_payload3", init_payload_value_utf16, ) assert result != expected_result async def help_test_entity_device_info_with_identifier( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test device registry integration. This is a test helper for the MqttDiscoveryUpdate mixin. """ await mqtt_mock_entry_no_yaml_config() # Add device settings to config config = copy.deepcopy(config[mqtt.DOMAIN][domain]) config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID) config["unique_id"] = "veryunique" registry = dr.async_get(hass) data = json.dumps(config) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data) await hass.async_block_till_done() device = registry.async_get_device({("mqtt", "helloworld")}) assert device is not None assert device.identifiers == {("mqtt", "helloworld")} assert device.manufacturer == "Whatever" assert device.name == "Beer" assert device.model == "Glass" assert device.hw_version == "rev1" assert device.sw_version == "0.1-beta" assert device.suggested_area == "default_area" assert device.configuration_url == "http://example.com" async def help_test_entity_device_info_with_connection( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test device registry integration. This is a test helper for the MqttDiscoveryUpdate mixin. """ await mqtt_mock_entry_no_yaml_config() # Add device settings to config config = copy.deepcopy(config[mqtt.DOMAIN][domain]) config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_MAC) config["unique_id"] = "veryunique" registry = dr.async_get(hass) data = json.dumps(config) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data) await hass.async_block_till_done() device = registry.async_get_device( set(), {(dr.CONNECTION_NETWORK_MAC, "02:5b:26:a8:dc:12")} ) assert device is not None assert device.connections == {(dr.CONNECTION_NETWORK_MAC, "02:5b:26:a8:dc:12")} assert device.manufacturer == "Whatever" assert device.name == "Beer" assert device.model == "Glass" assert device.hw_version == "rev1" assert device.sw_version == "0.1-beta" assert device.suggested_area == "default_area" assert device.configuration_url == "http://example.com" async def help_test_entity_device_info_remove( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test device registry remove.""" await mqtt_mock_entry_no_yaml_config() # Add device settings to config config = copy.deepcopy(config[mqtt.DOMAIN][domain]) config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID) config["unique_id"] = "veryunique" dev_registry = dr.async_get(hass) ent_registry = er.async_get(hass) data = json.dumps(config) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data) await hass.async_block_till_done() device = dev_registry.async_get_device({("mqtt", "helloworld")}) assert device is not None assert ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique") async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", "") await hass.async_block_till_done() device = dev_registry.async_get_device({("mqtt", "helloworld")}) assert device is None assert not ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique") async def help_test_entity_device_info_update( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test device registry update. This is a test helper for the MqttDiscoveryUpdate mixin. """ await mqtt_mock_entry_no_yaml_config() # Add device settings to config config = copy.deepcopy(config[mqtt.DOMAIN][domain]) config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID) config["unique_id"] = "veryunique" registry = dr.async_get(hass) data = json.dumps(config) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data) await hass.async_block_till_done() device = registry.async_get_device({("mqtt", "helloworld")}) assert device is not None assert device.name == "Beer" config["device"]["name"] = "Milk" data = json.dumps(config) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data) await hass.async_block_till_done() device = registry.async_get_device({("mqtt", "helloworld")}) assert device is not None assert device.name == "Milk" async def help_test_entity_id_update_subscriptions( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, topics: list[str] | None = None, ) -> None: """Test MQTT subscriptions are managed when entity_id is updated.""" # Add unique_id to config config = copy.deepcopy(config) config[mqtt.DOMAIN][domain]["unique_id"] = "TOTALLY_UNIQUE" if topics is None: # Add default topics to config config[mqtt.DOMAIN][domain]["availability_topic"] = "avty-topic" config[mqtt.DOMAIN][domain]["state_topic"] = "test-topic" topics = ["avty-topic", "test-topic"] assert len(topics) > 0 entity_registry = er.async_get(hass) assert await async_setup_component( hass, mqtt.DOMAIN, config, ) await hass.async_block_till_done() mqtt_mock = await mqtt_mock_entry_with_yaml_config() state = hass.states.get(f"{domain}.test") assert state is not None assert mqtt_mock.async_subscribe.call_count == len(topics) + 2 + DISCOVERY_COUNT for topic in topics: mqtt_mock.async_subscribe.assert_any_call(topic, ANY, ANY, ANY) mqtt_mock.async_subscribe.reset_mock() entity_registry.async_update_entity( f"{domain}.test", new_entity_id=f"{domain}.milk" ) await hass.async_block_till_done() state = hass.states.get(f"{domain}.test") assert state is None state = hass.states.get(f"{domain}.milk") assert state is not None for topic in topics: mqtt_mock.async_subscribe.assert_any_call(topic, ANY, ANY, ANY) async def help_test_entity_id_update_discovery_update( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, topic: str | None = None, ) -> None: """Test MQTT discovery update after entity_id is updated.""" # Add unique_id to config await mqtt_mock_entry_no_yaml_config() config = copy.deepcopy(config) config[mqtt.DOMAIN][domain]["unique_id"] = "TOTALLY_UNIQUE" if topic is None: # Add default topic to config config[mqtt.DOMAIN][domain]["availability_topic"] = "avty-topic" topic = "avty-topic" entity_registry = er.async_get(hass) data = json.dumps(config[mqtt.DOMAIN][domain]) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data) await hass.async_block_till_done() async_fire_mqtt_message(hass, topic, "online") state = hass.states.get(f"{domain}.test") assert state and state.state != STATE_UNAVAILABLE async_fire_mqtt_message(hass, topic, "offline") state = hass.states.get(f"{domain}.test") assert state and state.state == STATE_UNAVAILABLE entity_registry.async_update_entity( f"{domain}.test", new_entity_id=f"{domain}.milk" ) await hass.async_block_till_done() config[mqtt.DOMAIN][domain]["availability_topic"] = f"{topic}_2" data = json.dumps(config[mqtt.DOMAIN][domain]) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data) await hass.async_block_till_done() assert len(hass.states.async_entity_ids(domain)) == 1 async_fire_mqtt_message(hass, f"{topic}_2", "online") state = hass.states.get(f"{domain}.milk") assert state and state.state != STATE_UNAVAILABLE async def help_test_entity_debug_info( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test debug_info. This is a test helper for MQTT debug_info. """ await mqtt_mock_entry_no_yaml_config() # Add device settings to config config = copy.deepcopy(config[mqtt.DOMAIN][domain]) config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID) config["unique_id"] = "veryunique" config["platform"] = "mqtt" registry = dr.async_get(hass) data = json.dumps(config) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data) await hass.async_block_till_done() device = registry.async_get_device({("mqtt", "helloworld")}) assert device is not None debug_info_data = debug_info.info_for_device(hass, device.id) assert len(debug_info_data["entities"]) == 1 assert ( debug_info_data["entities"][0]["discovery_data"]["topic"] == f"homeassistant/{domain}/bla/config" ) assert debug_info_data["entities"][0]["discovery_data"]["payload"] == config assert len(debug_info_data["entities"][0]["subscriptions"]) == 1 assert {"topic": "test-topic", "messages": []} in debug_info_data["entities"][0][ "subscriptions" ] assert debug_info_data["entities"][0]["transmitted"] == [] assert len(debug_info_data["triggers"]) == 0 async def help_test_entity_debug_info_max_messages( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test debug_info message overflow. This is a test helper for MQTT debug_info. """ await mqtt_mock_entry_no_yaml_config() # Add device settings to config config = copy.deepcopy(config[mqtt.DOMAIN][domain]) config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID) config["unique_id"] = "veryunique" registry = dr.async_get(hass) data = json.dumps(config) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data) await hass.async_block_till_done() device = registry.async_get_device({("mqtt", "helloworld")}) assert device is not None debug_info_data = debug_info.info_for_device(hass, device.id) assert len(debug_info_data["entities"][0]["subscriptions"]) == 1 assert {"topic": "test-topic", "messages": []} in debug_info_data["entities"][0][ "subscriptions" ] start_dt = datetime(2019, 1, 1, 0, 0, 0) with patch("homeassistant.util.dt.utcnow") as dt_utcnow: dt_utcnow.return_value = start_dt for i in range(0, debug_info.STORED_MESSAGES + 1): async_fire_mqtt_message(hass, "test-topic", f"{i}") debug_info_data = debug_info.info_for_device(hass, device.id) assert len(debug_info_data["entities"][0]["subscriptions"]) == 1 assert ( len(debug_info_data["entities"][0]["subscriptions"][0]["messages"]) == debug_info.STORED_MESSAGES ) messages = [ { "payload": f"{i}", "qos": 0, "retain": False, "time": start_dt, "topic": "test-topic", } for i in range(1, debug_info.STORED_MESSAGES + 1) ] assert {"topic": "test-topic", "messages": messages} in debug_info_data["entities"][ 0 ]["subscriptions"] async def help_test_entity_debug_info_message( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, service: str, command_topic: str | None = None, command_payload: str | None = None, state_topic: str | object | None = _SENTINEL, state_payload: str | None = None, service_parameters: dict[str, Any] | None = None, ) -> None: """Test debug_info. This is a test helper for MQTT debug_info. """ # Add device settings to config await mqtt_mock_entry_no_yaml_config() config = copy.deepcopy(config[mqtt.DOMAIN][domain]) config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID) config["unique_id"] = "veryunique" if command_topic is None: # Add default topic to config config["command_topic"] = "command-topic" command_topic = "command-topic" if command_payload is None: command_payload = "ON" if state_topic is _SENTINEL: # Add default topic to config config["state_topic"] = "state-topic" state_topic = "state-topic" if state_payload is None: state_payload = "ON" registry = dr.async_get(hass) data = json.dumps(config) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data) await hass.async_block_till_done() device = registry.async_get_device({("mqtt", "helloworld")}) assert device is not None debug_info_data = debug_info.info_for_device(hass, device.id) start_dt = datetime(2019, 1, 1, 0, 0, 0) if state_topic is not None: assert len(debug_info_data["entities"][0]["subscriptions"]) >= 1 assert {"topic": state_topic, "messages": []} in debug_info_data["entities"][0][ "subscriptions" ] with patch("homeassistant.util.dt.utcnow") as dt_utcnow: dt_utcnow.return_value = start_dt async_fire_mqtt_message(hass, str(state_topic), state_payload) debug_info_data = debug_info.info_for_device(hass, device.id) assert len(debug_info_data["entities"][0]["subscriptions"]) >= 1 assert { "topic": state_topic, "messages": [ { "payload": str(state_payload), "qos": 0, "retain": False, "time": start_dt, "topic": state_topic, } ], } in debug_info_data["entities"][0]["subscriptions"] expected_transmissions = [] if service: # Trigger an outgoing MQTT message with patch("homeassistant.util.dt.utcnow") as dt_utcnow: dt_utcnow.return_value = start_dt if service: service_data = {ATTR_ENTITY_ID: f"{domain}.test"} if service_parameters: service_data.update(service_parameters) await hass.services.async_call( domain, service, service_data, blocking=True, ) expected_transmissions = [ { "topic": command_topic, "messages": [ { "payload": str(command_payload), "qos": 0, "retain": False, "time": start_dt, "topic": command_topic, } ], } ] debug_info_data = debug_info.info_for_device(hass, device.id) assert debug_info_data["entities"][0]["transmitted"] == expected_transmissions async def help_test_entity_debug_info_remove( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test debug_info. This is a test helper for MQTT debug_info. """ await mqtt_mock_entry_no_yaml_config() # Add device settings to config config = copy.deepcopy(config[mqtt.DOMAIN][domain]) config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID) config["unique_id"] = "veryunique" config["platform"] = "mqtt" registry = dr.async_get(hass) data = json.dumps(config) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data) await hass.async_block_till_done() device = registry.async_get_device({("mqtt", "helloworld")}) assert device is not None debug_info_data = debug_info.info_for_device(hass, device.id) assert len(debug_info_data["entities"]) == 1 assert ( debug_info_data["entities"][0]["discovery_data"]["topic"] == f"homeassistant/{domain}/bla/config" ) assert debug_info_data["entities"][0]["discovery_data"]["payload"] == config assert len(debug_info_data["entities"][0]["subscriptions"]) == 1 assert {"topic": "test-topic", "messages": []} in debug_info_data["entities"][0][ "subscriptions" ] assert len(debug_info_data["triggers"]) == 0 assert debug_info_data["entities"][0]["entity_id"] == f"{domain}.test" entity_id = debug_info_data["entities"][0]["entity_id"] async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", "") await hass.async_block_till_done() debug_info_data = debug_info.info_for_device(hass, device.id) assert len(debug_info_data["entities"]) == 0 assert len(debug_info_data["triggers"]) == 0 assert entity_id not in hass.data["mqtt"].debug_info_entities async def help_test_entity_debug_info_update_entity_id( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test debug_info. This is a test helper for MQTT debug_info. """ await mqtt_mock_entry_no_yaml_config() # Add device settings to config config = copy.deepcopy(config[mqtt.DOMAIN][domain]) config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID) config["unique_id"] = "veryunique" config["platform"] = "mqtt" device_registry = dr.async_get(hass) entity_registry = er.async_get(hass) data = json.dumps(config) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data) await hass.async_block_till_done() device = device_registry.async_get_device({("mqtt", "helloworld")}) assert device is not None debug_info_data = debug_info.info_for_device(hass, device.id) assert len(debug_info_data["entities"]) == 1 assert ( debug_info_data["entities"][0]["discovery_data"]["topic"] == f"homeassistant/{domain}/bla/config" ) assert debug_info_data["entities"][0]["discovery_data"]["payload"] == config assert debug_info_data["entities"][0]["entity_id"] == f"{domain}.test" assert len(debug_info_data["entities"][0]["subscriptions"]) == 1 assert {"topic": "test-topic", "messages": []} in debug_info_data["entities"][0][ "subscriptions" ] assert len(debug_info_data["triggers"]) == 0 entity_registry.async_update_entity( f"{domain}.test", new_entity_id=f"{domain}.milk" ) await hass.async_block_till_done() await hass.async_block_till_done() debug_info_data = debug_info.info_for_device(hass, device.id) assert len(debug_info_data["entities"]) == 1 assert ( debug_info_data["entities"][0]["discovery_data"]["topic"] == f"homeassistant/{domain}/bla/config" ) assert debug_info_data["entities"][0]["discovery_data"]["payload"] == config assert debug_info_data["entities"][0]["entity_id"] == f"{domain}.milk" assert len(debug_info_data["entities"][0]["subscriptions"]) == 1 assert {"topic": "test-topic", "messages": []} in debug_info_data["entities"][0][ "subscriptions" ] assert len(debug_info_data["triggers"]) == 0 assert f"{domain}.test" not in hass.data["mqtt"].debug_info_entities async def help_test_entity_disabled_by_default( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test device registry remove.""" await mqtt_mock_entry_no_yaml_config() # Add device settings to config config = copy.deepcopy(config[mqtt.DOMAIN][domain]) config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID) config["enabled_by_default"] = False config["unique_id"] = "veryunique1" dev_registry = dr.async_get(hass) ent_registry = er.async_get(hass) # Discover a disabled entity data = json.dumps(config) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla1/config", data) await hass.async_block_till_done() entity_id = ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique1") assert entity_id is not None and hass.states.get(entity_id) is None assert dev_registry.async_get_device({("mqtt", "helloworld")}) # Discover an enabled entity, tied to the same device config["enabled_by_default"] = True config["unique_id"] = "veryunique2" data = json.dumps(config) async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla2/config", data) await hass.async_block_till_done() entity_id = ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique2") assert entity_id is not None and hass.states.get(entity_id) is not None # Remove the enabled entity, both entities and the device should be removed async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla2/config", "") await hass.async_block_till_done() assert not ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique1") assert not ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique2") assert not dev_registry.async_get_device({("mqtt", "helloworld")}) async def help_test_entity_category( hass: HomeAssistant, mqtt_mock_entry_no_yaml_config: MqttMockHAClientGenerator, domain: str, config: ConfigType, ) -> None: """Test device registry remove.""" await mqtt_mock_entry_no_yaml_config() # Add device settings to config config = copy.deepcopy(config[mqtt.DOMAIN][domain]) config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID) ent_registry = er.async_get(hass) # Discover an entity without entity category unique_id = "veryunique1" config["unique_id"] = unique_id data = json.dumps(config) async_fire_mqtt_message(hass, f"homeassistant/{domain}/{unique_id}/config", data) await hass.async_block_till_done() entity_id = ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, unique_id) assert entity_id is not None and hass.states.get(entity_id) entry = ent_registry.async_get(entity_id) assert entry is not None and entry.entity_category is None # Discover an entity with entity category set to "config" unique_id = "veryunique2" config["entity_category"] = "config" config["unique_id"] = unique_id data = json.dumps(config) async_fire_mqtt_message(hass, f"homeassistant/{domain}/{unique_id}/config", data) await hass.async_block_till_done() entity_id = ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, unique_id) assert entity_id is not None and hass.states.get(entity_id) entry = ent_registry.async_get(entity_id) assert entry is not None and entry.entity_category == "config" # Discover an entity with entity category set to "no_such_category" unique_id = "veryunique3" config["entity_category"] = "no_such_category" config["unique_id"] = unique_id data = json.dumps(config) async_fire_mqtt_message(hass, f"homeassistant/{domain}/{unique_id}/config", data) await hass.async_block_till_done() assert not ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, unique_id) async def help_test_publishing_with_custom_encoding( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, caplog: pytest.LogCaptureFixture, domain: str, config: ConfigType, service: str, topic: str, parameters: dict[str, Any] | None, payload: str, template: str | None, tpl_par: str = "value", tpl_output: str | None = None, ) -> None: """Test a service with publishing MQTT payload with different encoding.""" # prepare config for tests test_config: dict[str, dict[str, Any]] = { "test1": {"encoding": None, "cmd_tpl": False}, "test2": {"encoding": "utf-16", "cmd_tpl": False}, "test3": {"encoding": "", "cmd_tpl": False}, "test4": {"encoding": "invalid", "cmd_tpl": False}, "test5": {"encoding": "", "cmd_tpl": True}, } setup_config = [] service_data = {} for test_id, test_data in test_config.items(): test_config_setup: dict[str, Any] = copy.copy(config[mqtt.DOMAIN][domain]) test_config_setup.update( { topic: f"cmd/{test_id}", "name": f"{test_id}", } ) if test_data["encoding"] is not None: test_config_setup["encoding"] = test_data["encoding"] if template and test_data["cmd_tpl"]: test_config_setup[ template ] = f"{{{{ (('%.1f'|format({tpl_par}))[0] if is_number({tpl_par}) else {tpl_par}[0]) | ord | pack('b') }}}}" setup_config.append(test_config_setup) # setup service data service_data[test_id] = {ATTR_ENTITY_ID: f"{domain}.{test_id}"} if parameters: service_data[test_id].update(parameters) # setup test entities assert await async_setup_component( hass, mqtt.DOMAIN, {mqtt.DOMAIN: {domain: setup_config}}, ) await hass.async_block_till_done() mqtt_mock = await mqtt_mock_entry_with_yaml_config() # 1) test with default encoding await hass.services.async_call( domain, service, service_data["test1"], blocking=True, ) mqtt_mock.async_publish.assert_any_call("cmd/test1", str(payload), 0, False) mqtt_mock.async_publish.reset_mock() # 2) test with utf-16 encoding await hass.services.async_call( domain, service, service_data["test2"], blocking=True, ) mqtt_mock.async_publish.assert_any_call( "cmd/test2", str(payload).encode("utf-16"), 0, False ) mqtt_mock.async_publish.reset_mock() # 3) test with no encoding set should fail if payload is a string await hass.services.async_call( domain, service, service_data["test3"], blocking=True, ) assert ( f"Can't pass-through payload for publishing {payload} on cmd/test3 with no encoding set, need 'bytes'" in caplog.text ) # 4) test with invalid encoding set should fail await hass.services.async_call( domain, service, service_data["test4"], blocking=True, ) assert ( f"Can't encode payload for publishing {payload} on cmd/test4 with encoding invalid" in caplog.text ) # 5) test with command template and raw encoding if specified if not template: return await hass.services.async_call( domain, service, service_data["test5"], blocking=True, ) mqtt_mock.async_publish.assert_any_call( "cmd/test5", tpl_output or str(payload)[0].encode("utf-8"), 0, False ) mqtt_mock.async_publish.reset_mock() async def help_test_reload_with_config( hass: HomeAssistant, caplog: pytest.LogCaptureFixture, tmp_path: Path, config: ConfigType, ) -> None: """Test reloading with supplied config.""" new_yaml_config_file = tmp_path / "configuration.yaml" new_yaml_config = yaml.dump(config) new_yaml_config_file.write_text(new_yaml_config) assert new_yaml_config_file.read_text() == new_yaml_config with patch.object(hass_config, "YAML_CONFIG_FILE", new_yaml_config_file): await hass.services.async_call( "mqtt", SERVICE_RELOAD, {}, blocking=True, ) await hass.async_block_till_done() async def help_test_entry_reload_with_new_config( hass: HomeAssistant, tmp_path: Path, new_config: ConfigType ) -> None: """Test reloading with supplied config.""" mqtt_config_entry = hass.config_entries.async_entries(mqtt.DOMAIN)[0] assert mqtt_config_entry.state is ConfigEntryState.LOADED new_yaml_config_file = tmp_path / "configuration.yaml" new_yaml_config = yaml.dump(new_config) new_yaml_config_file.write_text(new_yaml_config) assert new_yaml_config_file.read_text() == new_yaml_config with patch.object(hass_config, "YAML_CONFIG_FILE", new_yaml_config_file), patch( "paho.mqtt.client.Client" ) as mock_client: mock_client().connect = lambda *args: 0 # reload the config entry assert await hass.config_entries.async_reload(mqtt_config_entry.entry_id) assert mqtt_config_entry.state is ConfigEntryState.LOADED await hass.async_block_till_done() async def help_test_reloadable( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, caplog: pytest.LogCaptureFixture, tmp_path: Path, domain: str, config: ConfigType, ) -> None: """Test reloading an MQTT platform.""" config = copy.deepcopy(config[mqtt.DOMAIN][domain]) # Create and test an old config of 2 entities based on the config supplied old_config_1 = copy.deepcopy(config) old_config_1["name"] = "test_old_1" old_config_2 = copy.deepcopy(config) old_config_2["name"] = "test_old_2" old_config = { mqtt.DOMAIN: {domain: [old_config_1, old_config_2]}, } assert await async_setup_component(hass, mqtt.DOMAIN, old_config) await hass.async_block_till_done() await mqtt_mock_entry_with_yaml_config() assert hass.states.get(f"{domain}.test_old_1") assert hass.states.get(f"{domain}.test_old_2") assert len(hass.states.async_all(domain)) == 2 # Create temporary fixture for configuration.yaml based on the supplied config and # test a reload with this new config new_config_1 = copy.deepcopy(config) new_config_1["name"] = "test_new_1" new_config_2 = copy.deepcopy(config) new_config_2["name"] = "test_new_2" new_config_extra = copy.deepcopy(config) new_config_extra["name"] = "test_new_3" new_config = { mqtt.DOMAIN: {domain: [new_config_1, new_config_2, new_config_extra]}, } await help_test_reload_with_config(hass, caplog, tmp_path, new_config) assert len(hass.states.async_all(domain)) == 3 assert hass.states.get(f"{domain}.test_new_1") assert hass.states.get(f"{domain}.test_new_2") assert hass.states.get(f"{domain}.test_new_3") async def help_test_setup_manual_entity_from_yaml( hass: HomeAssistant, config: ConfigType ) -> None: """Help to test setup from yaml through configuration entry.""" calls = MagicMock() async def mock_reload(hass: HomeAssistant) -> None: """Mock reload.""" calls() assert await async_setup_component(hass, mqtt.DOMAIN, config) # Mock config entry entry = MockConfigEntry(domain=mqtt.DOMAIN, data={mqtt.CONF_BROKER: "test-broker"}) entry.add_to_hass(hass) with patch( "homeassistant.components.mqtt.async_reload_manual_mqtt_items", side_effect=mock_reload, ), patch("paho.mqtt.client.Client") as mock_client: mock_client().connect = lambda *args: 0 assert await hass.config_entries.async_setup(entry.entry_id) await hass.async_block_till_done() calls.assert_called_once() async def help_test_unload_config_entry( hass: HomeAssistant, tmp_path: Path, newconfig: ConfigType ) -> None: """Test unloading the MQTT config entry.""" mqtt_config_entry = hass.config_entries.async_entries(mqtt.DOMAIN)[0] assert mqtt_config_entry.state is ConfigEntryState.LOADED new_yaml_config_file = tmp_path / "configuration.yaml" new_yaml_config = yaml.dump(newconfig) new_yaml_config_file.write_text(new_yaml_config) with patch.object(hass_config, "YAML_CONFIG_FILE", new_yaml_config_file): assert await hass.config_entries.async_unload(mqtt_config_entry.entry_id) # work-a-round mypy bug https://github.com/python/mypy/issues/9005#issuecomment-1280985006 updated_config_entry = mqtt_config_entry assert updated_config_entry.state is ConfigEntryState.NOT_LOADED await hass.async_block_till_done() async def help_test_unload_config_entry_with_platform( hass: HomeAssistant, mqtt_mock_entry_with_yaml_config: MqttMockHAClientGenerator, tmp_path: Path, domain: str, config: dict[str, dict[str, Any]], ) -> None: """Test unloading the MQTT config entry with a specific platform domain.""" # prepare setup through configuration.yaml config_setup: dict[str, dict[str, Any]] = copy.deepcopy(config) config_setup[mqtt.DOMAIN][domain]["name"] = "config_setup" config_name = config_setup assert await async_setup_component(hass, mqtt.DOMAIN, config_setup) await hass.async_block_till_done() await mqtt_mock_entry_with_yaml_config() # prepare setup through discovery discovery_setup = copy.deepcopy(config[mqtt.DOMAIN][domain]) discovery_setup["name"] = "discovery_setup" async_fire_mqtt_message( hass, f"homeassistant/{domain}/bla/config", json.dumps(discovery_setup) ) await hass.async_block_till_done() # check if both entities were setup correctly config_setup_entity = hass.states.get(f"{domain}.config_setup") assert config_setup_entity discovery_setup_entity = hass.states.get(f"{domain}.discovery_setup") assert discovery_setup_entity await help_test_unload_config_entry(hass, tmp_path, config_setup) async_fire_mqtt_message( hass, f"homeassistant/{domain}/bla/config", json.dumps(discovery_setup) ) await hass.async_block_till_done() # check if both entities were unloaded correctly config_setup_entity = hass.states.get(f"{domain}.{config_name}") assert config_setup_entity is None discovery_setup_entity = hass.states.get(f"{domain}.discovery_setup") assert discovery_setup_entity is None async def help_test_discovery_setup( hass: HomeAssistant, domain: str, discovery_data_payload: str, name: str ) -> None: """Test setting up an MQTT entity using discovery.""" async_fire_mqtt_message( hass, f"homeassistant/{domain}/{name}/config", discovery_data_payload ) await hass.async_block_till_done() state = hass.states.get(f"{domain}.{name}") assert state and state.state is not None