core/homeassistant/components/command_line/sensor.py

177 lines
5.8 KiB
Python

"""Allows to configure custom shell commands to turn a value for a sensor."""
from __future__ import annotations
from collections.abc import Mapping
from datetime import timedelta
import json
import logging
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA, SensorEntity
from homeassistant.const import (
CONF_COMMAND,
CONF_NAME,
CONF_UNIQUE_ID,
CONF_UNIT_OF_MEASUREMENT,
CONF_VALUE_TEMPLATE,
STATE_UNKNOWN,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import TemplateError
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.reload import setup_reload_service
from homeassistant.helpers.template import Template
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import check_output_or_log
from .const import CONF_COMMAND_TIMEOUT, DEFAULT_TIMEOUT, DOMAIN, PLATFORMS
_LOGGER = logging.getLogger(__name__)
CONF_JSON_ATTRIBUTES = "json_attributes"
DEFAULT_NAME = "Command Sensor"
SCAN_INTERVAL = timedelta(seconds=60)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_COMMAND): cv.string,
vol.Optional(CONF_COMMAND_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
vol.Optional(CONF_JSON_ATTRIBUTES): cv.ensure_list_csv,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
vol.Optional(CONF_UNIQUE_ID): cv.string,
}
)
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the Command Sensor."""
setup_reload_service(hass, DOMAIN, PLATFORMS)
name: str = config[CONF_NAME]
command: str = config[CONF_COMMAND]
unit: str | None = config.get(CONF_UNIT_OF_MEASUREMENT)
value_template: Template | None = config.get(CONF_VALUE_TEMPLATE)
command_timeout: int = config[CONF_COMMAND_TIMEOUT]
unique_id: str | None = config.get(CONF_UNIQUE_ID)
if value_template is not None:
value_template.hass = hass
json_attributes: list[str] | None = config.get(CONF_JSON_ATTRIBUTES)
data = CommandSensorData(hass, command, command_timeout)
add_entities(
[CommandSensor(data, name, unit, value_template, json_attributes, unique_id)],
True,
)
class CommandSensor(SensorEntity):
"""Representation of a sensor that is using shell commands."""
def __init__(
self,
data: CommandSensorData,
name: str,
unit_of_measurement: str | None,
value_template: Template | None,
json_attributes: list[str] | None,
unique_id: str | None,
) -> None:
"""Initialize the sensor."""
self.data = data
self._attr_extra_state_attributes = {}
self._json_attributes = json_attributes
self._attr_name = name
self._attr_native_value = None
self._attr_native_unit_of_measurement = unit_of_measurement
self._value_template = value_template
self._attr_unique_id = unique_id
def update(self) -> None:
"""Get the latest data and updates the state."""
self.data.update()
value = self.data.value
if self._json_attributes:
self._attr_extra_state_attributes = {}
if value:
try:
json_dict = json.loads(value)
if isinstance(json_dict, Mapping):
self._attr_extra_state_attributes = {
k: json_dict[k]
for k in self._json_attributes
if k in json_dict
}
else:
_LOGGER.warning("JSON result was not a dictionary")
except ValueError:
_LOGGER.warning("Unable to parse output as JSON: %s", value)
else:
_LOGGER.warning("Empty reply found when expecting JSON data")
if value is None:
value = STATE_UNKNOWN
elif self._value_template is not None:
self._attr_native_value = (
self._value_template.render_with_possible_json_value(
value, STATE_UNKNOWN
)
)
else:
self._attr_native_value = value
class CommandSensorData:
"""The class for handling the data retrieval."""
def __init__(self, hass: HomeAssistant, command: str, command_timeout: int) -> None:
"""Initialize the data object."""
self.value: str | None = None
self.hass = hass
self.command = command
self.timeout = command_timeout
def update(self) -> None:
"""Get the latest data with a shell command."""
command = self.command
if " " not in command:
prog = command
args = None
args_compiled = None
else:
prog, args = command.split(" ", 1)
args_compiled = Template(args, self.hass)
if args_compiled:
try:
args_to_render = {"arguments": args}
rendered_args = args_compiled.render(args_to_render)
except TemplateError as ex:
_LOGGER.exception("Error rendering command template: %s", ex)
return
else:
rendered_args = None
if rendered_args == args:
# No template used. default behavior
pass
else:
# Template used. Construct the string used in the shell
command = f"{prog} {rendered_args}"
_LOGGER.debug("Running command: %s", command)
self.value = check_output_or_log(command, self.timeout)