From d25b4aae14906c5dd73ca7906497230910989973 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Bed=C5=99ich?= Date: Tue, 24 Oct 2023 21:40:41 +0200 Subject: [PATCH] Add ZHA cover tilt (#102072) * cover tilt reimplementation * rework tilt test * Fix ZHA cover tests * Match ZHA cover tilt code-style with the rest * Increase coverage for ZHA cover, optimize update --------- Co-authored-by: josef109 --- .../zha/core/cluster_handlers/closures.py | 25 ++- homeassistant/components/zha/cover.py | 68 ++++-- tests/components/zha/test_cover.py | 211 +++++++++++++++++- 3 files changed, 270 insertions(+), 34 deletions(-) diff --git a/homeassistant/components/zha/core/cluster_handlers/closures.py b/homeassistant/components/zha/core/cluster_handlers/closures.py index 4262a16800d..980a6f88a75 100644 --- a/homeassistant/components/zha/core/cluster_handlers/closures.py +++ b/homeassistant/components/zha/core/cluster_handlers/closures.py @@ -124,11 +124,19 @@ class WindowCoveringClient(ClientClusterHandler): class WindowCovering(ClusterHandler): """Window cluster handler.""" - _value_attribute = 8 + _value_attribute_lift = ( + closures.WindowCovering.AttributeDefs.current_position_lift_percentage.id + ) + _value_attribute_tilt = ( + closures.WindowCovering.AttributeDefs.current_position_tilt_percentage.id + ) REPORT_CONFIG = ( AttrReportConfig( attr="current_position_lift_percentage", config=REPORT_CONFIG_IMMEDIATE ), + AttrReportConfig( + attr="current_position_tilt_percentage", config=REPORT_CONFIG_IMMEDIATE + ), ) async def async_update(self): @@ -140,10 +148,21 @@ class WindowCovering(ClusterHandler): if result is not None: self.async_send_signal( f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", - 8, + self._value_attribute_lift, "current_position_lift_percentage", result, ) + result = await self.get_attribute_value( + "current_position_tilt_percentage", from_cache=False + ) + self.debug("read current tilt position: %s", result) + if result is not None: + self.async_send_signal( + f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", + self._value_attribute_tilt, + "current_position_tilt_percentage", + result, + ) @callback def attribute_updated(self, attrid: int, value: Any, _: Any) -> None: @@ -152,7 +171,7 @@ class WindowCovering(ClusterHandler): self.debug( "Attribute report '%s'[%s] = %s", self.cluster.name, attr_name, value ) - if attrid == self._value_attribute: + if attrid in (self._value_attribute_lift, self._value_attribute_tilt): self.async_send_signal( f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", attrid, attr_name, value ) diff --git a/homeassistant/components/zha/cover.py b/homeassistant/components/zha/cover.py index d142aa2726b..f36cbc13533 100644 --- a/homeassistant/components/zha/cover.py +++ b/homeassistant/components/zha/cover.py @@ -11,6 +11,7 @@ from zigpy.zcl.foundation import Status from homeassistant.components.cover import ( ATTR_CURRENT_POSITION, ATTR_POSITION, + ATTR_TILT_POSITION, CoverDeviceClass, CoverEntity, ) @@ -80,6 +81,7 @@ class ZhaCover(ZhaEntity, CoverEntity): super().__init__(unique_id, zha_device, cluster_handlers, **kwargs) self._cover_cluster_handler = self.cluster_handlers.get(CLUSTER_HANDLER_COVER) self._current_position = None + self._tilt_position = None async def async_added_to_hass(self) -> None: """Run when about to be added to hass.""" @@ -94,6 +96,10 @@ class ZhaCover(ZhaEntity, CoverEntity): self._state = last_state.state if "current_position" in last_state.attributes: self._current_position = last_state.attributes["current_position"] + if "current_tilt_position" in last_state.attributes: + self._tilt_position = last_state.attributes[ + "current_tilt_position" + ] # first allocation activate tilt @property def is_closed(self) -> bool | None: @@ -120,11 +126,20 @@ class ZhaCover(ZhaEntity, CoverEntity): """ return self._current_position + @property + def current_cover_tilt_position(self) -> int | None: + """Return the current tilt position of the cover.""" + return self._tilt_position + @callback def async_set_position(self, attr_id, attr_name, value): """Handle position update from cluster handler.""" - _LOGGER.debug("setting position: %s", value) - self._current_position = 100 - value + _LOGGER.debug("setting position: %s %s %s", attr_id, attr_name, value) + if attr_name == "current_position_lift_percentage": + self._current_position = 100 - value + elif attr_name == "current_position_tilt_percentage": + self._tilt_position = 100 - value + if self._current_position == 0: self._state = STATE_CLOSED elif self._current_position == 100: @@ -145,6 +160,13 @@ class ZhaCover(ZhaEntity, CoverEntity): raise HomeAssistantError(f"Failed to open cover: {res[1]}") self.async_update_state(STATE_OPENING) + async def async_open_cover_tilt(self, **kwargs: Any) -> None: + """Open the cover tilt.""" + res = await self._cover_cluster_handler.go_to_tilt_percentage(0) + if res[1] is not Status.SUCCESS: + raise HomeAssistantError(f"Failed to open cover tilt: {res[1]}") + self.async_update_state(STATE_OPENING) + async def async_close_cover(self, **kwargs: Any) -> None: """Close the window cover.""" res = await self._cover_cluster_handler.down_close() @@ -152,6 +174,13 @@ class ZhaCover(ZhaEntity, CoverEntity): raise HomeAssistantError(f"Failed to close cover: {res[1]}") self.async_update_state(STATE_CLOSING) + async def async_close_cover_tilt(self, **kwargs: Any) -> None: + """Close the cover tilt.""" + res = await self._cover_cluster_handler.go_to_tilt_percentage(100) + if res[1] is not Status.SUCCESS: + raise HomeAssistantError(f"Failed to close cover tilt: {res[1]}") + self.async_update_state(STATE_CLOSING) + async def async_set_cover_position(self, **kwargs: Any) -> None: """Move the roller shutter to a specific position.""" new_pos = kwargs[ATTR_POSITION] @@ -162,6 +191,16 @@ class ZhaCover(ZhaEntity, CoverEntity): STATE_CLOSING if new_pos < self._current_position else STATE_OPENING ) + async def async_set_cover_tilt_position(self, **kwargs: Any) -> None: + """Move the cover til to a specific position.""" + new_pos = kwargs[ATTR_TILT_POSITION] + res = await self._cover_cluster_handler.go_to_tilt_percentage(100 - new_pos) + if res[1] is not Status.SUCCESS: + raise HomeAssistantError(f"Failed to set cover tilt position: {res[1]}") + self.async_update_state( + STATE_CLOSING if new_pos < self._tilt_position else STATE_OPENING + ) + async def async_stop_cover(self, **kwargs: Any) -> None: """Stop the window cover.""" res = await self._cover_cluster_handler.stop() @@ -170,28 +209,9 @@ class ZhaCover(ZhaEntity, CoverEntity): self._state = STATE_OPEN if self._current_position > 0 else STATE_CLOSED self.async_write_ha_state() - async def async_update(self) -> None: - """Attempt to retrieve the open/close state of the cover.""" - await super().async_update() - await self.async_get_state() - - async def async_get_state(self, from_cache=True): - """Fetch the current state.""" - _LOGGER.debug("polling current state") - if self._cover_cluster_handler: - pos = await self._cover_cluster_handler.get_attribute_value( - "current_position_lift_percentage", from_cache=from_cache - ) - _LOGGER.debug("read pos=%s", pos) - - if pos is not None: - self._current_position = 100 - pos - self._state = ( - STATE_OPEN if self.current_cover_position > 0 else STATE_CLOSED - ) - else: - self._current_position = None - self._state = None + async def async_stop_cover_tilt(self, **kwargs: Any) -> None: + """Stop the cover tilt.""" + await self.async_stop_cover() @MULTI_MATCH( diff --git a/tests/components/zha/test_cover.py b/tests/components/zha/test_cover.py index 08f84613ff3..0adb7583d31 100644 --- a/tests/components/zha/test_cover.py +++ b/tests/components/zha/test_cover.py @@ -11,11 +11,18 @@ import zigpy.zcl.foundation as zcl_f from homeassistant.components.cover import ( ATTR_CURRENT_POSITION, + ATTR_CURRENT_TILT_POSITION, + ATTR_TILT_POSITION, DOMAIN as COVER_DOMAIN, SERVICE_CLOSE_COVER, + SERVICE_CLOSE_COVER_TILT, SERVICE_OPEN_COVER, + SERVICE_OPEN_COVER_TILT, SERVICE_SET_COVER_POSITION, + SERVICE_SET_COVER_TILT_POSITION, SERVICE_STOP_COVER, + SERVICE_STOP_COVER_TILT, + SERVICE_TOGGLE_COVER_TILT, ) from homeassistant.components.zha.core.const import ZHA_EVENT from homeassistant.const import ( @@ -27,6 +34,7 @@ from homeassistant.const import ( ) from homeassistant.core import CoreState, HomeAssistant, State from homeassistant.exceptions import HomeAssistantError +from homeassistant.helpers.entity_component import async_update_entity from .common import ( async_enable_traffic, @@ -64,7 +72,7 @@ def zigpy_cover_device(zigpy_device_mock): endpoints = { 1: { SIG_EP_PROFILE: zigpy.profiles.zha.PROFILE_ID, - SIG_EP_TYPE: zigpy.profiles.zha.DeviceType.IAS_ZONE, + SIG_EP_TYPE: zigpy.profiles.zha.DeviceType.WINDOW_COVERING_DEVICE, SIG_EP_INPUT: [closures.WindowCovering.cluster_id], SIG_EP_OUTPUT: [], } @@ -130,10 +138,14 @@ async def test_cover( # load up cover domain cluster = zigpy_cover_device.endpoints.get(1).window_covering - cluster.PLUGGED_ATTR_READS = {"current_position_lift_percentage": 100} + cluster.PLUGGED_ATTR_READS = { + "current_position_lift_percentage": 65, + "current_position_tilt_percentage": 42, + } zha_device = await zha_device_joined_restored(zigpy_cover_device) assert cluster.read_attributes.call_count == 1 assert "current_position_lift_percentage" in cluster.read_attributes.call_args[0][0] + assert "current_position_tilt_percentage" in cluster.read_attributes.call_args[0][0] entity_id = find_entity_id(Platform.COVER, zha_device, hass) assert entity_id is not None @@ -146,6 +158,16 @@ async def test_cover( await async_enable_traffic(hass, [zha_device]) await hass.async_block_till_done() + # test update + prev_call_count = cluster.read_attributes.call_count + await async_update_entity(hass, entity_id) + assert cluster.read_attributes.call_count == prev_call_count + 2 + state = hass.states.get(entity_id) + assert state + assert state.state == STATE_OPEN + assert state.attributes[ATTR_CURRENT_POSITION] == 35 + assert state.attributes[ATTR_CURRENT_TILT_POSITION] == 58 + # test that the state has changed from unavailable to off await send_attributes_report(hass, cluster, {0: 0, 8: 100, 1: 1}) assert hass.states.get(entity_id).state == STATE_CLOSED @@ -154,6 +176,14 @@ async def test_cover( await send_attributes_report(hass, cluster, {0: 1, 8: 0, 1: 100}) assert hass.states.get(entity_id).state == STATE_OPEN + # test that the state remains after tilting to 100% + await send_attributes_report(hass, cluster, {0: 0, 9: 100, 1: 1}) + assert hass.states.get(entity_id).state == STATE_OPEN + + # test to see the state remains after tilting to 0% + await send_attributes_report(hass, cluster, {0: 1, 9: 0, 1: 100}) + assert hass.states.get(entity_id).state == STATE_OPEN + # close from UI with patch("zigpy.zcl.Cluster.request", return_value=[0x1, zcl_f.Status.SUCCESS]): await hass.services.async_call( @@ -165,6 +195,20 @@ async def test_cover( assert cluster.request.call_args[0][2].command.name == "down_close" assert cluster.request.call_args[1]["expect_reply"] is True + with patch("zigpy.zcl.Cluster.request", return_value=[0x1, zcl_f.Status.SUCCESS]): + await hass.services.async_call( + COVER_DOMAIN, + SERVICE_CLOSE_COVER_TILT, + {"entity_id": entity_id}, + blocking=True, + ) + assert cluster.request.call_count == 1 + assert cluster.request.call_args[0][0] is False + assert cluster.request.call_args[0][1] == 0x08 + assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage" + assert cluster.request.call_args[0][3] == 100 + assert cluster.request.call_args[1]["expect_reply"] is True + # open from UI with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]): await hass.services.async_call( @@ -176,6 +220,20 @@ async def test_cover( assert cluster.request.call_args[0][2].command.name == "up_open" assert cluster.request.call_args[1]["expect_reply"] is True + with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]): + await hass.services.async_call( + COVER_DOMAIN, + SERVICE_OPEN_COVER_TILT, + {"entity_id": entity_id}, + blocking=True, + ) + assert cluster.request.call_count == 1 + assert cluster.request.call_args[0][0] is False + assert cluster.request.call_args[0][1] == 0x08 + assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage" + assert cluster.request.call_args[0][3] == 0 + assert cluster.request.call_args[1]["expect_reply"] is True + # set position UI with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): await hass.services.async_call( @@ -191,6 +249,20 @@ async def test_cover( assert cluster.request.call_args[0][3] == 53 assert cluster.request.call_args[1]["expect_reply"] is True + with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + await hass.services.async_call( + COVER_DOMAIN, + SERVICE_SET_COVER_TILT_POSITION, + {"entity_id": entity_id, ATTR_TILT_POSITION: 47}, + blocking=True, + ) + assert cluster.request.call_count == 1 + assert cluster.request.call_args[0][0] is False + assert cluster.request.call_args[0][1] == 0x08 + assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage" + assert cluster.request.call_args[0][3] == 53 + assert cluster.request.call_args[1]["expect_reply"] is True + # stop from UI with patch("zigpy.zcl.Cluster.request", return_value=[0x2, zcl_f.Status.SUCCESS]): await hass.services.async_call( @@ -202,11 +274,39 @@ async def test_cover( assert cluster.request.call_args[0][2].command.name == "stop" assert cluster.request.call_args[1]["expect_reply"] is True + with patch("zigpy.zcl.Cluster.request", return_value=[0x2, zcl_f.Status.SUCCESS]): + await hass.services.async_call( + COVER_DOMAIN, + SERVICE_STOP_COVER_TILT, + {"entity_id": entity_id}, + blocking=True, + ) + assert cluster.request.call_count == 1 + assert cluster.request.call_args[0][0] is False + assert cluster.request.call_args[0][1] == 0x02 + assert cluster.request.call_args[0][2].command.name == "stop" + assert cluster.request.call_args[1]["expect_reply"] is True + # test rejoin cluster.PLUGGED_ATTR_READS = {"current_position_lift_percentage": 0} await async_test_rejoin(hass, zigpy_cover_device, [cluster], (1,)) assert hass.states.get(entity_id).state == STATE_OPEN + # test toggle + with patch("zigpy.zcl.Cluster.request", return_value=[0x2, zcl_f.Status.SUCCESS]): + await hass.services.async_call( + COVER_DOMAIN, + SERVICE_TOGGLE_COVER_TILT, + {"entity_id": entity_id}, + blocking=True, + ) + assert cluster.request.call_count == 1 + assert cluster.request.call_args[0][0] is False + assert cluster.request.call_args[0][1] == 0x08 + assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage" + assert cluster.request.call_args[0][3] == 100 + assert cluster.request.call_args[1]["expect_reply"] is True + async def test_cover_failures( hass: HomeAssistant, zha_device_joined_restored, zigpy_cover_device @@ -215,7 +315,10 @@ async def test_cover_failures( # load up cover domain cluster = zigpy_cover_device.endpoints.get(1).window_covering - cluster.PLUGGED_ATTR_READS = {"current_position_lift_percentage": 100} + cluster.PLUGGED_ATTR_READS = { + "current_position_lift_percentage": None, + "current_position_tilt_percentage": 42, + } zha_device = await zha_device_joined_restored(zigpy_cover_device) entity_id = find_entity_id(Platform.COVER, zha_device, hass) @@ -225,11 +328,17 @@ async def test_cover_failures( # test that the cover was created and that it is unavailable assert hass.states.get(entity_id).state == STATE_UNAVAILABLE + # test update returned None + prev_call_count = cluster.read_attributes.call_count + await async_update_entity(hass, entity_id) + assert cluster.read_attributes.call_count == prev_call_count + 2 + assert hass.states.get(entity_id).state == STATE_UNAVAILABLE + # allow traffic to flow through the gateway and device await async_enable_traffic(hass, [zha_device]) await hass.async_block_till_done() - # test that the state has changed from unavailable to off + # test that the state has changed from unavailable to closed await send_attributes_report(hass, cluster, {0: 0, 8: 100, 1: 1}) assert hass.states.get(entity_id).state == STATE_CLOSED @@ -258,6 +367,26 @@ async def test_cover_failures( == closures.WindowCovering.ServerCommandDefs.down_close.id ) + with patch( + "zigpy.zcl.Cluster.request", + return_value=Default_Response( + command_id=closures.WindowCovering.ServerCommandDefs.go_to_tilt_percentage.id, + status=zcl_f.Status.UNSUP_CLUSTER_COMMAND, + ), + ): + with pytest.raises(HomeAssistantError, match=r"Failed to close cover tilt"): + await hass.services.async_call( + COVER_DOMAIN, + SERVICE_CLOSE_COVER_TILT, + {"entity_id": entity_id}, + blocking=True, + ) + assert cluster.request.call_count == 1 + assert ( + cluster.request.call_args[0][1] + == closures.WindowCovering.ServerCommandDefs.go_to_tilt_percentage.id + ) + # open from UI with patch( "zigpy.zcl.Cluster.request", @@ -279,6 +408,26 @@ async def test_cover_failures( == closures.WindowCovering.ServerCommandDefs.up_open.id ) + with patch( + "zigpy.zcl.Cluster.request", + return_value=Default_Response( + command_id=closures.WindowCovering.ServerCommandDefs.go_to_tilt_percentage.id, + status=zcl_f.Status.UNSUP_CLUSTER_COMMAND, + ), + ): + with pytest.raises(HomeAssistantError, match=r"Failed to open cover tilt"): + await hass.services.async_call( + COVER_DOMAIN, + SERVICE_OPEN_COVER_TILT, + {"entity_id": entity_id}, + blocking=True, + ) + assert cluster.request.call_count == 1 + assert ( + cluster.request.call_args[0][1] + == closures.WindowCovering.ServerCommandDefs.go_to_tilt_percentage.id + ) + # set position UI with patch( "zigpy.zcl.Cluster.request", @@ -301,6 +450,28 @@ async def test_cover_failures( == closures.WindowCovering.ServerCommandDefs.go_to_lift_percentage.id ) + with patch( + "zigpy.zcl.Cluster.request", + return_value=Default_Response( + command_id=closures.WindowCovering.ServerCommandDefs.go_to_tilt_percentage.id, + status=zcl_f.Status.UNSUP_CLUSTER_COMMAND, + ), + ): + with pytest.raises( + HomeAssistantError, match=r"Failed to set cover tilt position" + ): + await hass.services.async_call( + COVER_DOMAIN, + SERVICE_SET_COVER_TILT_POSITION, + {"entity_id": entity_id, "tilt_position": 42}, + blocking=True, + ) + assert cluster.request.call_count == 1 + assert ( + cluster.request.call_args[0][1] + == closures.WindowCovering.ServerCommandDefs.go_to_tilt_percentage.id + ) + # stop from UI with patch( "zigpy.zcl.Cluster.request", @@ -499,11 +670,10 @@ async def test_shade( assert cluster_level.request.call_args[0][1] in (0x0003, 0x0007) -async def test_restore_state( +async def test_shade_restore_state( hass: HomeAssistant, zha_device_restored, zigpy_shade_device ) -> None: """Ensure states are restored on startup.""" - mock_restore_cache( hass, ( @@ -521,11 +691,38 @@ async def test_restore_state( entity_id = find_entity_id(Platform.COVER, zha_device, hass) assert entity_id is not None - # test that the cover was created and that it is unavailable + # test that the cover was created and that it is available assert hass.states.get(entity_id).state == STATE_OPEN assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 50 +async def test_cover_restore_state( + hass: HomeAssistant, zha_device_restored, zigpy_cover_device +) -> None: + """Ensure states are restored on startup.""" + mock_restore_cache( + hass, + ( + State( + "cover.fakemanufacturer_fakemodel_cover", + STATE_OPEN, + {ATTR_CURRENT_POSITION: 50, ATTR_CURRENT_TILT_POSITION: 42}, + ), + ), + ) + + hass.state = CoreState.starting + + zha_device = await zha_device_restored(zigpy_cover_device) + entity_id = find_entity_id(Platform.COVER, zha_device, hass) + assert entity_id is not None + + # test that the cover was created and that it is available + assert hass.states.get(entity_id).state == STATE_OPEN + assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 50 + assert hass.states.get(entity_id).attributes[ATTR_CURRENT_TILT_POSITION] == 42 + + async def test_keen_vent( hass: HomeAssistant, zha_device_joined_restored, zigpy_keen_vent ) -> None: