Add configurable zha switch entity (#71784)

* add configurable zha switch entity

* final zha configurable switch

* fix codecov

* replaced errorneous cluster with local quirk

* test fix

* minor changes
pull/72467/head
rforro 2022-05-25 01:43:35 +02:00 committed by GitHub
parent 6cac1dadeb
commit 0c2f22d478
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 314 additions and 1 deletions

View File

@ -2,8 +2,10 @@
from __future__ import annotations
import functools
from typing import Any
import logging
from typing import TYPE_CHECKING, Any
import zigpy.exceptions
from zigpy.zcl.clusters.general import OnOff
from zigpy.zcl.foundation import Status
@ -12,6 +14,7 @@ from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_ON, STATE_UNAVAILABLE, Platform
from homeassistant.core import HomeAssistant, State, callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .core import discovery
@ -24,8 +27,17 @@ from .core.const import (
from .core.registries import ZHA_ENTITIES
from .entity import ZhaEntity, ZhaGroupEntity
if TYPE_CHECKING:
from .core.channels.base import ZigbeeChannel
from .core.device import ZHADevice
STRICT_MATCH = functools.partial(ZHA_ENTITIES.strict_match, Platform.SWITCH)
GROUP_MATCH = functools.partial(ZHA_ENTITIES.group_match, Platform.SWITCH)
CONFIG_DIAGNOSTIC_MATCH = functools.partial(
ZHA_ENTITIES.config_diagnostic_match, Platform.SWITCH
)
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
@ -138,3 +150,118 @@ class SwitchGroup(ZhaGroupEntity, SwitchEntity):
self._state = len(on_states) > 0
self._available = any(state.state != STATE_UNAVAILABLE for state in states)
class ZHASwitchConfigurationEntity(ZhaEntity, SwitchEntity):
"""Representation of a ZHA switch configuration entity."""
_zcl_attribute: str
_zcl_inverter_attribute: str = ""
@classmethod
def create_entity(
cls,
unique_id: str,
zha_device: ZHADevice,
channels: list[ZigbeeChannel],
**kwargs,
) -> ZhaEntity | None:
"""Entity Factory.
Return entity if it is a supported configuration, otherwise return None
"""
channel = channels[0]
if (
cls._zcl_attribute in channel.cluster.unsupported_attributes
or channel.cluster.get(cls._zcl_attribute) is None
):
_LOGGER.debug(
"%s is not supported - skipping %s entity creation",
cls._zcl_attribute,
cls.__name__,
)
return None
return cls(unique_id, zha_device, channels, **kwargs)
def __init__(
self,
unique_id: str,
zha_device: ZHADevice,
channels: list[ZigbeeChannel],
**kwargs,
) -> None:
"""Init this number configuration entity."""
self._channel: ZigbeeChannel = channels[0]
super().__init__(unique_id, zha_device, channels, **kwargs)
async def async_added_to_hass(self) -> None:
"""Run when about to be added to hass."""
await super().async_added_to_hass()
self.async_accept_signal(
self._channel, SIGNAL_ATTR_UPDATED, self.async_set_state
)
@callback
def async_set_state(self, attr_id: int, attr_name: str, value: Any):
"""Handle state update from channel."""
self.async_write_ha_state()
@property
def is_on(self) -> bool:
"""Return if the switch is on based on the statemachine."""
val = bool(self._channel.cluster.get(self._zcl_attribute))
invert = bool(self._channel.cluster.get(self._zcl_inverter_attribute))
return (not val) if invert else val
async def async_turn_on_off(self, state) -> None:
"""Turn the entity on or off."""
try:
invert = bool(self._channel.cluster.get(self._zcl_inverter_attribute))
result = await self._channel.cluster.write_attributes(
{self._zcl_attribute: not state if invert else state}
)
except zigpy.exceptions.ZigbeeException as ex:
self.error("Could not set value: %s", ex)
return
if not isinstance(result, Exception) and all(
record.status == Status.SUCCESS for record in result[0]
):
self.async_write_ha_state()
async def async_turn_on(self, **kwargs) -> None:
"""Turn the entity on."""
await self.async_turn_on_off(True)
async def async_turn_off(self, **kwargs) -> None:
"""Turn the entity off."""
await self.async_turn_on_off(False)
async def async_update(self) -> None:
"""Attempt to retrieve the state of the entity."""
await super().async_update()
_LOGGER.error("Polling current state")
if self._channel:
value = await self._channel.get_attribute_value(
self._zcl_attribute, from_cache=False
)
invert = await self._channel.get_attribute_value(
self._zcl_inverter_attribute, from_cache=False
)
_LOGGER.debug("read value=%s, inverter=%s", value, bool(invert))
@CONFIG_DIAGNOSTIC_MATCH(
channel_names="tuya_manufacturer",
manufacturers={
"_TZE200_b6wax7g0",
},
)
class OnOffWindowDetectionFunctionConfigurationEntity(
ZHASwitchConfigurationEntity, id_suffix="on_off_window_opened_detection"
):
"""Representation of a ZHA on off transition time configuration entity."""
_attr_entity_category = EntityCategory.CONFIG
_zcl_attribute = "window_detection_function"
_zcl_inverter_attribute = "window_detection_function_inverter"

View File

@ -2,13 +2,25 @@
from unittest.mock import call, patch
import pytest
from zhaquirks.const import (
DEVICE_TYPE,
ENDPOINTS,
INPUT_CLUSTERS,
OUTPUT_CLUSTERS,
PROFILE_ID,
)
from zigpy.exceptions import ZigbeeException
import zigpy.profiles.zha as zha
from zigpy.quirks import CustomCluster, CustomDevice
import zigpy.types as t
import zigpy.zcl.clusters.general as general
from zigpy.zcl.clusters.manufacturer_specific import ManufacturerSpecificCluster
import zigpy.zcl.foundation as zcl_f
from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN
from homeassistant.components.zha.core.group import GroupMember
from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE, Platform
from homeassistant.setup import async_setup_component
from .common import (
async_enable_traffic,
@ -174,6 +186,61 @@ async def test_switch(hass, zha_device_joined_restored, zigpy_device):
await async_test_rejoin(hass, zigpy_device, [cluster], (1,))
class WindowDetectionFunctionQuirk(CustomDevice):
"""Quirk with window detection function attribute."""
class TuyaManufCluster(CustomCluster, ManufacturerSpecificCluster):
"""Tuya manufacturer specific cluster."""
cluster_id = 0xEF00
ep_attribute = "tuya_manufacturer"
attributes = {
0xEF01: ("window_detection_function", t.Bool),
0xEF02: ("window_detection_function_inverter", t.Bool),
}
def __init__(self, *args, **kwargs):
"""Initialize with task."""
super().__init__(*args, **kwargs)
self._attr_cache.update(
{0xEF01: False}
) # entity won't be created without this
replacement = {
ENDPOINTS: {
1: {
PROFILE_ID: zha.PROFILE_ID,
DEVICE_TYPE: zha.DeviceType.ON_OFF_SWITCH,
INPUT_CLUSTERS: [general.Basic.cluster_id, TuyaManufCluster],
OUTPUT_CLUSTERS: [],
},
}
}
@pytest.fixture
async def zigpy_device_tuya(hass, zigpy_device_mock, zha_device_joined):
"""Device tracker zigpy tuya device."""
zigpy_device = zigpy_device_mock(
{
1: {
SIG_EP_INPUT: [general.Basic.cluster_id],
SIG_EP_OUTPUT: [],
SIG_EP_TYPE: zha.DeviceType.ON_OFF_SWITCH,
}
},
manufacturer="_TZE200_b6wax7g0",
quirk=WindowDetectionFunctionQuirk,
)
zha_device = await zha_device_joined(zigpy_device)
zha_device.available = True
await hass.async_block_till_done()
return zigpy_device
@patch(
"homeassistant.components.zha.entity.UPDATE_GROUP_FROM_CHILD_DELAY",
new=0,
@ -292,3 +359,122 @@ async def test_zha_group_switch_entity(
# test that group light is now back on
assert hass.states.get(entity_id).state == STATE_ON
async def test_switch_configurable(hass, zha_device_joined_restored, zigpy_device_tuya):
"""Test zha configurable switch platform."""
zha_device = await zha_device_joined_restored(zigpy_device_tuya)
cluster = zigpy_device_tuya.endpoints.get(1).tuya_manufacturer
entity_id = await find_entity_id(Platform.SWITCH, zha_device, hass)
assert entity_id is not None
assert hass.states.get(entity_id).state == STATE_OFF
await async_enable_traffic(hass, [zha_device], enabled=False)
# test that the switch was created and that its state is unavailable
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, [zha_device])
# test that the state has changed from unavailable to off
assert hass.states.get(entity_id).state == STATE_OFF
# turn on at switch
await send_attributes_report(hass, cluster, {"window_detection_function": True})
assert hass.states.get(entity_id).state == STATE_ON
# turn off at switch
await send_attributes_report(hass, cluster, {"window_detection_function": False})
assert hass.states.get(entity_id).state == STATE_OFF
# turn on from HA
with patch(
"zigpy.zcl.Cluster.write_attributes",
return_value=mock_coro([zcl_f.Status.SUCCESS, zcl_f.Status.SUCCESS]),
):
# turn on via UI
await hass.services.async_call(
SWITCH_DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True
)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call(
{"window_detection_function": True}
)
# turn off from HA
with patch(
"zigpy.zcl.Cluster.write_attributes",
return_value=mock_coro([zcl_f.Status.SUCCESS, zcl_f.Status.SUCCESS]),
):
# turn off via UI
await hass.services.async_call(
SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True
)
assert len(cluster.write_attributes.mock_calls) == 2
assert cluster.write_attributes.call_args == call(
{"window_detection_function": False}
)
cluster.read_attributes.reset_mock()
await async_setup_component(hass, "homeassistant", {})
await hass.async_block_till_done()
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
)
# the mocking doesn't update the attr cache so this flips back to initial value
assert cluster.read_attributes.call_count == 2
assert [
call(
[
"window_detection_function",
],
allow_cache=False,
only_cache=False,
manufacturer=None,
),
call(
[
"window_detection_function_inverter",
],
allow_cache=False,
only_cache=False,
manufacturer=None,
),
] == cluster.read_attributes.call_args_list
cluster.write_attributes.reset_mock()
cluster.write_attributes.side_effect = ZigbeeException
await hass.services.async_call(
SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True
)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call(
{"window_detection_function": False}
)
# test inverter
cluster.write_attributes.reset_mock()
cluster._attr_cache.update({0xEF02: True})
await hass.services.async_call(
SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True
)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call(
{"window_detection_function": True}
)
await hass.services.async_call(
SWITCH_DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True
)
assert len(cluster.write_attributes.mock_calls) == 2
assert cluster.write_attributes.call_args == call(
{"window_detection_function": False}
)
# test joining a new switch to the network and HA
await async_test_rejoin(hass, zigpy_device_tuya, [cluster], (0,))