Update additional platforms to use new MQTT message callback (#22030)

* Move additional platforms to new MQTT callback

* Fix automation.mqtt
pull/22049/head
emontnemery 2019-03-14 18:58:32 +01:00 committed by Paulus Schoutsen
parent 8d2d71c16a
commit b25e951dcc
8 changed files with 32 additions and 32 deletions

View File

@ -342,18 +342,18 @@ class ManualMQTTAlarm(alarm.AlarmControlPanel):
) )
@callback @callback
def message_received(topic, payload, qos): def message_received(msg):
"""Run when new MQTT message has been received.""" """Run when new MQTT message has been received."""
if payload == self._payload_disarm: if msg.payload == self._payload_disarm:
self.async_alarm_disarm(self._code) self.async_alarm_disarm(self._code)
elif payload == self._payload_arm_home: elif msg.payload == self._payload_arm_home:
self.async_alarm_arm_home(self._code) self.async_alarm_arm_home(self._code)
elif payload == self._payload_arm_away: elif msg.payload == self._payload_arm_away:
self.async_alarm_arm_away(self._code) self.async_alarm_arm_away(self._code)
elif payload == self._payload_arm_night: elif msg.payload == self._payload_arm_night:
self.async_alarm_arm_night(self._code) self.async_alarm_arm_night(self._code)
else: else:
_LOGGER.warning("Received unexpected payload: %s", payload) _LOGGER.warning("Received unexpected payload: %s", msg.payload)
return return
await mqtt.async_subscribe( await mqtt.async_subscribe(

View File

@ -29,18 +29,18 @@ async def async_trigger(hass, config, action, automation_info):
encoding = config[CONF_ENCODING] or None encoding = config[CONF_ENCODING] or None
@callback @callback
def mqtt_automation_listener(msg_topic, msg_payload, qos): def mqtt_automation_listener(mqttmsg):
"""Listen for MQTT messages.""" """Listen for MQTT messages."""
if payload is None or payload == msg_payload: if payload is None or payload == mqttmsg.payload:
data = { data = {
'platform': 'mqtt', 'platform': 'mqtt',
'topic': msg_topic, 'topic': mqttmsg.topic,
'payload': msg_payload, 'payload': mqttmsg.payload,
'qos': qos, 'qos': mqttmsg.qos,
} }
try: try:
data['payload_json'] = json.loads(msg_payload) data['payload_json'] = json.loads(mqttmsg.payload)
except ValueError: except ValueError:
pass pass

View File

@ -41,17 +41,17 @@ async def async_setup_scanner(hass, config, async_see, discovery_info=None):
for dev_id, topic in devices.items(): for dev_id, topic in devices.items():
@callback @callback
def async_message_received(topic, payload, qos, dev_id=dev_id): def async_message_received(msg, dev_id=dev_id):
"""Handle received MQTT message.""" """Handle received MQTT message."""
try: try:
data = GPS_JSON_PAYLOAD_SCHEMA(json.loads(payload)) data = GPS_JSON_PAYLOAD_SCHEMA(json.loads(msg.payload))
except vol.MultipleInvalid: except vol.MultipleInvalid:
_LOGGER.error("Skipping update for following data " _LOGGER.error("Skipping update for following data "
"because of missing or malformatted data: %s", "because of missing or malformatted data: %s",
payload) msg.payload)
return return
except ValueError: except ValueError:
_LOGGER.error("Error parsing JSON payload: %s", payload) _LOGGER.error("Error parsing JSON payload: %s", msg.payload)
return return
kwargs = _parse_see_args(dev_id, data) kwargs = _parse_see_args(dev_id, data)

View File

@ -74,9 +74,9 @@ def async_setup(hass, config):
# Process events from a remote server that are received on a queue. # Process events from a remote server that are received on a queue.
@callback @callback
def _event_receiver(topic, payload, qos): def _event_receiver(msg):
"""Receive events published by and fire them on this hass instance.""" """Receive events published by and fire them on this hass instance."""
event = json.loads(payload) event = json.loads(msg.payload)
event_type = event.get('event_type') event_type = event.get('event_type')
event_data = event.get('event_data') event_data = event.get('event_data')

View File

@ -99,16 +99,16 @@ async def async_connect_mqtt(hass, component):
"""Subscribe to MQTT topic.""" """Subscribe to MQTT topic."""
context = hass.data[DOMAIN]['context'] context = hass.data[DOMAIN]['context']
async def async_handle_mqtt_message(topic, payload, qos): async def async_handle_mqtt_message(msg):
"""Handle incoming OwnTracks message.""" """Handle incoming OwnTracks message."""
try: try:
message = json.loads(payload) message = json.loads(msg.payload)
except ValueError: except ValueError:
# If invalid JSON # If invalid JSON
_LOGGER.error("Unable to parse payload as JSON: %s", payload) _LOGGER.error("Unable to parse payload as JSON: %s", msg.payload)
return return
message['topic'] = topic message['topic'] = msg.topic
hass.helpers.dispatcher.async_dispatcher_send( hass.helpers.dispatcher.async_dispatcher_send(
DOMAIN, hass, context, message) DOMAIN, hass, context, message)

View File

@ -61,7 +61,7 @@ async def async_setup_platform(hass, config, async_add_entities,
discovery_info=None): discovery_info=None):
"""Set up the ARWN platform.""" """Set up the ARWN platform."""
@callback @callback
def async_sensor_event_received(topic, payload, qos): def async_sensor_event_received(msg):
"""Process events as sensors. """Process events as sensors.
When a new event on our topic (arwn/#) is received we map it When a new event on our topic (arwn/#) is received we map it
@ -74,8 +74,8 @@ async def async_setup_platform(hass, config, async_add_entities,
This lets us dynamically incorporate sensors without any This lets us dynamically incorporate sensors without any
configuration on our side. configuration on our side.
""" """
event = json.loads(payload) event = json.loads(msg.payload)
sensors = discover_sensors(topic, event) sensors = discover_sensors(msg.topic, event)
if not sensors: if not sensors:
return return

View File

@ -90,16 +90,16 @@ class MQTTRoomSensor(Entity):
self.async_schedule_update_ha_state() self.async_schedule_update_ha_state()
@callback @callback
def message_received(topic, payload, qos): def message_received(msg):
"""Handle new MQTT messages.""" """Handle new MQTT messages."""
try: try:
data = MQTT_PAYLOAD(payload) data = MQTT_PAYLOAD(msg.payload)
except vol.MultipleInvalid as error: except vol.MultipleInvalid as error:
_LOGGER.debug( _LOGGER.debug(
"Skipping update because of malformatted data: %s", error) "Skipping update because of malformatted data: %s", error)
return return
device = _parse_update_data(topic, data) device = _parse_update_data(msg.topic, data)
if device.get(CONF_DEVICE_ID) == self._device_id: if device.get(CONF_DEVICE_ID) == self._device_id:
if self._distance is None or self._updated is None: if self._distance is None or self._updated is None:
update_state(**device) update_state(**device)

View File

@ -95,14 +95,14 @@ async def async_setup(hass, config):
if CONF_FEEDBACK in config[DOMAIN]: if CONF_FEEDBACK in config[DOMAIN]:
async_set_feedback(None, config[DOMAIN][CONF_FEEDBACK]) async_set_feedback(None, config[DOMAIN][CONF_FEEDBACK])
async def message_received(topic, payload, qos): async def message_received(msg):
"""Handle new messages on MQTT.""" """Handle new messages on MQTT."""
_LOGGER.debug("New intent: %s", payload) _LOGGER.debug("New intent: %s", msg.payload)
try: try:
request = json.loads(payload) request = json.loads(msg.payload)
except TypeError: except TypeError:
_LOGGER.error('Received invalid JSON: %s', payload) _LOGGER.error('Received invalid JSON: %s', msg.payload)
return return
if (request['intent']['confidenceScore'] if (request['intent']['confidenceScore']