Use states to avoid decoding logbook state changed events. (#36768)
avg 4.43s -> 1.88spull/36792/head
parent
83e3f680bf
commit
b0163b65c6
|
@ -193,14 +193,15 @@ class LogbookView(HomeAssistantView):
|
|||
return await hass.async_add_job(json_events)
|
||||
|
||||
|
||||
def humanify(hass, events):
|
||||
def humanify(hass, events, prev_states=None):
|
||||
"""Generate a converted list of events into Entry objects.
|
||||
|
||||
Will try to group events if possible:
|
||||
- if 2+ sensor updates in GROUP_BY_MINUTES, show last
|
||||
- if Home Assistant stop and start happen in same minute call it restarted
|
||||
"""
|
||||
domain_prefixes = tuple(f"{dom}." for dom in CONTINUOUS_DOMAINS)
|
||||
if prev_states is None:
|
||||
prev_states = {}
|
||||
|
||||
# Group events in batches of GROUP_BY_MINUTES
|
||||
for _, g_events in groupby(
|
||||
|
@ -219,10 +220,8 @@ def humanify(hass, events):
|
|||
# Process events
|
||||
for event in events_batch:
|
||||
if event.event_type == EVENT_STATE_CHANGED:
|
||||
entity_id = event.data.get("entity_id")
|
||||
|
||||
if entity_id.startswith(domain_prefixes):
|
||||
last_sensor_event[entity_id] = event
|
||||
if event.domain in CONTINUOUS_DOMAINS:
|
||||
last_sensor_event[event.entity_id] = event
|
||||
|
||||
elif event.event_type == EVENT_HOMEASSISTANT_STOP:
|
||||
if event.time_fired.minute in start_stop_events:
|
||||
|
@ -249,32 +248,34 @@ def humanify(hass, events):
|
|||
yield data
|
||||
|
||||
if event.event_type == EVENT_STATE_CHANGED:
|
||||
new_state = event.data.get("new_state")
|
||||
entity_id = event.entity_id
|
||||
|
||||
entity_id = new_state.get("entity_id")
|
||||
domain, object_id = split_entity_id(entity_id)
|
||||
|
||||
# Skip all but the last sensor state
|
||||
if (
|
||||
domain in CONTINUOUS_DOMAINS
|
||||
and event != last_sensor_event[entity_id]
|
||||
):
|
||||
# Skip events that have not changed state
|
||||
if entity_id in prev_states and prev_states[entity_id] == event.state:
|
||||
continue
|
||||
|
||||
attributes = new_state.get("attributes", {})
|
||||
prev_states[entity_id] = event.state
|
||||
domain = event.domain
|
||||
|
||||
# Don't show continuous sensor value changes in the logbook
|
||||
if domain in CONTINUOUS_DOMAINS and attributes.get(
|
||||
"unit_of_measurement"
|
||||
):
|
||||
continue
|
||||
if domain in CONTINUOUS_DOMAINS:
|
||||
# Skip all but the last sensor state
|
||||
if event != last_sensor_event[entity_id]:
|
||||
continue
|
||||
|
||||
name = attributes.get(ATTR_FRIENDLY_NAME) or object_id.replace("_", " ")
|
||||
# Don't show continuous sensor value changes in the logbook
|
||||
if _get_attribute(hass, entity_id, event, "unit_of_measurement"):
|
||||
continue
|
||||
|
||||
name = _get_attribute(
|
||||
hass, entity_id, event, ATTR_FRIENDLY_NAME
|
||||
) or split_entity_id(entity_id)[1].replace("_", " ")
|
||||
|
||||
yield {
|
||||
"when": event.time_fired,
|
||||
"name": name,
|
||||
"message": _entry_message_from_state(domain, new_state),
|
||||
"message": _entry_message_from_event(
|
||||
hass, entity_id, domain, event
|
||||
),
|
||||
"domain": domain,
|
||||
"entity_id": entity_id,
|
||||
"context_id": event.context.id,
|
||||
|
@ -383,7 +384,7 @@ def _get_events(hass, config, start_day, end_day, entity_id=None):
|
|||
def yield_events(query):
|
||||
"""Yield Events that are not filtered away."""
|
||||
for row in query.yield_per(500):
|
||||
event = LazyEvent(row)
|
||||
event = LazyEventPartialState(row)
|
||||
if _keep_event(hass, event, entities_filter):
|
||||
yield event
|
||||
|
||||
|
@ -400,6 +401,9 @@ def _get_events(hass, config, start_day, end_day, entity_id=None):
|
|||
Events.time_fired,
|
||||
Events.context_id,
|
||||
Events.context_user_id,
|
||||
States.state,
|
||||
States.entity_id,
|
||||
States.domain,
|
||||
)
|
||||
.order_by(Events.time_fired)
|
||||
.outerjoin(States, (Events.event_id == States.event_id))
|
||||
|
@ -416,62 +420,62 @@ def _get_events(hass, config, start_day, end_day, entity_id=None):
|
|||
)
|
||||
)
|
||||
|
||||
return list(humanify(hass, yield_events(query)))
|
||||
prev_states = {}
|
||||
return list(humanify(hass, yield_events(query), prev_states))
|
||||
|
||||
|
||||
def _get_attribute(hass, entity_id, event, attribute):
|
||||
current_state = hass.states.get(entity_id)
|
||||
if not current_state:
|
||||
return event.data.get("new_state", {}).get("attributes", {}).get(attribute)
|
||||
return current_state.attributes.get(attribute, None)
|
||||
|
||||
|
||||
def _keep_event(hass, event, entities_filter):
|
||||
event_data = event.data
|
||||
domain = event_data.get(ATTR_DOMAIN)
|
||||
entity_id = event_data.get("entity_id")
|
||||
if entity_id:
|
||||
domain = split_entity_id(entity_id)[0]
|
||||
entity_id = None
|
||||
|
||||
if event.event_type == EVENT_STATE_CHANGED:
|
||||
entity_id = event.entity_id
|
||||
if entity_id is None:
|
||||
return False
|
||||
|
||||
# Do not report on new entities
|
||||
old_state = event_data.get("old_state")
|
||||
if old_state is None:
|
||||
return False
|
||||
|
||||
# Do not report on entity removal
|
||||
new_state = event_data.get("new_state")
|
||||
if new_state is None:
|
||||
return False
|
||||
|
||||
# Do not report on only attribute changes
|
||||
if new_state.get("state") == old_state.get("state"):
|
||||
return False
|
||||
|
||||
attributes = new_state.get("attributes", {})
|
||||
|
||||
# Also filter auto groups.
|
||||
if domain == "group" and attributes.get("auto", False):
|
||||
if not event.has_old_and_new_state:
|
||||
return False
|
||||
|
||||
# exclude entities which are customized hidden
|
||||
if attributes.get(ATTR_HIDDEN, False):
|
||||
if event.hidden:
|
||||
return False
|
||||
|
||||
elif event.event_type == EVENT_LOGBOOK_ENTRY:
|
||||
event_data = event.data
|
||||
domain = event_data.get(ATTR_DOMAIN)
|
||||
|
||||
elif not entity_id and event.event_type in hass.data.get(DOMAIN, {}):
|
||||
elif event.event_type in hass.data.get(DOMAIN, {}) and not event.data.get(
|
||||
"entity_id"
|
||||
):
|
||||
# If the entity_id isn't described, use the domain that describes
|
||||
# the event for filtering.
|
||||
domain = hass.data[DOMAIN][event.event_type][0]
|
||||
|
||||
else:
|
||||
event_data = event.data
|
||||
domain = event_data.get(ATTR_DOMAIN)
|
||||
entity_id = event_data.get("entity_id")
|
||||
if entity_id:
|
||||
domain = split_entity_id(entity_id)[0]
|
||||
|
||||
if not entity_id and domain:
|
||||
entity_id = f"{domain}."
|
||||
|
||||
return not entity_id or entities_filter(entity_id)
|
||||
|
||||
|
||||
def _entry_message_from_state(domain, state):
|
||||
def _entry_message_from_event(hass, entity_id, domain, event):
|
||||
"""Convert a state to a message for the logbook."""
|
||||
# We pass domain in so we don't have to split entity_id again
|
||||
state_state = state.get("state")
|
||||
state_state = event.state
|
||||
|
||||
if domain in ["device_tracker", "person"]:
|
||||
if state_state == STATE_NOT_HOME:
|
||||
|
@ -483,9 +487,8 @@ def _entry_message_from_state(domain, state):
|
|||
return "has risen"
|
||||
return "has set"
|
||||
|
||||
device_class = state.get("attributes", {}).get("device_class")
|
||||
|
||||
if domain == "binary_sensor" and device_class:
|
||||
if domain == "binary_sensor":
|
||||
device_class = _get_attribute(hass, entity_id, event, "device_class")
|
||||
if device_class == "battery":
|
||||
if state_state == STATE_ON:
|
||||
return "is low"
|
||||
|
@ -557,8 +560,8 @@ def _entry_message_from_state(domain, state):
|
|||
return f"changed to {state_state}"
|
||||
|
||||
|
||||
class LazyEvent:
|
||||
"""A lazy version of core Event."""
|
||||
class LazyEventPartialState:
|
||||
"""A lazy version of core Event with limited State joined in."""
|
||||
|
||||
__slots__ = ["_row", "_event_data", "_time_fired", "_context"]
|
||||
|
||||
|
@ -586,8 +589,12 @@ class LazyEvent:
|
|||
@property
|
||||
def data(self):
|
||||
"""Event data."""
|
||||
|
||||
if not self._event_data:
|
||||
self._event_data = json.loads(self._row.event_data)
|
||||
if self._row.event_data == "{}":
|
||||
self._event_data = {}
|
||||
else:
|
||||
self._event_data = json.loads(self._row.event_data)
|
||||
return self._event_data
|
||||
|
||||
@property
|
||||
|
@ -598,3 +605,37 @@ class LazyEvent:
|
|||
process_timestamp(self._row.time_fired) or dt_util.utcnow()
|
||||
)
|
||||
return self._time_fired
|
||||
|
||||
@property
|
||||
def has_old_and_new_state(self):
|
||||
"""Check the json data to see if new_state and old_state is present without decoding."""
|
||||
return (
|
||||
'"old_state": {' in self._row.event_data
|
||||
and '"new_state": {' in self._row.event_data
|
||||
)
|
||||
|
||||
@property
|
||||
def hidden(self):
|
||||
"""Check the json to see if hidden."""
|
||||
if '"hidden":' in self._row.event_data:
|
||||
return (
|
||||
self.data.get("new_state", {})
|
||||
.get("attributes", {})
|
||||
.get(ATTR_HIDDEN, False)
|
||||
)
|
||||
return False
|
||||
|
||||
@property
|
||||
def entity_id(self):
|
||||
"""Entity id that changed state."""
|
||||
return self._row.entity_id
|
||||
|
||||
@property
|
||||
def domain(self):
|
||||
"""Domain of the entity_id that changed state."""
|
||||
return self._row.domain
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
"""State of the entity_id that changed state."""
|
||||
return self._row.state
|
||||
|
|
|
@ -200,6 +200,6 @@ def process_timestamp(ts):
|
|||
if ts is None:
|
||||
return None
|
||||
if ts.tzinfo is None:
|
||||
return dt_util.UTC.localize(ts)
|
||||
return ts.replace(tzinfo=dt_util.UTC)
|
||||
|
||||
return dt_util.as_utc(ts)
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
"""The tests for the logbook component."""
|
||||
# pylint: disable=protected-access,invalid-name
|
||||
import collections
|
||||
from datetime import datetime, timedelta
|
||||
from functools import partial
|
||||
import json
|
||||
import logging
|
||||
import unittest
|
||||
|
||||
|
@ -21,6 +23,7 @@ from homeassistant.const import (
|
|||
STATE_ON,
|
||||
)
|
||||
import homeassistant.core as ha
|
||||
from homeassistant.helpers.json import JSONEncoder
|
||||
from homeassistant.setup import async_setup_component, setup_component
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
|
@ -152,9 +155,14 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
pointA = dt_util.utcnow()
|
||||
pointB = pointA + timedelta(minutes=logbook.GROUP_BY_MINUTES)
|
||||
|
||||
eventA = self.create_state_changed_event(pointA, entity_id, 10)
|
||||
state_on = ha.State(
|
||||
entity_id, "on", {"brightness": 200}, pointA, pointA
|
||||
).as_dict()
|
||||
|
||||
eventA = self.create_state_changed_event_from_old_new(
|
||||
entity_id, pointA, None, state_on
|
||||
)
|
||||
eventB = self.create_state_changed_event(pointB, entity_id2, 20)
|
||||
eventA.data["old_state"] = None
|
||||
|
||||
entities_filter = logbook._generate_filter_from_config({})
|
||||
events = [
|
||||
|
@ -179,9 +187,13 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
pointA = dt_util.utcnow()
|
||||
pointB = pointA + timedelta(minutes=logbook.GROUP_BY_MINUTES)
|
||||
|
||||
eventA = self.create_state_changed_event(pointA, entity_id, 10)
|
||||
state_on = ha.State(
|
||||
entity_id, "on", {"brightness": 200}, pointA, pointA
|
||||
).as_dict()
|
||||
eventA = self.create_state_changed_event_from_old_new(
|
||||
None, pointA, state_on, None,
|
||||
)
|
||||
eventB = self.create_state_changed_event(pointB, entity_id2, 20)
|
||||
eventA.data["new_state"] = None
|
||||
|
||||
entities_filter = logbook._generate_filter_from_config({})
|
||||
events = [
|
||||
|
@ -436,28 +448,6 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
entries[4], pointB, "blu", domain="sensor", entity_id=entity_id2
|
||||
)
|
||||
|
||||
def test_exclude_auto_groups(self):
|
||||
"""Test if events of automatically generated groups are filtered."""
|
||||
entity_id = "switch.bla"
|
||||
entity_id2 = "group.switches"
|
||||
pointA = dt_util.utcnow()
|
||||
|
||||
eventA = self.create_state_changed_event(pointA, entity_id, 10)
|
||||
eventB = self.create_state_changed_event(pointA, entity_id2, 20, {"auto": True})
|
||||
|
||||
entities_filter = logbook._generate_filter_from_config({})
|
||||
events = [
|
||||
e
|
||||
for e in (eventA, eventB)
|
||||
if logbook._keep_event(self.hass, e, entities_filter)
|
||||
]
|
||||
entries = list(logbook.humanify(self.hass, events))
|
||||
|
||||
assert len(entries) == 1
|
||||
self.assert_entry(
|
||||
entries[0], pointA, "bla", domain="switch", entity_id=entity_id
|
||||
)
|
||||
|
||||
def test_exclude_attribute_changes(self):
|
||||
"""Test if events of attribute changes are filtered."""
|
||||
pointA = dt_util.utcnow()
|
||||
|
@ -472,23 +462,11 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
"light.kitchen", "on", {"brightness": 200}, pointB, pointC
|
||||
).as_dict()
|
||||
|
||||
eventA = ha.Event(
|
||||
EVENT_STATE_CHANGED,
|
||||
{
|
||||
"entity_id": "light.kitchen",
|
||||
"old_state": state_off,
|
||||
"new_state": state_100,
|
||||
},
|
||||
time_fired=pointB,
|
||||
eventA = self.create_state_changed_event_from_old_new(
|
||||
"light.kitchen", pointB, state_off, state_100
|
||||
)
|
||||
eventB = ha.Event(
|
||||
EVENT_STATE_CHANGED,
|
||||
{
|
||||
"entity_id": "light.kitchen",
|
||||
"old_state": state_100,
|
||||
"new_state": state_200,
|
||||
},
|
||||
time_fired=pointC,
|
||||
eventB = self.create_state_changed_event_from_old_new(
|
||||
"light.kitchen", pointC, state_100, state_200
|
||||
)
|
||||
|
||||
entities_filter = logbook._generate_filter_from_config({})
|
||||
|
@ -547,7 +525,7 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
entries[1], pointA, "bla", domain="switch", entity_id=entity_id
|
||||
)
|
||||
|
||||
def test_entry_message_from_state_device(self):
|
||||
def test_entry_message_from_event_device(self):
|
||||
"""Test if logbook message is correctly created for switches.
|
||||
|
||||
Especially test if the special handling for turn on/off events is done.
|
||||
|
@ -556,29 +534,26 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
|
||||
# message for a device state change
|
||||
eventA = self.create_state_changed_event(pointA, "switch.bla", 10)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "changed to 10"
|
||||
|
||||
# message for a switch turned on
|
||||
eventA = self.create_state_changed_event(pointA, "switch.bla", STATE_ON)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "turned on"
|
||||
|
||||
# message for a switch turned off
|
||||
eventA = self.create_state_changed_event(pointA, "switch.bla", STATE_OFF)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "turned off"
|
||||
|
||||
def test_entry_message_from_state_device_tracker(self):
|
||||
def test_entry_message_from_event_device_tracker(self):
|
||||
"""Test if logbook message is correctly created for device tracker."""
|
||||
pointA = dt_util.utcnow()
|
||||
|
||||
|
@ -586,41 +561,37 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "device_tracker.john", STATE_NOT_HOME
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is away"
|
||||
|
||||
# message for a device tracker "home" state
|
||||
eventA = self.create_state_changed_event(pointA, "device_tracker.john", "work")
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is at work"
|
||||
|
||||
def test_entry_message_from_state_person(self):
|
||||
def test_entry_message_from_event_person(self):
|
||||
"""Test if logbook message is correctly created for a person."""
|
||||
pointA = dt_util.utcnow()
|
||||
|
||||
# message for a device tracker "not home" state
|
||||
eventA = self.create_state_changed_event(pointA, "person.john", STATE_NOT_HOME)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is away"
|
||||
|
||||
# message for a device tracker "home" state
|
||||
eventA = self.create_state_changed_event(pointA, "person.john", "work")
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is at work"
|
||||
|
||||
def test_entry_message_from_state_sun(self):
|
||||
def test_entry_message_from_event_sun(self):
|
||||
"""Test if logbook message is correctly created for sun."""
|
||||
pointA = dt_util.utcnow()
|
||||
|
||||
|
@ -628,9 +599,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "sun.sun", sun.STATE_ABOVE_HORIZON
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "has risen"
|
||||
|
||||
|
@ -638,13 +608,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "sun.sun", sun.STATE_BELOW_HORIZON
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "has set"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_battery(self):
|
||||
def test_entry_message_from_event_binary_sensor_battery(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "battery"}
|
||||
|
@ -653,9 +622,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.battery", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is low"
|
||||
|
||||
|
@ -663,13 +631,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.battery", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is normal"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_connectivity(self):
|
||||
def test_entry_message_from_event_binary_sensor_connectivity(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "connectivity"}
|
||||
|
@ -678,9 +645,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.connectivity", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is connected"
|
||||
|
||||
|
@ -688,13 +654,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.connectivity", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is disconnected"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_door(self):
|
||||
def test_entry_message_from_event_binary_sensor_door(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "door"}
|
||||
|
@ -703,9 +668,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.door", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is opened"
|
||||
|
||||
|
@ -713,13 +677,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.door", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is closed"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_garage_door(self):
|
||||
def test_entry_message_from_event_binary_sensor_garage_door(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "garage_door"}
|
||||
|
@ -728,9 +691,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.garage_door", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is opened"
|
||||
|
||||
|
@ -738,13 +700,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.garage_door", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is closed"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_opening(self):
|
||||
def test_entry_message_from_event_binary_sensor_opening(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "opening"}
|
||||
|
@ -753,9 +714,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.opening", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is opened"
|
||||
|
||||
|
@ -763,13 +723,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.opening", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is closed"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_window(self):
|
||||
def test_entry_message_from_event_binary_sensor_window(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "window"}
|
||||
|
@ -778,9 +737,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.window", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is opened"
|
||||
|
||||
|
@ -788,13 +746,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.window", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is closed"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_lock(self):
|
||||
def test_entry_message_from_event_binary_sensor_lock(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "lock"}
|
||||
|
@ -803,9 +760,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.lock", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is unlocked"
|
||||
|
||||
|
@ -813,13 +769,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.lock", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is locked"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_plug(self):
|
||||
def test_entry_message_from_event_binary_sensor_plug(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "plug"}
|
||||
|
@ -828,9 +783,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.plug", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is plugged in"
|
||||
|
||||
|
@ -838,13 +792,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.plug", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is unplugged"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_presence(self):
|
||||
def test_entry_message_from_event_binary_sensor_presence(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "presence"}
|
||||
|
@ -853,9 +806,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.presence", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is at home"
|
||||
|
||||
|
@ -863,13 +815,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.presence", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is away"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_safety(self):
|
||||
def test_entry_message_from_event_binary_sensor_safety(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "safety"}
|
||||
|
@ -878,9 +829,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.safety", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is unsafe"
|
||||
|
||||
|
@ -888,13 +838,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.safety", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "is safe"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_cold(self):
|
||||
def test_entry_message_from_event_binary_sensor_cold(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "cold"}
|
||||
|
@ -903,9 +852,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.cold", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "detected cold"
|
||||
|
||||
|
@ -913,13 +861,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.cold", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "cleared (no cold detected)"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_gas(self):
|
||||
def test_entry_message_from_event_binary_sensor_gas(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "gas"}
|
||||
|
@ -928,9 +875,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.gas", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "detected gas"
|
||||
|
||||
|
@ -938,13 +884,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.gas", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "cleared (no gas detected)"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_heat(self):
|
||||
def test_entry_message_from_event_binary_sensor_heat(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "heat"}
|
||||
|
@ -953,9 +898,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.heat", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "detected heat"
|
||||
|
||||
|
@ -963,13 +907,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.heat", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "cleared (no heat detected)"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_light(self):
|
||||
def test_entry_message_from_event_binary_sensor_light(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "light"}
|
||||
|
@ -978,9 +921,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.light", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "detected light"
|
||||
|
||||
|
@ -988,13 +930,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.light", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "cleared (no light detected)"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_moisture(self):
|
||||
def test_entry_message_from_event_binary_sensor_moisture(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "moisture"}
|
||||
|
@ -1003,9 +944,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.moisture", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "detected moisture"
|
||||
|
||||
|
@ -1013,13 +953,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.moisture", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "cleared (no moisture detected)"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_motion(self):
|
||||
def test_entry_message_from_event_binary_sensor_motion(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "motion"}
|
||||
|
@ -1028,9 +967,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.motion", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "detected motion"
|
||||
|
||||
|
@ -1038,13 +976,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.motion", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "cleared (no motion detected)"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_occupancy(self):
|
||||
def test_entry_message_from_event_binary_sensor_occupancy(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "occupancy"}
|
||||
|
@ -1053,9 +990,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.occupancy", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "detected occupancy"
|
||||
|
||||
|
@ -1063,13 +999,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.occupancy", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "cleared (no occupancy detected)"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_power(self):
|
||||
def test_entry_message_from_event_binary_sensor_power(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "power"}
|
||||
|
@ -1078,9 +1013,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.power", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "detected power"
|
||||
|
||||
|
@ -1088,13 +1022,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.power", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "cleared (no power detected)"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_problem(self):
|
||||
def test_entry_message_from_event_binary_sensor_problem(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "problem"}
|
||||
|
@ -1103,9 +1036,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.problem", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "detected problem"
|
||||
|
||||
|
@ -1113,13 +1045,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.problem", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "cleared (no problem detected)"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_smoke(self):
|
||||
def test_entry_message_from_event_binary_sensor_smoke(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "smoke"}
|
||||
|
@ -1128,9 +1059,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.smoke", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "detected smoke"
|
||||
|
||||
|
@ -1138,13 +1068,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.smoke", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "cleared (no smoke detected)"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_sound(self):
|
||||
def test_entry_message_from_event_binary_sensor_sound(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "sound"}
|
||||
|
@ -1153,9 +1082,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.sound", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "detected sound"
|
||||
|
||||
|
@ -1163,13 +1091,12 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.sound", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "cleared (no sound detected)"
|
||||
|
||||
def test_entry_message_from_state_binary_sensor_vibration(self):
|
||||
def test_entry_message_from_event_binary_sensor_vibration(self):
|
||||
"""Test if logbook message is correctly created for a binary_sensor."""
|
||||
pointA = dt_util.utcnow()
|
||||
attributes = {"device_class": "vibration"}
|
||||
|
@ -1178,9 +1105,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.vibration", STATE_ON, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "detected vibration"
|
||||
|
||||
|
@ -1188,9 +1114,8 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
eventA = self.create_state_changed_event(
|
||||
pointA, "binary_sensor.vibration", STATE_OFF, attributes
|
||||
)
|
||||
new_state = eventA.data.get("new_state")
|
||||
message = logbook._entry_message_from_state(
|
||||
ha.split_entity_id(new_state.get("entity_id"))[0], new_state
|
||||
message = logbook._entry_message_from_event(
|
||||
self.hass, eventA.entity_id, eventA.domain, eventA
|
||||
)
|
||||
assert message == "cleared (no vibration detected)"
|
||||
|
||||
|
@ -1257,12 +1182,43 @@ class TestComponentLogbook(unittest.TestCase):
|
|||
entity_id, state, attributes, last_changed, last_updated
|
||||
).as_dict()
|
||||
|
||||
return ha.Event(
|
||||
EVENT_STATE_CHANGED,
|
||||
{"entity_id": entity_id, "old_state": old_state, "new_state": new_state},
|
||||
time_fired=event_time_fired,
|
||||
return self.create_state_changed_event_from_old_new(
|
||||
entity_id, event_time_fired, old_state, new_state
|
||||
)
|
||||
|
||||
def create_state_changed_event_from_old_new(
|
||||
self, entity_id, event_time_fired, old_state, new_state
|
||||
):
|
||||
"""Create a state changed event from a old and new state."""
|
||||
event_data_json = json.dumps(
|
||||
{"entity_id": entity_id, "old_state": old_state, "new_state": new_state},
|
||||
cls=JSONEncoder,
|
||||
)
|
||||
row = collections.namedtuple(
|
||||
"Row",
|
||||
[
|
||||
"event_type"
|
||||
"event_data"
|
||||
"time_fired"
|
||||
"context_id"
|
||||
"context_user_id"
|
||||
"state"
|
||||
"entity_id"
|
||||
"domain"
|
||||
],
|
||||
)
|
||||
|
||||
row.event_type = EVENT_STATE_CHANGED
|
||||
row.event_data = event_data_json
|
||||
row.time_fired = event_time_fired
|
||||
row.state = new_state and new_state.get("state")
|
||||
row.entity_id = entity_id
|
||||
row.domain = entity_id and ha.split_entity_id(entity_id)[0]
|
||||
row.context_id = None
|
||||
row.context_user_id = None
|
||||
|
||||
return logbook.LazyEventPartialState(row)
|
||||
|
||||
|
||||
async def test_logbook_view(hass, hass_client):
|
||||
"""Test the logbook view."""
|
||||
|
|
Loading…
Reference in New Issue