Fix attribute reporting config failures in ZHA (#91403)
parent
adc8a13f93
commit
a9db39a833
|
@ -49,8 +49,8 @@ _LOGGER = logging.getLogger(__name__)
|
||||||
class AttrReportConfig(TypedDict, total=True):
|
class AttrReportConfig(TypedDict, total=True):
|
||||||
"""Configuration to report for the attributes."""
|
"""Configuration to report for the attributes."""
|
||||||
|
|
||||||
# Could be either an attribute name or attribute id
|
# An attribute name
|
||||||
attr: str | int
|
attr: str
|
||||||
# The config for the attribute reporting configuration consists of a tuple for
|
# The config for the attribute reporting configuration consists of a tuple for
|
||||||
# (minimum_reported_time_interval_s, maximum_reported_time_interval_s, value_delta)
|
# (minimum_reported_time_interval_s, maximum_reported_time_interval_s, value_delta)
|
||||||
config: tuple[int, int, int | float]
|
config: tuple[int, int, int | float]
|
||||||
|
@ -130,15 +130,13 @@ class ZigbeeChannel(LogMixin):
|
||||||
unique_id = ch_pool.unique_id.replace("-", ":")
|
unique_id = ch_pool.unique_id.replace("-", ":")
|
||||||
self._unique_id = f"{unique_id}:0x{cluster.cluster_id:04x}"
|
self._unique_id = f"{unique_id}:0x{cluster.cluster_id:04x}"
|
||||||
if not hasattr(self, "_value_attribute") and self.REPORT_CONFIG:
|
if not hasattr(self, "_value_attribute") and self.REPORT_CONFIG:
|
||||||
attr = self.REPORT_CONFIG[0].get("attr")
|
attr_def: ZCLAttributeDef | None = self.cluster.attributes_by_name.get(
|
||||||
if isinstance(attr, str):
|
self.REPORT_CONFIG[0]["attr"]
|
||||||
attribute: ZCLAttributeDef = self.cluster.attributes_by_name.get(attr)
|
)
|
||||||
if attribute is not None:
|
if attr_def is not None:
|
||||||
self.value_attribute = attribute.id
|
self.value_attribute = attr_def.id
|
||||||
else:
|
else:
|
||||||
self.value_attribute = None
|
self.value_attribute = None
|
||||||
else:
|
|
||||||
self.value_attribute = attr
|
|
||||||
self._status = ChannelStatus.CREATED
|
self._status = ChannelStatus.CREATED
|
||||||
self._cluster.add_listener(self)
|
self._cluster.add_listener(self)
|
||||||
self.data_cache: dict[str, Enum] = {}
|
self.data_cache: dict[str, Enum] = {}
|
||||||
|
@ -233,7 +231,12 @@ class ZigbeeChannel(LogMixin):
|
||||||
|
|
||||||
for attr_report in self.REPORT_CONFIG:
|
for attr_report in self.REPORT_CONFIG:
|
||||||
attr, config = attr_report["attr"], attr_report["config"]
|
attr, config = attr_report["attr"], attr_report["config"]
|
||||||
attr_name = self.cluster.attributes.get(attr, [attr])[0]
|
|
||||||
|
try:
|
||||||
|
attr_name = self.cluster.find_attribute(attr).name
|
||||||
|
except KeyError:
|
||||||
|
attr_name = attr
|
||||||
|
|
||||||
event_data[attr_name] = {
|
event_data[attr_name] = {
|
||||||
"min": config[0],
|
"min": config[0],
|
||||||
"max": config[1],
|
"max": config[1],
|
||||||
|
@ -282,7 +285,7 @@ class ZigbeeChannel(LogMixin):
|
||||||
)
|
)
|
||||||
|
|
||||||
def _configure_reporting_status(
|
def _configure_reporting_status(
|
||||||
self, attrs: dict[int | str, tuple[int, int, float | int]], res: list | tuple
|
self, attrs: dict[str, tuple[int, int, float | int]], res: list | tuple
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Parse configure reporting result."""
|
"""Parse configure reporting result."""
|
||||||
if isinstance(res, (Exception, ConfigureReportingResponseRecord)):
|
if isinstance(res, (Exception, ConfigureReportingResponseRecord)):
|
||||||
|
@ -304,14 +307,14 @@ class ZigbeeChannel(LogMixin):
|
||||||
return
|
return
|
||||||
|
|
||||||
failed = [
|
failed = [
|
||||||
self.cluster.attributes.get(r.attrid, [r.attrid])[0]
|
self.cluster.find_attribute(record.attrid).name
|
||||||
for r in res
|
for record in res
|
||||||
if r.status != Status.SUCCESS
|
if record.status != Status.SUCCESS
|
||||||
]
|
]
|
||||||
attributes = {self.cluster.attributes.get(r, [r])[0] for r in attrs}
|
|
||||||
self.debug(
|
self.debug(
|
||||||
"Successfully configured reporting for '%s' on '%s' cluster",
|
"Successfully configured reporting for '%s' on '%s' cluster",
|
||||||
attributes - set(failed),
|
set(attrs) - set(failed),
|
||||||
self.name,
|
self.name,
|
||||||
)
|
)
|
||||||
self.debug(
|
self.debug(
|
||||||
|
|
|
@ -5,9 +5,12 @@ from unittest import mock
|
||||||
from unittest.mock import AsyncMock, patch
|
from unittest.mock import AsyncMock, patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
import zigpy.endpoint
|
||||||
import zigpy.profiles.zha
|
import zigpy.profiles.zha
|
||||||
import zigpy.types as t
|
import zigpy.types as t
|
||||||
|
from zigpy.zcl import foundation
|
||||||
import zigpy.zcl.clusters
|
import zigpy.zcl.clusters
|
||||||
|
import zigpy.zdo.types as zdo_t
|
||||||
|
|
||||||
import homeassistant.components.zha.core.channels as zha_channels
|
import homeassistant.components.zha.core.channels as zha_channels
|
||||||
import homeassistant.components.zha.core.channels.base as base_channels
|
import homeassistant.components.zha.core.channels.base as base_channels
|
||||||
|
@ -726,3 +729,56 @@ async def test_cluster_no_ep_attribute(m1, zha_device_mock) -> None:
|
||||||
pools = {pool.id: pool for pool in channels.pools}
|
pools = {pool.id: pool for pool in channels.pools}
|
||||||
assert "1:0x042e" in pools[1].all_channels
|
assert "1:0x042e" in pools[1].all_channels
|
||||||
assert pools[1].all_channels["1:0x042e"].name
|
assert pools[1].all_channels["1:0x042e"].name
|
||||||
|
|
||||||
|
|
||||||
|
async def test_configure_reporting(hass: HomeAssistant) -> None:
|
||||||
|
"""Test setting up a channel and configuring attribute reporting in two batches."""
|
||||||
|
|
||||||
|
class TestZigbeeChannel(base_channels.ZigbeeChannel):
|
||||||
|
BIND = True
|
||||||
|
REPORT_CONFIG = (
|
||||||
|
# By name
|
||||||
|
base_channels.AttrReportConfig(attr="current_x", config=(1, 60, 1)),
|
||||||
|
base_channels.AttrReportConfig(attr="current_hue", config=(1, 60, 2)),
|
||||||
|
base_channels.AttrReportConfig(attr="color_temperature", config=(1, 60, 3)),
|
||||||
|
base_channels.AttrReportConfig(attr="current_y", config=(1, 60, 4)),
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_ep = mock.AsyncMock(spec_set=zigpy.endpoint.Endpoint)
|
||||||
|
mock_ep.device.zdo = AsyncMock()
|
||||||
|
|
||||||
|
cluster = zigpy.zcl.clusters.lighting.Color(mock_ep)
|
||||||
|
cluster.bind = AsyncMock(
|
||||||
|
spec_set=cluster.bind,
|
||||||
|
return_value=[zdo_t.Status.SUCCESS], # ZDOCmd.Bind_rsp
|
||||||
|
)
|
||||||
|
cluster.configure_reporting_multiple = AsyncMock(
|
||||||
|
spec_set=cluster.configure_reporting_multiple,
|
||||||
|
return_value=[
|
||||||
|
foundation.ConfigureReportingResponseRecord(
|
||||||
|
status=foundation.Status.SUCCESS
|
||||||
|
)
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
ch_pool = mock.AsyncMock(spec_set=zha_channels.ChannelPool)
|
||||||
|
ch_pool.skip_configuration = False
|
||||||
|
|
||||||
|
channel = TestZigbeeChannel(cluster, ch_pool)
|
||||||
|
await channel.async_configure()
|
||||||
|
|
||||||
|
# Since we request reporting for five attributes, we need to make two calls (3 + 1)
|
||||||
|
assert cluster.configure_reporting_multiple.mock_calls == [
|
||||||
|
mock.call(
|
||||||
|
{
|
||||||
|
"current_x": (1, 60, 1),
|
||||||
|
"current_hue": (1, 60, 2),
|
||||||
|
"color_temperature": (1, 60, 3),
|
||||||
|
}
|
||||||
|
),
|
||||||
|
mock.call(
|
||||||
|
{
|
||||||
|
"current_y": (1, 60, 4),
|
||||||
|
}
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
Loading…
Reference in New Issue