"""Module to coordinate user intentions.""" from __future__ import annotations from abc import abstractmethod import asyncio from collections.abc import Collection, Coroutine, Iterable import dataclasses from dataclasses import dataclass, field from enum import Enum, auto from functools import cached_property from itertools import groupby import logging from typing import Any import voluptuous as vol from homeassistant.components.homeassistant.exposed_entities import async_should_expose from homeassistant.const import ( ATTR_DEVICE_CLASS, ATTR_ENTITY_ID, ATTR_SUPPORTED_FEATURES, ) from homeassistant.core import Context, HomeAssistant, State, callback from homeassistant.exceptions import HomeAssistantError from homeassistant.loader import bind_hass from homeassistant.util.hass_dict import HassKey from . import ( area_registry, config_validation as cv, device_registry, entity_registry, floor_registry, ) _LOGGER = logging.getLogger(__name__) type _SlotsType = dict[str, Any] INTENT_TURN_OFF = "HassTurnOff" INTENT_TURN_ON = "HassTurnOn" INTENT_TOGGLE = "HassToggle" INTENT_GET_STATE = "HassGetState" INTENT_NEVERMIND = "HassNevermind" INTENT_SET_POSITION = "HassSetPosition" INTENT_START_TIMER = "HassStartTimer" INTENT_CANCEL_TIMER = "HassCancelTimer" INTENT_INCREASE_TIMER = "HassIncreaseTimer" INTENT_DECREASE_TIMER = "HassDecreaseTimer" INTENT_PAUSE_TIMER = "HassPauseTimer" INTENT_UNPAUSE_TIMER = "HassUnpauseTimer" INTENT_TIMER_STATUS = "HassTimerStatus" SLOT_SCHEMA = vol.Schema({}, extra=vol.ALLOW_EXTRA) DATA_KEY: HassKey[dict[str, IntentHandler]] = HassKey("intent") SPEECH_TYPE_PLAIN = "plain" SPEECH_TYPE_SSML = "ssml" @callback @bind_hass def async_register(hass: HomeAssistant, handler: IntentHandler) -> None: """Register an intent with Home Assistant.""" if (intents := hass.data.get(DATA_KEY)) is None: intents = {} hass.data[DATA_KEY] = intents assert getattr(handler, "intent_type", None), "intent_type should be set" if handler.intent_type in intents: _LOGGER.warning( "Intent %s is being overwritten by %s", handler.intent_type, handler ) intents[handler.intent_type] = handler @callback @bind_hass def async_remove(hass: HomeAssistant, intent_type: str) -> None: """Remove an intent from Home Assistant.""" if (intents := hass.data.get(DATA_KEY)) is None: return intents.pop(intent_type, None) @callback def async_get(hass: HomeAssistant) -> Iterable[IntentHandler]: """Return registered intents.""" return hass.data.get(DATA_KEY, {}).values() @bind_hass async def async_handle( hass: HomeAssistant, platform: str, intent_type: str, slots: _SlotsType | None = None, text_input: str | None = None, context: Context | None = None, language: str | None = None, assistant: str | None = None, device_id: str | None = None, ) -> IntentResponse: """Handle an intent.""" handler = hass.data.get(DATA_KEY, {}).get(intent_type) if handler is None: raise UnknownIntent(f"Unknown intent {intent_type}") if context is None: context = Context() if language is None: language = hass.config.language intent = Intent( hass, platform=platform, intent_type=intent_type, slots=slots or {}, text_input=text_input, context=context, language=language, assistant=assistant, device_id=device_id, ) try: _LOGGER.info("Triggering intent handler %s", handler) result = await handler.async_handle(intent) except vol.Invalid as err: _LOGGER.warning("Received invalid slot info for %s: %s", intent_type, err) raise InvalidSlotInfo(f"Received invalid slot info for {intent_type}") from err except IntentError: raise # bubble up intent related errors except Exception as err: raise IntentUnexpectedError(f"Error handling {intent_type}") from err return result class IntentError(HomeAssistantError): """Base class for intent related errors.""" class UnknownIntent(IntentError): """When the intent is not registered.""" class InvalidSlotInfo(IntentError): """When the slot data is invalid.""" class IntentHandleError(IntentError): """Error while handling intent.""" def __init__(self, message: str = "", response_key: str | None = None) -> None: """Initialize error.""" super().__init__(message) self.response_key = response_key class IntentUnexpectedError(IntentError): """Unexpected error while handling intent.""" class MatchFailedReason(Enum): """Possible reasons for match failure in async_match_targets.""" NAME = auto() """No entities matched name constraint.""" AREA = auto() """No entities matched area constraint.""" FLOOR = auto() """No entities matched floor constraint.""" DOMAIN = auto() """No entities matched domain constraint.""" DEVICE_CLASS = auto() """No entities matched device class constraint.""" FEATURE = auto() """No entities matched supported features constraint.""" STATE = auto() """No entities matched required states constraint.""" ASSISTANT = auto() """No entities matched exposed to assistant constraint.""" INVALID_AREA = auto() """Area name from constraint does not exist.""" INVALID_FLOOR = auto() """Floor name from constraint does not exist.""" DUPLICATE_NAME = auto() """Two or more entities matched the same name constraint and could not be disambiguated.""" def is_no_entities_reason(self) -> bool: """Return True if the match failed because no entities matched.""" return self not in ( MatchFailedReason.INVALID_AREA, MatchFailedReason.INVALID_FLOOR, MatchFailedReason.DUPLICATE_NAME, ) @dataclass class MatchTargetsConstraints: """Constraints for async_match_targets.""" name: str | None = None """Entity name or alias.""" area_name: str | None = None """Area name, id, or alias.""" floor_name: str | None = None """Floor name, id, or alias.""" domains: Collection[str] | None = None """Domain names.""" device_classes: Collection[str] | None = None """Device class names.""" features: int | None = None """Required supported features.""" states: Collection[str] | None = None """Required states for entities.""" assistant: str | None = None """Name of assistant that entities should be exposed to.""" allow_duplicate_names: bool = False """True if entities with duplicate names are allowed in result.""" @dataclass class MatchTargetsPreferences: """Preferences used to disambiguate duplicate name matches in async_match_targets.""" area_id: str | None = None """Id of area to use when deduplicating names.""" floor_id: str | None = None """Id of floor to use when deduplicating names.""" @dataclass class MatchTargetsResult: """Result from async_match_targets.""" is_match: bool """True if one or more entities matched.""" no_match_reason: MatchFailedReason | None = None """Reason for failed match when is_match = False.""" states: list[State] = field(default_factory=list) """List of matched entity states when is_match = True.""" no_match_name: str | None = None """Name of invalid area/floor or duplicate name when match fails for those reasons.""" areas: list[area_registry.AreaEntry] = field(default_factory=list) """Areas that were targeted.""" floors: list[floor_registry.FloorEntry] = field(default_factory=list) """Floors that were targeted.""" class MatchFailedError(IntentError): """Error when target matching fails.""" def __init__( self, result: MatchTargetsResult, constraints: MatchTargetsConstraints, preferences: MatchTargetsPreferences | None = None, ) -> None: """Initialize error.""" super().__init__() self.result = result self.constraints = constraints self.preferences = preferences def __str__(self) -> str: """Return string representation.""" return f"" class NoStatesMatchedError(MatchFailedError): """Error when no states match the intent's constraints.""" def __init__( self, reason: MatchFailedReason, name: str | None = None, area: str | None = None, floor: str | None = None, domains: set[str] | None = None, device_classes: set[str] | None = None, ) -> None: """Initialize error.""" super().__init__( result=MatchTargetsResult(False, reason), constraints=MatchTargetsConstraints( name=name, area_name=area, floor_name=floor, domains=domains, device_classes=device_classes, ), ) @dataclass class MatchTargetsCandidate: """Candidate for async_match_targets.""" state: State entity: entity_registry.RegistryEntry | None = None area: area_registry.AreaEntry | None = None floor: floor_registry.FloorEntry | None = None device: device_registry.DeviceEntry | None = None matched_name: str | None = None def _find_areas( name: str, areas: area_registry.AreaRegistry ) -> Iterable[area_registry.AreaEntry]: """Find all areas matching a name (including aliases).""" name_norm = _normalize_name(name) for area in areas.async_list_areas(): # Accept name or area id if (area.id == name) or (_normalize_name(area.name) == name_norm): yield area continue if not area.aliases: continue for alias in area.aliases: if _normalize_name(alias) == name_norm: yield area break def _find_floors( name: str, floors: floor_registry.FloorRegistry ) -> Iterable[floor_registry.FloorEntry]: """Find all floors matching a name (including aliases).""" name_norm = _normalize_name(name) for floor in floors.async_list_floors(): # Accept name or floor id if (floor.floor_id == name) or (_normalize_name(floor.name) == name_norm): yield floor continue if not floor.aliases: continue for alias in floor.aliases: if _normalize_name(alias) == name_norm: yield floor break def _normalize_name(name: str) -> str: """Normalize name for comparison.""" return name.strip().casefold() def _filter_by_name( name: str, candidates: Iterable[MatchTargetsCandidate], ) -> Iterable[MatchTargetsCandidate]: """Filter candidates by name.""" name_norm = _normalize_name(name) for candidate in candidates: # Accept name or entity id if (candidate.state.entity_id == name) or _normalize_name( candidate.state.name ) == name_norm: candidate.matched_name = name yield candidate continue if candidate.entity is None: continue if candidate.entity.name and ( _normalize_name(candidate.entity.name) == name_norm ): candidate.matched_name = name yield candidate continue # Check aliases if candidate.entity.aliases: for alias in candidate.entity.aliases: if _normalize_name(alias) == name_norm: candidate.matched_name = name yield candidate break def _filter_by_features( features: int, candidates: Iterable[MatchTargetsCandidate], ) -> Iterable[MatchTargetsCandidate]: """Filter candidates by supported features.""" for candidate in candidates: if (candidate.entity is not None) and ( (candidate.entity.supported_features & features) == features ): yield candidate continue supported_features = candidate.state.attributes.get(ATTR_SUPPORTED_FEATURES, 0) if (supported_features & features) == features: yield candidate def _filter_by_device_classes( device_classes: Iterable[str], candidates: Iterable[MatchTargetsCandidate], ) -> Iterable[MatchTargetsCandidate]: """Filter candidates by device classes.""" for candidate in candidates: if ( (candidate.entity is not None) and candidate.entity.device_class and (candidate.entity.device_class in device_classes) ): yield candidate continue device_class = candidate.state.attributes.get(ATTR_DEVICE_CLASS) if device_class and (device_class in device_classes): yield candidate def _add_areas( areas: area_registry.AreaRegistry, devices: device_registry.DeviceRegistry, candidates: Iterable[MatchTargetsCandidate], ) -> None: """Add area and device entries to match candidates.""" for candidate in candidates: if candidate.entity is None: continue if candidate.entity.device_id: candidate.device = devices.async_get(candidate.entity.device_id) if candidate.entity.area_id: # Use entity area first candidate.area = areas.async_get_area(candidate.entity.area_id) assert candidate.area is not None elif (candidate.device is not None) and candidate.device.area_id: # Fall back to device area candidate.area = areas.async_get_area(candidate.device.area_id) @callback def async_match_targets( # noqa: C901 hass: HomeAssistant, constraints: MatchTargetsConstraints, preferences: MatchTargetsPreferences | None = None, states: list[State] | None = None, ) -> MatchTargetsResult: """Match entities based on constraints in order to handle an intent.""" preferences = preferences or MatchTargetsPreferences() filtered_by_domain = False if not states: # Get all states and filter by domain states = hass.states.async_all(constraints.domains) filtered_by_domain = True if not states: return MatchTargetsResult(False, MatchFailedReason.DOMAIN) if constraints.assistant: # Filter by exposure states = [ s for s in states if async_should_expose(hass, constraints.assistant, s.entity_id) ] if not states: return MatchTargetsResult(False, MatchFailedReason.ASSISTANT) if constraints.domains and (not filtered_by_domain): # Filter by domain (if we didn't already do it) states = [s for s in states if s.domain in constraints.domains] if not states: return MatchTargetsResult(False, MatchFailedReason.DOMAIN) if constraints.states: # Filter by state states = [s for s in states if s.state in constraints.states] if not states: return MatchTargetsResult(False, MatchFailedReason.STATE) # Exit early so we can to avoid registry lookups if not ( constraints.name or constraints.features or constraints.device_classes or constraints.area_name or constraints.floor_name ): return MatchTargetsResult(True, states=states) # We need entity registry entries now er = entity_registry.async_get(hass) candidates = [MatchTargetsCandidate(s, er.async_get(s.entity_id)) for s in states] if constraints.name: # Filter by entity name or alias candidates = list(_filter_by_name(constraints.name, candidates)) if not candidates: return MatchTargetsResult(False, MatchFailedReason.NAME) if constraints.features: # Filter by supported features candidates = list(_filter_by_features(constraints.features, candidates)) if not candidates: return MatchTargetsResult(False, MatchFailedReason.FEATURE) if constraints.device_classes: # Filter by device class candidates = list( _filter_by_device_classes(constraints.device_classes, candidates) ) if not candidates: return MatchTargetsResult(False, MatchFailedReason.DEVICE_CLASS) # Check floor/area constraints targeted_floors: list[floor_registry.FloorEntry] | None = None targeted_areas: list[area_registry.AreaEntry] | None = None # True when area information has been added to candidates areas_added = False if constraints.floor_name or constraints.area_name: ar = area_registry.async_get(hass) dr = device_registry.async_get(hass) _add_areas(ar, dr, candidates) areas_added = True if constraints.floor_name: # Filter by areas associated with floor fr = floor_registry.async_get(hass) targeted_floors = list(_find_floors(constraints.floor_name, fr)) if not targeted_floors: return MatchTargetsResult( False, MatchFailedReason.INVALID_FLOOR, no_match_name=constraints.floor_name, ) possible_floor_ids = {floor.floor_id for floor in targeted_floors} possible_area_ids = { area.id for area in ar.async_list_areas() if area.floor_id in possible_floor_ids } candidates = [ c for c in candidates if (c.area is not None) and (c.area.id in possible_area_ids) ] if not candidates: return MatchTargetsResult( False, MatchFailedReason.FLOOR, floors=targeted_floors ) else: # All areas are possible possible_area_ids = {area.id for area in ar.async_list_areas()} if constraints.area_name: targeted_areas = list(_find_areas(constraints.area_name, ar)) if not targeted_areas: return MatchTargetsResult( False, MatchFailedReason.INVALID_AREA, no_match_name=constraints.area_name, ) matching_area_ids = {area.id for area in targeted_areas} # May be constrained by floors above possible_area_ids.intersection_update(matching_area_ids) candidates = [ c for c in candidates if (c.area is not None) and (c.area.id in possible_area_ids) ] if not candidates: return MatchTargetsResult( False, MatchFailedReason.AREA, areas=targeted_areas ) if constraints.name and (not constraints.allow_duplicate_names): # Check for duplicates if not areas_added: ar = area_registry.async_get(hass) dr = device_registry.async_get(hass) _add_areas(ar, dr, candidates) areas_added = True sorted_candidates = sorted( [c for c in candidates if c.matched_name], key=lambda c: c.matched_name or "", ) final_candidates: list[MatchTargetsCandidate] = [] for name, group in groupby(sorted_candidates, key=lambda c: c.matched_name): group_candidates = list(group) if len(group_candidates) < 2: # No duplicates for name final_candidates.extend(group_candidates) continue # Try to disambiguate by preferences if preferences.floor_id: group_candidates = [ c for c in group_candidates if (c.area is not None) and (c.area.floor_id == preferences.floor_id) ] if len(group_candidates) < 2: # Disambiguated by floor final_candidates.extend(group_candidates) continue if preferences.area_id: group_candidates = [ c for c in group_candidates if (c.area is not None) and (c.area.id == preferences.area_id) ] if len(group_candidates) < 2: # Disambiguated by area final_candidates.extend(group_candidates) continue # Couldn't disambiguate duplicate names return MatchTargetsResult( False, MatchFailedReason.DUPLICATE_NAME, no_match_name=name, areas=targeted_areas or [], floors=targeted_floors or [], ) if not final_candidates: return MatchTargetsResult( False, MatchFailedReason.NAME, areas=targeted_areas or [], floors=targeted_floors or [], ) candidates = final_candidates return MatchTargetsResult( True, None, states=[c.state for c in candidates], areas=targeted_areas or [], floors=targeted_floors or [], ) @callback @bind_hass def async_match_states( hass: HomeAssistant, name: str | None = None, area_name: str | None = None, floor_name: str | None = None, domains: Collection[str] | None = None, device_classes: Collection[str] | None = None, states: list[State] | None = None, ) -> Iterable[State]: """Simplified interface to async_match_targets that returns states matching the constraints.""" result = async_match_targets( hass, constraints=MatchTargetsConstraints( name=name, area_name=area_name, floor_name=floor_name, domains=domains, device_classes=device_classes, ), states=states, ) return result.states @callback def async_test_feature(state: State, feature: int, feature_name: str) -> None: """Test if state supports a feature.""" if state.attributes.get(ATTR_SUPPORTED_FEATURES, 0) & feature == 0: raise IntentHandleError(f"Entity {state.name} does not support {feature_name}") class IntentHandler: """Intent handler registration.""" intent_type: str platforms: Iterable[str] | None = [] description: str | None = None @property def slot_schema(self) -> dict | None: """Return a slot schema.""" return None @callback def async_can_handle(self, intent_obj: Intent) -> bool: """Test if an intent can be handled.""" return self.platforms is None or intent_obj.platform in self.platforms @callback def async_validate_slots(self, slots: _SlotsType) -> _SlotsType: """Validate slot information.""" if self.slot_schema is None: return slots return self._slot_schema(slots) # type: ignore[no-any-return] @cached_property def _slot_schema(self) -> vol.Schema: """Create validation schema for slots.""" assert self.slot_schema is not None return vol.Schema( { key: SLOT_SCHEMA.extend({"value": validator}) for key, validator in self.slot_schema.items() }, extra=vol.ALLOW_EXTRA, ) async def async_handle(self, intent_obj: Intent) -> IntentResponse: """Handle the intent.""" raise NotImplementedError def __repr__(self) -> str: """Represent a string of an intent handler.""" return f"<{self.__class__.__name__} - {self.intent_type}>" class DynamicServiceIntentHandler(IntentHandler): """Service Intent handler registration (dynamic). Service specific intent handler that calls a service by name/entity_id. """ # We use a small timeout in service calls to (hopefully) pass validation # checks, but not try to wait for the call to fully complete. service_timeout: float = 0.2 def __init__( self, intent_type: str, speech: str | None = None, required_slots: dict[str | tuple[str, str], vol.Schema] | None = None, optional_slots: dict[str | tuple[str, str], vol.Schema] | None = None, required_domains: set[str] | None = None, required_features: int | None = None, required_states: set[str] | None = None, description: str | None = None, ) -> None: """Create Service Intent Handler.""" self.intent_type = intent_type self.speech = speech self.required_domains = required_domains self.required_features = required_features self.required_states = required_states self.description = description self.required_slots: dict[tuple[str, str], vol.Schema] = {} if required_slots: for key, value_schema in required_slots.items(): if isinstance(key, str): # Slot name/service data key key = (key, key) self.required_slots[key] = value_schema self.optional_slots: dict[tuple[str, str], vol.Schema] = {} if optional_slots: for key, value_schema in optional_slots.items(): if isinstance(key, str): # Slot name/service data key key = (key, key) self.optional_slots[key] = value_schema @cached_property def slot_schema(self) -> dict: """Return a slot schema.""" slot_schema = { vol.Any("name", "area", "floor"): cv.string, vol.Optional("domain"): vol.All(cv.ensure_list, [cv.string]), vol.Optional("device_class"): vol.All(cv.ensure_list, [cv.string]), vol.Optional("preferred_area_id"): cv.string, vol.Optional("preferred_floor_id"): cv.string, } if self.required_slots: slot_schema.update( { vol.Required(key[0]): validator for key, validator in self.required_slots.items() } ) if self.optional_slots: slot_schema.update( { vol.Optional(key[0]): validator for key, validator in self.optional_slots.items() } ) return slot_schema @abstractmethod def get_domain_and_service( self, intent_obj: Intent, state: State ) -> tuple[str, str]: """Get the domain and service name to call.""" raise NotImplementedError async def async_handle(self, intent_obj: Intent) -> IntentResponse: """Handle the hass intent.""" hass = intent_obj.hass slots = self.async_validate_slots(intent_obj.slots) name_slot = slots.get("name", {}) entity_name: str | None = name_slot.get("value") entity_text: str | None = name_slot.get("text") if entity_name == "all": # Don't match on name if targeting all entities entity_name = None # Get area/floor info area_slot = slots.get("area", {}) area_id = area_slot.get("value") floor_slot = slots.get("floor", {}) floor_id = floor_slot.get("value") # Optional domain/device class filters. # Convert to sets for speed. domains: set[str] | None = self.required_domains device_classes: set[str] | None = None if "domain" in slots: domains = set(slots["domain"]["value"]) if self.required_domains: # Must be a subset of intent's required domain(s) domains.intersection_update(self.required_domains) if "device_class" in slots: device_classes = set(slots["device_class"]["value"]) match_constraints = MatchTargetsConstraints( name=entity_name, area_name=area_id, floor_name=floor_id, domains=domains, device_classes=device_classes, assistant=intent_obj.assistant, features=self.required_features, states=self.required_states, ) match_preferences = MatchTargetsPreferences( area_id=slots.get("preferred_area_id", {}).get("value"), floor_id=slots.get("preferred_floor_id", {}).get("value"), ) match_result = async_match_targets(hass, match_constraints, match_preferences) if not match_result.is_match: raise MatchFailedError( result=match_result, constraints=match_constraints, preferences=match_preferences, ) # Ensure name is text if ("name" in slots) and entity_text: slots["name"]["value"] = entity_text # Replace area/floor values with the resolved ids for use in templates if ("area" in slots) and match_result.areas: slots["area"]["value"] = match_result.areas[0].id if ("floor" in slots) and match_result.floors: slots["floor"]["value"] = match_result.floors[0].floor_id # Update intent slots to include any transformations done by the schemas intent_obj.slots = slots response = await self.async_handle_states( intent_obj, match_result, match_constraints, match_preferences ) # Make the matched states available in the response response.async_set_states( matched_states=match_result.states, unmatched_states=[] ) return response async def async_handle_states( self, intent_obj: Intent, match_result: MatchTargetsResult, match_constraints: MatchTargetsConstraints, match_preferences: MatchTargetsPreferences | None = None, ) -> IntentResponse: """Complete action on matched entity states.""" states = match_result.states response = intent_obj.create_response() hass = intent_obj.hass success_results: list[IntentResponseTarget] = [] if match_result.floors: success_results.extend( IntentResponseTarget( type=IntentResponseTargetType.FLOOR, name=floor.name, id=floor.floor_id, ) for floor in match_result.floors ) speech_name = match_result.floors[0].name elif match_result.areas: success_results.extend( IntentResponseTarget( type=IntentResponseTargetType.AREA, name=area.name, id=area.id ) for area in match_result.areas ) speech_name = match_result.areas[0].name else: speech_name = states[0].name service_coros: list[Coroutine[Any, Any, None]] = [] for state in states: domain, service = self.get_domain_and_service(intent_obj, state) service_coros.append( self.async_call_service(domain, service, intent_obj, state) ) # Handle service calls in parallel, noting failures as they occur. failed_results: list[IntentResponseTarget] = [] for state, service_coro in zip( states, asyncio.as_completed(service_coros), strict=False ): target = IntentResponseTarget( type=IntentResponseTargetType.ENTITY, name=state.name, id=state.entity_id, ) try: await service_coro success_results.append(target) except Exception: failed_results.append(target) _LOGGER.exception("Service call failed for %s", state.entity_id) if not success_results: # If no entities succeeded, raise an error. failed_entity_ids = [target.id for target in failed_results] raise IntentHandleError( f"Failed to call {service} for: {failed_entity_ids}" ) response.async_set_results( success_results=success_results, failed_results=failed_results ) # Update all states states = [hass.states.get(state.entity_id) or state for state in states] response.async_set_states(states) if self.speech is not None: response.async_set_speech(self.speech.format(speech_name)) return response async def async_call_service( self, domain: str, service: str, intent_obj: Intent, state: State ) -> None: """Call service on entity.""" hass = intent_obj.hass service_data: dict[str, Any] = {ATTR_ENTITY_ID: state.entity_id} if self.required_slots: service_data.update( { key[1]: intent_obj.slots[key[0]]["value"] for key in self.required_slots } ) if self.optional_slots: for key in self.optional_slots: value = intent_obj.slots.get(key[0]) if value: service_data[key[1]] = value["value"] await self._run_then_background( hass.async_create_task_internal( hass.services.async_call( domain, service, service_data, context=intent_obj.context, blocking=True, ), f"intent_call_service_{domain}_{service}", ) ) async def _run_then_background(self, task: asyncio.Task[Any]) -> None: """Run task with timeout to (hopefully) catch validation errors. After the timeout the task will continue to run in the background. """ try: await asyncio.wait({task}, timeout=self.service_timeout) except TimeoutError: pass except asyncio.CancelledError: # Task calling us was cancelled, so cancel service call task, and wait for # it to be cancelled, within reason, before leaving. _LOGGER.debug("Service call was cancelled: %s", task.get_name()) task.cancel() await asyncio.wait({task}, timeout=5) raise class ServiceIntentHandler(DynamicServiceIntentHandler): """Service Intent handler registration. Service specific intent handler that calls a service by name/entity_id. """ def __init__( self, intent_type: str, domain: str, service: str, speech: str | None = None, required_slots: dict[str | tuple[str, str], vol.Schema] | None = None, optional_slots: dict[str | tuple[str, str], vol.Schema] | None = None, required_domains: set[str] | None = None, required_features: int | None = None, required_states: set[str] | None = None, description: str | None = None, ) -> None: """Create service handler.""" super().__init__( intent_type, speech=speech, required_slots=required_slots, optional_slots=optional_slots, required_domains=required_domains, required_features=required_features, required_states=required_states, description=description, ) self.domain = domain self.service = service def get_domain_and_service( self, intent_obj: Intent, state: State ) -> tuple[str, str]: """Get the domain and service name to call.""" return (self.domain, self.service) class IntentCategory(Enum): """Category of an intent.""" ACTION = "action" """Trigger an action like turning an entity on or off""" QUERY = "query" """Get information about the state of an entity""" class Intent: """Hold the intent.""" __slots__ = [ "hass", "platform", "intent_type", "slots", "text_input", "context", "language", "category", "assistant", "device_id", ] def __init__( self, hass: HomeAssistant, platform: str, intent_type: str, slots: _SlotsType, text_input: str | None, context: Context, language: str, category: IntentCategory | None = None, assistant: str | None = None, device_id: str | None = None, ) -> None: """Initialize an intent.""" self.hass = hass self.platform = platform self.intent_type = intent_type self.slots = slots self.text_input = text_input self.context = context self.language = language self.category = category self.assistant = assistant self.device_id = device_id @callback def create_response(self) -> IntentResponse: """Create a response.""" return IntentResponse(language=self.language, intent=self) class IntentResponseType(Enum): """Type of the intent response.""" ACTION_DONE = "action_done" """Intent caused an action to occur""" PARTIAL_ACTION_DONE = "partial_action_done" """Intent caused an action, but it could only be partially done""" QUERY_ANSWER = "query_answer" """Response is an answer to a query""" ERROR = "error" """Response is an error""" class IntentResponseErrorCode(str, Enum): """Reason for an intent response error.""" NO_INTENT_MATCH = "no_intent_match" """Text could not be matched to an intent""" NO_VALID_TARGETS = "no_valid_targets" """Intent was matched, but no valid areas/devices/entities were targeted""" FAILED_TO_HANDLE = "failed_to_handle" """Unexpected error occurred while handling intent""" UNKNOWN = "unknown" """Error outside the scope of intent processing""" class IntentResponseTargetType(str, Enum): """Type of target for an intent response.""" AREA = "area" FLOOR = "floor" DEVICE = "device" ENTITY = "entity" DOMAIN = "domain" DEVICE_CLASS = "device_class" CUSTOM = "custom" @dataclass(slots=True) class IntentResponseTarget: """Target of the intent response.""" name: str type: IntentResponseTargetType id: str | None = None class IntentResponse: """Response to an intent.""" def __init__( self, language: str, intent: Intent | None = None, ) -> None: """Initialize an IntentResponse.""" self.language = language self.intent = intent self.speech: dict[str, dict[str, Any]] = {} self.reprompt: dict[str, dict[str, Any]] = {} self.card: dict[str, dict[str, str]] = {} self.error_code: IntentResponseErrorCode | None = None self.intent_targets: list[IntentResponseTarget] = [] self.success_results: list[IntentResponseTarget] = [] self.failed_results: list[IntentResponseTarget] = [] self.matched_states: list[State] = [] self.unmatched_states: list[State] = [] self.speech_slots: dict[str, Any] = {} if (self.intent is not None) and (self.intent.category == IntentCategory.QUERY): # speech will be the answer to the query self.response_type = IntentResponseType.QUERY_ANSWER else: self.response_type = IntentResponseType.ACTION_DONE @callback def async_set_speech( self, speech: str, speech_type: str = "plain", extra_data: Any | None = None, ) -> None: """Set speech response.""" self.speech[speech_type] = { "speech": speech, "extra_data": extra_data, } @callback def async_set_reprompt( self, speech: str, speech_type: str = "plain", extra_data: Any | None = None, ) -> None: """Set reprompt response.""" self.reprompt[speech_type] = { "reprompt": speech, "extra_data": extra_data, } @callback def async_set_card( self, title: str, content: str, card_type: str = "simple" ) -> None: """Set card response.""" self.card[card_type] = {"title": title, "content": content} @callback def async_set_error(self, code: IntentResponseErrorCode, message: str) -> None: """Set response error.""" self.response_type = IntentResponseType.ERROR self.error_code = code # Speak error message self.async_set_speech(message) @callback def async_set_targets( self, intent_targets: list[IntentResponseTarget], ) -> None: """Set response targets.""" self.intent_targets = intent_targets @callback def async_set_results( self, success_results: list[IntentResponseTarget], failed_results: list[IntentResponseTarget] | None = None, ) -> None: """Set response results.""" self.success_results = success_results self.failed_results = failed_results if failed_results is not None else [] @callback def async_set_states( self, matched_states: list[State], unmatched_states: list[State] | None = None ) -> None: """Set entity states that were matched or not matched during intent handling (query).""" self.matched_states = matched_states self.unmatched_states = unmatched_states or [] @callback def async_set_speech_slots(self, speech_slots: dict[str, Any]) -> None: """Set slots that will be used in the response template of the default agent.""" self.speech_slots = speech_slots @callback def as_dict(self) -> dict[str, Any]: """Return a dictionary representation of an intent response.""" response_dict: dict[str, Any] = { "speech": self.speech, "card": self.card, "language": self.language, "response_type": self.response_type.value, } if self.reprompt: response_dict["reprompt"] = self.reprompt response_data: dict[str, Any] = {} if self.response_type == IntentResponseType.ERROR: assert self.error_code is not None, "error code is required" response_data["code"] = self.error_code.value else: # action done or query answer response_data["targets"] = [ dataclasses.asdict(target) for target in self.intent_targets ] # Add success/failed targets response_data["success"] = [ dataclasses.asdict(target) for target in self.success_results ] response_data["failed"] = [ dataclasses.asdict(target) for target in self.failed_results ] response_dict["data"] = response_data return response_dict