diff --git a/homeassistant/components/mqtt/client.py b/homeassistant/components/mqtt/client.py index 3bd08403b78..e8eabe887f2 100644 --- a/homeassistant/components/mqtt/client.py +++ b/homeassistant/components/mqtt/client.py @@ -376,6 +376,11 @@ class MQTT: self._simple_subscriptions: dict[str, list[Subscription]] = {} self._wildcard_subscriptions: list[Subscription] = [] + # _retained_topics prevents a Subscription from receiving a + # retained message more than once per topic. This prevents flooding + # already active subscribers when new subscribers subscribe to a topic + # which has subscribed messages. + self._retained_topics: dict[Subscription, set[str]] = {} self.connected = False self._ha_started = asyncio.Event() self._cleanup_on_unload: list[Callable[[], None]] = [] @@ -618,6 +623,8 @@ class MQTT: """Remove subscription.""" self._async_untrack_subscription(subscription) self._matching_subscriptions.cache_clear() + if subscription in self._retained_topics: + del self._retained_topics[subscription] # Only unsubscribe if currently connected if self.connected: self._async_unsubscribe(topic) @@ -637,7 +644,7 @@ class MQTT: if topic in self._max_qos: del self._max_qos[topic] if topic in self._pending_subscriptions: - # avoid any pending subscription to be executed + # Avoid any pending subscription to be executed del self._pending_subscriptions[topic] self._pending_unsubscribes.add(topic) @@ -754,8 +761,9 @@ class MQTT: async def _async_resubscribe(self) -> None: """Resubscribe on reconnect.""" - # Group subscriptions to only re-subscribe once for each topic. self._max_qos.clear() + self._retained_topics.clear() + # Group subscriptions to only re-subscribe once for each topic. keyfunc = attrgetter("topic") self._async_queue_subscriptions( [ @@ -799,6 +807,14 @@ class MQTT: subscriptions = self._matching_subscriptions(msg.topic) for subscription in subscriptions: + if msg.retain: + retained_topics = self._retained_topics.setdefault(subscription, set()) + # Skip if the subscription already received a retained message + if msg.topic in retained_topics: + continue + # Remember the subscription had an initial retained message + self._retained_topics[subscription].add(msg.topic) + payload: SubscribePayloadType = msg.payload if subscription.encoding is not None: try: diff --git a/tests/components/mqtt/test_init.py b/tests/components/mqtt/test_init.py index 5494b24c398..fa8d2e42dda 100644 --- a/tests/components/mqtt/test_init.py +++ b/tests/components/mqtt/test_init.py @@ -114,6 +114,26 @@ def record_calls(calls: list[ReceiveMessage]) -> MessageCallbackType: return record_calls +def help_assert_message( + msg: ReceiveMessage, + topic: str | None = None, + payload: str | None = None, + qos: int | None = None, + retain: bool | None = None, +) -> bool: + """Return True if all of the given attributes match with the message.""" + match: bool = True + if topic is not None: + match &= msg.topic == topic + if payload is not None: + match &= msg.payload == payload + if qos is not None: + match &= msg.qos == qos + if retain is not None: + match &= msg.retain == retain + return match + + async def test_mqtt_connects_on_home_assistant_mqtt_setup( hass: HomeAssistant, mqtt_client_mock: MqttMockPahoClient, @@ -1272,9 +1292,8 @@ async def test_subscribe_same_topic( calls_b.append(msg) await mqtt.async_subscribe(hass, "test/state", _callback_a, qos=0) - async_fire_mqtt_message( - hass, "test/state", "online" - ) # Simulate a (retained) message replaying + # Simulate a non retained message after the first subscription + async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False) async_fire_time_changed(hass, utcnow() + timedelta(seconds=1)) await hass.async_block_till_done() assert len(calls_a) == 1 @@ -1285,18 +1304,271 @@ async def test_subscribe_same_topic( async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) await hass.async_block_till_done() await mqtt.async_subscribe(hass, "test/state", _callback_b, qos=1) - async_fire_mqtt_message( - hass, "test/state", "online" - ) # Simulate a (retained) message replaying + # Simulate an other non retained message after the second subscription + async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False) async_fire_time_changed(hass, utcnow() + timedelta(seconds=1)) await hass.async_block_till_done() async_fire_time_changed(hass, utcnow() + timedelta(seconds=1)) await hass.async_block_till_done() + # Both subscriptions should receive updates assert len(calls_a) == 1 assert len(calls_b) == 1 mqtt_client_mock.subscribe.assert_called() +@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0) +@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0) +@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0) +async def test_replaying_payload_same_topic( + hass: HomeAssistant, + mqtt_client_mock: MqttMockPahoClient, + mqtt_mock_entry: MqttMockHAClientGenerator, +) -> None: + """Test replaying retained messages. + + When subscribing to the same topic again, SUBSCRIBE must be sent to the broker again + for it to resend any retained messages for new subscriptions. + Retained messages must only be replayed for new subscriptions, except + when the MQTT client is reconnecting. + """ + mqtt_mock = await mqtt_mock_entry() + + # Fake that the client is connected + mqtt_mock().connected = True + + calls_a: list[ReceiveMessage] = [] + calls_b: list[ReceiveMessage] = [] + + def _callback_a(msg: ReceiveMessage) -> None: + calls_a.append(msg) + + def _callback_b(msg: ReceiveMessage) -> None: + calls_b.append(msg) + + await mqtt.async_subscribe(hass, "test/state", _callback_a) + async_fire_mqtt_message( + hass, "test/state", "online", qos=0, retain=True + ) # Simulate a (retained) message played back + await hass.async_block_till_done() + assert len(calls_a) == 1 + mqtt_client_mock.subscribe.assert_called() + calls_a = [] + mqtt_client_mock.reset_mock() + + await mqtt.async_subscribe(hass, "test/state", _callback_b) + + # Simulate edge case where non retained message was received + # after subscription at HA but before the debouncer delay was passed. + # The message without retain flag directly after a subscription should + # be processed by both subscriptions. + async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False) + + # Simulate a (retained) message played back on new subscriptions + async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True) + + # Make sure the debouncer delay was passed + await hass.async_block_till_done() + async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) + await hass.async_block_till_done() + + # The current subscription only received the message without retain flag + assert len(calls_a) == 1 + assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=False) + # The retained message playback should only be processed by the new subscription. + # The existing subscription already got the latest update, hence the existing + # subscription should not receive the replayed (retained) message. + # Messages without retain flag are received on both subscriptions. + assert len(calls_b) == 2 + assert help_assert_message(calls_b[0], "test/state", "online", qos=0, retain=False) + assert help_assert_message(calls_b[1], "test/state", "online", qos=0, retain=True) + mqtt_client_mock.subscribe.assert_called() + + calls_a = [] + calls_b = [] + mqtt_client_mock.reset_mock() + + # Simulate new message played back on new subscriptions + # After connecting the retain flag will not be set, even if the + # payload published was retained, we cannot see that + async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False) + await hass.async_block_till_done() + async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) + await hass.async_block_till_done() + assert len(calls_a) == 1 + assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=False) + assert len(calls_b) == 1 + assert help_assert_message(calls_b[0], "test/state", "online", qos=0, retain=False) + + # Now simulate the broker was disconnected shortly + calls_a = [] + calls_b = [] + mqtt_client_mock.reset_mock() + mqtt_client_mock.on_disconnect(None, None, 0) + mqtt_client_mock.on_connect(None, None, None, 0) + await hass.async_block_till_done() + mqtt_client_mock.subscribe.assert_called() + # Simulate a (retained) message played back after reconnecting + async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True) + await hass.async_block_till_done() + async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) + await hass.async_block_till_done() + # Both subscriptions now should replay the retained message + assert len(calls_a) == 1 + assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True) + assert len(calls_b) == 1 + assert help_assert_message(calls_b[0], "test/state", "online", qos=0, retain=True) + + +@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0) +@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0) +@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0) +async def test_replaying_payload_after_resubscribing( + hass: HomeAssistant, + mqtt_client_mock: MqttMockPahoClient, + mqtt_mock_entry: MqttMockHAClientGenerator, +) -> None: + """Test replaying and filtering retained messages after resubscribing. + + When subscribing to the same topic again, SUBSCRIBE must be sent to the broker again + for it to resend any retained messages for new subscriptions. + Retained messages must only be replayed for new subscriptions, except + when the MQTT client is reconnection. + """ + mqtt_mock = await mqtt_mock_entry() + + # Fake that the client is connected + mqtt_mock().connected = True + + calls_a: list[ReceiveMessage] = [] + + def _callback_a(msg: ReceiveMessage) -> None: + calls_a.append(msg) + + unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a) + await hass.async_block_till_done() + async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) + await hass.async_block_till_done() + mqtt_client_mock.subscribe.assert_called() + + # Simulate a (retained) message played back + async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True) + await hass.async_block_till_done() + assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True) + calls_a.clear() + + # Test we get updates + async_fire_mqtt_message(hass, "test/state", "offline", qos=0, retain=False) + await hass.async_block_till_done() + assert help_assert_message(calls_a[0], "test/state", "offline", qos=0, retain=False) + calls_a.clear() + + # Test we filter new retained updates + async_fire_mqtt_message(hass, "test/state", "offline", qos=0, retain=True) + await hass.async_block_till_done() + assert len(calls_a) == 0 + + # Unsubscribe an resubscribe again + unsub() + unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a) + await hass.async_block_till_done() + async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) + await hass.async_block_till_done() + mqtt_client_mock.subscribe.assert_called() + + # Simulate we can receive a (retained) played back message again + async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True) + await hass.async_block_till_done() + assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True) + + +@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0) +@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0) +@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0) +async def test_replaying_payload_wildcard_topic( + hass: HomeAssistant, + mqtt_client_mock: MqttMockPahoClient, + mqtt_mock_entry: MqttMockHAClientGenerator, +) -> None: + """Test replaying retained messages. + + When we have multiple subscriptions to the same wildcard topic, + SUBSCRIBE must be sent to the broker again + for it to resend any retained messages for new subscriptions. + Retained messages should only be replayed for new subscriptions, except + when the MQTT client is reconnection. + """ + mqtt_mock = await mqtt_mock_entry() + + # Fake that the client is connected + mqtt_mock().connected = True + + calls_a: list[ReceiveMessage] = [] + calls_b: list[ReceiveMessage] = [] + + def _callback_a(msg: ReceiveMessage) -> None: + calls_a.append(msg) + + def _callback_b(msg: ReceiveMessage) -> None: + calls_b.append(msg) + + await mqtt.async_subscribe(hass, "test/#", _callback_a) + # Simulate (retained) messages being played back on new subscriptions + async_fire_mqtt_message(hass, "test/state1", "new_value_1", qos=0, retain=True) + async_fire_mqtt_message(hass, "test/state2", "new_value_2", qos=0, retain=True) + await hass.async_block_till_done() + async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown + await hass.async_block_till_done() + assert len(calls_a) == 2 + mqtt_client_mock.subscribe.assert_called() + calls_a = [] + mqtt_client_mock.reset_mock() + + # resubscribe to the wild card topic again + await mqtt.async_subscribe(hass, "test/#", _callback_b) + # Simulate (retained) messages being played back on new subscriptions + async_fire_mqtt_message(hass, "test/state1", "initial_value_1", qos=0, retain=True) + async_fire_mqtt_message(hass, "test/state2", "initial_value_2", qos=0, retain=True) + await hass.async_block_till_done() + async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown + await hass.async_block_till_done() + # The retained messages playback should only be processed for the new subscriptions + assert len(calls_a) == 0 + assert len(calls_b) == 2 + mqtt_client_mock.subscribe.assert_called() + + calls_a = [] + calls_b = [] + mqtt_client_mock.reset_mock() + + # Simulate new messages being received + async_fire_mqtt_message(hass, "test/state1", "update_value_1", qos=0, retain=False) + async_fire_mqtt_message(hass, "test/state2", "update_value_2", qos=0, retain=False) + await hass.async_block_till_done() + assert len(calls_a) == 2 + assert len(calls_b) == 2 + + # Now simulate the broker was disconnected shortly + calls_a = [] + calls_b = [] + mqtt_client_mock.reset_mock() + mqtt_client_mock.on_disconnect(None, None, 0) + mqtt_client_mock.on_connect(None, None, None, 0) + await hass.async_block_till_done() + async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown + await hass.async_block_till_done() + mqtt_client_mock.subscribe.assert_called() + # Simulate the (retained) messages are played back after reconnecting + # for all subscriptions + async_fire_mqtt_message(hass, "test/state1", "update_value_1", qos=0, retain=True) + async_fire_mqtt_message(hass, "test/state2", "update_value_2", qos=0, retain=True) + await hass.async_block_till_done() + async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown + await hass.async_block_till_done() + # Both subscriptions should replay + assert len(calls_a) == 2 + assert len(calls_b) == 2 + + @patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0) @patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0) @patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0) @@ -3032,33 +3304,62 @@ async def test_debug_info_qos_retain( start_dt = datetime(2019, 1, 1, 0, 0, 0) with patch("homeassistant.util.dt.utcnow") as dt_utcnow: dt_utcnow.return_value = start_dt + # simulate the first message was replayed from the broker with retained flag + async_fire_mqtt_message(hass, "sensor/abc", "123", qos=0, retain=True) + # simulate an update message async_fire_mqtt_message(hass, "sensor/abc", "123", qos=0, retain=False) + # simpulate someone else subscribed and retained messages were replayed async_fire_mqtt_message(hass, "sensor/abc", "123", qos=1, retain=True) + # simulate an update message + async_fire_mqtt_message(hass, "sensor/abc", "123", qos=1, retain=False) + # simulate an update message async_fire_mqtt_message(hass, "sensor/abc", "123", qos=2, retain=False) debug_info_data = debug_info.info_for_device(hass, device.id) assert len(debug_info_data["entities"][0]["subscriptions"]) == 1 + # The replayed retained payload was processed + messages = debug_info_data["entities"][0]["subscriptions"][0]["messages"] + assert { + "payload": "123", + "qos": 0, + "retain": True, + "time": start_dt, + "topic": "sensor/abc", + } in messages + # The not retained update was processed normally assert { "payload": "123", "qos": 0, "retain": False, "time": start_dt, "topic": "sensor/abc", - } in debug_info_data["entities"][0]["subscriptions"][0]["messages"] + } in messages + # Since the MQTT client has not lost the connection and has not resubscribed + # The retained payload is not replayed and filtered out as it already + # received a value and appears to be received on an existing subscription assert { "payload": "123", "qos": 1, "retain": True, "time": start_dt, "topic": "sensor/abc", - } in debug_info_data["entities"][0]["subscriptions"][0]["messages"] + } not in messages + # The not retained update was processed normally + assert { + "payload": "123", + "qos": 1, + "retain": False, + "time": start_dt, + "topic": "sensor/abc", + } in messages + # The not retained update was processed normally assert { "payload": "123", "qos": 2, "retain": False, "time": start_dt, "topic": "sensor/abc", - } in debug_info_data["entities"][0]["subscriptions"][0]["messages"] + } in messages async def test_publish_json_from_template(