"""The tests for the MQTT client.""" import asyncio from datetime import timedelta import socket import ssl import time from typing import Any from unittest.mock import MagicMock, Mock, call, patch import certifi import paho.mqtt.client as paho_mqtt import pytest from homeassistant.components import mqtt from homeassistant.components.mqtt.client import RECONNECT_INTERVAL_SECONDS from homeassistant.components.mqtt.const import SUPPORTED_COMPONENTS from homeassistant.components.mqtt.models import MessageCallbackType, ReceiveMessage from homeassistant.config_entries import ConfigEntryDisabler, ConfigEntryState from homeassistant.const import ( CONF_PROTOCOL, EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP, UnitOfTemperature, ) from homeassistant.core import CALLBACK_TYPE, CoreState, HomeAssistant, callback from homeassistant.exceptions import HomeAssistantError from homeassistant.util.dt import utcnow from .conftest import ENTRY_DEFAULT_BIRTH_MESSAGE from .test_common import help_all_subscribe_calls from tests.common import ( MockConfigEntry, async_fire_mqtt_message, async_fire_time_changed, ) from tests.typing import MqttMockHAClient, MqttMockHAClientGenerator, MqttMockPahoClient def help_assert_message( msg: ReceiveMessage, topic: str | None = None, payload: str | None = None, qos: int | None = None, retain: bool | None = None, ) -> bool: """Return True if all of the given attributes match with the message.""" match: bool = True if topic is not None: match &= msg.topic == topic if payload is not None: match &= msg.payload == payload if qos is not None: match &= msg.qos == qos if retain is not None: match &= msg.retain == retain return match async def test_mqtt_connects_on_home_assistant_mqtt_setup( hass: HomeAssistant, setup_with_birth_msg_client_mock: MqttMockPahoClient ) -> None: """Test if client is connected after mqtt init on bootstrap.""" mqtt_client_mock = setup_with_birth_msg_client_mock assert mqtt_client_mock.connect.call_count == 1 async def test_mqtt_does_not_disconnect_on_home_assistant_stop( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test if client is not disconnected on HA stop.""" mqtt_client_mock = setup_with_birth_msg_client_mock hass.bus.fire(EVENT_HOMEASSISTANT_STOP) await mock_debouncer.wait() assert mqtt_client_mock.disconnect.call_count == 0 async def test_mqtt_await_ack_at_disconnect(hass: HomeAssistant) -> None: """Test if ACK is awaited correctly when disconnecting.""" class FakeInfo: """Returns a simulated client publish response.""" mid = 100 rc = 0 with patch( "homeassistant.components.mqtt.async_client.AsyncMQTTClient" ) as mock_client: mqtt_client = mock_client.return_value mqtt_client.connect = MagicMock( return_value=0, side_effect=lambda *args, **kwargs: hass.loop.call_soon_threadsafe( mqtt_client.on_connect, mqtt_client, None, 0, 0, 0 ), ) mqtt_client.publish = MagicMock(return_value=FakeInfo()) entry = MockConfigEntry( domain=mqtt.DOMAIN, data={ "certificate": "auto", mqtt.CONF_BROKER: "test-broker", mqtt.CONF_DISCOVERY: False, }, ) entry.add_to_hass(hass) assert await hass.config_entries.async_setup(entry.entry_id) mqtt_client = mock_client.return_value # publish from MQTT client without awaiting hass.async_create_task( mqtt.async_publish(hass, "test-topic", "some-payload", 0, False) ) await asyncio.sleep(0) # Simulate late ACK callback from client with mid 100 mqtt_client.on_publish(0, 0, 100) # disconnect the MQTT client await hass.async_stop() await hass.async_block_till_done() # assert the payload was sent through the client assert mqtt_client.publish.called assert mqtt_client.publish.call_args[0] == ( "test-topic", "some-payload", 0, False, ) await hass.async_block_till_done(wait_background_tasks=True) @pytest.mark.parametrize("mqtt_config_entry_data", [ENTRY_DEFAULT_BIRTH_MESSAGE]) async def test_publish( hass: HomeAssistant, setup_with_birth_msg_client_mock: MqttMockPahoClient ) -> None: """Test the publish function.""" publish_mock: MagicMock = setup_with_birth_msg_client_mock.publish await mqtt.async_publish(hass, "test-topic", "test-payload") await hass.async_block_till_done() assert publish_mock.called assert publish_mock.call_args[0] == ( "test-topic", "test-payload", 0, False, ) publish_mock.reset_mock() await mqtt.async_publish(hass, "test-topic", "test-payload", 2, True) await hass.async_block_till_done() assert publish_mock.called assert publish_mock.call_args[0] == ( "test-topic", "test-payload", 2, True, ) publish_mock.reset_mock() mqtt.publish(hass, "test-topic2", "test-payload2") await hass.async_block_till_done() assert publish_mock.called assert publish_mock.call_args[0] == ( "test-topic2", "test-payload2", 0, False, ) publish_mock.reset_mock() mqtt.publish(hass, "test-topic2", "test-payload2", 2, True) await hass.async_block_till_done() assert publish_mock.called assert publish_mock.call_args[0] == ( "test-topic2", "test-payload2", 2, True, ) publish_mock.reset_mock() # test binary pass-through mqtt.publish( hass, "test-topic3", b"\xde\xad\xbe\xef", 0, False, ) await hass.async_block_till_done() assert publish_mock.called assert publish_mock.call_args[0] == ( "test-topic3", b"\xde\xad\xbe\xef", 0, False, ) publish_mock.reset_mock() # test null payload mqtt.publish( hass, "test-topic3", None, 0, False, ) await hass.async_block_till_done() assert publish_mock.called assert publish_mock.call_args[0] == ( "test-topic3", None, 0, False, ) publish_mock.reset_mock() async def test_convert_outgoing_payload(hass: HomeAssistant) -> None: """Test the converting of outgoing MQTT payloads without template.""" command_template = mqtt.MqttCommandTemplate(None) assert command_template.async_render(b"\xde\xad\xbe\xef") == b"\xde\xad\xbe\xef" assert ( command_template.async_render("b'\\xde\\xad\\xbe\\xef'") == "b'\\xde\\xad\\xbe\\xef'" ) assert command_template.async_render(1234) == 1234 assert command_template.async_render(1234.56) == 1234.56 assert command_template.async_render(None) is None async def test_all_subscriptions_run_when_decode_fails( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test all other subscriptions still run when decode fails for one.""" await mqtt_mock_entry() await mqtt.async_subscribe(hass, "test-topic", record_calls, encoding="ascii") await mqtt.async_subscribe(hass, "test-topic", record_calls) async_fire_mqtt_message(hass, "test-topic", UnitOfTemperature.CELSIUS) await hass.async_block_till_done() assert len(recorded_calls) == 1 async def test_subscribe_topic( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of a topic.""" await mqtt_mock_entry() unsub = await mqtt.async_subscribe(hass, "test-topic", record_calls) async_fire_mqtt_message(hass, "test-topic", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 1 assert recorded_calls[0].topic == "test-topic" assert recorded_calls[0].payload == "test-payload" unsub() async_fire_mqtt_message(hass, "test-topic", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 1 # Cannot unsubscribe twice with pytest.raises(HomeAssistantError): unsub() @pytest.mark.usefixtures("mqtt_mock_entry") async def test_subscribe_topic_not_initialize( hass: HomeAssistant, record_calls: MessageCallbackType ) -> None: """Test the subscription of a topic when MQTT was not initialized.""" with pytest.raises( HomeAssistantError, match=r".*make sure MQTT is set up correctly" ): await mqtt.async_subscribe(hass, "test-topic", record_calls) async def test_subscribe_mqtt_config_entry_disabled( hass: HomeAssistant, mqtt_mock: MqttMockHAClient, record_calls: MessageCallbackType ) -> None: """Test the subscription of a topic when MQTT config entry is disabled.""" mqtt_mock.connected = True mqtt_config_entry = hass.config_entries.async_entries(mqtt.DOMAIN)[0] mqtt_config_entry_state = mqtt_config_entry.state assert mqtt_config_entry_state is ConfigEntryState.LOADED assert await hass.config_entries.async_unload(mqtt_config_entry.entry_id) mqtt_config_entry_state = mqtt_config_entry.state assert mqtt_config_entry_state is ConfigEntryState.NOT_LOADED await hass.config_entries.async_set_disabled_by( mqtt_config_entry.entry_id, ConfigEntryDisabler.USER ) mqtt_mock.connected = False with pytest.raises(HomeAssistantError, match=r".*MQTT is not enabled"): await mqtt.async_subscribe(hass, "test-topic", record_calls) async def test_subscribe_and_resubscribe( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test resubscribing within the debounce time.""" mqtt_client_mock = setup_with_birth_msg_client_mock with ( patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.4), patch("homeassistant.components.mqtt.client.UNSUBSCRIBE_COOLDOWN", 0.4), ): mock_debouncer.clear() unsub = await mqtt.async_subscribe(hass, "test-topic", record_calls) # This unsub will be un-done with the following subscribe # unsubscribe should not be called at the broker unsub() unsub = await mqtt.async_subscribe(hass, "test-topic", record_calls) await mock_debouncer.wait() mock_debouncer.clear() async_fire_mqtt_message(hass, "test-topic", "test-payload") assert len(recorded_calls) == 1 assert recorded_calls[0].topic == "test-topic" assert recorded_calls[0].payload == "test-payload" # assert unsubscribe was not called mqtt_client_mock.unsubscribe.assert_not_called() mock_debouncer.clear() unsub() await mock_debouncer.wait() mqtt_client_mock.unsubscribe.assert_called_once_with(["test-topic"]) async def test_subscribe_topic_non_async( hass: HomeAssistant, mock_debouncer: asyncio.Event, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of a topic using the non-async function.""" await mqtt_mock_entry() await mock_debouncer.wait() mock_debouncer.clear() unsub = await hass.async_add_executor_job( mqtt.subscribe, hass, "test-topic", record_calls ) await mock_debouncer.wait() async_fire_mqtt_message(hass, "test-topic", "test-payload") assert len(recorded_calls) == 1 assert recorded_calls[0].topic == "test-topic" assert recorded_calls[0].payload == "test-payload" mock_debouncer.clear() await hass.async_add_executor_job(unsub) await mock_debouncer.wait() async_fire_mqtt_message(hass, "test-topic", "test-payload") assert len(recorded_calls) == 1 async def test_subscribe_bad_topic( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, record_calls: MessageCallbackType, ) -> None: """Test the subscription of a topic.""" await mqtt_mock_entry() with pytest.raises(HomeAssistantError): await mqtt.async_subscribe(hass, 55, record_calls) # type: ignore[arg-type] async def test_subscribe_topic_not_match( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test if subscribed topic is not a match.""" await mqtt_mock_entry() await mqtt.async_subscribe(hass, "test-topic", record_calls) async_fire_mqtt_message(hass, "another-test-topic", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 0 async def test_subscribe_topic_level_wildcard( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of wildcard topics.""" await mqtt_mock_entry() await mqtt.async_subscribe(hass, "test-topic/+/on", record_calls) async_fire_mqtt_message(hass, "test-topic/bier/on", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 1 assert recorded_calls[0].topic == "test-topic/bier/on" assert recorded_calls[0].payload == "test-payload" async def test_subscribe_topic_level_wildcard_no_subtree_match( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of wildcard topics.""" await mqtt_mock_entry() await mqtt.async_subscribe(hass, "test-topic/+/on", record_calls) async_fire_mqtt_message(hass, "test-topic/bier", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 0 async def test_subscribe_topic_level_wildcard_root_topic_no_subtree_match( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of wildcard topics.""" await mqtt_mock_entry() await mqtt.async_subscribe(hass, "test-topic/#", record_calls) async_fire_mqtt_message(hass, "test-topic-123", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 0 async def test_subscribe_topic_subtree_wildcard_subtree_topic( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of wildcard topics.""" await mqtt_mock_entry() await mqtt.async_subscribe(hass, "test-topic/#", record_calls) async_fire_mqtt_message(hass, "test-topic/bier/on", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 1 assert recorded_calls[0].topic == "test-topic/bier/on" assert recorded_calls[0].payload == "test-payload" async def test_subscribe_topic_subtree_wildcard_root_topic( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of wildcard topics.""" await mqtt_mock_entry() await mqtt.async_subscribe(hass, "test-topic/#", record_calls) async_fire_mqtt_message(hass, "test-topic", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 1 assert recorded_calls[0].topic == "test-topic" assert recorded_calls[0].payload == "test-payload" async def test_subscribe_topic_subtree_wildcard_no_match( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of wildcard topics.""" await mqtt_mock_entry() await mqtt.async_subscribe(hass, "test-topic/#", record_calls) async_fire_mqtt_message(hass, "another-test-topic", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 0 async def test_subscribe_topic_level_wildcard_and_wildcard_root_topic( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of wildcard topics.""" await mqtt_mock_entry() await mqtt.async_subscribe(hass, "+/test-topic/#", record_calls) async_fire_mqtt_message(hass, "hi/test-topic", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 1 assert recorded_calls[0].topic == "hi/test-topic" assert recorded_calls[0].payload == "test-payload" async def test_subscribe_topic_level_wildcard_and_wildcard_subtree_topic( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of wildcard topics.""" await mqtt_mock_entry() await mqtt.async_subscribe(hass, "+/test-topic/#", record_calls) async_fire_mqtt_message(hass, "hi/test-topic/here-iam", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 1 assert recorded_calls[0].topic == "hi/test-topic/here-iam" assert recorded_calls[0].payload == "test-payload" async def test_subscribe_topic_level_wildcard_and_wildcard_level_no_match( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of wildcard topics.""" await mqtt_mock_entry() await mqtt.async_subscribe(hass, "+/test-topic/#", record_calls) async_fire_mqtt_message(hass, "hi/here-iam/test-topic", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 0 async def test_subscribe_topic_level_wildcard_and_wildcard_no_match( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of wildcard topics.""" await mqtt_mock_entry() await mqtt.async_subscribe(hass, "+/test-topic/#", record_calls) async_fire_mqtt_message(hass, "hi/another-test-topic", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 0 async def test_subscribe_topic_sys_root( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of $ root topics.""" await mqtt_mock_entry() await mqtt.async_subscribe(hass, "$test-topic/subtree/on", record_calls) async_fire_mqtt_message(hass, "$test-topic/subtree/on", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 1 assert recorded_calls[0].topic == "$test-topic/subtree/on" assert recorded_calls[0].payload == "test-payload" async def test_subscribe_topic_sys_root_and_wildcard_topic( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of $ root and wildcard topics.""" await mqtt_mock_entry() await mqtt.async_subscribe(hass, "$test-topic/#", record_calls) async_fire_mqtt_message(hass, "$test-topic/some-topic", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 1 assert recorded_calls[0].topic == "$test-topic/some-topic" assert recorded_calls[0].payload == "test-payload" async def test_subscribe_topic_sys_root_and_wildcard_subtree_topic( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of $ root and wildcard subtree topics.""" await mqtt_mock_entry() await mqtt.async_subscribe(hass, "$test-topic/subtree/#", record_calls) async_fire_mqtt_message(hass, "$test-topic/subtree/some-topic", "test-payload") await hass.async_block_till_done() assert len(recorded_calls) == 1 assert recorded_calls[0].topic == "$test-topic/subtree/some-topic" assert recorded_calls[0].payload == "test-payload" async def test_subscribe_special_characters( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription to topics with special characters.""" await mqtt_mock_entry() topic = "/test-topic/$(.)[^]{-}" payload = "p4y.l[]a|> ?" await mqtt.async_subscribe(hass, topic, record_calls) async_fire_mqtt_message(hass, topic, payload) await hass.async_block_till_done() assert len(recorded_calls) == 1 assert recorded_calls[0].topic == topic assert recorded_calls[0].payload == payload async def test_subscribe_same_topic( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test subscribing to same topic twice and simulate retained messages. When subscribing to the same topic again, SUBSCRIBE must be sent to the broker again for it to resend any retained messages. """ mqtt_client_mock = setup_with_birth_msg_client_mock calls_a: list[ReceiveMessage] = [] calls_b: list[ReceiveMessage] = [] @callback def _callback_a(msg: ReceiveMessage) -> None: calls_a.append(msg) @callback def _callback_b(msg: ReceiveMessage) -> None: calls_b.append(msg) mqtt_client_mock.reset_mock() mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/state", _callback_a, qos=0) # Simulate a non retained message after the first subscription async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False) await mock_debouncer.wait() assert len(calls_a) == 1 mqtt_client_mock.subscribe.assert_called() calls_a = [] mqtt_client_mock.reset_mock() await hass.async_block_till_done() mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/state", _callback_b, qos=1) # Simulate an other non retained message after the second subscription async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False) await mock_debouncer.wait() # Both subscriptions should receive updates assert len(calls_a) == 1 assert len(calls_b) == 1 mqtt_client_mock.subscribe.assert_called() async def test_replaying_payload_same_topic( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test replaying retained messages. When subscribing to the same topic again, SUBSCRIBE must be sent to the broker again for it to resend any retained messages for new subscriptions. Retained messages must only be replayed for new subscriptions, except when the MQTT client is reconnecting. """ mqtt_client_mock = setup_with_birth_msg_client_mock calls_a: list[ReceiveMessage] = [] calls_b: list[ReceiveMessage] = [] @callback def _callback_a(msg: ReceiveMessage) -> None: calls_a.append(msg) @callback def _callback_b(msg: ReceiveMessage) -> None: calls_b.append(msg) mqtt_client_mock.reset_mock() mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/state", _callback_a) await mock_debouncer.wait() async_fire_mqtt_message( hass, "test/state", "online", qos=0, retain=True ) # Simulate a (retained) message played back assert len(calls_a) == 1 mqtt_client_mock.subscribe.assert_called() calls_a = [] mqtt_client_mock.reset_mock() mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/state", _callback_b) await mock_debouncer.wait() # Simulate edge case where non retained message was received # after subscription at HA but before the debouncer delay was passed. # The message without retain flag directly after a subscription should # be processed by both subscriptions. async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False) # Simulate a (retained) message played back on new subscriptions async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True) # The current subscription only received the message without retain flag assert len(calls_a) == 1 assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=False) # The retained message playback should only be processed by the new subscription. # The existing subscription already got the latest update, hence the existing # subscription should not receive the replayed (retained) message. # Messages without retain flag are received on both subscriptions. assert len(calls_b) == 2 assert help_assert_message(calls_b[0], "test/state", "online", qos=0, retain=False) assert help_assert_message(calls_b[1], "test/state", "online", qos=0, retain=True) mqtt_client_mock.subscribe.assert_called() calls_a = [] calls_b = [] mqtt_client_mock.reset_mock() # Simulate new message played back on new subscriptions # After connecting the retain flag will not be set, even if the # payload published was retained, we cannot see that async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False) assert len(calls_a) == 1 assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=False) assert len(calls_b) == 1 assert help_assert_message(calls_b[0], "test/state", "online", qos=0, retain=False) # Now simulate the broker was disconnected shortly calls_a = [] calls_b = [] mqtt_client_mock.reset_mock() mqtt_client_mock.on_disconnect(None, None, 0) mock_debouncer.clear() mqtt_client_mock.on_connect(None, None, None, 0) await mock_debouncer.wait() mqtt_client_mock.subscribe.assert_called() # Simulate a (retained) message played back after reconnecting async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True) # Both subscriptions now should replay the retained message assert len(calls_a) == 1 assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True) assert len(calls_b) == 1 assert help_assert_message(calls_b[0], "test/state", "online", qos=0, retain=True) async def test_replaying_payload_after_resubscribing( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test replaying and filtering retained messages after resubscribing. When subscribing to the same topic again, SUBSCRIBE must be sent to the broker again for it to resend any retained messages for new subscriptions. Retained messages must only be replayed for new subscriptions, except when the MQTT client is reconnection. """ mqtt_client_mock = setup_with_birth_msg_client_mock calls_a: list[ReceiveMessage] = [] @callback def _callback_a(msg: ReceiveMessage) -> None: calls_a.append(msg) mqtt_client_mock.reset_mock() mock_debouncer.clear() unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a) await mock_debouncer.wait() mqtt_client_mock.subscribe.assert_called() # Simulate a (retained) message played back async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True) assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True) calls_a.clear() # Test we get updates async_fire_mqtt_message(hass, "test/state", "offline", qos=0, retain=False) assert help_assert_message(calls_a[0], "test/state", "offline", qos=0, retain=False) calls_a.clear() # Test we filter new retained updates async_fire_mqtt_message(hass, "test/state", "offline", qos=0, retain=True) await hass.async_block_till_done() assert len(calls_a) == 0 # Unsubscribe an resubscribe again mock_debouncer.clear() unsub() unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a) await mock_debouncer.wait() mqtt_client_mock.subscribe.assert_called() # Simulate we can receive a (retained) played back message again async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True) assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True) async def test_replaying_payload_wildcard_topic( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test replaying retained messages. When we have multiple subscriptions to the same wildcard topic, SUBSCRIBE must be sent to the broker again for it to resend any retained messages for new subscriptions. Retained messages should only be replayed for new subscriptions, except when the MQTT client is reconnection. """ mqtt_client_mock = setup_with_birth_msg_client_mock calls_a: list[ReceiveMessage] = [] calls_b: list[ReceiveMessage] = [] @callback def _callback_a(msg: ReceiveMessage) -> None: calls_a.append(msg) @callback def _callback_b(msg: ReceiveMessage) -> None: calls_b.append(msg) mqtt_client_mock.reset_mock() mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/#", _callback_a) await mock_debouncer.wait() # Simulate (retained) messages being played back on new subscriptions async_fire_mqtt_message(hass, "test/state1", "new_value_1", qos=0, retain=True) async_fire_mqtt_message(hass, "test/state2", "new_value_2", qos=0, retain=True) assert len(calls_a) == 2 mqtt_client_mock.subscribe.assert_called() calls_a = [] mqtt_client_mock.reset_mock() # resubscribe to the wild card topic again mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/#", _callback_b) await mock_debouncer.wait() # Simulate (retained) messages being played back on new subscriptions async_fire_mqtt_message(hass, "test/state1", "initial_value_1", qos=0, retain=True) async_fire_mqtt_message(hass, "test/state2", "initial_value_2", qos=0, retain=True) # The retained messages playback should only be processed for the new subscriptions assert len(calls_a) == 0 assert len(calls_b) == 2 mqtt_client_mock.subscribe.assert_called() calls_a = [] calls_b = [] mqtt_client_mock.reset_mock() # Simulate new messages being received async_fire_mqtt_message(hass, "test/state1", "update_value_1", qos=0, retain=False) async_fire_mqtt_message(hass, "test/state2", "update_value_2", qos=0, retain=False) assert len(calls_a) == 2 assert len(calls_b) == 2 # Now simulate the broker was disconnected shortly calls_a = [] calls_b = [] mqtt_client_mock.reset_mock() mqtt_client_mock.on_disconnect(None, None, 0) mock_debouncer.clear() mqtt_client_mock.on_connect(None, None, None, 0) await mock_debouncer.wait() mqtt_client_mock.subscribe.assert_called() # Simulate the (retained) messages are played back after reconnecting # for all subscriptions async_fire_mqtt_message(hass, "test/state1", "update_value_1", qos=0, retain=True) async_fire_mqtt_message(hass, "test/state2", "update_value_2", qos=0, retain=True) # Both subscriptions should replay assert len(calls_a) == 2 assert len(calls_b) == 2 async def test_not_calling_unsubscribe_with_active_subscribers( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, ) -> None: """Test not calling unsubscribe() when other subscribers are active.""" mqtt_client_mock = setup_with_birth_msg_client_mock mqtt_client_mock.reset_mock() mock_debouncer.clear() unsub = await mqtt.async_subscribe(hass, "test/state", record_calls, 2) await mqtt.async_subscribe(hass, "test/state", record_calls, 1) await mock_debouncer.wait() assert mqtt_client_mock.subscribe.called mock_debouncer.clear() unsub() await hass.async_block_till_done() await hass.async_block_till_done(wait_background_tasks=True) async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown assert not mqtt_client_mock.unsubscribe.called assert not mock_debouncer.is_set() async def test_not_calling_subscribe_when_unsubscribed_within_cooldown( hass: HomeAssistant, mock_debouncer: asyncio.Event, mqtt_mock_entry: MqttMockHAClientGenerator, record_calls: MessageCallbackType, ) -> None: """Test not calling subscribe() when it is unsubscribed. Make sure subscriptions are cleared if unsubscribed before the subscribe cool down period has ended. """ mqtt_mock = await mqtt_mock_entry() mqtt_client_mock = mqtt_mock._mqttc await mock_debouncer.wait() mock_debouncer.clear() mqtt_client_mock.subscribe.reset_mock() unsub = await mqtt.async_subscribe(hass, "test/state", record_calls) unsub() await mock_debouncer.wait() # The debouncer executes without an pending subscribes assert not mqtt_client_mock.subscribe.called async def test_unsubscribe_race( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test not calling unsubscribe() when other subscribers are active.""" mqtt_client_mock = setup_with_birth_msg_client_mock calls_a: list[ReceiveMessage] = [] calls_b: list[ReceiveMessage] = [] @callback def _callback_a(msg: ReceiveMessage) -> None: calls_a.append(msg) @callback def _callback_b(msg: ReceiveMessage) -> None: calls_b.append(msg) mqtt_client_mock.reset_mock() mock_debouncer.clear() unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a) unsub() await mqtt.async_subscribe(hass, "test/state", _callback_b) await mock_debouncer.wait() async_fire_mqtt_message(hass, "test/state", "online") assert not calls_a assert calls_b # We allow either calls [subscribe, unsubscribe, subscribe], [subscribe, subscribe] or # when both subscriptions were combined [subscribe] expected_calls_1 = [ call.subscribe([("test/state", 0)]), call.unsubscribe("test/state"), call.subscribe([("test/state", 0)]), ] expected_calls_2 = [ call.subscribe([("test/state", 0)]), call.subscribe([("test/state", 0)]), ] expected_calls_3 = [ call.subscribe([("test/state", 0)]), ] assert mqtt_client_mock.mock_calls in ( expected_calls_1, expected_calls_2, expected_calls_3, ) @pytest.mark.parametrize( "mqtt_config_entry_data", [{mqtt.CONF_BROKER: "mock-broker", mqtt.CONF_DISCOVERY: False}], ) async def test_restore_subscriptions_on_reconnect( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, ) -> None: """Test subscriptions are restored on reconnect.""" mqtt_client_mock = setup_with_birth_msg_client_mock mqtt_client_mock.reset_mock() mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/state", record_calls) async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown await mock_debouncer.wait() assert ("test/state", 0) in help_all_subscribe_calls(mqtt_client_mock) mqtt_client_mock.reset_mock() mqtt_client_mock.on_disconnect(None, None, 0) mock_debouncer.clear() mqtt_client_mock.on_connect(None, None, None, 0) await mock_debouncer.wait() assert ("test/state", 0) in help_all_subscribe_calls(mqtt_client_mock) @pytest.mark.parametrize( "mqtt_config_entry_data", [{mqtt.CONF_BROKER: "mock-broker", mqtt.CONF_DISCOVERY: False}], ) async def test_restore_all_active_subscriptions_on_reconnect( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, ) -> None: """Test active subscriptions are restored correctly on reconnect.""" mqtt_client_mock = setup_with_birth_msg_client_mock mqtt_client_mock.reset_mock() mock_debouncer.clear() unsub = await mqtt.async_subscribe(hass, "test/state", record_calls, qos=2) await mqtt.async_subscribe(hass, "test/state", record_calls, qos=1) await mqtt.async_subscribe(hass, "test/state", record_calls, qos=0) # cooldown await mock_debouncer.wait() # the subscription with the highest QoS should survive expected = [ call([("test/state", 2)]), ] assert mqtt_client_mock.subscribe.mock_calls == expected unsub() assert mqtt_client_mock.unsubscribe.call_count == 0 mqtt_client_mock.on_disconnect(None, None, 0) mock_debouncer.clear() mqtt_client_mock.on_connect(None, None, None, 0) # wait for cooldown await mock_debouncer.wait() expected.append(call([("test/state", 1)])) for expected_call in expected: assert mqtt_client_mock.subscribe.hass_call(expected_call) @pytest.mark.parametrize( "mqtt_config_entry_data", [{mqtt.CONF_BROKER: "mock-broker", mqtt.CONF_DISCOVERY: False}], ) async def test_subscribed_at_highest_qos( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, ) -> None: """Test the highest qos as assigned when subscribing to the same topic.""" mqtt_client_mock = setup_with_birth_msg_client_mock mqtt_client_mock.reset_mock() mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/state", record_calls, qos=0) await hass.async_block_till_done() # cooldown await mock_debouncer.wait() assert ("test/state", 0) in help_all_subscribe_calls(mqtt_client_mock) mqtt_client_mock.reset_mock() mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/state", record_calls, qos=1) await mqtt.async_subscribe(hass, "test/state", record_calls, qos=2) # cooldown await mock_debouncer.wait() # the subscription with the highest QoS should survive assert help_all_subscribe_calls(mqtt_client_mock) == [("test/state", 2)] async def test_initial_setup_logs_error( hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mqtt_client_mock: MqttMockPahoClient, ) -> None: """Test for setup failure if initial client connection fails.""" entry = MockConfigEntry(domain=mqtt.DOMAIN, data={mqtt.CONF_BROKER: "test-broker"}) entry.add_to_hass(hass) mqtt_client_mock.connect.side_effect = MagicMock(return_value=1) try: assert await hass.config_entries.async_setup(entry.entry_id) except HomeAssistantError: assert True assert "Failed to connect to MQTT server:" in caplog.text async def test_logs_error_if_no_connect_broker( hass: HomeAssistant, caplog: pytest.LogCaptureFixture, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test for setup failure if connection to broker is missing.""" mqtt_client_mock = setup_with_birth_msg_client_mock # test with rc = 3 -> broker unavailable mqtt_client_mock.on_disconnect(Mock(), None, 0) mqtt_client_mock.on_connect(Mock(), None, None, 3) await hass.async_block_till_done() assert ( "Unable to connect to the MQTT broker: Connection Refused: broker unavailable." in caplog.text ) @pytest.mark.parametrize("return_code", [4, 5]) async def test_triggers_reauth_flow_if_auth_fails( hass: HomeAssistant, setup_with_birth_msg_client_mock: MqttMockPahoClient, return_code: int, ) -> None: """Test re-auth is triggered if authentication is failing.""" mqtt_client_mock = setup_with_birth_msg_client_mock # test with rc = 4 -> CONNACK_REFUSED_NOT_AUTHORIZED and 5 -> CONNACK_REFUSED_BAD_USERNAME_PASSWORD mqtt_client_mock.on_disconnect(Mock(), None, 0) mqtt_client_mock.on_connect(Mock(), None, None, return_code) await hass.async_block_till_done() flows = hass.config_entries.flow.async_progress() assert len(flows) == 1 assert flows[0]["context"]["source"] == "reauth" @patch("homeassistant.components.mqtt.client.TIMEOUT_ACK", 0.3) async def test_handle_mqtt_on_callback( hass: HomeAssistant, caplog: pytest.LogCaptureFixture, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test receiving an ACK callback before waiting for it.""" mqtt_client_mock = setup_with_birth_msg_client_mock with patch.object(mqtt_client_mock, "get_mid", return_value=100): # Simulate an ACK for mid == 100, this will call mqtt_mock._async_get_mid_future(mid) mqtt_client_mock.on_publish(mqtt_client_mock, None, 100) await hass.async_block_till_done() # Make sure the ACK has been received await hass.async_block_till_done() # Now call publish without call back, this will call _async_async_wait_for_mid(msg_info.mid) await mqtt.async_publish(hass, "no_callback/test-topic", "test-payload") # Since the mid event was already set, we should not see any timeout warning in the log await hass.async_block_till_done() assert "No ACK from MQTT server" not in caplog.text async def test_handle_mqtt_on_callback_after_cancellation( hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mqtt_mock_entry: MqttMockHAClientGenerator, mqtt_client_mock: MqttMockPahoClient, ) -> None: """Test receiving an ACK after a cancellation.""" mqtt_mock = await mqtt_mock_entry() # Simulate the mid future getting a cancellation mqtt_mock()._async_get_mid_future(101).cancel() # Simulate an ACK for mid == 101, being received after the cancellation mqtt_client_mock.on_publish(mqtt_client_mock, None, 101) await hass.async_block_till_done() assert "No ACK from MQTT server" not in caplog.text assert "InvalidStateError" not in caplog.text async def test_handle_mqtt_on_callback_after_timeout( hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mqtt_mock_entry: MqttMockHAClientGenerator, mqtt_client_mock: MqttMockPahoClient, ) -> None: """Test receiving an ACK after a timeout.""" mqtt_mock = await mqtt_mock_entry() # Simulate the mid future getting a timeout mqtt_mock()._async_get_mid_future(101).set_exception(asyncio.TimeoutError) # Simulate an ACK for mid == 101, being received after the timeout mqtt_client_mock.on_publish(mqtt_client_mock, None, 101) await hass.async_block_till_done() assert "No ACK from MQTT server" not in caplog.text assert "InvalidStateError" not in caplog.text async def test_publish_error( hass: HomeAssistant, caplog: pytest.LogCaptureFixture ) -> None: """Test publish error.""" entry = MockConfigEntry(domain=mqtt.DOMAIN, data={mqtt.CONF_BROKER: "test-broker"}) entry.add_to_hass(hass) # simulate an Out of memory error with patch( "homeassistant.components.mqtt.async_client.AsyncMQTTClient" ) as mock_client: mock_client().connect = lambda *args: 1 mock_client().publish().rc = 1 assert await hass.config_entries.async_setup(entry.entry_id) with pytest.raises(HomeAssistantError): await mqtt.async_publish( hass, "some-topic", b"test-payload", qos=0, retain=False, encoding=None ) assert "Failed to connect to MQTT server: Out of memory." in caplog.text async def test_subscribe_error( hass: HomeAssistant, setup_with_birth_msg_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, caplog: pytest.LogCaptureFixture, ) -> None: """Test publish error.""" mqtt_client_mock = setup_with_birth_msg_client_mock mqtt_client_mock.reset_mock() # simulate client is not connected error before subscribing mqtt_client_mock.subscribe.side_effect = lambda *args: (4, None) await mqtt.async_subscribe(hass, "some-topic", record_calls) while mqtt_client_mock.subscribe.call_count == 0: await hass.async_block_till_done() await hass.async_block_till_done() assert ( "Error talking to MQTT: The client is not currently connected." in caplog.text ) async def test_handle_message_callback( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test for handling an incoming message callback.""" mqtt_client_mock = setup_with_birth_msg_client_mock callbacks = [] @callback def _callback(args) -> None: callbacks.append(args) msg = ReceiveMessage( "some-topic", b"test-payload", 1, False, "some-topic", time.monotonic() ) mock_debouncer.clear() await mqtt.async_subscribe(hass, "some-topic", _callback) await mock_debouncer.wait() mqtt_client_mock.reset_mock() mqtt_client_mock.on_message(None, None, msg) assert len(callbacks) == 1 assert callbacks[0].topic == "some-topic" assert callbacks[0].qos == 1 assert callbacks[0].payload == "test-payload" @pytest.mark.parametrize( ("mqtt_config_entry_data", "protocol"), [ ( { mqtt.CONF_BROKER: "mock-broker", CONF_PROTOCOL: "3.1", }, 3, ), ( { mqtt.CONF_BROKER: "mock-broker", CONF_PROTOCOL: "3.1.1", }, 4, ), ( { mqtt.CONF_BROKER: "mock-broker", CONF_PROTOCOL: "5", }, 5, ), ], ) async def test_setup_mqtt_client_protocol( mqtt_mock_entry: MqttMockHAClientGenerator, protocol: int ) -> None: """Test MQTT client protocol setup.""" with patch( "homeassistant.components.mqtt.async_client.AsyncMQTTClient" ) as mock_client: await mqtt_mock_entry() # check if protocol setup was correctly assert mock_client.call_args[1]["protocol"] == protocol @patch("homeassistant.components.mqtt.client.TIMEOUT_ACK", 0.2) async def test_handle_mqtt_timeout_on_callback( hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_debouncer: asyncio.Event ) -> None: """Test publish without receiving an ACK callback.""" mid = 0 class FakeInfo: """Returns a simulated client publish response.""" mid = 102 rc = 0 with patch( "homeassistant.components.mqtt.async_client.AsyncMQTTClient" ) as mock_client: def _mock_ack(topic: str, qos: int = 0) -> tuple[int, int]: # Handle ACK for subscribe normally nonlocal mid mid += 1 mock_client.on_subscribe(0, 0, mid) return (0, mid) # We want to simulate the publish behaviour MQTT client mock_client = mock_client.return_value mock_client.publish.return_value = FakeInfo() # Mock we get a mid and rc=0 mock_client.subscribe.side_effect = _mock_ack mock_client.unsubscribe.side_effect = _mock_ack mock_client.connect = MagicMock( return_value=0, side_effect=lambda *args, **kwargs: hass.loop.call_soon_threadsafe( mock_client.on_connect, mock_client, None, 0, 0, 0 ), ) entry = MockConfigEntry( domain=mqtt.DOMAIN, data={mqtt.CONF_BROKER: "test-broker"} ) entry.add_to_hass(hass) # Set up the integration mock_debouncer.clear() assert await hass.config_entries.async_setup(entry.entry_id) # Now call we publish without simulating and ACK callback await mqtt.async_publish(hass, "no_callback/test-topic", "test-payload") await hass.async_block_till_done() # There is no ACK so we should see a timeout in the log after publishing assert len(mock_client.publish.mock_calls) == 1 assert "No ACK from MQTT server" in caplog.text # Ensure we stop lingering background tasks await hass.config_entries.async_unload(entry.entry_id) # Assert we did not have any completed subscribes, # because the debouncer subscribe job failed to receive an ACK, # and the time auto caused the debouncer job to fail. assert not mock_debouncer.is_set() async def test_setup_raises_config_entry_not_ready_if_no_connect_broker( hass: HomeAssistant, caplog: pytest.LogCaptureFixture ) -> None: """Test for setup failure if connection to broker is missing.""" entry = MockConfigEntry(domain=mqtt.DOMAIN, data={mqtt.CONF_BROKER: "test-broker"}) entry.add_to_hass(hass) with patch( "homeassistant.components.mqtt.async_client.AsyncMQTTClient" ) as mock_client: mock_client().connect = MagicMock(side_effect=OSError("Connection error")) assert await hass.config_entries.async_setup(entry.entry_id) await hass.async_block_till_done() assert "Failed to connect to MQTT server due to exception:" in caplog.text @pytest.mark.parametrize( ("mqtt_config_entry_data", "insecure_param"), [ ({"broker": "test-broker", "certificate": "auto"}, "not set"), ( {"broker": "test-broker", "certificate": "auto", "tls_insecure": False}, False, ), ({"broker": "test-broker", "certificate": "auto", "tls_insecure": True}, True), ], ) async def test_setup_uses_certificate_on_certificate_set_to_auto_and_insecure( hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator, insecure_param: bool | str, ) -> None: """Test setup uses bundled certs when certificate is set to auto and insecure.""" calls = [] insecure_check = {"insecure": "not set"} def mock_tls_set( certificate, certfile=None, keyfile=None, tls_version=None ) -> None: calls.append((certificate, certfile, keyfile, tls_version)) def mock_tls_insecure_set(insecure_param) -> None: insecure_check["insecure"] = insecure_param with patch( "homeassistant.components.mqtt.async_client.AsyncMQTTClient" ) as mock_client: mock_client().tls_set = mock_tls_set mock_client().tls_insecure_set = mock_tls_insecure_set await mqtt_mock_entry() await hass.async_block_till_done() assert calls expected_certificate = certifi.where() assert calls[0][0] == expected_certificate # test if insecure is set assert insecure_check["insecure"] == insecure_param @pytest.mark.parametrize( "mqtt_config_entry_data", [ { mqtt.CONF_BROKER: "mock-broker", mqtt.CONF_CERTIFICATE: "auto", } ], ) async def test_tls_version( hass: HomeAssistant, mqtt_client_mock: MqttMockPahoClient, mqtt_mock_entry: MqttMockHAClientGenerator, ) -> None: """Test setup defaults for tls.""" await mqtt_mock_entry() await hass.async_block_till_done() assert ( mqtt_client_mock.tls_set.mock_calls[0][2]["tls_version"] == ssl.PROTOCOL_TLS_CLIENT ) @pytest.mark.parametrize( "mqtt_config_entry_data", [ { mqtt.CONF_BROKER: "mock-broker", mqtt.CONF_BIRTH_MESSAGE: { mqtt.ATTR_TOPIC: "birth", mqtt.ATTR_PAYLOAD: "birth", mqtt.ATTR_QOS: 0, mqtt.ATTR_RETAIN: False, }, } ], ) @patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0) @patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0) @patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0) async def test_custom_birth_message( hass: HomeAssistant, mock_debouncer: asyncio.Event, mqtt_config_entry_data: dict[str, Any], mqtt_client_mock: MqttMockPahoClient, ) -> None: """Test sending birth message.""" entry = MockConfigEntry(domain=mqtt.DOMAIN, data=mqtt_config_entry_data) entry.add_to_hass(hass) hass.config.components.add(mqtt.DOMAIN) assert await hass.config_entries.async_setup(entry.entry_id) mock_debouncer.clear() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) # discovery cooldown await mock_debouncer.wait() # Wait for publish call to finish await hass.async_block_till_done(wait_background_tasks=True) mqtt_client_mock.publish.assert_called_with("birth", "birth", 0, False) @pytest.mark.parametrize( "mqtt_config_entry_data", [ENTRY_DEFAULT_BIRTH_MESSAGE], ) async def test_default_birth_message( hass: HomeAssistant, setup_with_birth_msg_client_mock: MqttMockPahoClient ) -> None: """Test sending birth message.""" mqtt_client_mock = setup_with_birth_msg_client_mock await hass.async_block_till_done(wait_background_tasks=True) mqtt_client_mock.publish.assert_called_with( "homeassistant/status", "online", 0, False ) @pytest.mark.parametrize( "mqtt_config_entry_data", [{mqtt.CONF_BROKER: "mock-broker", mqtt.CONF_BIRTH_MESSAGE: {}}], ) @patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0) @patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0) @patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0) async def test_no_birth_message( hass: HomeAssistant, record_calls: MessageCallbackType, mock_debouncer: asyncio.Event, mqtt_config_entry_data: dict[str, Any], mqtt_client_mock: MqttMockPahoClient, ) -> None: """Test disabling birth message.""" entry = MockConfigEntry(domain=mqtt.DOMAIN, data=mqtt_config_entry_data) entry.add_to_hass(hass) hass.config.components.add(mqtt.DOMAIN) mock_debouncer.clear() assert await hass.config_entries.async_setup(entry.entry_id) # Wait for discovery cooldown await mock_debouncer.wait() # Ensure any publishing could have been processed await hass.async_block_till_done(wait_background_tasks=True) mqtt_client_mock.publish.assert_not_called() mqtt_client_mock.reset_mock() mock_debouncer.clear() await mqtt.async_subscribe(hass, "homeassistant/some-topic", record_calls) # Wait for discovery cooldown await mock_debouncer.wait() mqtt_client_mock.subscribe.assert_called() @pytest.mark.parametrize( "mqtt_config_entry_data", [ENTRY_DEFAULT_BIRTH_MESSAGE], ) @patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.2) async def test_delayed_birth_message( hass: HomeAssistant, mqtt_config_entry_data: dict[str, Any], mqtt_client_mock: MqttMockPahoClient, ) -> None: """Test sending birth message does not happen until Home Assistant starts.""" hass.set_state(CoreState.starting) await hass.async_block_till_done() birth = asyncio.Event() entry = MockConfigEntry(domain=mqtt.DOMAIN, data=mqtt_config_entry_data) entry.add_to_hass(hass) hass.config.components.add(mqtt.DOMAIN) assert await hass.config_entries.async_setup(entry.entry_id) @callback def wait_birth(msg: ReceiveMessage) -> None: """Handle birth message.""" birth.set() await mqtt.async_subscribe(hass, "homeassistant/status", wait_birth) with pytest.raises(TimeoutError): await asyncio.wait_for(birth.wait(), 0.05) assert not mqtt_client_mock.publish.called assert not birth.is_set() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await birth.wait() mqtt_client_mock.publish.assert_called_with( "homeassistant/status", "online", 0, False ) @pytest.mark.parametrize( "mqtt_config_entry_data", [ENTRY_DEFAULT_BIRTH_MESSAGE], ) async def test_subscription_done_when_birth_message_is_sent( setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test sending birth message until initial subscription has been completed.""" mqtt_client_mock = setup_with_birth_msg_client_mock subscribe_calls = help_all_subscribe_calls(mqtt_client_mock) for component in SUPPORTED_COMPONENTS: assert (f"homeassistant/{component}/+/config", 0) in subscribe_calls assert (f"homeassistant/{component}/+/+/config", 0) in subscribe_calls mqtt_client_mock.publish.assert_called_with( "homeassistant/status", "online", 0, False ) @pytest.mark.parametrize( "mqtt_config_entry_data", [ { mqtt.CONF_BROKER: "mock-broker", mqtt.CONF_WILL_MESSAGE: { mqtt.ATTR_TOPIC: "death", mqtt.ATTR_PAYLOAD: "death", mqtt.ATTR_QOS: 0, mqtt.ATTR_RETAIN: False, }, } ], ) async def test_custom_will_message( hass: HomeAssistant, mqtt_config_entry_data: dict[str, Any], mqtt_client_mock: MqttMockPahoClient, ) -> None: """Test will message.""" entry = MockConfigEntry(domain=mqtt.DOMAIN, data=mqtt_config_entry_data) entry.add_to_hass(hass) hass.config.components.add(mqtt.DOMAIN) assert await hass.config_entries.async_setup(entry.entry_id) await hass.async_block_till_done() mqtt_client_mock.will_set.assert_called_with( topic="death", payload="death", qos=0, retain=False ) async def test_default_will_message( setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test will message.""" mqtt_client_mock = setup_with_birth_msg_client_mock mqtt_client_mock.will_set.assert_called_with( topic="homeassistant/status", payload="offline", qos=0, retain=False ) @pytest.mark.parametrize( "mqtt_config_entry_data", [{mqtt.CONF_BROKER: "mock-broker", mqtt.CONF_WILL_MESSAGE: {}}], ) async def test_no_will_message( hass: HomeAssistant, mqtt_config_entry_data: dict[str, Any], mqtt_client_mock: MqttMockPahoClient, ) -> None: """Test will message.""" entry = MockConfigEntry(domain=mqtt.DOMAIN, data=mqtt_config_entry_data) entry.add_to_hass(hass) hass.config.components.add(mqtt.DOMAIN) assert await hass.config_entries.async_setup(entry.entry_id) await hass.async_block_till_done() mqtt_client_mock.will_set.assert_not_called() @pytest.mark.parametrize( "mqtt_config_entry_data", [ENTRY_DEFAULT_BIRTH_MESSAGE | {mqtt.CONF_DISCOVERY: False}], ) async def test_mqtt_subscribes_topics_on_connect( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, ) -> None: """Test subscription to topic on connect.""" mqtt_client_mock = setup_with_birth_msg_client_mock mock_debouncer.clear() await mqtt.async_subscribe(hass, "topic/test", record_calls) await mqtt.async_subscribe(hass, "home/sensor", record_calls, 2) await mqtt.async_subscribe(hass, "still/pending", record_calls) await mqtt.async_subscribe(hass, "still/pending", record_calls, 1) await mock_debouncer.wait() mqtt_client_mock.on_disconnect(Mock(), None, 0) mqtt_client_mock.reset_mock() mock_debouncer.clear() mqtt_client_mock.on_connect(Mock(), None, 0, 0) await mock_debouncer.wait() subscribe_calls = help_all_subscribe_calls(mqtt_client_mock) assert ("topic/test", 0) in subscribe_calls assert ("home/sensor", 2) in subscribe_calls assert ("still/pending", 1) in subscribe_calls @pytest.mark.parametrize( "mqtt_config_entry_data", [ENTRY_DEFAULT_BIRTH_MESSAGE | {mqtt.CONF_DISCOVERY: False}], ) async def test_mqtt_discovery_not_subscribes_when_disabled( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test discovery subscriptions not performend when discovery is disabled.""" mqtt_client_mock = setup_with_birth_msg_client_mock await mock_debouncer.wait() subscribe_calls = help_all_subscribe_calls(mqtt_client_mock) for component in SUPPORTED_COMPONENTS: assert (f"homeassistant/{component}/+/config", 0) not in subscribe_calls assert (f"homeassistant/{component}/+/+/config", 0) not in subscribe_calls mqtt_client_mock.on_disconnect(Mock(), None, 0) mqtt_client_mock.reset_mock() mock_debouncer.clear() mqtt_client_mock.on_connect(Mock(), None, 0, 0) await mock_debouncer.wait() subscribe_calls = help_all_subscribe_calls(mqtt_client_mock) for component in SUPPORTED_COMPONENTS: assert (f"homeassistant/{component}/+/config", 0) not in subscribe_calls assert (f"homeassistant/{component}/+/+/config", 0) not in subscribe_calls @pytest.mark.parametrize( "mqtt_config_entry_data", [ENTRY_DEFAULT_BIRTH_MESSAGE], ) async def test_mqtt_subscribes_in_single_call( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, ) -> None: """Test bundled client subscription to topic.""" mqtt_client_mock = setup_with_birth_msg_client_mock mqtt_client_mock.subscribe.reset_mock() mock_debouncer.clear() await mqtt.async_subscribe(hass, "topic/test", record_calls) await mqtt.async_subscribe(hass, "home/sensor", record_calls) # Make sure the debouncer finishes await mock_debouncer.wait() assert mqtt_client_mock.subscribe.call_count == 1 # Assert we have a single subscription call with both subscriptions assert mqtt_client_mock.subscribe.mock_calls[0][1][0] in [ [("topic/test", 0), ("home/sensor", 0)], [("home/sensor", 0), ("topic/test", 0)], ] @pytest.mark.parametrize("mqtt_config_entry_data", [ENTRY_DEFAULT_BIRTH_MESSAGE]) @patch("homeassistant.components.mqtt.client.MAX_SUBSCRIBES_PER_CALL", 2) @patch("homeassistant.components.mqtt.client.MAX_UNSUBSCRIBES_PER_CALL", 2) async def test_mqtt_subscribes_and_unsubscribes_in_chunks( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, ) -> None: """Test chunked client subscriptions.""" mqtt_client_mock = setup_with_birth_msg_client_mock mqtt_client_mock.subscribe.reset_mock() unsub_tasks: list[CALLBACK_TYPE] = [] mock_debouncer.clear() unsub_tasks.append(await mqtt.async_subscribe(hass, "topic/test1", record_calls)) unsub_tasks.append(await mqtt.async_subscribe(hass, "home/sensor1", record_calls)) unsub_tasks.append(await mqtt.async_subscribe(hass, "topic/test2", record_calls)) unsub_tasks.append(await mqtt.async_subscribe(hass, "home/sensor2", record_calls)) # Make sure the debouncer finishes await mock_debouncer.wait() assert mqtt_client_mock.subscribe.call_count == 2 # Assert we have a 2 subscription calls with both 2 subscriptions assert len(mqtt_client_mock.subscribe.mock_calls[0][1][0]) == 2 assert len(mqtt_client_mock.subscribe.mock_calls[1][1][0]) == 2 # Unsubscribe all topics mock_debouncer.clear() for task in unsub_tasks: task() # Make sure the debouncer finishes await mock_debouncer.wait() assert mqtt_client_mock.unsubscribe.call_count == 2 # Assert we have a 2 unsubscribe calls with both 2 topic assert len(mqtt_client_mock.unsubscribe.mock_calls[0][1][0]) == 2 assert len(mqtt_client_mock.unsubscribe.mock_calls[1][1][0]) == 2 async def test_auto_reconnect( hass: HomeAssistant, setup_with_birth_msg_client_mock: MqttMockPahoClient, caplog: pytest.LogCaptureFixture, ) -> None: """Test reconnection is automatically done.""" mqtt_client_mock = setup_with_birth_msg_client_mock assert mqtt_client_mock.connect.call_count == 1 mqtt_client_mock.reconnect.reset_mock() mqtt_client_mock.disconnect() mqtt_client_mock.on_disconnect(None, None, 0) await hass.async_block_till_done() mqtt_client_mock.reconnect.side_effect = OSError("foo") async_fire_time_changed( hass, utcnow() + timedelta(seconds=RECONNECT_INTERVAL_SECONDS) ) await hass.async_block_till_done() assert len(mqtt_client_mock.reconnect.mock_calls) == 1 assert "Error re-connecting to MQTT server due to exception: foo" in caplog.text mqtt_client_mock.reconnect.side_effect = None async_fire_time_changed( hass, utcnow() + timedelta(seconds=RECONNECT_INTERVAL_SECONDS) ) await hass.async_block_till_done() assert len(mqtt_client_mock.reconnect.mock_calls) == 2 hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP) mqtt_client_mock.disconnect() mqtt_client_mock.on_disconnect(None, None, 0) await hass.async_block_till_done() async_fire_time_changed( hass, utcnow() + timedelta(seconds=RECONNECT_INTERVAL_SECONDS) ) await hass.async_block_till_done() # Should not reconnect after stop assert len(mqtt_client_mock.reconnect.mock_calls) == 2 async def test_server_sock_connect_and_disconnect( hass: HomeAssistant, mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test handling the socket connected and disconnected.""" mqtt_client_mock = setup_with_birth_msg_client_mock assert mqtt_client_mock.connect.call_count == 1 mqtt_client_mock.loop_misc.return_value = paho_mqtt.MQTT_ERR_SUCCESS client, server = socket.socketpair( family=socket.AF_UNIX, type=socket.SOCK_STREAM, proto=0 ) client.setblocking(False) server.setblocking(False) mqtt_client_mock.on_socket_open(mqtt_client_mock, None, client) mqtt_client_mock.on_socket_register_write(mqtt_client_mock, None, client) await hass.async_block_till_done() server.close() # mock the server closing the connection on us mock_debouncer.clear() unsub = await mqtt.async_subscribe(hass, "test-topic", record_calls) await mock_debouncer.wait() mqtt_client_mock.loop_misc.return_value = paho_mqtt.MQTT_ERR_CONN_LOST mqtt_client_mock.on_socket_unregister_write(mqtt_client_mock, None, client) mqtt_client_mock.on_socket_close(mqtt_client_mock, None, client) mqtt_client_mock.on_disconnect(mqtt_client_mock, None, client) await hass.async_block_till_done() mock_debouncer.clear() unsub() await hass.async_block_till_done() assert not mock_debouncer.is_set() # Should have failed assert len(recorded_calls) == 0 async def test_server_sock_buffer_size( hass: HomeAssistant, setup_with_birth_msg_client_mock: MqttMockPahoClient, caplog: pytest.LogCaptureFixture, ) -> None: """Test handling the socket buffer size fails.""" mqtt_client_mock = setup_with_birth_msg_client_mock assert mqtt_client_mock.connect.call_count == 1 mqtt_client_mock.loop_misc.return_value = paho_mqtt.MQTT_ERR_SUCCESS client, server = socket.socketpair( family=socket.AF_UNIX, type=socket.SOCK_STREAM, proto=0 ) client.setblocking(False) server.setblocking(False) with patch.object(client, "setsockopt", side_effect=OSError("foo")): mqtt_client_mock.on_socket_open(mqtt_client_mock, None, client) mqtt_client_mock.on_socket_register_write(mqtt_client_mock, None, client) await hass.async_block_till_done() assert "Unable to increase the socket buffer size" in caplog.text async def test_server_sock_buffer_size_with_websocket( hass: HomeAssistant, setup_with_birth_msg_client_mock: MqttMockPahoClient, caplog: pytest.LogCaptureFixture, ) -> None: """Test handling the socket buffer size fails.""" mqtt_client_mock = setup_with_birth_msg_client_mock assert mqtt_client_mock.connect.call_count == 1 mqtt_client_mock.loop_misc.return_value = paho_mqtt.MQTT_ERR_SUCCESS client, server = socket.socketpair( family=socket.AF_UNIX, type=socket.SOCK_STREAM, proto=0 ) client.setblocking(False) server.setblocking(False) class FakeWebsocket(paho_mqtt.WebsocketWrapper): def _do_handshake(self, *args, **kwargs): pass wrapped_socket = FakeWebsocket(client, "127.0.01", 1, False, "/", None) with patch.object(client, "setsockopt", side_effect=OSError("foo")): mqtt_client_mock.on_socket_open(mqtt_client_mock, None, wrapped_socket) mqtt_client_mock.on_socket_register_write( mqtt_client_mock, None, wrapped_socket ) await hass.async_block_till_done() assert "Unable to increase the socket buffer size" in caplog.text async def test_client_sock_failure_after_connect( hass: HomeAssistant, setup_with_birth_msg_client_mock: MqttMockPahoClient, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test handling the socket connected and disconnected.""" mqtt_client_mock = setup_with_birth_msg_client_mock assert mqtt_client_mock.connect.call_count == 1 mqtt_client_mock.loop_misc.return_value = paho_mqtt.MQTT_ERR_SUCCESS client, server = socket.socketpair( family=socket.AF_UNIX, type=socket.SOCK_STREAM, proto=0 ) client.setblocking(False) server.setblocking(False) mqtt_client_mock.on_socket_open(mqtt_client_mock, None, client) mqtt_client_mock.on_socket_register_writer(mqtt_client_mock, None, client) await hass.async_block_till_done() mqtt_client_mock.loop_write.side_effect = OSError("foo") client.close() # close the client socket out from under the client assert mqtt_client_mock.connect.call_count == 1 unsub = await mqtt.async_subscribe(hass, "test-topic", record_calls) async_fire_time_changed(hass, utcnow() + timedelta(seconds=5)) await hass.async_block_till_done() unsub() # Should have failed assert len(recorded_calls) == 0 async def test_loop_write_failure( hass: HomeAssistant, setup_with_birth_msg_client_mock: MqttMockPahoClient, caplog: pytest.LogCaptureFixture, ) -> None: """Test handling the socket connected and disconnected.""" mqtt_client_mock = setup_with_birth_msg_client_mock assert mqtt_client_mock.connect.call_count == 1 mqtt_client_mock.loop_misc.return_value = paho_mqtt.MQTT_ERR_SUCCESS client, server = socket.socketpair( family=socket.AF_UNIX, type=socket.SOCK_STREAM, proto=0 ) client.setblocking(False) server.setblocking(False) mqtt_client_mock.on_socket_open(mqtt_client_mock, None, client) mqtt_client_mock.on_socket_register_write(mqtt_client_mock, None, client) mqtt_client_mock.loop_write.return_value = paho_mqtt.MQTT_ERR_CONN_LOST mqtt_client_mock.loop_read.return_value = paho_mqtt.MQTT_ERR_CONN_LOST # Fill up the outgoing buffer to ensure that loop_write # and loop_read are called that next time control is # returned to the event loop try: for _ in range(1000): server.send(b"long" * 100) except BlockingIOError: pass server.close() # Once for the reader callback await hass.async_block_till_done() # Another for the writer callback await hass.async_block_till_done() # Final for the disconnect callback await hass.async_block_till_done() assert "Disconnected from MQTT server test-broker:1883" in caplog.text