"""Allows to configure custom shell commands to turn a value for a sensor.""" from __future__ import annotations from collections.abc import Mapping from datetime import timedelta import json import logging import voluptuous as vol from homeassistant.components.sensor import PLATFORM_SCHEMA, SensorEntity from homeassistant.const import ( CONF_COMMAND, CONF_NAME, CONF_UNIQUE_ID, CONF_UNIT_OF_MEASUREMENT, CONF_VALUE_TEMPLATE, STATE_UNKNOWN, ) from homeassistant.core import HomeAssistant from homeassistant.exceptions import TemplateError from homeassistant.helpers import template import homeassistant.helpers.config_validation as cv from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.reload import setup_reload_service from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from . import check_output_or_log from .const import CONF_COMMAND_TIMEOUT, DEFAULT_TIMEOUT, DOMAIN, PLATFORMS _LOGGER = logging.getLogger(__name__) CONF_JSON_ATTRIBUTES = "json_attributes" DEFAULT_NAME = "Command Sensor" SCAN_INTERVAL = timedelta(seconds=60) PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( { vol.Required(CONF_COMMAND): cv.string, vol.Optional(CONF_COMMAND_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int, vol.Optional(CONF_JSON_ATTRIBUTES): cv.ensure_list_csv, vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string, vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string, vol.Optional(CONF_VALUE_TEMPLATE): cv.template, vol.Optional(CONF_UNIQUE_ID): cv.string, } ) def setup_platform( hass: HomeAssistant, config: ConfigType, add_entities: AddEntitiesCallback, discovery_info: DiscoveryInfoType | None = None, ) -> None: """Set up the Command Sensor.""" setup_reload_service(hass, DOMAIN, PLATFORMS) name = config.get(CONF_NAME) command = config.get(CONF_COMMAND) unit = config.get(CONF_UNIT_OF_MEASUREMENT) value_template = config.get(CONF_VALUE_TEMPLATE) command_timeout = config.get(CONF_COMMAND_TIMEOUT) unique_id = config.get(CONF_UNIQUE_ID) if value_template is not None: value_template.hass = hass json_attributes = config.get(CONF_JSON_ATTRIBUTES) data = CommandSensorData(hass, command, command_timeout) add_entities( [ CommandSensor( hass, data, name, unit, value_template, json_attributes, unique_id ) ], True, ) class CommandSensor(SensorEntity): """Representation of a sensor that is using shell commands.""" def __init__( self, hass, data, name, unit_of_measurement, value_template, json_attributes, unique_id, ): """Initialize the sensor.""" self._hass = hass self.data = data self._attributes = None self._json_attributes = json_attributes self._name = name self._state = None self._unit_of_measurement = unit_of_measurement self._value_template = value_template self._attr_unique_id = unique_id @property def name(self): """Return the name of the sensor.""" return self._name @property def native_unit_of_measurement(self): """Return the unit the value is expressed in.""" return self._unit_of_measurement @property def native_value(self): """Return the state of the device.""" return self._state @property def extra_state_attributes(self): """Return the state attributes.""" return self._attributes def update(self): """Get the latest data and updates the state.""" self.data.update() value = self.data.value if self._json_attributes: self._attributes = {} if value: try: json_dict = json.loads(value) if isinstance(json_dict, Mapping): self._attributes = { k: json_dict[k] for k in self._json_attributes if k in json_dict } else: _LOGGER.warning("JSON result was not a dictionary") except ValueError: _LOGGER.warning("Unable to parse output as JSON: %s", value) else: _LOGGER.warning("Empty reply found when expecting JSON data") if value is None: value = STATE_UNKNOWN elif self._value_template is not None: self._state = self._value_template.render_with_possible_json_value( value, STATE_UNKNOWN ) else: self._state = value class CommandSensorData: """The class for handling the data retrieval.""" def __init__(self, hass, command, command_timeout): """Initialize the data object.""" self.value = None self.hass = hass self.command = command self.timeout = command_timeout def update(self): """Get the latest data with a shell command.""" command = self.command if " " not in command: prog = command args = None args_compiled = None else: prog, args = command.split(" ", 1) args_compiled = template.Template(args, self.hass) if args_compiled: try: args_to_render = {"arguments": args} rendered_args = args_compiled.render(args_to_render) except TemplateError as ex: _LOGGER.exception("Error rendering command template: %s", ex) return else: rendered_args = None if rendered_args == args: # No template used. default behavior pass else: # Template used. Construct the string used in the shell command = f"{prog} {rendered_args}" _LOGGER.debug("Running command: %s", command) self.value = check_output_or_log(command, self.timeout)