core/homeassistant/components/search/__init__.py

606 lines
22 KiB
Python

"""The Search integration."""
from __future__ import annotations
from collections import defaultdict
from collections.abc import Iterable
from enum import StrEnum
import logging
from typing import Any
import voluptuous as vol
from homeassistant.components import automation, group, person, script, websocket_api
from homeassistant.components.homeassistant import scene
from homeassistant.core import HomeAssistant, callback, split_entity_id
from homeassistant.helpers import (
area_registry as ar,
config_validation as cv,
device_registry as dr,
entity_registry as er,
)
from homeassistant.helpers.entity import (
EntityInfo,
entity_sources as get_entity_sources,
)
from homeassistant.helpers.typing import ConfigType
DOMAIN = "search"
_LOGGER = logging.getLogger(__name__)
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
# enum of item types
class ItemType(StrEnum):
"""Item types."""
AREA = "area"
AUTOMATION = "automation"
AUTOMATION_BLUEPRINT = "automation_blueprint"
CONFIG_ENTRY = "config_entry"
DEVICE = "device"
ENTITY = "entity"
FLOOR = "floor"
GROUP = "group"
INTEGRATION = "integration"
LABEL = "label"
PERSON = "person"
SCENE = "scene"
SCRIPT = "script"
SCRIPT_BLUEPRINT = "script_blueprint"
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the Search integration.

    Registers the ``search/related`` websocket command handler on ``hass``.
    ``config`` is accepted but not read here. Always returns ``True``.
    """
    websocket_api.async_register_command(hass, websocket_search_related)
    return True
@websocket_api.websocket_command(
    {
        vol.Required("type"): "search/related",
        vol.Required("item_type"): vol.Coerce(ItemType),
        vol.Required("item_id"): str,
    }
)
@callback
def websocket_search_related(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: dict[str, Any],
) -> None:
    """Handle the ``search/related`` websocket command.

    Builds a fresh ``Searcher`` for this request, runs the search for the
    requested ``item_type``/``item_id`` pair and sends the outcome back on
    ``connection`` as the result of message ``msg["id"]``.
    """
    # A new searcher per request: its results accumulate on the instance.
    searcher = Searcher(hass, get_entity_sources(hass))
    message_id = msg["id"]
    related = searcher.async_search(msg["item_type"], msg["item_id"])
    connection.send_result(message_id, related)
class Searcher:
    """Find items related to a given item.

    A searcher collects related item ids into ``results``, keyed by
    ``ItemType``. Results are only initialised in ``__init__`` and are never
    cleared, so an instance is meant to serve a single search.
    """

    # Entity domains whose entities are also reported under their own item
    # type (in addition to being reported as a plain entity).
    EXIST_AS_ENTITY = {"automation", "group", "person", "scene", "script"}

    def __init__(
        self,
        hass: HomeAssistant,
        entity_sources: dict[str, EntityInfo],
    ) -> None:
        """Initialize the searcher.

        ``entity_sources`` maps an entity id to information about where the
        entity came from; it is consulted for entities that are not in the
        entity registry.
        """
        self.hass = hass
        self._area_registry = ar.async_get(hass)
        self._device_registry = dr.async_get(hass)
        self._entity_registry = er.async_get(hass)
        self._entity_sources = entity_sources
        self.results: defaultdict[ItemType, set[str]] = defaultdict(set)
        # Scripts that were already walked by ``_async_search_script``.
        # Scripts can reference themselves or each other; without this guard
        # such a cycle would recurse until the interpreter gives up.
        self._searched_scripts: set[str] = set()

    @callback
    def async_search(self, item_type: ItemType, item_id: str) -> dict[str, set[str]]:
        """Find results.

        Dispatches to the ``_async_search_<item_type>`` method, then returns
        the collected results without the requested item itself and without
        item types that ended up empty.
        """
        _LOGGER.debug("Searching for %s/%s", item_type, item_id)
        getattr(self, f"_async_search_{item_type}")(item_id)

        # Remove the original requested item from the results (if present)
        if item_type in self.results and item_id in self.results[item_type]:
            self.results[item_type].remove(item_id)

        # Filter out empty sets.
        return {key: val for key, val in self.results.items() if val}

    @callback
    def _add(self, item_type: ItemType, item_id: str | Iterable[str] | None) -> None:
        """Add an item (or items) to the results.

        ``None`` is ignored, a string is added as a single id and any other
        iterable is merged into the result set for ``item_type``.
        """
        if item_id is None:
            return

        if isinstance(item_id, str):
            self.results[item_type].add(item_id)
        else:
            self.results[item_type].update(item_id)

    @callback
    def _async_search_area(self, area_id: str, *, entry_point: bool = True) -> None:
        """Find results for an area.

        Does nothing when the area is unknown. The labels of the area are
        only added when the area is the entry point of the search.
        """
        if not (area_entry := self._async_resolve_up_area(area_id)):
            return

        if entry_point:
            # Add labels of this area
            self._add(ItemType.LABEL, area_entry.labels)

        # Automations referencing this area
        self._add(
            ItemType.AUTOMATION, automation.automations_with_area(self.hass, area_id)
        )

        # Scripts referencing this area
        self._add(ItemType.SCRIPT, script.scripts_with_area(self.hass, area_id))

        # Entities in this area; extended below with the entities of the
        # devices in this area
        entity_entries = er.async_entries_for_area(self._entity_registry, area_id)

        # Devices in this area
        for device in dr.async_entries_for_area(self._device_registry, area_id):
            self._add(ItemType.DEVICE, device.id)

            # Config entries for devices in this area
            if device_entry := self._device_registry.async_get(device.id):
                self._add(ItemType.CONFIG_ENTRY, device_entry.config_entries)

            # Automations referencing this device
            self._add(
                ItemType.AUTOMATION,
                automation.automations_with_device(self.hass, device.id),
            )

            # Scripts referencing this device
            self._add(ItemType.SCRIPT, script.scripts_with_device(self.hass, device.id))

            # Entities of this device
            for entity_entry in er.async_entries_for_device(
                self._entity_registry, device.id
            ):
                # Skip entities with their own area set: they are either in
                # a different area or already part of the area lookup above.
                if entity_entry.area_id is not None:
                    continue
                entity_entries.append(entity_entry)

        # Process entities in this area
        for entity_entry in entity_entries:
            self._add(ItemType.ENTITY, entity_entry.entity_id)

            # If this entity also exists as a resource, we add it.
            if entity_entry.domain in self.EXIST_AS_ENTITY:
                self._add(ItemType(entity_entry.domain), entity_entry.entity_id)

            # Automations referencing this entity
            self._add(
                ItemType.AUTOMATION,
                automation.automations_with_entity(self.hass, entity_entry.entity_id),
            )

            # Scripts referencing this entity
            self._add(
                ItemType.SCRIPT,
                script.scripts_with_entity(self.hass, entity_entry.entity_id),
            )

            # Groups that have this entity as a member
            self._add(
                ItemType.GROUP,
                group.groups_with_entity(self.hass, entity_entry.entity_id),
            )

            # Persons that use this entity
            self._add(
                ItemType.PERSON,
                person.persons_with_entity(self.hass, entity_entry.entity_id),
            )

            # Scenes that reference this entity
            self._add(
                ItemType.SCENE,
                scene.scenes_with_entity(self.hass, entity_entry.entity_id),
            )

            # Config entries for entities in this area
            self._add(ItemType.CONFIG_ENTRY, entity_entry.config_entry_id)

    @callback
    def _async_search_automation(self, automation_entity_id: str) -> None:
        """Find results for an automation."""
        # Up resolve the automation entity itself
        if entity_entry := self._async_resolve_up_entity(automation_entity_id):
            # Add labels of this automation entity
            self._add(ItemType.LABEL, entity_entry.labels)

        # Find the blueprint used in this automation
        self._add(
            ItemType.AUTOMATION_BLUEPRINT,
            automation.blueprint_in_automation(self.hass, automation_entity_id),
        )

        # Floors referenced in this automation
        self._add(
            ItemType.FLOOR,
            automation.floors_in_automation(self.hass, automation_entity_id),
        )

        # Areas referenced in this automation
        for area in automation.areas_in_automation(self.hass, automation_entity_id):
            self._add(ItemType.AREA, area)
            self._async_resolve_up_area(area)

        # Devices referenced in this automation
        for device in automation.devices_in_automation(self.hass, automation_entity_id):
            self._add(ItemType.DEVICE, device)
            self._async_resolve_up_device(device)

        # Entities referenced in this automation
        for entity_id in automation.entities_in_automation(
            self.hass, automation_entity_id
        ):
            self._add(ItemType.ENTITY, entity_id)
            self._async_resolve_up_entity(entity_id)

            # If this entity also exists as a resource, we add it.
            domain = split_entity_id(entity_id)[0]
            if domain in self.EXIST_AS_ENTITY:
                self._add(ItemType(domain), entity_id)

            # For an automation, we want to unwrap the groups, to ensure we
            # relate this automation to all those members as well.
            if domain == "group":
                for group_entity_id in group.get_entity_ids(self.hass, entity_id):
                    self._add(ItemType.ENTITY, group_entity_id)
                    self._async_resolve_up_entity(group_entity_id)

            # For an automation, we want to unwrap the scenes, to ensure we
            # relate this automation to all referenced entities as well.
            if domain == "scene":
                for scene_entity_id in scene.entities_in_scene(self.hass, entity_id):
                    self._add(ItemType.ENTITY, scene_entity_id)
                    self._async_resolve_up_entity(scene_entity_id)

            # Fully search the script if it is part of an automation.
            # This makes the automation return all results of the embedded script.
            if domain == "script":
                self._async_search_script(entity_id, entry_point=False)

    @callback
    def _async_search_automation_blueprint(self, blueprint_path: str) -> None:
        """Find results for an automation blueprint."""
        self._add(
            ItemType.AUTOMATION,
            automation.automations_with_blueprint(self.hass, blueprint_path),
        )

    @callback
    def _async_search_config_entry(self, config_entry_id: str) -> None:
        """Find results for a config entry."""
        for device_entry in dr.async_entries_for_config_entry(
            self._device_registry, config_entry_id
        ):
            self._add(ItemType.DEVICE, device_entry.id)
            self._async_search_device(device_entry.id, entry_point=False)

        for entity_entry in er.async_entries_for_config_entry(
            self._entity_registry, config_entry_id
        ):
            self._add(ItemType.ENTITY, entity_entry.entity_id)
            self._async_search_entity(entity_entry.entity_id, entry_point=False)

    @callback
    def _async_search_device(self, device_id: str, *, entry_point: bool = True) -> None:
        """Find results for a device.

        Does nothing when the device is unknown. The labels of the device are
        only added when the device is the entry point of the search.
        """
        if not (device_entry := self._async_resolve_up_device(device_id)):
            return

        if entry_point:
            # Add labels of this device
            self._add(ItemType.LABEL, device_entry.labels)

        # Automations referencing this device
        self._add(
            ItemType.AUTOMATION,
            automation.automations_with_device(self.hass, device_id),
        )

        # Scripts referencing this device
        self._add(ItemType.SCRIPT, script.scripts_with_device(self.hass, device_id))

        # Entities of this device
        for entity_entry in er.async_entries_for_device(
            self._entity_registry, device_id
        ):
            self._add(ItemType.ENTITY, entity_entry.entity_id)
            # Add all entity information as well
            self._async_search_entity(entity_entry.entity_id, entry_point=False)

    @callback
    def _async_search_entity(self, entity_id: str, *, entry_point: bool = True) -> None:
        """Find results for an entity.

        The labels of the entity are only added when the entity is registered
        and is the entry point of the search.
        """
        # Resolve up the entity itself
        entity_entry = self._async_resolve_up_entity(entity_id)

        if entity_entry and entry_point:
            # Add labels of this entity
            self._add(ItemType.LABEL, entity_entry.labels)

        # Automations referencing this entity
        self._add(
            ItemType.AUTOMATION,
            automation.automations_with_entity(self.hass, entity_id),
        )

        # Scripts referencing this entity
        self._add(ItemType.SCRIPT, script.scripts_with_entity(self.hass, entity_id))

        # Groups that have this entity as a member
        self._add(ItemType.GROUP, group.groups_with_entity(self.hass, entity_id))

        # Persons referencing this entity
        self._add(ItemType.PERSON, person.persons_with_entity(self.hass, entity_id))

        # Scenes referencing this entity
        self._add(ItemType.SCENE, scene.scenes_with_entity(self.hass, entity_id))

    @callback
    def _async_search_floor(self, floor_id: str) -> None:
        """Find results for a floor."""
        # Automations referencing this floor
        self._add(
            ItemType.AUTOMATION,
            automation.automations_with_floor(self.hass, floor_id),
        )

        # Scripts referencing this floor
        self._add(ItemType.SCRIPT, script.scripts_with_floor(self.hass, floor_id))

        # Areas on this floor, including everything related to those areas
        for area_entry in ar.async_entries_for_floor(self._area_registry, floor_id):
            self._add(ItemType.AREA, area_entry.id)
            self._async_search_area(area_entry.id, entry_point=False)

    @callback
    def _async_search_group(self, group_entity_id: str) -> None:
        """Find results for a group.

        Note: We currently only support the classic groups, thus
        we don't look up the area/floor for a group entity.
        """
        # Automations referencing this group
        self._add(
            ItemType.AUTOMATION,
            automation.automations_with_entity(self.hass, group_entity_id),
        )

        # Scripts referencing this group
        self._add(
            ItemType.SCRIPT, script.scripts_with_entity(self.hass, group_entity_id)
        )

        # Scenes that reference this group
        self._add(ItemType.SCENE, scene.scenes_with_entity(self.hass, group_entity_id))

        # Entities in this group
        for entity_id in group.get_entity_ids(self.hass, group_entity_id):
            self._add(ItemType.ENTITY, entity_id)
            self._async_resolve_up_entity(entity_id)

    @callback
    def _async_search_label(self, label_id: str) -> None:
        """Find results for a label."""
        # Areas with this label
        for area_entry in ar.async_entries_for_label(self._area_registry, label_id):
            self._add(ItemType.AREA, area_entry.id)

        # Devices with this label
        for device in dr.async_entries_for_label(self._device_registry, label_id):
            self._add(ItemType.DEVICE, device.id)

        # Entities with this label
        for entity_entry in er.async_entries_for_label(self._entity_registry, label_id):
            self._add(ItemType.ENTITY, entity_entry.entity_id)

            # If this entity also exists as a resource, we add it.
            domain = split_entity_id(entity_entry.entity_id)[0]
            if domain in self.EXIST_AS_ENTITY:
                self._add(ItemType(domain), entity_entry.entity_id)

        # Automations referencing this label
        self._add(
            ItemType.AUTOMATION,
            automation.automations_with_label(self.hass, label_id),
        )

        # Scripts referencing this label
        self._add(ItemType.SCRIPT, script.scripts_with_label(self.hass, label_id))

    @callback
    def _async_search_person(self, person_entity_id: str) -> None:
        """Find results for a person."""
        # Up resolve the person entity itself
        if entity_entry := self._async_resolve_up_entity(person_entity_id):
            # Add labels of this person entity
            self._add(ItemType.LABEL, entity_entry.labels)

        # Automations referencing this person
        self._add(
            ItemType.AUTOMATION,
            automation.automations_with_entity(self.hass, person_entity_id),
        )

        # Scripts referencing this person
        self._add(
            ItemType.SCRIPT, script.scripts_with_entity(self.hass, person_entity_id)
        )

        # Add all member entities of this person
        self._add(
            ItemType.ENTITY, person.entities_in_person(self.hass, person_entity_id)
        )

    @callback
    def _async_search_scene(self, scene_entity_id: str) -> None:
        """Find results for a scene."""
        # Up resolve the scene entity itself
        if entity_entry := self._async_resolve_up_entity(scene_entity_id):
            # Add labels of this scene entity
            self._add(ItemType.LABEL, entity_entry.labels)

        # Automations referencing this scene
        self._add(
            ItemType.AUTOMATION,
            automation.automations_with_entity(self.hass, scene_entity_id),
        )

        # Scripts referencing this scene
        self._add(
            ItemType.SCRIPT, script.scripts_with_entity(self.hass, scene_entity_id)
        )

        # Add all entities in this scene
        for entity in scene.entities_in_scene(self.hass, scene_entity_id):
            self._add(ItemType.ENTITY, entity)
            self._async_resolve_up_entity(entity)

    @callback
    def _async_search_script(
        self, script_entity_id: str, *, entry_point: bool = True
    ) -> None:
        """Find results for a script.

        Scripts referenced by this script are searched recursively. Each
        script is walked at most once per searcher, so scripts that reference
        themselves or each other do not cause endless recursion. The labels
        of the script are only added when the script is registered and is the
        entry point of the search.
        """
        # A repeat visit could only re-add ids that are already in the result
        # sets, so skipping it does not change the outcome.
        if script_entity_id in self._searched_scripts:
            return
        self._searched_scripts.add(script_entity_id)

        # Up resolve the script entity itself
        entity_entry = self._async_resolve_up_entity(script_entity_id)

        if entity_entry and entry_point:
            # Add labels of this script entity
            self._add(ItemType.LABEL, entity_entry.labels)

        # Find the blueprint used in this script
        self._add(
            ItemType.SCRIPT_BLUEPRINT,
            script.blueprint_in_script(self.hass, script_entity_id),
        )

        # Floors referenced in this script
        self._add(ItemType.FLOOR, script.floors_in_script(self.hass, script_entity_id))

        # Areas referenced in this script
        for area in script.areas_in_script(self.hass, script_entity_id):
            self._add(ItemType.AREA, area)
            self._async_resolve_up_area(area)

        # Devices referenced in this script
        for device in script.devices_in_script(self.hass, script_entity_id):
            self._add(ItemType.DEVICE, device)
            self._async_resolve_up_device(device)

        # Entities referenced in this script
        for entity_id in script.entities_in_script(self.hass, script_entity_id):
            self._add(ItemType.ENTITY, entity_id)
            self._async_resolve_up_entity(entity_id)

            # If this entity also exists as a resource, we add it.
            domain = split_entity_id(entity_id)[0]
            if domain in self.EXIST_AS_ENTITY:
                self._add(ItemType(domain), entity_id)

            # For a script, we want to unwrap the groups, to ensure we
            # relate this script to all those members as well.
            if domain == "group":
                for group_entity_id in group.get_entity_ids(self.hass, entity_id):
                    self._add(ItemType.ENTITY, group_entity_id)
                    self._async_resolve_up_entity(group_entity_id)

            # For a script, we want to unwrap the scenes, to ensure we
            # relate this script to all referenced entities as well.
            if domain == "scene":
                for scene_entity_id in scene.entities_in_scene(self.hass, entity_id):
                    self._add(ItemType.ENTITY, scene_entity_id)
                    self._async_resolve_up_entity(scene_entity_id)

            # Fully search the script if it is nested.
            # This makes the script return all results of the embedded script.
            if domain == "script":
                self._async_search_script(entity_id, entry_point=False)

    @callback
    def _async_search_script_blueprint(self, blueprint_path: str) -> None:
        """Find results for a script blueprint."""
        self._add(
            ItemType.SCRIPT, script.scripts_with_blueprint(self.hass, blueprint_path)
        )

    @callback
    def _async_resolve_up_device(self, device_id: str) -> dr.DeviceEntry | None:
        """Resolve up from a device.

        Above a device is an area or floor.
        Above a device is also the config entry.

        Returns the device registry entry, or None when the device is unknown.
        """
        if device_entry := self._device_registry.async_get(device_id):
            if device_entry.area_id:
                self._add(ItemType.AREA, device_entry.area_id)
                self._async_resolve_up_area(device_entry.area_id)

            self._add(ItemType.CONFIG_ENTRY, device_entry.config_entries)
            for config_entry_id in device_entry.config_entries:
                if entry := self.hass.config_entries.async_get_entry(config_entry_id):
                    self._add(ItemType.INTEGRATION, entry.domain)

        return device_entry

    @callback
    def _async_resolve_up_entity(self, entity_id: str) -> er.RegistryEntry | None:
        """Resolve up from an entity.

        Above an entity is a device, area or floor.
        Above an entity is also the config entry.

        Returns the entity registry entry, or None when the entity is not
        registered. Unregistered entities are resolved through the entity
        sources instead, when those know the entity.
        """
        if entity_entry := self._entity_registry.async_get(entity_id):
            # Entity has an overridden area
            if entity_entry.area_id:
                self._add(ItemType.AREA, entity_entry.area_id)
                self._async_resolve_up_area(entity_entry.area_id)

            # Inherit area from device
            elif entity_entry.device_id and (
                device_entry := self._device_registry.async_get(entity_entry.device_id)
            ):
                if device_entry.area_id:
                    self._add(ItemType.AREA, device_entry.area_id)
                    self._async_resolve_up_area(device_entry.area_id)

            # Add device that provided this entity
            self._add(ItemType.DEVICE, entity_entry.device_id)

            # Add config entry that provided this entity
            if entity_entry.config_entry_id:
                self._add(ItemType.CONFIG_ENTRY, entity_entry.config_entry_id)

                if entry := self.hass.config_entries.async_get_entry(
                    entity_entry.config_entry_id
                ):
                    # Add integration that provided this entity
                    self._add(ItemType.INTEGRATION, entry.domain)

        elif source := self._entity_sources.get(entity_id):
            # Add config entry that provided this entity
            self._add(ItemType.CONFIG_ENTRY, source.get("config_entry"))
            self._add(ItemType.INTEGRATION, source["domain"])

        return entity_entry

    @callback
    def _async_resolve_up_area(self, area_id: str) -> ar.AreaEntry | None:
        """Resolve up from an area.

        Above an area can be a floor.

        Returns the area registry entry, or None when the area is unknown.
        """
        if area_entry := self._area_registry.async_get_area(area_id):
            self._add(ItemType.FLOOR, area_entry.floor_id)

        return area_entry