Add test coverage for ESPHome service calls (#107042)

pull/107080/head
J. Nick Koston 2024-01-03 22:37:56 -10:00 committed by GitHub
parent 4b3a1b5d2d
commit 9c69212ad5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 213 additions and 4 deletions

View File

@ -203,14 +203,19 @@ class ESPHomeManager:
template.render_complex(data_template, service.variables)
)
except TemplateError as ex:
_LOGGER.error("Error rendering data template for %s: %s", self.host, ex)
_LOGGER.error(
"Error rendering data template %s for %s: %s",
service.data_template,
self.host,
ex,
)
return
if service.is_event:
device_id = self.device_id
# ESPHome uses service call packet for both events and service calls
# Ensure the user can only send events of form 'esphome.xyz'
if domain != "esphome":
if domain != DOMAIN:
_LOGGER.error(
"Can only generate events under esphome domain! (%s)", self.host
)

View File

@ -14,6 +14,7 @@ from aioesphomeapi import (
DeviceInfo,
EntityInfo,
EntityState,
HomeassistantServiceCall,
ReconnectLogic,
UserService,
)
@ -176,6 +177,7 @@ class MockESPHomeDevice:
"""Init the mock."""
self.entry = entry
self.state_callback: Callable[[EntityState], None]
self.service_call_callback: Callable[[HomeassistantServiceCall], None]
self.on_disconnect: Callable[[bool], None]
self.on_connect: Callable[[bool], None]
@ -183,6 +185,16 @@ class MockESPHomeDevice:
"""Set the state callback."""
self.state_callback = state_callback
def set_service_call_callback(
self, callback: Callable[[HomeassistantServiceCall], None]
) -> None:
"""Set the service call callback."""
self.service_call_callback = callback
def mock_service_call(self, service_call: HomeassistantServiceCall) -> None:
"""Mock a service call."""
self.service_call_callback(service_call)
def set_state(self, state: EntityState) -> None:
"""Mock setting state."""
self.state_callback(state)
@ -242,12 +254,19 @@ async def _mock_generic_device_entry(
for state in states:
callback(state)
async def _subscribe_service_calls(
callback: Callable[[HomeassistantServiceCall], None],
) -> None:
"""Subscribe to service calls."""
mock_device.set_service_call_callback(callback)
mock_client.device_info = AsyncMock(return_value=device_info)
mock_client.subscribe_voice_assistant = AsyncMock(return_value=Mock())
mock_client.list_entities_services = AsyncMock(
return_value=mock_list_entities_services
)
mock_client.subscribe_states = _subscribe_states
mock_client.subscribe_service_calls = _subscribe_service_calls
try_connect_done = Event()

View File

@ -7,6 +7,7 @@ from aioesphomeapi import (
DeviceInfo,
EntityInfo,
EntityState,
HomeassistantServiceCall,
UserService,
UserServiceArg,
UserServiceArgType,
@ -16,19 +17,203 @@ import pytest
from homeassistant import config_entries
from homeassistant.components import dhcp
from homeassistant.components.esphome.const import (
CONF_ALLOW_SERVICE_CALLS,
CONF_DEVICE_NAME,
DOMAIN,
STABLE_BLE_VERSION_STR,
)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_PORT
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers import issue_registry as ir
from homeassistant.setup import async_setup_component
from .conftest import MockESPHomeDevice
from tests.common import MockConfigEntry
from tests.common import MockConfigEntry, async_capture_events, async_mock_service
async def test_esphome_device_service_calls_not_allowed(
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: Callable[
[APIClient, list[EntityInfo], list[UserService], list[EntityState]],
Awaitable[MockESPHomeDevice],
],
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test a device with service calls not allowed."""
entity_info = []
states = []
user_service = []
device: MockESPHomeDevice = await mock_esphome_device(
mock_client=mock_client,
entity_info=entity_info,
user_service=user_service,
states=states,
device_info={"esphome_version": "2023.3.0"},
)
await hass.async_block_till_done()
mock_esphome_test = async_mock_service(hass, "esphome", "test")
device.mock_service_call(
HomeassistantServiceCall(
service="esphome.test",
data={},
)
)
await hass.async_block_till_done()
assert len(mock_esphome_test) == 0
issue_registry = ir.async_get(hass)
issue = issue_registry.async_get_issue(
"esphome", "service_calls_not_enabled-11:22:33:44:55:aa"
)
assert issue is not None
assert (
"If you trust this device and want to allow access "
"for it to make Home Assistant service calls, you can "
"enable this functionality in the options flow"
) in caplog.text
async def test_esphome_device_service_calls_allowed(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_client: APIClient,
mock_esphome_device: Callable[
[APIClient, list[EntityInfo], list[UserService], list[EntityState]],
Awaitable[MockESPHomeDevice],
],
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test a device with service calls are allowed."""
entity_info = []
states = []
user_service = []
mock_config_entry.options = {CONF_ALLOW_SERVICE_CALLS: True}
device: MockESPHomeDevice = await mock_esphome_device(
mock_client=mock_client,
entity_info=entity_info,
user_service=user_service,
states=states,
device_info={"esphome_version": "2023.3.0"},
entry=mock_config_entry,
)
await hass.async_block_till_done()
mock_calls: list[ServiceCall] = []
async def _mock_service(call: ServiceCall) -> None:
mock_calls.append(call)
hass.services.async_register(DOMAIN, "test", _mock_service)
device.mock_service_call(
HomeassistantServiceCall(
service="esphome.test",
data={"raw": "data"},
)
)
await hass.async_block_till_done()
issue_registry = ir.async_get(hass)
issue = issue_registry.async_get_issue(
"esphome", "service_calls_not_enabled-11:22:33:44:55:aa"
)
assert issue is None
assert len(mock_calls) == 1
service_call = mock_calls[0]
assert service_call.domain == DOMAIN
assert service_call.service == "test"
assert service_call.data == {"raw": "data"}
mock_calls.clear()
device.mock_service_call(
HomeassistantServiceCall(
service="esphome.test",
data_template={"raw": "{{invalid}}"},
)
)
await hass.async_block_till_done()
assert (
"Template variable warning: 'invalid' is undefined when rendering '{{invalid}}'"
in caplog.text
)
assert len(mock_calls) == 1
service_call = mock_calls[0]
assert service_call.domain == DOMAIN
assert service_call.service == "test"
assert service_call.data == {"raw": ""}
mock_calls.clear()
caplog.clear()
device.mock_service_call(
HomeassistantServiceCall(
service="esphome.test",
data_template={"raw": "{{-- invalid --}}"},
)
)
await hass.async_block_till_done()
assert "TemplateSyntaxError" in caplog.text
assert "{{-- invalid --}}" in caplog.text
assert len(mock_calls) == 0
mock_calls.clear()
caplog.clear()
device.mock_service_call(
HomeassistantServiceCall(
service="esphome.test",
data_template={"raw": "{{var}}"},
variables={"var": "value"},
)
)
await hass.async_block_till_done()
assert len(mock_calls) == 1
service_call = mock_calls[0]
assert service_call.domain == DOMAIN
assert service_call.service == "test"
assert service_call.data == {"raw": "value"}
mock_calls.clear()
device.mock_service_call(
HomeassistantServiceCall(
service="esphome.test",
data_template={"raw": "valid"},
)
)
await hass.async_block_till_done()
assert len(mock_calls) == 1
service_call = mock_calls[0]
assert service_call.domain == DOMAIN
assert service_call.service == "test"
assert service_call.data == {"raw": "valid"}
mock_calls.clear()
# Try firing events
events = async_capture_events(hass, "esphome.test")
device.mock_service_call(
HomeassistantServiceCall(
service="esphome.test",
is_event=True,
data={"raw": "event"},
)
)
await hass.async_block_till_done()
assert len(events) == 1
event = events[0]
assert event.data["raw"] == "event"
assert event.event_type == "esphome.test"
events.clear()
caplog.clear()
# Try firing events for disallowed domain
events = async_capture_events(hass, "wrong.test")
device.mock_service_call(
HomeassistantServiceCall(
service="wrong.test",
is_event=True,
data={"raw": "event"},
)
)
await hass.async_block_till_done()
assert len(events) == 0
assert "Can only generate events under esphome domain" in caplog.text
events.clear()
async def test_esphome_device_with_old_bluetooth(