Add Shelly script events entities (#135979)

* When an event is received from a script component on a shelly device, this event is send to the hass event bus

* Event emitted from a script will be send to the corresponding event entity

* Added tests for the shelly script event

* The event entity for script are now hidden by default

* Forgot to enable script event entities by default for the test

* Made serveral improvement for the shelly script event entity
- Added device name to event entity
- The event entity is now only created when a script has any event types
- The test for this entity now uses snapshots

* Shelly script event entities will not be create for the BLE scanning script and will now be automatically removed when the script no longer exsists

* Changed variable name to avoid confusion with _id

* Removed old const from first implementation and removed _script_event_listeners and used _event_listeners instead to listen for script events
pull/138565/head
Wouter 2025-02-03 21:41:39 +01:00 committed by GitHub
parent 649319f4ee
commit 6fa87da5bd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 298 additions and 2 deletions

View File

@ -116,6 +116,10 @@ BATTERY_DEVICES_WITH_PERMANENT_CONNECTION: Final = [
# Button/Click events for Block & RPC devices
EVENT_SHELLY_CLICK: Final = "shelly.click"
SHELLY_EMIT_EVENT_PATTERN: Final = re.compile(
r"(?:Shelly\s*\.\s*emitEvent\s*\(\s*[\"'`])(\w*)"
)
ATTR_CLICK_TYPE: Final = "click_type"
ATTR_CHANNEL: Final = "channel"
ATTR_DEVICE: Final = "device"

View File

@ -6,6 +6,7 @@ from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Final
from aioshelly.ble.const import BLE_SCRIPT_NAME
from aioshelly.block_device import Block
from aioshelly.const import MODEL_I3, RPC_GENERATIONS
@ -28,10 +29,12 @@ from .const import (
from .coordinator import ShellyBlockCoordinator, ShellyConfigEntry, ShellyRpcCoordinator
from .entity import ShellyBlockEntity
from .utils import (
async_remove_orphaned_entities,
async_remove_shelly_entity,
get_device_entry_gen,
get_rpc_entity_name,
get_rpc_key_instances,
get_rpc_script_event_types,
is_block_momentary_input,
is_rpc_momentary_input,
)
@ -68,6 +71,13 @@ RPC_EVENT: Final = ShellyRpcEventDescription(
config, status, key
),
)
SCRIPT_EVENT: Final = ShellyRpcEventDescription(
key="script",
translation_key="script",
device_class=None,
entity_registry_enabled_default=False,
has_entity_name=True,
)
async def async_setup_entry(
@ -95,6 +105,33 @@ async def async_setup_entry(
async_remove_shelly_entity(hass, EVENT_DOMAIN, unique_id)
else:
entities.append(ShellyRpcEvent(coordinator, key, RPC_EVENT))
script_instances = get_rpc_key_instances(
coordinator.device.status, SCRIPT_EVENT.key
)
for script in script_instances:
script_name = get_rpc_entity_name(coordinator.device, script)
if script_name == BLE_SCRIPT_NAME:
continue
event_types = await get_rpc_script_event_types(
coordinator.device, int(script.split(":")[-1])
)
if not event_types:
continue
entities.append(ShellyRpcScriptEvent(coordinator, script, event_types))
# If a script is removed, from the device configuration, we need to remove orphaned entities
async_remove_orphaned_entities(
hass,
config_entry.entry_id,
coordinator.mac,
EVENT_DOMAIN,
coordinator.device.status,
"script",
)
else:
coordinator = config_entry.runtime_data.block
if TYPE_CHECKING:
@ -170,7 +207,7 @@ class ShellyRpcEvent(CoordinatorEntity[ShellyRpcCoordinator], EventEntity):
) -> None:
"""Initialize Shelly entity."""
super().__init__(coordinator)
self.input_index = int(key.split(":")[-1])
self.event_id = int(key.split(":")[-1])
self._attr_device_info = DeviceInfo(
connections={(CONNECTION_NETWORK_MAC, coordinator.mac)}
)
@ -181,6 +218,7 @@ class ShellyRpcEvent(CoordinatorEntity[ShellyRpcCoordinator], EventEntity):
async def async_added_to_hass(self) -> None:
"""When entity is added to hass."""
await super().async_added_to_hass()
self.async_on_remove(
self.coordinator.async_subscribe_input_events(self._async_handle_event)
)
@ -188,6 +226,42 @@ class ShellyRpcEvent(CoordinatorEntity[ShellyRpcCoordinator], EventEntity):
@callback
def _async_handle_event(self, event: dict[str, Any]) -> None:
"""Handle the demo button event."""
if event["id"] == self.input_index:
if event["id"] == self.event_id:
self._trigger_event(event["event"])
self.async_write_ha_state()
class ShellyRpcScriptEvent(ShellyRpcEvent):
"""Represent RPC script event entity."""
def __init__(
self,
coordinator: ShellyRpcCoordinator,
key: str,
event_types: list[str],
) -> None:
"""Initialize Shelly script event entity."""
super().__init__(coordinator, key, SCRIPT_EVENT)
self.component = key
self._attr_event_types = event_types
async def async_added_to_hass(self) -> None:
"""When entity is added to hass."""
await super(CoordinatorEntity, self).async_added_to_hass()
self.async_on_remove(
self.coordinator.async_subscribe_events(self._async_handle_event)
)
@callback
def _async_handle_event(self, event: dict[str, Any]) -> None:
"""Handle script event."""
if event.get("component") == self.component:
event_type = event.get("event")
if event_type not in self.event_types:
# This can happen if we didn't find this event type in the script
return
self._trigger_event(event_type, event.get("data"))
self.async_write_ha_state()

View File

@ -56,6 +56,7 @@ from .const import (
RPC_INPUTS_EVENTS_TYPES,
SHBTN_INPUTS_EVENTS_TYPES,
SHBTN_MODELS,
SHELLY_EMIT_EVENT_PATTERN,
SHIX3_1_INPUTS_EVENTS_TYPES,
UPTIME_DEVIATION,
VIRTUAL_COMPONENTS_MAP,
@ -598,3 +599,10 @@ def get_rpc_ws_url(hass: HomeAssistant) -> str | None:
url = URL(raw_url)
ws_url = url.with_scheme("wss" if url.scheme == "https" else "ws")
return str(ws_url.joinpath(API_WS_URL.removeprefix("/")))
async def get_rpc_script_event_types(device: RpcDevice, id: int) -> list[str]:
"""Return a list of event types for a specific script."""
code_response = await device.script_getcode(id)
matches = SHELLY_EMIT_EVENT_PATTERN.finditer(code_response["data"])
return sorted([*{str(event_type.group(1)) for event_type in matches}])

View File

@ -2,6 +2,15 @@
from unittest.mock import AsyncMock, Mock, PropertyMock, patch
from aioshelly.ble.const import (
BLE_CODE,
BLE_SCAN_RESULT_EVENT,
BLE_SCAN_RESULT_VERSION,
BLE_SCRIPT_NAME,
VAR_ACTIVE,
VAR_EVENT_TYPE,
VAR_VERSION,
)
from aioshelly.block_device import BlockDevice, BlockUpdateType
from aioshelly.const import MODEL_1, MODEL_25, MODEL_PLUS_2PM
from aioshelly.rpc_device import RpcDevice, RpcUpdateType
@ -201,6 +210,9 @@ MOCK_CONFIG = {
"wifi": {"sta": {"enable": True}, "sta1": {"enable": False}},
"ws": {"enable": False, "server": None},
"voltmeter:100": {"xvoltage": {"unit": "ppm"}},
"script:1": {"id": 1, "name": "test_script.js", "enable": True},
"script:2": {"id": 2, "name": "test_script_2.js", "enable": False},
"script:3": {"id": 3, "name": BLE_SCRIPT_NAME, "enable": False},
}
@ -335,6 +347,15 @@ MOCK_STATUS_RPC = {
"current_C": 12.3,
"output": True,
},
"script:1": {
"id": 1,
"running": True,
"mem_used": 826,
"mem_peak": 1666,
"mem_free": 24360,
},
"script:2": {"id": 2, "running": False},
"script:3": {"id": 3, "running": False},
"humidity:0": {"rh": 44.4},
"sys": {
"available_updates": {
@ -347,6 +368,28 @@ MOCK_STATUS_RPC = {
"wifi": {"rssi": -63},
}
MOCK_SCRIPTS = [
""""
function eventHandler(event, userdata) {
if (typeof event.component !== "string")
return;
let component = event.component.substring(0, 5);
if (component === "input") {
let id = Number(event.component.substring(6));
Shelly.emitEvent("input_event", { id: id });
}
}
Shelly.addEventHandler(eventHandler);
Shelly.emitEvent("script_start");
""",
'console.log("Hello World!")',
BLE_CODE.replace(VAR_ACTIVE, "true")
.replace(VAR_EVENT_TYPE, BLE_SCAN_RESULT_EVENT)
.replace(VAR_VERSION, str(BLE_SCAN_RESULT_VERSION)),
]
@pytest.fixture(autouse=True)
def mock_coap():
@ -430,6 +473,9 @@ def _mock_rpc_device(version: str | None = None):
firmware_version="some fw string",
initialized=True,
connected=True,
script_getcode=AsyncMock(
side_effect=lambda script_id: {"data": MOCK_SCRIPTS[script_id - 1]}
),
)
type(device).name = PropertyMock(return_value="Test name")
return device

View File

@ -0,0 +1,69 @@
# serializer version: 1
# name: test_rpc_script_1_event[event.test_name_test_script_js-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': dict({
'event_types': list([
'input_event',
'script_start',
]),
}),
'config_entry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'event',
'entity_category': None,
'entity_id': 'event.test_name_test_script_js',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': None,
'original_icon': None,
'original_name': 'test_script.js',
'platform': 'shelly',
'previous_unique_id': None,
'supported_features': 0,
'translation_key': 'script',
'unique_id': '123456789ABC-script:1',
'unit_of_measurement': None,
})
# ---
# name: test_rpc_script_1_event[event.test_name_test_script_js-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'event_type': None,
'event_types': list([
'input_event',
'script_start',
]),
'friendly_name': 'Test name test_script.js',
}),
'context': <ANY>,
'entity_id': 'event.test_name_test_script_js',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'unknown',
})
# ---
# name: test_rpc_script_2_event[event.test_name_test_script_2_js-entry]
None
# ---
# name: test_rpc_script_2_event[event.test_name_test_script_2_js-state]
None
# ---
# name: test_rpc_script_ble_event[event.test_name_aioshelly_ble_integration-entry]
None
# ---
# name: test_rpc_script_ble_event[event.test_name_aioshelly_ble_integration-state]
None
# ---

View File

@ -2,9 +2,11 @@
from unittest.mock import Mock
from aioshelly.ble.const import BLE_SCRIPT_NAME
from aioshelly.const import MODEL_I3
import pytest
from pytest_unordered import unordered
from syrupy import SnapshotAssertion
from homeassistant.components.event import (
ATTR_EVENT_TYPE,
@ -64,6 +66,99 @@ async def test_rpc_button(
assert state.attributes.get(ATTR_EVENT_TYPE) == "single_push"
@pytest.mark.usefixtures("entity_registry_enabled_by_default")
async def test_rpc_script_1_event(
hass: HomeAssistant,
mock_rpc_device: Mock,
entity_registry: EntityRegistry,
monkeypatch: pytest.MonkeyPatch,
snapshot: SnapshotAssertion,
) -> None:
"""Test script event."""
await init_integration(hass, 2)
entity_id = "event.test_name_test_script_js"
state = hass.states.get(entity_id)
assert state == snapshot(name=f"{entity_id}-state")
entry = entity_registry.async_get(entity_id)
assert entry == snapshot(name=f"{entity_id}-entry")
inject_rpc_device_event(
monkeypatch,
mock_rpc_device,
{
"events": [
{
"component": "script:1",
"id": 1,
"event": "script_start",
"ts": 1668522399.2,
}
],
"ts": 1668522399.2,
},
)
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state.attributes.get(ATTR_EVENT_TYPE) == "script_start"
inject_rpc_device_event(
monkeypatch,
mock_rpc_device,
{
"events": [
{
"component": "script:1",
"id": 1,
"event": "unknown_event",
"ts": 1668522399.2,
}
],
"ts": 1668522399.2,
},
)
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state.attributes.get(ATTR_EVENT_TYPE) != "unknown_event"
@pytest.mark.usefixtures("entity_registry_enabled_by_default")
async def test_rpc_script_2_event(
hass: HomeAssistant,
entity_registry: EntityRegistry,
snapshot: SnapshotAssertion,
) -> None:
"""Test that scripts without any emitEvent will not get an event entity."""
await init_integration(hass, 2)
entity_id = "event.test_name_test_script_2_js"
state = hass.states.get(entity_id)
assert state == snapshot(name=f"{entity_id}-state")
entry = entity_registry.async_get(entity_id)
assert entry == snapshot(name=f"{entity_id}-entry")
@pytest.mark.usefixtures("entity_registry_enabled_by_default")
async def test_rpc_script_ble_event(
hass: HomeAssistant,
entity_registry: EntityRegistry,
snapshot: SnapshotAssertion,
) -> None:
"""Test that the ble script will not get an event entity."""
await init_integration(hass, 2)
entity_id = f"event.test_name_{BLE_SCRIPT_NAME}"
state = hass.states.get(entity_id)
assert state == snapshot(name=f"{entity_id}-state")
entry = entity_registry.async_get(entity_id)
assert entry == snapshot(name=f"{entity_id}-entry")
async def test_rpc_event_removal(
hass: HomeAssistant,
mock_rpc_device: Mock,