Convert filter tests to use pytest style (#41743)

pull/45952/head^2
Christopher Gramberg 2020-12-17 15:08:35 -06:00 committed by GitHub
parent f54fcb7646
commit c5fdde2a94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 280 additions and 268 deletions

View File

@ -54,7 +54,7 @@ from homeassistant.helpers import (
storage,
)
from homeassistant.helpers.json import JSONEncoder
from homeassistant.setup import setup_component
from homeassistant.setup import async_setup_component, setup_component
from homeassistant.util.async_ import run_callback_threadsafe
import homeassistant.util.dt as date_util
from homeassistant.util.unit_system import METRIC_SYSTEM
@ -801,6 +801,19 @@ def init_recorder_component(hass, add_config=None):
_LOGGER.info("In-memory recorder successfully started")
async def async_init_recorder_component(hass, add_config=None):
"""Initialize the recorder asynchronously."""
config = dict(add_config) if add_config else {}
config[recorder.CONF_DB_URL] = "sqlite://"
with patch("homeassistant.components.recorder.migration.migrate_schema"):
assert await async_setup_component(
hass, recorder.DOMAIN, {recorder.DOMAIN: config}
)
assert recorder.DOMAIN in hass.config.components
_LOGGER.info("In-memory recorder successfully started")
def mock_restore_cache(hass, states):
"""Mock the DATA_RESTORE_CACHE."""
key = restore_state.DATA_RESTORE_STATE_TASK

View File

@ -1,7 +1,8 @@
"""The test for the data filter sensor platform."""
from datetime import timedelta
from os import path
import unittest
from pytest import fixture
from homeassistant import config as hass_config
from homeassistant.components.filter.sensor import (
@ -15,309 +16,307 @@ from homeassistant.components.filter.sensor import (
)
from homeassistant.const import SERVICE_RELOAD
import homeassistant.core as ha
from homeassistant.setup import async_setup_component, setup_component
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from tests.async_mock import patch
from tests.common import (
assert_setup_component,
get_test_home_assistant,
init_recorder_component,
)
from tests.common import assert_setup_component, async_init_recorder_component
class TestFilterSensor(unittest.TestCase):
"""Test the Data Filter sensor."""
@fixture
def values():
"""Fixture for a list of test States."""
values = []
raw_values = [20, 19, 18, 21, 22, 0]
timestamp = dt_util.utcnow()
for val in raw_values:
values.append(ha.State("sensor.test_monitored", val, last_updated=timestamp))
timestamp += timedelta(minutes=1)
return values
def setup_method(self, method):
"""Set up things to be run when tests are started."""
self.hass = get_test_home_assistant()
self.hass.config.components.add("history")
raw_values = [20, 19, 18, 21, 22, 0]
self.values = []
timestamp = dt_util.utcnow()
for val in raw_values:
self.values.append(
ha.State("sensor.test_monitored", val, last_updated=timestamp)
)
timestamp += timedelta(minutes=1)
async def init_recorder(hass):
"""Init the recorder for testing."""
await async_init_recorder_component(hass)
await hass.async_start()
def teardown_method(self, method):
"""Stop everything that was started."""
self.hass.stop()
def init_recorder(self):
"""Initialize the recorder."""
init_recorder_component(self.hass)
self.hass.start()
def test_setup_fail(self):
"""Test if filter doesn't exist."""
config = {
"sensor": {
"platform": "filter",
"entity_id": "sensor.test_monitored",
"filters": [{"filter": "nonexisting"}],
}
async def test_setup_fail(hass):
"""Test if filter doesn't exist."""
config = {
"sensor": {
"platform": "filter",
"entity_id": "sensor.test_monitored",
"filters": [{"filter": "nonexisting"}],
}
with assert_setup_component(0):
assert setup_component(self.hass, "sensor", config)
self.hass.block_till_done()
}
hass.config.components.add("history")
with assert_setup_component(0):
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
def test_chain(self):
"""Test if filter chaining works."""
config = {
"sensor": {
"platform": "filter",
"name": "test",
"entity_id": "sensor.test_monitored",
"filters": [
{"filter": "outlier", "window_size": 10, "radius": 4.0},
{"filter": "lowpass", "time_constant": 10, "precision": 2},
{"filter": "throttle", "window_size": 1},
],
}
async def test_chain(hass, values):
"""Test if filter chaining works."""
config = {
"sensor": {
"platform": "filter",
"name": "test",
"entity_id": "sensor.test_monitored",
"filters": [
{"filter": "outlier", "window_size": 10, "radius": 4.0},
{"filter": "lowpass", "time_constant": 10, "precision": 2},
{"filter": "throttle", "window_size": 1},
],
}
}
hass.config.components.add("history")
with assert_setup_component(1, "sensor"):
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
with assert_setup_component(1, "sensor"):
assert setup_component(self.hass, "sensor", config)
self.hass.block_till_done()
for value in values:
hass.states.async_set(config["sensor"]["entity_id"], value.state)
await hass.async_block_till_done()
for value in self.values:
self.hass.states.set(config["sensor"]["entity_id"], value.state)
self.hass.block_till_done()
state = hass.states.get("sensor.test")
assert "18.05" == state.state
state = self.hass.states.get("sensor.test")
assert "18.05" == state.state
def test_chain_history(self, missing=False):
"""Test if filter chaining works."""
self.init_recorder()
config = {
"history": {},
"sensor": {
"platform": "filter",
"name": "test",
"entity_id": "sensor.test_monitored",
"filters": [
{"filter": "outlier", "window_size": 10, "radius": 4.0},
{"filter": "lowpass", "time_constant": 10, "precision": 2},
{"filter": "throttle", "window_size": 1},
],
},
}
t_0 = dt_util.utcnow() - timedelta(minutes=1)
t_1 = dt_util.utcnow() - timedelta(minutes=2)
t_2 = dt_util.utcnow() - timedelta(minutes=3)
t_3 = dt_util.utcnow() - timedelta(minutes=4)
if missing:
fake_states = {}
else:
fake_states = {
"sensor.test_monitored": [
ha.State("sensor.test_monitored", 18.0, last_changed=t_0),
ha.State("sensor.test_monitored", "unknown", last_changed=t_1),
ha.State("sensor.test_monitored", 19.0, last_changed=t_2),
ha.State("sensor.test_monitored", 18.2, last_changed=t_3),
]
}
with patch(
"homeassistant.components.history.state_changes_during_period",
return_value=fake_states,
):
with patch(
"homeassistant.components.history.get_last_state_changes",
return_value=fake_states,
):
with assert_setup_component(1, "sensor"):
assert setup_component(self.hass, "sensor", config)
self.hass.block_till_done()
for value in self.values:
self.hass.states.set(config["sensor"]["entity_id"], value.state)
self.hass.block_till_done()
state = self.hass.states.get("sensor.test")
if missing:
assert "18.05" == state.state
else:
assert "17.05" == state.state
def test_chain_history_missing(self):
"""Test if filter chaining works when recorder is enabled but the source is not recorded."""
return self.test_chain_history(missing=True)
def test_history_time(self):
"""Test loading from history based on a time window."""
self.init_recorder()
config = {
"history": {},
"sensor": {
"platform": "filter",
"name": "test",
"entity_id": "sensor.test_monitored",
"filters": [{"filter": "time_throttle", "window_size": "00:01"}],
},
}
t_0 = dt_util.utcnow() - timedelta(minutes=1)
t_1 = dt_util.utcnow() - timedelta(minutes=2)
t_2 = dt_util.utcnow() - timedelta(minutes=3)
async def test_chain_history(hass, values, missing=False):
"""Test if filter chaining works."""
await init_recorder(hass)
config = {
"history": {},
"sensor": {
"platform": "filter",
"name": "test",
"entity_id": "sensor.test_monitored",
"filters": [
{"filter": "outlier", "window_size": 10, "radius": 4.0},
{"filter": "lowpass", "time_constant": 10, "precision": 2},
{"filter": "throttle", "window_size": 1},
],
},
}
t_0 = dt_util.utcnow() - timedelta(minutes=1)
t_1 = dt_util.utcnow() - timedelta(minutes=2)
t_2 = dt_util.utcnow() - timedelta(minutes=3)
t_3 = dt_util.utcnow() - timedelta(minutes=4)
if missing:
fake_states = {}
else:
fake_states = {
"sensor.test_monitored": [
ha.State("sensor.test_monitored", 18.0, last_changed=t_0),
ha.State("sensor.test_monitored", 19.0, last_changed=t_1),
ha.State("sensor.test_monitored", 18.2, last_changed=t_2),
ha.State("sensor.test_monitored", "unknown", last_changed=t_1),
ha.State("sensor.test_monitored", 19.0, last_changed=t_2),
ha.State("sensor.test_monitored", 18.2, last_changed=t_3),
]
}
with patch(
"homeassistant.components.history.state_changes_during_period",
return_value=fake_states,
):
with patch(
"homeassistant.components.history.state_changes_during_period",
"homeassistant.components.history.get_last_state_changes",
return_value=fake_states,
):
with patch(
"homeassistant.components.history.get_last_state_changes",
return_value=fake_states,
):
with assert_setup_component(1, "sensor"):
assert setup_component(self.hass, "sensor", config)
self.hass.block_till_done()
with assert_setup_component(1, "sensor"):
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
self.hass.block_till_done()
state = self.hass.states.get("sensor.test")
assert "18.0" == state.state
for value in values:
hass.states.async_set(config["sensor"]["entity_id"], value.state)
await hass.async_block_till_done()
def test_outlier(self):
"""Test if outlier filter works."""
filt = OutlierFilter(window_size=3, precision=2, entity=None, radius=4.0)
for state in self.values:
filtered = filt.filter_state(state)
assert 21 == filtered.state
def test_outlier_step(self):
"""
Test step-change handling in outlier.
Test if outlier filter handles long-running step-changes correctly.
It should converge to no longer filter once just over half the
window_size is occupied by the new post step-change values.
"""
filt = OutlierFilter(window_size=3, precision=2, entity=None, radius=1.1)
self.values[-1].state = 22
for state in self.values:
filtered = filt.filter_state(state)
assert 22 == filtered.state
def test_initial_outlier(self):
"""Test issue #13363."""
filt = OutlierFilter(window_size=3, precision=2, entity=None, radius=4.0)
out = ha.State("sensor.test_monitored", 4000)
for state in [out] + self.values:
filtered = filt.filter_state(state)
assert 21 == filtered.state
def test_unknown_state_outlier(self):
"""Test issue #32395."""
filt = OutlierFilter(window_size=3, precision=2, entity=None, radius=4.0)
out = ha.State("sensor.test_monitored", "unknown")
for state in [out] + self.values + [out]:
try:
filtered = filt.filter_state(state)
except ValueError:
assert state.state == "unknown"
assert 21 == filtered.state
def test_precision_zero(self):
"""Test if precision of zero returns an integer."""
filt = LowPassFilter(window_size=10, precision=0, entity=None, time_constant=10)
for state in self.values:
filtered = filt.filter_state(state)
assert isinstance(filtered.state, int)
def test_lowpass(self):
"""Test if lowpass filter works."""
filt = LowPassFilter(window_size=10, precision=2, entity=None, time_constant=10)
out = ha.State("sensor.test_monitored", "unknown")
for state in [out] + self.values + [out]:
try:
filtered = filt.filter_state(state)
except ValueError:
assert state.state == "unknown"
assert 18.05 == filtered.state
def test_range(self):
"""Test if range filter works."""
lower = 10
upper = 20
filt = RangeFilter(
entity=None, precision=2, lower_bound=lower, upper_bound=upper
)
for unf_state in self.values:
unf = float(unf_state.state)
filtered = filt.filter_state(unf_state)
if unf < lower:
assert lower == filtered.state
elif unf > upper:
assert upper == filtered.state
state = hass.states.get("sensor.test")
if missing:
assert "18.05" == state.state
else:
assert unf == filtered.state
assert "17.05" == state.state
def test_range_zero(self):
"""Test if range filter works with zeroes as bounds."""
lower = 0
upper = 0
filt = RangeFilter(
entity=None, precision=2, lower_bound=lower, upper_bound=upper
)
for unf_state in self.values:
unf = float(unf_state.state)
filtered = filt.filter_state(unf_state)
if unf < lower:
assert lower == filtered.state
elif unf > upper:
assert upper == filtered.state
else:
assert unf == filtered.state
def test_throttle(self):
"""Test if lowpass filter works."""
filt = ThrottleFilter(window_size=3, precision=2, entity=None)
filtered = []
for state in self.values:
new_state = filt.filter_state(state)
if not filt.skip_processing:
filtered.append(new_state)
assert [20, 21] == [f.state for f in filtered]
async def test_chain_history_missing(hass, values):
"""Test if filter chaining works when recorder is enabled but the source is not recorded."""
await test_chain_history(hass, values, missing=True)
def test_time_throttle(self):
"""Test if lowpass filter works."""
filt = TimeThrottleFilter(
window_size=timedelta(minutes=2), precision=2, entity=None
)
filtered = []
for state in self.values:
new_state = filt.filter_state(state)
if not filt.skip_processing:
filtered.append(new_state)
assert [20, 18, 22] == [f.state for f in filtered]
def test_time_sma(self):
"""Test if time_sma filter works."""
filt = TimeSMAFilter(
window_size=timedelta(minutes=2), precision=2, entity=None, type="last"
)
for state in self.values:
async def test_history_time(hass):
"""Test loading from history based on a time window."""
await init_recorder(hass)
config = {
"history": {},
"sensor": {
"platform": "filter",
"name": "test",
"entity_id": "sensor.test_monitored",
"filters": [{"filter": "time_throttle", "window_size": "00:01"}],
},
}
t_0 = dt_util.utcnow() - timedelta(minutes=1)
t_1 = dt_util.utcnow() - timedelta(minutes=2)
t_2 = dt_util.utcnow() - timedelta(minutes=3)
fake_states = {
"sensor.test_monitored": [
ha.State("sensor.test_monitored", 18.0, last_changed=t_0),
ha.State("sensor.test_monitored", 19.0, last_changed=t_1),
ha.State("sensor.test_monitored", 18.2, last_changed=t_2),
]
}
with patch(
"homeassistant.components.history.state_changes_during_period",
return_value=fake_states,
):
with patch(
"homeassistant.components.history.get_last_state_changes",
return_value=fake_states,
):
with assert_setup_component(1, "sensor"):
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
await hass.async_block_till_done()
state = hass.states.get("sensor.test")
assert "18.0" == state.state
async def test_outlier(values):
"""Test if outlier filter works."""
filt = OutlierFilter(window_size=3, precision=2, entity=None, radius=4.0)
for state in values:
filtered = filt.filter_state(state)
assert 21 == filtered.state
def test_outlier_step(values):
"""
Test step-change handling in outlier.
Test if outlier filter handles long-running step-changes correctly.
It should converge to no longer filter once just over half the
window_size is occupied by the new post step-change values.
"""
filt = OutlierFilter(window_size=3, precision=2, entity=None, radius=1.1)
values[-1].state = 22
for state in values:
filtered = filt.filter_state(state)
assert 22 == filtered.state
def test_initial_outlier(values):
"""Test issue #13363."""
filt = OutlierFilter(window_size=3, precision=2, entity=None, radius=4.0)
out = ha.State("sensor.test_monitored", 4000)
for state in [out] + values:
filtered = filt.filter_state(state)
assert 21 == filtered.state
def test_unknown_state_outlier(values):
"""Test issue #32395."""
filt = OutlierFilter(window_size=3, precision=2, entity=None, radius=4.0)
out = ha.State("sensor.test_monitored", "unknown")
for state in [out] + values + [out]:
try:
filtered = filt.filter_state(state)
assert 21.5 == filtered.state
except ValueError:
assert state.state == "unknown"
assert 21 == filtered.state
def test_precision_zero(values):
"""Test if precision of zero returns an integer."""
filt = LowPassFilter(window_size=10, precision=0, entity=None, time_constant=10)
for state in values:
filtered = filt.filter_state(state)
assert isinstance(filtered.state, int)
def test_lowpass(values):
"""Test if lowpass filter works."""
filt = LowPassFilter(window_size=10, precision=2, entity=None, time_constant=10)
out = ha.State("sensor.test_monitored", "unknown")
for state in [out] + values + [out]:
try:
filtered = filt.filter_state(state)
except ValueError:
assert state.state == "unknown"
assert 18.05 == filtered.state
def test_range(values):
"""Test if range filter works."""
lower = 10
upper = 20
filt = RangeFilter(entity=None, precision=2, lower_bound=lower, upper_bound=upper)
for unf_state in values:
unf = float(unf_state.state)
filtered = filt.filter_state(unf_state)
if unf < lower:
assert lower == filtered.state
elif unf > upper:
assert upper == filtered.state
else:
assert unf == filtered.state
def test_range_zero(values):
"""Test if range filter works with zeroes as bounds."""
lower = 0
upper = 0
filt = RangeFilter(entity=None, precision=2, lower_bound=lower, upper_bound=upper)
for unf_state in values:
unf = float(unf_state.state)
filtered = filt.filter_state(unf_state)
if unf < lower:
assert lower == filtered.state
elif unf > upper:
assert upper == filtered.state
else:
assert unf == filtered.state
def test_throttle(values):
"""Test if lowpass filter works."""
filt = ThrottleFilter(window_size=3, precision=2, entity=None)
filtered = []
for state in values:
new_state = filt.filter_state(state)
if not filt.skip_processing:
filtered.append(new_state)
assert [20, 21] == [f.state for f in filtered]
def test_time_throttle(values):
"""Test if lowpass filter works."""
filt = TimeThrottleFilter(
window_size=timedelta(minutes=2), precision=2, entity=None
)
filtered = []
for state in values:
new_state = filt.filter_state(state)
if not filt.skip_processing:
filtered.append(new_state)
assert [20, 18, 22] == [f.state for f in filtered]
def test_time_sma(values):
"""Test if time_sma filter works."""
filt = TimeSMAFilter(
window_size=timedelta(minutes=2), precision=2, entity=None, type="last"
)
for state in values:
filtered = filt.filter_state(state)
assert 21.5 == filtered.state
async def test_reload(hass):
"""Verify we can reload filter sensors."""
await hass.async_add_executor_job(
init_recorder_component, hass
) # force in memory db
await init_recorder(hass)
hass.states.async_set("sensor.test_monitored", 12345)
await async_setup_component(