2020-10-14 08:19:12 +00:00
|
|
|
"""Tests for 1-Wire sensor platform."""
|
2021-10-18 17:16:53 +00:00
|
|
|
from unittest.mock import MagicMock, patch
|
2021-01-01 21:31:56 +00:00
|
|
|
|
2020-12-07 01:09:32 +00:00
|
|
|
from pyownet.protocol import Error as ProtocolError
|
|
|
|
import pytest
|
|
|
|
|
2021-10-20 07:10:32 +00:00
|
|
|
from homeassistant.components.onewire.const import DOMAIN
|
2021-08-24 08:37:59 +00:00
|
|
|
from homeassistant.components.sensor import ATTR_STATE_CLASS, DOMAIN as SENSOR_DOMAIN
|
2021-10-18 17:16:53 +00:00
|
|
|
from homeassistant.config_entries import ConfigEntry
|
2021-08-24 08:37:59 +00:00
|
|
|
from homeassistant.const import (
|
|
|
|
ATTR_DEVICE_CLASS,
|
2021-10-19 19:41:01 +00:00
|
|
|
ATTR_ENTITY_ID,
|
2021-08-24 08:37:59 +00:00
|
|
|
ATTR_MANUFACTURER,
|
|
|
|
ATTR_MODEL,
|
|
|
|
ATTR_NAME,
|
2021-10-19 19:41:01 +00:00
|
|
|
ATTR_STATE,
|
2021-08-24 08:37:59 +00:00
|
|
|
ATTR_UNIT_OF_MEASUREMENT,
|
|
|
|
)
|
2021-10-18 17:16:53 +00:00
|
|
|
from homeassistant.core import HomeAssistant
|
2020-10-14 08:19:12 +00:00
|
|
|
|
2021-10-18 17:16:53 +00:00
|
|
|
from . import setup_owproxy_mock_devices, setup_sysbus_mock_devices
|
2021-10-19 19:41:01 +00:00
|
|
|
from .const import (
|
|
|
|
ATTR_DEFAULT_DISABLED,
|
|
|
|
ATTR_DEVICE_FILE,
|
|
|
|
ATTR_DEVICE_INFO,
|
|
|
|
ATTR_INJECT_READS,
|
|
|
|
ATTR_UNIQUE_ID,
|
|
|
|
MOCK_OWPROXY_DEVICES,
|
|
|
|
MOCK_SYSBUS_DEVICES,
|
|
|
|
)
|
2020-12-07 01:09:32 +00:00
|
|
|
|
2021-10-20 07:10:32 +00:00
|
|
|
from tests.common import mock_device_registry, mock_registry
|
2020-12-07 01:09:32 +00:00
|
|
|
|
|
|
|
# Subset of the mocked owproxy devices whose definition contains a "branches" entry.
MOCK_COUPLERS = {
    device_id: device
    for device_id, device in MOCK_OWPROXY_DEVICES.items()
    if "branches" in device
}
|
|
|
|
|
2020-10-14 08:19:12 +00:00
|
|
|
|
2021-10-18 17:16:53 +00:00
|
|
|
@pytest.fixture(autouse=True)
def override_platforms():
    """Patch the onewire PLATFORMS list down to the sensor domain for each test."""
    platforms_patch = patch(
        "homeassistant.components.onewire.PLATFORMS", [SENSOR_DOMAIN]
    )
    with platforms_patch:
        yield
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize("device_id", ["1F.111111111111"], indirect=True)
async def test_sensors_on_owserver_coupler(
    hass: HomeAssistant, config_entry: ConfigEntry, owproxy: MagicMock, device_id: str
):
    """Check the sensors of devices connected through a DS2409 coupler.

    The owproxy mock is primed with the ``dir`` and ``read`` responses taken
    from the coupler definition, the config entry is set up, and every
    expected sensor is then compared against the entity registry and the
    state machine.
    """
    entity_registry = mock_registry(hass)
    coupler = MOCK_COUPLERS[device_id]

    # Side effects are consumed one per call, so the order in which the
    # responses are queued below must not change.
    dir_responses = [[f"/{device_id}/"]]  # dir on root
    read_responses = [device_id[0:2].encode()]  # read family on root
    read_responses.extend(coupler.get(ATTR_INJECT_READS, []))

    expected_entities = []
    for branch, branch_devices in coupler["branches"].items():
        # dir on branch
        dir_responses.append(
            [f"/{device_id}/{branch}/{child_id}/" for child_id in branch_devices]
        )

        for child_id, child in branch_devices.items():
            # First two characters of the sub-device id, as bytes.
            read_responses.append(child_id[0:2].encode())
            read_responses.extend(child.get(ATTR_INJECT_READS, []))

            child_sensors = child[SENSOR_DOMAIN]
            expected_entities.extend(child_sensors)
            read_responses.extend(
                sensor[ATTR_INJECT_READS] for sensor in child_sensors
            )

    # Ensure enough read side effect
    read_responses.extend([ProtocolError("Missing injected value")] * 10)
    proxy = owproxy.return_value
    proxy.dir.side_effect = dir_responses
    proxy.read.side_effect = read_responses

    await hass.config_entries.async_setup(config_entry.entry_id)
    await hass.async_block_till_done()

    assert len(entity_registry.entities) == len(expected_entities)

    checked_attrs = (ATTR_DEVICE_CLASS, ATTR_STATE_CLASS, ATTR_UNIT_OF_MEASUREMENT)
    for expected in expected_entities:
        entity_id = expected[ATTR_ENTITY_ID]

        entry = entity_registry.entities.get(entity_id)
        assert entry is not None
        assert entry.unique_id == expected[ATTR_UNIQUE_ID]

        state = hass.states.get(entity_id)
        assert state.state == expected[ATTR_STATE]
        for attribute in checked_attrs:
            assert state.attributes.get(attribute) == expected[attribute]
        assert state.attributes[ATTR_DEVICE_FILE] == expected[ATTR_DEVICE_FILE]
|
2021-04-01 13:06:47 +00:00
|
|
|
|
|
|
|
|
2021-10-18 17:16:53 +00:00
|
|
|
async def test_owserver_setup_valid_device(
    hass: HomeAssistant, config_entry: ConfigEntry, owproxy: MagicMock, device_id: str
):
    """Test for 1-Wire device.

    After the first setup, sensors flagged as disabled by default must be
    disabled by the integration. They are then enabled, the config entry is
    reloaded, and the device registry entry and every sensor state are
    compared against the mock device definition.
    """
    entity_registry = mock_registry(hass)
    device_registry = mock_device_registry(hass)

    mock_device = MOCK_OWPROXY_DEVICES[device_id]
    expected_entities = mock_device.get(SENSOR_DOMAIN, [])

    setup_owproxy_mock_devices(owproxy, SENSOR_DOMAIN, [device_id])
    await hass.config_entries.async_setup(config_entry.entry_id)
    await hass.async_block_till_done()

    assert len(entity_registry.entities) == len(expected_entities)

    # Ensure all entities are enabled
    for expected in expected_entities:
        if not expected.get(ATTR_DEFAULT_DISABLED):
            continue
        entity_id = expected[ATTR_ENTITY_ID]
        entry = entity_registry.entities.get(entity_id)
        assert entry.disabled
        assert entry.disabled_by == "integration"
        entity_registry.async_update_entity(entity_id, disabled_by=None)

    # Re-prime the mock before the reload queries the devices again.
    setup_owproxy_mock_devices(owproxy, SENSOR_DOMAIN, [device_id])
    await hass.config_entries.async_reload(config_entry.entry_id)
    await hass.async_block_till_done()

    if expected_entities:
        device_info = mock_device[ATTR_DEVICE_INFO]
        device_key = (DOMAIN, device_id)
        assert len(device_registry.devices) == 1
        device_entry = device_registry.async_get_device({device_key})
        assert device_entry is not None
        assert device_entry.identifiers == {device_key}
        assert device_entry.manufacturer == device_info[ATTR_MANUFACTURER]
        assert device_entry.name == device_info[ATTR_NAME]
        assert device_entry.model == device_info[ATTR_MODEL]

    checked_attrs = (ATTR_DEVICE_CLASS, ATTR_STATE_CLASS, ATTR_UNIT_OF_MEASUREMENT)
    for expected in expected_entities:
        entity_id = expected[ATTR_ENTITY_ID]

        entry = entity_registry.entities.get(entity_id)
        assert entry is not None
        assert entry.unique_id == expected[ATTR_UNIQUE_ID]

        state = hass.states.get(entity_id)
        assert state.state == expected[ATTR_STATE]
        for attribute in checked_attrs:
            assert state.attributes.get(attribute) == expected[attribute]

        # The expected device file falls back to the unique id when not given.
        expected_device_file = expected.get(ATTR_DEVICE_FILE, entry.unique_id)
        assert state.attributes[ATTR_DEVICE_FILE] == expected_device_file
|
2021-04-01 13:06:47 +00:00
|
|
|
|
|
|
|
|
2021-10-18 17:16:53 +00:00
|
|
|
@pytest.mark.usefixtures("sysbus")
@pytest.mark.parametrize("device_id", MOCK_SYSBUS_DEVICES.keys(), indirect=True)
async def test_onewiredirect_setup_valid_device(
    hass: HomeAssistant, sysbus_config_entry: ConfigEntry, device_id: str
):
    """Verify the device and sensors created from a sysbus config entry.

    The pi1wire glob lookup and temperature read are patched with the values
    prepared for the mock device while the config entry is set up; the
    registries and states are then compared against the mock definition.
    """
    entity_registry = mock_registry(hass)
    device_registry = mock_device_registry(hass)

    glob_result, read_side_effect = setup_sysbus_mock_devices(
        SENSOR_DOMAIN, [device_id]
    )

    mock_device = MOCK_SYSBUS_DEVICES[device_id]
    expected_entities = mock_device.get(SENSOR_DOMAIN, [])

    glob_patch = patch("pi1wire._finder.glob.glob", return_value=glob_result)
    temperature_patch = patch(
        "pi1wire.OneWire.get_temperature", side_effect=read_side_effect
    )
    with glob_patch, temperature_patch:
        await hass.config_entries.async_setup(sysbus_config_entry.entry_id)
        await hass.async_block_till_done()

    assert len(entity_registry.entities) == len(expected_entities)

    if expected_entities:
        device_info = mock_device[ATTR_DEVICE_INFO]
        device_key = (DOMAIN, device_id)
        assert len(device_registry.devices) == 1
        device_entry = device_registry.async_get_device({device_key})
        assert device_entry is not None
        assert device_entry.identifiers == {device_key}
        assert device_entry.manufacturer == device_info[ATTR_MANUFACTURER]
        assert device_entry.name == device_info[ATTR_NAME]
        assert device_entry.model == device_info[ATTR_MODEL]

    checked_attrs = (ATTR_DEVICE_CLASS, ATTR_STATE_CLASS, ATTR_UNIT_OF_MEASUREMENT)
    for expected in expected_entities:
        entity_id = expected[ATTR_ENTITY_ID]

        entry = entity_registry.entities.get(entity_id)
        assert entry is not None
        assert entry.unique_id == expected[ATTR_UNIQUE_ID]

        state = hass.states.get(entity_id)
        assert state.state == expected[ATTR_STATE]
        for attribute in checked_attrs:
            assert state.attributes.get(attribute) == expected[attribute]
|