Improve rflink coverage (#19596)
* some minor tests refactor * async/await refactor * toggle have not brightness * test for race condition in unknown device * test for 'no_command' and 'not_connected' * test for race condition in unknown device * sensor events are handled in sensor devices, RflinkDevice handle command events * test race conditions & bogus entity remove * two more tests * Test race condition for unknown components * Test cleanup for `commands events` and `sensor events`pull/19716/head
parent
2ea53e0787
commit
b9f4a7220e
|
@ -178,7 +178,7 @@ async def async_setup(hass, config):
|
|||
_LOGGER.debug('device_id not known, adding new device')
|
||||
# Add bogus event_id first to avoid race if we get another
|
||||
# event before the device is created
|
||||
# Any additional events recevied before the device has been
|
||||
# Any additional events received before the device has been
|
||||
# created will thus be ignored.
|
||||
hass.data[DATA_ENTITY_LOOKUP][event_type][
|
||||
event_id].append(TMP_ENTITY.format(event_id))
|
||||
|
@ -348,9 +348,9 @@ class RflinkDevice(Entity):
|
|||
# Remove temporary bogus entity_id if added
|
||||
tmp_entity = TMP_ENTITY.format(self._device_id)
|
||||
if tmp_entity in self.hass.data[DATA_ENTITY_LOOKUP][
|
||||
EVENT_KEY_SENSOR][self._device_id]:
|
||||
EVENT_KEY_COMMAND][self._device_id]:
|
||||
self.hass.data[DATA_ENTITY_LOOKUP][
|
||||
EVENT_KEY_SENSOR][self._device_id].remove(tmp_entity)
|
||||
EVENT_KEY_COMMAND][self._device_id].remove(tmp_entity)
|
||||
|
||||
# Register id and aliases
|
||||
self.hass.data[DATA_ENTITY_LOOKUP][
|
||||
|
|
|
@ -377,9 +377,10 @@ async def test_type_toggle(hass, monkeypatch):
|
|||
event_callback, _, _, _ = await mock_rflink(
|
||||
hass, config, DOMAIN, monkeypatch)
|
||||
|
||||
# default value = 'off'
|
||||
assert hass.states.get(DOMAIN + '.toggle_test').state == 'off'
|
||||
|
||||
# test sending on command to toggle alias
|
||||
# test sending 'on' command, must set state = 'on'
|
||||
event_callback({
|
||||
'id': 'toggle_0_0',
|
||||
'command': 'on',
|
||||
|
@ -388,7 +389,7 @@ async def test_type_toggle(hass, monkeypatch):
|
|||
|
||||
assert hass.states.get(DOMAIN + '.toggle_test').state == 'on'
|
||||
|
||||
# test sending group command to group alias
|
||||
# test sending 'on' command again, must set state = 'off'
|
||||
event_callback({
|
||||
'id': 'toggle_0_0',
|
||||
'command': 'on',
|
||||
|
@ -397,6 +398,22 @@ async def test_type_toggle(hass, monkeypatch):
|
|||
|
||||
assert hass.states.get(DOMAIN + '.toggle_test').state == 'off'
|
||||
|
||||
# test async_turn_off, must set state = 'on' ('off' + toggle)
|
||||
hass.async_create_task(
|
||||
hass.services.async_call(DOMAIN, SERVICE_TURN_OFF,
|
||||
{ATTR_ENTITY_ID: DOMAIN + '.toggle_test'}))
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert hass.states.get(DOMAIN + '.toggle_test').state == 'on'
|
||||
|
||||
# test async_turn_on, must set state = 'off' (yes, sounds crazy)
|
||||
hass.async_create_task(
|
||||
hass.services.async_call(DOMAIN, SERVICE_TURN_ON,
|
||||
{ATTR_ENTITY_ID: DOMAIN + '.toggle_test'}))
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert hass.states.get(DOMAIN + '.toggle_test').state == 'off'
|
||||
|
||||
|
||||
async def test_group_alias(hass, monkeypatch):
|
||||
"""Group aliases should only respond to group commands (allon/alloff)."""
|
||||
|
@ -574,6 +591,10 @@ async def test_restore_state(hass, monkeypatch):
|
|||
'name': 'l4',
|
||||
'type': 'dimmable',
|
||||
},
|
||||
'test_restore_5': {
|
||||
'name': 'l5',
|
||||
'type': 'dimmable',
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -582,6 +603,7 @@ async def test_restore_state(hass, monkeypatch):
|
|||
State(DOMAIN + '.l1', STATE_ON, {ATTR_BRIGHTNESS: "123", }),
|
||||
State(DOMAIN + '.l2', STATE_ON, {ATTR_BRIGHTNESS: "321", }),
|
||||
State(DOMAIN + '.l3', STATE_OFF, ),
|
||||
State(DOMAIN + '.l5', STATE_ON, {ATTR_BRIGHTNESS: "222", }),
|
||||
))
|
||||
|
||||
hass.state = CoreState.starting
|
||||
|
@ -589,7 +611,7 @@ async def test_restore_state(hass, monkeypatch):
|
|||
# setup mocking rflink module
|
||||
_, _, _, _ = await mock_rflink(hass, config, DOMAIN, monkeypatch)
|
||||
|
||||
# dimmable light must restore brightness
|
||||
# hybrid light must restore brightness
|
||||
state = hass.states.get(DOMAIN + '.l1')
|
||||
assert state
|
||||
assert state.state == STATE_ON
|
||||
|
@ -612,3 +634,9 @@ async def test_restore_state(hass, monkeypatch):
|
|||
assert state.state == STATE_OFF
|
||||
assert not state.attributes.get(ATTR_BRIGHTNESS)
|
||||
assert state.attributes['assumed_state']
|
||||
|
||||
# test coverage for dimmable light
|
||||
state = hass.states.get(DOMAIN + '.l5')
|
||||
assert state
|
||||
assert state.state == STATE_ON
|
||||
assert state.attributes[ATTR_BRIGHTNESS] == 222
|
||||
|
|
|
@ -6,7 +6,8 @@ automatic sensor creation.
|
|||
"""
|
||||
|
||||
from homeassistant.components.rflink import (
|
||||
CONF_RECONNECT_INTERVAL)
|
||||
CONF_RECONNECT_INTERVAL, TMP_ENTITY, DATA_ENTITY_LOOKUP,
|
||||
EVENT_KEY_COMMAND, EVENT_KEY_SENSOR)
|
||||
from homeassistant.const import STATE_UNKNOWN
|
||||
from ..test_rflink import mock_rflink
|
||||
|
||||
|
@ -133,3 +134,111 @@ async def test_entity_availability(hass, monkeypatch):
|
|||
|
||||
# Entities should be available again
|
||||
assert hass.states.get('sensor.test').state == STATE_UNKNOWN
|
||||
|
||||
|
||||
async def test_aliasses(hass, monkeypatch):
|
||||
"""Validate the response to sensor's alias (with aliasses)."""
|
||||
config = {
|
||||
'rflink': {
|
||||
'port': '/dev/ttyABC0',
|
||||
},
|
||||
DOMAIN: {
|
||||
'platform': 'rflink',
|
||||
'devices': {
|
||||
'test_02': {
|
||||
'name': 'test_02',
|
||||
'sensor_type': 'humidity',
|
||||
'aliasses': ['test_alias_02_0'],
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
# setup mocking rflink module
|
||||
event_callback, _, _, _ = await mock_rflink(
|
||||
hass, config, DOMAIN, monkeypatch)
|
||||
|
||||
# test default state of sensor loaded from config
|
||||
config_sensor = hass.states.get('sensor.test_02')
|
||||
assert config_sensor
|
||||
assert config_sensor.state == 'unknown'
|
||||
|
||||
# test event for config sensor
|
||||
event_callback({
|
||||
'id': 'test_alias_02_0',
|
||||
'sensor': 'humidity',
|
||||
'value': 65,
|
||||
'unit': '%',
|
||||
})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# test state of new sensor
|
||||
updated_sensor = hass.states.get('sensor.test_02')
|
||||
assert updated_sensor
|
||||
assert updated_sensor.state == '65'
|
||||
assert updated_sensor.attributes['unit_of_measurement'] == '%'
|
||||
|
||||
|
||||
async def test_race_condition(hass, monkeypatch):
|
||||
"""Test race condition for unknown components."""
|
||||
config = {
|
||||
'rflink': {
|
||||
'port': '/dev/ttyABC0',
|
||||
},
|
||||
DOMAIN: {
|
||||
'platform': 'rflink',
|
||||
},
|
||||
}
|
||||
tmp_entity = TMP_ENTITY.format('test3')
|
||||
|
||||
# setup mocking rflink module
|
||||
event_callback, _, _, _ = await mock_rflink(
|
||||
hass, config, DOMAIN, monkeypatch)
|
||||
|
||||
# test event for new unconfigured sensor
|
||||
event_callback({
|
||||
'id': 'test3',
|
||||
'sensor': 'battery',
|
||||
'value': 'ok',
|
||||
'unit': '',
|
||||
})
|
||||
event_callback({
|
||||
'id': 'test3',
|
||||
'sensor': 'battery',
|
||||
'value': 'ko',
|
||||
'unit': '',
|
||||
})
|
||||
|
||||
# tmp_entity added to EVENT_KEY_SENSOR
|
||||
assert tmp_entity in hass.data[DATA_ENTITY_LOOKUP][
|
||||
EVENT_KEY_SENSOR]['test3']
|
||||
# tmp_entity must no be added to EVENT_KEY_COMMAND
|
||||
assert tmp_entity not in hass.data[DATA_ENTITY_LOOKUP][
|
||||
EVENT_KEY_COMMAND]['test3']
|
||||
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# test state of new sensor
|
||||
updated_sensor = hass.states.get('sensor.test3')
|
||||
assert updated_sensor
|
||||
|
||||
# test state of new sensor
|
||||
new_sensor = hass.states.get(DOMAIN+'.test3')
|
||||
assert new_sensor
|
||||
assert new_sensor.state == 'ok'
|
||||
|
||||
event_callback({
|
||||
'id': 'test3',
|
||||
'sensor': 'battery',
|
||||
'value': 'ko',
|
||||
'unit': '',
|
||||
})
|
||||
await hass.async_block_till_done()
|
||||
# tmp_entity must be deleted from EVENT_KEY_COMMAND
|
||||
assert tmp_entity not in hass.data[DATA_ENTITY_LOOKUP][
|
||||
EVENT_KEY_SENSOR]['test3']
|
||||
|
||||
# test state of new sensor
|
||||
new_sensor = hass.states.get(DOMAIN+'.test3')
|
||||
assert new_sensor
|
||||
assert new_sensor.state == 'ko'
|
||||
|
|
|
@ -4,21 +4,23 @@ from unittest.mock import Mock
|
|||
|
||||
from homeassistant.bootstrap import async_setup_component
|
||||
from homeassistant.components.rflink import (
|
||||
CONF_RECONNECT_INTERVAL, SERVICE_SEND_COMMAND)
|
||||
CONF_RECONNECT_INTERVAL, SERVICE_SEND_COMMAND, RflinkCommand,
|
||||
TMP_ENTITY, DATA_ENTITY_LOOKUP, EVENT_KEY_COMMAND, EVENT_KEY_SENSOR)
|
||||
from homeassistant.const import (
|
||||
ATTR_ENTITY_ID, SERVICE_TURN_OFF, SERVICE_STOP_COVER)
|
||||
|
||||
|
||||
async def mock_rflink(hass, config, domain, monkeypatch, failures=None):
|
||||
async def mock_rflink(hass, config, domain, monkeypatch, failures=None,
|
||||
failcommand=False):
|
||||
"""Create mock RFLink asyncio protocol, test component setup."""
|
||||
transport, protocol = (Mock(), Mock())
|
||||
|
||||
async def send_command_ack(*command):
|
||||
return True
|
||||
return not failcommand
|
||||
protocol.send_command_ack = Mock(wraps=send_command_ack)
|
||||
|
||||
def send_command(*command):
|
||||
return True
|
||||
return not failcommand
|
||||
protocol.send_command = Mock(wraps=send_command)
|
||||
|
||||
async def create_rflink_connection(*args, **kwargs):
|
||||
|
@ -186,9 +188,17 @@ async def test_send_command_invalid_arguments(hass, monkeypatch):
|
|||
# no arguments
|
||||
hass.async_create_task(
|
||||
hass.services.async_call(domain, SERVICE_SEND_COMMAND, {}))
|
||||
|
||||
await hass.async_block_till_done()
|
||||
assert protocol.send_command_ack.call_args_list == []
|
||||
|
||||
# bad command (no_command)
|
||||
success = await hass.services.async_call(
|
||||
domain, SERVICE_SEND_COMMAND, {
|
||||
'device_id': 'newkaku_0000c6c2_1',
|
||||
'command': 'no_command'})
|
||||
assert not success, 'send command should not succeed for unknown command'
|
||||
|
||||
|
||||
async def test_reconnecting_after_disconnect(hass, monkeypatch):
|
||||
"""An unexpected disconnect should cause a reconnect."""
|
||||
|
@ -281,3 +291,95 @@ async def test_error_when_not_connected(hass, monkeypatch):
|
|||
success = await hass.services.async_call(
|
||||
domain, SERVICE_TURN_OFF, {ATTR_ENTITY_ID: 'switch.test'})
|
||||
assert not success, 'changing state should not succeed when disconnected'
|
||||
|
||||
|
||||
async def test_async_send_command_error(hass, monkeypatch):
|
||||
"""Sending command should error when protocol fails."""
|
||||
domain = 'rflink'
|
||||
config = {
|
||||
'rflink': {
|
||||
'port': '/dev/ttyABC0',
|
||||
},
|
||||
}
|
||||
|
||||
# setup mocking rflink module
|
||||
_, _, protocol, _ = await mock_rflink(
|
||||
hass, config, domain, monkeypatch, failcommand=True)
|
||||
|
||||
success = await hass.services.async_call(
|
||||
domain, SERVICE_SEND_COMMAND, {
|
||||
'device_id': 'newkaku_0000c6c2_1',
|
||||
'command': SERVICE_TURN_OFF})
|
||||
await hass.async_block_till_done()
|
||||
assert not success, 'send command should not succeed if failcommand=True'
|
||||
assert (protocol.send_command_ack.call_args_list[0][0][0]
|
||||
== 'newkaku_0000c6c2_1')
|
||||
assert (protocol.send_command_ack.call_args_list[0][0][1]
|
||||
== SERVICE_TURN_OFF)
|
||||
|
||||
|
||||
async def test_race_condition(hass, monkeypatch):
|
||||
"""Test race condition for unknown components."""
|
||||
domain = 'light'
|
||||
config = {
|
||||
'rflink': {
|
||||
'port': '/dev/ttyABC0',
|
||||
},
|
||||
domain: {
|
||||
'platform': 'rflink',
|
||||
},
|
||||
}
|
||||
tmp_entity = TMP_ENTITY.format('test3')
|
||||
|
||||
# setup mocking rflink module
|
||||
event_callback, _, _, _ = await mock_rflink(
|
||||
hass, config, domain, monkeypatch)
|
||||
|
||||
# test event for new unconfigured sensor
|
||||
event_callback({
|
||||
'id': 'test3',
|
||||
'command': 'off',
|
||||
})
|
||||
event_callback({
|
||||
'id': 'test3',
|
||||
'command': 'on',
|
||||
})
|
||||
|
||||
# tmp_entity added to EVENT_KEY_COMMAND
|
||||
assert tmp_entity in hass.data[DATA_ENTITY_LOOKUP][
|
||||
EVENT_KEY_COMMAND]['test3']
|
||||
# tmp_entity must no be added to EVENT_KEY_SENSOR
|
||||
assert tmp_entity not in hass.data[DATA_ENTITY_LOOKUP][
|
||||
EVENT_KEY_SENSOR]['test3']
|
||||
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# test state of new sensor
|
||||
new_sensor = hass.states.get(domain+'.test3')
|
||||
assert new_sensor
|
||||
assert new_sensor.state == 'off'
|
||||
|
||||
event_callback({
|
||||
'id': 'test3',
|
||||
'command': 'on',
|
||||
})
|
||||
await hass.async_block_till_done()
|
||||
# tmp_entity must be deleted from EVENT_KEY_COMMAND
|
||||
assert tmp_entity not in hass.data[DATA_ENTITY_LOOKUP][
|
||||
EVENT_KEY_COMMAND]['test3']
|
||||
|
||||
# test state of new sensor
|
||||
new_sensor = hass.states.get(domain+'.test3')
|
||||
assert new_sensor
|
||||
assert new_sensor.state == 'on'
|
||||
|
||||
|
||||
async def test_not_connected(hass, monkeypatch):
|
||||
"""Test Error when sending commands to a disconnected device."""
|
||||
import pytest
|
||||
from homeassistant.core import HomeAssistantError
|
||||
|
||||
test_device = RflinkCommand('DUMMY_DEVICE')
|
||||
RflinkCommand.set_rflink_protocol(None)
|
||||
with pytest.raises(HomeAssistantError):
|
||||
await test_device._async_handle_command('turn_on')
|
||||
|
|
Loading…
Reference in New Issue