Add MQTT siren platform (#64440)

* Add mqtt siren draft

* fix tests

* Intergrate notify platform

* tests and fixes siren platform

* Add tests notify platform

* config parameters and abbreviations

* remove duplicate key

* undo move topic abbreviation

* Move const CONF_MESSAGE_COMMAND_TEMPLATE

* Remove notify service integration

* Rework

* Update homeassistant/components/mqtt/siren.py

Co-authored-by: Erik Montnemery <erik@montnemery.com>

* Publish JSON by default

* Allow unknown state - rename value_template

Co-authored-by: Erik Montnemery <erik@montnemery.com>
pull/65296/head
Jan Bouwhuis 2022-01-31 10:31:57 +01:00 committed by GitHub
parent bfaada34e2
commit 6fdaec0847
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 1266 additions and 0 deletions

View File

@ -150,6 +150,7 @@ PLATFORMS = [
Platform.SELECT,
Platform.SCENE,
Platform.SENSOR,
Platform.SIREN,
Platform.SWITCH,
Platform.VACUUM,
]

View File

@ -7,6 +7,7 @@ ABBREVIATIONS = {
"aux_cmd_t": "aux_command_topic",
"aux_stat_tpl": "aux_state_template",
"aux_stat_t": "aux_state_topic",
"av_tones": "available_tones",
"avty": "availability",
"avty_mode": "availability_mode",
"avty_t": "availability_topic",
@ -205,6 +206,8 @@ ABBREVIATIONS = {
"stat_val_tpl": "state_value_template",
"step": "step",
"stype": "subtype",
"sup_dur": "support_duration",
"sup_vol": "support_volume_set",
"sup_feat": "supported_features",
"sup_clrm": "supported_color_modes",
"swing_mode_cmd_tpl": "swing_mode_command_template",

View File

@ -51,4 +51,7 @@ DOMAIN = "mqtt"
MQTT_CONNECTED = "mqtt_connected"
MQTT_DISCONNECTED = "mqtt_disconnected"
PAYLOAD_EMPTY_JSON = "{}"
PAYLOAD_NONE = "None"
PROTOCOL_311 = "3.1.1"

View File

@ -50,6 +50,7 @@ SUPPORTED_COMPONENTS = [
"lock",
"number",
"scene",
"siren",
"select",
"sensor",
"switch",

View File

@ -0,0 +1,374 @@
"""Support for MQTT sirens."""
from __future__ import annotations
import copy
import functools
import json
import logging
from typing import Any
import voluptuous as vol
from homeassistant.components import siren
from homeassistant.components.siren import (
TURN_ON_SCHEMA,
SirenEntity,
process_turn_on_params,
)
from homeassistant.components.siren.const import (
ATTR_AVAILABLE_TONES,
ATTR_DURATION,
ATTR_TONE,
ATTR_VOLUME_LEVEL,
SUPPORT_DURATION,
SUPPORT_TONES,
SUPPORT_TURN_OFF,
SUPPORT_TURN_ON,
SUPPORT_VOLUME_SET,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_NAME,
CONF_OPTIMISTIC,
CONF_PAYLOAD_OFF,
CONF_PAYLOAD_ON,
)
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.reload import async_setup_reload_service
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import PLATFORMS, MqttCommandTemplate, MqttValueTemplate, subscription
from .. import mqtt
from .const import (
CONF_COMMAND_TEMPLATE,
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_QOS,
CONF_RETAIN,
CONF_STATE_TOPIC,
DOMAIN,
PAYLOAD_EMPTY_JSON,
PAYLOAD_NONE,
)
from .debug_info import log_messages
from .mixins import MQTT_ENTITY_COMMON_SCHEMA, MqttEntity, async_setup_entry_helper
DEFAULT_NAME = "MQTT Siren"
DEFAULT_PAYLOAD_ON = "ON"
DEFAULT_PAYLOAD_OFF = "OFF"
DEFAULT_OPTIMISTIC = False
ENTITY_ID_FORMAT = siren.DOMAIN + ".{}"
CONF_AVAILABLE_TONES = "available_tones"
CONF_COMMAND_OFF_TEMPLATE = "command_off_template"
CONF_STATE_ON = "state_on"
CONF_STATE_OFF = "state_off"
CONF_STATE_VALUE_TEMPLATE = "state_value_template"
CONF_SUPPORT_DURATION = "support_duration"
CONF_SUPPORT_VOLUME_SET = "support_volume_set"
STATE = "state"
PLATFORM_SCHEMA = mqtt.MQTT_RW_PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_AVAILABLE_TONES): cv.ensure_list,
vol.Optional(CONF_COMMAND_TEMPLATE): cv.template,
vol.Optional(CONF_COMMAND_OFF_TEMPLATE): cv.template,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_OPTIMISTIC, default=DEFAULT_OPTIMISTIC): cv.boolean,
vol.Optional(CONF_PAYLOAD_OFF, default=DEFAULT_PAYLOAD_OFF): cv.string,
vol.Optional(CONF_PAYLOAD_ON, default=DEFAULT_PAYLOAD_ON): cv.string,
vol.Optional(CONF_STATE_OFF): cv.string,
vol.Optional(CONF_STATE_ON): cv.string,
vol.Optional(CONF_STATE_VALUE_TEMPLATE): cv.template,
vol.Optional(CONF_SUPPORT_DURATION, default=True): cv.boolean,
vol.Optional(CONF_SUPPORT_VOLUME_SET, default=True): cv.boolean,
},
).extend(MQTT_ENTITY_COMMON_SCHEMA.schema)
DISCOVERY_SCHEMA = vol.All(PLATFORM_SCHEMA.extend({}, extra=vol.REMOVE_EXTRA))
MQTT_SIREN_ATTRIBUTES_BLOCKED = frozenset(
{
ATTR_AVAILABLE_TONES,
ATTR_DURATION,
ATTR_TONE,
ATTR_VOLUME_LEVEL,
}
)
SUPPORTED_BASE = SUPPORT_TURN_OFF | SUPPORT_TURN_ON
SUPPORTED_ATTRIBUTES = {
ATTR_DURATION: SUPPORT_DURATION,
ATTR_TONE: SUPPORT_TONES,
ATTR_VOLUME_LEVEL: SUPPORT_VOLUME_SET,
}
_LOGGER = logging.getLogger(__name__)
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up MQTT siren through configuration.yaml."""
await async_setup_reload_service(hass, DOMAIN, PLATFORMS)
await _async_setup_entity(hass, async_add_entities, config)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up MQTT siren dynamically through MQTT discovery."""
setup = functools.partial(
_async_setup_entity, hass, async_add_entities, config_entry=config_entry
)
await async_setup_entry_helper(hass, siren.DOMAIN, setup, DISCOVERY_SCHEMA)
async def _async_setup_entity(
hass, async_add_entities, config, config_entry=None, discovery_data=None
):
"""Set up the MQTT siren."""
async_add_entities([MqttSiren(hass, config, config_entry, discovery_data)])
class MqttSiren(MqttEntity, SirenEntity):
"""Representation of a siren that can be controlled using MQTT."""
_entity_id_format = ENTITY_ID_FORMAT
_attributes_extra_blocked = MQTT_SIREN_ATTRIBUTES_BLOCKED
def __init__(self, hass, config, config_entry, discovery_data):
"""Initialize the MQTT siren."""
self._attr_name = config[CONF_NAME]
self._attr_should_poll = False
self._supported_features = SUPPORTED_BASE
self._attr_is_on = None
self._state_on = None
self._state_off = None
self._optimistic = None
self._attr_extra_state_attributes: dict[str, Any] = {}
self.target = None
MqttEntity.__init__(self, hass, config, config_entry, discovery_data)
@staticmethod
def config_schema():
"""Return the config schema."""
return DISCOVERY_SCHEMA
def _setup_from_config(self, config):
"""(Re)Setup the entity."""
state_on = config.get(CONF_STATE_ON)
self._state_on = state_on if state_on else config[CONF_PAYLOAD_ON]
state_off = config.get(CONF_STATE_OFF)
self._state_off = state_off if state_off else config[CONF_PAYLOAD_OFF]
if config[CONF_SUPPORT_DURATION]:
self._supported_features |= SUPPORT_DURATION
self._attr_extra_state_attributes[ATTR_DURATION] = None
if config.get(CONF_AVAILABLE_TONES):
self._supported_features |= SUPPORT_TONES
self._attr_available_tones = config[CONF_AVAILABLE_TONES]
self._attr_extra_state_attributes[ATTR_TONE] = None
if config[CONF_SUPPORT_VOLUME_SET]:
self._supported_features |= SUPPORT_VOLUME_SET
self._attr_extra_state_attributes[ATTR_VOLUME_LEVEL] = None
self._optimistic = config[CONF_OPTIMISTIC] or CONF_STATE_TOPIC not in config
self._attr_is_on = False if self._optimistic else None
command_template = config.get(CONF_COMMAND_TEMPLATE)
command_off_template = config.get(CONF_COMMAND_OFF_TEMPLATE) or config.get(
CONF_COMMAND_TEMPLATE
)
self._command_templates = {
CONF_COMMAND_TEMPLATE: MqttCommandTemplate(
command_template, entity=self
).async_render
if command_template
else None,
CONF_COMMAND_OFF_TEMPLATE: MqttCommandTemplate(
command_off_template, entity=self
).async_render
if command_off_template
else None,
}
self._value_template = MqttValueTemplate(
config.get(CONF_STATE_VALUE_TEMPLATE),
entity=self,
).async_render_with_possible_json_value
async def _subscribe_topics(self):
"""(Re)Subscribe to topics."""
@callback
@log_messages(self.hass, self.entity_id)
def state_message_received(msg):
"""Handle new MQTT state messages."""
payload = self._value_template(msg.payload)
if not payload or payload == PAYLOAD_EMPTY_JSON:
_LOGGER.debug(
"Ignoring empty payload '%s' after rendering for topic %s",
payload,
msg.topic,
)
return
json_payload = {}
if payload in [self._state_on, self._state_off, PAYLOAD_NONE]:
json_payload = {STATE: payload}
else:
try:
json_payload = json.loads(payload)
_LOGGER.debug(
"JSON payload detected after processing payload '%s' on topic %s",
json_payload,
msg.topic,
)
except json.decoder.JSONDecodeError:
_LOGGER.warning(
"No valid (JSON) payload detected after processing payload '%s' on topic %s",
json_payload,
msg.topic,
)
return
if STATE in json_payload:
if json_payload[STATE] == self._state_on:
self._attr_is_on = True
if json_payload[STATE] == self._state_off:
self._attr_is_on = False
if json_payload[STATE] == PAYLOAD_NONE:
self._attr_is_on = None
del json_payload[STATE]
if json_payload:
# process attributes
try:
vol.All(TURN_ON_SCHEMA)(json_payload)
except vol.MultipleInvalid as invalid_siren_parameters:
_LOGGER.warning(
"Unable to update siren state attributes from payload '%s': %s",
json_payload,
invalid_siren_parameters,
)
return
self._update(process_turn_on_params(self, json_payload))
self.async_write_ha_state()
if self._config.get(CONF_STATE_TOPIC) is None:
# Force into optimistic mode.
self._optimistic = True
else:
self._sub_state = await subscription.async_subscribe_topics(
self.hass,
self._sub_state,
{
CONF_STATE_TOPIC: {
"topic": self._config.get(CONF_STATE_TOPIC),
"msg_callback": state_message_received,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
}
},
)
@property
def assumed_state(self):
"""Return true if we do optimistic updates."""
return self._optimistic
@property
def extra_state_attributes(self) -> dict:
"""Return the state attributes."""
mqtt_attributes = super().extra_state_attributes
attributes = (
copy.deepcopy(mqtt_attributes) if mqtt_attributes is not None else {}
)
attributes.update(self._attr_extra_state_attributes)
return attributes
@property
def supported_features(self) -> int:
"""Flag supported features."""
return self._supported_features
async def _async_publish(
self,
topic: str,
template: str,
value: Any,
variables: dict[str, Any] | None = None,
) -> None:
"""Publish MQTT payload with optional command template."""
template_variables = {STATE: value}
if variables is not None:
template_variables.update(variables)
payload = (
self._command_templates[template](value, template_variables)
if self._command_templates[template]
else json.dumps(template_variables)
)
if payload and payload not in PAYLOAD_NONE:
await mqtt.async_publish(
self.hass,
self._config[topic],
payload,
self._config[CONF_QOS],
self._config[CONF_RETAIN],
self._config[CONF_ENCODING],
)
async def async_turn_on(self, **kwargs) -> None:
"""Turn the siren on.
This method is a coroutine.
"""
await self._async_publish(
CONF_COMMAND_TOPIC,
CONF_COMMAND_TEMPLATE,
self._config[CONF_PAYLOAD_ON],
kwargs,
)
if self._optimistic:
# Optimistically assume that siren has changed state.
_LOGGER.debug("Writing state attributes %s", kwargs)
self._attr_is_on = True
self._update(kwargs)
self.async_write_ha_state()
async def async_turn_off(self, **kwargs) -> None:
"""Turn the siren off.
This method is a coroutine.
"""
await self._async_publish(
CONF_COMMAND_TOPIC,
CONF_COMMAND_OFF_TEMPLATE,
self._config[CONF_PAYLOAD_OFF],
)
if self._optimistic:
# Optimistically assume that siren has changed state.
self._attr_is_on = False
self.async_write_ha_state()
def _update(self, data: dict[str, Any]) -> None:
"""Update the extra siren state attributes."""
for attribute, support in SUPPORTED_ATTRIBUTES.items():
if self._supported_features & support and attribute in data:
self._attr_extra_state_attributes[attribute] = data[attribute]

View File

@ -0,0 +1,884 @@
"""The tests for the MQTT siren platform."""
import copy
from unittest.mock import patch
import pytest
from homeassistant.components import siren
from homeassistant.components.siren.const import ATTR_VOLUME_LEVEL
from homeassistant.const import (
ATTR_ASSUMED_STATE,
ATTR_ENTITY_ID,
ENTITY_MATCH_ALL,
SERVICE_TURN_OFF,
SERVICE_TURN_ON,
STATE_OFF,
STATE_ON,
STATE_UNKNOWN,
)
from homeassistant.setup import async_setup_component
from .test_common import (
help_test_availability_when_connection_lost,
help_test_availability_without_topic,
help_test_custom_availability_payload,
help_test_default_availability_payload,
help_test_discovery_broken,
help_test_discovery_removal,
help_test_discovery_update,
help_test_discovery_update_attr,
help_test_discovery_update_unchanged,
help_test_encoding_subscribable_topics,
help_test_entity_debug_info_message,
help_test_entity_device_info_remove,
help_test_entity_device_info_update,
help_test_entity_device_info_with_connection,
help_test_entity_device_info_with_identifier,
help_test_entity_id_update_discovery_update,
help_test_entity_id_update_subscriptions,
help_test_publishing_with_custom_encoding,
help_test_reloadable,
help_test_setting_attribute_via_mqtt_json_message,
help_test_setting_attribute_with_template,
help_test_setting_blocked_attribute_via_mqtt_json_message,
help_test_unique_id,
help_test_update_with_json_attrs_bad_JSON,
help_test_update_with_json_attrs_not_dict,
)
from tests.common import async_fire_mqtt_message
DEFAULT_CONFIG = {
siren.DOMAIN: {"platform": "mqtt", "name": "test", "command_topic": "test-topic"}
}
async def async_turn_on(hass, entity_id=ENTITY_MATCH_ALL, parameters={}) -> None:
"""Turn all or specified siren on."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
data.update(parameters)
await hass.services.async_call(siren.DOMAIN, SERVICE_TURN_ON, data, blocking=True)
async def async_turn_off(hass, entity_id=ENTITY_MATCH_ALL) -> None:
"""Turn all or specified siren off."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
await hass.services.async_call(siren.DOMAIN, SERVICE_TURN_OFF, data, blocking=True)
async def test_controlling_state_via_topic(hass, mqtt_mock):
"""Test the controlling state via topic."""
assert await async_setup_component(
hass,
siren.DOMAIN,
{
siren.DOMAIN: {
"platform": "mqtt",
"name": "test",
"state_topic": "state-topic",
"command_topic": "command-topic",
"payload_on": 1,
"payload_off": 0,
}
},
)
await hass.async_block_till_done()
state = hass.states.get("siren.test")
assert state.state == STATE_UNKNOWN
assert not state.attributes.get(ATTR_ASSUMED_STATE)
async_fire_mqtt_message(hass, "state-topic", "1")
state = hass.states.get("siren.test")
assert state.state == STATE_ON
async_fire_mqtt_message(hass, "state-topic", "0")
state = hass.states.get("siren.test")
assert state.state == STATE_OFF
async def test_sending_mqtt_commands_and_optimistic(hass, mqtt_mock):
"""Test the sending MQTT commands in optimistic mode."""
assert await async_setup_component(
hass,
siren.DOMAIN,
{
siren.DOMAIN: {
"platform": "mqtt",
"name": "test",
"command_topic": "command-topic",
"payload_on": "beer on",
"payload_off": "beer off",
"qos": "2",
}
},
)
await hass.async_block_till_done()
state = hass.states.get("siren.test")
assert state.state == STATE_OFF
assert state.attributes.get(ATTR_ASSUMED_STATE)
await async_turn_on(hass, entity_id="siren.test")
mqtt_mock.async_publish.assert_called_once_with(
"command-topic", '{"state": "beer on"}', 2, False
)
mqtt_mock.async_publish.reset_mock()
state = hass.states.get("siren.test")
assert state.state == STATE_ON
await async_turn_off(hass, entity_id="siren.test")
mqtt_mock.async_publish.assert_called_once_with(
"command-topic", '{"state": "beer off"}', 2, False
)
state = hass.states.get("siren.test")
assert state.state == STATE_OFF
async def test_controlling_state_via_topic_and_json_message(hass, mqtt_mock, caplog):
"""Test the controlling state via topic and JSON message."""
assert await async_setup_component(
hass,
siren.DOMAIN,
{
siren.DOMAIN: {
"platform": "mqtt",
"name": "test",
"state_topic": "state-topic",
"command_topic": "command-topic",
"payload_on": "beer on",
"payload_off": "beer off",
"state_value_template": "{{ value_json.val }}",
}
},
)
await hass.async_block_till_done()
state = hass.states.get("siren.test")
assert state.state == STATE_UNKNOWN
async_fire_mqtt_message(hass, "state-topic", '{"val":"beer on"}')
state = hass.states.get("siren.test")
assert state.state == STATE_ON
async_fire_mqtt_message(hass, "state-topic", '{"val": null }')
state = hass.states.get("siren.test")
assert state.state == STATE_UNKNOWN
async_fire_mqtt_message(hass, "state-topic", '{"val":"beer off"}')
state = hass.states.get("siren.test")
assert state.state == STATE_OFF
async def test_controlling_state_and_attributes_with_json_message_without_template(
hass, mqtt_mock, caplog
):
"""Test the controlling state via topic and JSON message without a value template."""
assert await async_setup_component(
hass,
siren.DOMAIN,
{
siren.DOMAIN: {
"platform": "mqtt",
"name": "test",
"state_topic": "state-topic",
"command_topic": "command-topic",
"payload_on": "beer on",
"payload_off": "beer off",
"available_tones": ["ping", "siren", "bell"],
}
},
)
await hass.async_block_till_done()
state = hass.states.get("siren.test")
assert state.state == STATE_UNKNOWN
assert state.attributes.get(siren.ATTR_TONE) is None
assert state.attributes.get(siren.ATTR_DURATION) is None
assert state.attributes.get(siren.ATTR_VOLUME_LEVEL) is None
async_fire_mqtt_message(
hass,
"state-topic",
'{"state":"beer on", "tone": "bell", "duration": 10, "volume_level": 0.5 }',
)
state = hass.states.get("siren.test")
assert state.state == STATE_ON
assert state.attributes.get(siren.ATTR_TONE) == "bell"
assert state.attributes.get(siren.ATTR_DURATION) == 10
assert state.attributes.get(siren.ATTR_VOLUME_LEVEL) == 0.5
async_fire_mqtt_message(
hass,
"state-topic",
'{"state":"beer off", "duration": 5, "volume_level": 0.6}',
)
state = hass.states.get("siren.test")
assert state.state == STATE_OFF
assert state.attributes.get(siren.ATTR_TONE) == "bell"
assert state.attributes.get(siren.ATTR_DURATION) == 5
assert state.attributes.get(siren.ATTR_VOLUME_LEVEL) == 0.6
# Test validation of received attributes, invalid
async_fire_mqtt_message(
hass,
"state-topic",
'{"state":"beer on", "duration": 6, "volume_level": 2 }',
)
state = hass.states.get("siren.test")
assert (
"Unable to update siren state attributes from payload '{'duration': 6, 'volume_level': 2}': value must be at most 1 for dictionary value @ data['volume_level']"
in caplog.text
)
assert state.state == STATE_OFF
assert state.attributes.get(siren.ATTR_TONE) == "bell"
assert state.attributes.get(siren.ATTR_DURATION) == 5
assert state.attributes.get(siren.ATTR_VOLUME_LEVEL) == 0.6
async_fire_mqtt_message(
hass,
"state-topic",
"{}",
)
assert state.state == STATE_OFF
assert state.attributes.get(siren.ATTR_TONE) == "bell"
assert state.attributes.get(siren.ATTR_DURATION) == 5
assert state.attributes.get(siren.ATTR_VOLUME_LEVEL) == 0.6
assert (
"Ignoring empty payload '{}' after rendering for topic state-topic"
in caplog.text
)
async def test_filtering_not_supported_attributes_optimistic(hass, mqtt_mock):
"""Test setting attributes with support flags optimistic."""
config = {
"platform": "mqtt",
"command_topic": "command-topic",
"available_tones": ["ping", "siren", "bell"],
}
config1 = copy.deepcopy(config)
config1["name"] = "test1"
config1["support_duration"] = False
config2 = copy.deepcopy(config)
config2["name"] = "test2"
config2["support_volume_set"] = False
config3 = copy.deepcopy(config)
config3["name"] = "test3"
del config3["available_tones"]
assert await async_setup_component(
hass,
siren.DOMAIN,
{siren.DOMAIN: [config1, config2, config3]},
)
await hass.async_block_till_done()
state1 = hass.states.get("siren.test1")
assert state1.state == STATE_OFF
assert siren.ATTR_DURATION not in state1.attributes
assert siren.ATTR_AVAILABLE_TONES in state1.attributes
assert siren.ATTR_TONE in state1.attributes
assert siren.ATTR_VOLUME_LEVEL in state1.attributes
await async_turn_on(
hass,
entity_id="siren.test1",
parameters={
siren.ATTR_DURATION: 22,
siren.ATTR_TONE: "ping",
ATTR_VOLUME_LEVEL: 0.88,
},
)
state1 = hass.states.get("siren.test1")
assert state1.attributes.get(siren.ATTR_TONE) == "ping"
assert state1.attributes.get(siren.ATTR_DURATION) is None
assert state1.attributes.get(siren.ATTR_VOLUME_LEVEL) == 0.88
state2 = hass.states.get("siren.test2")
assert siren.ATTR_DURATION in state2.attributes
assert siren.ATTR_AVAILABLE_TONES in state2.attributes
assert siren.ATTR_TONE in state2.attributes
assert siren.ATTR_VOLUME_LEVEL not in state2.attributes
await async_turn_on(
hass,
entity_id="siren.test2",
parameters={
siren.ATTR_DURATION: 22,
siren.ATTR_TONE: "ping",
ATTR_VOLUME_LEVEL: 0.88,
},
)
state2 = hass.states.get("siren.test2")
assert state2.attributes.get(siren.ATTR_TONE) == "ping"
assert state2.attributes.get(siren.ATTR_DURATION) == 22
assert state2.attributes.get(siren.ATTR_VOLUME_LEVEL) is None
state3 = hass.states.get("siren.test3")
assert siren.ATTR_DURATION in state3.attributes
assert siren.ATTR_AVAILABLE_TONES not in state3.attributes
assert siren.ATTR_TONE not in state3.attributes
assert siren.ATTR_VOLUME_LEVEL in state3.attributes
await async_turn_on(
hass,
entity_id="siren.test3",
parameters={
siren.ATTR_DURATION: 22,
siren.ATTR_TONE: "ping",
ATTR_VOLUME_LEVEL: 0.88,
},
)
state3 = hass.states.get("siren.test3")
assert state3.attributes.get(siren.ATTR_TONE) is None
assert state3.attributes.get(siren.ATTR_DURATION) == 22
assert state3.attributes.get(siren.ATTR_VOLUME_LEVEL) == 0.88
async def test_filtering_not_supported_attributes_via_state(hass, mqtt_mock):
"""Test setting attributes with support flags via state."""
config = {
"platform": "mqtt",
"command_topic": "command-topic",
"available_tones": ["ping", "siren", "bell"],
}
config1 = copy.deepcopy(config)
config1["name"] = "test1"
config1["state_topic"] = "state-topic1"
config1["support_duration"] = False
config2 = copy.deepcopy(config)
config2["name"] = "test2"
config2["state_topic"] = "state-topic2"
config2["support_volume_set"] = False
config3 = copy.deepcopy(config)
config3["name"] = "test3"
config3["state_topic"] = "state-topic3"
del config3["available_tones"]
assert await async_setup_component(
hass,
siren.DOMAIN,
{siren.DOMAIN: [config1, config2, config3]},
)
await hass.async_block_till_done()
state1 = hass.states.get("siren.test1")
assert state1.state == STATE_UNKNOWN
assert siren.ATTR_DURATION not in state1.attributes
assert siren.ATTR_AVAILABLE_TONES in state1.attributes
assert siren.ATTR_TONE in state1.attributes
assert siren.ATTR_VOLUME_LEVEL in state1.attributes
async_fire_mqtt_message(
hass,
"state-topic1",
'{"state":"ON", "duration": 22, "tone": "ping", "volume_level": 0.88}',
)
await hass.async_block_till_done()
state1 = hass.states.get("siren.test1")
assert state1.attributes.get(siren.ATTR_TONE) == "ping"
assert state1.attributes.get(siren.ATTR_DURATION) is None
assert state1.attributes.get(siren.ATTR_VOLUME_LEVEL) == 0.88
state2 = hass.states.get("siren.test2")
assert siren.ATTR_DURATION in state2.attributes
assert siren.ATTR_AVAILABLE_TONES in state2.attributes
assert siren.ATTR_TONE in state2.attributes
assert siren.ATTR_VOLUME_LEVEL not in state2.attributes
async_fire_mqtt_message(
hass,
"state-topic2",
'{"state":"ON", "duration": 22, "tone": "ping", "volume_level": 0.88}',
)
await hass.async_block_till_done()
state2 = hass.states.get("siren.test2")
assert state2.attributes.get(siren.ATTR_TONE) == "ping"
assert state2.attributes.get(siren.ATTR_DURATION) == 22
assert state2.attributes.get(siren.ATTR_VOLUME_LEVEL) is None
state3 = hass.states.get("siren.test3")
assert siren.ATTR_DURATION in state3.attributes
assert siren.ATTR_AVAILABLE_TONES not in state3.attributes
assert siren.ATTR_TONE not in state3.attributes
assert siren.ATTR_VOLUME_LEVEL in state3.attributes
async_fire_mqtt_message(
hass,
"state-topic3",
'{"state":"ON", "duration": 22, "tone": "ping", "volume_level": 0.88}',
)
await hass.async_block_till_done()
state3 = hass.states.get("siren.test3")
assert state3.attributes.get(siren.ATTR_TONE) is None
assert state3.attributes.get(siren.ATTR_DURATION) == 22
assert state3.attributes.get(siren.ATTR_VOLUME_LEVEL) == 0.88
async def test_availability_when_connection_lost(hass, mqtt_mock):
"""Test availability after MQTT disconnection."""
await help_test_availability_when_connection_lost(
hass, mqtt_mock, siren.DOMAIN, DEFAULT_CONFIG
)
async def test_availability_without_topic(hass, mqtt_mock):
"""Test availability without defined availability topic."""
await help_test_availability_without_topic(
hass, mqtt_mock, siren.DOMAIN, DEFAULT_CONFIG
)
async def test_default_availability_payload(hass, mqtt_mock):
"""Test availability by default payload with defined topic."""
config = {
siren.DOMAIN: {
"platform": "mqtt",
"name": "test",
"state_topic": "state-topic",
"command_topic": "command-topic",
"payload_on": 1,
"payload_off": 0,
}
}
await help_test_default_availability_payload(
hass, mqtt_mock, siren.DOMAIN, config, True, "state-topic", "1"
)
async def test_custom_availability_payload(hass, mqtt_mock):
"""Test availability by custom payload with defined topic."""
config = {
siren.DOMAIN: {
"platform": "mqtt",
"name": "test",
"state_topic": "state-topic",
"command_topic": "command-topic",
"payload_on": 1,
"payload_off": 0,
}
}
await help_test_custom_availability_payload(
hass, mqtt_mock, siren.DOMAIN, config, True, "state-topic", "1"
)
async def test_custom_state_payload(hass, mqtt_mock):
"""Test the state payload."""
assert await async_setup_component(
hass,
siren.DOMAIN,
{
siren.DOMAIN: {
"platform": "mqtt",
"name": "test",
"state_topic": "state-topic",
"command_topic": "command-topic",
"payload_on": 1,
"payload_off": 0,
"state_on": "HIGH",
"state_off": "LOW",
}
},
)
await hass.async_block_till_done()
state = hass.states.get("siren.test")
assert state.state == STATE_UNKNOWN
assert not state.attributes.get(ATTR_ASSUMED_STATE)
async_fire_mqtt_message(hass, "state-topic", "HIGH")
state = hass.states.get("siren.test")
assert state.state == STATE_ON
async_fire_mqtt_message(hass, "state-topic", "LOW")
state = hass.states.get("siren.test")
assert state.state == STATE_OFF
async def test_setting_attribute_via_mqtt_json_message(hass, mqtt_mock):
"""Test the setting of attribute via MQTT with JSON payload."""
await help_test_setting_attribute_via_mqtt_json_message(
hass, mqtt_mock, siren.DOMAIN, DEFAULT_CONFIG
)
async def test_setting_blocked_attribute_via_mqtt_json_message(hass, mqtt_mock):
"""Test the setting of attribute via MQTT with JSON payload."""
await help_test_setting_blocked_attribute_via_mqtt_json_message(
hass, mqtt_mock, siren.DOMAIN, DEFAULT_CONFIG, {}
)
async def test_setting_attribute_with_template(hass, mqtt_mock):
"""Test the setting of attribute via MQTT with JSON payload."""
await help_test_setting_attribute_with_template(
hass, mqtt_mock, siren.DOMAIN, DEFAULT_CONFIG
)
async def test_update_with_json_attrs_not_dict(hass, mqtt_mock, caplog):
"""Test attributes get extracted from a JSON result."""
await help_test_update_with_json_attrs_not_dict(
hass, mqtt_mock, caplog, siren.DOMAIN, DEFAULT_CONFIG
)
async def test_update_with_json_attrs_bad_JSON(hass, mqtt_mock, caplog):
"""Test attributes get extracted from a JSON result."""
await help_test_update_with_json_attrs_bad_JSON(
hass, mqtt_mock, caplog, siren.DOMAIN, DEFAULT_CONFIG
)
async def test_discovery_update_attr(hass, mqtt_mock, caplog):
"""Test update of discovered MQTTAttributes."""
await help_test_discovery_update_attr(
hass, mqtt_mock, caplog, siren.DOMAIN, DEFAULT_CONFIG
)
async def test_unique_id(hass, mqtt_mock):
"""Test unique id option only creates one siren per unique_id."""
config = {
siren.DOMAIN: [
{
"platform": "mqtt",
"name": "Test 1",
"state_topic": "test-topic",
"command_topic": "command-topic",
"unique_id": "TOTALLY_UNIQUE",
},
{
"platform": "mqtt",
"name": "Test 2",
"state_topic": "test-topic",
"command_topic": "command-topic",
"unique_id": "TOTALLY_UNIQUE",
},
]
}
await help_test_unique_id(hass, mqtt_mock, siren.DOMAIN, config)
async def test_discovery_removal_siren(hass, mqtt_mock, caplog):
"""Test removal of discovered siren."""
data = (
'{ "name": "test",'
' "state_topic": "test_topic",'
' "command_topic": "test_topic" }'
)
await help_test_discovery_removal(hass, mqtt_mock, caplog, siren.DOMAIN, data)
async def test_discovery_update_siren_topic_template(hass, mqtt_mock, caplog):
"""Test update of discovered siren."""
config1 = copy.deepcopy(DEFAULT_CONFIG[siren.DOMAIN])
config2 = copy.deepcopy(DEFAULT_CONFIG[siren.DOMAIN])
config1["name"] = "Beer"
config2["name"] = "Milk"
config1["state_topic"] = "siren/state1"
config2["state_topic"] = "siren/state2"
config1["state_value_template"] = "{{ value_json.state1.state }}"
config2["state_value_template"] = "{{ value_json.state2.state }}"
state_data1 = [
([("siren/state1", '{"state1":{"state":"ON"}}')], "on", None),
]
state_data2 = [
([("siren/state2", '{"state2":{"state":"OFF"}}')], "off", None),
([("siren/state2", '{"state2":{"state":"ON"}}')], "on", None),
([("siren/state1", '{"state1":{"state":"OFF"}}')], "on", None),
([("siren/state1", '{"state2":{"state":"OFF"}}')], "on", None),
([("siren/state2", '{"state1":{"state":"OFF"}}')], "on", None),
([("siren/state2", '{"state2":{"state":"OFF"}}')], "off", None),
]
await help_test_discovery_update(
hass,
mqtt_mock,
caplog,
siren.DOMAIN,
config1,
config2,
state_data1=state_data1,
state_data2=state_data2,
)
async def test_discovery_update_siren_template(hass, mqtt_mock, caplog):
"""Test update of discovered siren."""
config1 = copy.deepcopy(DEFAULT_CONFIG[siren.DOMAIN])
config2 = copy.deepcopy(DEFAULT_CONFIG[siren.DOMAIN])
config1["name"] = "Beer"
config2["name"] = "Milk"
config1["state_topic"] = "siren/state1"
config2["state_topic"] = "siren/state1"
config1["state_value_template"] = "{{ value_json.state1.state }}"
config2["state_value_template"] = "{{ value_json.state2.state }}"
state_data1 = [
([("siren/state1", '{"state1":{"state":"ON"}}')], "on", None),
]
state_data2 = [
([("siren/state1", '{"state2":{"state":"OFF"}}')], "off", None),
([("siren/state1", '{"state2":{"state":"ON"}}')], "on", None),
([("siren/state1", '{"state1":{"state":"OFF"}}')], "on", None),
([("siren/state1", '{"state2":{"state":"OFF"}}')], "off", None),
]
await help_test_discovery_update(
hass,
mqtt_mock,
caplog,
siren.DOMAIN,
config1,
config2,
state_data1=state_data1,
state_data2=state_data2,
)
async def test_command_templates(hass, mqtt_mock, caplog):
"""Test siren with command templates optimistic."""
config1 = copy.deepcopy(DEFAULT_CONFIG[siren.DOMAIN])
config1["name"] = "Beer"
config1["available_tones"] = ["ping", "chimes"]
config1[
"command_template"
] = "CMD: {{ value }}, DURATION: {{ duration }}, TONE: {{ tone }}, VOLUME: {{ volume_level }}"
config2 = copy.deepcopy(config1)
config2["name"] = "Milk"
config2["command_off_template"] = "CMD_OFF: {{ value }}"
assert await async_setup_component(
hass,
siren.DOMAIN,
{siren.DOMAIN: [config1, config2]},
)
await hass.async_block_till_done()
state1 = hass.states.get("siren.beer")
assert state1.state == STATE_OFF
assert state1.attributes.get(ATTR_ASSUMED_STATE)
state2 = hass.states.get("siren.milk")
assert state2.state == STATE_OFF
assert state1.attributes.get(ATTR_ASSUMED_STATE)
await async_turn_on(
hass,
entity_id="siren.beer",
parameters={
siren.ATTR_DURATION: 22,
siren.ATTR_TONE: "ping",
ATTR_VOLUME_LEVEL: 0.88,
},
)
state1 = hass.states.get("siren.beer")
assert state1.attributes.get(siren.ATTR_TONE) == "ping"
assert state1.attributes.get(siren.ATTR_DURATION) == 22
assert state1.attributes.get(siren.ATTR_VOLUME_LEVEL) == 0.88
mqtt_mock.async_publish.assert_any_call(
"test-topic", "CMD: ON, DURATION: 22, TONE: ping, VOLUME: 0.88", 0, False
)
mqtt_mock.async_publish.call_count == 1
mqtt_mock.reset_mock()
await async_turn_off(
hass,
entity_id="siren.beer",
)
mqtt_mock.async_publish.assert_any_call(
"test-topic", "CMD: OFF, DURATION: , TONE: , VOLUME:", 0, False
)
mqtt_mock.async_publish.call_count == 1
mqtt_mock.reset_mock()
await async_turn_on(
hass,
entity_id="siren.milk",
parameters={
siren.ATTR_DURATION: 22,
siren.ATTR_TONE: "ping",
ATTR_VOLUME_LEVEL: 0.88,
},
)
state2 = hass.states.get("siren.milk")
assert state2.attributes.get(siren.ATTR_TONE) == "ping"
assert state2.attributes.get(siren.ATTR_DURATION) == 22
assert state2.attributes.get(siren.ATTR_VOLUME_LEVEL) == 0.88
await async_turn_off(
hass,
entity_id="siren.milk",
)
mqtt_mock.async_publish.assert_any_call("test-topic", "CMD_OFF: OFF", 0, False)
mqtt_mock.async_publish.call_count == 1
mqtt_mock.reset_mock()
async def test_discovery_update_unchanged_siren(hass, mqtt_mock, caplog):
"""Test update of discovered siren."""
data1 = (
'{ "name": "Beer",'
' "device_class": "siren",'
' "state_topic": "test_topic",'
' "command_topic": "test_topic" }'
)
with patch(
"homeassistant.components.mqtt.siren.MqttSiren.discovery_update"
) as discovery_update:
await help_test_discovery_update_unchanged(
hass, mqtt_mock, caplog, siren.DOMAIN, data1, discovery_update
)
@pytest.mark.no_fail_on_log_exception
async def test_discovery_broken(hass, mqtt_mock, caplog):
"""Test handling of bad discovery message."""
data1 = '{ "name": "Beer" }'
data2 = (
'{ "name": "Milk",'
' "state_topic": "test_topic",'
' "command_topic": "test_topic" }'
)
await help_test_discovery_broken(
hass, mqtt_mock, caplog, siren.DOMAIN, data1, data2
)
async def test_entity_device_info_with_connection(hass, mqtt_mock):
"""Test MQTT siren device registry integration."""
await help_test_entity_device_info_with_connection(
hass, mqtt_mock, siren.DOMAIN, DEFAULT_CONFIG
)
async def test_entity_device_info_with_identifier(hass, mqtt_mock):
"""Test MQTT siren device registry integration."""
await help_test_entity_device_info_with_identifier(
hass, mqtt_mock, siren.DOMAIN, DEFAULT_CONFIG
)
async def test_entity_device_info_update(hass, mqtt_mock):
"""Test device registry update."""
await help_test_entity_device_info_update(
hass, mqtt_mock, siren.DOMAIN, DEFAULT_CONFIG
)
async def test_entity_device_info_remove(hass, mqtt_mock):
"""Test device registry remove."""
await help_test_entity_device_info_remove(
hass, mqtt_mock, siren.DOMAIN, DEFAULT_CONFIG
)
async def test_entity_id_update_subscriptions(hass, mqtt_mock):
"""Test MQTT subscriptions are managed when entity_id is updated."""
await help_test_entity_id_update_subscriptions(
hass, mqtt_mock, siren.DOMAIN, DEFAULT_CONFIG
)
async def test_entity_id_update_discovery_update(hass, mqtt_mock):
"""Test MQTT discovery update when entity_id is updated."""
await help_test_entity_id_update_discovery_update(
hass, mqtt_mock, siren.DOMAIN, DEFAULT_CONFIG
)
async def test_entity_debug_info_message(hass, mqtt_mock):
"""Test MQTT debug info."""
await help_test_entity_debug_info_message(
hass, mqtt_mock, siren.DOMAIN, DEFAULT_CONFIG
)
@pytest.mark.parametrize(
"service,topic,parameters,payload,template",
[
(
siren.SERVICE_TURN_ON,
"command_topic",
None,
'{"state": "ON"}',
None,
),
(
siren.SERVICE_TURN_OFF,
"command_topic",
None,
'{"state": "OFF"}',
None,
),
],
)
async def test_publishing_with_custom_encoding(
hass,
mqtt_mock,
caplog,
service,
topic,
parameters,
payload,
template,
):
"""Test publishing MQTT payload with command templates and different encoding."""
domain = siren.DOMAIN
config = copy.deepcopy(DEFAULT_CONFIG[domain])
config[siren.ATTR_AVAILABLE_TONES] = ["siren", "xylophone"]
await help_test_publishing_with_custom_encoding(
hass,
mqtt_mock,
caplog,
domain,
config,
service,
topic,
parameters,
payload,
template,
)
async def test_reloadable(hass, mqtt_mock, caplog, tmp_path):
"""Test reloading the MQTT platform."""
domain = siren.DOMAIN
config = DEFAULT_CONFIG[domain]
await help_test_reloadable(hass, mqtt_mock, caplog, tmp_path, domain, config)
@pytest.mark.parametrize(
"topic,value,attribute,attribute_value",
[
("state_topic", "ON", None, "on"),
],
)
async def test_encoding_subscribable_topics(
hass, mqtt_mock, caplog, topic, value, attribute, attribute_value
):
"""Test handling of incoming encoded payload."""
await help_test_encoding_subscribable_topics(
hass,
mqtt_mock,
caplog,
siren.DOMAIN,
DEFAULT_CONFIG[siren.DOMAIN],
topic,
value,
attribute,
attribute_value,
)