core/homeassistant/components/mqtt/sensor.py

204 lines
6.7 KiB
Python

"""Support for MQTT sensors."""
from datetime import timedelta
import functools
from typing import Optional
import voluptuous as vol
from homeassistant.components import sensor
from homeassistant.components.sensor import DEVICE_CLASSES_SCHEMA
from homeassistant.const import (
CONF_DEVICE,
CONF_DEVICE_CLASS,
CONF_FORCE_UPDATE,
CONF_ICON,
CONF_NAME,
CONF_UNIQUE_ID,
CONF_UNIT_OF_MEASUREMENT,
CONF_VALUE_TEMPLATE,
)
from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import async_track_point_in_utc_time
from homeassistant.helpers.reload import async_setup_reload_service
from homeassistant.helpers.typing import ConfigType, HomeAssistantType
from homeassistant.util import dt as dt_util
from . import CONF_QOS, CONF_STATE_TOPIC, DOMAIN, PLATFORMS, subscription
from .. import mqtt
from .debug_info import log_messages
from .mixins import (
MQTT_AVAILABILITY_SCHEMA,
MQTT_ENTITY_DEVICE_INFO_SCHEMA,
MQTT_JSON_ATTRS_SCHEMA,
MqttAvailability,
MqttEntity,
async_setup_entry_helper,
)
# Config key holding the number of seconds after which a received value expires.
CONF_EXPIRE_AFTER = "expire_after"

# Defaults the schema applies when the corresponding option is omitted.
DEFAULT_NAME = "MQTT Sensor"
DEFAULT_FORCE_UPDATE = False

# Options specific to the MQTT sensor platform; all of them are optional.
_SENSOR_OPTIONS = {
    vol.Optional(CONF_DEVICE): MQTT_ENTITY_DEVICE_INFO_SCHEMA,
    vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
    vol.Optional(CONF_EXPIRE_AFTER): cv.positive_int,
    vol.Optional(CONF_FORCE_UPDATE, default=DEFAULT_FORCE_UPDATE): cv.boolean,
    vol.Optional(CONF_ICON): cv.icon,
    vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
    vol.Optional(CONF_UNIQUE_ID): cv.string,
    vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
}

# Full platform schema: the base MQTT platform schema extended, in order, with
# the sensor options, the availability options and the JSON attributes options.
PLATFORM_SCHEMA = mqtt.MQTT_RO_PLATFORM_SCHEMA.extend(_SENSOR_OPTIONS)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(MQTT_AVAILABILITY_SCHEMA.schema)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(MQTT_JSON_ATTRS_SCHEMA.schema)
async def async_setup_platform(
    hass: HomeAssistantType,
    config: ConfigType,
    async_add_entities,
    discovery_info=None,
):
    """Create an MQTT sensor from a configuration.yaml platform entry.

    The reload service for the MQTT platforms is set up first, after which the
    sensor described by ``config`` is built and passed to
    ``async_add_entities``. ``discovery_info`` is accepted but not used here.
    """
    await async_setup_reload_service(hass, DOMAIN, PLATFORMS)
    await _async_setup_entity(hass, async_add_entities, config)
async def async_setup_entry(hass, config_entry, async_add_entities):
    """Hook the MQTT sensor factory into MQTT discovery for a config entry.

    A partial of ``_async_setup_entity`` with ``hass``, ``async_add_entities``
    and ``config_entry`` pre-bound is handed to ``async_setup_entry_helper``
    together with the sensor domain and this platform's schema.
    """
    await async_setup_entry_helper(
        hass,
        sensor.DOMAIN,
        functools.partial(
            _async_setup_entity,
            hass,
            async_add_entities,
            config_entry=config_entry,
        ),
        PLATFORM_SCHEMA,
    )
async def _async_setup_entity(
    hass,
    async_add_entities,
    config: ConfigType,
    config_entry=None,
    discovery_data=None,
):
    """Build one MqttSensor from ``config`` and register it.

    ``config_entry`` and ``discovery_data`` are forwarded unchanged to the
    entity constructor; both default to ``None``.
    """
    new_sensor = MqttSensor(hass, config, config_entry, discovery_data)
    async_add_entities([new_sensor])
class MqttSensor(MqttEntity, Entity):
    """Representation of a sensor that can be updated using MQTT.

    The state is the latest payload received on the configured state topic,
    optionally passed through a value template. When ``expire_after`` is a
    positive number of seconds, the sensor is unavailable until the first
    message arrives and becomes unavailable again once that many seconds pass
    without a new message.
    """

    def __init__(self, hass, config, config_entry, discovery_data):
        """Initialize the sensor."""
        self._state = None
        # Cancel function returned by async_track_point_in_utc_time, or None
        # while no expiration timer is pending.
        self._expiration_trigger = None

        expire_after = config.get(CONF_EXPIRE_AFTER)
        if expire_after is not None and expire_after > 0:
            # Nothing has been received yet, so the value starts out expired.
            self._expired = True
        else:
            self._expired = None

        MqttEntity.__init__(self, hass, config, config_entry, discovery_data)

    @staticmethod
    def config_schema():
        """Return the config schema."""
        return PLATFORM_SCHEMA

    def _setup_from_config(self, config):
        """(Re)Setup the entity."""
        self._config = config
        template = self._config.get(CONF_VALUE_TEMPLATE)
        if template is not None:
            # The template needs a hass reference before it can be rendered.
            template.hass = self.hass

    def _cancel_expiration_trigger(self):
        """Cancel the pending expiration timer, if there is one."""
        if self._expiration_trigger:
            self._expiration_trigger()
            self._expiration_trigger = None

    async def _subscribe_topics(self):
        """(Re)Subscribe to topics."""

        @callback
        @log_messages(self.hass, self.entity_id)
        def message_received(msg):
            """Handle new MQTT messages."""
            payload = msg.payload

            # Auto-expire enabled?
            expire_after = self._config.get(CONF_EXPIRE_AFTER)
            if expire_after is not None and expire_after > 0:
                # A message just arrived, so the value is fresh by definition.
                self._expired = False

                # Replace any pending timer with one counted from now.
                self._cancel_expiration_trigger()
                expiration_at = dt_util.utcnow() + timedelta(seconds=expire_after)
                self._expiration_trigger = async_track_point_in_utc_time(
                    self.hass, self._value_is_expired, expiration_at
                )

            template = self._config.get(CONF_VALUE_TEMPLATE)
            if template is not None:
                # The current state is passed as the second argument
                # (presumably the fallback used when rendering fails; confirm
                # against the template helper).
                payload = template.async_render_with_possible_json_value(
                    payload, self._state
                )
            self._state = payload
            self.async_write_ha_state()

        self._sub_state = await subscription.async_subscribe_topics(
            self.hass,
            self._sub_state,
            {
                "state_topic": {
                    "topic": self._config[CONF_STATE_TOPIC],
                    "msg_callback": message_received,
                    "qos": self._config[CONF_QOS],
                }
            },
        )

    async def async_will_remove_from_hass(self):
        """Cancel the expiration timer before the entity is removed.

        Without this, a pending timer would still fire after removal and
        write state for an entity that no longer exists.
        """
        self._cancel_expiration_trigger()
        await super().async_will_remove_from_hass()

    @callback
    def _value_is_expired(self, *_):
        """Triggered when value is expired."""
        self._expiration_trigger = None
        self._expired = True
        self.async_write_ha_state()

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._config[CONF_NAME]

    @property
    def unit_of_measurement(self):
        """Return the unit this state is expressed in."""
        return self._config.get(CONF_UNIT_OF_MEASUREMENT)

    @property
    def force_update(self):
        """Force update."""
        return self._config[CONF_FORCE_UPDATE]

    @property
    def state(self):
        """Return the state of the entity."""
        return self._state

    @property
    def icon(self):
        """Return the icon."""
        return self._config.get(CONF_ICON)

    @property
    def device_class(self) -> Optional[str]:
        """Return the device class of the sensor."""
        return self._config.get(CONF_DEVICE_CLASS)

    @property
    def available(self) -> bool:
        """Return true if the device is available and value has not expired."""
        expire_after = self._config.get(CONF_EXPIRE_AFTER)
        # Same "enabled" test as in __init__ and message_received, so a stale
        # _expired flag is ignored once expire_after is reconfigured to 0.
        auto_expire = expire_after is not None and expire_after > 0
        # pylint: disable=no-member
        return MqttAvailability.available.fget(self) and (
            not auto_expire or not self._expired
        )