core/tests/components/zha/test_cover.py

134 lines
4.3 KiB
Python
Raw Normal View History

"""Test zha cover."""
from unittest.mock import MagicMock, call, patch
import asynctest
import pytest
import zigpy.types
import zigpy.zcl.clusters.closures as closures
import zigpy.zcl.foundation as zcl_f
from homeassistant.components.cover import DOMAIN
from homeassistant.const import STATE_CLOSED, STATE_OPEN, STATE_UNAVAILABLE
from .common import (
async_enable_traffic,
async_test_rejoin,
find_entity_id,
send_attributes_report,
)
from tests.common import mock_coro
@pytest.fixture
def zigpy_cover_device(zigpy_device_mock):
    """Return a mocked zigpy device with one window covering endpoint."""
    # Endpoint 1 carries only the WindowCovering input cluster.
    window_covering_endpoint = {
        "device_type": 1026,
        "in_clusters": [closures.WindowCovering.cluster_id],
        "out_clusters": [],
    }
    return zigpy_device_mock({1: window_covering_endpoint})
@asynctest.patch(
    "homeassistant.components.zha.core.channels.closures.WindowCovering.async_initialize"
)
async def test_cover(m1, hass, zha_device_joined_restored, zigpy_cover_device):
    """Check the zha cover entity through setup, reports, services and rejoin."""

    async def fake_attribute_value(*args, **kwargs):
        """Replace the channel attribute read; always yields 100."""
        return 100

    attribute_reader = MagicMock(side_effect=fake_attribute_value)
    with patch(
        "homeassistant.components.zha.core.channels.base.ZigbeeChannel.get_attribute_value",
        new=attribute_reader,
    ):
        # joining the device loads the cover domain
        zha_device = await zha_device_joined_restored(zigpy_cover_device)
        assert attribute_reader.call_count == 2
        assert attribute_reader.call_args[0][0] == "current_position_lift_percentage"

    cluster = zigpy_cover_device.endpoints.get(1).window_covering
    entity_id = await find_entity_id(DOMAIN, zha_device, hass)
    assert entity_id is not None

    async def invoke_and_check(service, extra_data, *zcl_args):
        """Call a cover service and assert the single cluster request it makes.

        ``zcl_args`` are the positional arguments expected after the leading
        ``False`` in the request call; the first one is the command id, which
        is also echoed back in the mocked reply.
        """
        reply = mock_coro([zcl_args[0], zcl_f.Status.SUCCESS])
        with patch("zigpy.zcl.Cluster.request", return_value=reply):
            await hass.services.async_call(
                DOMAIN, service, {"entity_id": entity_id, **extra_data}, blocking=True
            )
            # The asserts stay inside the patch so ``cluster.request`` still
            # resolves to the mock that recorded the call.
            assert cluster.request.call_count == 1
            assert cluster.request.call_args == call(
                False, *zcl_args, expect_reply=True, manufacturer=None, tsn=None
            )

    # the freshly created cover must start out unavailable
    assert hass.states.get(entity_id).state == STATE_UNAVAILABLE

    # let traffic flow through the gateway and the device
    await async_enable_traffic(hass, [zha_device])
    await hass.async_block_till_done()

    # an attribute report moves the state from unavailable to closed
    await send_attributes_report(hass, cluster, {0: 0, 8: 100, 1: 1})
    assert hass.states.get(entity_id).state == STATE_CLOSED

    # a second report flips it to open
    await send_attributes_report(hass, cluster, {0: 1, 8: 0, 1: 100})
    assert hass.states.get(entity_id).state == STATE_OPEN

    # close from UI
    await invoke_and_check("close_cover", {}, 0x1, ())

    # open from UI
    await invoke_and_check("open_cover", {}, 0x0, ())

    # set position from UI: position 47 is expected on the wire as 53
    await invoke_and_check(
        "set_cover_position", {"position": 47}, 0x5, (zigpy.types.uint8_t,), 53
    )

    # stop from UI
    await invoke_and_check("stop_cover", {}, 0x2, ())

    # test rejoin
    await async_test_rejoin(hass, zigpy_cover_device, [cluster], (1,))
    assert hass.states.get(entity_id).state == STATE_OPEN