Fix loading KNX UI entities with entity category set (#126290)

* Fix loading KNX UI entities with entity category set

* add test

* docstring fixes

* telegram order

* Optionally ignore telegram sending order in tests

because we can't know which platform initialises first
pull/126314/head
Matthias Alphart 2024-09-20 11:16:58 +02:00 committed by GitHub
parent 2062e49ae1
commit 87240bb96f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 134 additions and 66 deletions

View File

@ -2,20 +2,23 @@
from __future__ import annotations from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
from xknx.devices import Device as XknxDevice from xknx.devices import Device as XknxDevice
from homeassistant.const import CONF_ENTITY_CATEGORY, EntityCategory
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity import Entity from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_platform import EntityPlatform from homeassistant.helpers.entity_platform import EntityPlatform
from homeassistant.helpers.entity_registry import RegistryEntry from homeassistant.helpers.entity_registry import RegistryEntry
from .const import DOMAIN
from .storage.config_store import PlatformControllerBase
from .storage.const import CONF_DEVICE_INFO
if TYPE_CHECKING: if TYPE_CHECKING:
from . import KNXModule from . import KNXModule
from .storage.config_store import PlatformControllerBase
class KnxUiEntityPlatformController(PlatformControllerBase): class KnxUiEntityPlatformController(PlatformControllerBase):
"""Class to manage dynamic adding and reloading of UI entities.""" """Class to manage dynamic adding and reloading of UI entities."""
@ -93,13 +96,19 @@ class KnxYamlEntity(_KnxEntityBase):
self._device = device self._device = device
class KnxUiEntity(_KnxEntityBase, ABC): class KnxUiEntity(_KnxEntityBase):
"""Representation of a KNX UI entity.""" """Representation of a KNX UI entity."""
_attr_unique_id: str _attr_unique_id: str
_attr_has_entity_name = True
@abstractmethod
def __init__( def __init__(
self, knx_module: KNXModule, unique_id: str, config: dict[str, Any] self, knx_module: KNXModule, unique_id: str, entity_config: dict[str, Any]
) -> None: ) -> None:
"""Initialize the UI entity.""" """Initialize the UI entity."""
self._knx_module = knx_module
self._attr_unique_id = unique_id
if entity_category := entity_config.get(CONF_ENTITY_CATEGORY):
self._attr_entity_category = EntityCategory(entity_category)
if device_info := entity_config.get(CONF_DEVICE_INFO):
self._attr_device_info = DeviceInfo(identifiers={(DOMAIN, device_info)})

View File

@ -20,7 +20,6 @@ from homeassistant.components.light import (
) )
from homeassistant.const import CONF_ENTITY_CATEGORY, CONF_NAME, Platform from homeassistant.const import CONF_ENTITY_CATEGORY, CONF_NAME, Platform
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import ( from homeassistant.helpers.entity_platform import (
AddEntitiesCallback, AddEntitiesCallback,
async_get_current_platform, async_get_current_platform,
@ -35,7 +34,6 @@ from .schema import LightSchema
from .storage.const import ( from .storage.const import (
CONF_COLOR_TEMP_MAX, CONF_COLOR_TEMP_MAX,
CONF_COLOR_TEMP_MIN, CONF_COLOR_TEMP_MIN,
CONF_DEVICE_INFO,
CONF_DPT, CONF_DPT,
CONF_ENTITY, CONF_ENTITY,
CONF_GA_BLUE_BRIGHTNESS, CONF_GA_BLUE_BRIGHTNESS,
@ -554,21 +552,19 @@ class KnxYamlLight(_KnxLight, KnxYamlEntity):
class KnxUiLight(_KnxLight, KnxUiEntity): class KnxUiLight(_KnxLight, KnxUiEntity):
"""Representation of a KNX light.""" """Representation of a KNX light."""
_attr_has_entity_name = True
_device: XknxLight _device: XknxLight
def __init__( def __init__(
self, knx_module: KNXModule, unique_id: str, config: ConfigType self, knx_module: KNXModule, unique_id: str, config: ConfigType
) -> None: ) -> None:
"""Initialize of KNX light.""" """Initialize of KNX light."""
self._knx_module = knx_module super().__init__(
knx_module=knx_module,
unique_id=unique_id,
entity_config=config[CONF_ENTITY],
)
self._device = _create_ui_light( self._device = _create_ui_light(
knx_module.xknx, config[DOMAIN], config[CONF_ENTITY][CONF_NAME] knx_module.xknx, config[DOMAIN], config[CONF_ENTITY][CONF_NAME]
) )
self._attr_max_color_temp_kelvin: int = config[DOMAIN][CONF_COLOR_TEMP_MAX] self._attr_max_color_temp_kelvin: int = config[DOMAIN][CONF_COLOR_TEMP_MAX]
self._attr_min_color_temp_kelvin: int = config[DOMAIN][CONF_COLOR_TEMP_MIN] self._attr_min_color_temp_kelvin: int = config[DOMAIN][CONF_COLOR_TEMP_MIN]
self._attr_entity_category = config[CONF_ENTITY][CONF_ENTITY_CATEGORY]
self._attr_unique_id = unique_id
if device_info := config[CONF_ENTITY].get(CONF_DEVICE_INFO):
self._attr_device_info = DeviceInfo(identifiers={(DOMAIN, device_info)})

View File

@ -18,7 +18,6 @@ from homeassistant.const import (
Platform, Platform,
) )
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import ( from homeassistant.helpers.entity_platform import (
AddEntitiesCallback, AddEntitiesCallback,
async_get_current_platform, async_get_current_platform,
@ -38,7 +37,6 @@ from .const import (
from .entity import KnxUiEntity, KnxUiEntityPlatformController, KnxYamlEntity from .entity import KnxUiEntity, KnxUiEntityPlatformController, KnxYamlEntity
from .schema import SwitchSchema from .schema import SwitchSchema
from .storage.const import ( from .storage.const import (
CONF_DEVICE_INFO,
CONF_ENTITY, CONF_ENTITY,
CONF_GA_PASSIVE, CONF_GA_PASSIVE,
CONF_GA_STATE, CONF_GA_STATE,
@ -133,14 +131,17 @@ class KnxYamlSwitch(_KnxSwitch, KnxYamlEntity):
class KnxUiSwitch(_KnxSwitch, KnxUiEntity): class KnxUiSwitch(_KnxSwitch, KnxUiEntity):
"""Representation of a KNX switch configured from UI.""" """Representation of a KNX switch configured from UI."""
_attr_has_entity_name = True
_device: XknxSwitch _device: XknxSwitch
def __init__( def __init__(
self, knx_module: KNXModule, unique_id: str, config: dict[str, Any] self, knx_module: KNXModule, unique_id: str, config: dict[str, Any]
) -> None: ) -> None:
"""Initialize KNX switch.""" """Initialize KNX switch."""
self._knx_module = knx_module super().__init__(
knx_module=knx_module,
unique_id=unique_id,
entity_config=config[CONF_ENTITY],
)
self._device = XknxSwitch( self._device = XknxSwitch(
knx_module.xknx, knx_module.xknx,
name=config[CONF_ENTITY][CONF_NAME], name=config[CONF_ENTITY][CONF_NAME],
@ -153,7 +154,3 @@ class KnxUiSwitch(_KnxSwitch, KnxUiEntity):
sync_state=config[DOMAIN][CONF_SYNC_STATE], sync_state=config[DOMAIN][CONF_SYNC_STATE],
invert=config[DOMAIN][CONF_INVERT], invert=config[DOMAIN][CONF_INVERT],
) )
self._attr_entity_category = config[CONF_ENTITY][CONF_ENTITY_CATEGORY]
self._attr_unique_id = unique_id
if device_info := config[CONF_ENTITY].get(CONF_DEVICE_INFO):
self._attr_device_info = DeviceInfo(identifiers={(DOMAIN, device_info)})

View File

@ -18,22 +18,22 @@ async def test_something(hass, knx):
## Asserting outgoing telegrams ## Asserting outgoing telegrams
All outgoing telegrams are pushed to an assertion queue. Assert them in order they were sent. All outgoing telegrams are appended to an assertion list. Assert them in order they were sent or pass `ignore_order=True` to the assertion method.
- `knx.assert_no_telegram` - `knx.assert_no_telegram`
Asserts that no telegram was sent (assertion queue is empty). Asserts that no telegram was sent (assertion list is empty).
- `knx.assert_telegram_count(count: int)` - `knx.assert_telegram_count(count: int)`
Asserts that `count` telegrams were sent. Asserts that `count` telegrams were sent.
- `knx.assert_read(group_address: str, response: int | tuple[int, ...] | None = None)` - `knx.assert_read(group_address: str, response: int | tuple[int, ...] | None = None, ignore_order: bool = False)`
Asserts that a GroupValueRead telegram was sent to `group_address`. Asserts that a GroupValueRead telegram was sent to `group_address`.
The telegram will be removed from the assertion queue. The telegram will be removed from the assertion list.
Optionally inject incoming GroupValueResponse telegram after reception to clear the value reader waiting task. This can also be done manually with `knx.receive_response`. Optionally inject incoming GroupValueResponse telegram after reception to clear the value reader waiting task. This can also be done manually with `knx.receive_response`.
- `knx.assert_response(group_address: str, payload: int | tuple[int, ...])` - `knx.assert_response(group_address: str, payload: int | tuple[int, ...], ignore_order: bool = False)`
Asserts that a GroupValueResponse telegram with `payload` was sent to `group_address`. Asserts that a GroupValueResponse telegram with `payload` was sent to `group_address`.
The telegram will be removed from the assertion queue. The telegram will be removed from the assertion list.
- `knx.assert_write(group_address: str, payload: int | tuple[int, ...])` - `knx.assert_write(group_address: str, payload: int | tuple[int, ...], ignore_order: bool = False)`
Asserts that a GroupValueWrite telegram with `payload` was sent to `group_address`. Asserts that a GroupValueWrite telegram with `payload` was sent to `group_address`.
The telegram will be removed from the assertion queue. The telegram will be removed from the assertion list.
Change some states or call some services and assert outgoing telegrams. Change some states or call some services and assert outgoing telegrams.

View File

@ -57,9 +57,9 @@ class KNXTestKit:
self.hass: HomeAssistant = hass self.hass: HomeAssistant = hass
self.mock_config_entry: MockConfigEntry = mock_config_entry self.mock_config_entry: MockConfigEntry = mock_config_entry
self.xknx: XKNX self.xknx: XKNX
# outgoing telegrams will be put in the Queue instead of sent to the interface # outgoing telegrams will be put in the List instead of sent to the interface
# telegrams to an InternalGroupAddress won't be queued here # telegrams to an InternalGroupAddress won't be queued here
self._outgoing_telegrams: asyncio.Queue = asyncio.Queue() self._outgoing_telegrams: list[Telegram] = []
def assert_state(self, entity_id: str, state: str, **attributes) -> None: def assert_state(self, entity_id: str, state: str, **attributes) -> None:
"""Assert the state of an entity.""" """Assert the state of an entity."""
@ -76,7 +76,7 @@ class KNXTestKit:
async def patch_xknx_start(): async def patch_xknx_start():
"""Patch `xknx.start` for unittests.""" """Patch `xknx.start` for unittests."""
self.xknx.cemi_handler.send_telegram = AsyncMock( self.xknx.cemi_handler.send_telegram = AsyncMock(
side_effect=self._outgoing_telegrams.put side_effect=self._outgoing_telegrams.append
) )
# after XKNX.__init__() to not overwrite it by the config entry again # after XKNX.__init__() to not overwrite it by the config entry again
# before StateUpdater starts to avoid slow down of tests # before StateUpdater starts to avoid slow down of tests
@ -117,24 +117,22 @@ class KNXTestKit:
######################## ########################
def _list_remaining_telegrams(self) -> str: def _list_remaining_telegrams(self) -> str:
"""Return a string containing remaining outgoing telegrams in test Queue. One per line.""" """Return a string containing remaining outgoing telegrams in test List."""
remaining_telegrams = [] return "\n".join(map(str, self._outgoing_telegrams))
while not self._outgoing_telegrams.empty():
remaining_telegrams.append(self._outgoing_telegrams.get_nowait())
return "\n".join(map(str, remaining_telegrams))
async def assert_no_telegram(self) -> None: async def assert_no_telegram(self) -> None:
"""Assert if every telegram in test Queue was checked.""" """Assert if every telegram in test List was checked."""
await self.hass.async_block_till_done() await self.hass.async_block_till_done()
assert self._outgoing_telegrams.empty(), ( remaining_telegram_count = len(self._outgoing_telegrams)
f"Found remaining unasserted Telegrams: {self._outgoing_telegrams.qsize()}\n" assert not remaining_telegram_count, (
f"Found remaining unasserted Telegrams: {remaining_telegram_count}\n"
f"{self._list_remaining_telegrams()}" f"{self._list_remaining_telegrams()}"
) )
async def assert_telegram_count(self, count: int) -> None: async def assert_telegram_count(self, count: int) -> None:
"""Assert outgoing telegram count in test Queue.""" """Assert outgoing telegram count in test List."""
await self.hass.async_block_till_done() await self.hass.async_block_till_done()
actual_count = self._outgoing_telegrams.qsize() actual_count = len(self._outgoing_telegrams)
assert actual_count == count, ( assert actual_count == count, (
f"Outgoing telegrams: {actual_count} - Expected: {count}\n" f"Outgoing telegrams: {actual_count} - Expected: {count}\n"
f"{self._list_remaining_telegrams()}" f"{self._list_remaining_telegrams()}"
@ -149,52 +147,79 @@ class KNXTestKit:
group_address: str, group_address: str,
payload: int | tuple[int, ...] | None, payload: int | tuple[int, ...] | None,
apci_type: type[APCI], apci_type: type[APCI],
ignore_order: bool = False,
) -> None: ) -> None:
"""Assert outgoing telegram. One by one in timely order.""" """Assert outgoing telegram. Optionally in timely order."""
await self.xknx.telegrams.join() await self.xknx.telegrams.join()
try: if not self._outgoing_telegrams:
telegram = self._outgoing_telegrams.get_nowait()
except asyncio.QueueEmpty as err:
raise AssertionError( raise AssertionError(
f"No Telegram found. Expected: {apci_type.__name__} -" f"No Telegram found. Expected: {apci_type.__name__} -"
f" {group_address} - {payload}" f" {group_address} - {payload}"
) from err )
_expected_ga = GroupAddress(group_address)
if ignore_order:
for telegram in self._outgoing_telegrams:
if (
telegram.destination_address == _expected_ga
and isinstance(telegram.payload, apci_type)
and (payload is None or telegram.payload.value.value == payload)
):
self._outgoing_telegrams.remove(telegram)
return
raise AssertionError(
f"Telegram not found. Expected: {apci_type.__name__} -"
f" {group_address} - {payload}"
f"\nUnasserted telegrams:\n{self._list_remaining_telegrams()}"
)
telegram = self._outgoing_telegrams.pop(0)
assert isinstance( assert isinstance(
telegram.payload, apci_type telegram.payload, apci_type
), f"APCI type mismatch in {telegram} - Expected: {apci_type.__name__}" ), f"APCI type mismatch in {telegram} - Expected: {apci_type.__name__}"
assert ( assert (
str(telegram.destination_address) == group_address telegram.destination_address == _expected_ga
), f"Group address mismatch in {telegram} - Expected: {group_address}" ), f"Group address mismatch in {telegram} - Expected: {group_address}"
if payload is not None: if payload is not None:
assert ( assert (
telegram.payload.value.value == payload # type: ignore[attr-defined] telegram.payload.value.value == payload # type: ignore[attr-defined]
), f"Payload mismatch in {telegram} - Expected: {payload}" ), f"Payload mismatch in {telegram} - Expected: {payload}"
async def assert_read( async def assert_read(
self, group_address: str, response: int | tuple[int, ...] | None = None self,
group_address: str,
response: int | tuple[int, ...] | None = None,
ignore_order: bool = False,
) -> None: ) -> None:
"""Assert outgoing GroupValueRead telegram. One by one in timely order. """Assert outgoing GroupValueRead telegram. Optionally in timely order.
Optionally inject incoming GroupValueResponse telegram after reception. Optionally inject incoming GroupValueResponse telegram after reception.
""" """
await self.assert_telegram(group_address, None, GroupValueRead) await self.assert_telegram(group_address, None, GroupValueRead, ignore_order)
if response is not None: if response is not None:
await self.receive_response(group_address, response) await self.receive_response(group_address, response)
async def assert_response( async def assert_response(
self, group_address: str, payload: int | tuple[int, ...] self,
group_address: str,
payload: int | tuple[int, ...],
ignore_order: bool = False,
) -> None: ) -> None:
"""Assert outgoing GroupValueResponse telegram. One by one in timely order.""" """Assert outgoing GroupValueResponse telegram. Optionally in timely order."""
await self.assert_telegram(group_address, payload, GroupValueResponse) await self.assert_telegram(
group_address, payload, GroupValueResponse, ignore_order
)
async def assert_write( async def assert_write(
self, group_address: str, payload: int | tuple[int, ...] self,
group_address: str,
payload: int | tuple[int, ...],
ignore_order: bool = False,
) -> None: ) -> None:
"""Assert outgoing GroupValueWrite telegram. One by one in timely order.""" """Assert outgoing GroupValueWrite telegram. Optionally in timely order."""
await self.assert_telegram(group_address, payload, GroupValueWrite) await self.assert_telegram(
group_address, payload, GroupValueWrite, ignore_order
)
#################### ####################
# Incoming telegrams # Incoming telegrams

View File

@ -23,7 +23,26 @@
} }
} }
}, },
"light": {} "light": {
"knx_es_01J85ZKTFHSZNG4X9DYBE592TF": {
"entity": {
"name": "test",
"device_info": null,
"entity_category": "config"
},
"knx": {
"color_temp_min": 2700,
"color_temp_max": 6000,
"_light_color_mode_schema": "default",
"ga_switch": {
"write": "1/1/21",
"state": "1/0/21",
"passive": []
},
"sync_state": true
}
}
}
} }
} }
} }

View File

@ -58,7 +58,8 @@ async def test_remove_device(
await knx.setup_integration({}) await knx.setup_integration({})
client = await hass_ws_client(hass) client = await hass_ws_client(hass)
await knx.assert_read("1/0/45", response=True) await knx.assert_read("1/0/21", response=True, ignore_order=True) # test light
await knx.assert_read("1/0/45", response=True, ignore_order=True) # test switch
assert hass_storage[KNX_CONFIG_STORAGE_KEY]["data"]["entities"].get("switch") assert hass_storage[KNX_CONFIG_STORAGE_KEY]["data"]["entities"].get("switch")
test_device = device_registry.async_get_device( test_device = device_registry.async_get_device(

View File

@ -19,8 +19,9 @@ from homeassistant.components.light import (
ATTR_RGBW_COLOR, ATTR_RGBW_COLOR,
ColorMode, ColorMode,
) )
from homeassistant.const import CONF_NAME, STATE_OFF, STATE_ON from homeassistant.const import CONF_NAME, STATE_OFF, STATE_ON, EntityCategory
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from . import KnxEntityGenerator from . import KnxEntityGenerator
from .conftest import KNXTestKit from .conftest import KNXTestKit
@ -1159,7 +1160,7 @@ async def test_light_ui_create(
knx: KNXTestKit, knx: KNXTestKit,
create_ui_entity: KnxEntityGenerator, create_ui_entity: KnxEntityGenerator,
) -> None: ) -> None:
"""Test creating a switch.""" """Test creating a light."""
await knx.setup_integration({}) await knx.setup_integration({})
await create_ui_entity( await create_ui_entity(
platform=Platform.LIGHT, platform=Platform.LIGHT,
@ -1192,7 +1193,7 @@ async def test_light_ui_color_temp(
color_temp_mode: str, color_temp_mode: str,
raw_ct: tuple[int, ...], raw_ct: tuple[int, ...],
) -> None: ) -> None:
"""Test creating a switch.""" """Test creating a color-temp light."""
await knx.setup_integration({}) await knx.setup_integration({})
await create_ui_entity( await create_ui_entity(
platform=Platform.LIGHT, platform=Platform.LIGHT,
@ -1218,3 +1219,23 @@ async def test_light_ui_color_temp(
state = hass.states.get("light.test") state = hass.states.get("light.test")
assert state.state is STATE_ON assert state.state is STATE_ON
assert state.attributes[ATTR_COLOR_TEMP_KELVIN] == pytest.approx(4200, abs=1) assert state.attributes[ATTR_COLOR_TEMP_KELVIN] == pytest.approx(4200, abs=1)
async def test_light_ui_load(
hass: HomeAssistant,
knx: KNXTestKit,
load_config_store: None,
entity_registry: er.EntityRegistry,
) -> None:
"""Test loading a light from storage."""
await knx.setup_integration({})
await knx.assert_read("1/0/21", response=True, ignore_order=True)
# unrelated switch in config store
await knx.assert_read("1/0/45", response=True, ignore_order=True)
state = hass.states.get("light.test")
assert state.state is STATE_ON
entity = entity_registry.async_get("light.test")
assert entity.entity_category is EntityCategory.CONFIG