core/tests/components/derivative/test_sensor.py

937 lines
31 KiB
Python

"""The tests for the derivative sensor platform."""
from datetime import timedelta
from math import sin
import random
from typing import Any
from freezegun import freeze_time
import pytest
from homeassistant.components.derivative.const import DOMAIN
from homeassistant.components.sensor import ATTR_STATE_CLASS, SensorStateClass
from homeassistant.const import (
STATE_UNAVAILABLE,
STATE_UNKNOWN,
UnitOfPower,
UnitOfTime,
)
from homeassistant.core import (
Event,
EventStateChangedData,
HomeAssistant,
State,
callback,
)
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from tests.common import (
MockConfigEntry,
async_fire_time_changed,
mock_restore_cache_with_extra_data,
)
# Attribute payloads used by the parametrized tests below: the source entity is
# written either with identical attributes on every update (A1, A1, ...) or
# with alternating attributes (A1, A2, ...).
A1 = {"attr": "value1"}
A2 = {"attr": "value2"}
@pytest.mark.parametrize("force_update", [False, True])
@pytest.mark.parametrize(
    "attributes",
    [
        # Same attributes, fires state report
        [A1, A1],
        # Changing attributes, fires state change with bumped last_updated
        [A1, A2],
    ],
)
async def test_state(
    hass: HomeAssistant,
    force_update: bool,
    attributes: list[dict[str, Any]],
) -> None:
    """Check that a source held at a constant value yields a zero derivative."""
    sensor_config = {
        "platform": "derivative",
        "name": "derivative",
        "source": "sensor.energy",
        "unit": "kW",
        "round": 2,
    }
    config = {"sensor": sensor_config}
    assert await async_setup_component(hass, "sensor", config)

    source_id = sensor_config["source"]
    one_hour = timedelta(seconds=3600)
    start = dt_util.utcnow()
    with freeze_time(start) as frozen:
        # Write the same value once per entry in `attributes`, one hour apart.
        for attrs in attributes:
            hass.states.async_set(source_id, 1, attrs, force_update=force_update)
            await hass.async_block_till_done()
            frozen.move_to(dt_util.utcnow() + one_hour)

    derivative_state = hass.states.get("sensor.derivative")
    assert derivative_state is not None

    # The source stayed at 1 for the whole hour, so the rate of change is 0.
    assert round(float(derivative_state.state), sensor_config["round"]) == 0.0
    assert derivative_state.attributes.get("unit_of_measurement") == "kW"
# Test unchanged states work both with and without max_sub_interval
@pytest.mark.parametrize("extra_config", [{}, {"max_sub_interval": {"minutes": 9999}}])
@pytest.mark.parametrize("force_update", [False, True])
@pytest.mark.parametrize(
    "attributes",
    [
        # Same attributes, fires state report
        [A1, A1, A1, A1],
        # Changing attributes, fires state change with bumped last_updated
        [A1, A2, A1, A2],
    ],
)
async def test_no_change(
    hass: HomeAssistant,
    extra_config: dict[str, Any],
    force_update: bool,
    attributes: list[dict[str, Any]],
) -> None:
    """Check the derivative returns to zero once the source stops changing."""
    captured: list[Event[EventStateChangedData]] = []

    @callback
    def _record(event: Event) -> None:
        captured.append(event)

    # Collect every state change of the derivative entity for later inspection.
    async_track_state_change_event(hass, "sensor.derivative", _record)

    sensor_config = {
        "platform": "derivative",
        "name": "derivative",
        "source": "sensor.energy",
        "unit": "kW",
        "round": 2,
    } | extra_config
    config = {"sensor": sensor_config}
    assert await async_setup_component(hass, "sensor", config)
    await hass.async_block_till_done()

    source_id = sensor_config["source"]
    precision = sensor_config["round"]
    one_hour = timedelta(seconds=3600)
    base = dt_util.utcnow()
    with freeze_time(base) as frozen:
        # The source steps 0 -> 1 once and is then written as 1 twice more.
        for reading, attrs in zip([0, 1, 1, 1], attributes, strict=True):
            hass.states.async_set(
                source_id, reading, attrs, force_update=force_update
            )
            await hass.async_block_till_done()
            frozen.move_to(dt_util.utcnow() + one_hour)

    assert hass.states.get("sensor.derivative") is not None
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    # The first captured state is kept as a raw string; the rest are rounded.
    observed = [captured[0].data["new_state"].state]
    observed.extend(
        round(float(event.data["new_state"].state), precision)
        for event in captured[1:]
    )

    # Expected sequence: unavailable, 0 for the first sample, 1 after the
    # 0 -> 1 step, and 0 again once the value repeats.
    assert observed == ["unavailable", 0.0, 1.0, 0.0]

    final_state = captured[-1].data["new_state"]
    assert final_state.attributes.get("unit_of_measurement") == "kW"
    # The last change of the derivative happened at the third write (t = 2 h).
    assert final_state.last_changed == base + 2 * one_hour
async def _setup_sensor(
    hass: HomeAssistant, config: dict[str, Any]
) -> tuple[dict[str, Any], str]:
    """Set up a derivative sensor named "power" and seed its source with 0.

    `config` overrides or extends the default platform options. Returns the
    full component configuration that was used together with the entity id
    of the source sensor.
    """
    platform_config = {
        "platform": "derivative",
        "name": "power",
        "source": "sensor.energy",
        "round": 2,
    } | config
    config = {"sensor": platform_config}

    assert await async_setup_component(hass, "sensor", config)
    await hass.async_block_till_done()

    # Give the source an initial state so later writes have a predecessor.
    entity_id = platform_config["source"]
    hass.states.async_set(entity_id, 0, {})
    await hass.async_block_till_done()

    return config, entity_id
async def setup_tests(
    hass: HomeAssistant,
    config: dict[str, Any],
    times: list[int],
    values: list[float],
    expected_state: float,
) -> State:
    """Feed a series of readings to the source and check the final derivative.

    `times` are offsets in seconds from the moment the helper starts and
    `values` are the readings written at those offsets. After the last write
    the state of "sensor.power", rounded to the configured precision, must
    equal `expected_state`. The final state object is returned so callers can
    make further assertions.
    """
    config, entity_id = await _setup_sensor(hass, config)
    precision = config["sensor"]["round"]

    start = dt_util.utcnow()
    with freeze_time(start) as frozen:
        for offset, reading in zip(times, values, strict=True):
            frozen.move_to(start + timedelta(seconds=offset))
            hass.states.async_set(entity_id, reading, {})
            await hass.async_block_till_done()

    result = hass.states.get("sensor.power")
    assert result is not None
    assert round(float(result.state), precision) == expected_state

    return result
async def test_dataSet1(hass: HomeAssistant) -> None:
    """Test a four-sample series that rises and then falls, per second."""
    # Offset in seconds -> reading written at that offset.
    samples = {20: 10, 30: 30, 40: 5, 50: 0}
    await setup_tests(
        hass,
        {"unit_time": UnitOfTime.SECONDS},
        times=list(samples),
        values=list(samples.values()),
        expected_state=-0.5,
    )
async def test_dataSet2(hass: HomeAssistant) -> None:
    """Test a reading that drops from 5 to 0 over ten seconds."""
    # Offset in seconds -> reading written at that offset.
    samples = {20: 5, 30: 0}
    await setup_tests(
        hass,
        {"unit_time": UnitOfTime.SECONDS},
        times=list(samples),
        values=list(samples.values()),
        expected_state=-0.5,
    )
async def test_dataSet3(hass: HomeAssistant) -> None:
    """Test a reading that rises from 5 to 10 and check the resulting unit."""
    # Offset in seconds -> reading written at that offset.
    samples = {20: 5, 30: 10}
    final_state = await setup_tests(
        hass,
        {"unit_time": UnitOfTime.SECONDS},
        times=list(samples),
        values=list(samples.values()),
        expected_state=0.5,
    )

    # The source has no unit, so only the time part is expected in the unit.
    expected_unit = f"/{UnitOfTime.SECONDS}"
    assert final_state.attributes.get("unit_of_measurement") == expected_unit
async def test_dataSet4(hass: HomeAssistant) -> None:
    """Test that two equal readings produce a derivative of zero."""
    # Offset in seconds -> reading written at that offset.
    samples = {20: 5, 30: 5}
    await setup_tests(
        hass,
        {"unit_time": UnitOfTime.SECONDS},
        times=list(samples),
        values=list(samples.values()),
        expected_state=0,
    )
async def test_dataSet5(hass: HomeAssistant) -> None:
    """Test a reading that crosses zero, from 10 down to -10."""
    # Offset in seconds -> reading written at that offset.
    samples = {20: 10, 30: -10}
    await setup_tests(
        hass,
        {"unit_time": UnitOfTime.SECONDS},
        times=list(samples),
        values=list(samples.values()),
        expected_state=-2,
    )
async def test_dataSet6(hass: HomeAssistant) -> None:
    """Test the derivative with the platform's default options."""
    # A rise of 1/60 over 60 seconds is expected to be reported as 1
    # (presumably because the default time unit is hours - confirm against
    # the integration's defaults).
    offsets = [0, 60]
    readings = [0, 1 / 60]
    await setup_tests(hass, {}, times=offsets, values=readings, expected_state=1)
# Test unchanged states work both with and without max_sub_interval
@pytest.mark.parametrize("extra_config", [{}, {"max_sub_interval": {"minutes": 9999}}])
@pytest.mark.parametrize("force_update", [False, True])
@pytest.mark.parametrize(
    "attributes",
    [
        # Same attributes, fires state report
        [A1, A1] * 10 + [A1],
        # Changing attributes, fires state change with bumped last_updated
        [A1, A2] * 10 + [A1],
    ],
)
async def test_data_moving_average_with_zeroes(
    hass: HomeAssistant,
    extra_config: dict[str, Any],
    force_update: bool,
    attributes: list[dict[str, Any]],
) -> None:
    """Test that unchanged readings are accounted for inside the time window."""
    # Scenario: the temperature rises 1 °C per minute for 10 minutes and then
    # stays constant for another 10 minutes. There is one data point per
    # minute and the time window is 10 minutes, so the derivative should peak
    # at 1 after 10 minutes and then fall back to 0 in steps of 0.1.
    captured: list[Event[EventStateChangedData]] = []

    @callback
    def _record(event: Event) -> None:
        captured.append(event)

    async_track_state_change_event(hass, "sensor.power", _record)

    # 0..9 while rising, then eleven samples at 10 (21 samples in total).
    temperature_values = [*range(10), *([10] * 11)]
    time_window = 600
    times = list(range(0, 1200 + 60, 60))

    config, entity_id = await _setup_sensor(
        hass,
        {
            "time_window": {"seconds": time_window},
            "unit_time": UnitOfTime.MINUTES,
            "round": 1,
        }
        | extra_config,
    )
    precision = config["sensor"]["round"]

    base = dt_util.utcnow()
    with freeze_time(base) as frozen:
        for offset, reading, attrs in zip(
            times, temperature_values, attributes, strict=True
        ):
            frozen.move_to(base + timedelta(seconds=offset))
            hass.states.async_set(
                entity_id, reading, attrs, force_update=force_update
            )
            await hass.async_block_till_done()

    await hass.async_block_till_done()

    # The first two captured events are skipped; they precede the sampled
    # series (presumably the initial state and the seed write done by
    # _setup_sensor - confirm).
    sampled_events = captured[2:]
    assert len(sampled_events) == len(times)

    last_derivative = 0
    for time, event in zip(times, sampled_events, strict=True):
        derivative = round(float(event.data["new_state"].state), precision)
        if time == time_window:
            # End of the rising phase: the full window saw 1 °C per minute.
            assert derivative == 1.0
        elif time_window < time < time_window * 2:
            # While flat samples replace rising ones, each step changes by 0.1.
            step = abs(derivative - last_derivative)
            assert (0.1 - 1e-6) < step < (0.1 + 1e-6)
        elif time == time_window * 2:
            # The window now only contains constant readings.
            assert derivative == 0
        last_derivative = derivative
async def test_data_moving_average_for_discrete_sensor(hass: HomeAssistant) -> None:
    """Test the moving average with a source that reports integer steps."""
    # Scenario: the temperature rises 1 °C per minute for 30 minutes. There is
    # a data point every 30 seconds, but the sensor reports the temperature
    # rounded down to an integer. With a 10 minute time window the result
    # should stay within 10% of the true derivative of 1 °C/min.
    temperature_values = [
        temperature for temperature in range(30) for _ in range(2)
    ]  # two values per minute
    time_window = 600  # ten minutes, in seconds
    times = list(range(0, 1800, 30))

    config, entity_id = await _setup_sensor(
        hass,
        {
            "time_window": {"seconds": time_window},
            "unit_time": UnitOfTime.MINUTES,
            "round": 1,
        },
    )
    precision = config["sensor"]["round"]
    # Only samples strictly between these two offsets are checked.
    check_until = times[-1] - time_window

    base = dt_util.utcnow()
    with freeze_time(base) as frozen:
        for offset, reading in zip(times, temperature_values, strict=True):
            frozen.move_to(base + timedelta(seconds=offset))
            hass.states.async_set(entity_id, reading, {})
            await hass.async_block_till_done()

            if not time_window < offset < check_until:
                continue

            state = hass.states.get("sensor.power")
            derivative = round(float(state.state), precision)
            # The error must never exceed 10% of the true derivative (+ ε).
            assert abs(1 - derivative) <= 0.1 + 1e-6
async def test_data_moving_average_for_irregular_times(hass: HomeAssistant) -> None:
    """Test the moving average with irregularly spaced samples."""
    # Scenario: the temperature rises 1 °C per 10 minutes (0.1 °C per minute)
    # for 30 minutes, sampled at 60 pseudo-random instants. With a time window
    # of 1 minute the derivative must stay within 0.01 of 0.1 °C/min.
    time_window = 60
    random.seed(0)
    times = sorted(random.sample(range(1800), 60))

    def temp_function(time):
        # NOTE: the generator is re-seeded on every call, so random.gauss adds
        # the same offset to every sample rather than independent noise.
        random.seed(0)
        return random.gauss(time / 600, 0.1)

    temperature_values = [temp_function(offset) for offset in times]

    config, entity_id = await _setup_sensor(
        hass,
        {
            "time_window": {"seconds": time_window},
            "unit_time": UnitOfTime.MINUTES,
            "round": 3,
        },
    )
    precision = config["sensor"]["round"]
    # Skip the first window and the first few samples before checking.
    first_checked = max(time_window, times[3])

    base = dt_util.utcnow()
    with freeze_time(base) as frozen:
        for offset, reading in zip(times, temperature_values, strict=True):
            frozen.move_to(base + timedelta(seconds=offset))
            hass.states.async_set(entity_id, reading, {})
            await hass.async_block_till_done()

            if offset > first_checked:
                state = hass.states.get("sensor.power")
                derivative = round(float(state.state), precision)
                # Allowed deviation from the true slope: 0.01 (+ ε).
                assert abs(0.1 - derivative) <= 0.01 + 1e-6
async def test_double_signal_after_delay(hass: HomeAssistant) -> None:
    """Test that two quick updates after a long pause do not cause a spike."""
    # The old algorithm produced extreme values when, after a delay longer
    # than the time window, two signals arrived in quick succession. This test
    # checks for that situation explicitly.
    time_window = 60
    pause_end = time_window * 20
    times = [*range(time_window * 10), pause_end, pause_end + 0.01]

    # Use a sine as an arbitrary signal for the regular samples, then make the
    # two samples after the pause differ only slightly from their predecessor.
    temperature_values = [sin(x) for x in times[:-2]]
    temperature_values.append(temperature_values[-1] + 0.01)
    temperature_values.append(temperature_values[-1] + 0.01)

    config, entity_id = await _setup_sensor(
        hass,
        {
            "time_window": {"seconds": time_window},
            "unit_time": UnitOfTime.MINUTES,
            "round": 3,
        },
    )
    precision = config["sensor"]["round"]
    final_offset = times[-1]

    base = dt_util.utcnow()
    previous = 0
    with freeze_time(base) as frozen:
        for offset, reading in zip(times, temperature_values, strict=True):
            frozen.move_to(base + timedelta(seconds=offset))
            hass.states.async_set(entity_id, reading, {})
            await hass.async_block_till_done()

            state = hass.states.get("sensor.power")
            derivative = round(float(state.state), precision)

            if offset == final_offset:
                # The second update after the pause must barely move the
                # derivative compared to the first one.
                assert abs(previous - derivative) <= 0.01 + 1e-6
            previous = derivative
async def test_sub_intervals_instantaneous(hass: HomeAssistant) -> None:
    """Test max_sub_interval handling when no time window is configured."""
    # Scenario:
    # - The value changes from 0 to 10 in 5 seconds (derivative = 2).
    # - max_sub_interval is 20 seconds; once it elapses without an update the
    #   derivative should change to 0.
    # - The value changes to 0, 35 seconds after changing to 10
    #   (derivative = -10/35 = -0.29).
    # - The source goes unavailable; the derivative stays unavailable.
    # - The source returns to 0; the derivative is 0 and remains 0 after
    #   another max_sub_interval.
    max_sub_interval = 20
    config, entity_id = await _setup_sensor(
        hass,
        {
            "unit_time": UnitOfTime.SECONDS,
            "round": 2,
            "max_sub_interval": {"seconds": max_sub_interval},
        },
    )
    precision = config["sensor"]["round"]

    def read_derivative() -> float:
        """Return the current derivative rounded to the configured precision."""
        return round(float(hass.states.get("sensor.power").state), precision)

    base = dt_util.utcnow()
    with freeze_time(base) as freezer:

        async def write_source(moment, value) -> None:
            """Move the frozen clock to `moment` and write the source state."""
            freezer.move_to(moment)
            hass.states.async_set(entity_id, value, {}, force_update=True)
            await hass.async_block_till_done()

        async def fire_timers(moment) -> None:
            """Fire a time-changed event for `moment` without a source update."""
            async_fire_time_changed(hass, moment)
            await hass.async_block_till_done()

        await write_source(base, 0)

        now = base + timedelta(seconds=5)
        await write_source(now, 10)
        assert read_derivative() == 2

        # 15 seconds after the change: sub_interval not elapsed, no change yet
        now += timedelta(seconds=15)
        await fire_timers(now)
        assert read_derivative() == 2

        # 25 seconds after the change: the sub_interval has fired and the
        # derivative should be 0
        now += timedelta(seconds=10)
        await fire_timers(now)
        assert read_derivative() == 0

        # 35 seconds after the change the value drops back to 0
        now += timedelta(seconds=10)
        await write_source(now, 0)
        assert read_derivative() == -0.29

        # Source becomes unavailable
        now += timedelta(seconds=10)
        await write_source(now, STATE_UNAVAILABLE)
        assert hass.states.get("sensor.power").state == STATE_UNAVAILABLE

        # Time passing alone does not bring the derivative back
        now += timedelta(seconds=60)
        await fire_timers(now)
        assert hass.states.get("sensor.power").state == STATE_UNAVAILABLE

        # Source returns with the same value it had before
        now += timedelta(seconds=10)
        await write_source(now, 0)
        assert read_derivative() == 0

        # Still 0 after another sub_interval has elapsed
        now += timedelta(seconds=max_sub_interval + 1)
        await fire_timers(now)
        assert read_derivative() == 0
async def test_sub_intervals_with_time_window(hass: HomeAssistant) -> None:
    """Test max_sub_interval handling combined with a time window."""
    # Scenario:
    # - The value rises by 1 every second for 1 minute, then pauses.
    # - The time window is 30 seconds and max_sub_interval is 5 seconds.
    # - After the value stops increasing, the derivative should trend back
    #   to 0 in steps driven by the sub-interval timer.
    values = list(range(60))
    # Offsets in seconds equal the written values (one unit per second).
    times = values
    time_window = 30
    max_sub_interval = 5

    config, entity_id = await _setup_sensor(
        hass,
        {
            "time_window": {"seconds": time_window},
            "unit_time": UnitOfTime.SECONDS,
            "round": 2,
            "max_sub_interval": {"seconds": max_sub_interval},
        },
    )
    precision = config["sensor"]["round"]
    rounding_err = 0.01 + 1e-6

    def calc_expected(elapsed_seconds: int, calculation_delay: int = 0):
        """Return the expected derivative `elapsed_seconds` after the pause."""
        # Start of the most recently completed sub-interval.
        last_sub_interval = elapsed_seconds - elapsed_seconds % max_sub_interval
        if last_sub_interval >= time_window:
            return 0
        remaining = time_window - last_sub_interval - calculation_delay
        return remaining / time_window

    def read_derivative() -> float:
        """Return the current derivative rounded to the configured precision."""
        return round(float(hass.states.get("sensor.power").state), precision)

    base = dt_util.utcnow()
    with freeze_time(base) as freezer:
        last_state_change = None

        # Phase 1: steady rise of one unit per second.
        for offset, reading in zip(times, values, strict=True):
            moment = base + timedelta(seconds=offset)
            freezer.move_to(moment)
            hass.states.async_set(entity_id, reading, {}, force_update=True)
            last_state_change = moment
            await hass.async_block_till_done()

            if offset > time_window:
                # Once the window is full the derivative must be 1 within
                # 0.01 (+ ε).
                assert abs(1 - read_derivative()) <= 0.01 + 1e-6

        # Phase 2: no more source updates, only time passing.
        for time in range(60):
            moment = last_state_change + timedelta(seconds=time)
            freezer.move_to(moment)
            async_fire_time_changed(hass, moment)
            await hass.async_block_till_done()

            derivative = read_derivative()
            expect_max = calc_expected(time) + rounding_err
            # Allow one second of slop for internal delays
            expect_min = calc_expected(time, 1) - rounding_err
            assert expect_min <= derivative <= expect_max, f"Failed at time {time}"
async def test_prefix(hass: HomeAssistant) -> None:
    """Test that unit_prefix scales the value and shows up in the unit."""
    sensor_config = {
        "platform": "derivative",
        "name": "derivative",
        "source": "sensor.power",
        "round": 2,
        "unit_prefix": "k",
    }
    config = {"sensor": sensor_config}
    assert await async_setup_component(hass, "sensor", config)

    source_id = sensor_config["source"]

    async def report(watts: int) -> None:
        """Write a power reading in watts to the source entity."""
        hass.states.async_set(
            source_id,
            watts,
            {"unit_of_measurement": UnitOfPower.WATT},
        )
        await hass.async_block_till_done()

    start = dt_util.utcnow()
    with freeze_time(start) as frozen:
        await report(1000)
        frozen.move_to(dt_util.utcnow() + timedelta(seconds=3600))
        await report(2000)

    derivative_state = hass.states.get("sensor.derivative")
    assert derivative_state is not None

    # A power sensor increasing by 1000 W within one hour = 1 kW/h
    assert round(float(derivative_state.state), sensor_config["round"]) == 1.0
    expected_unit = f"kW/{UnitOfTime.HOURS}"
    assert derivative_state.attributes.get("unit_of_measurement") == expected_unit
async def test_suffix(hass: HomeAssistant) -> None:
    """Test the derivative of a constant network counter source."""
    sensor_config = {
        "platform": "derivative",
        "name": "derivative",
        "source": "sensor.bytes_per_second",
        "round": 2,
        "unit_prefix": "k",
        "unit_time": UnitOfTime.SECONDS,
    }
    config = {"sensor": sensor_config}
    assert await async_setup_component(hass, "sensor", config)

    source_id = sensor_config["source"]
    start = dt_util.utcnow()
    with freeze_time(start) as frozen:
        hass.states.async_set(source_id, 1000, {})
        await hass.async_block_till_done()

        frozen.move_to(dt_util.utcnow() + timedelta(seconds=3600))
        hass.states.async_set(source_id, 1000, {})
        await hass.async_block_till_done()

    derivative_state = hass.states.get("sensor.derivative")
    assert derivative_state is not None

    # The source reads 1000 at both ends of the hour, so the rate of change
    # is 0 regardless of prefix and time unit.
    assert round(float(derivative_state.state), sensor_config["round"]) == 0.0
async def test_total_increasing_reset(hass: HomeAssistant) -> None:
    """Test that a total_increasing source reset does not update the derivative."""
    times = [0, 20, 30, 35, 40, 50, 60]
    values = [0, 10, 30, 40, 0, 10, 40]

    # The sample at t=40 is the reset (40 -> 0); no update is expected for it.
    expected_times = [0, 20, 30, 35, 50, 60]
    expected_values = ["0.00", "0.50", "2.00", "2.00", "1.00", "3.00"]

    _config, entity_id = await _setup_sensor(hass, {"unit_time": UnitOfTime.SECONDS})
    source_attributes = {ATTR_STATE_CLASS: SensorStateClass.TOTAL_INCREASING}

    # (offset, derivative state) for every write the derivative reported on.
    reported: list[tuple[int, str]] = []

    base_time = dt_util.utcnow()
    with freeze_time(base_time) as frozen:
        for offset, reading in zip(times, values, strict=True):
            moment = base_time + timedelta(seconds=offset)
            frozen.move_to(moment)
            hass.states.async_set(entity_id, reading, source_attributes)
            await hass.async_block_till_done()

            state = hass.states.get("sensor.power")
            assert state is not None

            # last_reported equals the current instant only when the
            # derivative sensor wrote its state for this sample.
            if state.last_reported == moment:
                reported.append((offset, state.state))

    assert [offset for offset, _ in reported] == expected_times
    assert [value for _, value in reported] == expected_values
async def test_device_id(
    hass: HomeAssistant,
    entity_registry: er.EntityRegistry,
    device_registry: dr.DeviceRegistry,
) -> None:
    """Test that the derivative entity is attached to its source's device."""
    # Register a device with one sensor entity that will act as the source.
    source_entry = MockConfigEntry()
    source_entry.add_to_hass(hass)

    source_device = device_registry.async_get_or_create(
        config_entry_id=source_entry.entry_id,
        identifiers={("sensor", "identifier_test")},
        connections={("mac", "30:31:32:33:34:35")},
    )
    source_entity = entity_registry.async_get_or_create(
        "sensor",
        "test",
        "source",
        config_entry=source_entry,
        device_id=source_device.id,
    )
    await hass.async_block_till_done()
    assert entity_registry.async_get("sensor.test_source") is not None

    # Set up a derivative config entry that points at the source entity.
    derivative_options = {
        "name": "Derivative",
        "round": 1.0,
        "source": "sensor.test_source",
        "time_window": {"seconds": 0.0},
        "unit_prefix": "k",
        "unit_time": "min",
    }
    derivative_entry = MockConfigEntry(
        data={},
        domain=DOMAIN,
        options=derivative_options,
        title="Derivative",
    )
    derivative_entry.add_to_hass(hass)

    assert await hass.config_entries.async_setup(derivative_entry.entry_id)
    await hass.async_block_till_done()

    derivative_entity = entity_registry.async_get("sensor.derivative")
    assert derivative_entity is not None
    assert derivative_entity.device_id == source_entity.device_id
@pytest.mark.parametrize("bad_state", [STATE_UNAVAILABLE, STATE_UNKNOWN, "foo"])
async def test_unavailable(
    bad_state: str,
    hass: HomeAssistant,
) -> None:
    """Test the derivative while the source passes through a non-numeric state."""
    config, entity_id = await _setup_sensor(hass, {"unit_time": "s"})
    precision = config["sensor"]["round"]

    # An unavailable source makes the derivative unavailable; any other
    # non-numeric source state makes it unknown.
    state_while_bad = (
        STATE_UNAVAILABLE if bad_state == STATE_UNAVAILABLE else STATE_UNKNOWN
    )
    non_numeric = [STATE_UNKNOWN, STATE_UNAVAILABLE]

    # (offset in seconds, source value, expected derivative)
    # The last expectation, (2 - 1) / (3 - 1) = 0.5, spans the bad sample.
    steps = zip(
        [0, 1, 2, 3],
        [0, 1, bad_state, 2],
        [0, 1, state_while_bad, 0.5],
        strict=True,
    )

    base = dt_util.utcnow()
    with freeze_time(base) as frozen:
        for offset, reading, expect in steps:
            frozen.move_to(base + timedelta(seconds=offset))
            hass.states.async_set(entity_id, reading, {})
            await hass.async_block_till_done()

            state = hass.states.get("sensor.power")
            assert state is not None

            # Compare raw strings for non-numeric expectations and rounded
            # floats otherwise.
            if expect in non_numeric:
                observed = state.state
            else:
                observed = round(float(state.state), precision)
            assert observed == expect
@pytest.mark.parametrize("bad_state", [STATE_UNAVAILABLE, STATE_UNKNOWN, "foo"])
async def test_unavailable_2(
    bad_state: str,
    hass: HomeAssistant,
) -> None:
    """Test derivative sensor state when unavailable with a time window."""
    config, entity_id = await _setup_sensor(
        hass, {"unit_time": "s", "time_window": {"seconds": 10}}
    )
    precision = config["sensor"]["round"]

    # Monotonically increasing by 1, with some unavailable holes
    times = list(range(21))
    values = list(range(21))
    for hole in (3, 6, 7, 8):
        values[hole] = bad_state

    # Same expectation as test_unavailable: an unavailable source makes the
    # derivative unavailable, any other non-numeric state makes it unknown.
    # Computed up front and compared with `==`: written inline as
    # `assert a == X if cond else Y`, the conditional expression binds looser
    # than `==`, so the else branch would assert a non-empty string and
    # always pass.
    state_while_bad = (
        STATE_UNAVAILABLE if bad_state == STATE_UNAVAILABLE else STATE_UNKNOWN
    )

    base = dt_util.utcnow()
    with freeze_time(base) as freezer:
        for time, value in zip(times, values, strict=True):
            freezer.move_to(base + timedelta(seconds=time))
            hass.states.async_set(entity_id, value, {})
            await hass.async_block_till_done()

            state = hass.states.get("sensor.power")
            assert state is not None

            if value == bad_state:
                assert state.state == state_while_bad
            else:
                # Slope is 1 per second; until the 10 s window is full the
                # expected value ramps up as time / 10.
                expect = (time / 10) if time < 10 else 1
                assert round(float(state.state), precision) == round(
                    expect, precision
                )
@pytest.mark.parametrize("restore_state", ["3.00", STATE_UNKNOWN])
async def test_unavailable_boot(
    restore_state,
    hass: HomeAssistant,
) -> None:
    """Test that the booting sequence does not leave derivative in a bad state."""
    # Pretend the derivative sensor had `restore_state` before the restart.
    restored = State(
        "sensor.power",
        restore_state,
        {
            "unit_of_measurement": "W",
        },
    )
    restored_extra = {
        "native_value": restore_state,
        "native_unit_of_measurement": "W",
    }
    mock_restore_cache_with_extra_data(hass, [(restored, restored_extra)])

    sensor_config = {
        "platform": "derivative",
        "name": "power",
        "source": "sensor.energy",
        "round": 2,
        "unit_time": "s",
    }
    config = {"sensor": sensor_config}
    entity_id = sensor_config["source"]

    # The source is unavailable when the derivative platform starts.
    hass.states.async_set(entity_id, STATE_UNAVAILABLE, {})
    await hass.async_block_till_done()

    assert await async_setup_component(hass, "sensor", config)
    await hass.async_block_till_done()

    def derivative_state() -> State:
        """Return the derivative entity's state, asserting that it exists."""
        current = hass.states.get("sensor.power")
        assert current is not None
        return current

    # Sensor is unavailable as source is unavailable
    assert derivative_state().state == STATE_UNAVAILABLE

    base = dt_util.utcnow()
    with freeze_time(base) as frozen:
        frozen.move_to(base + timedelta(seconds=1))
        hass.states.async_set(entity_id, 10, {})
        await hass.async_block_till_done()

        # The source sensor has moved to a valid value, but we need 2 points
        # to derive, so the restored value is held until the next tick
        assert derivative_state().state == restore_state

        frozen.move_to(base + timedelta(seconds=2))
        hass.states.async_set(entity_id, 15, {})
        await hass.async_block_till_done()

        # With two valid datapoints the derivative is (15 - 10) / 1 s
        assert derivative_state().state == "5.00"