From 6f398f59df8771b0633c604a6acf8b387fa4b55c Mon Sep 17 00:00:00 2001 From: Moonshot Date: Tue, 12 Jan 2016 21:01:53 -0500 Subject: [PATCH] Fix filtering of EVENT_CALL_SERVICE and EVENT_SERVICE_EXECUTED events --- homeassistant/components/mqtt_eventstream.py | 61 +++++++++++++------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/homeassistant/components/mqtt_eventstream.py b/homeassistant/components/mqtt_eventstream.py index e52124182ff..a90e4b0d42a 100644 --- a/homeassistant/components/mqtt_eventstream.py +++ b/homeassistant/components/mqtt_eventstream.py @@ -18,6 +18,8 @@ mqtt_eventstream: """ import json from homeassistant.core import EventOrigin, State +from homeassistant.components.mqtt import DOMAIN as MQTT_DOMAIN +from homeassistant.components.mqtt import SERVICE_PUBLISH as MQTT_SVC_PUBLISH from homeassistant.const import ( MATCH_ALL, EVENT_TIME_CHANGED, @@ -25,7 +27,6 @@ from homeassistant.const import ( EVENT_SERVICE_EXECUTED, EVENT_STATE_CHANGED, ) - import homeassistant.loader as loader from homeassistant.remote import JSONEncoder @@ -38,41 +39,59 @@ DEPENDENCIES = ['mqtt'] def setup(hass, config): """ Setup our mqtt_eventstream component. """ - def _event_handler(event): - """ Handle events by publishing them on the mqtt queue. """ - if event.origin != EventOrigin.local: - return - if event.event_type in ( - EVENT_TIME_CHANGED, - EVENT_CALL_SERVICE, - EVENT_SERVICE_EXECUTED - ): - return - event = {'event_type': event.event_type, 'event_data': event.data} - msg = json.dumps(event, cls=JSONEncoder) - mqtt.publish(hass, pub_topic, msg) - mqtt = loader.get_component('mqtt') pub_topic = config[DOMAIN].get('publish_topic', None) sub_topic = config[DOMAIN].get('subscribe_topic', None) + def _event_publisher(event): + """ Handle events by publishing them on the mqtt queue. """ + if event.origin != EventOrigin.local: + return + if event.event_type == EVENT_TIME_CHANGED: + return + + # Filter out the events that were triggered by publishing + # to the MQTT topic, or you will end up in an infinite loop. + if event.event_type == EVENT_CALL_SERVICE: + if ( + event.data.get('domain') == MQTT_DOMAIN and + event.data.get('service') == MQTT_SVC_PUBLISH and + event.data.get('topic') == pub_topic + ): + return + + # Filter out all the "event service executed" events because they + # are only used internally by core as callbacks for blocking + # during the interval while a service is being executed. + # They will serve no purpose to the external system, + # and thus are unnecessary traffic. + # And at any rate it would cause an infinite loop to publish them + # because publishing to an MQTT topic itself triggers one. + if event.event_type == EVENT_SERVICE_EXECUTED: + return + + event_info = {'event_type': event.event_type, 'event_data': event.data} + msg = json.dumps(event_info, cls=JSONEncoder) + mqtt.publish(hass, pub_topic, msg) + # Only listen for local events if you are going to publish them - if (pub_topic): - hass.bus.listen(MATCH_ALL, _event_handler) + if pub_topic: + hass.bus.listen(MATCH_ALL, _event_publisher) # Process events from a remote server that are received on a queue def _event_receiver(topic, payload, qos): """ - A new MQTT message, published by the other HA instance, - has been received. + Receive events published by the other HA instance and fire + them on this hass instance. """ - # TODO error handling event = json.loads(payload) event_type = event.get('event_type') event_data = event.get('event_data') # Special case handling for event STATE_CHANGED # We will try to convert state dicts back to State objects + # Copied over from the _handle_api_post_events_event method + # of the api component. if event_type == EVENT_STATE_CHANGED and event_data: for key in ('old_state', 'new_state'): state = State.from_dict(event_data.get(key)) @@ -87,7 +106,7 @@ def setup(hass, config): ) # Only subscribe if you specified a topic - if (sub_topic): + if sub_topic: mqtt.subscribe(hass, sub_topic, _event_receiver) hass.states.set('{domain}.initialized'.format(domain=DOMAIN), True)