Refactor ZHA tests to allow attribute reads during device initialization (#43357)
* Allow plugging zigpy attribute reads in tests * Migrate ZHA tests to use new patched attribute reads * Remove logging in testspull/43370/head
parent
a1306059af
commit
f1693e2433
tests/components/zha
|
@ -70,12 +70,34 @@ FakeEndpoint.remove_from_group = zigpy_ep.remove_from_group
|
|||
|
||||
def patch_cluster(cluster):
|
||||
"""Patch a cluster for testing."""
|
||||
cluster.PLUGGED_ATTR_READS = {}
|
||||
|
||||
async def _read_attribute_raw(attributes, *args, **kwargs):
|
||||
result = []
|
||||
for attr_id in attributes:
|
||||
value = cluster.PLUGGED_ATTR_READS.get(attr_id)
|
||||
if value is None:
|
||||
# try converting attr_id to attr_name and lookup the plugs again
|
||||
attr_name = cluster.attributes.get(attr_id)
|
||||
value = attr_name and cluster.PLUGGED_ATTR_READS.get(attr_name[0])
|
||||
if value is not None:
|
||||
result.append(
|
||||
zcl_f.ReadAttributeRecord(
|
||||
attr_id,
|
||||
zcl_f.Status.SUCCESS,
|
||||
zcl_f.TypeValue(python_type=None, value=value),
|
||||
)
|
||||
)
|
||||
else:
|
||||
result.append(zcl_f.ReadAttributeRecord(attr_id, zcl_f.Status.FAILURE))
|
||||
return (result,)
|
||||
|
||||
cluster.bind = AsyncMock(return_value=[0])
|
||||
cluster.configure_reporting = AsyncMock(return_value=[0])
|
||||
cluster.deserialize = Mock()
|
||||
cluster.handle_cluster_request = Mock()
|
||||
cluster.read_attributes = AsyncMock(return_value=[{}, {}])
|
||||
cluster.read_attributes_raw = Mock()
|
||||
cluster.read_attributes = AsyncMock(wraps=cluster.read_attributes)
|
||||
cluster.read_attributes_raw = AsyncMock(side_effect=_read_attribute_raw)
|
||||
cluster.unbind = AsyncMock(return_value=[0])
|
||||
cluster.write_attributes = AsyncMock(
|
||||
return_value=[zcl_f.WriteAttributesResponse.deserialize(b"\x00")[0]]
|
||||
|
|
|
@ -140,19 +140,8 @@ def device_climate_mock(hass, zigpy_device_mock, zha_device_joined):
|
|||
else:
|
||||
plugged_attrs = {**ZCL_ATTR_PLUG, **plug}
|
||||
|
||||
async def _read_attr(attrs, *args, **kwargs):
|
||||
res = {}
|
||||
failed = {}
|
||||
|
||||
for attr in attrs:
|
||||
if attr in plugged_attrs:
|
||||
res[attr] = plugged_attrs[attr]
|
||||
else:
|
||||
failed[attr] = zcl_f.Status.UNSUPPORTED_ATTRIBUTE
|
||||
return res, failed
|
||||
|
||||
zigpy_device = zigpy_device_mock(clusters, manufacturer=manuf)
|
||||
zigpy_device.endpoints[1].thermostat.read_attributes.side_effect = _read_attr
|
||||
zigpy_device.endpoints[1].thermostat.PLUGGED_ATTR_READS = plugged_attrs
|
||||
zha_device = await zha_device_joined(zigpy_device)
|
||||
await async_enable_traffic(hass, [zha_device])
|
||||
await hass.async_block_till_done()
|
||||
|
@ -1039,7 +1028,6 @@ async def test_occupancy_reset(hass, device_climate_sinope):
|
|||
state = hass.states.get(entity_id)
|
||||
assert state.attributes[ATTR_PRESET_MODE] == PRESET_AWAY
|
||||
|
||||
thrm_cluster.read_attributes.return_value = [True], {}
|
||||
await send_attributes_report(
|
||||
hass, thrm_cluster, {"occupied_heating_setpoint": 1950}
|
||||
)
|
||||
|
|
|
@ -32,7 +32,7 @@ from .common import (
|
|||
send_attributes_report,
|
||||
)
|
||||
|
||||
from tests.async_mock import AsyncMock, MagicMock, patch
|
||||
from tests.async_mock import AsyncMock, patch
|
||||
from tests.common import async_capture_events, mock_coro, mock_restore_cache
|
||||
|
||||
|
||||
|
@ -104,19 +104,13 @@ def zigpy_keen_vent(zigpy_device_mock):
|
|||
async def test_cover(m1, hass, zha_device_joined_restored, zigpy_cover_device):
|
||||
"""Test zha cover platform."""
|
||||
|
||||
async def get_chan_attr(*args, **kwargs):
|
||||
return 100
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.zha.core.channels.base.ZigbeeChannel.get_attribute_value",
|
||||
new=MagicMock(side_effect=get_chan_attr),
|
||||
) as get_attr_mock:
|
||||
# load up cover domain
|
||||
zha_device = await zha_device_joined_restored(zigpy_cover_device)
|
||||
assert get_attr_mock.call_count == 2
|
||||
assert get_attr_mock.call_args[0][0] == "current_position_lift_percentage"
|
||||
|
||||
# load up cover domain
|
||||
cluster = zigpy_cover_device.endpoints.get(1).window_covering
|
||||
cluster.PLUGGED_ATTR_READS = {"current_position_lift_percentage": 100}
|
||||
zha_device = await zha_device_joined_restored(zigpy_cover_device)
|
||||
assert cluster.read_attributes.call_count == 2
|
||||
assert "current_position_lift_percentage" in cluster.read_attributes.call_args[0][0]
|
||||
|
||||
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
|
||||
assert entity_id is not None
|
||||
|
||||
|
|
Loading…
Reference in New Issue