core/tests/components/zha/test_cover.py

426 lines
15 KiB
Python
Raw Normal View History

"""Test zha cover."""
import asyncio
2021-01-01 21:31:56 +00:00
from unittest.mock import AsyncMock, patch
import pytest
import zigpy.profiles.zha
import zigpy.types
import zigpy.zcl.clusters.closures as closures
import zigpy.zcl.clusters.general as general
import zigpy.zcl.foundation as zcl_f
from homeassistant.components.cover import (
ATTR_CURRENT_POSITION,
DOMAIN,
SERVICE_CLOSE_COVER,
SERVICE_OPEN_COVER,
SERVICE_SET_COVER_POSITION,
SERVICE_STOP_COVER,
)
from homeassistant.const import (
ATTR_COMMAND,
STATE_CLOSED,
STATE_OPEN,
STATE_UNAVAILABLE,
)
from homeassistant.core import CoreState, State
from .common import (
async_enable_traffic,
async_test_rejoin,
find_entity_id,
make_zcl_header,
send_attributes_report,
)
from tests.common import async_capture_events, mock_coro, mock_restore_cache
@pytest.fixture
def zigpy_cover_device(zigpy_device_mock):
"""Zigpy cover device."""
endpoints = {
1: {
"device_type": zigpy.profiles.zha.DeviceType.IAS_ZONE,
"in_clusters": [closures.WindowCovering.cluster_id],
"out_clusters": [],
}
}
return zigpy_device_mock(endpoints)
@pytest.fixture
def zigpy_cover_remote(zigpy_device_mock):
"""Zigpy cover remote device."""
endpoints = {
1: {
"device_type": zigpy.profiles.zha.DeviceType.WINDOW_COVERING_CONTROLLER,
"in_clusters": [],
"out_clusters": [closures.WindowCovering.cluster_id],
}
}
return zigpy_device_mock(endpoints)
@pytest.fixture
def zigpy_shade_device(zigpy_device_mock):
"""Zigpy shade device."""
endpoints = {
1: {
"device_type": zigpy.profiles.zha.DeviceType.SHADE,
"in_clusters": [
closures.Shade.cluster_id,
general.LevelControl.cluster_id,
general.OnOff.cluster_id,
],
"out_clusters": [],
}
}
return zigpy_device_mock(endpoints)
@pytest.fixture
def zigpy_keen_vent(zigpy_device_mock):
"""Zigpy Keen Vent device."""
endpoints = {
1: {
"device_type": zigpy.profiles.zha.DeviceType.LEVEL_CONTROLLABLE_OUTPUT,
"in_clusters": [general.LevelControl.cluster_id, general.OnOff.cluster_id],
"out_clusters": [],
}
}
return zigpy_device_mock(
endpoints, manufacturer="Keen Home Inc", model="SV02-612-MP-1.3"
)
@patch(
"homeassistant.components.zha.core.channels.closures.WindowCovering.async_initialize"
)
async def test_cover(m1, hass, zha_device_joined_restored, zigpy_cover_device):
"""Test zha cover platform."""
# load up cover domain
cluster = zigpy_cover_device.endpoints.get(1).window_covering
cluster.PLUGGED_ATTR_READS = {"current_position_lift_percentage": 100}
zha_device = await zha_device_joined_restored(zigpy_cover_device)
assert cluster.read_attributes.call_count == 2
assert "current_position_lift_percentage" in cluster.read_attributes.call_args[0][0]
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
assert entity_id is not None
await async_enable_traffic(hass, [zha_device], enabled=False)
# test that the cover was created and that it is unavailable
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, [zha_device])
await hass.async_block_till_done()
# test that the state has changed from unavailable to off
await send_attributes_report(hass, cluster, {0: 0, 8: 100, 1: 1})
assert hass.states.get(entity_id).state == STATE_CLOSED
# test to see if it opens
await send_attributes_report(hass, cluster, {0: 1, 8: 0, 1: 100})
assert hass.states.get(entity_id).state == STATE_OPEN
# close from UI
with patch(
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x1, zcl_f.Status.SUCCESS])
):
await hass.services.async_call(
DOMAIN, SERVICE_CLOSE_COVER, {"entity_id": entity_id}, blocking=True
)
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x01
assert cluster.request.call_args[0][2] == ()
assert cluster.request.call_args[1]["expect_reply"] is True
# open from UI
with patch(
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x0, zcl_f.Status.SUCCESS])
):
await hass.services.async_call(
DOMAIN, SERVICE_OPEN_COVER, {"entity_id": entity_id}, blocking=True
)
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x00
assert cluster.request.call_args[0][2] == ()
assert cluster.request.call_args[1]["expect_reply"] is True
# set position UI
with patch(
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x5, zcl_f.Status.SUCCESS])
):
await hass.services.async_call(
DOMAIN,
SERVICE_SET_COVER_POSITION,
{"entity_id": entity_id, "position": 47},
blocking=True,
)
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x05
assert cluster.request.call_args[0][2] == (zigpy.types.uint8_t,)
assert cluster.request.call_args[0][3] == 53
assert cluster.request.call_args[1]["expect_reply"] is True
# stop from UI
with patch(
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x2, zcl_f.Status.SUCCESS])
):
await hass.services.async_call(
DOMAIN, SERVICE_STOP_COVER, {"entity_id": entity_id}, blocking=True
)
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x02
assert cluster.request.call_args[0][2] == ()
assert cluster.request.call_args[1]["expect_reply"] is True
# test rejoin
await async_test_rejoin(hass, zigpy_cover_device, [cluster], (1,))
assert hass.states.get(entity_id).state == STATE_OPEN
async def test_shade(hass, zha_device_joined_restored, zigpy_shade_device):
"""Test zha cover platform for shade device type."""
# load up cover domain
zha_device = await zha_device_joined_restored(zigpy_shade_device)
cluster_on_off = zigpy_shade_device.endpoints.get(1).on_off
cluster_level = zigpy_shade_device.endpoints.get(1).level
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
assert entity_id is not None
await async_enable_traffic(hass, [zha_device], enabled=False)
# test that the cover was created and that it is unavailable
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, [zha_device])
await hass.async_block_till_done()
# test that the state has changed from unavailable to off
await send_attributes_report(hass, cluster_on_off, {8: 0, 0: False, 1: 1})
assert hass.states.get(entity_id).state == STATE_CLOSED
# test to see if it opens
await send_attributes_report(hass, cluster_on_off, {8: 0, 0: True, 1: 1})
assert hass.states.get(entity_id).state == STATE_OPEN
# close from UI command fails
with patch("zigpy.zcl.Cluster.request", side_effect=asyncio.TimeoutError):
await hass.services.async_call(
DOMAIN, SERVICE_CLOSE_COVER, {"entity_id": entity_id}, blocking=True
)
assert cluster_on_off.request.call_count == 1
assert cluster_on_off.request.call_args[0][0] is False
assert cluster_on_off.request.call_args[0][1] == 0x0000
assert hass.states.get(entity_id).state == STATE_OPEN
with patch(
"zigpy.zcl.Cluster.request", AsyncMock(return_value=[0x1, zcl_f.Status.SUCCESS])
):
await hass.services.async_call(
DOMAIN, SERVICE_CLOSE_COVER, {"entity_id": entity_id}, blocking=True
)
assert cluster_on_off.request.call_count == 1
assert cluster_on_off.request.call_args[0][0] is False
assert cluster_on_off.request.call_args[0][1] == 0x0000
assert hass.states.get(entity_id).state == STATE_CLOSED
# open from UI command fails
assert ATTR_CURRENT_POSITION not in hass.states.get(entity_id).attributes
await send_attributes_report(hass, cluster_level, {0: 0})
with patch("zigpy.zcl.Cluster.request", side_effect=asyncio.TimeoutError):
await hass.services.async_call(
DOMAIN, SERVICE_OPEN_COVER, {"entity_id": entity_id}, blocking=True
)
assert cluster_on_off.request.call_count == 1
assert cluster_on_off.request.call_args[0][0] is False
assert cluster_on_off.request.call_args[0][1] == 0x0001
assert hass.states.get(entity_id).state == STATE_CLOSED
# open from UI succeeds
with patch(
"zigpy.zcl.Cluster.request", AsyncMock(return_value=[0x0, zcl_f.Status.SUCCESS])
):
await hass.services.async_call(
DOMAIN, SERVICE_OPEN_COVER, {"entity_id": entity_id}, blocking=True
)
assert cluster_on_off.request.call_count == 1
assert cluster_on_off.request.call_args[0][0] is False
assert cluster_on_off.request.call_args[0][1] == 0x0001
assert hass.states.get(entity_id).state == STATE_OPEN
# set position UI command fails
with patch("zigpy.zcl.Cluster.request", side_effect=asyncio.TimeoutError):
await hass.services.async_call(
DOMAIN,
SERVICE_SET_COVER_POSITION,
{"entity_id": entity_id, "position": 47},
blocking=True,
)
assert cluster_level.request.call_count == 1
assert cluster_level.request.call_args[0][0] is False
assert cluster_level.request.call_args[0][1] == 0x0004
assert int(cluster_level.request.call_args[0][3] * 100 / 255) == 47
assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 0
# set position UI success
with patch(
"zigpy.zcl.Cluster.request", AsyncMock(return_value=[0x5, zcl_f.Status.SUCCESS])
):
await hass.services.async_call(
DOMAIN,
SERVICE_SET_COVER_POSITION,
{"entity_id": entity_id, "position": 47},
blocking=True,
)
assert cluster_level.request.call_count == 1
assert cluster_level.request.call_args[0][0] is False
assert cluster_level.request.call_args[0][1] == 0x0004
assert int(cluster_level.request.call_args[0][3] * 100 / 255) == 47
assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 47
# report position change
await send_attributes_report(hass, cluster_level, {8: 0, 0: 100, 1: 1})
assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == int(
100 * 100 / 255
)
# test rejoin
await async_test_rejoin(
hass, zigpy_shade_device, [cluster_level, cluster_on_off], (1,)
)
assert hass.states.get(entity_id).state == STATE_OPEN
# test cover stop
with patch("zigpy.zcl.Cluster.request", side_effect=asyncio.TimeoutError):
await hass.services.async_call(
2020-08-27 11:56:20 +00:00
DOMAIN,
SERVICE_STOP_COVER,
{"entity_id": entity_id},
blocking=True,
)
assert cluster_level.request.call_count == 1
assert cluster_level.request.call_args[0][0] is False
assert cluster_level.request.call_args[0][1] in (0x0003, 0x0007)
async def test_restore_state(hass, zha_device_restored, zigpy_shade_device):
"""Ensure states are restored on startup."""
mock_restore_cache(
hass,
(
State(
"cover.fakemanufacturer_fakemodel_e769900a_level_on_off_shade",
STATE_OPEN,
{ATTR_CURRENT_POSITION: 50},
),
),
)
hass.state = CoreState.starting
zha_device = await zha_device_restored(zigpy_shade_device)
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
assert entity_id is not None
# test that the cover was created and that it is unavailable
assert hass.states.get(entity_id).state == STATE_OPEN
assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 50
async def test_keen_vent(hass, zha_device_joined_restored, zigpy_keen_vent):
"""Test keen vent."""
# load up cover domain
zha_device = await zha_device_joined_restored(zigpy_keen_vent)
cluster_on_off = zigpy_keen_vent.endpoints.get(1).on_off
cluster_level = zigpy_keen_vent.endpoints.get(1).level
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
assert entity_id is not None
await async_enable_traffic(hass, [zha_device], enabled=False)
# test that the cover was created and that it is unavailable
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, [zha_device])
await hass.async_block_till_done()
# test that the state has changed from unavailable to off
await send_attributes_report(hass, cluster_on_off, {8: 0, 0: False, 1: 1})
assert hass.states.get(entity_id).state == STATE_CLOSED
# open from UI command fails
p1 = patch.object(cluster_on_off, "request", side_effect=asyncio.TimeoutError)
p2 = patch.object(cluster_level, "request", AsyncMock(return_value=[4, 0]))
with p1, p2:
await hass.services.async_call(
DOMAIN, SERVICE_OPEN_COVER, {"entity_id": entity_id}, blocking=True
)
assert cluster_on_off.request.call_count == 1
assert cluster_on_off.request.call_args[0][0] is False
assert cluster_on_off.request.call_args[0][1] == 0x0001
assert cluster_level.request.call_count == 1
assert hass.states.get(entity_id).state == STATE_CLOSED
# open from UI command success
p1 = patch.object(cluster_on_off, "request", AsyncMock(return_value=[1, 0]))
p2 = patch.object(cluster_level, "request", AsyncMock(return_value=[4, 0]))
with p1, p2:
await hass.services.async_call(
DOMAIN, SERVICE_OPEN_COVER, {"entity_id": entity_id}, blocking=True
)
await asyncio.sleep(0)
assert cluster_on_off.request.call_count == 1
assert cluster_on_off.request.call_args[0][0] is False
assert cluster_on_off.request.call_args[0][1] == 0x0001
assert cluster_level.request.call_count == 1
assert hass.states.get(entity_id).state == STATE_OPEN
assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 100
async def test_cover_remote(hass, zha_device_joined_restored, zigpy_cover_remote):
"""Test zha cover remote."""
# load up cover domain
await zha_device_joined_restored(zigpy_cover_remote)
cluster = zigpy_cover_remote.endpoints[1].out_clusters[
closures.WindowCovering.cluster_id
]
zha_events = async_capture_events(hass, "zha_event")
# up command
hdr = make_zcl_header(0, global_command=False)
cluster.handle_message(hdr, [])
await hass.async_block_till_done()
assert len(zha_events) == 1
assert zha_events[0].data[ATTR_COMMAND] == "up_open"
# down command
hdr = make_zcl_header(1, global_command=False)
cluster.handle_message(hdr, [])
await hass.async_block_till_done()
assert len(zha_events) == 2
assert zha_events[1].data[ATTR_COMMAND] == "down_close"