Fix reload of MQTT config entries (#76089)
* Wait for unsubscribes * Spelling comment * Remove notify_all() during _register_mid() * Update homeassistant/components/mqtt/client.py Co-authored-by: Erik Montnemery <erik@montnemery.com> * Correct handling reload manual set up MQTT items * Save and restore device trigger subscriptions * Clarify we are storing all remaining subscriptions Co-authored-by: Erik Montnemery <erik@montnemery.com>pull/77734/head
parent
2e4d5aca09
commit
2e34814d7a
homeassistant/components/mqtt
tests/components/mqtt
|
@ -71,6 +71,7 @@ from .const import ( # noqa: F401
|
|||
DATA_MQTT_RELOAD_DISPATCHERS,
|
||||
DATA_MQTT_RELOAD_ENTRY,
|
||||
DATA_MQTT_RELOAD_NEEDED,
|
||||
DATA_MQTT_SUBSCRIPTIONS_TO_RESTORE,
|
||||
DATA_MQTT_UPDATED_CONFIG,
|
||||
DEFAULT_ENCODING,
|
||||
DEFAULT_QOS,
|
||||
|
@ -315,6 +316,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
return False
|
||||
|
||||
hass.data[DATA_MQTT] = MQTT(hass, entry, conf)
|
||||
# Restore saved subscriptions
|
||||
if DATA_MQTT_SUBSCRIPTIONS_TO_RESTORE in hass.data:
|
||||
hass.data[DATA_MQTT].subscriptions = hass.data.pop(
|
||||
DATA_MQTT_SUBSCRIPTIONS_TO_RESTORE
|
||||
)
|
||||
entry.add_update_listener(_async_config_entry_updated)
|
||||
|
||||
await hass.data[DATA_MQTT].async_connect()
|
||||
|
@ -438,6 +444,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
|
||||
async def async_forward_entry_setup_and_setup_discovery(config_entry):
|
||||
"""Forward the config entry setup to the platforms and set up discovery."""
|
||||
reload_manual_setup: bool = False
|
||||
# Local import to avoid circular dependencies
|
||||
# pylint: disable-next=import-outside-toplevel
|
||||
from . import device_automation, tag
|
||||
|
@ -460,8 +467,17 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
await _async_setup_discovery(hass, conf, entry)
|
||||
# Setup reload service after all platforms have loaded
|
||||
await async_setup_reload_service()
|
||||
# When the entry is reloaded, also reload manual set up items to enable MQTT
|
||||
if DATA_MQTT_RELOAD_ENTRY in hass.data:
|
||||
hass.data.pop(DATA_MQTT_RELOAD_ENTRY)
|
||||
reload_manual_setup = True
|
||||
|
||||
# When the entry was disabled before, reload manual set up items to enable MQTT again
|
||||
if DATA_MQTT_RELOAD_NEEDED in hass.data:
|
||||
hass.data.pop(DATA_MQTT_RELOAD_NEEDED)
|
||||
reload_manual_setup = True
|
||||
|
||||
if reload_manual_setup:
|
||||
await async_reload_manual_mqtt_items(hass)
|
||||
|
||||
await async_forward_entry_setup_and_setup_discovery(entry)
|
||||
|
@ -613,8 +629,6 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
mqtt_client.cleanup()
|
||||
|
||||
# Trigger reload manual MQTT items at entry setup
|
||||
# Reload the legacy yaml platform
|
||||
await async_reload_integration_platforms(hass, DOMAIN, RELOADABLE_PLATFORMS)
|
||||
if (mqtt_entry_status := mqtt_config_entry_enabled(hass)) is False:
|
||||
# The entry is disabled reload legacy manual items when the entry is enabled again
|
||||
hass.data[DATA_MQTT_RELOAD_NEEDED] = True
|
||||
|
@ -622,7 +636,13 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
# The entry is reloaded:
|
||||
# Trigger re-fetching the yaml config at entry setup
|
||||
hass.data[DATA_MQTT_RELOAD_ENTRY] = True
|
||||
# Stop the loop
|
||||
# Reload the legacy yaml platform to make entities unavailable
|
||||
await async_reload_integration_platforms(hass, DOMAIN, RELOADABLE_PLATFORMS)
|
||||
# Wait for all ACKs and stop the loop
|
||||
await mqtt_client.async_disconnect()
|
||||
# Store remaining subscriptions to be able to restore or reload them
|
||||
# when the entry is set up again
|
||||
if mqtt_client.subscriptions:
|
||||
hass.data[DATA_MQTT_SUBSCRIPTIONS_TO_RESTORE] = mqtt_client.subscriptions
|
||||
|
||||
return True
|
||||
|
|
|
@ -309,7 +309,7 @@ class MQTT:
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
hass,
|
||||
config_entry,
|
||||
conf,
|
||||
) -> None:
|
||||
|
@ -435,12 +435,13 @@ class MQTT:
|
|||
"""Return False if there are unprocessed ACKs."""
|
||||
return not bool(self._pending_operations)
|
||||
|
||||
# wait for ACK-s to be processesed (unsubscribe only)
|
||||
# wait for ACKs to be processed
|
||||
async with self._pending_operations_condition:
|
||||
await self._pending_operations_condition.wait_for(no_more_acks)
|
||||
|
||||
# stop the MQTT loop
|
||||
await self.hass.async_add_executor_job(stop)
|
||||
async with self._paho_lock:
|
||||
await self.hass.async_add_executor_job(stop)
|
||||
|
||||
async def async_subscribe(
|
||||
self,
|
||||
|
@ -501,7 +502,8 @@ class MQTT:
|
|||
async with self._paho_lock:
|
||||
mid = await self.hass.async_add_executor_job(_client_unsubscribe, topic)
|
||||
await self._register_mid(mid)
|
||||
self.hass.async_create_task(self._wait_for_mid(mid))
|
||||
|
||||
self.hass.async_create_task(self._wait_for_mid(mid))
|
||||
|
||||
async def _async_perform_subscriptions(
|
||||
self, subscriptions: Iterable[tuple[str, int]]
|
||||
|
|
|
@ -32,6 +32,7 @@ CONF_TLS_VERSION = "tls_version"
|
|||
|
||||
CONFIG_ENTRY_IS_SETUP = "mqtt_config_entry_is_setup"
|
||||
DATA_MQTT = "mqtt"
|
||||
DATA_MQTT_SUBSCRIPTIONS_TO_RESTORE = "mqtt_client_subscriptions"
|
||||
DATA_MQTT_CONFIG = "mqtt_config"
|
||||
MQTT_DATA_DEVICE_TRACKER_LEGACY = "mqtt_device_tracker_legacy"
|
||||
DATA_MQTT_RELOAD_DISPATCHERS = "mqtt_reload_dispatchers"
|
||||
|
|
|
@ -65,6 +65,7 @@ from .const import (
|
|||
DATA_MQTT,
|
||||
DATA_MQTT_CONFIG,
|
||||
DATA_MQTT_RELOAD_DISPATCHERS,
|
||||
DATA_MQTT_RELOAD_ENTRY,
|
||||
DATA_MQTT_UPDATED_CONFIG,
|
||||
DEFAULT_ENCODING,
|
||||
DEFAULT_PAYLOAD_AVAILABLE,
|
||||
|
@ -363,6 +364,12 @@ async def async_setup_platform_helper(
|
|||
async_setup_entities: SetupEntity,
|
||||
) -> None:
|
||||
"""Help to set up the platform for manual configured MQTT entities."""
|
||||
if DATA_MQTT_RELOAD_ENTRY in hass.data:
|
||||
_LOGGER.debug(
|
||||
"MQTT integration is %s, skipping setup of manually configured MQTT items while unloading the config entry",
|
||||
platform_domain,
|
||||
)
|
||||
return
|
||||
if not (entry_status := mqtt_config_entry_enabled(hass)):
|
||||
_LOGGER.warning(
|
||||
"MQTT integration is %s, skipping setup of manually configured MQTT %s",
|
||||
|
|
|
@ -4,6 +4,7 @@ from unittest.mock import patch
|
|||
|
||||
import pytest
|
||||
|
||||
from homeassistant import config as hass_config
|
||||
import homeassistant.components.automation as automation
|
||||
from homeassistant.components.device_automation import DeviceAutomationType
|
||||
from homeassistant.components.mqtt import _LOGGER, DOMAIN, debug_info
|
||||
|
@ -1425,7 +1426,24 @@ async def test_unload_entry(hass, calls, device_reg, mqtt_mock, tmp_path) -> Non
|
|||
|
||||
await help_test_unload_config_entry(hass, tmp_path, {})
|
||||
|
||||
# Fake short press 2
|
||||
# Rediscover message and fake short press 2 (non impact)
|
||||
async_fire_mqtt_message(hass, "homeassistant/device_automation/bla1/config", data1)
|
||||
await hass.async_block_till_done()
|
||||
async_fire_mqtt_message(hass, "foobar/triggers/button1", "short_press")
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
mqtt_entry = hass.config_entries.async_entries("mqtt")[0]
|
||||
|
||||
# Load the entry again
|
||||
new_yaml_config_file = tmp_path / "configuration.yaml"
|
||||
new_yaml_config_file.write_text("")
|
||||
with patch.object(hass_config, "YAML_CONFIG_FILE", new_yaml_config_file):
|
||||
await mqtt_entry.async_setup(hass)
|
||||
|
||||
# Rediscover and fake short press 3
|
||||
async_fire_mqtt_message(hass, "homeassistant/device_automation/bla1/config", data1)
|
||||
await hass.async_block_till_done()
|
||||
async_fire_mqtt_message(hass, "foobar/triggers/button1", "short_press")
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 2
|
||||
|
|
Loading…
Reference in New Issue