Add ZHA cover tilt (#102072)

* cover tilt reimplementation

* rework tilt test

* Fix ZHA cover tests

* Match ZHA cover tilt code-style with the rest

* Increase coverage for ZHA cover, optimize update

---------

Co-authored-by: josef109 <josefglaze@gmail.com>
pull/102735/head
Tomáš Bedřich 2023-10-24 21:40:41 +02:00 committed by GitHub
parent 4febb2e1d3
commit d25b4aae14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 270 additions and 34 deletions

View File

@ -124,11 +124,19 @@ class WindowCoveringClient(ClientClusterHandler):
class WindowCovering(ClusterHandler):
"""Window cluster handler."""
_value_attribute = 8
_value_attribute_lift = (
closures.WindowCovering.AttributeDefs.current_position_lift_percentage.id
)
_value_attribute_tilt = (
closures.WindowCovering.AttributeDefs.current_position_tilt_percentage.id
)
REPORT_CONFIG = (
AttrReportConfig(
attr="current_position_lift_percentage", config=REPORT_CONFIG_IMMEDIATE
),
AttrReportConfig(
attr="current_position_tilt_percentage", config=REPORT_CONFIG_IMMEDIATE
),
)
async def async_update(self):
@ -140,10 +148,21 @@ class WindowCovering(ClusterHandler):
if result is not None:
self.async_send_signal(
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}",
8,
self._value_attribute_lift,
"current_position_lift_percentage",
result,
)
result = await self.get_attribute_value(
"current_position_tilt_percentage", from_cache=False
)
self.debug("read current tilt position: %s", result)
if result is not None:
self.async_send_signal(
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}",
self._value_attribute_tilt,
"current_position_tilt_percentage",
result,
)
@callback
def attribute_updated(self, attrid: int, value: Any, _: Any) -> None:
@ -152,7 +171,7 @@ class WindowCovering(ClusterHandler):
self.debug(
"Attribute report '%s'[%s] = %s", self.cluster.name, attr_name, value
)
if attrid == self._value_attribute:
if attrid in (self._value_attribute_lift, self._value_attribute_tilt):
self.async_send_signal(
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", attrid, attr_name, value
)

View File

@ -11,6 +11,7 @@ from zigpy.zcl.foundation import Status
from homeassistant.components.cover import (
ATTR_CURRENT_POSITION,
ATTR_POSITION,
ATTR_TILT_POSITION,
CoverDeviceClass,
CoverEntity,
)
@ -80,6 +81,7 @@ class ZhaCover(ZhaEntity, CoverEntity):
super().__init__(unique_id, zha_device, cluster_handlers, **kwargs)
self._cover_cluster_handler = self.cluster_handlers.get(CLUSTER_HANDLER_COVER)
self._current_position = None
self._tilt_position = None
async def async_added_to_hass(self) -> None:
"""Run when about to be added to hass."""
@ -94,6 +96,10 @@ class ZhaCover(ZhaEntity, CoverEntity):
self._state = last_state.state
if "current_position" in last_state.attributes:
self._current_position = last_state.attributes["current_position"]
if "current_tilt_position" in last_state.attributes:
self._tilt_position = last_state.attributes[
"current_tilt_position"
] # first allocation activate tilt
@property
def is_closed(self) -> bool | None:
@ -120,11 +126,20 @@ class ZhaCover(ZhaEntity, CoverEntity):
"""
return self._current_position
@property
def current_cover_tilt_position(self) -> int | None:
"""Return the current tilt position of the cover."""
return self._tilt_position
@callback
def async_set_position(self, attr_id, attr_name, value):
"""Handle position update from cluster handler."""
_LOGGER.debug("setting position: %s", value)
self._current_position = 100 - value
_LOGGER.debug("setting position: %s %s %s", attr_id, attr_name, value)
if attr_name == "current_position_lift_percentage":
self._current_position = 100 - value
elif attr_name == "current_position_tilt_percentage":
self._tilt_position = 100 - value
if self._current_position == 0:
self._state = STATE_CLOSED
elif self._current_position == 100:
@ -145,6 +160,13 @@ class ZhaCover(ZhaEntity, CoverEntity):
raise HomeAssistantError(f"Failed to open cover: {res[1]}")
self.async_update_state(STATE_OPENING)
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
"""Open the cover tilt."""
res = await self._cover_cluster_handler.go_to_tilt_percentage(0)
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to open cover tilt: {res[1]}")
self.async_update_state(STATE_OPENING)
async def async_close_cover(self, **kwargs: Any) -> None:
"""Close the window cover."""
res = await self._cover_cluster_handler.down_close()
@ -152,6 +174,13 @@ class ZhaCover(ZhaEntity, CoverEntity):
raise HomeAssistantError(f"Failed to close cover: {res[1]}")
self.async_update_state(STATE_CLOSING)
async def async_close_cover_tilt(self, **kwargs: Any) -> None:
"""Close the cover tilt."""
res = await self._cover_cluster_handler.go_to_tilt_percentage(100)
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to close cover tilt: {res[1]}")
self.async_update_state(STATE_CLOSING)
async def async_set_cover_position(self, **kwargs: Any) -> None:
"""Move the roller shutter to a specific position."""
new_pos = kwargs[ATTR_POSITION]
@ -162,6 +191,16 @@ class ZhaCover(ZhaEntity, CoverEntity):
STATE_CLOSING if new_pos < self._current_position else STATE_OPENING
)
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
"""Move the cover til to a specific position."""
new_pos = kwargs[ATTR_TILT_POSITION]
res = await self._cover_cluster_handler.go_to_tilt_percentage(100 - new_pos)
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to set cover tilt position: {res[1]}")
self.async_update_state(
STATE_CLOSING if new_pos < self._tilt_position else STATE_OPENING
)
async def async_stop_cover(self, **kwargs: Any) -> None:
"""Stop the window cover."""
res = await self._cover_cluster_handler.stop()
@ -170,28 +209,9 @@ class ZhaCover(ZhaEntity, CoverEntity):
self._state = STATE_OPEN if self._current_position > 0 else STATE_CLOSED
self.async_write_ha_state()
async def async_update(self) -> None:
"""Attempt to retrieve the open/close state of the cover."""
await super().async_update()
await self.async_get_state()
async def async_get_state(self, from_cache=True):
"""Fetch the current state."""
_LOGGER.debug("polling current state")
if self._cover_cluster_handler:
pos = await self._cover_cluster_handler.get_attribute_value(
"current_position_lift_percentage", from_cache=from_cache
)
_LOGGER.debug("read pos=%s", pos)
if pos is not None:
self._current_position = 100 - pos
self._state = (
STATE_OPEN if self.current_cover_position > 0 else STATE_CLOSED
)
else:
self._current_position = None
self._state = None
async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
"""Stop the cover tilt."""
await self.async_stop_cover()
@MULTI_MATCH(

View File

@ -11,11 +11,18 @@ import zigpy.zcl.foundation as zcl_f
from homeassistant.components.cover import (
ATTR_CURRENT_POSITION,
ATTR_CURRENT_TILT_POSITION,
ATTR_TILT_POSITION,
DOMAIN as COVER_DOMAIN,
SERVICE_CLOSE_COVER,
SERVICE_CLOSE_COVER_TILT,
SERVICE_OPEN_COVER,
SERVICE_OPEN_COVER_TILT,
SERVICE_SET_COVER_POSITION,
SERVICE_SET_COVER_TILT_POSITION,
SERVICE_STOP_COVER,
SERVICE_STOP_COVER_TILT,
SERVICE_TOGGLE_COVER_TILT,
)
from homeassistant.components.zha.core.const import ZHA_EVENT
from homeassistant.const import (
@ -27,6 +34,7 @@ from homeassistant.const import (
)
from homeassistant.core import CoreState, HomeAssistant, State
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity_component import async_update_entity
from .common import (
async_enable_traffic,
@ -64,7 +72,7 @@ def zigpy_cover_device(zigpy_device_mock):
endpoints = {
1: {
SIG_EP_PROFILE: zigpy.profiles.zha.PROFILE_ID,
SIG_EP_TYPE: zigpy.profiles.zha.DeviceType.IAS_ZONE,
SIG_EP_TYPE: zigpy.profiles.zha.DeviceType.WINDOW_COVERING_DEVICE,
SIG_EP_INPUT: [closures.WindowCovering.cluster_id],
SIG_EP_OUTPUT: [],
}
@ -130,10 +138,14 @@ async def test_cover(
# load up cover domain
cluster = zigpy_cover_device.endpoints.get(1).window_covering
cluster.PLUGGED_ATTR_READS = {"current_position_lift_percentage": 100}
cluster.PLUGGED_ATTR_READS = {
"current_position_lift_percentage": 65,
"current_position_tilt_percentage": 42,
}
zha_device = await zha_device_joined_restored(zigpy_cover_device)
assert cluster.read_attributes.call_count == 1
assert "current_position_lift_percentage" in cluster.read_attributes.call_args[0][0]
assert "current_position_tilt_percentage" in cluster.read_attributes.call_args[0][0]
entity_id = find_entity_id(Platform.COVER, zha_device, hass)
assert entity_id is not None
@ -146,6 +158,16 @@ async def test_cover(
await async_enable_traffic(hass, [zha_device])
await hass.async_block_till_done()
# test update
prev_call_count = cluster.read_attributes.call_count
await async_update_entity(hass, entity_id)
assert cluster.read_attributes.call_count == prev_call_count + 2
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OPEN
assert state.attributes[ATTR_CURRENT_POSITION] == 35
assert state.attributes[ATTR_CURRENT_TILT_POSITION] == 58
# test that the state has changed from unavailable to off
await send_attributes_report(hass, cluster, {0: 0, 8: 100, 1: 1})
assert hass.states.get(entity_id).state == STATE_CLOSED
@ -154,6 +176,14 @@ async def test_cover(
await send_attributes_report(hass, cluster, {0: 1, 8: 0, 1: 100})
assert hass.states.get(entity_id).state == STATE_OPEN
# test that the state remains after tilting to 100%
await send_attributes_report(hass, cluster, {0: 0, 9: 100, 1: 1})
assert hass.states.get(entity_id).state == STATE_OPEN
# test to see the state remains after tilting to 0%
await send_attributes_report(hass, cluster, {0: 1, 9: 0, 1: 100})
assert hass.states.get(entity_id).state == STATE_OPEN
# close from UI
with patch("zigpy.zcl.Cluster.request", return_value=[0x1, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
@ -165,6 +195,20 @@ async def test_cover(
assert cluster.request.call_args[0][2].command.name == "down_close"
assert cluster.request.call_args[1]["expect_reply"] is True
with patch("zigpy.zcl.Cluster.request", return_value=[0x1, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_CLOSE_COVER_TILT,
{"entity_id": entity_id},
blocking=True,
)
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x08
assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage"
assert cluster.request.call_args[0][3] == 100
assert cluster.request.call_args[1]["expect_reply"] is True
# open from UI
with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
@ -176,6 +220,20 @@ async def test_cover(
assert cluster.request.call_args[0][2].command.name == "up_open"
assert cluster.request.call_args[1]["expect_reply"] is True
with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_OPEN_COVER_TILT,
{"entity_id": entity_id},
blocking=True,
)
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x08
assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage"
assert cluster.request.call_args[0][3] == 0
assert cluster.request.call_args[1]["expect_reply"] is True
# set position UI
with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
@ -191,6 +249,20 @@ async def test_cover(
assert cluster.request.call_args[0][3] == 53
assert cluster.request.call_args[1]["expect_reply"] is True
with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_SET_COVER_TILT_POSITION,
{"entity_id": entity_id, ATTR_TILT_POSITION: 47},
blocking=True,
)
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x08
assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage"
assert cluster.request.call_args[0][3] == 53
assert cluster.request.call_args[1]["expect_reply"] is True
# stop from UI
with patch("zigpy.zcl.Cluster.request", return_value=[0x2, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
@ -202,11 +274,39 @@ async def test_cover(
assert cluster.request.call_args[0][2].command.name == "stop"
assert cluster.request.call_args[1]["expect_reply"] is True
with patch("zigpy.zcl.Cluster.request", return_value=[0x2, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_STOP_COVER_TILT,
{"entity_id": entity_id},
blocking=True,
)
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x02
assert cluster.request.call_args[0][2].command.name == "stop"
assert cluster.request.call_args[1]["expect_reply"] is True
# test rejoin
cluster.PLUGGED_ATTR_READS = {"current_position_lift_percentage": 0}
await async_test_rejoin(hass, zigpy_cover_device, [cluster], (1,))
assert hass.states.get(entity_id).state == STATE_OPEN
# test toggle
with patch("zigpy.zcl.Cluster.request", return_value=[0x2, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_TOGGLE_COVER_TILT,
{"entity_id": entity_id},
blocking=True,
)
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x08
assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage"
assert cluster.request.call_args[0][3] == 100
assert cluster.request.call_args[1]["expect_reply"] is True
async def test_cover_failures(
hass: HomeAssistant, zha_device_joined_restored, zigpy_cover_device
@ -215,7 +315,10 @@ async def test_cover_failures(
# load up cover domain
cluster = zigpy_cover_device.endpoints.get(1).window_covering
cluster.PLUGGED_ATTR_READS = {"current_position_lift_percentage": 100}
cluster.PLUGGED_ATTR_READS = {
"current_position_lift_percentage": None,
"current_position_tilt_percentage": 42,
}
zha_device = await zha_device_joined_restored(zigpy_cover_device)
entity_id = find_entity_id(Platform.COVER, zha_device, hass)
@ -225,11 +328,17 @@ async def test_cover_failures(
# test that the cover was created and that it is unavailable
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# test update returned None
prev_call_count = cluster.read_attributes.call_count
await async_update_entity(hass, entity_id)
assert cluster.read_attributes.call_count == prev_call_count + 2
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, [zha_device])
await hass.async_block_till_done()
# test that the state has changed from unavailable to off
# test that the state has changed from unavailable to closed
await send_attributes_report(hass, cluster, {0: 0, 8: 100, 1: 1})
assert hass.states.get(entity_id).state == STATE_CLOSED
@ -258,6 +367,26 @@ async def test_cover_failures(
== closures.WindowCovering.ServerCommandDefs.down_close.id
)
with patch(
"zigpy.zcl.Cluster.request",
return_value=Default_Response(
command_id=closures.WindowCovering.ServerCommandDefs.go_to_tilt_percentage.id,
status=zcl_f.Status.UNSUP_CLUSTER_COMMAND,
),
):
with pytest.raises(HomeAssistantError, match=r"Failed to close cover tilt"):
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_CLOSE_COVER_TILT,
{"entity_id": entity_id},
blocking=True,
)
assert cluster.request.call_count == 1
assert (
cluster.request.call_args[0][1]
== closures.WindowCovering.ServerCommandDefs.go_to_tilt_percentage.id
)
# open from UI
with patch(
"zigpy.zcl.Cluster.request",
@ -279,6 +408,26 @@ async def test_cover_failures(
== closures.WindowCovering.ServerCommandDefs.up_open.id
)
with patch(
"zigpy.zcl.Cluster.request",
return_value=Default_Response(
command_id=closures.WindowCovering.ServerCommandDefs.go_to_tilt_percentage.id,
status=zcl_f.Status.UNSUP_CLUSTER_COMMAND,
),
):
with pytest.raises(HomeAssistantError, match=r"Failed to open cover tilt"):
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_OPEN_COVER_TILT,
{"entity_id": entity_id},
blocking=True,
)
assert cluster.request.call_count == 1
assert (
cluster.request.call_args[0][1]
== closures.WindowCovering.ServerCommandDefs.go_to_tilt_percentage.id
)
# set position UI
with patch(
"zigpy.zcl.Cluster.request",
@ -301,6 +450,28 @@ async def test_cover_failures(
== closures.WindowCovering.ServerCommandDefs.go_to_lift_percentage.id
)
with patch(
"zigpy.zcl.Cluster.request",
return_value=Default_Response(
command_id=closures.WindowCovering.ServerCommandDefs.go_to_tilt_percentage.id,
status=zcl_f.Status.UNSUP_CLUSTER_COMMAND,
),
):
with pytest.raises(
HomeAssistantError, match=r"Failed to set cover tilt position"
):
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_SET_COVER_TILT_POSITION,
{"entity_id": entity_id, "tilt_position": 42},
blocking=True,
)
assert cluster.request.call_count == 1
assert (
cluster.request.call_args[0][1]
== closures.WindowCovering.ServerCommandDefs.go_to_tilt_percentage.id
)
# stop from UI
with patch(
"zigpy.zcl.Cluster.request",
@ -499,11 +670,10 @@ async def test_shade(
assert cluster_level.request.call_args[0][1] in (0x0003, 0x0007)
async def test_restore_state(
async def test_shade_restore_state(
hass: HomeAssistant, zha_device_restored, zigpy_shade_device
) -> None:
"""Ensure states are restored on startup."""
mock_restore_cache(
hass,
(
@ -521,11 +691,38 @@ async def test_restore_state(
entity_id = find_entity_id(Platform.COVER, zha_device, hass)
assert entity_id is not None
# test that the cover was created and that it is unavailable
# test that the cover was created and that it is available
assert hass.states.get(entity_id).state == STATE_OPEN
assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 50
async def test_cover_restore_state(
hass: HomeAssistant, zha_device_restored, zigpy_cover_device
) -> None:
"""Ensure states are restored on startup."""
mock_restore_cache(
hass,
(
State(
"cover.fakemanufacturer_fakemodel_cover",
STATE_OPEN,
{ATTR_CURRENT_POSITION: 50, ATTR_CURRENT_TILT_POSITION: 42},
),
),
)
hass.state = CoreState.starting
zha_device = await zha_device_restored(zigpy_cover_device)
entity_id = find_entity_id(Platform.COVER, zha_device, hass)
assert entity_id is not None
# test that the cover was created and that it is available
assert hass.states.get(entity_id).state == STATE_OPEN
assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 50
assert hass.states.get(entity_id).attributes[ATTR_CURRENT_TILT_POSITION] == 42
async def test_keen_vent(
hass: HomeAssistant, zha_device_joined_restored, zigpy_keen_vent
) -> None: