Filter replaying unrelated retained MQTT messages when subscribing to share topics (#88826)

* Do not replay already processed retained subscr.

* Add tests

* Always replay wildcards

* Update tests for debouncer

* Rework for retained topics

* Fix test

* Correct comment

* Add cleanup and test

* Fix key error

* Correct helper

* Rename mock

* Add comment on function _retained_init

* Always replay initial retained payload

* Apply suggestion moving msg.retain to outer check

* Improve test on edge case

* Improve comment formatting

* Follow up comment - improve comments on test

* Update homeassistant/components/mqtt/client.py

Co-authored-by: Erik Montnemery <erik@montnemery.com>

---------

Co-authored-by: Erik Montnemery <erik@montnemery.com>
pull/92967/head^2
Jan Bouwhuis 2023-05-12 15:23:05 +02:00 committed by GitHub
parent bd7e943efe
commit a05c20a498
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 328 additions and 11 deletions

View File

@ -376,6 +376,11 @@ class MQTT:
self._simple_subscriptions: dict[str, list[Subscription]] = {}
self._wildcard_subscriptions: list[Subscription] = []
# _retained_topics prevents a Subscription from receiving a
# retained message more than once per topic. This prevents flooding
# already active subscribers when new subscribers subscribe to a topic
# which has subscribed messages.
self._retained_topics: dict[Subscription, set[str]] = {}
self.connected = False
self._ha_started = asyncio.Event()
self._cleanup_on_unload: list[Callable[[], None]] = []
@ -618,6 +623,8 @@ class MQTT:
"""Remove subscription."""
self._async_untrack_subscription(subscription)
self._matching_subscriptions.cache_clear()
if subscription in self._retained_topics:
del self._retained_topics[subscription]
# Only unsubscribe if currently connected
if self.connected:
self._async_unsubscribe(topic)
@ -637,7 +644,7 @@ class MQTT:
if topic in self._max_qos:
del self._max_qos[topic]
if topic in self._pending_subscriptions:
# avoid any pending subscription to be executed
# Avoid any pending subscription to be executed
del self._pending_subscriptions[topic]
self._pending_unsubscribes.add(topic)
@ -754,8 +761,9 @@ class MQTT:
async def _async_resubscribe(self) -> None:
"""Resubscribe on reconnect."""
# Group subscriptions to only re-subscribe once for each topic.
self._max_qos.clear()
self._retained_topics.clear()
# Group subscriptions to only re-subscribe once for each topic.
keyfunc = attrgetter("topic")
self._async_queue_subscriptions(
[
@ -799,6 +807,14 @@ class MQTT:
subscriptions = self._matching_subscriptions(msg.topic)
for subscription in subscriptions:
if msg.retain:
retained_topics = self._retained_topics.setdefault(subscription, set())
# Skip if the subscription already received a retained message
if msg.topic in retained_topics:
continue
# Remember the subscription had an initial retained message
self._retained_topics[subscription].add(msg.topic)
payload: SubscribePayloadType = msg.payload
if subscription.encoding is not None:
try:

View File

@ -114,6 +114,26 @@ def record_calls(calls: list[ReceiveMessage]) -> MessageCallbackType:
return record_calls
def help_assert_message(
msg: ReceiveMessage,
topic: str | None = None,
payload: str | None = None,
qos: int | None = None,
retain: bool | None = None,
) -> bool:
"""Return True if all of the given attributes match with the message."""
match: bool = True
if topic is not None:
match &= msg.topic == topic
if payload is not None:
match &= msg.payload == payload
if qos is not None:
match &= msg.qos == qos
if retain is not None:
match &= msg.retain == retain
return match
async def test_mqtt_connects_on_home_assistant_mqtt_setup(
hass: HomeAssistant,
mqtt_client_mock: MqttMockPahoClient,
@ -1272,9 +1292,8 @@ async def test_subscribe_same_topic(
calls_b.append(msg)
await mqtt.async_subscribe(hass, "test/state", _callback_a, qos=0)
async_fire_mqtt_message(
hass, "test/state", "online"
) # Simulate a (retained) message replaying
# Simulate a non retained message after the first subscription
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False)
async_fire_time_changed(hass, utcnow() + timedelta(seconds=1))
await hass.async_block_till_done()
assert len(calls_a) == 1
@ -1285,18 +1304,271 @@ async def test_subscribe_same_topic(
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
await hass.async_block_till_done()
await mqtt.async_subscribe(hass, "test/state", _callback_b, qos=1)
async_fire_mqtt_message(
hass, "test/state", "online"
) # Simulate a (retained) message replaying
# Simulate an other non retained message after the second subscription
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False)
async_fire_time_changed(hass, utcnow() + timedelta(seconds=1))
await hass.async_block_till_done()
async_fire_time_changed(hass, utcnow() + timedelta(seconds=1))
await hass.async_block_till_done()
# Both subscriptions should receive updates
assert len(calls_a) == 1
assert len(calls_b) == 1
mqtt_client_mock.subscribe.assert_called()
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0)
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
async def test_replaying_payload_same_topic(
hass: HomeAssistant,
mqtt_client_mock: MqttMockPahoClient,
mqtt_mock_entry: MqttMockHAClientGenerator,
) -> None:
"""Test replaying retained messages.
When subscribing to the same topic again, SUBSCRIBE must be sent to the broker again
for it to resend any retained messages for new subscriptions.
Retained messages must only be replayed for new subscriptions, except
when the MQTT client is reconnecting.
"""
mqtt_mock = await mqtt_mock_entry()
# Fake that the client is connected
mqtt_mock().connected = True
calls_a: list[ReceiveMessage] = []
calls_b: list[ReceiveMessage] = []
def _callback_a(msg: ReceiveMessage) -> None:
calls_a.append(msg)
def _callback_b(msg: ReceiveMessage) -> None:
calls_b.append(msg)
await mqtt.async_subscribe(hass, "test/state", _callback_a)
async_fire_mqtt_message(
hass, "test/state", "online", qos=0, retain=True
) # Simulate a (retained) message played back
await hass.async_block_till_done()
assert len(calls_a) == 1
mqtt_client_mock.subscribe.assert_called()
calls_a = []
mqtt_client_mock.reset_mock()
await mqtt.async_subscribe(hass, "test/state", _callback_b)
# Simulate edge case where non retained message was received
# after subscription at HA but before the debouncer delay was passed.
# The message without retain flag directly after a subscription should
# be processed by both subscriptions.
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False)
# Simulate a (retained) message played back on new subscriptions
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True)
# Make sure the debouncer delay was passed
await hass.async_block_till_done()
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
await hass.async_block_till_done()
# The current subscription only received the message without retain flag
assert len(calls_a) == 1
assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=False)
# The retained message playback should only be processed by the new subscription.
# The existing subscription already got the latest update, hence the existing
# subscription should not receive the replayed (retained) message.
# Messages without retain flag are received on both subscriptions.
assert len(calls_b) == 2
assert help_assert_message(calls_b[0], "test/state", "online", qos=0, retain=False)
assert help_assert_message(calls_b[1], "test/state", "online", qos=0, retain=True)
mqtt_client_mock.subscribe.assert_called()
calls_a = []
calls_b = []
mqtt_client_mock.reset_mock()
# Simulate new message played back on new subscriptions
# After connecting the retain flag will not be set, even if the
# payload published was retained, we cannot see that
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False)
await hass.async_block_till_done()
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
await hass.async_block_till_done()
assert len(calls_a) == 1
assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=False)
assert len(calls_b) == 1
assert help_assert_message(calls_b[0], "test/state", "online", qos=0, retain=False)
# Now simulate the broker was disconnected shortly
calls_a = []
calls_b = []
mqtt_client_mock.reset_mock()
mqtt_client_mock.on_disconnect(None, None, 0)
mqtt_client_mock.on_connect(None, None, None, 0)
await hass.async_block_till_done()
mqtt_client_mock.subscribe.assert_called()
# Simulate a (retained) message played back after reconnecting
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True)
await hass.async_block_till_done()
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
await hass.async_block_till_done()
# Both subscriptions now should replay the retained message
assert len(calls_a) == 1
assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True)
assert len(calls_b) == 1
assert help_assert_message(calls_b[0], "test/state", "online", qos=0, retain=True)
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0)
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
async def test_replaying_payload_after_resubscribing(
hass: HomeAssistant,
mqtt_client_mock: MqttMockPahoClient,
mqtt_mock_entry: MqttMockHAClientGenerator,
) -> None:
"""Test replaying and filtering retained messages after resubscribing.
When subscribing to the same topic again, SUBSCRIBE must be sent to the broker again
for it to resend any retained messages for new subscriptions.
Retained messages must only be replayed for new subscriptions, except
when the MQTT client is reconnection.
"""
mqtt_mock = await mqtt_mock_entry()
# Fake that the client is connected
mqtt_mock().connected = True
calls_a: list[ReceiveMessage] = []
def _callback_a(msg: ReceiveMessage) -> None:
calls_a.append(msg)
unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a)
await hass.async_block_till_done()
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
await hass.async_block_till_done()
mqtt_client_mock.subscribe.assert_called()
# Simulate a (retained) message played back
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True)
await hass.async_block_till_done()
assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True)
calls_a.clear()
# Test we get updates
async_fire_mqtt_message(hass, "test/state", "offline", qos=0, retain=False)
await hass.async_block_till_done()
assert help_assert_message(calls_a[0], "test/state", "offline", qos=0, retain=False)
calls_a.clear()
# Test we filter new retained updates
async_fire_mqtt_message(hass, "test/state", "offline", qos=0, retain=True)
await hass.async_block_till_done()
assert len(calls_a) == 0
# Unsubscribe an resubscribe again
unsub()
unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a)
await hass.async_block_till_done()
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
await hass.async_block_till_done()
mqtt_client_mock.subscribe.assert_called()
# Simulate we can receive a (retained) played back message again
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True)
await hass.async_block_till_done()
assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True)
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0)
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
async def test_replaying_payload_wildcard_topic(
hass: HomeAssistant,
mqtt_client_mock: MqttMockPahoClient,
mqtt_mock_entry: MqttMockHAClientGenerator,
) -> None:
"""Test replaying retained messages.
When we have multiple subscriptions to the same wildcard topic,
SUBSCRIBE must be sent to the broker again
for it to resend any retained messages for new subscriptions.
Retained messages should only be replayed for new subscriptions, except
when the MQTT client is reconnection.
"""
mqtt_mock = await mqtt_mock_entry()
# Fake that the client is connected
mqtt_mock().connected = True
calls_a: list[ReceiveMessage] = []
calls_b: list[ReceiveMessage] = []
def _callback_a(msg: ReceiveMessage) -> None:
calls_a.append(msg)
def _callback_b(msg: ReceiveMessage) -> None:
calls_b.append(msg)
await mqtt.async_subscribe(hass, "test/#", _callback_a)
# Simulate (retained) messages being played back on new subscriptions
async_fire_mqtt_message(hass, "test/state1", "new_value_1", qos=0, retain=True)
async_fire_mqtt_message(hass, "test/state2", "new_value_2", qos=0, retain=True)
await hass.async_block_till_done()
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
await hass.async_block_till_done()
assert len(calls_a) == 2
mqtt_client_mock.subscribe.assert_called()
calls_a = []
mqtt_client_mock.reset_mock()
# resubscribe to the wild card topic again
await mqtt.async_subscribe(hass, "test/#", _callback_b)
# Simulate (retained) messages being played back on new subscriptions
async_fire_mqtt_message(hass, "test/state1", "initial_value_1", qos=0, retain=True)
async_fire_mqtt_message(hass, "test/state2", "initial_value_2", qos=0, retain=True)
await hass.async_block_till_done()
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
await hass.async_block_till_done()
# The retained messages playback should only be processed for the new subscriptions
assert len(calls_a) == 0
assert len(calls_b) == 2
mqtt_client_mock.subscribe.assert_called()
calls_a = []
calls_b = []
mqtt_client_mock.reset_mock()
# Simulate new messages being received
async_fire_mqtt_message(hass, "test/state1", "update_value_1", qos=0, retain=False)
async_fire_mqtt_message(hass, "test/state2", "update_value_2", qos=0, retain=False)
await hass.async_block_till_done()
assert len(calls_a) == 2
assert len(calls_b) == 2
# Now simulate the broker was disconnected shortly
calls_a = []
calls_b = []
mqtt_client_mock.reset_mock()
mqtt_client_mock.on_disconnect(None, None, 0)
mqtt_client_mock.on_connect(None, None, None, 0)
await hass.async_block_till_done()
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
await hass.async_block_till_done()
mqtt_client_mock.subscribe.assert_called()
# Simulate the (retained) messages are played back after reconnecting
# for all subscriptions
async_fire_mqtt_message(hass, "test/state1", "update_value_1", qos=0, retain=True)
async_fire_mqtt_message(hass, "test/state2", "update_value_2", qos=0, retain=True)
await hass.async_block_till_done()
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
await hass.async_block_till_done()
# Both subscriptions should replay
assert len(calls_a) == 2
assert len(calls_b) == 2
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0)
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
@ -3032,33 +3304,62 @@ async def test_debug_info_qos_retain(
start_dt = datetime(2019, 1, 1, 0, 0, 0)
with patch("homeassistant.util.dt.utcnow") as dt_utcnow:
dt_utcnow.return_value = start_dt
# simulate the first message was replayed from the broker with retained flag
async_fire_mqtt_message(hass, "sensor/abc", "123", qos=0, retain=True)
# simulate an update message
async_fire_mqtt_message(hass, "sensor/abc", "123", qos=0, retain=False)
# simpulate someone else subscribed and retained messages were replayed
async_fire_mqtt_message(hass, "sensor/abc", "123", qos=1, retain=True)
# simulate an update message
async_fire_mqtt_message(hass, "sensor/abc", "123", qos=1, retain=False)
# simulate an update message
async_fire_mqtt_message(hass, "sensor/abc", "123", qos=2, retain=False)
debug_info_data = debug_info.info_for_device(hass, device.id)
assert len(debug_info_data["entities"][0]["subscriptions"]) == 1
# The replayed retained payload was processed
messages = debug_info_data["entities"][0]["subscriptions"][0]["messages"]
assert {
"payload": "123",
"qos": 0,
"retain": True,
"time": start_dt,
"topic": "sensor/abc",
} in messages
# The not retained update was processed normally
assert {
"payload": "123",
"qos": 0,
"retain": False,
"time": start_dt,
"topic": "sensor/abc",
} in debug_info_data["entities"][0]["subscriptions"][0]["messages"]
} in messages
# Since the MQTT client has not lost the connection and has not resubscribed
# The retained payload is not replayed and filtered out as it already
# received a value and appears to be received on an existing subscription
assert {
"payload": "123",
"qos": 1,
"retain": True,
"time": start_dt,
"topic": "sensor/abc",
} in debug_info_data["entities"][0]["subscriptions"][0]["messages"]
} not in messages
# The not retained update was processed normally
assert {
"payload": "123",
"qos": 1,
"retain": False,
"time": start_dt,
"topic": "sensor/abc",
} in messages
# The not retained update was processed normally
assert {
"payload": "123",
"qos": 2,
"retain": False,
"time": start_dt,
"topic": "sensor/abc",
} in debug_info_data["entities"][0]["subscriptions"][0]["messages"]
} in messages
async def test_publish_json_from_template(