core/homeassistant/helpers/intent.py

1396 lines
44 KiB
Python
Raw Normal View History

"""Module to coordinate user intentions."""
from __future__ import annotations
from abc import abstractmethod
import asyncio
2024-06-26 09:30:07 +00:00
from collections.abc import Callable, Collection, Coroutine, Iterable
import dataclasses
from dataclasses import dataclass, field
from enum import Enum, auto
from functools import cached_property
from itertools import groupby
import logging
from typing import Any
import voluptuous as vol
from homeassistant.components.homeassistant.exposed_entities import async_should_expose
from homeassistant.const import (
ATTR_DEVICE_CLASS,
ATTR_ENTITY_ID,
ATTR_SUPPORTED_FEATURES,
)
from homeassistant.core import Context, HomeAssistant, State, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.loader import bind_hass
2024-05-07 16:25:16 +00:00
from homeassistant.util.hass_dict import HassKey
from . import (
area_registry,
config_validation as cv,
device_registry,
entity_registry,
floor_registry,
)
2024-06-25 17:02:04 +00:00
from .typing import VolSchemaType
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Alias for the slot mapping carried by an intent (slot name -> slot data).
type _SlotsType = dict[str, Any]
2024-06-26 09:30:07 +00:00
# Alias for slot definitions accepted by DynamicServiceIntentHandler: each key is
# either a slot name or a (slot name, service data key) tuple, and each value is
# the validator used for that slot's value.
type _IntentSlotsType = dict[
    str | tuple[str, str], VolSchemaType | Callable[[Any], Any]
]
2019-07-31 19:25:30 +00:00
# Intent type identifiers (presumably used as `intent_type` values when
# registering and handling intents -- verify against callers).
INTENT_TURN_OFF = "HassTurnOff"
INTENT_TURN_ON = "HassTurnOn"
INTENT_TOGGLE = "HassToggle"
INTENT_GET_STATE = "HassGetState"
INTENT_NEVERMIND = "HassNevermind"
INTENT_SET_POSITION = "HassSetPosition"
# Timer-related intent type identifiers.
INTENT_START_TIMER = "HassStartTimer"
INTENT_CANCEL_TIMER = "HassCancelTimer"
INTENT_INCREASE_TIMER = "HassIncreaseTimer"
INTENT_DECREASE_TIMER = "HassDecreaseTimer"
INTENT_PAUSE_TIMER = "HassPauseTimer"
INTENT_UNPAUSE_TIMER = "HassUnpauseTimer"
INTENT_TIMER_STATUS = "HassTimerStatus"
2019-07-31 19:25:30 +00:00
# Base schema for a single slot: declares no keys itself and allows extra keys.
# IntentHandler extends it with a "value" validator per slot.
SLOT_SCHEMA = vol.Schema({}, extra=vol.ALLOW_EXTRA)
2024-05-07 16:25:16 +00:00
# Key in hass.data under which registered handlers are stored
# (intent type -> handler).
DATA_KEY: HassKey[dict[str, IntentHandler]] = HassKey("intent")
2019-07-31 19:25:30 +00:00
# Speech type identifiers.
SPEECH_TYPE_PLAIN = "plain"
SPEECH_TYPE_SSML = "ssml"
@callback
@bind_hass
def async_register(hass: HomeAssistant, handler: IntentHandler) -> None:
    """Add an intent handler to the registry kept in ``hass.data``.

    A handler already registered for the same intent type is replaced, and a
    warning is logged when that happens.
    """
    registry = hass.data.get(DATA_KEY)
    if registry is None:
        # First registration: create the registry lazily.
        registry = hass.data[DATA_KEY] = {}

    assert getattr(handler, "intent_type", None), "intent_type should be set"

    intent_type = handler.intent_type
    if intent_type in registry:
        _LOGGER.warning("Intent %s is being overwritten by %s", intent_type, handler)

    registry[intent_type] = handler
@callback
@bind_hass
def async_remove(hass: HomeAssistant, intent_type: str) -> None:
    """Drop the handler for ``intent_type``.

    Does nothing when no registry exists yet or the type is not registered.
    """
    registry = hass.data.get(DATA_KEY)
    if registry is not None:
        registry.pop(intent_type, None)
Add LLM tools (#115464) * Add llm helper * break out Tool.specification as class members * Format state output * Fix intent tests * Removed auto initialization of intents - let conversation platforms do that * Handle DynamicServiceIntentHandler.extra_slots * Add optional description to IntentTool init * Add device_id and conversation_id parameters * intent tests * Add LLM tools tests * coverage * add agent_id parameter * Apply suggestions from code review * Apply suggestions from code review * Apply suggestions from code review * Apply suggestions from code review * Apply suggestions from code review * Fix tests * Fix intent schema * Allow a Python function to be registered as am LLM tool * Add IntentHandler.effective_slot_schema * Ensure IntentHandler.slot_schema to be vol.Schema * Raise meaningful error on tool not found * Move this change to a separate PR * Update todo integration intent * Remove Tool constructor * Move IntentTool to intent helper * Convert custom serializer into class method * Remove tool_input from FunctionTool auto arguments to avoid recursion * Remove conversion into Open API format * Apply suggestions from code review * Fix tests * Use HassKey for helpers (see #117012) * Add support for functions with typed lists, dicts, and sets as type hints * Remove FunctionTool * Added API to get registered intents * Move IntentTool to the llm library * Return only handlers in intents.async.get * Removed llm tool registration from intent library * Removed tool registration * Add bind_hass back for now * removed area and floor resolving * fix test * Apply suggestions from code review * Improve coverage * Fix intent_type type * Temporary disable HassClimateGetTemperature intent * Remove bind_hass * Fix usage of slot schema * Fix test * Revert some test changes * Don't mutate tool_input --------- Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io> Co-authored-by: Paulus Schoutsen <balloob@gmail.com>
2024-05-15 23:16:47 +00:00
@callback
def async_get(hass: HomeAssistant) -> Iterable[IntentHandler]:
    """Return all registered intent handlers (empty when none are registered)."""
    registry = hass.data.get(DATA_KEY, {})
    return registry.values()
@bind_hass
async def async_handle(
    hass: HomeAssistant,
    platform: str,
    intent_type: str,
    slots: _SlotsType | None = None,
    text_input: str | None = None,
    context: Context | None = None,
    language: str | None = None,
    assistant: str | None = None,
    device_id: str | None = None,
    conversation_agent_id: str | None = None,
) -> IntentResponse:
    """Look up the handler registered for ``intent_type`` and run it.

    A missing ``context`` is replaced by a new ``Context`` and a missing
    ``language`` by ``hass.config.language``; missing ``slots`` become an
    empty dict.

    Raises:
        UnknownIntent: no handler is registered for ``intent_type``.
        InvalidSlotInfo: the handler raised ``vol.Invalid``.
        IntentError: raised by the handler; passed through unchanged.
        IntentUnexpectedError: the handler raised any other exception.
    """
    registry = hass.data.get(DATA_KEY, {})
    if (handler := registry.get(intent_type)) is None:
        raise UnknownIntent(f"Unknown intent {intent_type}")

    intent = Intent(
        hass,
        platform=platform,
        intent_type=intent_type,
        slots=slots or {},
        text_input=text_input,
        context=Context() if context is None else context,
        language=hass.config.language if language is None else language,
        assistant=assistant,
        device_id=device_id,
        conversation_agent_id=conversation_agent_id,
    )

    try:
        _LOGGER.info("Triggering intent handler %s", handler)
        return await handler.async_handle(intent)
    except vol.Invalid as err:
        _LOGGER.warning("Received invalid slot info for %s: %s", intent_type, err)
        raise InvalidSlotInfo(f"Received invalid slot info for {intent_type}") from err
    except IntentError:
        # Intent related errors are already meaningful to callers.
        raise
    except Exception as err:
        _LOGGER.exception("Error handling %s", intent_type)
        raise IntentUnexpectedError(f"Error handling {intent_type}") from err
class IntentError(HomeAssistantError):
    """Common base for every intent related exception in this module."""
class UnknownIntent(IntentError):
    """Raised when no handler is registered for the requested intent type."""
class InvalidSlotInfo(IntentError):
    """Raised when slot data fails validation."""
class IntentHandleError(IntentError):
    """Raised when handling an intent fails.

    ``response_key`` is stored on the exception exactly as given (default
    ``None``).
    """

    def __init__(self, message: str = "", response_key: str | None = None) -> None:
        """Store the error message and the optional response key."""
        self.response_key = response_key
        super().__init__(message)
class IntentUnexpectedError(IntentError):
    """Raised when an intent handler fails with a non-intent exception."""
class MatchFailedReason(Enum):
    """Reasons why async_match_targets can fail to produce a match."""

    NAME = auto()
    """The name constraint matched no entities."""

    AREA = auto()
    """The area constraint matched no entities."""

    FLOOR = auto()
    """The floor constraint matched no entities."""

    DOMAIN = auto()
    """The domain constraint matched no entities."""

    DEVICE_CLASS = auto()
    """The device class constraint matched no entities."""

    FEATURE = auto()
    """The supported features constraint matched no entities."""

    STATE = auto()
    """The required states constraint matched no entities."""

    ASSISTANT = auto()
    """No entities were exposed to the requested assistant."""

    INVALID_AREA = auto()
    """The area name in the constraints does not exist."""

    INVALID_FLOOR = auto()
    """The floor name in the constraints does not exist."""

    DUPLICATE_NAME = auto()
    """Several entities matched the same name and could not be disambiguated."""

    def is_no_entities_reason(self) -> bool:
        """Return True when the failure means that no entities matched."""
        other_reasons = frozenset(
            {
                MatchFailedReason.INVALID_AREA,
                MatchFailedReason.INVALID_FLOOR,
                MatchFailedReason.DUPLICATE_NAME,
            }
        )
        return self not in other_reasons
@dataclass
class MatchTargetsConstraints:
    """Filters that async_match_targets applies when selecting entities."""

    name: str | None = None
    """Name or alias of the entity."""

    area_name: str | None = None
    """Name, id, or alias of the area."""

    floor_name: str | None = None
    """Name, id, or alias of the floor."""

    domains: Collection[str] | None = None
    """Names of the allowed domains."""

    device_classes: Collection[str] | None = None
    """Names of the allowed device classes."""

    features: int | None = None
    """Supported features that are required."""

    states: Collection[str] | None = None
    """States that entities are required to be in."""

    assistant: str | None = None
    """Assistant that the entities must be exposed to."""

    allow_duplicate_names: bool = False
    """Whether the result may contain entities that share a name."""

    @property
    def has_constraints(self) -> bool:
        """Return True if any constraint other than the assistant is set."""
        narrowing_fields = (
            self.name,
            self.area_name,
            self.floor_name,
            self.domains,
            self.device_classes,
            self.features,
            self.states,
        )
        return any(narrowing_fields)
@dataclass
class MatchTargetsPreferences:
    """Hints async_match_targets uses to resolve duplicate name matches."""

    area_id: str | None = None
    """Area id preferred when several entities share a name."""

    floor_id: str | None = None
    """Floor id preferred when several entities share a name."""
@dataclass
class MatchTargetsResult:
    """Outcome returned by async_match_targets."""

    is_match: bool
    """Whether at least one entity matched."""

    no_match_reason: MatchFailedReason | None = None
    """Why matching failed; only meaningful when is_match is False."""

    states: list[State] = field(default_factory=list)
    """States of the matched entities; only filled when is_match is True."""

    no_match_name: str | None = None
    """Invalid area/floor name, or the duplicated name, for those failure reasons."""

    areas: list[area_registry.AreaEntry] = field(default_factory=list)
    """Areas targeted by the constraints."""

    floors: list[floor_registry.FloorEntry] = field(default_factory=list)
    """Floors targeted by the constraints."""
class MatchFailedError(IntentError):
    """Raised when matching targets for an intent fails.

    Carries the match result together with the constraints and preferences
    that were used.
    """

    def __init__(
        self,
        result: MatchTargetsResult,
        constraints: MatchTargetsConstraints,
        preferences: MatchTargetsPreferences | None = None,
    ) -> None:
        """Keep the match inputs and outcome on the exception."""
        super().__init__()

        self.result, self.constraints, self.preferences = (
            result,
            constraints,
            preferences,
        )

    def __str__(self) -> str:
        """Describe the failed match, including result and inputs."""
        return (
            f"<MatchFailedError result={self.result}, "
            f"constraints={self.constraints}, "
            f"preferences={self.preferences}>"
        )
class NoStatesMatchedError(MatchFailedError):
    """Raised when the constraints of an intent match no states."""

    def __init__(
        self,
        reason: MatchFailedReason,
        name: str | None = None,
        area: str | None = None,
        floor: str | None = None,
        domains: set[str] | None = None,
        device_classes: set[str] | None = None,
    ) -> None:
        """Build the failed result and constraints from the individual values."""
        failed_result = MatchTargetsResult(False, reason)
        used_constraints = MatchTargetsConstraints(
            name=name,
            area_name=area,
            floor_name=floor,
            domains=domains,
            device_classes=device_classes,
        )
        super().__init__(result=failed_result, constraints=used_constraints)
@dataclass
class MatchTargetsCandidate:
    """A state under consideration in async_match_targets.

    Besides the state it holds the optional registry entries (entity, area,
    floor, device) and the name it was matched by, all defaulting to None.
    """

    state: State
    entity: entity_registry.RegistryEntry | None = None
    area: area_registry.AreaEntry | None = None
    floor: floor_registry.FloorEntry | None = None
    device: device_registry.DeviceEntry | None = None
    matched_name: str | None = None
def find_areas(
    name: str, areas: area_registry.AreaRegistry
) -> Iterable[area_registry.AreaEntry]:
    """Yield each area whose id, name, or one of its aliases matches ``name``.

    The id is compared exactly; names and aliases are compared after
    normalization. Every area is yielded at most once.
    """
    wanted = _normalize_name(name)
    for entry in areas.async_list_areas():
        if entry.id == name or _normalize_name(entry.name) == wanted:
            # Direct hit on id or name
            yield entry
        elif entry.aliases and any(
            _normalize_name(alias) == wanted for alias in entry.aliases
        ):
            # Hit on one of the aliases
            yield entry
def find_floors(
    name: str, floors: floor_registry.FloorRegistry
) -> Iterable[floor_registry.FloorEntry]:
    """Yield each floor whose id, name, or one of its aliases matches ``name``.

    The floor id is compared exactly; names and aliases are compared after
    normalization. Every floor is yielded at most once.
    """
    wanted = _normalize_name(name)
    for entry in floors.async_list_floors():
        if entry.floor_id == name or _normalize_name(entry.name) == wanted:
            # Direct hit on floor id or name
            yield entry
        elif entry.aliases and any(
            _normalize_name(alias) == wanted for alias in entry.aliases
        ):
            # Hit on one of the aliases
            yield entry
def _normalize_name(name: str) -> str:
"""Normalize name for comparison."""
return name.strip().casefold()
def _filter_by_name(
    name: str,
    candidates: Iterable[MatchTargetsCandidate],
) -> Iterable[MatchTargetsCandidate]:
    """Yield the candidates that ``name`` refers to.

    A candidate matches by entity id (exact), by state name, by registry
    entry name, or by a registry alias (all compared after normalization).
    ``matched_name`` is set to ``name`` on each candidate before it is yielded.
    """
    wanted = _normalize_name(name)

    for cand in candidates:
        if cand.state.entity_id == name or _normalize_name(cand.state.name) == wanted:
            # Entity id or state name
            is_match = True
        elif (entry := cand.entity) is None:
            # No registry entry, so no further names to check
            is_match = False
        elif entry.name and _normalize_name(entry.name) == wanted:
            # Name from the registry entry
            is_match = True
        else:
            # Aliases from the registry entry
            is_match = bool(entry.aliases) and any(
                _normalize_name(alias) == wanted for alias in entry.aliases
            )

        if is_match:
            cand.matched_name = name
            yield cand
def _filter_by_features(
    features: int,
    candidates: Iterable[MatchTargetsCandidate],
) -> Iterable[MatchTargetsCandidate]:
    """Yield the candidates that support every bit set in ``features``.

    The registry entry is consulted first; the supported features attribute
    of the state is only checked when the registry entry does not qualify.
    """
    for cand in candidates:
        entry = cand.entity
        registry_ok = (
            entry is not None and (entry.supported_features & features) == features
        )
        if (
            registry_ok
            or (cand.state.attributes.get(ATTR_SUPPORTED_FEATURES, 0) & features)
            == features
        ):
            yield cand
def _filter_by_device_classes(
    device_classes: Iterable[str],
    candidates: Iterable[MatchTargetsCandidate],
) -> Iterable[MatchTargetsCandidate]:
    """Yield the candidates whose device class is one of ``device_classes``.

    The device class of the registry entry is consulted first; the device
    class attribute of the state is only checked when that does not qualify.
    """
    for cand in candidates:
        entry = cand.entity
        if entry is not None and entry.device_class:
            if entry.device_class in device_classes:
                yield cand
                continue

        state_class = cand.state.attributes.get(ATTR_DEVICE_CLASS)
        if state_class and state_class in device_classes:
            yield cand
def _add_areas(
    areas: area_registry.AreaRegistry,
    devices: device_registry.DeviceRegistry,
    candidates: Iterable[MatchTargetsCandidate],
) -> None:
    """Fill in ``device`` and ``area`` on candidates that have a registry entry.

    The area of the entity takes precedence; the area of its device is only
    used when the entity has no area of its own.
    """
    for cand in candidates:
        entry = cand.entity
        if entry is None:
            continue

        if entry.device_id:
            cand.device = devices.async_get(entry.device_id)

        if entry.area_id:
            # Area assigned to the entity itself
            cand.area = areas.async_get_area(entry.area_id)
            assert cand.area is not None
            continue

        device = cand.device
        if device is not None and device.area_id:
            # Area inherited from the device
            cand.area = areas.async_get_area(device.area_id)
@callback
def async_match_targets(  # noqa: C901
    hass: HomeAssistant,
    constraints: MatchTargetsConstraints,
    preferences: MatchTargetsPreferences | None = None,
    states: list[State] | None = None,
) -> MatchTargetsResult:
    """Match entities based on constraints in order to handle an intent.

    Filters are applied in this order, and the first one that leaves nothing
    determines the failure reason of the returned result: domain (when states
    are fetched here), assistant exposure, domain (when states were passed
    in), state, name, supported features, device class, floor, area, and
    finally duplicate name handling.

    Args:
        hass: Home Assistant instance used to read states and registries.
        constraints: Filters that candidates must satisfy.
        preferences: Area/floor ids used to disambiguate duplicate names.
            Defaults to empty preferences.
        states: States to match against. When None or empty, all states for
            ``constraints.domains`` are fetched from ``hass``.

    Returns:
        A MatchTargetsResult. On success it holds the matched states plus the
        targeted areas and floors; on failure it holds the reason.
    """
    preferences = preferences or MatchTargetsPreferences()
    filtered_by_domain = False

    # NOTE: an empty list is treated the same as None here.
    if not states:
        # Get all states and filter by domain
        states = hass.states.async_all(constraints.domains)
        filtered_by_domain = True
        if not states:
            return MatchTargetsResult(False, MatchFailedReason.DOMAIN)

    if constraints.assistant:
        # Filter by exposure
        states = [
            s
            for s in states
            if async_should_expose(hass, constraints.assistant, s.entity_id)
        ]
        if not states:
            return MatchTargetsResult(False, MatchFailedReason.ASSISTANT)

    if constraints.domains and (not filtered_by_domain):
        # Filter by domain (if we didn't already do it)
        states = [s for s in states if s.domain in constraints.domains]
        if not states:
            return MatchTargetsResult(False, MatchFailedReason.DOMAIN)

    if constraints.states:
        # Filter by state
        states = [s for s in states if s.state in constraints.states]
        if not states:
            return MatchTargetsResult(False, MatchFailedReason.STATE)

    # Exit early so we can avoid registry lookups
    if not (
        constraints.name
        or constraints.features
        or constraints.device_classes
        or constraints.area_name
        or constraints.floor_name
    ):
        return MatchTargetsResult(True, states=states)

    # We need entity registry entries now
    er = entity_registry.async_get(hass)
    candidates = [MatchTargetsCandidate(s, er.async_get(s.entity_id)) for s in states]

    if constraints.name:
        # Filter by entity name or alias
        candidates = list(_filter_by_name(constraints.name, candidates))
        if not candidates:
            return MatchTargetsResult(False, MatchFailedReason.NAME)

    if constraints.features:
        # Filter by supported features
        candidates = list(_filter_by_features(constraints.features, candidates))
        if not candidates:
            return MatchTargetsResult(False, MatchFailedReason.FEATURE)

    if constraints.device_classes:
        # Filter by device class
        candidates = list(
            _filter_by_device_classes(constraints.device_classes, candidates)
        )
        if not candidates:
            return MatchTargetsResult(False, MatchFailedReason.DEVICE_CLASS)

    # Check floor/area constraints
    targeted_floors: list[floor_registry.FloorEntry] | None = None
    targeted_areas: list[area_registry.AreaEntry] | None = None

    # True when area information has been added to candidates
    areas_added = False

    if constraints.floor_name or constraints.area_name:
        ar = area_registry.async_get(hass)
        dr = device_registry.async_get(hass)
        _add_areas(ar, dr, candidates)
        areas_added = True

        if constraints.floor_name:
            # Filter by areas associated with floor
            fr = floor_registry.async_get(hass)
            targeted_floors = list(find_floors(constraints.floor_name, fr))
            if not targeted_floors:
                # The floor name itself is unknown
                return MatchTargetsResult(
                    False,
                    MatchFailedReason.INVALID_FLOOR,
                    no_match_name=constraints.floor_name,
                )

            possible_floor_ids = {floor.floor_id for floor in targeted_floors}
            possible_area_ids = {
                area.id
                for area in ar.async_list_areas()
                if area.floor_id in possible_floor_ids
            }

            # Candidates without an area can never be on the floor
            candidates = [
                c
                for c in candidates
                if (c.area is not None) and (c.area.id in possible_area_ids)
            ]
            if not candidates:
                return MatchTargetsResult(
                    False, MatchFailedReason.FLOOR, floors=targeted_floors
                )
        else:
            # All areas are possible
            possible_area_ids = {area.id for area in ar.async_list_areas()}

        if constraints.area_name:
            targeted_areas = list(find_areas(constraints.area_name, ar))
            if not targeted_areas:
                # The area name itself is unknown
                return MatchTargetsResult(
                    False,
                    MatchFailedReason.INVALID_AREA,
                    no_match_name=constraints.area_name,
                )

            matching_area_ids = {area.id for area in targeted_areas}

            # May be constrained by floors above
            possible_area_ids.intersection_update(matching_area_ids)
            candidates = [
                c
                for c in candidates
                if (c.area is not None) and (c.area.id in possible_area_ids)
            ]
            if not candidates:
                return MatchTargetsResult(
                    False, MatchFailedReason.AREA, areas=targeted_areas
                )

    if constraints.name and (not constraints.allow_duplicate_names):
        # Check for duplicates
        if not areas_added:
            # Area information is needed to apply the preferences below
            ar = area_registry.async_get(hass)
            dr = device_registry.async_get(hass)
            _add_areas(ar, dr, candidates)
            areas_added = True

        # groupby only groups adjacent items, so sort by the same key first
        sorted_candidates = sorted(
            [c for c in candidates if c.matched_name],
            key=lambda c: c.matched_name or "",
        )
        final_candidates: list[MatchTargetsCandidate] = []
        for name, group in groupby(sorted_candidates, key=lambda c: c.matched_name):
            group_candidates = list(group)
            if len(group_candidates) < 2:
                # No duplicates for name
                final_candidates.extend(group_candidates)
                continue

            # Try to disambiguate by preferences
            if preferences.floor_id:
                group_candidates = [
                    c
                    for c in group_candidates
                    if (c.area is not None)
                    and (c.area.floor_id == preferences.floor_id)
                ]
                if len(group_candidates) < 2:
                    # Disambiguated by floor
                    final_candidates.extend(group_candidates)
                    continue

            if preferences.area_id:
                group_candidates = [
                    c
                    for c in group_candidates
                    if (c.area is not None) and (c.area.id == preferences.area_id)
                ]
                if len(group_candidates) < 2:
                    # Disambiguated by area
                    final_candidates.extend(group_candidates)
                    continue

            # Couldn't disambiguate duplicate names
            return MatchTargetsResult(
                False,
                MatchFailedReason.DUPLICATE_NAME,
                no_match_name=name,
                areas=targeted_areas or [],
                floors=targeted_floors or [],
            )

        if not final_candidates:
            # The preference filters removed every candidate
            return MatchTargetsResult(
                False,
                MatchFailedReason.NAME,
                areas=targeted_areas or [],
                floors=targeted_floors or [],
            )

        candidates = final_candidates

    return MatchTargetsResult(
        True,
        None,
        states=[c.state for c in candidates],
        areas=targeted_areas or [],
        floors=targeted_floors or [],
    )
@callback
@bind_hass
def async_match_states(
    hass: HomeAssistant,
    name: str | None = None,
    area_name: str | None = None,
    floor_name: str | None = None,
    domains: Collection[str] | None = None,
    device_classes: Collection[str] | None = None,
    states: list[State] | None = None,
    assistant: str | None = None,
) -> Iterable[State]:
    """Return the states matched by async_match_targets for the given filters.

    Convenience wrapper: the keyword arguments are packed into constraints and
    only the ``states`` of the result are returned.
    """
    constraints = MatchTargetsConstraints(
        name=name,
        area_name=area_name,
        floor_name=floor_name,
        domains=domains,
        device_classes=device_classes,
        assistant=assistant,
    )
    match = async_match_targets(hass, constraints=constraints, states=states)
    return match.states
@callback
def async_test_feature(state: State, feature: int, feature_name: str) -> None:
    """Raise IntentHandleError unless ``state`` reports support for ``feature``."""
    supported = state.attributes.get(ATTR_SUPPORTED_FEATURES, 0)
    if not supported & feature:
        raise IntentHandleError(f"Entity {state.name} does not support {feature_name}")
class IntentHandler:
    """Base class for handlers that are registered for an intent type.

    Subclasses set ``intent_type`` and implement ``async_handle``. They may
    restrict the handler to certain ``platforms`` and provide a
    ``slot_schema`` to validate slots.
    """

    intent_type: str
    platforms: set[str] | None = None
    description: str | None = None

    @property
    def slot_schema(self) -> dict | None:
        """Return the slot schema; None means slots are not validated."""
        return None

    @callback
    def async_can_handle(self, intent_obj: Intent) -> bool:
        """Return True if the platform of the intent is accepted by this handler."""
        if self.platforms is None:
            # No restriction on platforms
            return True
        return intent_obj.platform in self.platforms

    @callback
    def async_validate_slots(self, slots: _SlotsType) -> _SlotsType:
        """Validate ``slots`` against the slot schema, if there is one."""
        schema_def = self.slot_schema
        if schema_def is None:
            return slots

        return self._slot_schema(slots)  # type: ignore[no-any-return]

    @cached_property
    def _slot_schema(self) -> vol.Schema:
        """Build (once) the schema that validates the "value" of each slot."""
        assert self.slot_schema is not None
        per_slot = {}
        for key, validator in self.slot_schema.items():
            per_slot[key] = SLOT_SCHEMA.extend({"value": validator})
        return vol.Schema(per_slot, extra=vol.ALLOW_EXTRA)

    async def async_handle(self, intent_obj: Intent) -> IntentResponse:
        """Handle the intent; must be implemented by subclasses."""
        raise NotImplementedError

    def __repr__(self) -> str:
        """Return the class name together with the intent type."""
        return f"<{self.__class__.__name__} - {self.intent_type}>"
def non_empty_string(value: Any) -> str:
    """Validate ``value`` as a string that has non-whitespace content.

    Raises vol.Invalid when the string is empty or whitespace only.
    """
    text = cv.string(value)
    if text.strip():
        return text

    raise vol.Invalid("string value is empty")
class DynamicServiceIntentHandler(IntentHandler):
    """Service Intent handler registration (dynamic).

    Service specific intent handler that calls a service by name/entity_id.
    Subclasses choose the service for each matched entity by implementing
    ``get_domain_and_service``.
    """

    # We use a small timeout in service calls to (hopefully) pass validation
    # checks, but not try to wait for the call to fully complete.
    service_timeout: float = 0.2

    def __init__(
        self,
        intent_type: str,
        speech: str | None = None,
        required_slots: _IntentSlotsType | None = None,
        optional_slots: _IntentSlotsType | None = None,
        required_domains: set[str] | None = None,
        required_features: int | None = None,
        required_states: set[str] | None = None,
        description: str | None = None,
        platforms: set[str] | None = None,
    ) -> None:
        """Create Service Intent Handler.

        Slot keys given as a plain string are normalized to a
        ``(slot name, service data key)`` tuple using the string for both.
        """
        self.intent_type = intent_type
        self.speech = speech
        self.required_domains = required_domains
        self.required_features = required_features
        self.required_states = required_states
        self.description = description
        self.platforms = platforms

        self.required_slots: _IntentSlotsType = {}
        if required_slots:
            for key, value_schema in required_slots.items():
                if isinstance(key, str):
                    # Slot name/service data key
                    key = (key, key)

                self.required_slots[key] = value_schema

        self.optional_slots: _IntentSlotsType = {}
        if optional_slots:
            for key, value_schema in optional_slots.items():
                if isinstance(key, str):
                    # Slot name/service data key
                    key = (key, key)

                self.optional_slots[key] = value_schema

    @cached_property
    def slot_schema(self) -> dict:
        """Return a slot schema.

        Combines the targeting slots (name/area/floor, domain, device class,
        preferred area/floor) with the required and optional slots of this
        handler, keyed by slot name.
        """
        slot_schema = {
            vol.Any("name", "area", "floor"): non_empty_string,
            vol.Optional("domain"): vol.All(cv.ensure_list, [cv.string]),
            vol.Optional("device_class"): vol.All(cv.ensure_list, [cv.string]),
            vol.Optional("preferred_area_id"): cv.string,
            vol.Optional("preferred_floor_id"): cv.string,
        }

        if self.required_slots:
            slot_schema.update(
                {
                    vol.Required(key[0]): validator
                    for key, validator in self.required_slots.items()
                }
            )

        if self.optional_slots:
            slot_schema.update(
                {
                    vol.Optional(key[0]): validator
                    for key, validator in self.optional_slots.items()
                }
            )

        return slot_schema

    @abstractmethod
    def get_domain_and_service(
        self, intent_obj: Intent, state: State
    ) -> tuple[str, str]:
        """Get the domain and service name to call."""
        raise NotImplementedError

    async def async_handle(self, intent_obj: Intent) -> IntentResponse:
        """Handle the hass intent.

        Validates the slots, matches target entities from them, and hands the
        matched states to ``async_handle_states``.

        Raises:
            IntentHandleError: no constraint at all was given, which would
                target every device.
            MatchFailedError: no entities matched the constraints.
        """
        hass = intent_obj.hass
        slots = self.async_validate_slots(intent_obj.slots)

        name_slot = slots.get("name", {})
        entity_name: str | None = name_slot.get("value")
        entity_text: str | None = name_slot.get("text")
        if entity_name == "all":
            # Don't match on name if targeting all entities
            entity_name = None

        # Get area/floor info
        area_slot = slots.get("area", {})
        area_id = area_slot.get("value")

        floor_slot = slots.get("floor", {})
        floor_id = floor_slot.get("value")

        # Optional domain/device class filters.
        # Convert to sets for speed.
        domains: set[str] | None = self.required_domains
        device_classes: set[str] | None = None

        if "domain" in slots:
            domains = set(slots["domain"]["value"])
            if self.required_domains:
                # Must be a subset of intent's required domain(s)
                domains.intersection_update(self.required_domains)

        if "device_class" in slots:
            device_classes = set(slots["device_class"]["value"])

        match_constraints = MatchTargetsConstraints(
            name=entity_name,
            area_name=area_id,
            floor_name=floor_id,
            domains=domains,
            device_classes=device_classes,
            assistant=intent_obj.assistant,
            features=self.required_features,
            states=self.required_states,
        )
        if not match_constraints.has_constraints:
            # Fail if attempting to target all devices in the house
            raise IntentHandleError("Service handler cannot target all devices")

        match_preferences = MatchTargetsPreferences(
            area_id=slots.get("preferred_area_id", {}).get("value"),
            floor_id=slots.get("preferred_floor_id", {}).get("value"),
        )

        match_result = async_match_targets(hass, match_constraints, match_preferences)
        if not match_result.is_match:
            raise MatchFailedError(
                result=match_result,
                constraints=match_constraints,
                preferences=match_preferences,
            )

        # Ensure name is text
        if ("name" in slots) and entity_text:
            slots["name"]["value"] = entity_text

        # Replace area/floor values with the resolved ids for use in templates
        if ("area" in slots) and match_result.areas:
            slots["area"]["value"] = match_result.areas[0].id

        if ("floor" in slots) and match_result.floors:
            slots["floor"]["value"] = match_result.floors[0].floor_id

        # Update intent slots to include any transformations done by the schemas
        intent_obj.slots = slots

        response = await self.async_handle_states(
            intent_obj, match_result, match_constraints, match_preferences
        )

        # Make the matched states available in the response
        response.async_set_states(
            matched_states=match_result.states, unmatched_states=[]
        )

        return response

    async def async_handle_states(
        self,
        intent_obj: Intent,
        match_result: MatchTargetsResult,
        match_constraints: MatchTargetsConstraints,
        match_preferences: MatchTargetsPreferences | None = None,
    ) -> IntentResponse:
        """Complete action on matched entity states.

        Calls the service for every matched state in parallel and records
        each entity as a success or a failure target on the response.

        Raises:
            IntentHandleError: there is not a single success target.
        """
        states = match_result.states
        response = intent_obj.create_response()

        hass = intent_obj.hass
        success_results: list[IntentResponseTarget] = []

        if match_result.floors:
            success_results.extend(
                IntentResponseTarget(
                    type=IntentResponseTargetType.FLOOR,
                    name=floor.name,
                    id=floor.floor_id,
                )
                for floor in match_result.floors
            )
            speech_name = match_result.floors[0].name
        elif match_result.areas:
            success_results.extend(
                IntentResponseTarget(
                    type=IntentResponseTargetType.AREA, name=area.name, id=area.id
                )
                for area in match_result.areas
            )
            speech_name = match_result.areas[0].name
        else:
            speech_name = states[0].name

        service_coros: list[Coroutine[Any, Any, None]] = []
        for state in states:
            domain, service = self.get_domain_and_service(intent_obj, state)
            service_coros.append(
                self.async_call_service(domain, service, intent_obj, state)
            )

        # Handle service calls in parallel. gather() returns the outcomes in
        # the order the coroutines were passed in, so every outcome is paired
        # with the state it belongs to. (Iterating asyncio.as_completed()
        # yields in completion order and would attribute a failure to the
        # wrong entity.)
        outcomes = await asyncio.gather(*service_coros, return_exceptions=True)

        failed_results: list[IntentResponseTarget] = []
        for state, outcome in zip(states, outcomes, strict=True):
            target = IntentResponseTarget(
                type=IntentResponseTargetType.ENTITY,
                name=state.name,
                id=state.entity_id,
            )

            if not isinstance(outcome, BaseException):
                success_results.append(target)
                continue

            if not isinstance(outcome, Exception):
                # Cancellation and similar are not service failures; let
                # them propagate instead of recording a failed target.
                raise outcome

            failed_results.append(target)
            _LOGGER.error(
                "Service call failed for %s", state.entity_id, exc_info=outcome
            )

        # NOTE(review): floor/area targets are added to success_results before
        # any service call runs, so this check can only trigger when neither
        # floors nor areas were targeted -- confirm that this is intended.
        if not success_results:
            # If no entities succeeded, raise an error.
            failed_entity_ids = [target.id for target in failed_results]
            raise IntentHandleError(
                f"Failed to call {service} for: {failed_entity_ids}"
            )

        response.async_set_results(
            success_results=success_results, failed_results=failed_results
        )

        # Update all states
        states = [hass.states.get(state.entity_id) or state for state in states]
        response.async_set_states(states)

        if self.speech is not None:
            response.async_set_speech(self.speech.format(speech_name))

        return response

    async def async_call_service(
        self, domain: str, service: str, intent_obj: Intent, state: State
    ) -> None:
        """Call service on entity.

        The service data holds the entity id, the values of all required
        slots, and the values of the optional slots that are present, each
        stored under its service data key.
        """
        hass = intent_obj.hass

        service_data: dict[str, Any] = {ATTR_ENTITY_ID: state.entity_id}
        if self.required_slots:
            service_data.update(
                {
                    key[1]: intent_obj.slots[key[0]]["value"]
                    for key in self.required_slots
                }
            )

        if self.optional_slots:
            for key in self.optional_slots:
                value = intent_obj.slots.get(key[0])
                if value:
                    service_data[key[1]] = value["value"]

        await self._run_then_background(
            hass.async_create_task_internal(
                hass.services.async_call(
                    domain,
                    service,
                    service_data,
                    context=intent_obj.context,
                    blocking=True,
                ),
                f"intent_call_service_{domain}_{service}",
            )
        )

    async def _run_then_background(self, task: asyncio.Task[Any]) -> None:
        """Run task with timeout to (hopefully) catch validation errors.

        After the timeout the task will continue to run in the background.

        Raises:
            Exception: whatever the task raised, if it finished with an error
                within the timeout.
            asyncio.CancelledError: the caller was cancelled while waiting.
        """
        try:
            # asyncio.wait() never raises TimeoutError and never raises the
            # exception of the task; on timeout it returns with the task
            # still pending.
            await asyncio.wait({task}, timeout=self.service_timeout)
        except asyncio.CancelledError:
            # Task calling us was cancelled, so cancel service call task, and wait for
            # it to be cancelled, within reason, before leaving.
            _LOGGER.debug("Service call was cancelled: %s", task.get_name())
            task.cancel()
            await asyncio.wait({task}, timeout=5)
            raise

        # If the call finished within the timeout, surface its error so that
        # the caller can record the failure. Task.exception() itself raises
        # for a cancelled task, hence the explicit check.
        if task.done() and not task.cancelled() and (exc := task.exception()):
            raise exc
class ServiceIntentHandler(DynamicServiceIntentHandler):
    """Service intent handler bound to one fixed service.

    The same ``domain``/``service`` pair is returned for every matched entity.
    """

    def __init__(
        self,
        intent_type: str,
        domain: str,
        service: str,
        speech: str | None = None,
        required_slots: _IntentSlotsType | None = None,
        optional_slots: _IntentSlotsType | None = None,
        required_domains: set[str] | None = None,
        required_features: int | None = None,
        required_states: set[str] | None = None,
        description: str | None = None,
        platforms: set[str] | None = None,
    ) -> None:
        """Remember the service to call and initialize the dynamic handler."""
        self.domain = domain
        self.service = service
        super().__init__(
            intent_type,
            speech=speech,
            required_slots=required_slots,
            optional_slots=optional_slots,
            required_domains=required_domains,
            required_features=required_features,
            required_states=required_states,
            description=description,
            platforms=platforms,
        )

    def get_domain_and_service(
        self, intent_obj: Intent, state: State
    ) -> tuple[str, str]:
        """Return the fixed domain and service, regardless of intent and state."""
        return self.domain, self.service
class IntentCategory(Enum):
    """Broad kind of an intent: one that acts, or one that asks.

    ``IntentResponse`` compares an intent's category against ``QUERY`` to
    pick its initial response type.
    """

    ACTION = "action"
    """Trigger an action like turning an entity on or off"""

    QUERY = "query"
    """Get information about the state of an entity"""
class Intent:
    """Container for a single recognised intent and its context.

    Instances use ``__slots__``, so only the attributes listed below can be
    set on them.
    """

    __slots__ = [
        "hass",
        "platform",
        "intent_type",
        "slots",
        "text_input",
        "context",
        "language",
        "category",
        "assistant",
        "device_id",
        "conversation_agent_id",
    ]

    def __init__(
        self,
        hass: HomeAssistant,
        platform: str,
        intent_type: str,
        slots: _SlotsType,
        text_input: str | None,
        context: Context,
        language: str,
        category: IntentCategory | None = None,
        assistant: str | None = None,
        device_id: str | None = None,
        conversation_agent_id: str | None = None,
    ) -> None:
        """Store every argument, unchanged, on the attribute of the same name."""
        # Required values.
        self.hass = hass
        self.platform = platform
        self.intent_type = intent_type
        self.slots = slots
        self.text_input = text_input
        self.context = context
        self.language = language

        # Optional values; each defaults to None.
        self.category = category
        self.assistant = assistant
        self.device_id = device_id
        self.conversation_agent_id = conversation_agent_id

    @callback
    def create_response(self) -> IntentResponse:
        """Return a new response in this intent's language that refers back to it."""
        return IntentResponse(intent=self, language=self.language)
class IntentResponseType(Enum):
    """Kind of outcome carried by an intent response.

    The member's ``value`` is what ``IntentResponse.as_dict`` emits under
    the ``"response_type"`` key.
    """

    ACTION_DONE = "action_done"
    """Intent caused an action to occur"""

    PARTIAL_ACTION_DONE = "partial_action_done"
    """Intent caused an action, but it could only be partially done"""

    QUERY_ANSWER = "query_answer"
    """Response is an answer to a query"""

    ERROR = "error"
    """Response is an error"""
class IntentResponseErrorCode(str, Enum):
    """Machine-readable cause attached to an error response.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    NO_INTENT_MATCH = "no_intent_match"
    """Text could not be matched to an intent"""

    NO_VALID_TARGETS = "no_valid_targets"
    """Intent was matched, but no valid areas/devices/entities were targeted"""

    FAILED_TO_HANDLE = "failed_to_handle"
    """Unexpected error occurred while handling intent"""

    UNKNOWN = "unknown"
    """Error outside the scope of intent processing"""
class IntentResponseTargetType(str, Enum):
    """Kind of thing an intent response target refers to.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    AREA = "area"
    FLOOR = "floor"
    DEVICE = "device"
    ENTITY = "entity"
    DOMAIN = "domain"
    DEVICE_CLASS = "device_class"
    CUSTOM = "custom"
@dataclass(slots=True)
class IntentResponseTarget:
"""Target of the intent response."""
name: str
type: IntentResponseTargetType
id: str | None = None
class IntentResponse:
    """Mutable result of handling an intent.

    Collects speech, reprompt and card payloads, the targets and results of
    the handled intent, and an optional error code. ``as_dict`` turns the
    collected data into a plain dictionary.
    """

    def __init__(self, language: str, intent: Intent | None = None) -> None:
        """Create an empty response.

        The initial response type is ``QUERY_ANSWER`` when ``intent`` is given
        and its category equals ``IntentCategory.QUERY``; otherwise it is
        ``ACTION_DONE``.
        """
        self.language = language
        self.intent = intent

        # Payloads keyed by speech type / card type.
        self.speech: dict[str, dict[str, Any]] = {}
        self.reprompt: dict[str, dict[str, Any]] = {}
        self.card: dict[str, dict[str, str]] = {}

        self.error_code: IntentResponseErrorCode | None = None

        self.intent_targets: list[IntentResponseTarget] = []
        self.success_results: list[IntentResponseTarget] = []
        self.failed_results: list[IntentResponseTarget] = []

        self.matched_states: list[State] = []
        self.unmatched_states: list[State] = []

        self.speech_slots: dict[str, Any] = {}

        # For a query intent the speech will be the answer to the query.
        answers_query = intent is not None and intent.category == IntentCategory.QUERY
        self.response_type = (
            IntentResponseType.QUERY_ANSWER
            if answers_query
            else IntentResponseType.ACTION_DONE
        )

    @callback
    def async_set_speech(
        self,
        speech: str,
        speech_type: str = "plain",
        extra_data: Any | None = None,
    ) -> None:
        """Store the speech text and extra data under ``speech_type``.

        An existing entry for the same speech type is replaced.
        """
        entry = {"speech": speech, "extra_data": extra_data}
        self.speech[speech_type] = entry

    @callback
    def async_set_reprompt(
        self,
        speech: str,
        speech_type: str = "plain",
        extra_data: Any | None = None,
    ) -> None:
        """Store the reprompt text and extra data under ``speech_type``.

        An existing entry for the same speech type is replaced.
        """
        entry = {"reprompt": speech, "extra_data": extra_data}
        self.reprompt[speech_type] = entry

    @callback
    def async_set_card(
        self, title: str, content: str, card_type: str = "simple"
    ) -> None:
        """Store a card with the given title and content under ``card_type``."""
        entry = {"title": title, "content": content}
        self.card[card_type] = entry

    @callback
    def async_set_error(self, code: IntentResponseErrorCode, message: str) -> None:
        """Turn this response into an error response.

        Sets the response type to ``ERROR``, records ``code`` and stores
        ``message`` as speech under the default speech type.
        """
        self.response_type = IntentResponseType.ERROR
        self.error_code = code

        # The error message doubles as what gets spoken.
        self.async_set_speech(message)

    @callback
    def async_set_targets(
        self,
        intent_targets: list[IntentResponseTarget],
    ) -> None:
        """Replace the list of targets of the intent."""
        self.intent_targets = intent_targets

    @callback
    def async_set_results(
        self,
        success_results: list[IntentResponseTarget],
        failed_results: list[IntentResponseTarget] | None = None,
    ) -> None:
        """Replace the success and failure result lists.

        Passing ``None`` for ``failed_results`` stores a new empty list.
        """
        if failed_results is None:
            failed_results = []
        self.success_results = success_results
        self.failed_results = failed_results

    @callback
    def async_set_states(
        self, matched_states: list[State], unmatched_states: list[State] | None = None
    ) -> None:
        """Replace the matched and unmatched entity state lists.

        A missing or empty ``unmatched_states`` stores a new empty list.
        """
        self.matched_states = matched_states
        self.unmatched_states = unmatched_states if unmatched_states else []

    @callback
    def async_set_speech_slots(self, speech_slots: dict[str, Any]) -> None:
        """Replace the slots used in the response template of the default agent."""
        self.speech_slots = speech_slots

    @callback
    def as_dict(self) -> dict[str, Any]:
        """Return this response as a plain dictionary.

        The keys ``speech``, ``card``, ``language``, ``response_type`` and
        ``data`` are always present. ``reprompt`` and ``speech_slots`` are
        included only when non-empty. For an error response ``data`` holds
        the error ``code``; otherwise it holds the ``targets``, ``success``
        and ``failed`` lists with each target converted to a dict.
        """
        result: dict[str, Any] = {
            "speech": self.speech,
            "card": self.card,
            "language": self.language,
            "response_type": self.response_type.value,
        }

        # Optional sections are left out entirely when they are empty.
        optional_sections = (
            ("reprompt", self.reprompt),
            ("speech_slots", self.speech_slots),
        )
        for key, section in optional_sections:
            if section:
                result[key] = section

        data: dict[str, Any]
        if self.response_type == IntentResponseType.ERROR:
            assert self.error_code is not None, "error code is required"
            data = {"code": self.error_code.value}
        else:
            # Action done or query answer: report targets and their outcomes.
            data = {
                "targets": list(map(dataclasses.asdict, self.intent_targets)),
                "success": list(map(dataclasses.asdict, self.success_results)),
                "failed": list(map(dataclasses.asdict, self.failed_results)),
            }

        result["data"] = data
        return result