Prevent infinite loop in crossconfigured mqtt event streams (#5624)

* Prevent events about MQTT messages received to cause infinite loop when two HA instances are crossconfigured for mqtt_eventstream.

* Fix linting

* Publish all MQTT received events except incoming from eventstream. Also make it configurable.
pull/5707/head
Johan Bloemberg 2017-02-02 06:00:05 +01:00 committed by Paulus Schoutsen
parent 17c4f4d391
commit 96745abf5d
2 changed files with 57 additions and 0 deletions

View File

@ -15,18 +15,23 @@ from homeassistant.const import (
ATTR_SERVICE_DATA, EVENT_CALL_SERVICE, EVENT_SERVICE_EXECUTED, ATTR_SERVICE_DATA, EVENT_CALL_SERVICE, EVENT_SERVICE_EXECUTED,
EVENT_STATE_CHANGED, EVENT_TIME_CHANGED, MATCH_ALL) EVENT_STATE_CHANGED, EVENT_TIME_CHANGED, MATCH_ALL)
from homeassistant.core import EventOrigin, State from homeassistant.core import EventOrigin, State
import homeassistant.helpers.config_validation as cv
from homeassistant.remote import JSONEncoder from homeassistant.remote import JSONEncoder
from .mqtt import EVENT_MQTT_MESSAGE_RECEIVED
DOMAIN = "mqtt_eventstream" DOMAIN = "mqtt_eventstream"
DEPENDENCIES = ['mqtt'] DEPENDENCIES = ['mqtt']
CONF_PUBLISH_TOPIC = 'publish_topic' CONF_PUBLISH_TOPIC = 'publish_topic'
CONF_SUBSCRIBE_TOPIC = 'subscribe_topic' CONF_SUBSCRIBE_TOPIC = 'subscribe_topic'
CONF_PUBLISH_EVENTSTREAM_RECEIVED = 'publish_eventstream_received'
CONFIG_SCHEMA = vol.Schema({ CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({ DOMAIN: vol.Schema({
vol.Optional(CONF_PUBLISH_TOPIC): valid_publish_topic, vol.Optional(CONF_PUBLISH_TOPIC): valid_publish_topic,
vol.Optional(CONF_SUBSCRIBE_TOPIC): valid_subscribe_topic, vol.Optional(CONF_SUBSCRIBE_TOPIC): valid_subscribe_topic,
vol.Optional(CONF_PUBLISH_EVENTSTREAM_RECEIVED, default=False):
cv.boolean,
}), }),
}, extra=vol.ALLOW_EXTRA) }, extra=vol.ALLOW_EXTRA)
@ -45,6 +50,15 @@ def setup(hass, config):
if event.event_type == EVENT_TIME_CHANGED: if event.event_type == EVENT_TIME_CHANGED:
return return
# MQTT fires a bus event for every incoming message, also messages from
# eventstream. Disable publishing these messages to other HA instances
# and possibly creating an infinite loop if these instances publish
# back to this one.
if all([not conf.get(CONF_PUBLISH_EVENTSTREAM_RECEIVED),
event.event_type == EVENT_MQTT_MESSAGE_RECEIVED,
event.data.get('topic') == sub_topic]):
return
# Filter out the events that were triggered by publishing # Filter out the events that were triggered by publishing
# to the MQTT topic, or you will end up in an infinite loop. # to the MQTT topic, or you will end up in an infinite loop.
if event.event_type == EVENT_CALL_SERVICE: if event.event_type == EVENT_CALL_SERVICE:

View File

@ -1,10 +1,12 @@
"""The tests for the MQTT eventstream component.""" """The tests for the MQTT eventstream component."""
from collections import namedtuple
import json import json
import unittest import unittest
from unittest.mock import ANY, patch from unittest.mock import ANY, patch
from homeassistant.bootstrap import setup_component from homeassistant.bootstrap import setup_component
import homeassistant.components.mqtt_eventstream as eventstream import homeassistant.components.mqtt_eventstream as eventstream
import homeassistant.components.mqtt as mqtt
from homeassistant.const import EVENT_STATE_CHANGED from homeassistant.const import EVENT_STATE_CHANGED
from homeassistant.core import State, callback from homeassistant.core import State, callback
from homeassistant.remote import JSONEncoder from homeassistant.remote import JSONEncoder
@ -146,3 +148,44 @@ class TestMqttEventStream(unittest.TestCase):
self.hass.block_till_done() self.hass.block_till_done()
self.assertEqual(1, len(calls)) self.assertEqual(1, len(calls))
@patch('homeassistant.components.mqtt.publish')
def test_mqtt_received_event(self, mock_pub):
"""Don't filter events from the mqtt component about received message.
Mqtt component sends an event if a message is received. Also
messages that originate from an incoming eventstream.
Broadcasting these messages result in an infinite loop if two HA
instances are crossconfigured for the same mqtt topics.
"""
SUB_TOPIC = 'from_slaves'
self.assertTrue(
self.add_eventstream(
pub_topic='bar',
sub_topic=SUB_TOPIC))
self.hass.block_till_done()
# Reset the mock because it will have already gotten calls for the
# mqtt_eventstream state change on initialization, etc.
mock_pub.reset_mock()
# Use MQTT component message handler to simulate firing message
# received event.
MQTTMessage = namedtuple('MQTTMessage', ['topic', 'qos', 'payload'])
message = MQTTMessage(SUB_TOPIC, 1, 'Hello World!'.encode('utf-8'))
mqtt.MQTT._mqtt_on_message(self, None, {'hass': self.hass}, message)
self.hass.block_till_done()
# 'normal' incoming mqtt messages should be broadcasted
self.assertEqual(mock_pub.call_count, 0)
MQTTMessage = namedtuple('MQTTMessage', ['topic', 'qos', 'payload'])
message = MQTTMessage('test_topic', 1, 'Hello World!'.encode('utf-8'))
mqtt.MQTT._mqtt_on_message(self, None, {'hass': self.hass}, message)
self.hass.block_till_done()
# but event from the event stream not
self.assertEqual(mock_pub.call_count, 1)