core/tests/components/mqtt/test_common.py

1924 lines
65 KiB
Python
Raw Normal View History

"""Common test objects."""
from collections.abc import Iterable
from contextlib import suppress
2020-03-17 07:09:19 +00:00
import copy
from datetime import datetime
import json
from pathlib import Path
from typing import Any
from unittest.mock import ANY, MagicMock, patch
import pytest
import voluptuous as vol
import yaml
from homeassistant import config as module_hass_config
from homeassistant.components import mqtt
from homeassistant.components.mqtt import debug_info
from homeassistant.components.mqtt.config_integration import PLATFORM_CONFIG_SCHEMA_BASE
from homeassistant.components.mqtt.const import MQTT_DISCONNECTED
from homeassistant.components.mqtt.mixins import MQTT_ATTRIBUTES_BLOCKED
from homeassistant.components.mqtt.models import PublishPayloadType
from homeassistant.config import async_log_exception
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import (
ATTR_ASSUMED_STATE,
ATTR_ENTITY_ID,
SERVICE_RELOAD,
STATE_UNAVAILABLE,
)
from homeassistant.core import HomeAssistant
2022-09-26 06:07:50 +00:00
from homeassistant.generated.mqtt import MQTT
from homeassistant.helpers import (
config_validation as cv,
device_registry as dr,
entity_registry as er,
)
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from tests.common import MockConfigEntry, async_fire_mqtt_message
from tests.typing import MqttMockHAClient, MqttMockHAClientGenerator, MqttMockPahoClient
2020-03-17 07:09:19 +00:00
# Device attributes shared by both default device-info fixtures below.
_DEFAULT_CONFIG_DEVICE_INFO_COMMON = {
    "manufacturer": "Whatever",
    "name": "Beer",
    "model": "Glass",
    "hw_version": "rev1",
    "sw_version": "0.1-beta",
    "suggested_area": "default_area",
    "configuration_url": "http://example.com",
}

# Default device info where the device is addressed by an identifier.
DEFAULT_CONFIG_DEVICE_INFO_ID = {
    "identifiers": ["helloworld"],
    **_DEFAULT_CONFIG_DEVICE_INFO_COMMON,
}

# Default device info where the device is addressed by a MAC connection.
DEFAULT_CONFIG_DEVICE_INFO_MAC = {
    "connections": [[dr.CONNECTION_NETWORK_MAC, "02:5b:26:a8:dc:12"]],
    **_DEFAULT_CONFIG_DEVICE_INFO_COMMON,
}
2022-02-04 16:35:32 +00:00
# Unique marker object; it only compares identical to itself.
# NOTE(review): no use of it is visible in this part of the file.
_SENTINEL = object()
2022-09-26 06:07:50 +00:00
# Number of entries in the generated MQTT list.
# NOTE(review): presumably one discovery subscription per entry - confirm.
DISCOVERY_COUNT = len(MQTT)

# A list of (topic, payload) pairs to publish.
_MqttMessageType = list[tuple[str, str]]
# A list of (attribute name, expected value) pairs.
_AttributesType = list[tuple[str, Any]]
# A list of (messages to publish, expected state or None, expected attributes or None).
_StateDataType = list[tuple[_MqttMessageType, str | None, _AttributesType | None]]

# Schema used to validate a YAML config that holds MQTT platform config
# under the mqtt domain key.
MQTT_YAML_SCHEMA = vol.Schema({mqtt.DOMAIN: PLATFORM_CONFIG_SCHEMA_BASE})
def help_test_validate_platform_config(
    hass: HomeAssistant, config: ConfigType
) -> bool:
    """Validate a config against the MQTT YAML schema.

    Returns True when `config` passes `MQTT_YAML_SCHEMA`. When validation
    raises a voluptuous error, the error is logged through
    `async_log_exception` (so a test can inspect it via `caplog`) and
    False is returned instead of propagating the exception.
    """
    try:
        # Only the schema call can raise; keep the try body minimal.
        MQTT_YAML_SCHEMA(config)
    except vol.Error as exc:
        # Log schema exceptions rather than raising them.
        async_log_exception(exc, mqtt.DOMAIN, config, hass)
        return False
    return True
2020-03-17 07:09:19 +00:00
async def help_setup_component(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator | None,
    domain: str,
    config: ConfigType,
    use_discovery: bool = False,
) -> MqttMockHAClient | None:
    """Set up MQTT entities for a test, via YAML or via discovery.

    Without `use_discovery`, a mock MQTT config entry is added and set up
    while `load_yaml_config_file` is patched to return `config`; None is
    returned.

    With `use_discovery`, the MQTT mock client is started and every item
    under `config[mqtt.DOMAIN][domain]` is published as a discovery
    message on `homeassistant/{domain}/item_{n}/config` (n starts at 1);
    the mock client is returned.
    """
    # `async_setup_component` will call `async_setup` and
    # after that it will also start the entry `async_start_entry`
    # when `async_setup` removed mqtt_mock_entry_with_no_config should be awaited.
    if not use_discovery:
        config_entry = MockConfigEntry(
            domain=mqtt.DOMAIN, data={mqtt.CONF_BROKER: "test-broker"}
        )
        config_entry.add_to_hass(hass)
        with patch("homeassistant.config.load_yaml_config_file", return_value=config):
            await config_entry.async_setup(hass)
        return None

    discovery_items = cv.ensure_list(config[mqtt.DOMAIN][domain])
    assert mqtt_mock_entry is not None
    mqtt_client = await mqtt_mock_entry()
    for index, discovery_item in enumerate(discovery_items, start=1):
        async_fire_mqtt_message(
            hass,
            f"homeassistant/{domain}/item_{index}/config",
            json.dumps(discovery_item),
        )
    await hass.async_block_till_done()
    return mqtt_client
def help_custom_config(
    mqtt_entity_domain: str,
    mqtt_base_config: ConfigType,
    mqtt_entity_configs: Iterable[ConfigType,],
) -> ConfigType:
    """Build a customized config for test parametrization.

    Returns a deep copy of mqtt_base_config in which the entry for
    mqtt_entity_domain is replaced by a list of entity configs. One entity
    config is created per item in mqtt_entity_configs: a deep copy of the
    base entity config updated with that item. The supplied
    mqtt_base_config is not modified.
    """

    def _entity_from(overrides: ConfigType) -> ConfigType:
        """Return a copy of the base entity config with overrides applied."""
        entity: ConfigType = copy.deepcopy(
            mqtt_base_config[mqtt.DOMAIN][mqtt_entity_domain]
        )
        entity.update(overrides)
        return entity

    custom_config: ConfigType = copy.deepcopy(mqtt_base_config)
    custom_config[mqtt.DOMAIN][mqtt_entity_domain] = [
        _entity_from(overrides) for overrides in mqtt_entity_configs
    ]
    return custom_config
async def help_test_availability_when_connection_lost(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
) -> None:
    """Check the entity turns unavailable when the MQTT connection drops.

    The entity `{domain}.test` must be available after setup and must be
    unavailable after the mock client is flagged as disconnected and the
    MQTT_DISCONNECTED signal has been dispatched.
    """
    entity_id = f"{domain}.test"

    mqtt_client = await mqtt_mock_entry()
    await hass.async_block_till_done()

    before_disconnect = hass.states.get(entity_id)
    assert before_disconnect and before_disconnect.state != STATE_UNAVAILABLE

    # Simulate losing the broker connection.
    mqtt_client.connected = False
    async_dispatcher_send(hass, MQTT_DISCONNECTED)
    await hass.async_block_till_done()

    after_disconnect = hass.states.get(entity_id)
    assert after_disconnect and after_disconnect.state == STATE_UNAVAILABLE
async def help_test_availability_without_topic(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
) -> None:
    """Check the entity is available when no availability topic is set.

    First asserts that the supplied config really has no
    `availability_topic`, then verifies `{domain}.test` is not unavailable
    after setup.
    """
    entity_config = config[mqtt.DOMAIN][domain]
    assert "availability_topic" not in entity_config

    await mqtt_mock_entry()
    await hass.async_block_till_done()

    entity_state = hass.states.get(f"{domain}.test")
    assert entity_state and entity_state.state != STATE_UNAVAILABLE
2020-03-17 07:09:19 +00:00
async def help_test_default_availability_payload(
    hass: HomeAssistant,
    mqtt_mock_entry_with_no_config: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
    no_assumed_state: bool = False,
    state_topic: str | None = None,
    state_message: str | None = None,
) -> None:
    """Check availability handling with the default payloads.

    This is a test helper for the MqttAvailability mixin.

    An `availability_topic` is added to a copy of `config`. The entity
    must start unavailable, follow "online"/"offline" payloads, and - when
    both `state_topic` and `state_message` are given - stay unavailable
    when a state message arrives while offline.
    """
    entity_id = f"{domain}.test"
    availability_topic = "availability-topic"

    def _publish(topic: str, payload: str):
        """Fire an MQTT message and return the entity state afterwards."""
        async_fire_mqtt_message(hass, topic, payload)
        return hass.states.get(entity_id)

    # Add availability settings to a private copy of the config
    test_config = copy.deepcopy(config)
    test_config[mqtt.DOMAIN][domain]["availability_topic"] = availability_topic
    await help_setup_component(
        hass, mqtt_mock_entry_with_no_config, domain, test_config
    )

    state = hass.states.get(entity_id)
    assert state and state.state == STATE_UNAVAILABLE

    state = _publish(availability_topic, "online")
    assert state and state.state != STATE_UNAVAILABLE
    if no_assumed_state:
        assert not state.attributes.get(ATTR_ASSUMED_STATE)

    state = _publish(availability_topic, "offline")
    assert state and state.state == STATE_UNAVAILABLE

    if state_topic is None or state_message is None:
        return

    # A state update must not make an offline entity available.
    state = _publish(state_topic, state_message)
    assert state and state.state == STATE_UNAVAILABLE

    state = _publish(availability_topic, "online")
    assert state and state.state != STATE_UNAVAILABLE
2020-03-17 07:09:19 +00:00
async def help_test_default_availability_list_payload(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
    no_assumed_state: bool = False,
    state_topic: str | None = None,
    state_message: str | None = None,
) -> None:
    """Check availability handling with a list of availability topics.

    This is a test helper for the MqttAvailability mixin.

    Two availability topics are configured without an explicit
    `availability_mode`. The entity must follow the most recent
    "online"/"offline" payload received on either topic.
    """
    entity_id = f"{domain}.test"
    first_topic = "availability-topic1"
    second_topic = "availability-topic2"

    def _publish(topic: str, payload: str):
        """Fire an MQTT message and return the entity state afterwards."""
        async_fire_mqtt_message(hass, topic, payload)
        return hass.states.get(entity_id)

    # Add availability settings to a private copy of the config
    test_config = copy.deepcopy(config)
    test_config[mqtt.DOMAIN][domain]["availability"] = [
        {"topic": first_topic},
        {"topic": second_topic},
    ]
    await help_setup_component(hass, mqtt_mock_entry, domain, test_config)

    state = hass.states.get(entity_id)
    assert state and state.state == STATE_UNAVAILABLE

    state = _publish(first_topic, "online")
    assert state and state.state != STATE_UNAVAILABLE
    if no_assumed_state:
        assert not state.attributes.get(ATTR_ASSUMED_STATE)

    state = _publish(first_topic, "offline")
    assert state and state.state == STATE_UNAVAILABLE

    state = _publish(second_topic, "online")
    assert state and state.state != STATE_UNAVAILABLE
    if no_assumed_state:
        assert not state.attributes.get(ATTR_ASSUMED_STATE)

    state = _publish(second_topic, "offline")
    assert state and state.state == STATE_UNAVAILABLE

    if state_topic is None or state_message is None:
        return

    # A state update must not make an offline entity available.
    state = _publish(state_topic, state_message)
    assert state and state.state == STATE_UNAVAILABLE

    state = _publish(first_topic, "online")
    assert state and state.state != STATE_UNAVAILABLE
async def help_test_default_availability_list_payload_all(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
    no_assumed_state: bool = False,
    state_topic: str | None = None,
    state_message: str | None = None,
) -> None:
    """Check availability handling with `availability_mode` set to "all".

    This is a test helper for the MqttAvailability mixin.

    Two availability topics are configured. The entity may only be
    available while both topics have reported "online". `state_topic` and
    `state_message` are accepted but not used by this helper.
    """
    entity_id = f"{domain}.test"
    first_topic = "availability-topic1"
    second_topic = "availability-topic2"

    # Add availability settings to a private copy of the config
    test_config = copy.deepcopy(config)
    test_config[mqtt.DOMAIN][domain]["availability_mode"] = "all"
    test_config[mqtt.DOMAIN][domain]["availability"] = [
        {"topic": first_topic},
        {"topic": second_topic},
    ]
    await help_setup_component(hass, mqtt_mock_entry, domain, test_config)

    state = hass.states.get(entity_id)
    assert state and state.state == STATE_UNAVAILABLE

    # (topic, payload, entity expected available, check assumed_state afterwards)
    steps = (
        (first_topic, "online", False, True),
        (second_topic, "online", True, False),
        (second_topic, "offline", False, True),
        (second_topic, "online", True, False),
        (first_topic, "offline", False, True),
        (first_topic, "online", True, False),
    )
    for topic, payload, expect_available, check_assumed_state in steps:
        async_fire_mqtt_message(hass, topic, payload)
        state = hass.states.get(entity_id)
        if expect_available:
            assert state and state.state != STATE_UNAVAILABLE
        else:
            assert state and state.state == STATE_UNAVAILABLE
        if check_assumed_state and no_assumed_state:
            assert not state.attributes.get(ATTR_ASSUMED_STATE)
async def help_test_default_availability_list_payload_any(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
    no_assumed_state: bool = False,
    state_topic: str | None = None,
    state_message: str | None = None,
) -> None:
    """Check availability handling with `availability_mode` set to "any".

    This is a test helper for the MqttAvailability mixin.

    Two availability topics are configured. The entity must be available
    while at least one topic has reported "online". `state_topic` and
    `state_message` are accepted but not used by this helper.
    """
    entity_id = f"{domain}.test"
    first_topic = "availability-topic1"
    second_topic = "availability-topic2"

    # Add availability settings to a private copy of the config
    test_config = copy.deepcopy(config)
    test_config[mqtt.DOMAIN][domain]["availability_mode"] = "any"
    test_config[mqtt.DOMAIN][domain]["availability"] = [
        {"topic": first_topic},
        {"topic": second_topic},
    ]
    await help_setup_component(hass, mqtt_mock_entry, domain, test_config)

    state = hass.states.get(entity_id)
    assert state and state.state == STATE_UNAVAILABLE

    # (topic, payload, entity expected available, check assumed_state afterwards)
    steps = (
        (first_topic, "online", True, True),
        (second_topic, "online", True, False),
        (second_topic, "offline", True, True),
        (first_topic, "offline", False, False),
        (first_topic, "online", True, True),
    )
    for topic, payload, expect_available, check_assumed_state in steps:
        async_fire_mqtt_message(hass, topic, payload)
        state = hass.states.get(entity_id)
        if expect_available:
            assert state and state.state != STATE_UNAVAILABLE
        else:
            assert state and state.state == STATE_UNAVAILABLE
        if check_assumed_state and no_assumed_state:
            assert not state.attributes.get(ATTR_ASSUMED_STATE)
async def help_test_default_availability_list_single(
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
    domain: str,
    config: ConfigType,
) -> None:
    """Check `availability` and `availability_topic` are mutually exclusive.

    This is a test helper for the MqttAvailability mixin.

    Both options are set on a copy of `config`, the config is run through
    schema validation, and the expected exclusion error must show up in
    the captured log.
    """
    # Add conflicting availability settings to a private copy of the config
    conflicting_config = copy.deepcopy(config)
    entity_config = conflicting_config[mqtt.DOMAIN][domain]
    entity_config["availability"] = [
        {"topic": "availability-topic1"},
    ]
    entity_config["availability_topic"] = "availability-topic"

    help_test_validate_platform_config(hass, conflicting_config)

    expected_error = "Invalid config for [mqtt]: two or more values in the same group of exclusion 'availability'"
    assert expected_error in caplog.text
2020-03-17 07:09:19 +00:00
async def help_test_custom_availability_payload(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
    no_assumed_state: bool = False,
    state_topic: str | None = None,
    state_message: str | None = None,
) -> None:
    """Check availability handling with custom payloads.

    This is a test helper for the MqttAvailability mixin.

    The availability topic is configured with "good" / "nogood" as the
    available / not-available payloads. The entity must start unavailable,
    follow those payloads, and - when both `state_topic` and
    `state_message` are given - stay unavailable when a state message
    arrives while not available.
    """
    entity_id = f"{domain}.test"
    availability_topic = "availability-topic"

    def _publish(topic: str, payload: str):
        """Fire an MQTT message and return the entity state afterwards."""
        async_fire_mqtt_message(hass, topic, payload)
        return hass.states.get(entity_id)

    # Add availability settings to a private copy of the config
    test_config = copy.deepcopy(config)
    entity_config = test_config[mqtt.DOMAIN][domain]
    entity_config["availability_topic"] = availability_topic
    entity_config["payload_available"] = "good"
    entity_config["payload_not_available"] = "nogood"
    await help_setup_component(hass, mqtt_mock_entry, domain, test_config)

    state = hass.states.get(entity_id)
    assert state and state.state == STATE_UNAVAILABLE

    state = _publish(availability_topic, "good")
    assert state and state.state != STATE_UNAVAILABLE
    if no_assumed_state:
        assert not state.attributes.get(ATTR_ASSUMED_STATE)

    state = _publish(availability_topic, "nogood")
    assert state and state.state == STATE_UNAVAILABLE

    if state_topic is None or state_message is None:
        return

    # A state update must not make a not-available entity available.
    state = _publish(state_topic, state_message)
    assert state and state.state == STATE_UNAVAILABLE

    state = _publish(availability_topic, "good")
    assert state and state.state != STATE_UNAVAILABLE
2020-03-17 07:09:19 +00:00
async def help_test_discovery_update_availability(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
) -> None:
    """Check availability settings follow discovery updates.

    This is a test helper for the MQTTAvailability mixin.

    The entity is discovered three times on the same discovery topic: with
    a single availability topic, then with a list of two topics, then
    with a single different topic. After each update only the topics of
    the latest config may influence the entity's availability.
    """
    entity_id = f"{domain}.test"
    discovery_topic = f"homeassistant/{domain}/bla/config"

    def _discovery_payload(option: str, value: Any) -> str:
        """Return the entity config as JSON with one availability option added."""
        entity_config = copy.deepcopy(config)[mqtt.DOMAIN][domain]
        entity_config[option] = value
        return json.dumps(entity_config)

    def _publish(topic: str, payload: str):
        """Fire an MQTT message and return the entity state afterwards."""
        async_fire_mqtt_message(hass, topic, payload)
        return hass.states.get(entity_id)

    await mqtt_mock_entry()

    # Prepare the three discovery payloads with their availability settings
    single_topic_payload = _discovery_payload(
        "availability_topic", "availability-topic1"
    )
    topic_list_payload = _discovery_payload(
        "availability",
        [
            {"topic": "availability-topic2"},
            {"topic": "availability-topic3"},
        ],
    )
    replaced_topic_payload = _discovery_payload(
        "availability_topic", "availability-topic4"
    )

    async_fire_mqtt_message(hass, discovery_topic, single_topic_payload)
    await hass.async_block_till_done()

    state = hass.states.get(entity_id)
    assert state and state.state == STATE_UNAVAILABLE

    state = _publish("availability-topic1", "online")
    assert state and state.state != STATE_UNAVAILABLE

    state = _publish("availability-topic1", "offline")
    assert state and state.state == STATE_UNAVAILABLE

    # Switch to the list of availability topics
    async_fire_mqtt_message(hass, discovery_topic, topic_list_payload)
    await hass.async_block_till_done()

    # The old topic must no longer be subscribed
    state = _publish("availability-topic1", "online")
    assert state and state.state == STATE_UNAVAILABLE

    # Both new topics must be subscribed
    state = _publish("availability-topic2", "online")
    assert state and state.state != STATE_UNAVAILABLE

    state = _publish("availability-topic3", "offline")
    assert state and state.state == STATE_UNAVAILABLE

    # Switch back to a single, different availability topic
    async_fire_mqtt_message(hass, discovery_topic, replaced_topic_payload)
    await hass.async_block_till_done()

    # Neither topic of the previous list may be subscribed any more
    state = _publish("availability-topic2", "online")
    assert state and state.state == STATE_UNAVAILABLE

    state = _publish("availability-topic3", "online")
    assert state and state.state == STATE_UNAVAILABLE

    # The newest topic must be subscribed
    state = _publish("availability-topic4", "online")
    assert state and state.state != STATE_UNAVAILABLE
async def help_test_setting_attribute_via_mqtt_json_message(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
) -> None:
    """Check a JSON payload on the attributes topic sets entity attributes.

    This is a test helper for the MqttAttributes mixin.
    """
    attributes_topic = "attr-topic"

    # Add JSON attributes settings to a private copy of the config
    test_config = copy.deepcopy(config)
    test_config[mqtt.DOMAIN][domain]["json_attributes_topic"] = attributes_topic
    await help_setup_component(hass, mqtt_mock_entry, domain, test_config)

    async_fire_mqtt_message(hass, attributes_topic, '{ "val": "100" }')

    entity_state = hass.states.get(f"{domain}.test")
    assert entity_state and entity_state.attributes.get("val") == "100"
async def help_test_setting_blocked_attribute_via_mqtt_json_message(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
    extra_blocked_attributes: frozenset[str] | None,
) -> None:
    """Check blocked attributes cannot be set through the attributes topic.

    This is a test helper for the MqttAttributes mixin.

    The entity is set up through discovery. For every attribute in
    MQTT_ATTRIBUTES_BLOCKED, and then for every attribute in
    `extra_blocked_attributes`, a JSON payload setting that attribute is
    published; the entity must not take over the published value.
    """
    entity_id = f"{domain}.test"
    attributes_topic = "attr-topic"

    await mqtt_mock_entry()
    extra_blocked_attribute_list = list(extra_blocked_attributes or [])

    # Add JSON attributes settings to a private copy of the config
    test_config = copy.deepcopy(config)
    test_config[mqtt.DOMAIN][domain]["json_attributes_topic"] = attributes_topic
    async_fire_mqtt_message(
        hass,
        f"homeassistant/{domain}/bla/config",
        json.dumps(test_config[mqtt.DOMAIN][domain]),
    )
    await hass.async_block_till_done()

    blocked_value = "abc123"
    # Generic blocked attributes first, then the platform specific ones.
    for blocked_group in (MQTT_ATTRIBUTES_BLOCKED, extra_blocked_attribute_list):
        for blocked_attribute in blocked_group:
            async_fire_mqtt_message(
                hass, attributes_topic, json.dumps({blocked_attribute: blocked_value})
            )
            entity_state = hass.states.get(entity_id)
            assert (
                entity_state
                and entity_state.attributes.get(blocked_attribute) != blocked_value
            )
async def help_test_setting_attribute_with_template(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
) -> None:
    """Check attributes are extracted through `json_attributes_template`.

    This is a test helper for the MqttAttributes mixin.

    The template selects the nested "Timer1" object from the payload; its
    members must end up as attributes of the entity.
    """
    attributes_topic = "attr-topic"

    # Add JSON attributes settings to a private copy of the config
    test_config = copy.deepcopy(config)
    entity_config = test_config[mqtt.DOMAIN][domain]
    entity_config["json_attributes_topic"] = attributes_topic
    entity_config["json_attributes_template"] = "{{ value_json['Timer1'] | tojson }}"
    await help_setup_component(hass, mqtt_mock_entry, domain, test_config)

    timer_payload = json.dumps({"Timer1": {"Arm": 0, "Time": "22:18"}})
    async_fire_mqtt_message(hass, attributes_topic, timer_payload)

    entity_state = hass.states.get(f"{domain}.test")
    assert entity_state is not None
    assert entity_state.attributes.get("Arm") == 0
    assert entity_state.attributes.get("Time") == "22:18"
async def help_test_update_with_json_attrs_not_dict(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    caplog: pytest.LogCaptureFixture,
    domain: str,
    config: ConfigType,
) -> None:
    """Check a JSON attributes payload that is not a dictionary is rejected.

    This is a test helper for the MqttAttributes mixin.

    A JSON list is published on the attributes topic; no "val" attribute
    may be set and the rejection must be logged.
    """
    attributes_topic = "attr-topic"

    # Add JSON attributes settings to a private copy of the config
    test_config = copy.deepcopy(config)
    test_config[mqtt.DOMAIN][domain]["json_attributes_topic"] = attributes_topic
    await help_setup_component(hass, mqtt_mock_entry, domain, test_config)

    async_fire_mqtt_message(hass, attributes_topic, '[ "list", "of", "things"]')

    entity_state = hass.states.get(f"{domain}.test")
    assert entity_state and entity_state.attributes.get("val") is None
    assert "JSON result was not a dictionary" in caplog.text
async def help_test_update_with_json_attrs_bad_json(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    caplog: pytest.LogCaptureFixture,
    domain: str,
    config: ConfigType,
) -> None:
    """Check an attributes payload that is not valid JSON is rejected.

    This is a test helper for the MqttAttributes mixin.

    A non-JSON string is published on the attributes topic; no "val"
    attribute may be set and the error must be logged.
    """
    attributes_topic = "attr-topic"

    # Add JSON attributes settings to a private copy of the config
    test_config = copy.deepcopy(config)
    test_config[mqtt.DOMAIN][domain]["json_attributes_topic"] = attributes_topic
    await help_setup_component(hass, mqtt_mock_entry, domain, test_config)

    async_fire_mqtt_message(hass, attributes_topic, "This is not JSON")

    entity_state = hass.states.get(f"{domain}.test")
    assert entity_state and entity_state.attributes.get("val") is None
    assert "Erroneous JSON: This is not JSON" in caplog.text
async def help_test_discovery_update_attr(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    caplog: pytest.LogCaptureFixture,
    domain: str,
    config: ConfigType,
) -> None:
    """Check the JSON attributes topic follows discovery updates.

    This is a test helper for the MqttAttributes mixin.

    The entity is discovered with one attributes topic and then updated to
    use another one. After the update, messages on the old topic must be
    ignored and messages on the new topic must be applied.
    """
    entity_id = f"{domain}.test"
    discovery_topic = f"homeassistant/{domain}/bla/config"

    def _discovery_payload(attributes_topic: str) -> str:
        """Return the entity config as JSON with the attributes topic set."""
        entity_config = copy.deepcopy(config)[mqtt.DOMAIN][domain]
        entity_config["json_attributes_topic"] = attributes_topic
        return json.dumps(entity_config)

    def _publish(topic: str, payload: str):
        """Fire an MQTT message and return the entity state afterwards."""
        async_fire_mqtt_message(hass, topic, payload)
        return hass.states.get(entity_id)

    await mqtt_mock_entry()

    # Prepare both discovery payloads with their JSON attributes settings
    initial_payload = _discovery_payload("attr-topic1")
    updated_payload = _discovery_payload("attr-topic2")

    async_fire_mqtt_message(hass, discovery_topic, initial_payload)
    await hass.async_block_till_done()

    state = _publish("attr-topic1", '{ "val": "100" }')
    assert state and state.attributes.get("val") == "100"

    # Change json_attributes_topic
    async_fire_mqtt_message(hass, discovery_topic, updated_payload)
    await hass.async_block_till_done()

    # The old topic must no longer be subscribed: the value stays as it was
    state = _publish("attr-topic1", '{ "val": "50" }')
    assert state and state.attributes.get("val") == "100"

    # The new topic must be subscribed
    state = _publish("attr-topic2", '{ "val": "75" }')
    assert state and state.attributes.get("val") == "75"
async def help_test_unique_id(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
) -> None:
    """Check exactly one entity exists for `domain` after setup.

    Used to verify that a unique_id creates only one entity.
    """
    await mqtt_mock_entry()
    await hass.async_block_till_done()

    entity_ids = hass.states.async_entity_ids(domain)
    assert len(entity_ids) == 1
async def help_test_discovery_removal(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    caplog: pytest.LogCaptureFixture,
    domain: str,
    data: str,
) -> None:
    """Check a discovered entity is removed by an empty discovery payload.

    This is a test helper for the MqttDiscoveryUpdate mixin.

    `data` is published as discovery payload and must create an entity
    named "test"; publishing an empty payload on the same discovery topic
    must remove it again.
    """
    entity_id = f"{domain}.test"
    discovery_topic = f"homeassistant/{domain}/bla/config"

    await mqtt_mock_entry()

    async_fire_mqtt_message(hass, discovery_topic, data)
    await hass.async_block_till_done()

    discovered = hass.states.get(entity_id)
    assert discovered is not None
    assert discovered.name == "test"

    # An empty payload on the discovery topic removes the entity
    async_fire_mqtt_message(hass, discovery_topic, "")
    await hass.async_block_till_done()

    assert hass.states.get(entity_id) is None
async def help_test_discovery_update(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    caplog: pytest.LogCaptureFixture,
    domain: str,
    discovery_config1: DiscoveryInfoType,
    discovery_config2: DiscoveryInfoType,
    state_data1: _StateDataType | None = None,
    state_data2: _StateDataType | None = None,
) -> None:
    """Test update of discovered component.

    This is a test helper for the MqttDiscoveryUpdate mixin.

    `discovery_config1` is published first and must create `{domain}.beer`
    named "Beer". `discovery_config2` is then published on the same
    discovery topic and must rename that entity to "Milk" while keeping
    its entity id; no `{domain}.milk` entity may exist afterwards.

    `state_data1` / `state_data2` are optional checks run after the first /
    second discovery message. Each item holds the MQTT messages to
    publish, the expected state (skipped when falsy) and the expected
    attributes (skipped when falsy).
    """
    entity_id = f"{domain}.beer"
    discovery_topic = f"homeassistant/{domain}/bla/config"

    def _verify_state_data(state_data: _StateDataType | None) -> None:
        """Publish the messages of each item and check state and attributes."""
        if not state_data:
            return
        for mqtt_messages, expected_state, attributes in state_data:
            for topic, data in mqtt_messages:
                async_fire_mqtt_message(hass, topic, data)
            state = hass.states.get(entity_id)
            assert state is not None
            if expected_state:
                assert state.state == expected_state
            if attributes:
                for attr, value in attributes:
                    assert state.attributes.get(attr) == value

    await mqtt_mock_entry()

    # Add some future configuration to the configurations; unknown options
    # are included in both discovery payloads.
    config1 = copy.deepcopy(discovery_config1)
    config1["some_future_option_1"] = "future_option_1"
    config2 = copy.deepcopy(discovery_config2)
    config2["some_future_option_2"] = "future_option_2"
    discovery_data1 = json.dumps(config1)
    discovery_data2 = json.dumps(config2)

    async_fire_mqtt_message(hass, discovery_topic, discovery_data1)
    await hass.async_block_till_done()

    state = hass.states.get(entity_id)
    assert state is not None
    assert state.name == "Beer"

    _verify_state_data(state_data1)

    async_fire_mqtt_message(hass, discovery_topic, discovery_data2)
    await hass.async_block_till_done()

    # The entity id is kept, only the name changes
    state = hass.states.get(entity_id)
    assert state is not None
    assert state.name == "Milk"

    _verify_state_data(state_data2)

    state = hass.states.get(f"{domain}.milk")
    assert state is None
async def help_test_discovery_update_unchanged(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    caplog: pytest.LogCaptureFixture,
    domain: str,
    data1: str,
    discovery_update: MagicMock,
) -> None:
    """Check re-sending an identical discovery payload triggers no update.

    This is a test helper for the MqttDiscoveryUpdate mixin.

    `data1` is published twice on the same discovery topic; after the
    second publish the `discovery_update` mock must not have been called.
    """
    discovery_topic = f"homeassistant/{domain}/bla/config"

    await mqtt_mock_entry()

    async_fire_mqtt_message(hass, discovery_topic, data1)
    await hass.async_block_till_done()

    discovered = hass.states.get(f"{domain}.beer")
    assert discovered is not None
    assert discovered.name == "Beer"

    # Same payload again: nothing changed, so no update is expected
    async_fire_mqtt_message(hass, discovery_topic, data1)
    await hass.async_block_till_done()

    assert not discovery_update.called
async def help_test_discovery_broken(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    caplog: pytest.LogCaptureFixture,
    domain: str,
    data1: str,
    data2: str,
) -> None:
    """Check a bad discovery message is ignored and can be replaced.

    `data1` must not create `{domain}.beer`. Publishing `data2` on the
    same discovery topic must create `{domain}.milk` named "Milk", and
    `{domain}.beer` must still not exist.
    """
    discovery_topic = f"homeassistant/{domain}/bla/config"
    beer_entity_id = f"{domain}.beer"

    await mqtt_mock_entry()

    async_fire_mqtt_message(hass, discovery_topic, data1)
    await hass.async_block_till_done()

    assert hass.states.get(beer_entity_id) is None

    async_fire_mqtt_message(hass, discovery_topic, data2)
    await hass.async_block_till_done()

    milk_state = hass.states.get(f"{domain}.milk")
    assert milk_state is not None
    assert milk_state.name == "Milk"
    assert hass.states.get(beer_entity_id) is None
async def help_test_encoding_subscribable_topics(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
    topic: str,
    value: Any,
    attribute: str | None = None,
    attribute_value: Any = None,
    init_payload: tuple[str, str] | None = None,
    skip_raw_test: bool = False,
) -> None:
    """Test handling of incoming encoded payload.

    Three entities are discovered from copies of `config`, each subscribing
    the config option `topic` to its own MQTT topic:

    - test1: no `encoding` option set, payload published as utf-8
    - test2: `encoding` set to "utf-16", payload published as utf-16
    - test3: `encoding` set to "" and payload published as utf-16

    For test1 and test2 the resulting state (or `attribute`, when given)
    must equal `attribute_value or value`. For test3 the result must
    differ; AttributeError, TypeError and ValueError raised during that
    check are suppressed. The test3 check is skipped when `skip_raw_test`
    is set.

    `init_payload`, when given, is a (config option, payload) pair: the
    option is pointed to a per-entity init topic and the payload is
    published there before the value under test.
    """

    async def _test_encoding(
        hass: HomeAssistant,
        entity_id: str,
        topic: str,
        encoded_value: bytes,
        attribute: str | None,
        init_payload_topic: str,
        init_payload_value: bytes | None,
    ) -> Any:
        """Publish an encoded payload and return the resulting state or attribute."""
        if init_payload_value:
            # Sometimes a device needs to have an initialization payload,
            # e.g. to switch the device on.
            async_fire_mqtt_message(hass, init_payload_topic, init_payload_value)
            await hass.async_block_till_done()

        async_fire_mqtt_message(hass, topic, encoded_value)
        await hass.async_block_till_done()

        state = hass.states.get(entity_id)
        assert state is not None
        if attribute:
            return state.attributes.get(attribute)
        return state.state

    def _entity_config(index: int, encoding: str | None) -> dict[str, Any]:
        """Return a copy of `config` set up as the test entity number `index`."""
        entity_config = copy.deepcopy(config)
        # device_tracker configs are told apart by unique_id instead of name
        identity_key = "unique_id" if domain == "device_tracker" else "name"
        entity_config[identity_key] = f"test{index}"
        if encoding is not None:
            entity_config["encoding"] = encoding
        entity_config[topic] = f"topic/test{index}"
        if init_payload:
            entity_config[init_payload[0]] = f"topic/init_payload{index}"
        return entity_config

    # test1 default encoding, test2 alternate encoding, test3 raw encoding
    entity_configs = (
        _entity_config(1, None),
        _entity_config(2, "utf-16"),
        _entity_config(3, ""),
    )

    init_payload_value_utf8: bytes | None = None
    init_payload_value_utf16: bytes | None = None
    if init_payload:
        init_payload_value_utf8 = init_payload[1].encode("utf-8")
        init_payload_value_utf16 = init_payload[1].encode("utf-16")

    await mqtt_mock_entry()
    for index, entity_config in enumerate(entity_configs, start=1):
        async_fire_mqtt_message(
            hass,
            f"homeassistant/{domain}/item{index}/config",
            json.dumps(entity_config),
        )
    await hass.async_block_till_done()

    expected_result = attribute_value or value

    # test1 default encoding
    assert (
        await _test_encoding(
            hass,
            f"{domain}.test1",
            "topic/test1",
            value.encode("utf-8"),
            attribute,
            "topic/init_payload1",
            init_payload_value_utf8,
        )
        == expected_result
    )

    # test2 alternate encoding
    assert (
        await _test_encoding(
            hass,
            f"{domain}.test2",
            "topic/test2",
            value.encode("utf-16"),
            attribute,
            "topic/init_payload2",
            init_payload_value_utf16,
        )
        == expected_result
    )

    # test3 raw encoded input
    if skip_raw_test:
        return

    with suppress(AttributeError, TypeError, ValueError):
        result = await _test_encoding(
            hass,
            f"{domain}.test3",
            "topic/test3",
            value.encode("utf-16"),
            attribute,
            "topic/init_payload3",
            init_payload_value_utf16,
        )
        assert result != expected_result
async def help_test_entity_device_info_with_identifier(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
) -> None:
    """Check device registry integration for a device given by identifier.

    This is a test helper for the MqttDiscoveryUpdate mixin.

    The platform config for `domain` is extended with the
    DEFAULT_CONFIG_DEVICE_INFO_ID device block and a unique_id, published on
    the discovery topic, and the resulting device registry entry is asserted
    against the expected literal values.
    """
    await mqtt_mock_entry()

    # Build the discovery payload: platform config plus device settings
    discovery_config = copy.deepcopy(config[mqtt.DOMAIN][domain])
    discovery_config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
    discovery_config["unique_id"] = "veryunique"

    device_registry = dr.async_get(hass)

    payload = json.dumps(discovery_config)
    async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", payload)
    await hass.async_block_till_done()

    identifier = ("mqtt", "helloworld")
    device = device_registry.async_get_device({identifier})
    assert device is not None
    assert device.identifiers == {identifier}
    assert device.manufacturer == "Whatever"
    assert device.name == "Beer"
    assert device.model == "Glass"
    assert device.hw_version == "rev1"
    assert device.sw_version == "0.1-beta"
    assert device.suggested_area == "default_area"
    assert device.configuration_url == "http://example.com"
async def help_test_entity_device_info_with_connection(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
) -> None:
    """Check device registry integration for a device given by connection.

    This is a test helper for the MqttDiscoveryUpdate mixin.

    The platform config for `domain` is extended with the
    DEFAULT_CONFIG_DEVICE_INFO_MAC device block and a unique_id, published on
    the discovery topic, and the device is then looked up in the registry by
    its MAC connection instead of by identifier.
    """
    await mqtt_mock_entry()

    # Build the discovery payload: platform config plus device settings
    discovery_config = copy.deepcopy(config[mqtt.DOMAIN][domain])
    discovery_config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_MAC)
    discovery_config["unique_id"] = "veryunique"

    device_registry = dr.async_get(hass)

    payload = json.dumps(discovery_config)
    async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", payload)
    await hass.async_block_till_done()

    mac_connection = (dr.CONNECTION_NETWORK_MAC, "02:5b:26:a8:dc:12")
    # No identifiers are supplied; the lookup relies on the connection only
    device = device_registry.async_get_device(set(), {mac_connection})
    assert device is not None
    assert device.connections == {mac_connection}
    assert device.manufacturer == "Whatever"
    assert device.name == "Beer"
    assert device.model == "Glass"
    assert device.hw_version == "rev1"
    assert device.sw_version == "0.1-beta"
    assert device.suggested_area == "default_area"
    assert device.configuration_url == "http://example.com"
async def help_test_entity_device_info_remove(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
) -> None:
    """Check that device and entity registry entries are removed.

    An entity with device info is discovered, after which an empty payload is
    published on the same discovery topic. The helper asserts that the device
    and the entity are registered after the first message and that both are
    gone after the second.
    """
    await mqtt_mock_entry()

    # Build the discovery payload: platform config plus device settings
    discovery_config = copy.deepcopy(config[mqtt.DOMAIN][domain])
    discovery_config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
    discovery_config["unique_id"] = "veryunique"

    dev_registry = dr.async_get(hass)
    ent_registry = er.async_get(hass)
    discovery_topic = f"homeassistant/{domain}/bla/config"
    identifier = ("mqtt", "helloworld")

    async_fire_mqtt_message(hass, discovery_topic, json.dumps(discovery_config))
    await hass.async_block_till_done()

    assert dev_registry.async_get_device({identifier}) is not None
    assert ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique")

    # An empty discovery payload is sent on the same topic
    async_fire_mqtt_message(hass, discovery_topic, "")
    await hass.async_block_till_done()

    assert dev_registry.async_get_device({identifier}) is None
    assert not ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique")
async def help_test_entity_device_info_update(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
) -> None:
    """Check that a device registry entry follows a discovery update.

    This is a test helper for the MqttDiscoveryUpdate mixin.

    The entity is discovered with the device name "Beer"; the same discovery
    topic is then republished with the device name changed to "Milk" and the
    registry entry is expected to carry the new name.
    """
    await mqtt_mock_entry()

    # Build the discovery payload: platform config plus device settings
    discovery_config = copy.deepcopy(config[mqtt.DOMAIN][domain])
    discovery_config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
    discovery_config["unique_id"] = "veryunique"

    device_registry = dr.async_get(hass)
    discovery_topic = f"homeassistant/{domain}/bla/config"
    identifier = ("mqtt", "helloworld")

    async_fire_mqtt_message(hass, discovery_topic, json.dumps(discovery_config))
    await hass.async_block_till_done()

    device = device_registry.async_get_device({identifier})
    assert device is not None
    assert device.name == "Beer"

    # Republish the discovery message with a renamed device
    discovery_config["device"]["name"] = "Milk"
    async_fire_mqtt_message(hass, discovery_topic, json.dumps(discovery_config))
    await hass.async_block_till_done()

    device = device_registry.async_get_device({identifier})
    assert device is not None
    assert device.name == "Milk"
async def help_test_entity_id_update_subscriptions(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
    topics: list[str] | None = None,
) -> None:
    """Check that MQTT subscriptions are re-established on entity_id change.

    When `topics` is None, an availability topic and a state topic are added
    to the config and used as the expected subscription topics. The entity is
    set up through discovery, renamed in the entity registry from
    `<domain>.test` to `<domain>.milk`, and every topic is expected to be
    subscribed both before and after the rename.
    """
    # Work on a copy so the caller's config is left untouched
    config = copy.deepcopy(config)
    platform_config = config[mqtt.DOMAIN][domain]
    platform_config["unique_id"] = "TOTALLY_UNIQUE"

    if topics is None:
        # Fall back to the default availability and state topics
        platform_config["availability_topic"] = "avty-topic"
        platform_config["state_topic"] = "test-topic"
        topics = ["avty-topic", "test-topic"]
    assert len(topics) > 0

    entity_registry = er.async_get(hass)

    mqtt_mock = await help_setup_component(
        hass, mqtt_mock_entry, domain, config, use_discovery=True
    )
    assert mqtt_mock is not None

    old_entity_id = f"{domain}.test"
    new_entity_id = f"{domain}.milk"

    assert hass.states.get(old_entity_id) is not None
    subscribe_mock = mqtt_mock.async_subscribe
    assert subscribe_mock.call_count == len(topics) + 2 + DISCOVERY_COUNT
    for expected_topic in topics:
        subscribe_mock.assert_any_call(expected_topic, ANY, ANY, ANY)
    # Forget the initial subscriptions so only re-subscriptions are seen below
    subscribe_mock.reset_mock()

    entity_registry.async_update_entity(old_entity_id, new_entity_id=new_entity_id)
    await hass.async_block_till_done()

    assert hass.states.get(old_entity_id) is None
    assert hass.states.get(new_entity_id) is not None
    for expected_topic in topics:
        subscribe_mock.assert_any_call(expected_topic, ANY, ANY, ANY)
async def help_test_entity_id_update_discovery_update(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
    topic: str | None = None,
) -> None:
    """Check that a discovery update still works after an entity_id change.

    The entity is discovered and its availability is toggled through `topic`
    (a default availability topic is configured when `topic` is None). After
    renaming the entity from `<domain>.test` to `<domain>.milk`, the
    discovery config is republished with a new availability topic and the
    renamed entity is expected to react to that new topic.
    """
    await mqtt_mock_entry()

    # Work on a copy and add a unique_id to the platform config
    config_copy = copy.deepcopy(config)
    entity_config = config_copy[mqtt.DOMAIN][domain]
    entity_config["unique_id"] = "TOTALLY_UNIQUE"

    if topic is None:
        # Add default topic to config
        entity_config["availability_topic"] = "avty-topic"
        topic = "avty-topic"

    entity_registry = er.async_get(hass)
    discovery_topic = f"homeassistant/{domain}/bla/config"
    old_entity_id = f"{domain}.test"
    new_entity_id = f"{domain}.milk"

    async_fire_mqtt_message(hass, discovery_topic, json.dumps(entity_config))
    await hass.async_block_till_done()

    async_fire_mqtt_message(hass, topic, "online")
    state = hass.states.get(old_entity_id)
    assert state and state.state != STATE_UNAVAILABLE

    async_fire_mqtt_message(hass, topic, "offline")
    state = hass.states.get(old_entity_id)
    assert state and state.state == STATE_UNAVAILABLE

    entity_registry.async_update_entity(old_entity_id, new_entity_id=new_entity_id)
    await hass.async_block_till_done()

    # Republish discovery with a different availability topic
    updated_topic = f"{topic}_2"
    entity_config["availability_topic"] = updated_topic
    async_fire_mqtt_message(hass, discovery_topic, json.dumps(entity_config))
    await hass.async_block_till_done()
    assert len(hass.states.async_entity_ids(domain)) == 1

    async_fire_mqtt_message(hass, updated_topic, "online")
    state = hass.states.get(new_entity_id)
    assert state and state.state != STATE_UNAVAILABLE
async def help_test_entity_debug_info(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
) -> None:
    """Check the debug info reported for a freshly discovered entity.

    This is a test helper for MQTT debug_info.

    After discovery, the device's debug info is expected to list exactly one
    entity with the discovery topic and payload that were sent, a single
    subscription on "test-topic" without messages, nothing transmitted and no
    triggers.
    """
    await mqtt_mock_entry()

    # Build the discovery payload: platform config plus device settings
    discovery_config = copy.deepcopy(config[mqtt.DOMAIN][domain])
    discovery_config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
    discovery_config["unique_id"] = "veryunique"
    discovery_config["platform"] = "mqtt"

    device_registry = dr.async_get(hass)
    discovery_topic = f"homeassistant/{domain}/bla/config"

    async_fire_mqtt_message(hass, discovery_topic, json.dumps(discovery_config))
    await hass.async_block_till_done()

    device = device_registry.async_get_device({("mqtt", "helloworld")})
    assert device is not None

    debug_info_data = debug_info.info_for_device(hass, device.id)
    assert len(debug_info_data["entities"]) == 1
    entity_info = debug_info_data["entities"][0]
    assert entity_info["discovery_data"]["topic"] == discovery_topic
    assert entity_info["discovery_data"]["payload"] == discovery_config
    assert len(entity_info["subscriptions"]) == 1
    assert {"topic": "test-topic", "messages": []} in entity_info["subscriptions"]
    assert entity_info["transmitted"] == []
    assert len(debug_info_data["triggers"]) == 0
async def help_test_entity_debug_info_max_messages(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
) -> None:
    """Check debug info when more messages arrive than are stored.

    This is a test helper for MQTT debug_info.

    One message more than `debug_info.STORED_MESSAGES` is fired on
    "test-topic"; the debug info is then expected to hold exactly
    STORED_MESSAGES entries, namely the payloads "1" up to and including
    STORED_MESSAGES (the first payload, "0", is absent).
    """
    await mqtt_mock_entry()

    # Build the discovery payload: platform config plus device settings
    discovery_config = copy.deepcopy(config[mqtt.DOMAIN][domain])
    discovery_config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
    discovery_config["unique_id"] = "veryunique"

    device_registry = dr.async_get(hass)

    async_fire_mqtt_message(
        hass, f"homeassistant/{domain}/bla/config", json.dumps(discovery_config)
    )
    await hass.async_block_till_done()

    device = device_registry.async_get_device({("mqtt", "helloworld")})
    assert device is not None

    subscriptions = debug_info.info_for_device(hass, device.id)["entities"][0][
        "subscriptions"
    ]
    assert len(subscriptions) == 1
    assert {"topic": "test-topic", "messages": []} in subscriptions

    # Pin utcnow so the recorded message timestamps are predictable
    start_dt = datetime(2019, 1, 1, 0, 0, 0)
    with patch("homeassistant.util.dt.utcnow", return_value=start_dt):
        for index in range(debug_info.STORED_MESSAGES + 1):
            async_fire_mqtt_message(hass, "test-topic", str(index))

    subscriptions = debug_info.info_for_device(hass, device.id)["entities"][0][
        "subscriptions"
    ]
    assert len(subscriptions) == 1
    assert len(subscriptions[0]["messages"]) == debug_info.STORED_MESSAGES

    expected_messages = [
        {
            "payload": str(index),
            "qos": 0,
            "retain": False,
            "time": start_dt,
            "topic": "test-topic",
        }
        for index in range(1, debug_info.STORED_MESSAGES + 1)
    ]
    assert {"topic": "test-topic", "messages": expected_messages} in subscriptions
async def help_test_entity_debug_info_message(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
    service: str,
    command_topic: str | None = None,
    command_payload: str | None = None,
    state_topic: str | object | None = _SENTINEL,
    state_payload: str | None = None,
    service_parameters: dict[str, Any] | None = None,
) -> None:
    """Test debug_info for received and transmitted messages.

    This is a test helper for MQTT debug_info.

    Parameters:
        service: service to call on `<domain>.test` to trigger an outgoing
            message; when falsy no service is called and no transmissions
            are expected.
        command_topic: expected publish topic; when None the default
            "command-topic" is added to the config and used.
        command_payload: expected published payload, defaults to "ON".
        state_topic: topic used to feed a state message; when left at the
            sentinel the default "state-topic" is added to the config and
            used, when explicitly None the incoming-message checks are
            skipped.
        state_payload: payload fired on the state topic, defaults to "ON".
        service_parameters: extra service data merged into the service call.
    """
    await mqtt_mock_entry()

    # Add device settings to config
    config = copy.deepcopy(config[mqtt.DOMAIN][domain])
    config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
    config["unique_id"] = "veryunique"

    if command_topic is None:
        # Add default topic to config
        config["command_topic"] = "command-topic"
        command_topic = "command-topic"

    if command_payload is None:
        command_payload = "ON"

    if state_topic is _SENTINEL:
        # Add default topic to config
        config["state_topic"] = "state-topic"
        state_topic = "state-topic"

    if state_payload is None:
        state_payload = "ON"

    registry = dr.async_get(hass)

    data = json.dumps(config)
    async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla/config", data)
    await hass.async_block_till_done()

    device = registry.async_get_device({("mqtt", "helloworld")})
    assert device is not None

    debug_info_data = debug_info.info_for_device(hass, device.id)

    # Fixed timestamp that utcnow is patched to while messages are exchanged
    start_dt = datetime(2019, 1, 1, 0, 0, 0)

    if state_topic is not None:
        assert len(debug_info_data["entities"][0]["subscriptions"]) >= 1
        assert {"topic": state_topic, "messages": []} in debug_info_data["entities"][0][
            "subscriptions"
        ]

        with patch("homeassistant.util.dt.utcnow") as dt_utcnow:
            dt_utcnow.return_value = start_dt
            async_fire_mqtt_message(hass, str(state_topic), state_payload)

        debug_info_data = debug_info.info_for_device(hass, device.id)
        assert len(debug_info_data["entities"][0]["subscriptions"]) >= 1
        assert {
            "topic": state_topic,
            "messages": [
                {
                    "payload": str(state_payload),
                    "qos": 0,
                    "retain": False,
                    "time": start_dt,
                    "topic": state_topic,
                }
            ],
        } in debug_info_data["entities"][0]["subscriptions"]

    expected_transmissions: list[dict[str, Any]] = []
    if service:
        # Trigger an outgoing MQTT message
        service_data = {ATTR_ENTITY_ID: f"{domain}.test"}
        if service_parameters:
            service_data.update(service_parameters)

        with patch("homeassistant.util.dt.utcnow") as dt_utcnow:
            dt_utcnow.return_value = start_dt
            await hass.services.async_call(
                domain,
                service,
                service_data,
                blocking=True,
            )

        expected_transmissions = [
            {
                "topic": command_topic,
                "messages": [
                    {
                        "payload": str(command_payload),
                        "qos": 0,
                        "retain": False,
                        "time": start_dt,
                        "topic": command_topic,
                    }
                ],
            }
        ]

    debug_info_data = debug_info.info_for_device(hass, device.id)
    assert debug_info_data["entities"][0]["transmitted"] == expected_transmissions
async def help_test_entity_debug_info_remove(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
) -> None:
    """Check that debug info is cleaned up when the entity is removed.

    This is a test helper for MQTT debug_info.

    The entity is discovered and its debug info verified; an empty discovery
    payload is then published, after which the device's debug info is
    expected to list no entities or triggers and the entity id must no longer
    be present in the MQTT debug info store.
    """
    await mqtt_mock_entry()

    # Build the discovery payload: platform config plus device settings
    discovery_config = copy.deepcopy(config[mqtt.DOMAIN][domain])
    discovery_config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
    discovery_config["unique_id"] = "veryunique"
    discovery_config["platform"] = "mqtt"

    device_registry = dr.async_get(hass)
    discovery_topic = f"homeassistant/{domain}/bla/config"

    async_fire_mqtt_message(hass, discovery_topic, json.dumps(discovery_config))
    await hass.async_block_till_done()

    device = device_registry.async_get_device({("mqtt", "helloworld")})
    assert device is not None

    debug_info_data = debug_info.info_for_device(hass, device.id)
    assert len(debug_info_data["entities"]) == 1
    entity_info = debug_info_data["entities"][0]
    assert entity_info["discovery_data"]["topic"] == discovery_topic
    assert entity_info["discovery_data"]["payload"] == discovery_config
    assert len(entity_info["subscriptions"]) == 1
    assert {"topic": "test-topic", "messages": []} in entity_info["subscriptions"]
    assert len(debug_info_data["triggers"]) == 0
    assert entity_info["entity_id"] == f"{domain}.test"
    entity_id = entity_info["entity_id"]

    # An empty discovery payload is sent on the same topic
    async_fire_mqtt_message(hass, discovery_topic, "")
    await hass.async_block_till_done()

    debug_info_data = debug_info.info_for_device(hass, device.id)
    assert len(debug_info_data["entities"]) == 0
    assert len(debug_info_data["triggers"]) == 0
    assert entity_id not in hass.data["mqtt"].debug_info_entities
async def help_test_entity_debug_info_update_entity_id(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
) -> None:
    """Check that debug info follows an entity_id change.

    This is a test helper for MQTT debug_info.

    The entity is discovered as `<domain>.test` and its debug info verified.
    It is then renamed to `<domain>.milk` in the entity registry; the debug
    info is expected to be unchanged apart from the entity id, and the old id
    must no longer be present in the MQTT debug info store.
    """
    await mqtt_mock_entry()

    # Build the discovery payload: platform config plus device settings
    discovery_config = copy.deepcopy(config[mqtt.DOMAIN][domain])
    discovery_config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
    discovery_config["unique_id"] = "veryunique"
    discovery_config["platform"] = "mqtt"

    device_registry = dr.async_get(hass)
    entity_registry = er.async_get(hass)
    discovery_topic = f"homeassistant/{domain}/bla/config"
    old_entity_id = f"{domain}.test"
    new_entity_id = f"{domain}.milk"

    async_fire_mqtt_message(hass, discovery_topic, json.dumps(discovery_config))
    await hass.async_block_till_done()

    device = device_registry.async_get_device({("mqtt", "helloworld")})
    assert device is not None

    debug_info_data = debug_info.info_for_device(hass, device.id)
    assert len(debug_info_data["entities"]) == 1
    entity_info = debug_info_data["entities"][0]
    assert entity_info["discovery_data"]["topic"] == discovery_topic
    assert entity_info["discovery_data"]["payload"] == discovery_config
    assert entity_info["entity_id"] == old_entity_id
    assert len(entity_info["subscriptions"]) == 1
    assert {"topic": "test-topic", "messages": []} in entity_info["subscriptions"]
    assert len(debug_info_data["triggers"]) == 0

    entity_registry.async_update_entity(old_entity_id, new_entity_id=new_entity_id)
    # The original helper waits twice here; both waits are kept
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    debug_info_data = debug_info.info_for_device(hass, device.id)
    assert len(debug_info_data["entities"]) == 1
    entity_info = debug_info_data["entities"][0]
    assert entity_info["discovery_data"]["topic"] == discovery_topic
    assert entity_info["discovery_data"]["payload"] == discovery_config
    assert entity_info["entity_id"] == new_entity_id
    assert len(entity_info["subscriptions"]) == 1
    assert {"topic": "test-topic", "messages": []} in entity_info["subscriptions"]
    assert len(debug_info_data["triggers"]) == 0
    assert old_entity_id not in hass.data["mqtt"].debug_info_entities
async def help_test_entity_disabled_by_default(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
) -> None:
    """Check discovery of an entity that is disabled by default.

    A disabled entity and an enabled entity are discovered for the same
    device. The disabled one is expected to be registered without a state,
    the enabled one with a state. Removing the enabled entity through an
    empty discovery payload is then expected to remove both entities and the
    device from their registries.
    """
    await mqtt_mock_entry()

    # Build the discovery payload: platform config plus device settings
    discovery_config = copy.deepcopy(config[mqtt.DOMAIN][domain])
    discovery_config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)
    discovery_config["enabled_by_default"] = False
    discovery_config["unique_id"] = "veryunique1"

    dev_registry = dr.async_get(hass)
    ent_registry = er.async_get(hass)
    identifier = ("mqtt", "helloworld")

    # Discover a disabled entity
    async_fire_mqtt_message(
        hass, f"homeassistant/{domain}/bla1/config", json.dumps(discovery_config)
    )
    await hass.async_block_till_done()
    entity_id = ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique1")
    assert entity_id is not None and hass.states.get(entity_id) is None
    assert dev_registry.async_get_device({identifier})

    # Discover an enabled entity, tied to the same device
    discovery_config["enabled_by_default"] = True
    discovery_config["unique_id"] = "veryunique2"
    async_fire_mqtt_message(
        hass, f"homeassistant/{domain}/bla2/config", json.dumps(discovery_config)
    )
    await hass.async_block_till_done()
    entity_id = ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, "veryunique2")
    assert entity_id is not None and hass.states.get(entity_id) is not None

    # Remove the enabled entity, both entities and the device should be removed
    async_fire_mqtt_message(hass, f"homeassistant/{domain}/bla2/config", "")
    await hass.async_block_till_done()
    for unique_id in ("veryunique1", "veryunique2"):
        assert not ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, unique_id)
    assert not dev_registry.async_get_device({identifier})
async def help_test_entity_category(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: ConfigType,
) -> None:
    """Check handling of the entity_category option in discovery.

    Three entities are discovered in turn: one without an entity category,
    one with the category "config" and one with the unknown category
    "no_such_category". The first two are expected to be registered with a
    state and the matching category; the last one is expected not to be
    registered at all.
    """
    await mqtt_mock_entry()

    # Build the discovery payload: platform config plus device settings
    discovery_config = copy.deepcopy(config[mqtt.DOMAIN][domain])
    discovery_config["device"] = copy.deepcopy(DEFAULT_CONFIG_DEVICE_INFO_ID)

    ent_registry = er.async_get(hass)

    async def _discover(unique_id: str) -> str | None:
        """Publish the current config under unique_id and look up its entity_id."""
        discovery_config["unique_id"] = unique_id
        async_fire_mqtt_message(
            hass,
            f"homeassistant/{domain}/{unique_id}/config",
            json.dumps(discovery_config),
        )
        await hass.async_block_till_done()
        return ent_registry.async_get_entity_id(domain, mqtt.DOMAIN, unique_id)

    # Discover an entity without entity category
    entity_id = await _discover("veryunique1")
    assert entity_id is not None and hass.states.get(entity_id)
    entry = ent_registry.async_get(entity_id)
    assert entry is not None and entry.entity_category is None

    # Discover an entity with entity category set to "config"
    discovery_config["entity_category"] = "config"
    entity_id = await _discover("veryunique2")
    assert entity_id is not None and hass.states.get(entity_id)
    entry = ent_registry.async_get(entity_id)
    assert entry is not None and entry.entity_category == "config"

    # Discover an entity with entity category set to "no_such_category"
    discovery_config["entity_category"] = "no_such_category"
    assert not await _discover("veryunique3")
async def help_test_publishing_with_custom_encoding(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    caplog: pytest.LogCaptureFixture,
    domain: str,
    config: ConfigType,
    service: str,
    topic: str,
    parameters: dict[str, Any] | None,
    payload: str,
    template: str | None,
    tpl_par: str = "value",
    tpl_output: PublishPayloadType = None,
) -> None:
    """Check publishing of an MQTT payload under different encoding settings.

    Five entities named test1..test5 are discovered, each publishing to
    `cmd/<name>` through the config key given by `topic`:

    1. test1: no encoding configured, the payload is expected as a string.
    2. test2: "utf-16", the payload is expected utf-16 encoded.
    3. test3: empty encoding, a pass-through error is expected in the log.
    4. test4: "invalid" encoding, an encode error is expected in the log.
    5. test5: empty encoding plus a command template stored under the
       `template` config key; only exercised when `template` is given.
    """
    # prepare config for tests
    encoding_cases: dict[str, dict[str, Any]] = {
        "test1": {"encoding": None, "cmd_tpl": False},
        "test2": {"encoding": "utf-16", "cmd_tpl": False},
        "test3": {"encoding": "", "cmd_tpl": False},
        "test4": {"encoding": "invalid", "cmd_tpl": False},
        "test5": {"encoding": "", "cmd_tpl": True},
    }
    discovery_configs = []
    service_data = {}
    for test_id, case in encoding_cases.items():
        entity_config: dict[str, Any] = copy.copy(config[mqtt.DOMAIN][domain])
        entity_config[topic] = f"cmd/{test_id}"
        entity_config["name"] = f"{test_id}"
        if case["encoding"] is not None:
            entity_config["encoding"] = case["encoding"]
        if template and case["cmd_tpl"]:
            # Template that packs the first character of the value as a byte
            entity_config[
                template
            ] = f"{{{{ (('%.1f'|format({tpl_par}))[0] if is_number({tpl_par}) else {tpl_par}[0]) | ord | pack('b') }}}}"
        discovery_configs.append(entity_config)

        # setup service data
        service_data[test_id] = {ATTR_ENTITY_ID: f"{domain}.{test_id}"}
        if parameters:
            service_data[test_id].update(parameters)

    # setup test entities using discovery
    mqtt_mock = await mqtt_mock_entry()
    for item, entity_config in enumerate(discovery_configs, start=1):
        async_fire_mqtt_message(
            hass,
            f"homeassistant/{domain}/component_{item}/config",
            json.dumps(entity_config),
        )
    await hass.async_block_till_done()

    async def _call_service(test_id: str) -> None:
        """Call the service under test for the entity named test_id."""
        await hass.services.async_call(
            domain, service, service_data[test_id], blocking=True
        )

    # 1) test with default encoding
    await _call_service("test1")
    await hass.async_block_till_done()

    mqtt_mock.async_publish.assert_any_call("cmd/test1", str(payload), 0, False)
    mqtt_mock.async_publish.reset_mock()

    # 2) test with utf-16 encoding
    await _call_service("test2")
    mqtt_mock.async_publish.assert_any_call(
        "cmd/test2", str(payload).encode("utf-16"), 0, False
    )
    mqtt_mock.async_publish.reset_mock()

    # 3) test with no encoding set should fail if payload is a string
    await _call_service("test3")
    assert (
        f"Can't pass-through payload for publishing {payload} on cmd/test3 with no encoding set, need 'bytes'"
        in caplog.text
    )

    # 4) test with invalid encoding set should fail
    await _call_service("test4")
    assert (
        f"Can't encode payload for publishing {payload} on cmd/test4 with encoding invalid"
        in caplog.text
    )

    # 5) test with command template and raw encoding if specified
    if not template:
        return

    await _call_service("test5")
    mqtt_mock.async_publish.assert_any_call(
        "cmd/test5", tpl_output or str(payload)[0].encode("utf-8"), 0, False
    )
    mqtt_mock.async_publish.reset_mock()
async def help_test_reload_with_config(
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
    tmp_path: Path,
    config: ConfigType,
) -> None:
    """Reload the MQTT integration with the supplied config.

    The config is dumped as YAML to `configuration.yaml` inside `tmp_path`,
    and the mqtt reload service is called while the YAML_CONFIG_FILE
    attribute of the Home Assistant config module is patched to that file.
    """
    config_file = tmp_path / "configuration.yaml"
    yaml_text = yaml.dump(config)
    config_file.write_text(yaml_text)
    # Sanity check: the file holds exactly what was dumped
    assert config_file.read_text() == yaml_text

    with patch.object(module_hass_config, "YAML_CONFIG_FILE", config_file):
        await hass.services.async_call("mqtt", SERVICE_RELOAD, {}, blocking=True)
        await hass.async_block_till_done()
async def help_test_reloadable(
    hass: HomeAssistant,
    mqtt_client_mock: MqttMockPahoClient,
    domain: str,
    config: ConfigType,
) -> None:
    """Check that an MQTT platform picks up new YAML config on reload.

    The MQTT entry is first set up with two entities (test_old_1 and
    test_old_2) derived from the supplied platform config. The mqtt reload
    service is then called with a config holding three entities
    (test_new_1..test_new_3), and exactly those three are expected to exist
    afterwards.
    """
    platform_config = copy.deepcopy(config[mqtt.DOMAIN][domain])

    def _named(name: str) -> dict[str, Any]:
        """Return a private copy of the platform config with the given name."""
        entity_config = copy.deepcopy(platform_config)
        entity_config["name"] = name
        return entity_config

    # Create and test an old config of 2 entities based on the config supplied
    old_config = {
        mqtt.DOMAIN: {domain: [_named("test_old_1"), _named("test_old_2")]},
    }

    # Start the MQTT entry with the old config
    entry = MockConfigEntry(domain=mqtt.DOMAIN, data={mqtt.CONF_BROKER: "test-broker"})
    entry.add_to_hass(hass)
    mqtt_client_mock.connect.return_value = 0
    with patch("homeassistant.config.load_yaml_config_file", return_value=old_config):
        await entry.async_setup(hass)

    for old_name in ("test_old_1", "test_old_2"):
        assert hass.states.get(f"{domain}.{old_name}")
    assert len(hass.states.async_all(domain)) == 2

    # Build a new config of 3 entities and test a reload with it
    new_names = ("test_new_1", "test_new_2", "test_new_3")
    new_config = {
        mqtt.DOMAIN: {domain: [_named(new_name) for new_name in new_names]},
    }

    with patch("homeassistant.config.load_yaml_config_file", return_value=new_config):
        # Reload the mqtt entry with the new config
        await hass.services.async_call("mqtt", SERVICE_RELOAD, {}, blocking=True)
        await hass.async_block_till_done()

    assert len(hass.states.async_all(domain)) == 3
    for new_name in new_names:
        assert hass.states.get(f"{domain}.{new_name}")
Move manual configuration of MQTT fan and light to the integration key (#71676) * Processing yaml config through entry setup * Setup all platforms * Update homeassistant/components/mqtt/__init__.py Co-authored-by: Martin Hjelmare <marhje52@gmail.com> * adjust mock_mqtt - reference config from cache * Fix test config entry override * Add tests yaml setup * additional tests * Introduce PLATFORM_SCHEMA_MODERN * recover temporary MQTT_BASE_PLATFORM_SCHEMA * Allow extra key in light base schema, restore test * Fix test for exception on platform key * One deprecation message per platform * Remove deprecation checks from modern schema * Update homeassistant/components/mqtt/fan.py Co-authored-by: Erik Montnemery <erik@montnemery.com> * Update homeassistant/components/mqtt/fan.py Co-authored-by: Erik Montnemery <erik@montnemery.com> * Update homeassistant/components/mqtt/light/__init__.py Co-authored-by: Erik Montnemery <erik@montnemery.com> * Update homeassistant/components/mqtt/light/__init__.py Co-authored-by: Erik Montnemery <erik@montnemery.com> * Update homeassistant/components/mqtt/light/schema_json.py Co-authored-by: Erik Montnemery <erik@montnemery.com> * Update homeassistant/components/mqtt/light/schema_template.py Co-authored-by: Erik Montnemery <erik@montnemery.com> * Update homeassistant/components/mqtt/mixins.py Co-authored-by: Erik Montnemery <erik@montnemery.com> * rename validate_modern_schema * Do not fail platform if a single config is broken * Update homeassistant/components/mqtt/__init__.py Co-authored-by: Erik Montnemery <erik@montnemery.com> * Fix tests on asserting log * Update log. 
Make helper transparant, remove patch * Perform parallel processing * Update tests/components/mqtt/test_init.py Co-authored-by: Martin Hjelmare <marhje52@gmail.com> * Apply suggestions from code review Co-authored-by: Martin Hjelmare <marhje52@gmail.com> * Update homeassistant/components/mqtt/mixins.py Co-authored-by: Martin Hjelmare <marhje52@gmail.com> * black * Fix tests and add #new_format anchor Co-authored-by: Martin Hjelmare <marhje52@gmail.com> Co-authored-by: Erik Montnemery <erik@montnemery.com>
2022-05-19 13:04:53 +00:00
async def help_test_unload_config_entry(hass: HomeAssistant) -> None:
    """Unload the first MQTT config entry and check its state transitions.

    The entry is expected to be LOADED beforehand, the unload call is
    expected to succeed, and the entry is expected to be NOT_LOADED after.
    """
    entries = hass.config_entries.async_entries(mqtt.DOMAIN)
    mqtt_config_entry = entries[0]
    assert mqtt_config_entry.state is ConfigEntryState.LOADED

    unloaded = await hass.config_entries.async_unload(mqtt_config_entry.entry_id)
    assert unloaded

    # Workaround for a mypy bug, see
    # https://github.com/python/mypy/issues/9005#issuecomment-1280985006
    updated_config_entry = mqtt_config_entry
    assert updated_config_entry.state is ConfigEntryState.NOT_LOADED
    await hass.async_block_till_done()
async def help_test_unload_config_entry_with_platform(
    hass: HomeAssistant,
    mqtt_mock_entry: MqttMockHAClientGenerator,
    domain: str,
    config: dict[str, dict[str, Any]],
) -> None:
    """Test unloading the MQTT config entry with a specific platform domain.

    One entity named "config_setup" is set up through (patched)
    configuration.yaml and one named "discovery_setup" through MQTT
    discovery. After the config entry is unloaded, and the discovery message
    is fired once more, neither entity is expected to have a state.
    """
    # prepare setup through configuration.yaml
    config_setup: dict[str, dict[str, Any]] = copy.deepcopy(config)
    config_setup[mqtt.DOMAIN][domain]["name"] = "config_setup"

    with patch("homeassistant.config.load_yaml_config_file", return_value=config_setup):
        await mqtt_mock_entry()

    # prepare setup through discovery
    discovery_setup = copy.deepcopy(config[mqtt.DOMAIN][domain])
    discovery_setup["name"] = "discovery_setup"
    async_fire_mqtt_message(
        hass, f"homeassistant/{domain}/bla/config", json.dumps(discovery_setup)
    )
    await hass.async_block_till_done()

    # check if both entities were setup correctly
    config_setup_entity = hass.states.get(f"{domain}.config_setup")
    assert config_setup_entity

    discovery_setup_entity = hass.states.get(f"{domain}.discovery_setup")
    assert discovery_setup_entity

    await help_test_unload_config_entry(hass)

    # Fire the discovery message again; with the entry unloaded it must not
    # bring the entity back
    async_fire_mqtt_message(
        hass, f"homeassistant/{domain}/bla/config", json.dumps(discovery_setup)
    )
    await hass.async_block_till_done()

    # check if both entities were unloaded correctly
    # NOTE: look up the same entity id that was checked after setup. The
    # previous code interpolated the whole config dict into the entity id,
    # so the lookup could never match and the assertion passed vacuously.
    config_setup_entity = hass.states.get(f"{domain}.config_setup")
    assert config_setup_entity is None

    discovery_setup_entity = hass.states.get(f"{domain}.discovery_setup")
    assert discovery_setup_entity is None
async def help_test_discovery_setup(
    hass: HomeAssistant, domain: str, discovery_data_payload: str, name: str
) -> None:
    """Set up an MQTT entity through discovery and check that it has a state.

    The payload is published on `homeassistant/<domain>/<name>/config` and
    the entity `<domain>.<name>` is expected to exist afterwards.
    """
    discovery_topic = f"homeassistant/{domain}/{name}/config"
    async_fire_mqtt_message(hass, discovery_topic, discovery_data_payload)
    await hass.async_block_till_done()

    entity_state = hass.states.get(f"{domain}.{name}")
    assert entity_state and entity_state.state is not None