Wrap most ZHA exceptions in `HomeAssistantError` (#98421)

* Wrap attribute writes in a helper throwing `HomeAssistantError`

* Do not check for `Exception` instances, they are now propagated

* Write `cie_addr` synchronously

* Fix unnecessary `if` in `async_set_native_value`

* Fix unit tests

* Use `HomeAssistantError` in cover commands

* Revert writing `cie_addr` synchronously

* Disallow proxying of some cluster methods to fix unit test warnings

* Unit test cover failures to increase coverage

* Unit test missing climate device

* Unit test remaining cover commands
pull/99248/head
puddly 2023-08-28 17:24:12 -04:00 committed by GitHub
parent 97fd73f9f7
commit 23839a7f10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 628 additions and 399 deletions

View File

@ -6,9 +6,6 @@ import functools
import logging
from typing import TYPE_CHECKING, Any, Self
import zigpy.exceptions
from zigpy.zcl.foundation import Status
from homeassistant.components.button import ButtonDeviceClass, ButtonEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EntityCategory, Platform
@ -134,17 +131,10 @@ class ZHAAttributeButton(ZhaEntity, ButtonEntity):
async def async_press(self) -> None:
"""Write attribute with defined value."""
try:
result = await self._cluster_handler.cluster.write_attributes(
{self._attribute_name: self._attribute_value}
)
except zigpy.exceptions.ZigbeeException as ex:
self.error("Could not set value: %s", ex)
return
if not isinstance(result, Exception) and all(
record.status == Status.SUCCESS for record in result[0]
):
self.async_write_ha_state()
await self._cluster_handler.write_attributes_safe(
{self._attribute_name: self._attribute_value}
)
self.async_write_ha_state()
@CONFIG_DIAGNOSTIC_MATCH(

View File

@ -416,15 +416,12 @@ class Thermostat(ZhaEntity, ClimateEntity):
if self.preset_mode not in (
preset_mode,
PRESET_NONE,
) and not await self.async_preset_handler(self.preset_mode, enable=False):
self.debug("Couldn't turn off '%s' preset", self.preset_mode)
return
if preset_mode != PRESET_NONE and not await self.async_preset_handler(
preset_mode, enable=True
):
self.debug("Couldn't turn on '%s' preset", preset_mode)
return
await self.async_preset_handler(self.preset_mode, enable=False)
if preset_mode != PRESET_NONE:
await self.async_preset_handler(preset_mode, enable=True)
self._preset = preset_mode
self.async_write_ha_state()
@ -438,30 +435,29 @@ class Thermostat(ZhaEntity, ClimateEntity):
if hvac_mode is not None:
await self.async_set_hvac_mode(hvac_mode)
thrm = self._thrm
is_away = self.preset_mode == PRESET_AWAY
if self.hvac_mode == HVACMode.HEAT_COOL:
success = True
if low_temp is not None:
low_temp = int(low_temp * ZCL_TEMP)
success = success and await thrm.async_set_heating_setpoint(
low_temp, self.preset_mode == PRESET_AWAY
await self._thrm.async_set_heating_setpoint(
temperature=int(low_temp * ZCL_TEMP),
is_away=is_away,
)
self.debug("Setting heating %s setpoint: %s", low_temp, success)
if high_temp is not None:
high_temp = int(high_temp * ZCL_TEMP)
success = success and await thrm.async_set_cooling_setpoint(
high_temp, self.preset_mode == PRESET_AWAY
await self._thrm.async_set_cooling_setpoint(
temperature=int(high_temp * ZCL_TEMP),
is_away=is_away,
)
self.debug("Setting cooling %s setpoint: %s", low_temp, success)
elif temp is not None:
temp = int(temp * ZCL_TEMP)
if self.hvac_mode == HVACMode.COOL:
success = await thrm.async_set_cooling_setpoint(
temp, self.preset_mode == PRESET_AWAY
await self._thrm.async_set_cooling_setpoint(
temperature=int(temp * ZCL_TEMP),
is_away=is_away,
)
elif self.hvac_mode == HVACMode.HEAT:
success = await thrm.async_set_heating_setpoint(
temp, self.preset_mode == PRESET_AWAY
await self._thrm.async_set_heating_setpoint(
temperature=int(temp * ZCL_TEMP),
is_away=is_away,
)
else:
self.debug("Not setting temperature for '%s' mode", self.hvac_mode)
@ -470,14 +466,13 @@ class Thermostat(ZhaEntity, ClimateEntity):
self.debug("incorrect %s setting for '%s' mode", kwargs, self.hvac_mode)
return
if success:
self.async_write_ha_state()
self.async_write_ha_state()
async def async_preset_handler(self, preset: str, enable: bool = False) -> bool:
async def async_preset_handler(self, preset: str, enable: bool = False) -> None:
"""Set the preset mode via handler."""
handler = getattr(self, f"async_preset_handler_{preset}")
return await handler(enable)
await handler(enable)
@MULTI_MATCH(
@ -529,7 +524,7 @@ class SinopeTechnologiesThermostat(Thermostat):
self.debug("Updating time: %s", secs_2k)
self._manufacturer_ch.cluster.create_catching_task(
self._manufacturer_ch.cluster.write_attributes(
self._manufacturer_ch.write_attributes_safe(
{"secs_since_2k": secs_2k}, manufacturer=self.manufacturer
)
)
@ -544,16 +539,13 @@ class SinopeTechnologiesThermostat(Thermostat):
)
self._async_update_time()
async def async_preset_handler_away(self, is_away: bool = False) -> bool:
async def async_preset_handler_away(self, is_away: bool = False) -> None:
"""Set occupancy."""
mfg_code = self._zha_device.manufacturer_code
res = await self._thrm.write_attributes(
await self._thrm.write_attributes_safe(
{"set_occupancy": 0 if is_away else 1}, manufacturer=mfg_code
)
self.debug("set occupancy to %s. Status: %s", 0 if is_away else 1, res)
return res
@MULTI_MATCH(
cluster_handler_names=CLUSTER_HANDLER_THERMOSTAT,
@ -635,40 +627,38 @@ class MoesThermostat(Thermostat):
self._preset = PRESET_COMPLEX
await super().async_attribute_updated(record)
async def async_preset_handler(self, preset: str, enable: bool = False) -> bool:
async def async_preset_handler(self, preset: str, enable: bool = False) -> None:
"""Set the preset mode."""
mfg_code = self._zha_device.manufacturer_code
if not enable:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 2}, manufacturer=mfg_code
)
if preset == PRESET_AWAY:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 0}, manufacturer=mfg_code
)
if preset == PRESET_SCHEDULE:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 1}, manufacturer=mfg_code
)
if preset == PRESET_COMFORT:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 3}, manufacturer=mfg_code
)
if preset == PRESET_ECO:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 4}, manufacturer=mfg_code
)
if preset == PRESET_BOOST:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 5}, manufacturer=mfg_code
)
if preset == PRESET_COMPLEX:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 6}, manufacturer=mfg_code
)
return False
@STRICT_MATCH(
cluster_handler_names=CLUSTER_HANDLER_THERMOSTAT,
@ -714,36 +704,34 @@ class BecaThermostat(Thermostat):
self._preset = PRESET_TEMP_MANUAL
await super().async_attribute_updated(record)
async def async_preset_handler(self, preset: str, enable: bool = False) -> bool:
async def async_preset_handler(self, preset: str, enable: bool = False) -> None:
"""Set the preset mode."""
mfg_code = self._zha_device.manufacturer_code
if not enable:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 2}, manufacturer=mfg_code
)
if preset == PRESET_AWAY:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 0}, manufacturer=mfg_code
)
if preset == PRESET_SCHEDULE:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 1}, manufacturer=mfg_code
)
if preset == PRESET_ECO:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 4}, manufacturer=mfg_code
)
if preset == PRESET_BOOST:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 5}, manufacturer=mfg_code
)
if preset == PRESET_TEMP_MANUAL:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 7}, manufacturer=mfg_code
)
return False
@MULTI_MATCH(
cluster_handler_names=CLUSTER_HANDLER_THERMOSTAT,
@ -809,23 +797,22 @@ class ZONNSMARTThermostat(Thermostat):
self._preset = self.PRESET_FROST
await super().async_attribute_updated(record)
async def async_preset_handler(self, preset: str, enable: bool = False) -> bool:
async def async_preset_handler(self, preset: str, enable: bool = False) -> None:
"""Set the preset mode."""
mfg_code = self._zha_device.manufacturer_code
if not enable:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 1}, manufacturer=mfg_code
)
if preset == PRESET_SCHEDULE:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 0}, manufacturer=mfg_code
)
if preset == self.PRESET_HOLIDAY:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 3}, manufacturer=mfg_code
)
if preset == self.PRESET_FROST:
return await self._thrm.write_attributes(
return await self._thrm.write_attributes_safe(
{"operation_preset": 4}, manufacturer=mfg_code
)
return False

View File

@ -2,7 +2,8 @@
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable, Coroutine
from collections.abc import Awaitable, Callable, Coroutine, Iterator
import contextlib
from enum import Enum
import functools
import logging
@ -48,6 +49,7 @@ if TYPE_CHECKING:
_LOGGER = logging.getLogger(__name__)
RETRYABLE_REQUEST_DECORATOR = zigpy.util.retryable_request(tries=3)
UNPROXIED_CLUSTER_METHODS = {"general_command"}
_P = ParamSpec("_P")
@ -55,24 +57,31 @@ _FuncType = Callable[_P, Awaitable[Any]]
_ReturnFuncType = Callable[_P, Coroutine[Any, Any, Any]]
@contextlib.contextmanager
def wrap_zigpy_exceptions() -> Iterator[None]:
"""Wrap zigpy exceptions in `HomeAssistantError` exceptions."""
try:
yield
except asyncio.TimeoutError as exc:
raise HomeAssistantError(
"Failed to send request: device did not respond"
) from exc
except zigpy.exceptions.ZigbeeException as exc:
message = "Failed to send request"
if str(exc):
message = f"{message}: {exc}"
raise HomeAssistantError(message) from exc
def retry_request(func: _FuncType[_P]) -> _ReturnFuncType[_P]:
"""Send a request with retries and wrap expected zigpy exceptions."""
@functools.wraps(func)
async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> Any:
try:
with wrap_zigpy_exceptions():
return await RETRYABLE_REQUEST_DECORATOR(func)(*args, **kwargs)
except asyncio.TimeoutError as exc:
raise HomeAssistantError(
"Failed to send request: device did not respond"
) from exc
except zigpy.exceptions.ZigbeeException as exc:
message = "Failed to send request"
if str(exc):
message = f"{message}: {exc}"
raise HomeAssistantError(message) from exc
return wrapper
@ -501,6 +510,26 @@ class ClusterHandler(LogMixin):
get_attributes = functools.partialmethod(_get_attributes, False)
async def write_attributes_safe(
self, attributes: dict[str, Any], manufacturer: int | None = None
) -> None:
"""Wrap `write_attributes` to throw an exception on attribute write failure."""
res = await self.write_attributes(attributes, manufacturer=manufacturer)
for record in res[0]:
if record.status != Status.SUCCESS:
try:
name = self.cluster.attributes[record.attrid].name
value = attributes.get(name, "unknown")
except KeyError:
name = f"0x{record.attrid:04x}"
value = "unknown"
raise HomeAssistantError(
f"Failed to write attribute {name}={value}: {record.status}",
)
def log(self, level, msg, *args, **kwargs):
"""Log a message."""
msg = f"[%s:%s]: {msg}"
@ -509,11 +538,16 @@ class ClusterHandler(LogMixin):
def __getattr__(self, name):
"""Get attribute or a decorated cluster command."""
if hasattr(self._cluster, name) and callable(getattr(self._cluster, name)):
if (
hasattr(self._cluster, name)
and callable(getattr(self._cluster, name))
and name not in UNPROXIED_CLUSTER_METHODS
):
command = getattr(self._cluster, name)
command.__name__ = name
wrapped_command = retry_request(command)
wrapped_command.__name__ = name
return retry_request(command)
return wrapped_command
return self.__getattribute__(name)

View File

@ -1,7 +1,6 @@
"""General cluster handlers module for Zigbee Home Automation."""
from __future__ import annotations
import asyncio
from collections.abc import Coroutine
from typing import TYPE_CHECKING, Any
@ -12,6 +11,7 @@ from zigpy.zcl.clusters import general
from zigpy.zcl.foundation import Status
from homeassistant.core import callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.event import async_call_later
from .. import registries
@ -111,18 +111,9 @@ class AnalogOutput(ClusterHandler):
"""Return cached value of application_type."""
return self.cluster.get("application_type")
async def async_set_present_value(self, value: float) -> bool:
async def async_set_present_value(self, value: float) -> None:
"""Update present_value."""
try:
res = await self.cluster.write_attributes({"present_value": value})
except zigpy.exceptions.ZigbeeException as ex:
self.error("Could not set value: %s", ex)
return False
if not isinstance(res, Exception) and all(
record.status == Status.SUCCESS for record in res[0]
):
return True
return False
await self.write_attributes_safe({"present_value": value})
@registries.ZIGBEE_CLUSTER_HANDLER_REGISTRY.register(general.AnalogValue.cluster_id)
@ -392,21 +383,19 @@ class OnOffClusterHandler(ClusterHandler):
"""Return cached value of on/off attribute."""
return self.cluster.get("on_off")
async def turn_on(self) -> bool:
async def turn_on(self) -> None:
"""Turn the on off cluster on."""
result = await self.on()
if isinstance(result, Exception) or result[1] is not Status.SUCCESS:
return False
if result[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to turn on: {result[1]}")
self.cluster.update_attribute(self.ON_OFF, t.Bool.true)
return True
async def turn_off(self) -> bool:
async def turn_off(self) -> None:
"""Turn the on off cluster off."""
result = await self.off()
if isinstance(result, Exception) or result[1] is not Status.SUCCESS:
return False
if result[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to turn off: {result[1]}")
self.cluster.update_attribute(self.ON_OFF, t.Bool.false)
return True
@callback
def cluster_command(self, tsn, command_id, args):
@ -508,13 +497,7 @@ class PollControl(ClusterHandler):
async def async_configure_cluster_handler_specific(self) -> None:
"""Configure cluster handler: set check-in interval."""
try:
res = await self.cluster.write_attributes(
{"checkin_interval": self.CHECKIN_INTERVAL}
)
self.debug("%ss check-in interval set: %s", self.CHECKIN_INTERVAL / 4, res)
except (asyncio.TimeoutError, zigpy.exceptions.ZigbeeException) as ex:
self.debug("Couldn't set check-in interval: %s", ex)
await self.write_attributes_safe({"checkin_interval": self.CHECKIN_INTERVAL})
@callback
def cluster_command(

View File

@ -8,9 +8,7 @@ from __future__ import annotations
from collections import namedtuple
from typing import Any
from zigpy.exceptions import ZigbeeException
from zigpy.zcl.clusters import hvac
from zigpy.zcl.foundation import Status
from homeassistant.core import callback
@ -55,12 +53,7 @@ class FanClusterHandler(ClusterHandler):
async def async_set_speed(self, value) -> None:
"""Set the speed of the fan."""
try:
await self.cluster.write_attributes({"fan_mode": value})
except ZigbeeException as ex:
self.error("Could not set speed: %s", ex)
return
await self.write_attributes_safe({"fan_mode": value})
async def async_update(self) -> None:
"""Retrieve latest state."""
@ -247,71 +240,32 @@ class ThermostatClusterHandler(ClusterHandler):
async def async_set_operation_mode(self, mode) -> bool:
"""Set Operation mode."""
if not await self.write_attributes({"system_mode": mode}):
self.debug("couldn't set '%s' operation mode", mode)
return False
self.debug("set system to %s", mode)
await self.write_attributes_safe({"system_mode": mode})
return True
async def async_set_heating_setpoint(
self, temperature: int, is_away: bool = False
) -> bool:
"""Set heating setpoint."""
if is_away:
data = {"unoccupied_heating_setpoint": temperature}
else:
data = {"occupied_heating_setpoint": temperature}
if not await self.write_attributes(data):
self.debug("couldn't set heating setpoint")
return False
attr = "unoccupied_heating_setpoint" if is_away else "occupied_heating_setpoint"
await self.write_attributes_safe({attr: temperature})
return True
async def async_set_cooling_setpoint(
self, temperature: int, is_away: bool = False
) -> bool:
"""Set cooling setpoint."""
if is_away:
data = {"unoccupied_cooling_setpoint": temperature}
else:
data = {"occupied_cooling_setpoint": temperature}
if not await self.write_attributes(data):
self.debug("couldn't set cooling setpoint")
return False
self.debug("set cooling setpoint to %s", temperature)
attr = "unoccupied_cooling_setpoint" if is_away else "occupied_cooling_setpoint"
await self.write_attributes_safe({attr: temperature})
return True
async def get_occupancy(self) -> bool | None:
"""Get unreportable occupancy attribute."""
try:
res, fail = await self.cluster.read_attributes(["occupancy"])
self.debug("read 'occupancy' attr, success: %s, fail: %s", res, fail)
if "occupancy" not in res:
return None
return bool(self.occupancy)
except ZigbeeException as ex:
self.debug("Couldn't read 'occupancy' attribute: %s", ex)
res, fail = await self.read_attributes(["occupancy"])
self.debug("read 'occupancy' attr, success: %s, fail: %s", res, fail)
if "occupancy" not in res:
return None
async def write_attributes(self, data, **kwargs):
"""Write attributes helper."""
try:
res = await self.cluster.write_attributes(data, **kwargs)
except ZigbeeException as exc:
self.debug("couldn't write %s: %s", data, exc)
return False
self.debug("wrote %s attrs, Status: %s", data, res)
return self.check_result(res)
@staticmethod
def check_result(res: list) -> bool:
"""Normalize the result."""
if isinstance(res, Exception):
return False
return all(record.status == Status.SUCCESS for record in res[0])
return bool(self.occupancy)
@registries.ZIGBEE_CLUSTER_HANDLER_REGISTRY.register(hvac.UserInterface.cluster_id)

View File

@ -5,7 +5,6 @@ import logging
from typing import TYPE_CHECKING, Any
from zhaquirks.inovelli.types import AllLEDEffectType, SingleLEDEffectType
from zigpy.exceptions import ZigbeeException
import zigpy.zcl
from homeassistant.core import callback
@ -351,12 +350,7 @@ class IkeaAirPurifierClusterHandler(ClusterHandler):
async def async_set_speed(self, value) -> None:
"""Set the speed of the fan."""
try:
await self.cluster.write_attributes({"fan_mode": value})
except ZigbeeException as ex:
self.error("Could not set speed: %s", ex)
return
await self.write_attributes_safe({"fan_mode": value})
async def async_update(self) -> None:
"""Retrieve latest state."""

View File

@ -8,12 +8,12 @@ from __future__ import annotations
from collections.abc import Callable
from typing import TYPE_CHECKING, Any
from zigpy.exceptions import ZigbeeException
import zigpy.zcl
from zigpy.zcl.clusters import security
from zigpy.zcl.clusters.security import IasAce as AceCluster, IasZone
from homeassistant.core import callback
from homeassistant.exceptions import HomeAssistantError
from .. import registries
from ..const import (
@ -350,8 +350,11 @@ class IASZoneClusterHandler(ClusterHandler):
self.debug("Updated alarm state: %s", zone_status)
elif command_id == 1:
self.debug("Enroll requested")
res = self._cluster.enroll_response(0, 0)
self._cluster.create_catching_task(res)
self._cluster.create_catching_task(
self.enroll_response(
enroll_response_code=IasZone.EnrollResponse.Success, zone_id=0
)
)
async def async_configure(self):
"""Configure IAS device."""
@ -366,14 +369,14 @@ class IASZoneClusterHandler(ClusterHandler):
ieee = self.cluster.endpoint.device.application.state.node_info.ieee
try:
res = await self._cluster.write_attributes({"cie_addr": ieee})
res = await self.write_attributes_safe({"cie_addr": ieee})
self.debug(
"wrote cie_addr: %s to '%s' cluster: %s",
str(ieee),
self._cluster.ep_attribute,
res[0],
)
except ZigbeeException as ex:
except HomeAssistantError as ex:
self.debug(
"Failed to write cie_addr: %s to '%s' cluster: %s",
str(ieee),
@ -382,7 +385,11 @@ class IASZoneClusterHandler(ClusterHandler):
)
self.debug("Sending pro-active IAS enroll response")
self._cluster.create_catching_task(self._cluster.enroll_response(0, 0))
self._cluster.create_catching_task(
self.enroll_response(
enroll_response_code=IasZone.EnrollResponse.Success, zone_id=0
)
)
self._status = ClusterHandlerStatus.CONFIGURED
self.debug("finished IASZoneClusterHandler configuration")

View File

@ -23,6 +23,7 @@ from homeassistant.const import (
Platform,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddEntitiesCallback
@ -139,30 +140,34 @@ class ZhaCover(ZhaEntity, CoverEntity):
async def async_open_cover(self, **kwargs: Any) -> None:
"""Open the window cover."""
res = await self._cover_cluster_handler.up_open()
if not isinstance(res, Exception) and res[1] is Status.SUCCESS:
self.async_update_state(STATE_OPENING)
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to open cover: {res[1]}")
self.async_update_state(STATE_OPENING)
async def async_close_cover(self, **kwargs: Any) -> None:
"""Close the window cover."""
res = await self._cover_cluster_handler.down_close()
if not isinstance(res, Exception) and res[1] is Status.SUCCESS:
self.async_update_state(STATE_CLOSING)
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to close cover: {res[1]}")
self.async_update_state(STATE_CLOSING)
async def async_set_cover_position(self, **kwargs: Any) -> None:
"""Move the roller shutter to a specific position."""
new_pos = kwargs[ATTR_POSITION]
res = await self._cover_cluster_handler.go_to_lift_percentage(100 - new_pos)
if not isinstance(res, Exception) and res[1] is Status.SUCCESS:
self.async_update_state(
STATE_CLOSING if new_pos < self._current_position else STATE_OPENING
)
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to set cover position: {res[1]}")
self.async_update_state(
STATE_CLOSING if new_pos < self._current_position else STATE_OPENING
)
async def async_stop_cover(self, **kwargs: Any) -> None:
"""Stop the window cover."""
res = await self._cover_cluster_handler.stop()
if not isinstance(res, Exception) and res[1] is Status.SUCCESS:
self._state = STATE_OPEN if self._current_position > 0 else STATE_CLOSED
self.async_write_ha_state()
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to stop cover: {res[1]}")
self._state = STATE_OPEN if self._current_position > 0 else STATE_CLOSED
self.async_write_ha_state()
async def async_update(self) -> None:
"""Attempt to retrieve the open/close state of the cover."""
@ -265,9 +270,8 @@ class Shade(ZhaEntity, CoverEntity):
async def async_open_cover(self, **kwargs: Any) -> None:
"""Open the window cover."""
res = await self._on_off_cluster_handler.on()
if isinstance(res, Exception) or res[1] != Status.SUCCESS:
self.debug("couldn't open cover: %s", res)
return
if res[1] != Status.SUCCESS:
raise HomeAssistantError(f"Failed to open cover: {res[1]}")
self._is_open = True
self.async_write_ha_state()
@ -275,9 +279,8 @@ class Shade(ZhaEntity, CoverEntity):
async def async_close_cover(self, **kwargs: Any) -> None:
"""Close the window cover."""
res = await self._on_off_cluster_handler.off()
if isinstance(res, Exception) or res[1] != Status.SUCCESS:
self.debug("couldn't open cover: %s", res)
return
if res[1] != Status.SUCCESS:
raise HomeAssistantError(f"Failed to close cover: {res[1]}")
self._is_open = False
self.async_write_ha_state()
@ -289,9 +292,8 @@ class Shade(ZhaEntity, CoverEntity):
new_pos * 255 / 100, 1
)
if isinstance(res, Exception) or res[1] != Status.SUCCESS:
self.debug("couldn't set cover's position: %s", res)
return
if res[1] != Status.SUCCESS:
raise HomeAssistantError(f"Failed to set cover position: {res[1]}")
self._position = new_pos
self.async_write_ha_state()
@ -299,9 +301,8 @@ class Shade(ZhaEntity, CoverEntity):
async def async_stop_cover(self, **kwargs: Any) -> None:
"""Stop the cover."""
res = await self._level_cluster_handler.stop()
if isinstance(res, Exception) or res[1] != Status.SUCCESS:
self.debug("couldn't stop cover: %s", res)
return
if res[1] != Status.SUCCESS:
raise HomeAssistantError(f"Failed to stop cover: {res[1]}")
@MULTI_MATCH(

View File

@ -6,7 +6,6 @@ import functools
import math
from typing import Any
from zigpy.exceptions import ZigbeeException
from zigpy.zcl.clusters import hvac
from homeassistant.components.fan import (
@ -28,6 +27,7 @@ from homeassistant.util.percentage import (
)
from .core import discovery
from .core.cluster_handlers import wrap_zigpy_exceptions
from .core.const import (
CLUSTER_HANDLER_FAN,
DATA_ZHA,
@ -207,10 +207,10 @@ class FanGroup(BaseFan, ZhaGroupEntity):
async def _async_set_fan_mode(self, fan_mode: int) -> None:
"""Set the fan mode for the group."""
try:
with wrap_zigpy_exceptions():
await self._fan_cluster_handler.write_attributes({"fan_mode": fan_mode})
except ZigbeeException as ex:
self.error("Could not set fan mode: %s", ex)
self.async_set_state(0, "fan_mode", fan_mode)
async def async_update(self) -> None:

View File

@ -298,7 +298,7 @@ class BaseLight(LogMixin, light.LightEntity):
transition_time=int(10 * self._DEFAULT_MIN_TRANSITION_TIME),
)
t_log["move_to_level_with_on_off"] = result
if isinstance(result, Exception) or result[1] is not Status.SUCCESS:
if result[1] is not Status.SUCCESS:
# First 'move to level' call failed, so if the transitioning delay
# isn't running from a previous call,
# the flag can be unset immediately
@ -338,7 +338,7 @@ class BaseLight(LogMixin, light.LightEntity):
transition_time=int(10 * duration),
)
t_log["move_to_level_with_on_off"] = result
if isinstance(result, Exception) or result[1] is not Status.SUCCESS:
if result[1] is not Status.SUCCESS:
# First 'move to level' call failed, so if the transitioning delay
# isn't running from a previous call, the flag can be unset immediately
if set_transition_flag and not self._transition_listener:
@ -359,7 +359,7 @@ class BaseLight(LogMixin, light.LightEntity):
# if brightness is not 0.
result = await self._on_off_cluster_handler.on()
t_log["on_off"] = result
if isinstance(result, Exception) or result[1] is not Status.SUCCESS:
if result[1] is not Status.SUCCESS:
# 'On' call failed, but as brightness may still transition
# (for FORCE_ON lights), we start the timer to unset the flag after
# the transition_time if necessary.
@ -391,7 +391,7 @@ class BaseLight(LogMixin, light.LightEntity):
level=level, transition_time=int(10 * duration)
)
t_log["move_to_level_if_color"] = result
if isinstance(result, Exception) or result[1] is not Status.SUCCESS:
if result[1] is not Status.SUCCESS:
self.debug("turned on: %s", t_log)
return
self._attr_state = bool(level)
@ -474,7 +474,7 @@ class BaseLight(LogMixin, light.LightEntity):
if self._zha_config_enable_light_transitioning_flag:
self.async_transition_start_timer(transition_time)
self.debug("turned off: %s", result)
if isinstance(result, Exception) or result[1] is not Status.SUCCESS:
if result[1] is not Status.SUCCESS:
return
self._attr_state = False
@ -514,7 +514,7 @@ class BaseLight(LogMixin, light.LightEntity):
transition_time=int(10 * transition_time),
)
t_log["move_to_color_temp"] = result
if isinstance(result, Exception) or result[1] is not Status.SUCCESS:
if result[1] is not Status.SUCCESS:
return False
self._attr_color_mode = ColorMode.COLOR_TEMP
self._attr_color_temp = temperature
@ -539,7 +539,7 @@ class BaseLight(LogMixin, light.LightEntity):
transition_time=int(10 * transition_time),
)
t_log["move_to_hue_and_saturation"] = result
if isinstance(result, Exception) or result[1] is not Status.SUCCESS:
if result[1] is not Status.SUCCESS:
return False
self._attr_color_mode = ColorMode.HS
self._attr_hs_color = hs_color
@ -554,7 +554,7 @@ class BaseLight(LogMixin, light.LightEntity):
transition_time=int(10 * transition_time),
)
t_log["move_to_color"] = result
if isinstance(result, Exception) or result[1] is not Status.SUCCESS:
if result[1] is not Status.SUCCESS:
return False
self._attr_color_mode = ColorMode.XY
self._attr_xy_color = xy_color

View File

@ -132,7 +132,7 @@ class ZhaDoorLock(ZhaEntity, LockEntity):
async def async_lock(self, **kwargs: Any) -> None:
"""Lock the lock."""
result = await self._doorlock_cluster_handler.lock_door()
if isinstance(result, Exception) or result[0] is not Status.SUCCESS:
if result[0] is not Status.SUCCESS:
self.error("Error with lock_door: %s", result)
return
self.async_write_ha_state()
@ -140,7 +140,7 @@ class ZhaDoorLock(ZhaEntity, LockEntity):
async def async_unlock(self, **kwargs: Any) -> None:
"""Unlock the lock."""
result = await self._doorlock_cluster_handler.unlock_door()
if isinstance(result, Exception) or result[0] is not Status.SUCCESS:
if result[0] is not Status.SUCCESS:
self.error("Error with unlock_door: %s", result)
return
self.async_write_ha_state()

View File

@ -5,9 +5,6 @@ import functools
import logging
from typing import TYPE_CHECKING, Any, Self
import zigpy.exceptions
from zigpy.zcl.foundation import Status
from homeassistant.components.number import NumberEntity, NumberMode
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EntityCategory, Platform, UnitOfMass, UnitOfTemperature
@ -362,9 +359,8 @@ class ZhaNumber(ZhaEntity, NumberEntity):
async def async_set_native_value(self, value: float) -> None:
"""Update the current value from HA."""
num_value = float(value)
if await self._analog_output_cluster_handler.async_set_present_value(num_value):
self.async_write_ha_state()
await self._analog_output_cluster_handler.async_set_present_value(float(value))
self.async_write_ha_state()
async def async_update(self) -> None:
"""Attempt to retrieve the state of the entity."""
@ -434,17 +430,10 @@ class ZHANumberConfigurationEntity(ZhaEntity, NumberEntity):
async def async_set_native_value(self, value: float) -> None:
"""Update the current value from HA."""
try:
res = await self._cluster_handler.cluster.write_attributes(
{self._zcl_attribute: int(value / self._attr_multiplier)}
)
except zigpy.exceptions.ZigbeeException as ex:
self.error("Could not set value: %s", ex)
return
if not isinstance(res, Exception) and all(
record.status == Status.SUCCESS for record in res[0]
):
self.async_write_ha_state()
await self._cluster_handler.write_attributes_safe(
{self._zcl_attribute: int(value / self._attr_multiplier)}
)
self.async_write_ha_state()
async def async_update(self) -> None:
"""Attempt to retrieve the state of the entity."""

View File

@ -210,7 +210,7 @@ class ZCLEnumSelectEntity(ZhaEntity, SelectEntity):
async def async_select_option(self, option: str) -> None:
"""Change the selected option."""
await self._cluster_handler.cluster.write_attributes(
await self._cluster_handler.write_attributes_safe(
{self._select_attr: self._enum[option.replace(" ", "_")]}
)
self.async_write_ha_state()

View File

@ -5,7 +5,6 @@ import functools
import logging
from typing import TYPE_CHECKING, Any, Self
import zigpy.exceptions
from zigpy.zcl.clusters.general import OnOff
from zigpy.zcl.foundation import Status
@ -85,16 +84,12 @@ class Switch(ZhaEntity, SwitchEntity):
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the entity on."""
result = await self._on_off_cluster_handler.turn_on()
if not result:
return
await self._on_off_cluster_handler.turn_on()
self.async_write_ha_state()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the entity off."""
result = await self._on_off_cluster_handler.turn_off()
if not result:
return
await self._on_off_cluster_handler.turn_off()
self.async_write_ha_state()
@callback
@ -145,7 +140,7 @@ class SwitchGroup(ZhaGroupEntity, SwitchEntity):
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the entity on."""
result = await self._on_off_cluster_handler.on()
if isinstance(result, Exception) or result[1] is not Status.SUCCESS:
if result[1] is not Status.SUCCESS:
return
self._state = True
self.async_write_ha_state()
@ -153,7 +148,7 @@ class SwitchGroup(ZhaGroupEntity, SwitchEntity):
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the entity off."""
result = await self._on_off_cluster_handler.off()
if isinstance(result, Exception) or result[1] is not Status.SUCCESS:
if result[1] is not Status.SUCCESS:
return
self._state = False
self.async_write_ha_state()
@ -241,17 +236,10 @@ class ZHASwitchConfigurationEntity(ZhaEntity, SwitchEntity):
async def async_turn_on_off(self, state: bool) -> None:
"""Turn the entity on or off."""
try:
result = await self._cluster_handler.cluster.write_attributes(
{self._zcl_attribute: not state if self.inverted else state}
)
except zigpy.exceptions.ZigbeeException as ex:
self.error("Could not set value: %s", ex)
return
if not isinstance(result, Exception) and all(
record.status == Status.SUCCESS for record in result[0]
):
self.async_write_ha_state()
await self._cluster_handler.write_attributes_safe(
{self._zcl_attribute: not state if self.inverted else state}
)
self.async_write_ha_state()
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the entity on."""

View File

@ -173,9 +173,8 @@ def async_find_group_entity_id(hass, domain, group):
entity_ids = hass.states.async_entity_ids(domain)
if entity_id in entity_ids:
return entity_id
return None
assert entity_id in entity_ids
return entity_id
async def async_enable_traffic(hass, zha_devices, enabled=True):

View File

@ -15,6 +15,7 @@ import zigpy.group
import zigpy.profiles
import zigpy.quirks
import zigpy.types
import zigpy.util
import zigpy.zdo.types as zdo_t
import homeassistant.components.zha.core.const as zha_const
@ -30,6 +31,17 @@ FIXTURE_GRP_ID = 0x1001
FIXTURE_GRP_NAME = "fixture group"
@pytest.fixture(scope="session", autouse=True)
def disable_request_retry_delay():
"""Disable ZHA request retrying delay to speed up failures."""
with patch(
"homeassistant.components.zha.core.cluster_handlers.RETRYABLE_REQUEST_DECORATOR",
zigpy.util.retryable_request(tries=3, delay=0),
):
yield
@pytest.fixture(scope="session", autouse=True)
def globally_load_quirks():
"""Load quirks automatically so that ZHA tests run deterministically in isolation.

View File

@ -30,6 +30,7 @@ from homeassistant.const import (
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import entity_registry as er
from .common import find_entity_id
@ -198,8 +199,9 @@ async def test_frost_unlock(hass: HomeAssistant, tuya_water_valve) -> None:
blocking=True,
)
await hass.async_block_till_done()
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"frost_lock_reset": 0})
assert cluster.write_attributes.mock_calls == [
call({"frost_lock_reset": 0}, manufacturer=None)
]
state = hass.states.get(entity_id)
assert state
@ -208,11 +210,17 @@ async def test_frost_unlock(hass: HomeAssistant, tuya_water_valve) -> None:
cluster.write_attributes.reset_mock()
cluster.write_attributes.side_effect = ZigbeeException
await hass.services.async_call(
DOMAIN,
SERVICE_PRESS,
{ATTR_ENTITY_ID: entity_id},
blocking=True,
)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"frost_lock_reset": 0})
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
DOMAIN,
SERVICE_PRESS,
{ATTR_ENTITY_ID: entity_id},
blocking=True,
)
# There are three retries
assert cluster.write_attributes.mock_calls == [
call({"frost_lock_reset": 0}, manufacturer=None),
call({"frost_lock_reset": 0}, manufacturer=None),
call({"frost_lock_reset": 0}, manufacturer=None),
]

View File

@ -1,8 +1,10 @@
"""Test ZHA climate."""
from unittest.mock import patch
from typing import Literal
from unittest.mock import call, patch
import pytest
import zhaquirks.sinope.thermostat
from zhaquirks.sinope.thermostat import SinopeTechnologiesThermostatCluster
import zhaquirks.tuya.ts0601_trv
import zigpy.profiles
import zigpy.types
@ -37,7 +39,12 @@ from homeassistant.components.climate import (
HVACMode,
)
from homeassistant.components.zha.climate import HVAC_MODE_2_SYSTEM, SEQ_OF_OPERATION
from homeassistant.components.zha.core.const import PRESET_COMPLEX, PRESET_SCHEDULE
from homeassistant.components.zha.core.const import (
PRESET_COMPLEX,
PRESET_SCHEDULE,
PRESET_TEMP_MANUAL,
)
from homeassistant.components.zha.core.device import ZHADevice
from homeassistant.const import (
ATTR_ENTITY_ID,
ATTR_TEMPERATURE,
@ -45,6 +52,7 @@ from homeassistant.const import (
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from .common import async_enable_traffic, find_entity_id, send_attributes_report
from .conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_PROFILE, SIG_EP_TYPE
@ -129,6 +137,23 @@ CLIMATE_MOES = {
}
}
CLIMATE_BECA = {
1: {
SIG_EP_PROFILE: zigpy.profiles.zha.PROFILE_ID,
SIG_EP_TYPE: zigpy.profiles.zha.DeviceType.SMART_PLUG,
SIG_EP_INPUT: [
zigpy.zcl.clusters.general.Basic.cluster_id,
zigpy.zcl.clusters.general.Groups.cluster_id,
zigpy.zcl.clusters.general.Scenes.cluster_id,
61148,
],
SIG_EP_OUTPUT: [
zigpy.zcl.clusters.general.Time.cluster_id,
zigpy.zcl.clusters.general.Ota.cluster_id,
],
}
}
CLIMATE_ZONNSMART = {
1: {
SIG_EP_PROFILE: zigpy.profiles.zha.PROFILE_ID,
@ -146,6 +171,7 @@ CLIMATE_ZONNSMART = {
MANUF_SINOPE = "Sinope Technologies"
MANUF_ZEN = "Zen Within"
MANUF_MOES = "_TZE200_ckud7u2l"
MANUF_BECA = "_TZE200_b6wax7g0"
MANUF_ZONNSMART = "_TZE200_hue3yfsn"
ZCL_ATTR_PLUG = {
@ -257,6 +283,17 @@ async def device_climate_moes(device_climate_mock):
)
@pytest.fixture
async def device_climate_beca(device_climate_mock) -> ZHADevice:
"""Beca thermostat."""
return await device_climate_mock(
CLIMATE_BECA,
manuf=MANUF_BECA,
quirk=zhaquirks.tuya.ts0601_trv.MoesHY368_Type1new,
)
@pytest.fixture
async def device_climate_zonnsmart(device_climate_mock):
"""ZONNSMART thermostat."""
@ -553,7 +590,11 @@ async def test_hvac_modes(
),
)
async def test_target_temperature(
hass: HomeAssistant, device_climate_mock, sys_mode, preset, target_temp
hass: HomeAssistant,
device_climate_mock,
sys_mode: Thermostat.SystemMode,
preset: Literal[PRESET_AWAY] | None,
target_temp: int,
) -> None:
"""Test target temperature property."""
@ -720,15 +761,23 @@ async def test_preset_setting(hass: HomeAssistant, device_climate_sinope) -> Non
# unsuccessful occupancy change
thrm_cluster.write_attributes.return_value = [
zcl_f.WriteAttributesResponse.deserialize(b"\x01\x00\x00")[0]
zcl_f.WriteAttributesResponse(
[
zcl_f.WriteAttributesStatusRecord(
status=zcl_f.Status.FAILURE,
attrid=SinopeTechnologiesThermostatCluster.AttributeDefs.set_occupancy.id,
)
]
)
]
await hass.services.async_call(
CLIMATE_DOMAIN,
SERVICE_SET_PRESET_MODE,
{ATTR_ENTITY_ID: entity_id, ATTR_PRESET_MODE: PRESET_AWAY},
blocking=True,
)
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
CLIMATE_DOMAIN,
SERVICE_SET_PRESET_MODE,
{ATTR_ENTITY_ID: entity_id, ATTR_PRESET_MODE: PRESET_AWAY},
blocking=True,
)
state = hass.states.get(entity_id)
assert state.attributes[ATTR_PRESET_MODE] == PRESET_NONE
@ -738,7 +787,9 @@ async def test_preset_setting(hass: HomeAssistant, device_climate_sinope) -> Non
# successful occupancy change
thrm_cluster.write_attributes.reset_mock()
thrm_cluster.write_attributes.return_value = [
zcl_f.WriteAttributesResponse.deserialize(b"\x00")[0]
zcl_f.WriteAttributesResponse(
[zcl_f.WriteAttributesStatusRecord(status=zcl_f.Status.SUCCESS)]
)
]
await hass.services.async_call(
CLIMATE_DOMAIN,
@ -755,14 +806,23 @@ async def test_preset_setting(hass: HomeAssistant, device_climate_sinope) -> Non
# unsuccessful occupancy change
thrm_cluster.write_attributes.reset_mock()
thrm_cluster.write_attributes.return_value = [
zcl_f.WriteAttributesResponse.deserialize(b"\x01\x01\x01")[0]
zcl_f.WriteAttributesResponse(
[
zcl_f.WriteAttributesStatusRecord(
status=zcl_f.Status.FAILURE,
attrid=SinopeTechnologiesThermostatCluster.AttributeDefs.set_occupancy.id,
)
]
)
]
await hass.services.async_call(
CLIMATE_DOMAIN,
SERVICE_SET_PRESET_MODE,
{ATTR_ENTITY_ID: entity_id, ATTR_PRESET_MODE: PRESET_NONE},
blocking=True,
)
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
CLIMATE_DOMAIN,
SERVICE_SET_PRESET_MODE,
{ATTR_ENTITY_ID: entity_id, ATTR_PRESET_MODE: PRESET_NONE},
blocking=True,
)
state = hass.states.get(entity_id)
assert state.attributes[ATTR_PRESET_MODE] == PRESET_AWAY
@ -772,7 +832,9 @@ async def test_preset_setting(hass: HomeAssistant, device_climate_sinope) -> Non
# successful occupancy change
thrm_cluster.write_attributes.reset_mock()
thrm_cluster.write_attributes.return_value = [
zcl_f.WriteAttributesResponse.deserialize(b"\x00")[0]
zcl_f.WriteAttributesResponse(
[zcl_f.WriteAttributesStatusRecord(status=zcl_f.Status.SUCCESS)]
)
]
await hass.services.async_call(
CLIMATE_DOMAIN,
@ -1386,6 +1448,49 @@ async def test_set_moes_operation_mode(
assert state.attributes[ATTR_PRESET_MODE] == PRESET_COMPLEX
@pytest.mark.parametrize(
("preset_attr", "preset_mode"),
[
(0, PRESET_AWAY),
(1, PRESET_SCHEDULE),
# (2, PRESET_NONE), # TODO: why does this not work?
(4, PRESET_ECO),
(5, PRESET_BOOST),
(7, PRESET_TEMP_MANUAL),
],
)
async def test_beca_operation_mode_update(
hass: HomeAssistant,
device_climate_beca: ZHADevice,
preset_attr: int,
preset_mode: str,
) -> None:
"""Test beca trv operation mode attribute update."""
entity_id = find_entity_id(Platform.CLIMATE, device_climate_beca, hass)
thrm_cluster = device_climate_beca.device.endpoints[1].thermostat
# Test sending an attribute report
await send_attributes_report(hass, thrm_cluster, {"operation_preset": preset_attr})
state = hass.states.get(entity_id)
assert state.attributes[ATTR_PRESET_MODE] == preset_mode
# Test setting the preset
await hass.services.async_call(
CLIMATE_DOMAIN,
SERVICE_SET_PRESET_MODE,
{ATTR_ENTITY_ID: entity_id, ATTR_PRESET_MODE: preset_mode},
blocking=True,
)
assert thrm_cluster.write_attributes.mock_calls == [
call(
{"operation_preset": preset_attr},
manufacturer=device_climate_beca.manufacturer_code,
)
]
async def test_set_zonnsmart_preset(
hass: HomeAssistant, device_climate_zonnsmart
) -> None:

View File

@ -39,6 +39,8 @@ from .conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_PROFILE, SIG_EP_TYPE
from tests.common import async_capture_events, mock_restore_cache
Default_Response = zcl_f.GENERAL_COMMANDS[zcl_f.GeneralCommand.Default_Response].schema
@pytest.fixture(autouse=True)
def cover_platform_only():
@ -206,6 +208,121 @@ async def test_cover(
assert hass.states.get(entity_id).state == STATE_OPEN
async def test_cover_failures(
hass: HomeAssistant, zha_device_joined_restored, zigpy_cover_device
) -> None:
"""Test ZHA cover platform failure cases."""
# load up cover domain
cluster = zigpy_cover_device.endpoints.get(1).window_covering
cluster.PLUGGED_ATTR_READS = {"current_position_lift_percentage": 100}
zha_device = await zha_device_joined_restored(zigpy_cover_device)
entity_id = find_entity_id(Platform.COVER, zha_device, hass)
assert entity_id is not None
await async_enable_traffic(hass, [zha_device], enabled=False)
# test that the cover was created and that it is unavailable
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, [zha_device])
await hass.async_block_till_done()
# test that the state has changed from unavailable to off
await send_attributes_report(hass, cluster, {0: 0, 8: 100, 1: 1})
assert hass.states.get(entity_id).state == STATE_CLOSED
# test to see if it opens
await send_attributes_report(hass, cluster, {0: 1, 8: 0, 1: 100})
assert hass.states.get(entity_id).state == STATE_OPEN
# close from UI
with patch(
"zigpy.zcl.Cluster.request",
return_value=Default_Response(
command_id=closures.WindowCovering.ServerCommandDefs.down_close.id,
status=zcl_f.Status.UNSUP_CLUSTER_COMMAND,
),
):
with pytest.raises(HomeAssistantError, match=r"Failed to close cover"):
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_CLOSE_COVER,
{"entity_id": entity_id},
blocking=True,
)
assert cluster.request.call_count == 1
assert (
cluster.request.call_args[0][1]
== closures.WindowCovering.ServerCommandDefs.down_close.id
)
# open from UI
with patch(
"zigpy.zcl.Cluster.request",
return_value=Default_Response(
command_id=closures.WindowCovering.ServerCommandDefs.up_open.id,
status=zcl_f.Status.UNSUP_CLUSTER_COMMAND,
),
):
with pytest.raises(HomeAssistantError, match=r"Failed to open cover"):
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_OPEN_COVER,
{"entity_id": entity_id},
blocking=True,
)
assert cluster.request.call_count == 1
assert (
cluster.request.call_args[0][1]
== closures.WindowCovering.ServerCommandDefs.up_open.id
)
# set position UI
with patch(
"zigpy.zcl.Cluster.request",
return_value=Default_Response(
command_id=closures.WindowCovering.ServerCommandDefs.go_to_lift_percentage.id,
status=zcl_f.Status.UNSUP_CLUSTER_COMMAND,
),
):
with pytest.raises(HomeAssistantError, match=r"Failed to set cover position"):
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_SET_COVER_POSITION,
{"entity_id": entity_id, "position": 47},
blocking=True,
)
assert cluster.request.call_count == 1
assert (
cluster.request.call_args[0][1]
== closures.WindowCovering.ServerCommandDefs.go_to_lift_percentage.id
)
# stop from UI
with patch(
"zigpy.zcl.Cluster.request",
return_value=Default_Response(
command_id=closures.WindowCovering.ServerCommandDefs.stop.id,
status=zcl_f.Status.UNSUP_CLUSTER_COMMAND,
),
):
with pytest.raises(HomeAssistantError, match=r"Failed to stop cover"):
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_STOP_COVER,
{"entity_id": entity_id},
blocking=True,
)
assert cluster.request.call_count == 1
assert (
cluster.request.call_args[0][1]
== closures.WindowCovering.ServerCommandDefs.stop.id
)
async def test_shade(
hass: HomeAssistant, zha_device_joined_restored, zigpy_shade_device
) -> None:
@ -236,7 +353,13 @@ async def test_shade(
assert hass.states.get(entity_id).state == STATE_OPEN
# close from UI command fails
with patch("zigpy.zcl.Cluster.request", side_effect=asyncio.TimeoutError):
with patch(
"zigpy.zcl.Cluster.request",
return_value=Default_Response(
command_id=closures.WindowCovering.ServerCommandDefs.down_close.id,
status=zcl_f.Status.UNSUP_CLUSTER_COMMAND,
),
):
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
COVER_DOMAIN,
@ -244,7 +367,7 @@ async def test_shade(
{"entity_id": entity_id},
blocking=True,
)
assert cluster_on_off.request.call_count == 3
assert cluster_on_off.request.call_count == 1
assert cluster_on_off.request.call_args[0][0] is False
assert cluster_on_off.request.call_args[0][1] == 0x0000
assert hass.states.get(entity_id).state == STATE_OPEN
@ -261,7 +384,13 @@ async def test_shade(
# open from UI command fails
assert ATTR_CURRENT_POSITION not in hass.states.get(entity_id).attributes
await send_attributes_report(hass, cluster_level, {0: 0})
with patch("zigpy.zcl.Cluster.request", side_effect=asyncio.TimeoutError):
with patch(
"zigpy.zcl.Cluster.request",
return_value=Default_Response(
command_id=closures.WindowCovering.ServerCommandDefs.up_open.id,
status=zcl_f.Status.UNSUP_CLUSTER_COMMAND,
),
):
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
COVER_DOMAIN,
@ -269,11 +398,35 @@ async def test_shade(
{"entity_id": entity_id},
blocking=True,
)
assert cluster_on_off.request.call_count == 3
assert cluster_on_off.request.call_count == 1
assert cluster_on_off.request.call_args[0][0] is False
assert cluster_on_off.request.call_args[0][1] == 0x0001
assert hass.states.get(entity_id).state == STATE_CLOSED
# stop from UI command fails
with patch(
"zigpy.zcl.Cluster.request",
return_value=Default_Response(
command_id=general.LevelControl.ServerCommandDefs.stop.id,
status=zcl_f.Status.UNSUP_CLUSTER_COMMAND,
),
):
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_STOP_COVER,
{"entity_id": entity_id},
blocking=True,
)
assert cluster_level.request.call_count == 1
assert cluster_level.request.call_args[0][0] is False
assert (
cluster_level.request.call_args[0][1]
== general.LevelControl.ServerCommandDefs.stop.id
)
assert hass.states.get(entity_id).state == STATE_CLOSED
# open from UI succeeds
with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
@ -285,7 +438,13 @@ async def test_shade(
assert hass.states.get(entity_id).state == STATE_OPEN
# set position UI command fails
with patch("zigpy.zcl.Cluster.request", side_effect=asyncio.TimeoutError):
with patch(
"zigpy.zcl.Cluster.request",
return_value=Default_Response(
command_id=closures.WindowCovering.ServerCommandDefs.go_to_lift_percentage.id,
status=zcl_f.Status.UNSUP_CLUSTER_COMMAND,
),
):
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
COVER_DOMAIN,
@ -293,7 +452,8 @@ async def test_shade(
{"entity_id": entity_id, "position": 47},
blocking=True,
)
assert cluster_level.request.call_count == 3
assert cluster_level.request.call_count == 1
assert cluster_level.request.call_args[0][0] is False
assert cluster_level.request.call_args[0][1] == 0x0004
assert int(cluster_level.request.call_args[0][3] * 100 / 255) == 47

View File

@ -3,6 +3,7 @@ from unittest.mock import AsyncMock, call, patch
import pytest
import zhaquirks.ikea.starkvind
from zigpy.device import Device
from zigpy.exceptions import ZigbeeException
from zigpy.profiles import zha
from zigpy.zcl.clusters import general, hvac
@ -17,6 +18,7 @@ from homeassistant.components.fan import (
SERVICE_SET_PRESET_MODE,
NotValidPresetModeError,
)
from homeassistant.components.zha.core.device import ZHADevice
from homeassistant.components.zha.core.discovery import GROUP_PROBE
from homeassistant.components.zha.core.group import GroupMember
from homeassistant.components.zha.fan import (
@ -34,6 +36,7 @@ from homeassistant.const import (
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.setup import async_setup_component
from .common import (
@ -192,26 +195,30 @@ async def test_fan(
# turn on from HA
cluster.write_attributes.reset_mock()
await async_turn_on(hass, entity_id)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"fan_mode": 2})
assert cluster.write_attributes.mock_calls == [
call({"fan_mode": 2}, manufacturer=None)
]
# turn off from HA
cluster.write_attributes.reset_mock()
await async_turn_off(hass, entity_id)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"fan_mode": 0})
assert cluster.write_attributes.mock_calls == [
call({"fan_mode": 0}, manufacturer=None)
]
# change speed from HA
cluster.write_attributes.reset_mock()
await async_set_percentage(hass, entity_id, percentage=100)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"fan_mode": 3})
assert cluster.write_attributes.mock_calls == [
call({"fan_mode": 3}, manufacturer=None)
]
# change preset_mode from HA
cluster.write_attributes.reset_mock()
await async_set_preset_mode(hass, entity_id, preset_mode=PRESET_MODE_ON)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"fan_mode": 4})
assert cluster.write_attributes.mock_calls == [
call({"fan_mode": 4}, manufacturer=None)
]
# set invalid preset_mode from HA
cluster.write_attributes.reset_mock()
@ -443,13 +450,14 @@ async def test_zha_group_fan_entity_failure_state(
# turn on from HA
group_fan_cluster.write_attributes.reset_mock()
await async_turn_on(hass, entity_id)
with pytest.raises(HomeAssistantError):
await async_turn_on(hass, entity_id)
await hass.async_block_till_done()
assert len(group_fan_cluster.write_attributes.mock_calls) == 1
assert group_fan_cluster.write_attributes.call_args[0][0] == {"fan_mode": 2}
assert "Could not set fan mode" in caplog.text
@pytest.mark.parametrize(
("plug_read", "expected_state", "expected_percentage"),
@ -557,7 +565,9 @@ def zigpy_device_ikea(zigpy_device_mock):
async def test_fan_ikea(
hass: HomeAssistant, zha_device_joined_restored, zigpy_device_ikea
hass: HomeAssistant,
zha_device_joined_restored: ZHADevice,
zigpy_device_ikea: Device,
) -> None:
"""Test ZHA fan Ikea platform."""
zha_device = await zha_device_joined_restored(zigpy_device_ikea)
@ -587,26 +597,30 @@ async def test_fan_ikea(
# turn on from HA
cluster.write_attributes.reset_mock()
await async_turn_on(hass, entity_id)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"fan_mode": 1})
assert cluster.write_attributes.mock_calls == [
call({"fan_mode": 1}, manufacturer=None)
]
# turn off from HA
cluster.write_attributes.reset_mock()
await async_turn_off(hass, entity_id)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"fan_mode": 0})
assert cluster.write_attributes.mock_calls == [
call({"fan_mode": 0}, manufacturer=None)
]
# change speed from HA
cluster.write_attributes.reset_mock()
await async_set_percentage(hass, entity_id, percentage=100)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"fan_mode": 10})
assert cluster.write_attributes.mock_calls == [
call({"fan_mode": 10}, manufacturer=None)
]
# change preset_mode from HA
cluster.write_attributes.reset_mock()
await async_set_preset_mode(hass, entity_id, preset_mode=PRESET_MODE_AUTO)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"fan_mode": 1})
assert cluster.write_attributes.mock_calls == [
call({"fan_mode": 1}, manufacturer=None)
]
# set invalid preset_mode from HA
cluster.write_attributes.reset_mock()

View File

@ -9,8 +9,10 @@ import zigpy.zcl.clusters.lighting as lighting
import zigpy.zcl.foundation as zcl_f
from homeassistant.components.number import DOMAIN as NUMBER_DOMAIN
from homeassistant.components.zha.core.device import ZHADevice
from homeassistant.const import STATE_UNAVAILABLE, EntityCategory, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import entity_registry as er
from homeassistant.setup import async_setup_component
@ -160,8 +162,9 @@ async def test_number(
{"entity_id": entity_id, "value": 30.0},
blocking=True,
)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"present_value": 30.0})
assert cluster.write_attributes.mock_calls == [
call({"present_value": 30.0}, manufacturer=None)
]
cluster.PLUGGED_ATTR_READS["present_value"] = 30.0
# test rejoin
@ -198,7 +201,12 @@ async def test_number(
),
)
async def test_level_control_number(
hass: HomeAssistant, light, zha_device_joined, attr, initial_value, new_value
hass: HomeAssistant,
light: ZHADevice,
zha_device_joined,
attr: str,
initial_value: int,
new_value: int,
) -> None:
"""Test ZHA level control number entities - new join."""
@ -217,8 +225,7 @@ async def test_level_control_number(
)
assert entity_id is not None
assert level_control_cluster.read_attributes.call_count == 3
assert (
assert level_control_cluster.read_attributes.mock_calls == [
call(
[
"on_off_transition_time",
@ -230,21 +237,13 @@ async def test_level_control_number(
allow_cache=True,
only_cache=False,
manufacturer=None,
)
in level_control_cluster.read_attributes.call_args_list
)
assert (
),
call(
["start_up_current_level"],
allow_cache=True,
only_cache=False,
manufacturer=None,
)
in level_control_cluster.read_attributes.call_args_list
)
assert (
),
call(
[
"current_level",
@ -252,9 +251,8 @@ async def test_level_control_number(
allow_cache=False,
only_cache=False,
manufacturer=None,
)
in level_control_cluster.read_attributes.call_args_list
)
),
]
state = hass.states.get(entity_id)
assert state
@ -275,10 +273,9 @@ async def test_level_control_number(
blocking=True,
)
assert level_control_cluster.write_attributes.call_count == 1
assert level_control_cluster.write_attributes.call_args[0][0] == {
attr: new_value,
}
assert level_control_cluster.write_attributes.mock_calls == [
call({attr: new_value}, manufacturer=None)
]
state = hass.states.get(entity_id)
assert state
@ -293,36 +290,34 @@ async def test_level_control_number(
)
# the mocking doesn't update the attr cache so this flips back to initial value
assert hass.states.get(entity_id).state == str(initial_value)
assert level_control_cluster.read_attributes.call_count == 1
assert (
assert level_control_cluster.read_attributes.mock_calls == [
call(
[
attr,
],
[attr],
allow_cache=False,
only_cache=False,
manufacturer=None,
)
in level_control_cluster.read_attributes.call_args_list
)
]
level_control_cluster.write_attributes.reset_mock()
level_control_cluster.write_attributes.side_effect = ZigbeeException
await hass.services.async_call(
"number",
"set_value",
{
"entity_id": entity_id,
"value": new_value,
},
blocking=True,
)
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
"number",
"set_value",
{
"entity_id": entity_id,
"value": new_value,
},
blocking=True,
)
assert level_control_cluster.write_attributes.call_count == 1
assert level_control_cluster.write_attributes.call_args[0][0] == {
attr: new_value,
}
assert level_control_cluster.write_attributes.mock_calls == [
call({attr: new_value}, manufacturer=None),
call({attr: new_value}, manufacturer=None),
call({attr: new_value}, manufacturer=None),
]
assert hass.states.get(entity_id).state == str(initial_value)
@ -331,7 +326,12 @@ async def test_level_control_number(
(("start_up_color_temperature", 500, 350),),
)
async def test_color_number(
hass: HomeAssistant, light, zha_device_joined, attr, initial_value, new_value
hass: HomeAssistant,
light: ZHADevice,
zha_device_joined,
attr: str,
initial_value: int,
new_value: int,
) -> None:
"""Test ZHA color number entities - new join."""
@ -407,9 +407,7 @@ async def test_color_number(
assert color_cluster.read_attributes.call_count == 1
assert (
call(
[
attr,
],
[attr],
allow_cache=False,
only_cache=False,
manufacturer=None,
@ -420,18 +418,20 @@ async def test_color_number(
color_cluster.write_attributes.reset_mock()
color_cluster.write_attributes.side_effect = ZigbeeException
await hass.services.async_call(
"number",
"set_value",
{
"entity_id": entity_id,
"value": new_value,
},
blocking=True,
)
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
"number",
"set_value",
{
"entity_id": entity_id,
"value": new_value,
},
blocking=True,
)
assert color_cluster.write_attributes.call_count == 1
assert color_cluster.write_attributes.call_args[0][0] == {
attr: new_value,
}
assert color_cluster.write_attributes.mock_calls == [
call({attr: new_value}, manufacturer=None),
call({attr: new_value}, manufacturer=None),
call({attr: new_value}, manufacturer=None),
]
assert hass.states.get(entity_id).state == str(initial_value)

View File

@ -21,6 +21,7 @@ from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN
from homeassistant.components.zha.core.group import GroupMember
from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.setup import async_setup_component
from .common import (
@ -411,10 +412,11 @@ async def test_switch_configurable(
await hass.services.async_call(
SWITCH_DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True
)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call(
{"window_detection_function": True}
)
assert cluster.write_attributes.mock_calls == [
call({"window_detection_function": True}, manufacturer=None)
]
cluster.write_attributes.reset_mock()
# turn off from HA
with patch(
@ -425,10 +427,9 @@ async def test_switch_configurable(
await hass.services.async_call(
SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True
)
assert len(cluster.write_attributes.mock_calls) == 2
assert cluster.write_attributes.call_args == call(
{"window_detection_function": False}
)
assert cluster.write_attributes.mock_calls == [
call({"window_detection_function": False}, manufacturer=None)
]
cluster.read_attributes.reset_mock()
await async_setup_component(hass, "homeassistant", {})
@ -461,14 +462,18 @@ async def test_switch_configurable(
cluster.write_attributes.reset_mock()
cluster.write_attributes.side_effect = ZigbeeException
await hass.services.async_call(
SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True
)
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True
)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call(
{"window_detection_function": False}
)
assert cluster.write_attributes.mock_calls == [
call({"window_detection_function": False}, manufacturer=None),
call({"window_detection_function": False}, manufacturer=None),
call({"window_detection_function": False}, manufacturer=None),
]
cluster.write_attributes.side_effect = None
# test inverter
cluster.write_attributes.reset_mock()
@ -477,18 +482,17 @@ async def test_switch_configurable(
await hass.services.async_call(
SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True
)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call(
{"window_detection_function": True}
)
assert cluster.write_attributes.mock_calls == [
call({"window_detection_function": True}, manufacturer=None)
]
cluster.write_attributes.reset_mock()
await hass.services.async_call(
SWITCH_DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True
)
assert len(cluster.write_attributes.mock_calls) == 2
assert cluster.write_attributes.call_args == call(
{"window_detection_function": False}
)
assert cluster.write_attributes.mock_calls == [
call({"window_detection_function": False}, manufacturer=None)
]
# test joining a new switch to the network and HA
await async_test_rejoin(hass, zigpy_device_tuya, [cluster], (0,))