"""Allows to configure custom shell commands to turn a value for a sensor.""" from __future__ import annotations import asyncio from collections.abc import Mapping from datetime import datetime, timedelta import json from typing import Any from jsonpath import jsonpath from homeassistant.const import ( CONF_COMMAND, CONF_NAME, CONF_SCAN_INTERVAL, CONF_VALUE_TEMPLATE, ) from homeassistant.core import HomeAssistant from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.event import async_track_time_interval from homeassistant.helpers.template import Template from homeassistant.helpers.trigger_template_entity import ( ManualTriggerSensorEntity, ValueTemplate, ) from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from homeassistant.util import dt as dt_util from .const import ( CONF_COMMAND_TIMEOUT, CONF_JSON_ATTRIBUTES, CONF_JSON_ATTRIBUTES_PATH, LOGGER, TRIGGER_ENTITY_OPTIONS, ) from .utils import async_check_output_or_log, render_template_args DEFAULT_NAME = "Command Sensor" SCAN_INTERVAL = timedelta(seconds=60) async def async_setup_platform( hass: HomeAssistant, config: ConfigType, async_add_entities: AddEntitiesCallback, discovery_info: DiscoveryInfoType | None = None, ) -> None: """Set up the Command Sensor.""" if not discovery_info: return sensor_config = discovery_info command: str = sensor_config[CONF_COMMAND] command_timeout: int = sensor_config[CONF_COMMAND_TIMEOUT] json_attributes: list[str] | None = sensor_config.get(CONF_JSON_ATTRIBUTES) json_attributes_path: str | None = sensor_config.get(CONF_JSON_ATTRIBUTES_PATH) scan_interval: timedelta = sensor_config.get(CONF_SCAN_INTERVAL, SCAN_INTERVAL) value_template: ValueTemplate | None = sensor_config.get(CONF_VALUE_TEMPLATE) data = CommandSensorData(hass, command, command_timeout) trigger_entity_config = { CONF_NAME: Template(sensor_config[CONF_NAME], hass), **{k: v for k, v in sensor_config.items() if k in TRIGGER_ENTITY_OPTIONS}, } async_add_entities( [ CommandSensor( data, trigger_entity_config, value_template, json_attributes, json_attributes_path, scan_interval, ) ] ) class CommandSensor(ManualTriggerSensorEntity): """Representation of a sensor that is using shell commands.""" _attr_should_poll = False def __init__( self, data: CommandSensorData, config: ConfigType, value_template: ValueTemplate | None, json_attributes: list[str] | None, json_attributes_path: str | None, scan_interval: timedelta, ) -> None: """Initialize the sensor.""" super().__init__(self.hass, config) self.data = data self._attr_extra_state_attributes: dict[str, Any] = {} self._json_attributes = json_attributes self._json_attributes_path = json_attributes_path self._attr_native_value = None self._value_template = value_template self._scan_interval = scan_interval self._process_updates: asyncio.Lock | None = None @property def extra_state_attributes(self) -> dict[str, Any]: """Return extra state attributes.""" return self._attr_extra_state_attributes async def async_added_to_hass(self) -> None: """Call when entity about to be added to hass.""" await super().async_added_to_hass() await self._update_entity_state() self.async_on_remove( async_track_time_interval( self.hass, self._update_entity_state, self._scan_interval, name=f"Command Line Sensor - {self.name}", cancel_on_shutdown=True, ), ) async def _update_entity_state(self, now: datetime | None = None) -> None: """Update the state of the entity.""" if self._process_updates is None: self._process_updates = asyncio.Lock() if self._process_updates.locked(): LOGGER.warning( "Updating Command Line Sensor %s took longer than the scheduled update interval %s", self.name, self._scan_interval, ) return async with self._process_updates: await self._async_update() async def _async_update(self) -> None: """Get the latest data and updates the state.""" await self.data.async_update() value = self.data.value variables = self._template_variables_with_value(self.data.value) if not self._render_availability_template(variables): self.async_write_ha_state() return if self._json_attributes: self._attr_extra_state_attributes = {} if value: try: json_dict = json.loads(value) if self._json_attributes_path is not None: json_dict = jsonpath(json_dict, self._json_attributes_path) # jsonpath will always store the result in json_dict[0] # so the next line happens to work exactly as needed to # find the result if isinstance(json_dict, list): json_dict = json_dict[0] if isinstance(json_dict, Mapping): self._attr_extra_state_attributes = { k: json_dict[k] for k in self._json_attributes if k in json_dict } else: LOGGER.warning("JSON result was not a dictionary") except ValueError: LOGGER.warning("Unable to parse output as JSON: %s", value) else: LOGGER.warning("Empty reply found when expecting JSON data") if self._value_template is None: self._attr_native_value = None self._process_manual_data(variables) self.async_write_ha_state() return self._attr_native_value = None if self._value_template is not None and value is not None: value = self._value_template.async_render_as_value_template( self.entity_id, variables, None ) self._set_native_value_with_possible_timestamp(value) self._process_manual_data(variables) self.async_write_ha_state() async def async_update(self) -> None: """Update the entity. Only used by the generic entity update service. """ await self._update_entity_state(dt_util.now()) class CommandSensorData: """The class for handling the data retrieval.""" def __init__(self, hass: HomeAssistant, command: str, command_timeout: int) -> None: """Initialize the data object.""" self.value: str | None = None self.hass = hass self.command = command self.timeout = command_timeout async def async_update(self) -> None: """Get the latest data with a shell command.""" if not (command := render_template_args(self.hass, self.command)): return self.value = await async_check_output_or_log(command, self.timeout)