Fix Filter Sensor - check existing entity history (#25870)

* make sure we have entity history

* increase coverage

* increase coverage

* no need for list comprehension

* increase coverage
pull/25927/head
Diogo Gomes 2019-08-13 19:07:05 +01:00 committed by Paulus Schoutsen
parent e0ea5f2b04
commit 1f2bab8e67
2 changed files with 91 additions and 16 deletions

View File

@ -242,7 +242,8 @@ class SensorFilter(Entity):
entity_id=self._entity,
)
)
history_list.extend([state for state in filter_history[self._entity]])
if self._entity in filter_history:
history_list.extend(filter_history[self._entity])
if largest_window_time > timedelta(seconds=0):
start = dt_util.utcnow() - largest_window_time
filter_history = await self.hass.async_add_job(
@ -253,13 +254,14 @@ class SensorFilter(Entity):
entity_id=self._entity,
)
)
history_list.extend(
[
state
for state in filter_history[self._entity]
if state not in history_list
]
)
if self._entity in filter_history:
history_list.extend(
[
state
for state in filter_history[self._entity]
if state not in history_list
]
)
# Sort the window states
history_list = sorted(history_list, key=lambda s: s.last_updated)

View File

@ -59,6 +59,31 @@ class TestFilterSensor(unittest.TestCase):
assert setup_component(self.hass, "sensor", config)
def test_chain(self):
"""Test if filter chaining works."""
config = {
"sensor": {
"platform": "filter",
"name": "test",
"entity_id": "sensor.test_monitored",
"filters": [
{"filter": "outlier", "window_size": 10, "radius": 4.0},
{"filter": "lowpass", "time_constant": 10, "precision": 2},
{"filter": "throttle", "window_size": 1},
],
}
}
with assert_setup_component(1, "sensor"):
assert setup_component(self.hass, "sensor", config)
for value in self.values:
self.hass.states.set(config["sensor"]["entity_id"], value.state)
self.hass.block_till_done()
state = self.hass.states.get("sensor.test")
assert "18.05" == state.state
def test_chain_history(self, missing=False):
"""Test if filter chaining works."""
self.init_recorder()
config = {
@ -78,13 +103,16 @@ class TestFilterSensor(unittest.TestCase):
t_1 = dt_util.utcnow() - timedelta(minutes=2)
t_2 = dt_util.utcnow() - timedelta(minutes=3)
fake_states = {
"sensor.test_monitored": [
ha.State("sensor.test_monitored", 18.0, last_changed=t_0),
ha.State("sensor.test_monitored", 19.0, last_changed=t_1),
ha.State("sensor.test_monitored", 18.2, last_changed=t_2),
]
}
if missing:
fake_states = {}
else:
fake_states = {
"sensor.test_monitored": [
ha.State("sensor.test_monitored", 18.0, last_changed=t_0),
ha.State("sensor.test_monitored", 19.0, last_changed=t_1),
ha.State("sensor.test_monitored", 18.2, last_changed=t_2),
]
}
with patch(
"homeassistant.components.history." "state_changes_during_period",
@ -102,7 +130,52 @@ class TestFilterSensor(unittest.TestCase):
self.hass.block_till_done()
state = self.hass.states.get("sensor.test")
assert "17.05" == state.state
if missing:
assert "18.05" == state.state
else:
assert "17.05" == state.state
def test_chain_history_missing(self):
"""Test if filter chaining works when recorder is enabled but the source is not recorded."""
return self.test_chain_history(missing=True)
def test_history_time(self):
"""Test loading from history based on a time window."""
self.init_recorder()
config = {
"history": {},
"sensor": {
"platform": "filter",
"name": "test",
"entity_id": "sensor.test_monitored",
"filters": [{"filter": "time_throttle", "window_size": "00:01"}],
},
}
t_0 = dt_util.utcnow() - timedelta(minutes=1)
t_1 = dt_util.utcnow() - timedelta(minutes=2)
t_2 = dt_util.utcnow() - timedelta(minutes=3)
fake_states = {
"sensor.test_monitored": [
ha.State("sensor.test_monitored", 18.0, last_changed=t_0),
ha.State("sensor.test_monitored", 19.0, last_changed=t_1),
ha.State("sensor.test_monitored", 18.2, last_changed=t_2),
]
}
with patch(
"homeassistant.components.history." "state_changes_during_period",
return_value=fake_states,
):
with patch(
"homeassistant.components.history." "get_last_state_changes",
return_value=fake_states,
):
with assert_setup_component(1, "sensor"):
assert setup_component(self.hass, "sensor", config)
self.hass.block_till_done()
state = self.hass.states.get("sensor.test")
assert "18.0" == state.state
def test_outlier(self):
"""Test if outlier filter works."""