core/tests/components/knx/test_events.py

84 lines
2.7 KiB
Python

"""Test KNX events."""
from homeassistant.components.knx import CONF_KNX_EVENT_FILTER
from homeassistant.core import HomeAssistant
from .conftest import KNXTestKit
from tests.common import async_capture_events
async def test_knx_event(hass: HomeAssistant, knx: KNXTestKit):
"""Test `knx_event` event."""
test_group_a = "0/4/*"
test_address_a_1 = "0/4/0"
test_address_a_2 = "0/4/100"
test_group_b = "1/3-6/*"
test_address_b_1 = "1/3/0"
test_address_b_2 = "1/6/200"
test_group_c = "2/6/4,5"
test_address_c_1 = "2/6/4"
test_address_c_2 = "2/6/5"
test_address_d = "5/4/3"
events = async_capture_events(hass, "knx_event")
async def test_event_data(address, payload):
await hass.async_block_till_done()
assert len(events) == 1
event = events.pop()
assert event.data["data"] == payload
assert event.data["direction"] == "Incoming"
assert event.data["destination"] == address
if payload is None:
assert event.data["telegramtype"] == "GroupValueRead"
else:
assert event.data["telegramtype"] in (
"GroupValueWrite",
"GroupValueResponse",
)
assert event.data["source"] == KNXTestKit.INDIVIDUAL_ADDRESS
await knx.setup_integration(
{
CONF_KNX_EVENT_FILTER: [
test_group_a,
test_group_b,
test_group_c,
test_address_d,
]
}
)
# no event received
await hass.async_block_till_done()
assert len(events) == 0
# receive telegrams for group addresses matching the filter
await knx.receive_write(test_address_a_1, True)
await test_event_data(test_address_a_1, True)
await knx.receive_response(test_address_a_2, False)
await test_event_data(test_address_a_2, False)
await knx.receive_write(test_address_b_1, (1,))
await test_event_data(test_address_b_1, (1,))
await knx.receive_response(test_address_b_2, (255,))
await test_event_data(test_address_b_2, (255,))
await knx.receive_write(test_address_c_1, (89, 43, 34, 11))
await test_event_data(test_address_c_1, (89, 43, 34, 11))
await knx.receive_response(test_address_c_2, (255, 255, 255, 255))
await test_event_data(test_address_c_2, (255, 255, 255, 255))
await knx.receive_read(test_address_d)
await test_event_data(test_address_d, None)
# receive telegrams for group addresses not matching the filter
await knx.receive_write("0/5/0", True)
await knx.receive_write("1/7/0", True)
await knx.receive_write("2/6/6", True)
await hass.async_block_till_done()
assert len(events) == 0