2019-04-03 15:40:03 +00:00
|
|
|
"""Helper to handle a set of topics to subscribe to."""
|
2018-11-19 15:49:04 +00:00
|
|
|
import logging
|
2018-12-03 18:16:58 +00:00
|
|
|
from typing import Any, Callable, Dict, Optional
|
|
|
|
|
|
|
|
import attr
|
2018-11-19 15:49:04 +00:00
|
|
|
|
|
|
|
from homeassistant.components import mqtt
|
|
|
|
from homeassistant.helpers.typing import HomeAssistantType
|
2018-12-03 18:16:58 +00:00
|
|
|
from homeassistant.loader import bind_hass
|
2018-11-19 15:49:04 +00:00
|
|
|
|
2020-04-01 17:00:40 +00:00
|
|
|
from . import debug_info
|
2019-10-18 00:04:27 +00:00
|
|
|
from .const import DEFAULT_QOS
|
|
|
|
from .models import MessageCallbackType
|
2019-03-21 05:56:46 +00:00
|
|
|
|
2018-11-19 15:49:04 +00:00
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
2018-12-03 18:16:58 +00:00
|
|
|
@attr.s(slots=True)
|
|
|
|
class EntitySubscription:
|
|
|
|
"""Class to hold data about an active entity topic subscription."""
|
|
|
|
|
2020-07-14 17:30:30 +00:00
|
|
|
hass: HomeAssistantType = attr.ib()
|
|
|
|
topic: str = attr.ib()
|
|
|
|
message_callback: MessageCallbackType = attr.ib()
|
|
|
|
unsubscribe_callback: Optional[Callable[[], None]] = attr.ib()
|
|
|
|
qos: int = attr.ib(default=0)
|
|
|
|
encoding: str = attr.ib(default="utf-8")
|
2018-12-03 18:16:58 +00:00
|
|
|
|
|
|
|
async def resubscribe_if_necessary(self, hass, other):
|
|
|
|
"""Re-subscribe to the new topic if necessary."""
|
|
|
|
if not self._should_resubscribe(other):
|
2020-11-10 20:55:26 +00:00
|
|
|
self.unsubscribe_callback = other.unsubscribe_callback
|
2018-12-03 18:16:58 +00:00
|
|
|
return
|
|
|
|
|
|
|
|
if other is not None and other.unsubscribe_callback is not None:
|
|
|
|
other.unsubscribe_callback()
|
2020-04-01 17:00:40 +00:00
|
|
|
# Clear debug data if it exists
|
2020-04-07 16:38:22 +00:00
|
|
|
debug_info.remove_subscription(
|
|
|
|
self.hass, other.message_callback, other.topic
|
|
|
|
)
|
2018-12-03 18:16:58 +00:00
|
|
|
|
|
|
|
if self.topic is None:
|
|
|
|
# We were asked to remove the subscription or not to create it
|
|
|
|
return
|
|
|
|
|
2020-04-01 17:00:40 +00:00
|
|
|
# Prepare debug data
|
2020-04-07 16:38:22 +00:00
|
|
|
debug_info.add_subscription(self.hass, self.message_callback, self.topic)
|
2020-04-01 17:00:40 +00:00
|
|
|
|
2018-12-03 18:16:58 +00:00
|
|
|
self.unsubscribe_callback = await mqtt.async_subscribe(
|
2019-07-31 19:25:30 +00:00
|
|
|
hass, self.topic, self.message_callback, self.qos, self.encoding
|
2018-12-03 18:16:58 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
def _should_resubscribe(self, other):
|
|
|
|
"""Check if we should re-subscribe to the topic using the old state."""
|
|
|
|
if other is None:
|
|
|
|
return True
|
|
|
|
|
2019-07-31 19:25:30 +00:00
|
|
|
return (self.topic, self.qos, self.encoding) != (
|
|
|
|
other.topic,
|
|
|
|
other.qos,
|
|
|
|
other.encoding,
|
|
|
|
)
|
2018-12-03 18:16:58 +00:00
|
|
|
|
|
|
|
|
2018-11-19 15:49:04 +00:00
|
|
|
@bind_hass
|
2019-07-31 19:25:30 +00:00
|
|
|
async def async_subscribe_topics(
|
|
|
|
hass: HomeAssistantType,
|
|
|
|
new_state: Optional[Dict[str, EntitySubscription]],
|
|
|
|
topics: Dict[str, Any],
|
|
|
|
):
|
2018-11-19 15:49:04 +00:00
|
|
|
"""(Re)Subscribe to a set of MQTT topics.
|
|
|
|
|
2018-12-03 18:16:58 +00:00
|
|
|
State is kept in sub_state and a dictionary mapping from the subscription
|
|
|
|
key to the subscription state.
|
|
|
|
|
|
|
|
Please note that the sub state must not be shared between multiple
|
|
|
|
sets of topics. Every call to async_subscribe_topics must always
|
|
|
|
contain _all_ the topics the subscription state should manage.
|
2018-11-19 15:49:04 +00:00
|
|
|
"""
|
2018-12-03 18:16:58 +00:00
|
|
|
current_subscriptions = new_state if new_state is not None else {}
|
|
|
|
new_state = {}
|
|
|
|
for key, value in topics.items():
|
|
|
|
# Extract the new requested subscription
|
|
|
|
requested = EntitySubscription(
|
2019-07-31 19:25:30 +00:00
|
|
|
topic=value.get("topic", None),
|
|
|
|
message_callback=value.get("msg_callback", None),
|
2018-12-03 18:16:58 +00:00
|
|
|
unsubscribe_callback=None,
|
2019-07-31 19:25:30 +00:00
|
|
|
qos=value.get("qos", DEFAULT_QOS),
|
|
|
|
encoding=value.get("encoding", "utf-8"),
|
2020-04-01 17:00:40 +00:00
|
|
|
hass=hass,
|
2018-12-03 18:16:58 +00:00
|
|
|
)
|
|
|
|
# Get the current subscription state
|
|
|
|
current = current_subscriptions.pop(key, None)
|
|
|
|
await requested.resubscribe_if_necessary(hass, current)
|
|
|
|
new_state[key] = requested
|
2018-11-19 15:49:04 +00:00
|
|
|
|
2018-12-03 18:16:58 +00:00
|
|
|
# Go through all remaining subscriptions and unsubscribe them
|
|
|
|
for remaining in current_subscriptions.values():
|
|
|
|
if remaining.unsubscribe_callback is not None:
|
|
|
|
remaining.unsubscribe_callback()
|
2020-04-01 17:00:40 +00:00
|
|
|
# Clear debug data if it exists
|
2020-04-07 16:38:22 +00:00
|
|
|
debug_info.remove_subscription(
|
|
|
|
hass, remaining.message_callback, remaining.topic
|
|
|
|
)
|
2018-12-03 18:16:58 +00:00
|
|
|
|
|
|
|
return new_state
|
2018-11-19 15:49:04 +00:00
|
|
|
|
|
|
|
|
|
|
|
@bind_hass
|
|
|
|
async def async_unsubscribe_topics(hass: HomeAssistantType, sub_state: dict):
|
|
|
|
"""Unsubscribe from all MQTT topics managed by async_subscribe_topics."""
|
2018-12-19 13:05:24 +00:00
|
|
|
return await async_subscribe_topics(hass, sub_state, {})
|