Add mysensors cover tests (#84626)

pull/84547/head^2
Martin Hjelmare 2022-12-27 20:04:22 +01:00 committed by GitHub
parent d1489fe76f
commit eae8154753
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 399 additions and 0 deletions

View File

@ -192,6 +192,38 @@ def update_gateway_nodes(
return nodes return nodes
@pytest.fixture(name="cover_node_binary_state", scope="session")
def cover_node_binary_state_fixture() -> dict:
"""Load the cover node state."""
return load_nodes_state("cover_node_binary_state.json")
@pytest.fixture
def cover_node_binary(
gateway_nodes: dict[int, Sensor], cover_node_binary_state: dict
) -> Sensor:
"""Load the cover child node."""
nodes = update_gateway_nodes(gateway_nodes, deepcopy(cover_node_binary_state))
node = nodes[1]
return node
@pytest.fixture(name="cover_node_percentage_state", scope="session")
def cover_node_percentage_state_fixture() -> dict:
"""Load the cover node state."""
return load_nodes_state("cover_node_percentage_state.json")
@pytest.fixture
def cover_node_percentage(
gateway_nodes: dict[int, Sensor], cover_node_percentage_state: dict
) -> Sensor:
"""Load the cover child node."""
nodes = update_gateway_nodes(gateway_nodes, deepcopy(cover_node_percentage_state))
node = nodes[1]
return node
@pytest.fixture(name="door_sensor_state", scope="session") @pytest.fixture(name="door_sensor_state", scope="session")
def door_sensor_state_fixture() -> dict: def door_sensor_state_fixture() -> dict:
"""Load the door sensor state.""" """Load the door sensor state."""

View File

@ -0,0 +1,24 @@
{
"1": {
"sensor_id": 1,
"children": {
"1": {
"id": 1,
"type": 5,
"description": "",
"values": {
"2": "0",
"29": "0",
"30": "0",
"31": "0"
}
}
},
"type": 17,
"sketch_name": "Cover Node",
"sketch_version": "1.0",
"battery_level": 0,
"protocol_version": "2.3.2",
"heartbeat": 0
}
}

View File

@ -0,0 +1,24 @@
{
"1": {
"sensor_id": 1,
"children": {
"1": {
"id": 1,
"type": 5,
"description": "",
"values": {
"3": "0",
"29": "0",
"30": "0",
"31": "0"
}
}
},
"type": 17,
"sketch_name": "Cover Node",
"sketch_version": "1.0",
"battery_level": 0,
"protocol_version": "2.3.2",
"heartbeat": 0
}
}

View File

@ -0,0 +1,319 @@
"""Provide tests for mysensors cover platform."""
from __future__ import annotations
from collections.abc import Callable
from unittest.mock import MagicMock, call
from mysensors.sensor import Sensor
from homeassistant.components.cover import (
ATTR_CURRENT_POSITION,
ATTR_POSITION,
DOMAIN as COVER_DOMAIN,
SERVICE_CLOSE_COVER,
SERVICE_OPEN_COVER,
SERVICE_SET_COVER_POSITION,
SERVICE_STOP_COVER,
STATE_CLOSED,
STATE_CLOSING,
STATE_OPEN,
STATE_OPENING,
)
from homeassistant.const import ATTR_ENTITY_ID
from homeassistant.core import HomeAssistant
async def test_cover_node_percentage(
hass: HomeAssistant,
cover_node_percentage: Sensor,
receive_message: Callable[[str], None],
transport_write: MagicMock,
) -> None:
"""Test a cover percentage node."""
entity_id = "cover.cover_node_1_1"
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_CLOSED
assert state.attributes[ATTR_CURRENT_POSITION] == 0
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_OPEN_COVER,
{ATTR_ENTITY_ID: entity_id},
blocking=True,
)
assert transport_write.call_count == 1
assert transport_write.call_args == call("1;1;1;1;29;1\n")
receive_message("1;1;1;0;29;1\n")
receive_message("1;1;1;0;3;50\n")
# the integration adds multiple jobs to do the update currently
await hass.async_block_till_done()
await hass.async_block_till_done()
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OPENING
assert state.attributes[ATTR_CURRENT_POSITION] == 50
transport_write.reset_mock()
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_STOP_COVER,
{ATTR_ENTITY_ID: entity_id},
blocking=True,
)
assert transport_write.call_count == 1
assert transport_write.call_args == call("1;1;1;1;31;1\n")
receive_message("1;1;1;0;31;1\n")
receive_message("1;1;1;0;3;50\n")
# the integration adds multiple jobs to do the update currently
await hass.async_block_till_done()
await hass.async_block_till_done()
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OPEN
assert state.attributes[ATTR_CURRENT_POSITION] == 50
transport_write.reset_mock()
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_OPEN_COVER,
{ATTR_ENTITY_ID: entity_id},
blocking=True,
)
assert transport_write.call_count == 1
assert transport_write.call_args == call("1;1;1;1;29;1\n")
receive_message("1;1;1;0;31;0\n")
receive_message("1;1;1;0;29;1\n")
receive_message("1;1;1;0;3;75\n")
# the integration adds multiple jobs to do the update currently
await hass.async_block_till_done()
await hass.async_block_till_done()
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OPENING
assert state.attributes[ATTR_CURRENT_POSITION] == 75
receive_message("1;1;1;0;29;0\n")
receive_message("1;1;1;0;3;100\n")
# the integration adds multiple jobs to do the update currently
await hass.async_block_till_done()
await hass.async_block_till_done()
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OPEN
assert state.attributes[ATTR_CURRENT_POSITION] == 100
transport_write.reset_mock()
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_CLOSE_COVER,
{ATTR_ENTITY_ID: entity_id},
blocking=True,
)
assert transport_write.call_count == 1
assert transport_write.call_args == call("1;1;1;1;30;1\n")
receive_message("1;1;1;0;30;1\n")
receive_message("1;1;1;0;3;50\n")
# the integration adds multiple jobs to do the update currently
await hass.async_block_till_done()
await hass.async_block_till_done()
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_CLOSING
assert state.attributes[ATTR_CURRENT_POSITION] == 50
receive_message("1;1;1;0;30;0\n")
receive_message("1;1;1;0;3;0\n")
# the integration adds multiple jobs to do the update currently
await hass.async_block_till_done()
await hass.async_block_till_done()
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_CLOSED
assert state.attributes[ATTR_CURRENT_POSITION] == 0
transport_write.reset_mock()
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_SET_COVER_POSITION,
{ATTR_ENTITY_ID: entity_id, ATTR_POSITION: 25},
blocking=True,
)
assert transport_write.call_count == 1
assert transport_write.call_args == call("1;1;1;1;3;25\n")
receive_message("1;1;1;0;3;25\n")
# the integration adds multiple jobs to do the update currently
await hass.async_block_till_done()
await hass.async_block_till_done()
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OPEN
assert state.attributes[ATTR_CURRENT_POSITION] == 25
async def test_cover_node_binary(
hass: HomeAssistant,
cover_node_binary: Sensor,
receive_message: Callable[[str], None],
transport_write: MagicMock,
) -> None:
"""Test a cover binary node."""
entity_id = "cover.cover_node_1_1"
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_CLOSED
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_OPEN_COVER,
{ATTR_ENTITY_ID: entity_id},
blocking=True,
)
assert transport_write.call_count == 1
assert transport_write.call_args == call("1;1;1;1;29;1\n")
receive_message("1;1;1;0;29;1\n")
receive_message("1;1;1;0;2;1\n")
# the integration adds multiple jobs to do the update currently
await hass.async_block_till_done()
await hass.async_block_till_done()
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OPENING
transport_write.reset_mock()
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_STOP_COVER,
{ATTR_ENTITY_ID: entity_id},
blocking=True,
)
assert transport_write.call_count == 1
assert transport_write.call_args == call("1;1;1;1;31;1\n")
receive_message("1;1;1;0;31;1\n")
# the integration adds multiple jobs to do the update currently
await hass.async_block_till_done()
await hass.async_block_till_done()
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OPEN
transport_write.reset_mock()
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_OPEN_COVER,
{ATTR_ENTITY_ID: entity_id},
blocking=True,
)
assert transport_write.call_count == 1
assert transport_write.call_args == call("1;1;1;1;29;1\n")
receive_message("1;1;1;0;31;0\n")
receive_message("1;1;1;0;29;1\n")
# the integration adds multiple jobs to do the update currently
await hass.async_block_till_done()
await hass.async_block_till_done()
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OPENING
receive_message("1;1;1;0;29;0\n")
receive_message("1;1;1;0;2;1\n")
# the integration adds multiple jobs to do the update currently
await hass.async_block_till_done()
await hass.async_block_till_done()
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OPEN
transport_write.reset_mock()
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_CLOSE_COVER,
{ATTR_ENTITY_ID: entity_id},
blocking=True,
)
assert transport_write.call_count == 1
assert transport_write.call_args == call("1;1;1;1;30;1\n")
receive_message("1;1;1;0;30;1\n")
# the integration adds multiple jobs to do the update currently
await hass.async_block_till_done()
await hass.async_block_till_done()
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_CLOSING
receive_message("1;1;1;0;30;0\n")
receive_message("1;1;1;0;2;0\n")
# the integration adds multiple jobs to do the update currently
await hass.async_block_till_done()
await hass.async_block_till_done()
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_CLOSED