"""Test service helpers.""" import asyncio from collections.abc import Iterable from copy import deepcopy import io from typing import Any from unittest.mock import AsyncMock, Mock, patch import pytest from pytest_unordered import unordered import voluptuous as vol # To prevent circular import when running just this file from homeassistant import exceptions from homeassistant.auth.permissions import PolicyPermissions import homeassistant.components # noqa: F401 from homeassistant.components.group import DOMAIN as DOMAIN_GROUP, Group from homeassistant.components.logger import DOMAIN as DOMAIN_LOGGER from homeassistant.components.shell_command import DOMAIN as DOMAIN_SHELL_COMMAND from homeassistant.components.system_health import DOMAIN as DOMAIN_SYSTEM_HEALTH from homeassistant.const import ( ATTR_ENTITY_ID, ENTITY_MATCH_ALL, ENTITY_MATCH_NONE, STATE_OFF, STATE_ON, EntityCategory, ) from homeassistant.core import ( Context, HassJob, HomeAssistant, ServiceCall, SupportsResponse, ) from homeassistant.helpers import ( area_registry as ar, device_registry as dr, entity_registry as er, service, ) import homeassistant.helpers.config_validation as cv from homeassistant.loader import async_get_integration from homeassistant.setup import async_setup_component from homeassistant.util.yaml.loader import parse_yaml from tests.common import ( MockEntity, MockModule, MockUser, async_mock_service, mock_area_registry, mock_device_registry, mock_integration, mock_registry, ) SUPPORT_A = 1 SUPPORT_B = 2 SUPPORT_C = 4 @pytest.fixture def mock_handle_entity_call(): """Mock service platform call.""" with patch( "homeassistant.helpers.service._handle_entity_call", return_value=None, ) as mock_call: yield mock_call @pytest.fixture def mock_entities(hass: HomeAssistant) -> dict[str, MockEntity]: """Return mock entities in an ordered dict.""" kitchen = MockEntity( entity_id="light.kitchen", available=True, should_poll=False, supported_features=SUPPORT_A, ) living_room = MockEntity( entity_id="light.living_room", available=True, should_poll=False, supported_features=SUPPORT_B, ) bedroom = MockEntity( entity_id="light.bedroom", available=True, should_poll=False, supported_features=(SUPPORT_A | SUPPORT_B), ) bathroom = MockEntity( entity_id="light.bathroom", available=True, should_poll=False, supported_features=(SUPPORT_B | SUPPORT_C), ) entities = {} entities[kitchen.entity_id] = kitchen entities[living_room.entity_id] = living_room entities[bedroom.entity_id] = bedroom entities[bathroom.entity_id] = bathroom for entity in entities.values(): entity.hass = hass return entities @pytest.fixture def floor_area_mock(hass: HomeAssistant) -> None: """Mock including floor and area info.""" hass.states.async_set("light.Bowl", STATE_ON) hass.states.async_set("light.Ceiling", STATE_OFF) hass.states.async_set("light.Kitchen", STATE_OFF) area_in_floor = ar.AreaEntry( id="test-area", name="Test area", aliases={}, floor_id="test-floor", icon=None, picture=None, ) area_in_floor_a = ar.AreaEntry( id="area-a", name="Area A", aliases={}, floor_id="floor-a", icon=None, picture=None, ) mock_area_registry( hass, { area_in_floor.id: area_in_floor, area_in_floor_a.id: area_in_floor_a, }, ) device_in_area = dr.DeviceEntry(area_id="test-area") device_no_area = dr.DeviceEntry(id="device-no-area-id") device_diff_area = dr.DeviceEntry(area_id="diff-area") device_area_a = dr.DeviceEntry(id="device-area-a-id", area_id="area-a") mock_device_registry( hass, { device_in_area.id: device_in_area, device_no_area.id: device_no_area, device_diff_area.id: device_diff_area, device_area_a.id: device_area_a, }, ) entity_in_own_area = er.RegistryEntry( entity_id="light.in_own_area", unique_id="in-own-area-id", platform="test", area_id="own-area", ) config_entity_in_own_area = er.RegistryEntry( entity_id="light.config_in_own_area", unique_id="config-in-own-area-id", platform="test", area_id="own-area", entity_category=EntityCategory.CONFIG, ) hidden_entity_in_own_area = er.RegistryEntry( entity_id="light.hidden_in_own_area", unique_id="hidden-in-own-area-id", platform="test", area_id="own-area", hidden_by=er.RegistryEntryHider.USER, ) entity_in_area = er.RegistryEntry( entity_id="light.in_area", unique_id="in-area-id", platform="test", device_id=device_in_area.id, ) config_entity_in_area = er.RegistryEntry( entity_id="light.config_in_area", unique_id="config-in-area-id", platform="test", device_id=device_in_area.id, entity_category=EntityCategory.CONFIG, ) hidden_entity_in_area = er.RegistryEntry( entity_id="light.hidden_in_area", unique_id="hidden-in-area-id", platform="test", device_id=device_in_area.id, hidden_by=er.RegistryEntryHider.USER, ) entity_in_other_area = er.RegistryEntry( entity_id="light.in_other_area", unique_id="in-area-a-id", platform="test", device_id=device_in_area.id, area_id="other-area", ) entity_assigned_to_area = er.RegistryEntry( entity_id="light.assigned_to_area", unique_id="assigned-area-id", platform="test", device_id=device_in_area.id, area_id="test-area", ) entity_no_area = er.RegistryEntry( entity_id="light.no_area", unique_id="no-area-id", platform="test", device_id=device_no_area.id, ) config_entity_no_area = er.RegistryEntry( entity_id="light.config_no_area", unique_id="config-no-area-id", platform="test", device_id=device_no_area.id, entity_category=EntityCategory.CONFIG, ) hidden_entity_no_area = er.RegistryEntry( entity_id="light.hidden_no_area", unique_id="hidden-no-area-id", platform="test", device_id=device_no_area.id, hidden_by=er.RegistryEntryHider.USER, ) entity_diff_area = er.RegistryEntry( entity_id="light.diff_area", unique_id="diff-area-id", platform="test", device_id=device_diff_area.id, ) entity_in_area_a = er.RegistryEntry( entity_id="light.in_area_a", unique_id="in-area-a-id", platform="test", device_id=device_area_a.id, area_id="area-a", ) entity_in_area_b = er.RegistryEntry( entity_id="light.in_area_b", unique_id="in-area-b-id", platform="test", device_id=device_area_a.id, area_id="area-b", ) mock_registry( hass, { entity_in_own_area.entity_id: entity_in_own_area, config_entity_in_own_area.entity_id: config_entity_in_own_area, hidden_entity_in_own_area.entity_id: hidden_entity_in_own_area, entity_in_area.entity_id: entity_in_area, config_entity_in_area.entity_id: config_entity_in_area, hidden_entity_in_area.entity_id: hidden_entity_in_area, entity_in_other_area.entity_id: entity_in_other_area, entity_assigned_to_area.entity_id: entity_assigned_to_area, entity_no_area.entity_id: entity_no_area, config_entity_no_area.entity_id: config_entity_no_area, hidden_entity_no_area.entity_id: hidden_entity_no_area, entity_diff_area.entity_id: entity_diff_area, entity_in_area_a.entity_id: entity_in_area_a, entity_in_area_b.entity_id: entity_in_area_b, }, ) @pytest.fixture def label_mock(hass: HomeAssistant) -> None: """Mock including label info.""" hass.states.async_set("light.bowl", STATE_ON) hass.states.async_set("light.ceiling", STATE_OFF) hass.states.async_set("light.kitchen", STATE_OFF) area_with_labels = ar.AreaEntry( id="area-with-labels", name="Area with labels", aliases={}, floor_id=None, icon=None, labels={"label_area"}, picture=None, ) area_without_labels = ar.AreaEntry( id="area-no-labels", name="Area without labels", aliases={}, floor_id=None, icon=None, labels=set(), picture=None, ) mock_area_registry( hass, { area_with_labels.id: area_with_labels, area_without_labels.id: area_without_labels, }, ) device_has_label1 = dr.DeviceEntry(labels={"label1"}) device_has_label2 = dr.DeviceEntry(labels={"label2"}) device_has_labels = dr.DeviceEntry( labels={"label1", "label2"}, area_id=area_with_labels.id ) device_no_labels = dr.DeviceEntry( id="device-no-labels", area_id=area_without_labels.id ) mock_device_registry( hass, { device_has_label1.id: device_has_label1, device_has_label2.id: device_has_label2, device_has_labels.id: device_has_labels, device_no_labels.id: device_no_labels, }, ) entity_with_my_label = er.RegistryEntry( entity_id="light.with_my_label", unique_id="with_my_label", platform="test", labels={"my-label"}, ) hidden_entity_with_my_label = er.RegistryEntry( entity_id="light.hidden_with_my_label", unique_id="hidden_with_my_label", platform="test", labels={"my-label"}, hidden_by=er.RegistryEntryHider.USER, ) config_entity_with_my_label = er.RegistryEntry( entity_id="light.config_with_my_label", unique_id="config_with_my_label", platform="test", labels={"my-label"}, entity_category=EntityCategory.CONFIG, ) entity_with_label1_from_device = er.RegistryEntry( entity_id="light.with_label1_from_device", unique_id="with_label1_from_device", platform="test", device_id=device_has_label1.id, ) entity_with_label1_from_device_and_different_area = er.RegistryEntry( entity_id="light.with_label1_from_device_diff_area", unique_id="with_label1_from_device_diff_area", platform="test", device_id=device_has_label1.id, area_id=area_without_labels.id, ) entity_with_label1_and_label2_from_device = er.RegistryEntry( entity_id="light.with_label1_and_label2_from_device", unique_id="with_label1_and_label2_from_device", platform="test", labels={"label1"}, device_id=device_has_label2.id, ) entity_with_labels_from_device = er.RegistryEntry( entity_id="light.with_labels_from_device", unique_id="with_labels_from_device", platform="test", device_id=device_has_labels.id, ) entity_with_no_labels = er.RegistryEntry( entity_id="light.no_labels", unique_id="no_labels", platform="test", device_id=device_no_labels.id, ) mock_registry( hass, { config_entity_with_my_label.entity_id: config_entity_with_my_label, entity_with_label1_and_label2_from_device.entity_id: entity_with_label1_and_label2_from_device, entity_with_label1_from_device.entity_id: entity_with_label1_from_device, entity_with_label1_from_device_and_different_area.entity_id: entity_with_label1_from_device_and_different_area, entity_with_labels_from_device.entity_id: entity_with_labels_from_device, entity_with_my_label.entity_id: entity_with_my_label, entity_with_no_labels.entity_id: entity_with_no_labels, hidden_entity_with_my_label.entity_id: hidden_entity_with_my_label, }, ) async def test_call_from_config(hass: HomeAssistant) -> None: """Test the sync wrapper of service.async_call_from_config.""" calls = async_mock_service(hass, "test_domain", "test_service") config = { "service_template": "{{ 'test_domain.test_service' }}", "entity_id": "hello.world", "data": {"hello": "goodbye"}, } await hass.async_add_executor_job(service.call_from_config, hass, config) await hass.async_block_till_done() assert calls[0].data == {"hello": "goodbye", "entity_id": ["hello.world"]} async def test_service_call(hass: HomeAssistant) -> None: """Test service call with templating.""" calls = async_mock_service(hass, "test_domain", "test_service") config = { "action": "{{ 'test_domain.test_service' }}", "entity_id": "hello.world", "data": { "hello": "{{ 'goodbye' }}", "effect": {"value": "{{ 'complex' }}", "simple": "simple"}, }, "data_template": {"list": ["{{ 'list' }}", "2"]}, "target": { "area_id": "test-area-id", "entity_id": "will.be_overridden", "floor_id": "test-floor-id", }, } await service.async_call_from_config(hass, config) await hass.async_block_till_done() assert dict(calls[0].data) == { "hello": "goodbye", "effect": { "value": "complex", "simple": "simple", }, "list": ["list", "2"], "entity_id": ["hello.world"], "area_id": ["test-area-id"], "floor_id": ["test-floor-id"], } config = { "action": "{{ 'test_domain.test_service' }}", "target": { "area_id": ["area-42", "{{ 'area-51' }}"], "device_id": ["abcdef", "{{ 'fedcba' }}"], "entity_id": ["light.static", "{{ 'light.dynamic' }}"], "floor_id": ["floor-first", "{{ 'floor-second' }}"], }, } await service.async_call_from_config(hass, config) await hass.async_block_till_done() assert dict(calls[1].data) == { "area_id": ["area-42", "area-51"], "device_id": ["abcdef", "fedcba"], "entity_id": ["light.static", "light.dynamic"], "floor_id": ["floor-first", "floor-second"], } config = { "action": "{{ 'test_domain.test_service' }}", "target": "{{ var_target }}", } await service.async_call_from_config( hass, config, variables={ "var_target": { "entity_id": "light.static", "area_id": ["area-42", "area-51"], }, }, ) await hass.async_block_till_done() assert dict(calls[2].data) == { "area_id": ["area-42", "area-51"], "entity_id": ["light.static"], } async def test_service_template_service_call(hass: HomeAssistant) -> None: """Test legacy service_template call with templating.""" calls = async_mock_service(hass, "test_domain", "test_service") config = { "service_template": "{{ 'test_domain.test_service' }}", "entity_id": "hello.world", "data": {"hello": "goodbye"}, } await service.async_call_from_config(hass, config) await hass.async_block_till_done() assert calls[0].data == {"hello": "goodbye", "entity_id": ["hello.world"]} async def test_passing_variables_to_templates(hass: HomeAssistant) -> None: """Test passing variables to templates.""" calls = async_mock_service(hass, "test_domain", "test_service") config = { "service_template": "{{ var_service }}", "entity_id": "hello.world", "data_template": {"hello": "{{ var_data }}"}, } await service.async_call_from_config( hass, config, variables={ "var_service": "test_domain.test_service", "var_data": "goodbye", }, ) await hass.async_block_till_done() assert calls[0].data == {"hello": "goodbye", "entity_id": ["hello.world"]} async def test_bad_template(hass: HomeAssistant) -> None: """Test passing bad template.""" calls = async_mock_service(hass, "test_domain", "test_service") config = { "service_template": "{{ var_service }}", "entity_id": "hello.world", "data_template": {"hello": "{{ states + unknown_var }}"}, } await service.async_call_from_config( hass, config, variables={ "var_service": "test_domain.test_service", "var_data": "goodbye", }, ) await hass.async_block_till_done() assert len(calls) == 0 async def test_split_entity_string(hass: HomeAssistant) -> None: """Test splitting of entity string.""" calls = async_mock_service(hass, "test_domain", "test_service") await service.async_call_from_config( hass, { "action": "test_domain.test_service", "entity_id": "hello.world, sensor.beer", }, ) await hass.async_block_till_done() assert calls[-1].data.get("entity_id") == ["hello.world", "sensor.beer"] async def test_not_mutate_input(hass: HomeAssistant) -> None: """Test for immutable input.""" async_mock_service(hass, "test_domain", "test_service") config = { "action": "test_domain.test_service", "entity_id": "hello.world, sensor.beer", "data": {"hello": 1}, "data_template": {"nested": {"value": "{{ 1 + 1 }}"}}, } orig = deepcopy(config) # Validate both the original and the copy config = cv.SERVICE_SCHEMA(config) orig = cv.SERVICE_SCHEMA(orig) await service.async_call_from_config(hass, config, validate_config=False) assert orig == config @patch("homeassistant.helpers.service._LOGGER.error") async def test_fail_silently_if_no_service(mock_log, hass: HomeAssistant) -> None: """Test failing if service is missing.""" await service.async_call_from_config(hass, None) assert mock_log.call_count == 1 await service.async_call_from_config(hass, {}) assert mock_log.call_count == 2 await service.async_call_from_config(hass, {"action": "invalid"}) assert mock_log.call_count == 3 async def test_service_call_entry_id( hass: HomeAssistant, entity_registry: er.EntityRegistry ) -> None: """Test service call with entity specified by entity registry ID.""" calls = async_mock_service(hass, "test_domain", "test_service") entry = entity_registry.async_get_or_create( "hello", "hue", "1234", suggested_object_id="world" ) assert entry.entity_id == "hello.world" config = { "action": "test_domain.test_service", "target": {"entity_id": entry.id}, } await service.async_call_from_config(hass, config) await hass.async_block_till_done() assert dict(calls[0].data) == {"entity_id": ["hello.world"]} @pytest.mark.parametrize("target", ["all", "none"]) async def test_service_call_all_none(hass: HomeAssistant, target) -> None: """Test service call targeting all.""" calls = async_mock_service(hass, "test_domain", "test_service") config = { "action": "test_domain.test_service", "target": {"entity_id": target}, } await service.async_call_from_config(hass, config) await hass.async_block_till_done() assert dict(calls[0].data) == {"entity_id": target} async def test_extract_entity_ids(hass: HomeAssistant) -> None: """Test extract_entity_ids method.""" hass.states.async_set("light.Bowl", STATE_ON) hass.states.async_set("light.Ceiling", STATE_OFF) hass.states.async_set("light.Kitchen", STATE_OFF) assert await async_setup_component(hass, "group", {}) await hass.async_block_till_done() await Group.async_create_group( hass, "test", created_by_service=False, entity_ids=["light.Ceiling", "light.Kitchen"], icon=None, mode=None, object_id=None, order=None, ) call = ServiceCall(hass, "light", "turn_on", {ATTR_ENTITY_ID: "light.Bowl"}) assert {"light.bowl"} == await service.async_extract_entity_ids(hass, call) call = ServiceCall(hass, "light", "turn_on", {ATTR_ENTITY_ID: "group.test"}) assert {"light.ceiling", "light.kitchen"} == await service.async_extract_entity_ids( hass, call ) assert {"group.test"} == await service.async_extract_entity_ids( hass, call, expand_group=False ) assert ( await service.async_extract_entity_ids( hass, ServiceCall(hass, "light", "turn_on", {ATTR_ENTITY_ID: ENTITY_MATCH_NONE}), ) == set() ) async def test_extract_entity_ids_from_area( hass: HomeAssistant, floor_area_mock ) -> None: """Test extract_entity_ids method with areas.""" call = ServiceCall(hass, "light", "turn_on", {"area_id": "own-area"}) assert { "light.in_own_area", } == await service.async_extract_entity_ids(hass, call) call = ServiceCall(hass, "light", "turn_on", {"area_id": "test-area"}) assert { "light.in_area", "light.assigned_to_area", } == await service.async_extract_entity_ids(hass, call) call = ServiceCall( hass, "light", "turn_on", {"area_id": ["test-area", "diff-area"]} ) assert { "light.in_area", "light.diff_area", "light.assigned_to_area", } == await service.async_extract_entity_ids(hass, call) assert ( await service.async_extract_entity_ids( hass, ServiceCall(hass, "light", "turn_on", {"area_id": ENTITY_MATCH_NONE}) ) == set() ) async def test_extract_entity_ids_from_devices( hass: HomeAssistant, floor_area_mock ) -> None: """Test extract_entity_ids method with devices.""" assert await service.async_extract_entity_ids( hass, ServiceCall(hass, "light", "turn_on", {"device_id": "device-no-area-id"}) ) == { "light.no_area", } assert await service.async_extract_entity_ids( hass, ServiceCall(hass, "light", "turn_on", {"device_id": "device-area-a-id"}) ) == { "light.in_area_a", "light.in_area_b", } assert ( await service.async_extract_entity_ids( hass, ServiceCall(hass, "light", "turn_on", {"device_id": "non-existing-id"}), ) == set() ) @pytest.mark.usefixtures("floor_area_mock") async def test_extract_entity_ids_from_floor(hass: HomeAssistant) -> None: """Test extract_entity_ids method with floors.""" call = ServiceCall(hass, "light", "turn_on", {"floor_id": "test-floor"}) assert { "light.in_area", "light.assigned_to_area", } == await service.async_extract_entity_ids(hass, call) call = ServiceCall( hass, "light", "turn_on", {"floor_id": ["test-floor", "floor-a"]} ) assert { "light.in_area", "light.assigned_to_area", "light.in_area_a", } == await service.async_extract_entity_ids(hass, call) assert ( await service.async_extract_entity_ids( hass, ServiceCall(hass, "light", "turn_on", {"floor_id": ENTITY_MATCH_NONE}) ) == set() ) @pytest.mark.usefixtures("label_mock") async def test_extract_entity_ids_from_labels(hass: HomeAssistant) -> None: """Test extract_entity_ids method with labels.""" call = ServiceCall(hass, "light", "turn_on", {"label_id": "my-label"}) assert { "light.with_my_label", } == await service.async_extract_entity_ids(hass, call) call = ServiceCall(hass, "light", "turn_on", {"label_id": "label1"}) assert { "light.with_label1_from_device", "light.with_label1_from_device_diff_area", "light.with_labels_from_device", "light.with_label1_and_label2_from_device", } == await service.async_extract_entity_ids(hass, call) call = ServiceCall(hass, "light", "turn_on", {"label_id": ["label2"]}) assert { "light.with_labels_from_device", "light.with_label1_and_label2_from_device", } == await service.async_extract_entity_ids(hass, call) call = ServiceCall(hass, "light", "turn_on", {"label_id": ["label_area"]}) assert { "light.with_labels_from_device", } == await service.async_extract_entity_ids(hass, call) assert ( await service.async_extract_entity_ids( hass, ServiceCall(hass, "light", "turn_on", {"label_id": ENTITY_MATCH_NONE}) ) == set() ) async def test_async_get_all_descriptions(hass: HomeAssistant) -> None: """Test async_get_all_descriptions.""" group_config = {DOMAIN_GROUP: {}} assert await async_setup_component(hass, DOMAIN_GROUP, group_config) assert await async_setup_component(hass, DOMAIN_SYSTEM_HEALTH, {}) with patch( "homeassistant.helpers.service._load_services_files", side_effect=service._load_services_files, ) as proxy_load_services_files: descriptions = await service.async_get_all_descriptions(hass) # Test we only load services.yaml for integrations with services.yaml # And system_health has no services assert proxy_load_services_files.mock_calls[0][1][1] == unordered( [ await async_get_integration(hass, DOMAIN_GROUP), ] ) assert len(descriptions) == 1 assert DOMAIN_GROUP in descriptions assert "description" in descriptions[DOMAIN_GROUP]["reload"] assert "fields" in descriptions[DOMAIN_GROUP]["reload"] # Does not have services assert DOMAIN_SYSTEM_HEALTH not in descriptions logger_config = {DOMAIN_LOGGER: {}} async def async_get_translations( hass: HomeAssistant, language: str, category: str, integrations: Iterable[str] | None = None, config_flow: bool | None = None, ) -> dict[str, Any]: """Return all backend translations.""" translation_key_prefix = f"component.{DOMAIN_LOGGER}.services.set_default_level" return { f"{translation_key_prefix}.name": "Translated name", f"{translation_key_prefix}.description": "Translated description", f"{translation_key_prefix}.fields.level.name": "Field name", f"{translation_key_prefix}.fields.level.description": "Field description", f"{translation_key_prefix}.fields.level.example": "Field example", } with patch( "homeassistant.helpers.service.translation.async_get_translations", side_effect=async_get_translations, ): await async_setup_component(hass, DOMAIN_LOGGER, logger_config) descriptions = await service.async_get_all_descriptions(hass) assert len(descriptions) == 2 assert DOMAIN_LOGGER in descriptions assert descriptions[DOMAIN_LOGGER]["set_default_level"]["name"] == "Translated name" assert ( descriptions[DOMAIN_LOGGER]["set_default_level"]["description"] == "Translated description" ) assert ( descriptions[DOMAIN_LOGGER]["set_default_level"]["fields"]["level"]["name"] == "Field name" ) assert ( descriptions[DOMAIN_LOGGER]["set_default_level"]["fields"]["level"][ "description" ] == "Field description" ) assert ( descriptions[DOMAIN_LOGGER]["set_default_level"]["fields"]["level"]["example"] == "Field example" ) hass.services.async_register(DOMAIN_LOGGER, "new_service", lambda x: None, None) service.async_set_service_schema( hass, DOMAIN_LOGGER, "new_service", {"description": "new service"} ) descriptions = await service.async_get_all_descriptions(hass) assert "description" in descriptions[DOMAIN_LOGGER]["new_service"] assert descriptions[DOMAIN_LOGGER]["new_service"]["description"] == "new service" hass.services.async_register( DOMAIN_LOGGER, "another_new_service", lambda x: None, None ) hass.services.async_register( DOMAIN_LOGGER, "service_with_optional_response", lambda x: None, None, SupportsResponse.OPTIONAL, ) hass.services.async_register( DOMAIN_LOGGER, "service_with_only_response", lambda x: None, None, SupportsResponse.ONLY, ) hass.services.async_register( DOMAIN_LOGGER, "another_service_with_response", lambda x: None, None, SupportsResponse.OPTIONAL, ) service.async_set_service_schema( hass, DOMAIN_LOGGER, "another_service_with_response", {"description": "response service"}, ) descriptions = await service.async_get_all_descriptions(hass) assert "another_new_service" in descriptions[DOMAIN_LOGGER] assert "service_with_optional_response" in descriptions[DOMAIN_LOGGER] assert descriptions[DOMAIN_LOGGER]["service_with_optional_response"][ "response" ] == {"optional": True} assert "service_with_only_response" in descriptions[DOMAIN_LOGGER] assert descriptions[DOMAIN_LOGGER]["service_with_only_response"]["response"] == { "optional": False } assert "another_service_with_response" in descriptions[DOMAIN_LOGGER] assert descriptions[DOMAIN_LOGGER]["another_service_with_response"]["response"] == { "optional": True } # Verify the cache returns the same object assert await service.async_get_all_descriptions(hass) is descriptions async def test_async_get_all_descriptions_dot_keys(hass: HomeAssistant) -> None: """Test async_get_all_descriptions with keys starting with a period.""" service_descriptions = """ .anchor: &anchor selector: text: test_service: fields: test: *anchor """ domain = "test_domain" hass.services.async_register(domain, "test_service", lambda call: None) mock_integration(hass, MockModule(domain), top_level_files={"services.yaml"}) assert await async_setup_component(hass, domain, {}) def load_yaml(fname, secrets=None): with io.StringIO(service_descriptions) as file: return parse_yaml(file) with ( patch( "homeassistant.helpers.service._load_services_files", side_effect=service._load_services_files, ) as proxy_load_services_files, patch( "homeassistant.util.yaml.loader.load_yaml", side_effect=load_yaml, ) as mock_load_yaml, ): descriptions = await service.async_get_all_descriptions(hass) mock_load_yaml.assert_called_once_with("services.yaml", None) assert proxy_load_services_files.mock_calls[0][1][1] == unordered( [ await async_get_integration(hass, domain), ] ) assert descriptions == { "test_domain": { "test_service": { "description": "", "fields": {"test": {"selector": {"text": None}}}, "name": "", } } } async def test_async_get_all_descriptions_filter(hass: HomeAssistant) -> None: """Test async_get_all_descriptions with filters.""" service_descriptions = """ test_service: target: entity: domain: alarm_control_panel supported_features: - alarm_control_panel.AlarmControlPanelEntityFeature.ARM_HOME fields: temperature: filter: supported_features: - alarm_control_panel.AlarmControlPanelEntityFeature.ARM_HOME attribute: supported_color_modes: - light.ColorMode.COLOR_TEMP selector: number: advanced_stuff: fields: temperature: filter: supported_features: - alarm_control_panel.AlarmControlPanelEntityFeature.ARM_HOME attribute: supported_color_modes: - light.ColorMode.COLOR_TEMP selector: number: """ domain = "test_domain" hass.services.async_register(domain, "test_service", lambda call: None) mock_integration(hass, MockModule(domain), top_level_files={"services.yaml"}) assert await async_setup_component(hass, domain, {}) def load_yaml(fname, secrets=None): with io.StringIO(service_descriptions) as file: return parse_yaml(file) with ( patch( "homeassistant.helpers.service._load_services_files", side_effect=service._load_services_files, ) as proxy_load_services_files, patch( "homeassistant.util.yaml.loader.load_yaml", side_effect=load_yaml, ) as mock_load_yaml, ): descriptions = await service.async_get_all_descriptions(hass) mock_load_yaml.assert_called_once_with("services.yaml", None) assert proxy_load_services_files.mock_calls[0][1][1] == unordered( [ await async_get_integration(hass, domain), ] ) test_service_schema = { "description": "", "fields": { "advanced_stuff": { "fields": { "temperature": { "filter": { "attribute": {"supported_color_modes": ["color_temp"]}, "supported_features": [1], }, "selector": {"number": None}, }, }, }, "temperature": { "filter": { "attribute": {"supported_color_modes": ["color_temp"]}, "supported_features": [1], }, "selector": {"number": None}, }, }, "name": "", "target": { "entity": [ { "domain": ["alarm_control_panel"], "supported_features": [1], }, ], }, } assert descriptions == { "test_domain": {"test_service": test_service_schema}, } async def test_async_get_all_descriptions_failing_integration( hass: HomeAssistant, caplog: pytest.LogCaptureFixture ) -> None: """Test async_get_all_descriptions when async_get_integrations returns an exception.""" group_config = {DOMAIN_GROUP: {}} await async_setup_component(hass, DOMAIN_GROUP, group_config) descriptions = await service.async_get_all_descriptions(hass) assert len(descriptions) == 1 assert "description" in descriptions["group"]["reload"] assert "fields" in descriptions["group"]["reload"] logger_config = {DOMAIN_LOGGER: {}} await async_setup_component(hass, DOMAIN_LOGGER, logger_config) with ( patch( "homeassistant.helpers.service.async_get_integrations", return_value={"logger": ImportError}, ), patch( "homeassistant.helpers.service.translation.async_get_translations", return_value={}, ), ): descriptions = await service.async_get_all_descriptions(hass) assert len(descriptions) == 2 assert "Failed to load integration: logger" in caplog.text # Services are empty defaults if the load fails but should # not raise assert descriptions[DOMAIN_LOGGER]["set_level"] == { "description": "", "fields": {}, "name": "", } hass.services.async_register(DOMAIN_LOGGER, "new_service", lambda x: None, None) service.async_set_service_schema( hass, DOMAIN_LOGGER, "new_service", {"description": "new service"} ) descriptions = await service.async_get_all_descriptions(hass) assert "description" in descriptions[DOMAIN_LOGGER]["new_service"] assert descriptions[DOMAIN_LOGGER]["new_service"]["description"] == "new service" hass.services.async_register( DOMAIN_LOGGER, "another_new_service", lambda x: None, None ) hass.services.async_register( DOMAIN_LOGGER, "service_with_optional_response", lambda x: None, None, SupportsResponse.OPTIONAL, ) hass.services.async_register( DOMAIN_LOGGER, "service_with_only_response", lambda x: None, None, SupportsResponse.ONLY, ) descriptions = await service.async_get_all_descriptions(hass) assert "another_new_service" in descriptions[DOMAIN_LOGGER] assert "service_with_optional_response" in descriptions[DOMAIN_LOGGER] assert descriptions[DOMAIN_LOGGER]["service_with_optional_response"][ "response" ] == {"optional": True} assert "service_with_only_response" in descriptions[DOMAIN_LOGGER] assert descriptions[DOMAIN_LOGGER]["service_with_only_response"]["response"] == { "optional": False } # Verify the cache returns the same object assert await service.async_get_all_descriptions(hass) is descriptions async def test_async_get_all_descriptions_dynamically_created_services( hass: HomeAssistant, caplog: pytest.LogCaptureFixture ) -> None: """Test async_get_all_descriptions when async_get_integrations when services are dynamic.""" group_config = {DOMAIN_GROUP: {}} await async_setup_component(hass, DOMAIN_GROUP, group_config) descriptions = await service.async_get_all_descriptions(hass) assert len(descriptions) == 1 assert "description" in descriptions["group"]["reload"] assert "fields" in descriptions["group"]["reload"] shell_command_config = {DOMAIN_SHELL_COMMAND: {"test_service": "ls /bin"}} await async_setup_component(hass, DOMAIN_SHELL_COMMAND, shell_command_config) descriptions = await service.async_get_all_descriptions(hass) assert len(descriptions) == 2 assert descriptions[DOMAIN_SHELL_COMMAND]["test_service"] == { "description": "", "fields": {}, "name": "", "response": {"optional": True}, } async def test_async_get_all_descriptions_new_service_added_while_loading( hass: HomeAssistant, ) -> None: """Test async_get_all_descriptions when a new service is added while loading translations.""" group_config = {DOMAIN_GROUP: {}} await async_setup_component(hass, DOMAIN_GROUP, group_config) descriptions = await service.async_get_all_descriptions(hass) assert len(descriptions) == 1 assert "description" in descriptions["group"]["reload"] assert "fields" in descriptions["group"]["reload"] logger_domain = DOMAIN_LOGGER logger_config = {logger_domain: {}} translations_called = asyncio.Event() translations_wait = asyncio.Event() async def async_get_translations( hass: HomeAssistant, language: str, category: str, integrations: Iterable[str] | None = None, config_flow: bool | None = None, ) -> dict[str, Any]: """Return all backend translations.""" translations_called.set() await translations_wait.wait() translation_key_prefix = f"component.{logger_domain}.services.set_default_level" return { f"{translation_key_prefix}.name": "Translated name", f"{translation_key_prefix}.description": "Translated description", f"{translation_key_prefix}.fields.level.name": "Field name", f"{translation_key_prefix}.fields.level.description": "Field description", f"{translation_key_prefix}.fields.level.example": "Field example", } with patch( "homeassistant.helpers.service.translation.async_get_translations", side_effect=async_get_translations, ): await async_setup_component(hass, logger_domain, logger_config) task = asyncio.create_task(service.async_get_all_descriptions(hass)) await translations_called.wait() # Now register a new service while translations are being loaded hass.services.async_register(logger_domain, "new_service", lambda x: None, None) service.async_set_service_schema( hass, logger_domain, "new_service", {"description": "new service"} ) translations_wait.set() descriptions = await task # Two domains should be present assert len(descriptions) == 2 logger_descriptions = descriptions[logger_domain] # The new service was loaded after the translations were loaded # so it should not appear until the next time we fetch assert "new_service" not in logger_descriptions set_default_level = logger_descriptions["set_default_level"] assert set_default_level["name"] == "Translated name" assert set_default_level["description"] == "Translated description" set_default_level_fields = set_default_level["fields"] assert set_default_level_fields["level"]["name"] == "Field name" assert set_default_level_fields["level"]["description"] == "Field description" assert set_default_level_fields["level"]["example"] == "Field example" descriptions = await service.async_get_all_descriptions(hass) assert "description" in descriptions[logger_domain]["new_service"] assert descriptions[logger_domain]["new_service"]["description"] == "new service" async def test_register_with_mixed_case(hass: HomeAssistant) -> None: """Test registering a service with mixed case. For backwards compatibility, we have historically allowed mixed case, and automatically converted it to lowercase. """ logger_config = {DOMAIN_LOGGER: {}} await async_setup_component(hass, DOMAIN_LOGGER, logger_config) logger_domain_mixed = "LoGgEr" hass.services.async_register( logger_domain_mixed, "NeW_SeRVICE", lambda x: None, None ) service.async_set_service_schema( hass, logger_domain_mixed, "NeW_SeRVICE", {"description": "new service"} ) descriptions = await service.async_get_all_descriptions(hass) assert "description" in descriptions[DOMAIN_LOGGER]["new_service"] assert descriptions[DOMAIN_LOGGER]["new_service"]["description"] == "new service" async def test_call_with_required_features(hass: HomeAssistant, mock_entities) -> None: """Test service calls invoked only if entity has required features.""" # Set up homeassistant component to fetch the translations await async_setup_component(hass, "homeassistant", {}) test_service_mock = AsyncMock(return_value=None) await service.entity_service_call( hass, mock_entities, HassJob(test_service_mock), ServiceCall(hass, "test_domain", "test_service", {"entity_id": "all"}), required_features=[SUPPORT_A], ) assert test_service_mock.call_count == 2 expected = [ mock_entities["light.kitchen"], mock_entities["light.bedroom"], ] actual = [call[0][0] for call in test_service_mock.call_args_list] assert all(entity in actual for entity in expected) # Test we raise if we target entity ID that does not support the service test_service_mock.reset_mock() with pytest.raises( exceptions.ServiceNotSupported, match="Entity light.living_room does not " "support action test_domain.test_service", ): await service.entity_service_call( hass, mock_entities, HassJob(test_service_mock), ServiceCall( hass, "test_domain", "test_service", {"entity_id": "light.living_room"} ), required_features=[SUPPORT_A], ) assert test_service_mock.call_count == 0 async def test_call_with_both_required_features( hass: HomeAssistant, mock_entities ) -> None: """Test service calls invoked only if entity has both features.""" test_service_mock = AsyncMock(return_value=None) await service.entity_service_call( hass, mock_entities, HassJob(test_service_mock), ServiceCall(hass, "test_domain", "test_service", {"entity_id": "all"}), required_features=[SUPPORT_A | SUPPORT_B], ) assert test_service_mock.call_count == 1 assert [call[0][0] for call in test_service_mock.call_args_list] == [ mock_entities["light.bedroom"] ] async def test_call_with_one_of_required_features( hass: HomeAssistant, mock_entities ) -> None: """Test service calls invoked with one entity having the required features.""" test_service_mock = AsyncMock(return_value=None) await service.entity_service_call( hass, mock_entities, HassJob(test_service_mock), ServiceCall(hass, "test_domain", "test_service", {"entity_id": "all"}), required_features=[SUPPORT_A, SUPPORT_C], ) assert test_service_mock.call_count == 3 expected = [ mock_entities["light.kitchen"], mock_entities["light.bedroom"], mock_entities["light.bathroom"], ] actual = [call[0][0] for call in test_service_mock.call_args_list] assert all(entity in actual for entity in expected) async def test_call_with_sync_func(hass: HomeAssistant, mock_entities) -> None: """Test invoking sync service calls.""" test_service_mock = Mock(return_value=None) await service.entity_service_call( hass, mock_entities, HassJob(test_service_mock), ServiceCall( hass, "test_domain", "test_service", {"entity_id": "light.kitchen"} ), ) assert test_service_mock.call_count == 1 async def test_call_with_sync_attr(hass: HomeAssistant, mock_entities) -> None: """Test invoking sync service calls.""" mock_method = mock_entities["light.kitchen"].sync_method = Mock(return_value=None) await service.entity_service_call( hass, mock_entities, "sync_method", ServiceCall( hass, "test_domain", "test_service", {"entity_id": "light.kitchen", "area_id": "abcd"}, ), ) assert mock_method.call_count == 1 # We pass empty kwargs because both entity_id and area_id are filtered out assert mock_method.mock_calls[0][2] == {} async def test_call_context_user_not_exist(hass: HomeAssistant) -> None: """Check we don't allow deleted users to do things.""" with pytest.raises(exceptions.UnknownUser) as err: await service.entity_service_call( hass, {}, Mock(), ServiceCall( hass, "test_domain", "test_service", context=Context(user_id="non-existing"), ), ) assert err.value.context.user_id == "non-existing" async def test_call_context_target_all( hass: HomeAssistant, mock_handle_entity_call, mock_entities ) -> None: """Check we only target allowed entities if targeting all.""" with patch( "homeassistant.auth.AuthManager.async_get_user", return_value=Mock( permissions=PolicyPermissions( {"entities": {"entity_ids": {"light.kitchen": True}}}, None ), is_admin=False, ), ): await service.entity_service_call( hass, mock_entities, Mock(), ServiceCall( hass, "test_domain", "test_service", data={"entity_id": ENTITY_MATCH_ALL}, context=Context(user_id="mock-id"), ), ) assert len(mock_handle_entity_call.mock_calls) == 1 assert mock_handle_entity_call.mock_calls[0][1][1].entity_id == "light.kitchen" async def test_call_context_target_specific( hass: HomeAssistant, mock_handle_entity_call, mock_entities ) -> None: """Check targeting specific entities.""" with patch( "homeassistant.auth.AuthManager.async_get_user", return_value=Mock( permissions=PolicyPermissions( {"entities": {"entity_ids": {"light.kitchen": True}}}, None ) ), ): await service.entity_service_call( hass, mock_entities, Mock(), ServiceCall( hass, "test_domain", "test_service", {"entity_id": "light.kitchen"}, context=Context(user_id="mock-id"), ), ) assert len(mock_handle_entity_call.mock_calls) == 1 assert mock_handle_entity_call.mock_calls[0][1][1].entity_id == "light.kitchen" async def test_call_context_target_specific_no_auth( hass: HomeAssistant, mock_handle_entity_call, mock_entities ) -> None: """Check targeting specific entities without auth.""" with ( pytest.raises(exceptions.Unauthorized) as err, patch( "homeassistant.auth.AuthManager.async_get_user", return_value=Mock(permissions=PolicyPermissions({}, None), is_admin=False), ), ): await service.entity_service_call( hass, mock_entities, Mock(), ServiceCall( hass, "test_domain", "test_service", {"entity_id": "light.kitchen"}, context=Context(user_id="mock-id"), ), ) assert err.value.context.user_id == "mock-id" assert err.value.entity_id == "light.kitchen" async def test_call_no_context_target_all( hass: HomeAssistant, mock_handle_entity_call, mock_entities ) -> None: """Check we target all if no user context given.""" await service.entity_service_call( hass, mock_entities, Mock(), ServiceCall( hass, "test_domain", "test_service", data={"entity_id": ENTITY_MATCH_ALL} ), ) assert len(mock_handle_entity_call.mock_calls) == 4 assert [call[1][1] for call in mock_handle_entity_call.mock_calls] == list( mock_entities.values() ) async def test_call_no_context_target_specific( hass: HomeAssistant, mock_handle_entity_call, mock_entities ) -> None: """Check we can target specified entities.""" await service.entity_service_call( hass, mock_entities, Mock(), ServiceCall( hass, "test_domain", "test_service", {"entity_id": ["light.kitchen", "light.non-existing"]}, ), ) assert len(mock_handle_entity_call.mock_calls) == 1 assert mock_handle_entity_call.mock_calls[0][1][1].entity_id == "light.kitchen" async def test_call_with_match_all( hass: HomeAssistant, mock_handle_entity_call, mock_entities, caplog: pytest.LogCaptureFixture, ) -> None: """Check we only target allowed entities if targeting all.""" await service.entity_service_call( hass, mock_entities, Mock(), ServiceCall(hass, "test_domain", "test_service", {"entity_id": "all"}), ) assert len(mock_handle_entity_call.mock_calls) == 4 assert [call[1][1] for call in mock_handle_entity_call.mock_calls] == list( mock_entities.values() ) async def test_call_with_omit_entity_id( hass: HomeAssistant, mock_handle_entity_call, mock_entities ) -> None: """Check service call if we do not pass an entity ID.""" await service.entity_service_call( hass, mock_entities, Mock(), ServiceCall(hass, "test_domain", "test_service"), ) assert len(mock_handle_entity_call.mock_calls) == 0 async def test_register_admin_service( hass: HomeAssistant, hass_read_only_user: MockUser, hass_admin_user: MockUser ) -> None: """Test the register admin service.""" calls = [] async def mock_service(call): calls.append(call) service.async_register_admin_service(hass, "test", "test", mock_service) service.async_register_admin_service( hass, "test", "test2", mock_service, vol.Schema({vol.Required("required"): cv.boolean}), ) with pytest.raises(exceptions.UnknownUser): await hass.services.async_call( "test", "test", {}, blocking=True, context=Context(user_id="non-existing"), ) assert len(calls) == 0 with pytest.raises(exceptions.Unauthorized): await hass.services.async_call( "test", "test", {}, blocking=True, context=Context(user_id=hass_read_only_user.id), ) assert len(calls) == 0 with pytest.raises(vol.Invalid): await hass.services.async_call( "test", "test", {"invalid": True}, blocking=True, context=Context(user_id=hass_admin_user.id), ) assert len(calls) == 0 with pytest.raises(vol.Invalid): await hass.services.async_call( "test", "test2", {}, blocking=True, context=Context(user_id=hass_admin_user.id), ) assert len(calls) == 0 await hass.services.async_call( "test", "test2", {"required": True}, blocking=True, context=Context(user_id=hass_admin_user.id), ) assert len(calls) == 1 assert calls[0].context.user_id == hass_admin_user.id async def test_domain_control_not_async(hass: HomeAssistant, mock_entities) -> None: """Test domain verification in a service call with an unknown user.""" calls = [] def mock_service_log(call): """Define a protected service.""" calls.append(call) with pytest.raises(exceptions.HomeAssistantError): service.verify_domain_control(hass, "test_domain")(mock_service_log) async def test_domain_control_unknown(hass: HomeAssistant, mock_entities) -> None: """Test domain verification in a service call with an unknown user.""" calls = [] async def mock_service_log(call): """Define a protected service.""" calls.append(call) with patch( "homeassistant.helpers.entity_registry.async_get", return_value=Mock(entities=mock_entities), ): protected_mock_service = service.verify_domain_control(hass, "test_domain")( mock_service_log ) hass.services.async_register( "test_domain", "test_service", protected_mock_service, schema=None ) with pytest.raises(exceptions.UnknownUser): await hass.services.async_call( "test_domain", "test_service", {}, blocking=True, context=Context(user_id="fake_user_id"), ) assert len(calls) == 0 async def test_domain_control_unauthorized( hass: HomeAssistant, hass_read_only_user: MockUser ) -> None: """Test domain verification in a service call with an unauthorized user.""" mock_registry( hass, { "light.kitchen": er.RegistryEntry( entity_id="light.kitchen", unique_id="kitchen", platform="test_domain", ) }, ) calls = [] async def mock_service_log(call): """Define a protected service.""" calls.append(call) protected_mock_service = service.verify_domain_control(hass, "test_domain")( mock_service_log ) hass.services.async_register( "test_domain", "test_service", protected_mock_service, schema=None ) with pytest.raises(exceptions.Unauthorized): await hass.services.async_call( "test_domain", "test_service", {}, blocking=True, context=Context(user_id=hass_read_only_user.id), ) assert len(calls) == 0 async def test_domain_control_admin( hass: HomeAssistant, hass_admin_user: MockUser ) -> None: """Test domain verification in a service call with an admin user.""" mock_registry( hass, { "light.kitchen": er.RegistryEntry( entity_id="light.kitchen", unique_id="kitchen", platform="test_domain", ) }, ) calls = [] async def mock_service_log(call): """Define a protected service.""" calls.append(call) protected_mock_service = service.verify_domain_control(hass, "test_domain")( mock_service_log ) hass.services.async_register( "test_domain", "test_service", protected_mock_service, schema=None ) await hass.services.async_call( "test_domain", "test_service", {}, blocking=True, context=Context(user_id=hass_admin_user.id), ) assert len(calls) == 1 async def test_domain_control_no_user(hass: HomeAssistant) -> None: """Test domain verification in a service call with no user.""" mock_registry( hass, { "light.kitchen": er.RegistryEntry( entity_id="light.kitchen", unique_id="kitchen", platform="test_domain", ) }, ) calls = [] async def mock_service_log(call): """Define a protected service.""" calls.append(call) protected_mock_service = service.verify_domain_control(hass, "test_domain")( mock_service_log ) hass.services.async_register( "test_domain", "test_service", protected_mock_service, schema=None ) await hass.services.async_call( "test_domain", "test_service", {}, blocking=True, context=Context(user_id=None), ) assert len(calls) == 1 async def test_extract_from_service_available_device(hass: HomeAssistant) -> None: """Test the extraction of entity from service and device is available.""" entities = [ MockEntity(name="test_1", entity_id="test_domain.test_1"), MockEntity(name="test_2", entity_id="test_domain.test_2", available=False), MockEntity(name="test_3", entity_id="test_domain.test_3"), MockEntity(name="test_4", entity_id="test_domain.test_4", available=False), ] call_1 = ServiceCall(hass, "test", "service", data={"entity_id": ENTITY_MATCH_ALL}) assert [ ent.entity_id for ent in (await service.async_extract_entities(hass, entities, call_1)) ] == ["test_domain.test_1", "test_domain.test_3"] call_2 = ServiceCall( hass, "test", "service", data={"entity_id": ["test_domain.test_3", "test_domain.test_4"]}, ) assert [ ent.entity_id for ent in (await service.async_extract_entities(hass, entities, call_2)) ] == ["test_domain.test_3"] assert ( await service.async_extract_entities( hass, entities, ServiceCall( hass, "test", "service", data={"entity_id": ENTITY_MATCH_NONE}, ), ) == [] ) async def test_extract_from_service_empty_if_no_entity_id(hass: HomeAssistant) -> None: """Test the extraction from service without specifying entity.""" entities = [ MockEntity(name="test_1", entity_id="test_domain.test_1"), MockEntity(name="test_2", entity_id="test_domain.test_2"), ] call = ServiceCall(hass, "test", "service") assert [ ent.entity_id for ent in (await service.async_extract_entities(hass, entities, call)) ] == [] async def test_extract_from_service_filter_out_non_existing_entities( hass: HomeAssistant, ) -> None: """Test the extraction of non existing entities from service.""" entities = [ MockEntity(name="test_1", entity_id="test_domain.test_1"), MockEntity(name="test_2", entity_id="test_domain.test_2"), ] call = ServiceCall( hass, "test", "service", {"entity_id": ["test_domain.test_2", "test_domain.non_exist"]}, ) assert [ ent.entity_id for ent in (await service.async_extract_entities(hass, entities, call)) ] == ["test_domain.test_2"] async def test_extract_from_service_area_id( hass: HomeAssistant, floor_area_mock ) -> None: """Test the extraction using area ID as reference.""" entities = [ MockEntity(name="in_area", entity_id="light.in_area"), MockEntity(name="no_area", entity_id="light.no_area"), MockEntity(name="diff_area", entity_id="light.diff_area"), ] call = ServiceCall(hass, "light", "turn_on", {"area_id": "test-area"}) extracted = await service.async_extract_entities(hass, entities, call) assert len(extracted) == 1 assert extracted[0].entity_id == "light.in_area" call = ServiceCall( hass, "light", "turn_on", {"area_id": ["test-area", "diff-area"]} ) extracted = await service.async_extract_entities(hass, entities, call) assert len(extracted) == 2 assert sorted(ent.entity_id for ent in extracted) == [ "light.diff_area", "light.in_area", ] call = ServiceCall( hass, "light", "turn_on", {"area_id": ["test-area", "diff-area"], "device_id": "device-no-area-id"}, ) extracted = await service.async_extract_entities(hass, entities, call) assert len(extracted) == 3 assert sorted(ent.entity_id for ent in extracted) == [ "light.diff_area", "light.in_area", "light.no_area", ] @pytest.mark.usefixtures("label_mock") async def test_extract_from_service_label_id(hass: HomeAssistant) -> None: """Test the extraction using label ID as reference.""" entities = [ MockEntity(name="with_my_label", entity_id="light.with_my_label"), MockEntity(name="no_labels", entity_id="light.no_labels"), MockEntity( name="with_labels_from_device", entity_id="light.with_labels_from_device" ), ] call = ServiceCall(hass, "light", "turn_on", {"label_id": "label_area"}) extracted = await service.async_extract_entities(hass, entities, call) assert len(extracted) == 1 assert extracted[0].entity_id == "light.with_labels_from_device" call = ServiceCall(hass, "light", "turn_on", {"label_id": "my-label"}) extracted = await service.async_extract_entities(hass, entities, call) assert len(extracted) == 1 assert extracted[0].entity_id == "light.with_my_label" call = ServiceCall(hass, "light", "turn_on", {"label_id": ["my-label", "label1"]}) extracted = await service.async_extract_entities(hass, entities, call) assert len(extracted) == 2 assert sorted(ent.entity_id for ent in extracted) == [ "light.with_labels_from_device", "light.with_my_label", ] call = ServiceCall( hass, "light", "turn_on", {"label_id": ["my-label", "label1"], "device_id": "device-no-labels"}, ) extracted = await service.async_extract_entities(hass, entities, call) assert len(extracted) == 3 assert sorted(ent.entity_id for ent in extracted) == [ "light.no_labels", "light.with_labels_from_device", "light.with_my_label", ] async def test_entity_service_call_warn_referenced( hass: HomeAssistant, caplog: pytest.LogCaptureFixture ) -> None: """Test we only warn for referenced entities in entity_service_call.""" call = ServiceCall( hass, "light", "turn_on", { "area_id": "non-existent-area", "entity_id": "non.existent", "device_id": "non-existent-device", "floor_id": "non-existent-floor", "label_id": "non-existent-label", }, ) await service.entity_service_call(hass, {}, "", call) assert ( "Referenced floors non-existent-floor, areas non-existent-area, " "devices non-existent-device, entities non.existent, " "labels non-existent-label are missing or not currently available" ) in caplog.text async def test_async_extract_entities_warn_referenced( hass: HomeAssistant, caplog: pytest.LogCaptureFixture ) -> None: """Test we only warn for referenced entities in async_extract_entities.""" call = ServiceCall( hass, "light", "turn_on", { "area_id": "non-existent-area", "entity_id": "non.existent", "device_id": "non-existent-device", "floor_id": "non-existent-floor", "label_id": "non-existent-label", }, ) extracted = await service.async_extract_entities(hass, {}, call) assert len(extracted) == 0 assert ( "Referenced floors non-existent-floor, areas non-existent-area, " "devices non-existent-device, entities non.existent, " "labels non-existent-label are missing or not currently available" ) in caplog.text async def test_async_extract_config_entry_ids(hass: HomeAssistant) -> None: """Test we can find devices that have no entities.""" device_no_entities = dr.DeviceEntry(id="device-no-entities", config_entries={"abc"}) call = ServiceCall( hass, "homeassistant", "reload_config_entry", { "device_id": "device-no-entities", }, ) mock_device_registry( hass, { device_no_entities.id: device_no_entities, }, ) assert await service.async_extract_config_entry_ids(hass, call) == {"abc"} async def test_reload_service_helper(hass: HomeAssistant) -> None: """Test the reload service helper.""" active_reload_calls = 0 reloaded = [] async def reload_service_handler(service_call: ServiceCall) -> None: """Remove all automations and load new ones from config.""" nonlocal active_reload_calls # Assert the reload helper prevents parallel reloads assert not active_reload_calls active_reload_calls += 1 if not (target := service_call.data.get("target")): reloaded.append("all") else: reloaded.append(target) await asyncio.sleep(0.01) active_reload_calls -= 1 def reload_targets(service_call: ServiceCall) -> set[str | None]: if target_id := service_call.data.get("target"): return {target_id} return {"target1", "target2", "target3", "target4"} # Test redundant reload of single targets reloader = service.ReloadServiceHelper(reload_service_handler, reload_targets) tasks = [ # This reload task will start executing first, (target1) reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target1"}) ), # These reload tasks will be deduplicated to (target2, target3, target4, target1) # while the first task is reloaded, note that target1 can't be deduplicated # because it's already being reloaded. reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target2"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target3"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target4"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target1"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target2"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target3"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target4"}) ), ] await asyncio.gather(*tasks) assert reloaded == unordered( ["target1", "target2", "target3", "target4", "target1"] ) # Test redundant reload of multiple targets + single target reloaded.clear() tasks = [ # This reload task will start executing first, (target1) reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target1"}) ), # These reload tasks will be deduplicated to (target2, target3, target4, all) # while the first task is reloaded. reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target2"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target3"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target4"}) ), reloader.execute_service(ServiceCall(hass, "test", "test")), ] await asyncio.gather(*tasks) assert reloaded == unordered(["target1", "target2", "target3", "target4", "all"]) # Test redundant reload of multiple targets + single target reloaded.clear() tasks = [ # This reload task will start executing first, (all) reloader.execute_service(ServiceCall(hass, "test", "test")), # These reload tasks will be deduplicated to (target1, target2, target3, target4) # while the first task is reloaded. reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target1"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target2"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target3"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target4"}) ), ] await asyncio.gather(*tasks) assert reloaded == unordered(["all", "target1", "target2", "target3", "target4"]) # Test redundant reload of single targets reloaded.clear() tasks = [ # This reload task will start executing first, (target1) reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target1"}) ), # These reload tasks will be deduplicated to (target2, target3, target4, target1) # while the first task is reloaded, note that target1 can't be deduplicated # because it's already being reloaded. reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target2"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target3"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target4"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target1"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target2"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target3"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target4"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target1"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target2"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target3"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target4"}) ), ] await asyncio.gather(*tasks) assert reloaded == unordered( ["target1", "target2", "target3", "target4", "target1"] ) # Test redundant reload of multiple targets + single target reloaded.clear() tasks = [ # This reload task will start executing first, (target1) reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target1"}) ), # These reload tasks will be deduplicated to (target2, target3, target4, all) # while the first task is reloaded. reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target2"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target3"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target4"}) ), reloader.execute_service(ServiceCall(hass, "test", "test")), reloader.execute_service(ServiceCall(hass, "test", "test")), ] await asyncio.gather(*tasks) assert reloaded == unordered(["target1", "target2", "target3", "target4", "all"]) # Test redundant reload of multiple targets + single target reloaded.clear() tasks = [ # This reload task will start executing first, (all) reloader.execute_service(ServiceCall(hass, "test", "test")), # These reload tasks will be deduplicated to (target1, target2, target3, target4) # while the first task is reloaded. reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target1"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target2"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target3"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target4"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target1"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target2"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target3"}) ), reloader.execute_service( ServiceCall(hass, "test", "test", {"target": "target4"}) ), ] await asyncio.gather(*tasks) assert reloaded == unordered(["all", "target1", "target2", "target3", "target4"])