Migrate mqtt discovery subscribes to use internal helper (#118279)

pull/118287/head
J. Nick Koston 2024-05-27 18:14:58 -10:00 committed by GitHub
parent 63227f14ed
commit 69a177e864
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 45 additions and 44 deletions

View File

@ -329,54 +329,55 @@ async def async_start( # noqa: C901
mqtt_data.last_discovery = time.monotonic()
mqtt_integrations = await async_get_mqtt(hass)
integration_unsubscribe = mqtt_data.integration_unsubscribe
for integration, topics in mqtt_integrations.items():
async def async_integration_message_received(
integration: str, msg: ReceiveMessage
) -> None:
"""Process the received message."""
if TYPE_CHECKING:
assert mqtt_data.data_config_flow_lock
key = f"{integration}_{msg.subscribed_topic}"
async def async_integration_message_received(
integration: str, msg: ReceiveMessage
) -> None:
"""Process the received message."""
if TYPE_CHECKING:
assert mqtt_data.data_config_flow_lock
key = f"{integration}_{msg.subscribed_topic}"
# Lock to prevent initiating many parallel config flows.
# Note: The lock is not intended to prevent a race, only for performance
async with mqtt_data.data_config_flow_lock:
# Already unsubscribed
if key not in integration_unsubscribe:
return
# Lock to prevent initiating many parallel config flows.
# Note: The lock is not intended to prevent a race, only for performance
async with mqtt_data.data_config_flow_lock:
# Already unsubscribed
if key not in mqtt_data.integration_unsubscribe:
return
data = MqttServiceInfo(
topic=msg.topic,
payload=msg.payload,
qos=msg.qos,
retain=msg.retain,
subscribed_topic=msg.subscribed_topic,
timestamp=msg.timestamp,
)
result = await hass.config_entries.flow.async_init(
integration, context={"source": DOMAIN}, data=data
)
if (
result
and result["type"] == FlowResultType.ABORT
and result["reason"]
in ("already_configured", "single_instance_allowed")
):
integration_unsubscribe.pop(key)()
data = MqttServiceInfo(
topic=msg.topic,
payload=msg.payload,
qos=msg.qos,
retain=msg.retain,
subscribed_topic=msg.subscribed_topic,
timestamp=msg.timestamp,
)
result = await hass.config_entries.flow.async_init(
integration, context={"source": DOMAIN}, data=data
)
if (
result
and result["type"] == FlowResultType.ABORT
and result["reason"]
in ("already_configured", "single_instance_allowed")
):
mqtt_data.integration_unsubscribe.pop(key)()
mqtt_data.integration_unsubscribe.update(
{
f"{integration}_{topic}": await mqtt.async_subscribe(
hass,
topic,
functools.partial(async_integration_message_received, integration),
0,
)
for topic in topics
}
)
integration_unsubscribe.update(
{
f"{integration}_{topic}": mqtt.async_subscribe_internal(
hass,
topic,
functools.partial(async_integration_message_received, integration),
0,
job_type=HassJobType.Coroutinefunction,
)
for integration, topics in mqtt_integrations.items()
for topic in topics
}
)
async def async_stop(hass: HomeAssistant) -> None: