372 lines
11 KiB
Python
372 lines
11 KiB
Python
"""Test ZHA button."""
|
|
|
|
from typing import Final
|
|
from unittest.mock import call, patch
|
|
|
|
from freezegun import freeze_time
|
|
import pytest
|
|
from zhaquirks.const import (
|
|
DEVICE_TYPE,
|
|
ENDPOINTS,
|
|
INPUT_CLUSTERS,
|
|
OUTPUT_CLUSTERS,
|
|
PROFILE_ID,
|
|
)
|
|
from zhaquirks.tuya.ts0601_valve import ParksideTuyaValveManufCluster
|
|
from zigpy.const import SIG_EP_PROFILE
|
|
from zigpy.exceptions import ZigbeeException
|
|
import zigpy.profiles.zha as zha
|
|
from zigpy.quirks import CustomCluster, CustomDevice
|
|
from zigpy.quirks.v2 import add_to_registry_v2
|
|
import zigpy.types as t
|
|
import zigpy.zcl.clusters.general as general
|
|
from zigpy.zcl.clusters.manufacturer_specific import ManufacturerSpecificCluster
|
|
import zigpy.zcl.clusters.security as security
|
|
import zigpy.zcl.foundation as zcl_f
|
|
|
|
from homeassistant.components.button import DOMAIN, SERVICE_PRESS, ButtonDeviceClass
|
|
from homeassistant.const import (
|
|
ATTR_DEVICE_CLASS,
|
|
ATTR_ENTITY_ID,
|
|
STATE_UNKNOWN,
|
|
EntityCategory,
|
|
Platform,
|
|
)
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.exceptions import HomeAssistantError
|
|
from homeassistant.helpers import entity_registry as er
|
|
|
|
from .common import find_entity_id, update_attribute_cache
|
|
from .conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_TYPE
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def button_platform_only():
|
|
"""Only set up the button and required base platforms to speed up tests."""
|
|
with patch(
|
|
"homeassistant.components.zha.PLATFORMS",
|
|
(
|
|
Platform.BINARY_SENSOR,
|
|
Platform.BUTTON,
|
|
Platform.DEVICE_TRACKER,
|
|
Platform.NUMBER,
|
|
Platform.SELECT,
|
|
Platform.SENSOR,
|
|
Platform.SWITCH,
|
|
),
|
|
):
|
|
yield
|
|
|
|
|
|
@pytest.fixture
|
|
async def contact_sensor(
|
|
hass: HomeAssistant, zigpy_device_mock, zha_device_joined_restored
|
|
):
|
|
"""Contact sensor fixture."""
|
|
|
|
zigpy_device = zigpy_device_mock(
|
|
{
|
|
1: {
|
|
SIG_EP_INPUT: [
|
|
general.Basic.cluster_id,
|
|
general.Identify.cluster_id,
|
|
security.IasZone.cluster_id,
|
|
],
|
|
SIG_EP_OUTPUT: [],
|
|
SIG_EP_TYPE: zha.DeviceType.IAS_ZONE,
|
|
SIG_EP_PROFILE: zha.PROFILE_ID,
|
|
}
|
|
},
|
|
)
|
|
|
|
zha_device = await zha_device_joined_restored(zigpy_device)
|
|
return zha_device, zigpy_device.endpoints[1].identify
|
|
|
|
|
|
class FrostLockQuirk(CustomDevice):
|
|
"""Quirk with frost lock attribute."""
|
|
|
|
class TuyaManufCluster(CustomCluster, ManufacturerSpecificCluster):
|
|
"""Tuya manufacturer specific cluster."""
|
|
|
|
cluster_id = 0xEF00
|
|
ep_attribute = "tuya_manufacturer"
|
|
|
|
attributes = {0xEF01: ("frost_lock_reset", t.Bool)}
|
|
|
|
replacement = {
|
|
ENDPOINTS: {
|
|
1: {
|
|
PROFILE_ID: zha.PROFILE_ID,
|
|
DEVICE_TYPE: zha.DeviceType.ON_OFF_SWITCH,
|
|
INPUT_CLUSTERS: [general.Basic.cluster_id, TuyaManufCluster],
|
|
OUTPUT_CLUSTERS: [],
|
|
},
|
|
}
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
async def tuya_water_valve(
|
|
hass: HomeAssistant, zigpy_device_mock, zha_device_joined_restored
|
|
):
|
|
"""Tuya Water Valve fixture."""
|
|
|
|
zigpy_device = zigpy_device_mock(
|
|
{
|
|
1: {
|
|
PROFILE_ID: zha.PROFILE_ID,
|
|
DEVICE_TYPE: zha.DeviceType.ON_OFF_SWITCH,
|
|
INPUT_CLUSTERS: [
|
|
general.Basic.cluster_id,
|
|
general.Identify.cluster_id,
|
|
general.Groups.cluster_id,
|
|
general.Scenes.cluster_id,
|
|
general.OnOff.cluster_id,
|
|
ParksideTuyaValveManufCluster.cluster_id,
|
|
],
|
|
OUTPUT_CLUSTERS: [general.Time.cluster_id, general.Ota.cluster_id],
|
|
},
|
|
},
|
|
manufacturer="_TZE200_htnnfasr",
|
|
model="TS0601",
|
|
)
|
|
|
|
zha_device = await zha_device_joined_restored(zigpy_device)
|
|
return zha_device, zigpy_device.endpoints[1].tuya_manufacturer
|
|
|
|
|
|
@freeze_time("2021-11-04 17:37:00", tz_offset=-1)
|
|
async def test_button(hass: HomeAssistant, contact_sensor) -> None:
|
|
"""Test ZHA button platform."""
|
|
|
|
entity_registry = er.async_get(hass)
|
|
zha_device, cluster = contact_sensor
|
|
assert cluster is not None
|
|
entity_id = find_entity_id(DOMAIN, zha_device, hass)
|
|
assert entity_id is not None
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state
|
|
assert state.state == STATE_UNKNOWN
|
|
assert state.attributes[ATTR_DEVICE_CLASS] == ButtonDeviceClass.IDENTIFY
|
|
|
|
entry = entity_registry.async_get(entity_id)
|
|
assert entry
|
|
assert entry.entity_category == EntityCategory.DIAGNOSTIC
|
|
|
|
with patch(
|
|
"zigpy.zcl.Cluster.request",
|
|
return_value=[0x00, zcl_f.Status.SUCCESS],
|
|
):
|
|
await hass.services.async_call(
|
|
DOMAIN,
|
|
SERVICE_PRESS,
|
|
{ATTR_ENTITY_ID: entity_id},
|
|
blocking=True,
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(cluster.request.mock_calls) == 1
|
|
assert cluster.request.call_args[0][0] is False
|
|
assert cluster.request.call_args[0][1] == 0
|
|
assert cluster.request.call_args[0][3] == 5 # duration in seconds
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state
|
|
assert state.state == "2021-11-04T16:37:00+00:00"
|
|
assert state.attributes[ATTR_DEVICE_CLASS] == ButtonDeviceClass.IDENTIFY
|
|
|
|
|
|
async def test_frost_unlock(hass: HomeAssistant, tuya_water_valve) -> None:
|
|
"""Test custom frost unlock ZHA button."""
|
|
|
|
entity_registry = er.async_get(hass)
|
|
zha_device, cluster = tuya_water_valve
|
|
assert cluster is not None
|
|
entity_id = find_entity_id(DOMAIN, zha_device, hass, qualifier="frost_lock_reset")
|
|
assert entity_id is not None
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state
|
|
assert state.state == STATE_UNKNOWN
|
|
assert state.attributes[ATTR_DEVICE_CLASS] == ButtonDeviceClass.RESTART
|
|
|
|
entry = entity_registry.async_get(entity_id)
|
|
assert entry
|
|
assert entry.entity_category == EntityCategory.CONFIG
|
|
|
|
with patch(
|
|
"zigpy.zcl.Cluster.request",
|
|
return_value=[0x00, zcl_f.Status.SUCCESS],
|
|
):
|
|
await hass.services.async_call(
|
|
DOMAIN,
|
|
SERVICE_PRESS,
|
|
{ATTR_ENTITY_ID: entity_id},
|
|
blocking=True,
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert cluster.write_attributes.mock_calls == [
|
|
call({"frost_lock_reset": 0}, manufacturer=None)
|
|
]
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state
|
|
assert state.attributes[ATTR_DEVICE_CLASS] == ButtonDeviceClass.RESTART
|
|
|
|
cluster.write_attributes.reset_mock()
|
|
cluster.write_attributes.side_effect = ZigbeeException
|
|
|
|
with pytest.raises(HomeAssistantError):
|
|
await hass.services.async_call(
|
|
DOMAIN,
|
|
SERVICE_PRESS,
|
|
{ATTR_ENTITY_ID: entity_id},
|
|
blocking=True,
|
|
)
|
|
|
|
# There are three retries
|
|
assert cluster.write_attributes.mock_calls == [
|
|
call({"frost_lock_reset": 0}, manufacturer=None),
|
|
call({"frost_lock_reset": 0}, manufacturer=None),
|
|
call({"frost_lock_reset": 0}, manufacturer=None),
|
|
]
|
|
|
|
|
|
class FakeManufacturerCluster(CustomCluster, ManufacturerSpecificCluster):
|
|
"""Fake manufacturer cluster."""
|
|
|
|
cluster_id: Final = 0xFFF3
|
|
ep_attribute: Final = "mfg_identify"
|
|
|
|
class AttributeDefs(zcl_f.BaseAttributeDefs):
|
|
"""Attribute definitions."""
|
|
|
|
feed: Final = zcl_f.ZCLAttributeDef(
|
|
id=0x0000, type=t.uint8_t, access="rw", is_manufacturer_specific=True
|
|
)
|
|
|
|
class ServerCommandDefs(zcl_f.BaseCommandDefs):
|
|
"""Server command definitions."""
|
|
|
|
self_test: Final = zcl_f.ZCLCommandDef(
|
|
id=0x00, schema={"identify_time": t.uint16_t}, direction=False
|
|
)
|
|
|
|
|
|
(
|
|
add_to_registry_v2("Fake_Model", "Fake_Manufacturer")
|
|
.replaces(FakeManufacturerCluster)
|
|
.command_button(
|
|
FakeManufacturerCluster.ServerCommandDefs.self_test.name,
|
|
FakeManufacturerCluster.cluster_id,
|
|
command_args=(5,),
|
|
)
|
|
.write_attr_button(
|
|
FakeManufacturerCluster.AttributeDefs.feed.name,
|
|
2,
|
|
FakeManufacturerCluster.cluster_id,
|
|
)
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
async def custom_button_device(
|
|
hass: HomeAssistant, zigpy_device_mock, zha_device_joined_restored
|
|
):
|
|
"""Button device fixture for quirks button tests."""
|
|
|
|
zigpy_device = zigpy_device_mock(
|
|
{
|
|
1: {
|
|
SIG_EP_INPUT: [
|
|
general.Basic.cluster_id,
|
|
FakeManufacturerCluster.cluster_id,
|
|
],
|
|
SIG_EP_OUTPUT: [],
|
|
SIG_EP_TYPE: zha.DeviceType.REMOTE_CONTROL,
|
|
SIG_EP_PROFILE: zha.PROFILE_ID,
|
|
}
|
|
},
|
|
manufacturer="Fake_Model",
|
|
model="Fake_Manufacturer",
|
|
)
|
|
|
|
zigpy_device.endpoints[1].mfg_identify.PLUGGED_ATTR_READS = {
|
|
FakeManufacturerCluster.AttributeDefs.feed.name: 0,
|
|
}
|
|
update_attribute_cache(zigpy_device.endpoints[1].mfg_identify)
|
|
zha_device = await zha_device_joined_restored(zigpy_device)
|
|
return zha_device, zigpy_device.endpoints[1].mfg_identify
|
|
|
|
|
|
@freeze_time("2021-11-04 17:37:00", tz_offset=-1)
|
|
async def test_quirks_command_button(hass: HomeAssistant, custom_button_device) -> None:
|
|
"""Test ZHA button platform."""
|
|
|
|
zha_device, cluster = custom_button_device
|
|
assert cluster is not None
|
|
entity_id = find_entity_id(DOMAIN, zha_device, hass, qualifier="self_test")
|
|
assert entity_id is not None
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state
|
|
assert state.state == STATE_UNKNOWN
|
|
|
|
with patch(
|
|
"zigpy.zcl.Cluster.request",
|
|
return_value=[0x00, zcl_f.Status.SUCCESS],
|
|
):
|
|
await hass.services.async_call(
|
|
DOMAIN,
|
|
SERVICE_PRESS,
|
|
{ATTR_ENTITY_ID: entity_id},
|
|
blocking=True,
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert len(cluster.request.mock_calls) == 1
|
|
assert cluster.request.call_args[0][0] is False
|
|
assert cluster.request.call_args[0][1] == 0
|
|
assert cluster.request.call_args[0][3] == 5 # duration in seconds
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state
|
|
assert state.state == "2021-11-04T16:37:00+00:00"
|
|
|
|
|
|
@freeze_time("2021-11-04 17:37:00", tz_offset=-1)
|
|
async def test_quirks_write_attr_button(
|
|
hass: HomeAssistant, custom_button_device
|
|
) -> None:
|
|
"""Test ZHA button platform."""
|
|
|
|
zha_device, cluster = custom_button_device
|
|
assert cluster is not None
|
|
entity_id = find_entity_id(DOMAIN, zha_device, hass, qualifier="feed")
|
|
assert entity_id is not None
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state
|
|
assert state.state == STATE_UNKNOWN
|
|
assert cluster.get(cluster.AttributeDefs.feed.name) == 0
|
|
|
|
with patch(
|
|
"zigpy.zcl.Cluster.request",
|
|
return_value=[0x00, zcl_f.Status.SUCCESS],
|
|
):
|
|
await hass.services.async_call(
|
|
DOMAIN,
|
|
SERVICE_PRESS,
|
|
{ATTR_ENTITY_ID: entity_id},
|
|
blocking=True,
|
|
)
|
|
await hass.async_block_till_done()
|
|
assert cluster.write_attributes.mock_calls == [
|
|
call({cluster.AttributeDefs.feed.name: 2}, manufacturer=None)
|
|
]
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state
|
|
assert state.state == "2021-11-04T16:37:00+00:00"
|
|
assert cluster.get(cluster.AttributeDefs.feed.name) == 2
|