2021-07-20 04:39:19 +00:00
|
|
|
"""Conftest for the KNX integration."""
|
|
|
|
from __future__ import annotations
|
2021-04-11 21:01:30 +00:00
|
|
|
|
2021-07-20 04:39:19 +00:00
|
|
|
import asyncio
|
|
|
|
from unittest.mock import DEFAULT, AsyncMock, Mock, patch
|
2021-04-11 21:01:30 +00:00
|
|
|
|
|
|
|
import pytest
|
2021-07-20 04:39:19 +00:00
|
|
|
from xknx import XKNX
|
2021-10-17 07:43:18 +00:00
|
|
|
from xknx.core import XknxConnectionState
|
2021-07-20 04:39:19 +00:00
|
|
|
from xknx.dpt import DPTArray, DPTBinary
|
2021-11-20 10:30:41 +00:00
|
|
|
from xknx.io import DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT
|
2021-07-20 04:39:19 +00:00
|
|
|
from xknx.telegram import Telegram, TelegramDirection
|
|
|
|
from xknx.telegram.address import GroupAddress, IndividualAddress
|
|
|
|
from xknx.telegram.apci import APCI, GroupValueRead, GroupValueResponse, GroupValueWrite
|
2021-04-11 21:01:30 +00:00
|
|
|
|
2021-11-20 10:30:41 +00:00
|
|
|
from homeassistant.components.knx.const import (
|
|
|
|
CONF_KNX_AUTOMATIC,
|
|
|
|
CONF_KNX_CONNECTION_TYPE,
|
2022-03-29 14:46:02 +00:00
|
|
|
CONF_KNX_DEFAULT_RATE_LIMIT,
|
|
|
|
CONF_KNX_DEFAULT_STATE_UPDATER,
|
2021-11-20 10:30:41 +00:00
|
|
|
CONF_KNX_INDIVIDUAL_ADDRESS,
|
2022-03-29 14:46:02 +00:00
|
|
|
CONF_KNX_MCAST_GRP,
|
|
|
|
CONF_KNX_MCAST_PORT,
|
|
|
|
CONF_KNX_RATE_LIMIT,
|
|
|
|
CONF_KNX_STATE_UPDATER,
|
2021-11-20 10:30:41 +00:00
|
|
|
DOMAIN as KNX_DOMAIN,
|
|
|
|
)
|
2021-07-20 04:39:19 +00:00
|
|
|
from homeassistant.core import HomeAssistant
|
|
|
|
from homeassistant.setup import async_setup_component
|
2021-04-11 21:01:30 +00:00
|
|
|
|
2021-11-20 10:30:41 +00:00
|
|
|
from tests.common import MockConfigEntry
|
|
|
|
|
2021-07-20 04:39:19 +00:00
|
|
|
|
|
|
|
class KNXTestKit:
|
|
|
|
"""Test helper for the KNX integration."""
|
|
|
|
|
2021-07-28 17:04:11 +00:00
|
|
|
INDIVIDUAL_ADDRESS = "1.2.3"
|
|
|
|
|
2021-11-20 10:30:41 +00:00
|
|
|
def __init__(self, hass: HomeAssistant, mock_config_entry: MockConfigEntry):
|
2021-07-20 04:39:19 +00:00
|
|
|
"""Init KNX test helper class."""
|
|
|
|
self.hass: HomeAssistant = hass
|
2021-11-20 10:30:41 +00:00
|
|
|
self.mock_config_entry: MockConfigEntry = mock_config_entry
|
2021-07-20 04:39:19 +00:00
|
|
|
self.xknx: XKNX
|
|
|
|
# outgoing telegrams will be put in the Queue instead of sent to the interface
|
|
|
|
# telegrams to an InternalGroupAddress won't be queued here
|
|
|
|
self._outgoing_telegrams: asyncio.Queue = asyncio.Queue()
|
|
|
|
|
2021-11-04 21:43:38 +00:00
|
|
|
def assert_state(self, entity_id: str, state: str, **attributes) -> None:
|
|
|
|
"""Assert the state of an entity."""
|
|
|
|
test_state = self.hass.states.get(entity_id)
|
|
|
|
assert test_state.state == state
|
|
|
|
for attribute, value in attributes.items():
|
|
|
|
assert test_state.attributes.get(attribute) == value
|
|
|
|
|
2021-07-20 04:39:19 +00:00
|
|
|
async def setup_integration(self, config):
|
|
|
|
"""Create the KNX integration."""
|
|
|
|
|
2022-08-28 10:36:31 +00:00
|
|
|
async def patch_xknx_start():
|
|
|
|
"""Patch `xknx.start` for unittests."""
|
2022-07-26 08:32:26 +00:00
|
|
|
# after XKNX.__init__() to not overwrite it by the config entry again
|
|
|
|
# before StateUpdater starts to avoid slow down of tests
|
|
|
|
self.xknx.rate_limit = 0
|
2022-08-28 10:36:31 +00:00
|
|
|
# set XknxConnectionState.CONNECTED to avoid `unavailable` entities at startup
|
|
|
|
# and start StateUpdater. This would be awaited on normal startup too.
|
|
|
|
await self.xknx.connection_manager.connection_state_changed(
|
|
|
|
XknxConnectionState.CONNECTED
|
|
|
|
)
|
2022-07-26 08:32:26 +00:00
|
|
|
|
2021-07-20 04:39:19 +00:00
|
|
|
def knx_ip_interface_mock():
|
|
|
|
"""Create a xknx knx ip interface mock."""
|
|
|
|
mock = Mock()
|
2022-08-28 10:36:31 +00:00
|
|
|
mock.start = AsyncMock(side_effect=patch_xknx_start)
|
2021-07-20 04:39:19 +00:00
|
|
|
mock.stop = AsyncMock()
|
|
|
|
mock.send_telegram = AsyncMock(side_effect=self._outgoing_telegrams.put)
|
|
|
|
return mock
|
|
|
|
|
|
|
|
def fish_xknx(*args, **kwargs):
|
|
|
|
"""Get the XKNX object from the constructor call."""
|
2022-07-26 08:32:26 +00:00
|
|
|
self.xknx = args[0]
|
2021-07-20 04:39:19 +00:00
|
|
|
return DEFAULT
|
|
|
|
|
|
|
|
with patch(
|
2022-01-18 20:38:13 +00:00
|
|
|
"xknx.xknx.knx_interface_factory",
|
2021-07-20 04:39:19 +00:00
|
|
|
return_value=knx_ip_interface_mock(),
|
|
|
|
side_effect=fish_xknx,
|
|
|
|
):
|
2021-11-20 10:30:41 +00:00
|
|
|
self.mock_config_entry.add_to_hass(self.hass)
|
2021-07-20 04:39:19 +00:00
|
|
|
await async_setup_component(self.hass, KNX_DOMAIN, {KNX_DOMAIN: config})
|
|
|
|
await self.hass.async_block_till_done()
|
|
|
|
|
|
|
|
########################
|
|
|
|
# Telegram counter tests
|
|
|
|
########################
|
|
|
|
|
|
|
|
def _list_remaining_telegrams(self) -> str:
|
|
|
|
"""Return a string containing remaining outgoing telegrams in test Queue. One per line."""
|
|
|
|
remaining_telegrams = []
|
|
|
|
while not self._outgoing_telegrams.empty():
|
|
|
|
remaining_telegrams.append(self._outgoing_telegrams.get_nowait())
|
|
|
|
return "\n".join(map(str, remaining_telegrams))
|
|
|
|
|
|
|
|
async def assert_no_telegram(self) -> None:
|
|
|
|
"""Assert if every telegram in test Queue was checked."""
|
|
|
|
await self.hass.async_block_till_done()
|
|
|
|
assert self._outgoing_telegrams.empty(), (
|
|
|
|
f"Found remaining unasserted Telegrams: {self._outgoing_telegrams.qsize()}\n"
|
|
|
|
f"{self._list_remaining_telegrams()}"
|
|
|
|
)
|
|
|
|
|
|
|
|
async def assert_telegram_count(self, count: int) -> None:
|
|
|
|
"""Assert outgoing telegram count in test Queue."""
|
|
|
|
await self.hass.async_block_till_done()
|
|
|
|
actual_count = self._outgoing_telegrams.qsize()
|
|
|
|
assert actual_count == count, (
|
|
|
|
f"Outgoing telegrams: {actual_count} - Expected: {count}\n"
|
|
|
|
f"{self._list_remaining_telegrams()}"
|
|
|
|
)
|
|
|
|
|
|
|
|
####################
|
|
|
|
# APCI Service tests
|
|
|
|
####################
|
|
|
|
|
2021-12-29 17:15:48 +00:00
|
|
|
async def assert_telegram(
|
2021-07-20 04:39:19 +00:00
|
|
|
self,
|
|
|
|
group_address: str,
|
|
|
|
payload: int | tuple[int, ...] | None,
|
|
|
|
apci_type: type[APCI],
|
|
|
|
) -> None:
|
|
|
|
"""Assert outgoing telegram. One by one in timely order."""
|
2021-10-17 07:43:18 +00:00
|
|
|
await self.xknx.telegrams.join()
|
2021-07-20 04:39:19 +00:00
|
|
|
await self.hass.async_block_till_done()
|
|
|
|
try:
|
|
|
|
telegram = self._outgoing_telegrams.get_nowait()
|
|
|
|
except asyncio.QueueEmpty:
|
|
|
|
raise AssertionError(
|
|
|
|
f"No Telegram found. Expected: {apci_type.__name__} -"
|
|
|
|
f" {group_address} - {payload}"
|
|
|
|
)
|
|
|
|
|
|
|
|
assert isinstance(
|
|
|
|
telegram.payload, apci_type
|
|
|
|
), f"APCI type mismatch in {telegram} - Expected: {apci_type.__name__}"
|
|
|
|
|
2021-07-21 22:04:14 +00:00
|
|
|
assert (
|
|
|
|
str(telegram.destination_address) == group_address
|
|
|
|
), f"Group address mismatch in {telegram} - Expected: {group_address}"
|
|
|
|
|
2021-07-20 04:39:19 +00:00
|
|
|
if payload is not None:
|
|
|
|
assert (
|
|
|
|
telegram.payload.value.value == payload # type: ignore
|
|
|
|
), f"Payload mismatch in {telegram} - Expected: {payload}"
|
|
|
|
|
|
|
|
async def assert_read(self, group_address: str) -> None:
|
|
|
|
"""Assert outgoing GroupValueRead telegram. One by one in timely order."""
|
2021-12-29 17:15:48 +00:00
|
|
|
await self.assert_telegram(group_address, None, GroupValueRead)
|
2021-07-20 04:39:19 +00:00
|
|
|
|
|
|
|
async def assert_response(
|
|
|
|
self, group_address: str, payload: int | tuple[int, ...]
|
|
|
|
) -> None:
|
|
|
|
"""Assert outgoing GroupValueResponse telegram. One by one in timely order."""
|
2021-12-29 17:15:48 +00:00
|
|
|
await self.assert_telegram(group_address, payload, GroupValueResponse)
|
2021-07-20 04:39:19 +00:00
|
|
|
|
|
|
|
async def assert_write(
|
|
|
|
self, group_address: str, payload: int | tuple[int, ...]
|
|
|
|
) -> None:
|
|
|
|
"""Assert outgoing GroupValueWrite telegram. One by one in timely order."""
|
2021-12-29 17:15:48 +00:00
|
|
|
await self.assert_telegram(group_address, payload, GroupValueWrite)
|
2021-07-20 04:39:19 +00:00
|
|
|
|
|
|
|
####################
|
|
|
|
# Incoming telegrams
|
|
|
|
####################
|
|
|
|
|
2021-07-21 22:04:14 +00:00
|
|
|
@staticmethod
|
|
|
|
def _payload_value(payload: int | tuple[int, ...]) -> DPTArray | DPTBinary:
|
|
|
|
"""Prepare payload value for GroupValueWrite or GroupValueResponse."""
|
|
|
|
if isinstance(payload, int):
|
|
|
|
return DPTBinary(payload)
|
|
|
|
return DPTArray(payload)
|
|
|
|
|
2021-07-20 04:39:19 +00:00
|
|
|
async def _receive_telegram(self, group_address: str, payload: APCI) -> None:
|
|
|
|
"""Inject incoming KNX telegram."""
|
|
|
|
self.xknx.telegrams.put_nowait(
|
|
|
|
Telegram(
|
|
|
|
destination_address=GroupAddress(group_address),
|
|
|
|
direction=TelegramDirection.INCOMING,
|
|
|
|
payload=payload,
|
2021-07-28 17:04:11 +00:00
|
|
|
source_address=IndividualAddress(self.INDIVIDUAL_ADDRESS),
|
2021-07-20 04:39:19 +00:00
|
|
|
)
|
|
|
|
)
|
2021-10-14 17:54:48 +00:00
|
|
|
await self.xknx.telegrams.join()
|
2021-07-20 04:39:19 +00:00
|
|
|
await self.hass.async_block_till_done()
|
|
|
|
|
|
|
|
async def receive_read(
|
|
|
|
self,
|
|
|
|
group_address: str,
|
|
|
|
) -> None:
|
|
|
|
"""Inject incoming GroupValueRead telegram."""
|
|
|
|
await self._receive_telegram(group_address, GroupValueRead())
|
|
|
|
|
|
|
|
async def receive_response(
|
|
|
|
self, group_address: str, payload: int | tuple[int, ...]
|
|
|
|
) -> None:
|
|
|
|
"""Inject incoming GroupValueResponse telegram."""
|
|
|
|
payload_value = self._payload_value(payload)
|
|
|
|
await self._receive_telegram(group_address, GroupValueResponse(payload_value))
|
|
|
|
|
|
|
|
async def receive_write(
|
|
|
|
self, group_address: str, payload: int | tuple[int, ...]
|
|
|
|
) -> None:
|
|
|
|
"""Inject incoming GroupValueWrite telegram."""
|
|
|
|
payload_value = self._payload_value(payload)
|
|
|
|
await self._receive_telegram(group_address, GroupValueWrite(payload_value))
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
2021-11-20 10:30:41 +00:00
|
|
|
def mock_config_entry() -> MockConfigEntry:
|
|
|
|
"""Return the default mocked config entry."""
|
|
|
|
return MockConfigEntry(
|
|
|
|
title="KNX",
|
|
|
|
domain=KNX_DOMAIN,
|
|
|
|
data={
|
|
|
|
CONF_KNX_CONNECTION_TYPE: CONF_KNX_AUTOMATIC,
|
2022-03-29 14:46:02 +00:00
|
|
|
CONF_KNX_RATE_LIMIT: CONF_KNX_DEFAULT_RATE_LIMIT,
|
|
|
|
CONF_KNX_STATE_UPDATER: CONF_KNX_DEFAULT_STATE_UPDATER,
|
|
|
|
CONF_KNX_MCAST_PORT: DEFAULT_MCAST_PORT,
|
|
|
|
CONF_KNX_MCAST_GRP: DEFAULT_MCAST_GRP,
|
|
|
|
CONF_KNX_INDIVIDUAL_ADDRESS: XKNX.DEFAULT_ADDRESS,
|
2021-11-20 10:30:41 +00:00
|
|
|
},
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
async def knx(request, hass, mock_config_entry: MockConfigEntry):
|
2021-07-20 04:39:19 +00:00
|
|
|
"""Create a KNX TestKit instance."""
|
2021-11-20 10:30:41 +00:00
|
|
|
knx_test_kit = KNXTestKit(hass, mock_config_entry)
|
2021-07-20 04:39:19 +00:00
|
|
|
yield knx_test_kit
|
|
|
|
await knx_test_kit.assert_no_telegram()
|