* Add support for DoorLock cluster * Add test for zha lock * Change lock_state report to REPORT_CONFIG_IMMEDIATE * Update channel command wrapper to return the entire result This allows for return values other than result[1] * Fix tests * Fix lint * Update DoorLock test to work with updated zigpy schema * Fix lint * Fix unlock testpull/24377/head
parent
592d30d495
commit
cb460a85ba
|
@ -44,7 +44,6 @@ def decorate_command(channel, command):
|
|||
"""Wrap a cluster command to make it safe."""
|
||||
@wraps(command)
|
||||
async def wrapper(*args, **kwds):
|
||||
from zigpy.zcl.foundation import Status
|
||||
from zigpy.exceptions import DeliveryError
|
||||
try:
|
||||
result = await command(*args, **kwds)
|
||||
|
@ -54,9 +53,8 @@ def decorate_command(channel, command):
|
|||
"{}: {}".format("with args", args),
|
||||
"{}: {}".format("with kwargs", kwds),
|
||||
"{}: {}".format("and result", result))
|
||||
if isinstance(result, bool):
|
||||
return result
|
||||
return result[1] is Status.SUCCESS
|
||||
return result
|
||||
|
||||
except (DeliveryError, Timeout) as ex:
|
||||
_LOGGER.debug(
|
||||
"%s: command failed: %s exception: %s",
|
||||
|
@ -64,7 +62,7 @@ def decorate_command(channel, command):
|
|||
command.__name__,
|
||||
str(ex)
|
||||
)
|
||||
return False
|
||||
return ex
|
||||
return wrapper
|
||||
|
||||
|
||||
|
|
|
@ -5,5 +5,44 @@ For more details about this component, please refer to the documentation at
|
|||
https://home-assistant.io/components/zha/
|
||||
"""
|
||||
import logging
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
||||
from . import ZigbeeChannel
|
||||
from ..const import SIGNAL_ATTR_UPDATED
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DoorLockChannel(ZigbeeChannel):
|
||||
"""Door lock channel."""
|
||||
|
||||
_value_attribute = 0
|
||||
|
||||
async def async_update(self):
|
||||
"""Retrieve latest state."""
|
||||
result = await self.get_attribute_value('lock_state', from_cache=True)
|
||||
|
||||
async_dispatcher_send(
|
||||
self._zha_device.hass,
|
||||
"{}_{}".format(self.unique_id, SIGNAL_ATTR_UPDATED),
|
||||
result
|
||||
)
|
||||
|
||||
@callback
|
||||
def attribute_updated(self, attrid, value):
|
||||
"""Handle attribute update from lock cluster."""
|
||||
attr_name = self.cluster.attributes.get(attrid, [attrid])[0]
|
||||
_LOGGER.debug("%s: Attribute report '%s'[%s] = %s",
|
||||
self.unique_id, self.cluster.name, attr_name, value)
|
||||
if attrid == self._value_attribute:
|
||||
async_dispatcher_send(
|
||||
self._zha_device.hass,
|
||||
"{}_{}".format(self.unique_id, SIGNAL_ATTR_UPDATED),
|
||||
value
|
||||
)
|
||||
|
||||
async def async_initialize(self, from_cache):
|
||||
"""Initialize channel."""
|
||||
await self.get_attribute_value(
|
||||
self._value_attribute, from_cache=from_cache)
|
||||
await super().async_initialize(from_cache)
|
||||
|
|
|
@ -5,6 +5,8 @@ For more details about this component, please refer to the documentation at
|
|||
https://home-assistant.io/components/zha/
|
||||
"""
|
||||
from . import ZigbeeChannel
|
||||
|
||||
from .closures import DoorLockChannel
|
||||
from .general import (
|
||||
OnOffChannel, LevelControlChannel, PowerConfigurationChannel, BasicChannel
|
||||
)
|
||||
|
@ -13,7 +15,6 @@ from .hvac import FanChannel
|
|||
from .lighting import ColorChannel
|
||||
from .security import IASZoneChannel
|
||||
|
||||
|
||||
ZIGBEE_CHANNEL_REGISTRY = {}
|
||||
|
||||
|
||||
|
@ -44,4 +45,5 @@ def populate_channel_registry():
|
|||
zcl.clusters.security.IasZone.cluster_id: IASZoneChannel,
|
||||
zcl.clusters.hvac.Fan.cluster_id: FanChannel,
|
||||
zcl.clusters.lightlink.LightLink.cluster_id: ZigbeeChannel,
|
||||
zcl.clusters.closures.DoorLock.cluster_id: DoorLockChannel,
|
||||
})
|
||||
|
|
|
@ -5,6 +5,7 @@ import logging
|
|||
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR
|
||||
from homeassistant.components.fan import DOMAIN as FAN
|
||||
from homeassistant.components.light import DOMAIN as LIGHT
|
||||
from homeassistant.components.lock import DOMAIN as LOCK
|
||||
from homeassistant.components.sensor import DOMAIN as SENSOR
|
||||
from homeassistant.components.switch import DOMAIN as SWITCH
|
||||
|
||||
|
@ -27,6 +28,7 @@ COMPONENTS = (
|
|||
BINARY_SENSOR,
|
||||
FAN,
|
||||
LIGHT,
|
||||
LOCK,
|
||||
SENSOR,
|
||||
SWITCH,
|
||||
)
|
||||
|
@ -92,6 +94,7 @@ ZONE_CHANNEL = ZONE = 'ias_zone'
|
|||
ELECTRICAL_MEASUREMENT_CHANNEL = 'electrical_measurement'
|
||||
POWER_CONFIGURATION_CHANNEL = 'power'
|
||||
EVENT_RELAY_CHANNEL = 'event_relay'
|
||||
DOORLOCK_CHANNEL = 'door_lock'
|
||||
|
||||
SIGNAL_ATTR_UPDATED = 'attribute_updated'
|
||||
SIGNAL_MOVE_LEVEL = "move_level"
|
||||
|
|
|
@ -8,6 +8,7 @@ https://home-assistant.io/components/zha/
|
|||
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR
|
||||
from homeassistant.components.fan import DOMAIN as FAN
|
||||
from homeassistant.components.light import DOMAIN as LIGHT
|
||||
from homeassistant.components.lock import DOMAIN as LOCK
|
||||
from homeassistant.components.sensor import DOMAIN as SENSOR
|
||||
from homeassistant.components.switch import DOMAIN as SWITCH
|
||||
|
||||
|
@ -143,7 +144,8 @@ def establish_device_mappings():
|
|||
zcl.clusters.hvac.Fan: FAN,
|
||||
SMARTTHINGS_ACCELERATION_CLUSTER: BINARY_SENSOR,
|
||||
zcl.clusters.general.MultistateInput.cluster_id: SENSOR,
|
||||
zcl.clusters.general.AnalogInput.cluster_id: SENSOR
|
||||
zcl.clusters.general.AnalogInput.cluster_id: SENSOR,
|
||||
zcl.clusters.closures.DoorLock: LOCK
|
||||
})
|
||||
|
||||
SINGLE_OUTPUT_CLUSTER_DEVICE_CLASS.update({
|
||||
|
@ -271,6 +273,10 @@ def establish_device_mappings():
|
|||
'attr': 'fan_mode',
|
||||
'config': REPORT_CONFIG_OP
|
||||
}],
|
||||
zcl.clusters.closures.DoorLock.cluster_id: [{
|
||||
'attr': 'lock_state',
|
||||
'config': REPORT_CONFIG_IMMEDIATE
|
||||
}],
|
||||
})
|
||||
|
||||
BINARY_SENSOR_CLUSTERS.add(zcl.clusters.general.OnOff.cluster_id)
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
from datetime import timedelta
|
||||
import logging
|
||||
|
||||
from zigpy.zcl.foundation import Status
|
||||
from homeassistant.components import light
|
||||
from homeassistant.const import STATE_ON
|
||||
from homeassistant.core import callback
|
||||
|
@ -14,7 +15,6 @@ from .const import (
|
|||
)
|
||||
from .entity import ZhaEntity
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_DURATION = 5
|
||||
|
@ -173,12 +173,12 @@ class Light(ZhaEntity, light.Light):
|
|||
level = min(254, brightness)
|
||||
else:
|
||||
level = self._brightness or 254
|
||||
success = await self._level_channel.move_to_level_with_on_off(
|
||||
result = await self._level_channel.move_to_level_with_on_off(
|
||||
level,
|
||||
duration
|
||||
)
|
||||
t_log['move_to_level_with_on_off'] = success
|
||||
if not success:
|
||||
t_log['move_to_level_with_on_off'] = result
|
||||
if not isinstance(result, list) or result[1] is not Status.SUCCESS:
|
||||
self.debug("turned on: %s", t_log)
|
||||
return
|
||||
self._state = bool(level)
|
||||
|
@ -186,9 +186,9 @@ class Light(ZhaEntity, light.Light):
|
|||
self._brightness = level
|
||||
|
||||
if brightness is None or brightness:
|
||||
success = await self._on_off_channel.on()
|
||||
t_log['on_off'] = success
|
||||
if not success:
|
||||
result = await self._on_off_channel.on()
|
||||
t_log['on_off'] = result
|
||||
if not isinstance(result, list) or result[1] is not Status.SUCCESS:
|
||||
self.debug("turned on: %s", t_log)
|
||||
return
|
||||
self._state = True
|
||||
|
@ -196,10 +196,10 @@ class Light(ZhaEntity, light.Light):
|
|||
if light.ATTR_COLOR_TEMP in kwargs and \
|
||||
self.supported_features & light.SUPPORT_COLOR_TEMP:
|
||||
temperature = kwargs[light.ATTR_COLOR_TEMP]
|
||||
success = await self._color_channel.move_to_color_temp(
|
||||
result = await self._color_channel.move_to_color_temp(
|
||||
temperature, duration)
|
||||
t_log['move_to_color_temp'] = success
|
||||
if not success:
|
||||
t_log['move_to_color_temp'] = result
|
||||
if not isinstance(result, list) or result[1] is not Status.SUCCESS:
|
||||
self.debug("turned on: %s", t_log)
|
||||
return
|
||||
self._color_temp = temperature
|
||||
|
@ -208,13 +208,13 @@ class Light(ZhaEntity, light.Light):
|
|||
self.supported_features & light.SUPPORT_COLOR:
|
||||
hs_color = kwargs[light.ATTR_HS_COLOR]
|
||||
xy_color = color_util.color_hs_to_xy(*hs_color)
|
||||
success = await self._color_channel.move_to_color(
|
||||
result = await self._color_channel.move_to_color(
|
||||
int(xy_color[0] * 65535),
|
||||
int(xy_color[1] * 65535),
|
||||
duration,
|
||||
)
|
||||
t_log['move_to_color'] = success
|
||||
if not success:
|
||||
t_log['move_to_color'] = result
|
||||
if not isinstance(result, list) or result[1] is not Status.SUCCESS:
|
||||
self.debug("turned on: %s", t_log)
|
||||
return
|
||||
self._hs_color = hs_color
|
||||
|
@ -227,14 +227,14 @@ class Light(ZhaEntity, light.Light):
|
|||
duration = kwargs.get(light.ATTR_TRANSITION)
|
||||
supports_level = self.supported_features & light.SUPPORT_BRIGHTNESS
|
||||
if duration and supports_level:
|
||||
success = await self._level_channel.move_to_level_with_on_off(
|
||||
result = await self._level_channel.move_to_level_with_on_off(
|
||||
0,
|
||||
duration*10
|
||||
)
|
||||
else:
|
||||
success = await self._on_off_channel.off()
|
||||
self.debug("turned off: %s", success)
|
||||
if not success:
|
||||
result = await self._on_off_channel.off()
|
||||
self.debug("turned off: %s", result)
|
||||
if not isinstance(result, list) or result[1] is not Status.SUCCESS:
|
||||
return
|
||||
self._state = False
|
||||
self.async_schedule_update_ha_state()
|
||||
|
|
|
@ -0,0 +1,134 @@
|
|||
"""Locks on Zigbee Home Automation networks."""
|
||||
import logging
|
||||
|
||||
from zigpy.zcl.foundation import Status
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.components.lock import (
|
||||
DOMAIN, STATE_UNLOCKED, STATE_LOCKED, LockDevice)
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||
from .core.const import (
|
||||
DATA_ZHA, DATA_ZHA_DISPATCHERS, ZHA_DISCOVERY_NEW, DOORLOCK_CHANNEL,
|
||||
SIGNAL_ATTR_UPDATED
|
||||
)
|
||||
from .entity import ZhaEntity
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
""" The first state is Zigbee 'Not fully locked' """
|
||||
|
||||
STATE_LIST = [
|
||||
STATE_UNLOCKED,
|
||||
STATE_LOCKED,
|
||||
STATE_UNLOCKED
|
||||
]
|
||||
|
||||
VALUE_TO_STATE = {i: state for i, state in enumerate(STATE_LIST)}
|
||||
|
||||
|
||||
async def async_setup_platform(hass, config, async_add_entities,
|
||||
discovery_info=None):
|
||||
"""Old way of setting up Zigbee Home Automation locks."""
|
||||
pass
|
||||
|
||||
|
||||
async def async_setup_entry(hass, config_entry, async_add_entities):
|
||||
"""Set up the Zigbee Home Automation Door Lock from config entry."""
|
||||
async def async_discover(discovery_info):
|
||||
await _async_setup_entities(hass, config_entry, async_add_entities,
|
||||
[discovery_info])
|
||||
|
||||
unsub = async_dispatcher_connect(
|
||||
hass, ZHA_DISCOVERY_NEW.format(DOMAIN), async_discover)
|
||||
hass.data[DATA_ZHA][DATA_ZHA_DISPATCHERS].append(unsub)
|
||||
|
||||
locks = hass.data.get(DATA_ZHA, {}).get(DOMAIN)
|
||||
if locks is not None:
|
||||
await _async_setup_entities(hass, config_entry, async_add_entities,
|
||||
locks.values())
|
||||
del hass.data[DATA_ZHA][DOMAIN]
|
||||
|
||||
|
||||
async def _async_setup_entities(hass, config_entry, async_add_entities,
|
||||
discovery_infos):
|
||||
"""Set up the ZHA locks."""
|
||||
entities = []
|
||||
for discovery_info in discovery_infos:
|
||||
entities.append(ZhaDoorLock(**discovery_info))
|
||||
|
||||
async_add_entities(entities, update_before_add=True)
|
||||
|
||||
|
||||
class ZhaDoorLock(ZhaEntity, LockDevice):
|
||||
"""Representation of a ZHA lock."""
|
||||
|
||||
_domain = DOMAIN
|
||||
|
||||
def __init__(self, unique_id, zha_device, channels, **kwargs):
|
||||
"""Init this sensor."""
|
||||
super().__init__(unique_id, zha_device, channels, **kwargs)
|
||||
self._doorlock_channel = self.cluster_channels.get(DOORLOCK_CHANNEL)
|
||||
|
||||
async def async_added_to_hass(self):
|
||||
"""Run when about to be added to hass."""
|
||||
await super().async_added_to_hass()
|
||||
await self.async_accept_signal(
|
||||
self._doorlock_channel, SIGNAL_ATTR_UPDATED, self.async_set_state)
|
||||
|
||||
@callback
|
||||
def async_restore_last_state(self, last_state):
|
||||
"""Restore previous state."""
|
||||
self._state = VALUE_TO_STATE.get(last_state.state, last_state.state)
|
||||
|
||||
@property
|
||||
def is_locked(self) -> bool:
|
||||
"""Return true if entity is locked."""
|
||||
if self._state is None:
|
||||
return False
|
||||
return self._state == STATE_LOCKED
|
||||
|
||||
@property
|
||||
def device_state_attributes(self):
|
||||
"""Return state attributes."""
|
||||
return self.state_attributes
|
||||
|
||||
async def async_lock(self, **kwargs):
|
||||
"""Lock the lock."""
|
||||
result = await self._doorlock_channel.lock_door()
|
||||
if not isinstance(result, list) or result[0] is not Status.SUCCESS:
|
||||
_LOGGER.error("Error with lock_door: %s", result)
|
||||
return
|
||||
self.async_schedule_update_ha_state()
|
||||
|
||||
async def async_unlock(self, **kwargs):
|
||||
"""Unlock the lock."""
|
||||
result = await self._doorlock_channel.unlock_door()
|
||||
if not isinstance(result, list) or result[0] is not Status.SUCCESS:
|
||||
_LOGGER.error("Error with unlock_door: %s", result)
|
||||
return
|
||||
self.async_schedule_update_ha_state()
|
||||
|
||||
async def async_update(self):
|
||||
"""Attempt to retrieve state from the lock."""
|
||||
await super().async_update()
|
||||
await self.async_get_state()
|
||||
|
||||
def async_set_state(self, state):
|
||||
"""Handle state update from channel."""
|
||||
self._state = VALUE_TO_STATE.get(state, self._state)
|
||||
self.async_schedule_update_ha_state()
|
||||
|
||||
async def async_get_state(self, from_cache=True):
|
||||
"""Attempt to retrieve state from the lock."""
|
||||
if self._doorlock_channel:
|
||||
state = await self._doorlock_channel.get_attribute_value(
|
||||
'lock_state', from_cache=from_cache)
|
||||
if state is not None:
|
||||
self._state = VALUE_TO_STATE.get(state, self._state)
|
||||
|
||||
async def refresh(self, time):
|
||||
"""Call async_get_state at an interval."""
|
||||
await self.async_get_state(from_cache=False)
|
||||
|
||||
def debug(self, msg, *args):
|
||||
"""Log debug message."""
|
||||
_LOGGER.debug('%s: ' + msg, self.entity_id, *args)
|
|
@ -1,6 +1,7 @@
|
|||
"""Switches on Zigbee Home Automation networks."""
|
||||
import logging
|
||||
|
||||
from zigpy.zcl.foundation import Status
|
||||
from homeassistant.components.switch import DOMAIN, SwitchDevice
|
||||
from homeassistant.const import STATE_ON
|
||||
from homeassistant.core import callback
|
||||
|
@ -66,16 +67,16 @@ class Switch(ZhaEntity, SwitchDevice):
|
|||
|
||||
async def async_turn_on(self, **kwargs):
|
||||
"""Turn the entity on."""
|
||||
success = await self._on_off_channel.on()
|
||||
if not success:
|
||||
result = await self._on_off_channel.on()
|
||||
if not isinstance(result, list) or result[1] is not Status.SUCCESS:
|
||||
return
|
||||
self._state = True
|
||||
self.async_schedule_update_ha_state()
|
||||
|
||||
async def async_turn_off(self, **kwargs):
|
||||
"""Turn the entity off."""
|
||||
success = await self._on_off_channel.off()
|
||||
if not success:
|
||||
result = await self._on_off_channel.off()
|
||||
if not isinstance(result, list) or result[1] is not Status.SUCCESS:
|
||||
return
|
||||
self._state = False
|
||||
self.async_schedule_update_ha_state()
|
||||
|
|
|
@ -57,9 +57,9 @@ async def test_light(hass, config_entry, zha_gateway, monkeypatch):
|
|||
level_device_on_off_cluster = zigpy_device_level.endpoints.get(1).on_off
|
||||
level_device_level_cluster = zigpy_device_level.endpoints.get(1).level
|
||||
on_off_mock = MagicMock(side_effect=asyncio.coroutine(MagicMock(
|
||||
return_value=(sentinel.data, Status.SUCCESS))))
|
||||
return_value=[sentinel.data, Status.SUCCESS])))
|
||||
level_mock = MagicMock(side_effect=asyncio.coroutine(MagicMock(
|
||||
return_value=(sentinel.data, Status.SUCCESS))))
|
||||
return_value=[sentinel.data, Status.SUCCESS])))
|
||||
monkeypatch.setattr(level_device_on_off_cluster, 'request', on_off_mock)
|
||||
monkeypatch.setattr(level_device_level_cluster, 'request', level_mock)
|
||||
level_entity_id = make_entity_id(DOMAIN, zigpy_device_level,
|
||||
|
@ -137,7 +137,7 @@ async def async_test_on_off_from_hass(hass, cluster, entity_id):
|
|||
from zigpy.zcl.foundation import Status
|
||||
with patch(
|
||||
'zigpy.zcl.Cluster.request',
|
||||
return_value=mock_coro([Status.SUCCESS, Status.SUCCESS])):
|
||||
return_value=mock_coro([0x00, Status.SUCCESS])):
|
||||
# turn on via UI
|
||||
await hass.services.async_call(DOMAIN, 'turn_on', {
|
||||
'entity_id': entity_id
|
||||
|
@ -154,7 +154,7 @@ async def async_test_off_from_hass(hass, cluster, entity_id):
|
|||
from zigpy.zcl.foundation import Status
|
||||
with patch(
|
||||
'zigpy.zcl.Cluster.request',
|
||||
return_value=mock_coro([Status.SUCCESS, Status.SUCCESS])):
|
||||
return_value=mock_coro([0x01, Status.SUCCESS])):
|
||||
# turn off via UI
|
||||
await hass.services.async_call(DOMAIN, 'turn_off', {
|
||||
'entity_id': entity_id
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
"""Test zha lock."""
|
||||
from unittest.mock import patch
|
||||
from homeassistant.const import (
|
||||
STATE_LOCKED, STATE_UNLOCKED, STATE_UNAVAILABLE)
|
||||
from homeassistant.components.lock import DOMAIN
|
||||
from tests.common import mock_coro
|
||||
from .common import (
|
||||
async_init_zigpy_device, make_attribute, make_entity_id,
|
||||
async_enable_traffic)
|
||||
|
||||
LOCK_DOOR = 0
|
||||
UNLOCK_DOOR = 1
|
||||
|
||||
|
||||
async def test_lock(hass, config_entry, zha_gateway):
|
||||
"""Test zha lock platform."""
|
||||
from zigpy.zcl.clusters.closures import DoorLock
|
||||
from zigpy.zcl.clusters.general import Basic
|
||||
|
||||
# create zigpy device
|
||||
zigpy_device = await async_init_zigpy_device(
|
||||
hass, [DoorLock.cluster_id, Basic.cluster_id], [], None, zha_gateway)
|
||||
|
||||
# load up lock domain
|
||||
await hass.config_entries.async_forward_entry_setup(
|
||||
config_entry, DOMAIN)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
cluster = zigpy_device.endpoints.get(1).door_lock
|
||||
entity_id = make_entity_id(DOMAIN, zigpy_device, cluster)
|
||||
zha_device = zha_gateway.get_device(zigpy_device.ieee)
|
||||
|
||||
# test that the lock was created and that it is unavailable
|
||||
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
|
||||
|
||||
# allow traffic to flow through the gateway and device
|
||||
await async_enable_traffic(hass, zha_gateway, [zha_device])
|
||||
|
||||
# test that the state has changed from unavailable to unlocked
|
||||
assert hass.states.get(entity_id).state == STATE_UNLOCKED
|
||||
|
||||
# set state to locked
|
||||
attr = make_attribute(0, 1)
|
||||
cluster.handle_message(False, 1, 0x0a, [[attr]])
|
||||
await hass.async_block_till_done()
|
||||
assert hass.states.get(entity_id).state == STATE_LOCKED
|
||||
|
||||
# set state to unlocked
|
||||
attr.value.value = 2
|
||||
cluster.handle_message(False, 0, 0x0a, [[attr]])
|
||||
await hass.async_block_till_done()
|
||||
assert hass.states.get(entity_id).state == STATE_UNLOCKED
|
||||
|
||||
# lock from HA
|
||||
await async_lock(hass, cluster, entity_id)
|
||||
|
||||
# unlock from HA
|
||||
await async_unlock(hass, cluster, entity_id)
|
||||
|
||||
|
||||
async def async_lock(hass, cluster, entity_id):
|
||||
"""Test lock functionality from hass."""
|
||||
from zigpy.zcl.foundation import Status
|
||||
with patch(
|
||||
'zigpy.zcl.Cluster.request',
|
||||
return_value=mock_coro([Status.SUCCESS, ])):
|
||||
# lock via UI
|
||||
await hass.services.async_call(DOMAIN, 'lock', {
|
||||
'entity_id': entity_id
|
||||
}, blocking=True)
|
||||
assert cluster.request.call_count == 1
|
||||
assert cluster.request.call_args[0][0] is False
|
||||
assert cluster.request.call_args[0][1] == LOCK_DOOR
|
||||
|
||||
|
||||
async def async_unlock(hass, cluster, entity_id):
|
||||
"""Test lock functionality from hass."""
|
||||
from zigpy.zcl.foundation import Status
|
||||
with patch(
|
||||
'zigpy.zcl.Cluster.request',
|
||||
return_value=mock_coro([Status.SUCCESS, ])):
|
||||
# lock via UI
|
||||
await hass.services.async_call(DOMAIN, 'unlock', {
|
||||
'entity_id': entity_id
|
||||
}, blocking=True)
|
||||
assert cluster.request.call_count == 1
|
||||
assert cluster.request.call_args[0][0] is False
|
||||
assert cluster.request.call_args[0][1] == UNLOCK_DOOR
|
|
@ -54,7 +54,7 @@ async def test_switch(hass, config_entry, zha_gateway):
|
|||
# turn on from HA
|
||||
with patch(
|
||||
'zigpy.zcl.Cluster.request',
|
||||
return_value=mock_coro([Status.SUCCESS, Status.SUCCESS])):
|
||||
return_value=mock_coro([0x00, Status.SUCCESS])):
|
||||
# turn on via UI
|
||||
await hass.services.async_call(DOMAIN, 'turn_on', {
|
||||
'entity_id': entity_id
|
||||
|
@ -66,7 +66,7 @@ async def test_switch(hass, config_entry, zha_gateway):
|
|||
# turn off from HA
|
||||
with patch(
|
||||
'zigpy.zcl.Cluster.request',
|
||||
return_value=mock_coro([Status.SUCCESS, Status.SUCCESS])):
|
||||
return_value=mock_coro([0x01, Status.SUCCESS])):
|
||||
# turn off via UI
|
||||
await hass.services.async_call(DOMAIN, 'turn_off', {
|
||||
'entity_id': entity_id
|
||||
|
|
Loading…
Reference in New Issue