"""Support for scripts.""" from __future__ import annotations import asyncio import logging from typing import Any, cast import voluptuous as vol from voluptuous.humanize import humanize_error from homeassistant.components.blueprint import CONF_USE_BLUEPRINT, BlueprintInputs from homeassistant.const import ( ATTR_ENTITY_ID, ATTR_MODE, ATTR_NAME, CONF_ALIAS, CONF_DESCRIPTION, CONF_ICON, CONF_MODE, CONF_NAME, CONF_PATH, CONF_SEQUENCE, CONF_VARIABLES, SERVICE_RELOAD, SERVICE_TOGGLE, SERVICE_TURN_OFF, SERVICE_TURN_ON, STATE_ON, ) from homeassistant.core import HomeAssistant, ServiceCall, callback from homeassistant.helpers import extract_domain_configs import homeassistant.helpers.config_validation as cv from homeassistant.helpers.config_validation import make_entity_service_schema from homeassistant.helpers.entity import ToggleEntity from homeassistant.helpers.entity_component import EntityComponent from homeassistant.helpers.integration_platform import ( async_process_integration_platform_for_component, ) from homeassistant.helpers.restore_state import RestoreEntity from homeassistant.helpers.script import ( ATTR_CUR, ATTR_MAX, CONF_MAX, CONF_MAX_EXCEEDED, Script, script_stack_cv, ) from homeassistant.helpers.service import async_set_service_schema from homeassistant.helpers.trace import trace_get, trace_path from homeassistant.helpers.typing import ConfigType from homeassistant.loader import bind_hass from homeassistant.util.dt import parse_datetime from .config import ScriptConfig, async_validate_config_item from .const import ( ATTR_LAST_ACTION, ATTR_LAST_TRIGGERED, ATTR_VARIABLES, CONF_FIELDS, CONF_TRACE, DOMAIN, ENTITY_ID_FORMAT, EVENT_SCRIPT_STARTED, LOGGER, ) from .helpers import async_get_blueprints from .trace import trace_script SCRIPT_SERVICE_SCHEMA = vol.Schema(dict) SCRIPT_TURN_ONOFF_SCHEMA = make_entity_service_schema( {vol.Optional(ATTR_VARIABLES): {str: cv.match_all}} ) RELOAD_SERVICE_SCHEMA = vol.Schema({}) @bind_hass def is_on(hass, entity_id): """Return if the script is on based on the statemachine.""" return hass.states.is_state(entity_id, STATE_ON) def _scripts_with_x( hass: HomeAssistant, referenced_id: str, property_name: str ) -> list[str]: """Return all scripts that reference the x.""" if DOMAIN not in hass.data: return [] component: EntityComponent[ScriptEntity] = hass.data[DOMAIN] return [ script_entity.entity_id for script_entity in component.entities if referenced_id in getattr(script_entity.script, property_name) ] def _x_in_script(hass: HomeAssistant, entity_id: str, property_name: str) -> list[str]: """Return all x in a script.""" if DOMAIN not in hass.data: return [] component: EntityComponent[ScriptEntity] = hass.data[DOMAIN] if (script_entity := component.get_entity(entity_id)) is None: return [] return list(getattr(script_entity.script, property_name)) @callback def scripts_with_entity(hass: HomeAssistant, entity_id: str) -> list[str]: """Return all scripts that reference the entity.""" return _scripts_with_x(hass, entity_id, "referenced_entities") @callback def entities_in_script(hass: HomeAssistant, entity_id: str) -> list[str]: """Return all entities in script.""" return _x_in_script(hass, entity_id, "referenced_entities") @callback def scripts_with_device(hass: HomeAssistant, device_id: str) -> list[str]: """Return all scripts that reference the device.""" return _scripts_with_x(hass, device_id, "referenced_devices") @callback def devices_in_script(hass: HomeAssistant, entity_id: str) -> list[str]: """Return all devices in script.""" return _x_in_script(hass, entity_id, "referenced_devices") @callback def scripts_with_area(hass: HomeAssistant, area_id: str) -> list[str]: """Return all scripts that reference the area.""" return _scripts_with_x(hass, area_id, "referenced_areas") @callback def areas_in_script(hass: HomeAssistant, entity_id: str) -> list[str]: """Return all areas in a script.""" return _x_in_script(hass, entity_id, "referenced_areas") @callback def scripts_with_blueprint(hass: HomeAssistant, blueprint_path: str) -> list[str]: """Return all scripts that reference the blueprint.""" if DOMAIN not in hass.data: return [] component: EntityComponent[ScriptEntity] = hass.data[DOMAIN] return [ script_entity.entity_id for script_entity in component.entities if script_entity.referenced_blueprint == blueprint_path ] async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Load the scripts from the configuration.""" hass.data[DOMAIN] = component = EntityComponent[ScriptEntity](LOGGER, DOMAIN, hass) # Process integration platforms right away since # we will create entities before firing EVENT_COMPONENT_LOADED await async_process_integration_platform_for_component(hass, DOMAIN) # To register scripts as valid domain for Blueprint async_get_blueprints(hass) if not await _async_process_config(hass, config, component): await async_get_blueprints(hass).async_populate() async def reload_service(service: ServiceCall) -> None: """Call a service to reload scripts.""" if (conf := await component.async_prepare_reload()) is None: return async_get_blueprints(hass).async_reset_cache() await _async_process_config(hass, conf, component) async def turn_on_service(service: ServiceCall) -> None: """Call a service to turn script on.""" variables = service.data.get(ATTR_VARIABLES) script_entities = await component.async_extract_from_service(service) for script_entity in script_entities: await script_entity.async_turn_on( variables=variables, context=service.context, wait=False ) async def turn_off_service(service: ServiceCall) -> None: """Cancel a script.""" # Stopping a script is ok to be done in parallel script_entities = await component.async_extract_from_service(service) if not script_entities: return await asyncio.wait( [ asyncio.create_task(script_entity.async_turn_off()) for script_entity in script_entities ] ) async def toggle_service(service: ServiceCall) -> None: """Toggle a script.""" script_entities = await component.async_extract_from_service(service) for script_entity in script_entities: await script_entity.async_toggle(context=service.context, wait=False) hass.services.async_register( DOMAIN, SERVICE_RELOAD, reload_service, schema=RELOAD_SERVICE_SCHEMA ) hass.services.async_register( DOMAIN, SERVICE_TURN_ON, turn_on_service, schema=SCRIPT_TURN_ONOFF_SCHEMA ) hass.services.async_register( DOMAIN, SERVICE_TURN_OFF, turn_off_service, schema=SCRIPT_TURN_ONOFF_SCHEMA ) hass.services.async_register( DOMAIN, SERVICE_TOGGLE, toggle_service, schema=SCRIPT_TURN_ONOFF_SCHEMA ) return True async def _async_process_config(hass, config, component) -> bool: """Process script configuration. Return true, if Blueprints were used. """ entities = [] blueprints_used = False for config_key in extract_domain_configs(config, DOMAIN): conf: dict[str, dict[str, Any] | BlueprintInputs] = config[config_key] for object_id, config_block in conf.items(): raw_blueprint_inputs = None raw_config = None if isinstance(config_block, BlueprintInputs): blueprints_used = True blueprint_inputs = config_block raw_blueprint_inputs = blueprint_inputs.config_with_inputs try: raw_config = blueprint_inputs.async_substitute() config_block = cast( dict[str, Any], await async_validate_config_item(hass, raw_config), ) except vol.Invalid as err: LOGGER.error( "Blueprint %s generated invalid script with input %s: %s", blueprint_inputs.blueprint.name, blueprint_inputs.inputs, humanize_error(config_block, err), ) continue else: raw_config = cast(ScriptConfig, config_block).raw_config entities.append( ScriptEntity( hass, object_id, config_block, raw_config, raw_blueprint_inputs ) ) await component.async_add_entities(entities) async def service_handler(service: ServiceCall) -> None: """Execute a service call to script.