From 69a177e864a45d83b8b1f7a0227ed30973495862 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 27 May 2024 18:14:58 -1000 Subject: [PATCH] Migrate mqtt discovery subscribes to use internal helper (#118279) --- homeassistant/components/mqtt/discovery.py | 89 +++++++++++----------- 1 file changed, 45 insertions(+), 44 deletions(-) diff --git a/homeassistant/components/mqtt/discovery.py b/homeassistant/components/mqtt/discovery.py index 43c07688a43..2cdd900690c 100644 --- a/homeassistant/components/mqtt/discovery.py +++ b/homeassistant/components/mqtt/discovery.py @@ -329,54 +329,55 @@ async def async_start( # noqa: C901 mqtt_data.last_discovery = time.monotonic() mqtt_integrations = await async_get_mqtt(hass) + integration_unsubscribe = mqtt_data.integration_unsubscribe - for integration, topics in mqtt_integrations.items(): + async def async_integration_message_received( + integration: str, msg: ReceiveMessage + ) -> None: + """Process the received message.""" + if TYPE_CHECKING: + assert mqtt_data.data_config_flow_lock + key = f"{integration}_{msg.subscribed_topic}" - async def async_integration_message_received( - integration: str, msg: ReceiveMessage - ) -> None: - """Process the received message.""" - if TYPE_CHECKING: - assert mqtt_data.data_config_flow_lock - key = f"{integration}_{msg.subscribed_topic}" + # Lock to prevent initiating many parallel config flows. + # Note: The lock is not intended to prevent a race, only for performance + async with mqtt_data.data_config_flow_lock: + # Already unsubscribed + if key not in integration_unsubscribe: + return - # Lock to prevent initiating many parallel config flows. - # Note: The lock is not intended to prevent a race, only for performance - async with mqtt_data.data_config_flow_lock: - # Already unsubscribed - if key not in mqtt_data.integration_unsubscribe: - return + data = MqttServiceInfo( + topic=msg.topic, + payload=msg.payload, + qos=msg.qos, + retain=msg.retain, + subscribed_topic=msg.subscribed_topic, + timestamp=msg.timestamp, + ) + result = await hass.config_entries.flow.async_init( + integration, context={"source": DOMAIN}, data=data + ) + if ( + result + and result["type"] == FlowResultType.ABORT + and result["reason"] + in ("already_configured", "single_instance_allowed") + ): + integration_unsubscribe.pop(key)() - data = MqttServiceInfo( - topic=msg.topic, - payload=msg.payload, - qos=msg.qos, - retain=msg.retain, - subscribed_topic=msg.subscribed_topic, - timestamp=msg.timestamp, - ) - result = await hass.config_entries.flow.async_init( - integration, context={"source": DOMAIN}, data=data - ) - if ( - result - and result["type"] == FlowResultType.ABORT - and result["reason"] - in ("already_configured", "single_instance_allowed") - ): - mqtt_data.integration_unsubscribe.pop(key)() - - mqtt_data.integration_unsubscribe.update( - { - f"{integration}_{topic}": await mqtt.async_subscribe( - hass, - topic, - functools.partial(async_integration_message_received, integration), - 0, - ) - for topic in topics - } - ) + integration_unsubscribe.update( + { + f"{integration}_{topic}": mqtt.async_subscribe_internal( + hass, + topic, + functools.partial(async_integration_message_received, integration), + 0, + job_type=HassJobType.Coroutinefunction, + ) + for integration, topics in mqtt_integrations.items() + for topic in topics + } + ) async def async_stop(hass: HomeAssistant) -> None: