Improve db performance of state change events and reduce overall db size (#36883)

* Add old_state_id to states, remove old/new state data from events since it can now be found by a join

* remove state lookup on restart

* Ensure old_state is set for existing states
pull/36911/head
J. Nick Koston 2020-06-17 22:26:41 -05:00 committed by GitHub
parent 94132e5572
commit e7d982ee11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 83 additions and 26 deletions

View File

@ -6,6 +6,7 @@ import logging
import time
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import aliased
import voluptuous as vol
from homeassistant.components import sun
@ -390,18 +391,24 @@ def _get_events(hass, config, start_day, end_day, entity_id=None):
else:
entity_ids = None
old_state = aliased(States, name="old_state")
query = (
session.query(
Events.event_type,
Events.event_data,
Events.time_fired,
Events.context_user_id,
States.state_id,
States.state,
States.entity_id,
States.domain,
States.attributes,
old_state.state_id.label("old_state_id"),
)
.order_by(Events.time_fired)
.outerjoin(States, (Events.event_id == States.event_id))
.outerjoin(old_state, (States.old_state_id == old_state.state_id))
.filter(
Events.event_type.in_(ALL_EVENT_TYPES + list(hass.data.get(DOMAIN, {})))
)
@ -429,7 +436,7 @@ def _get_events(hass, config, start_day, end_day, entity_id=None):
def _get_attribute(hass, entity_id, event, attribute):
current_state = hass.states.get(entity_id)
if not current_state:
return event.data.get("new_state", {}).get("attributes", {}).get(attribute)
return event.attributes.get(attribute)
return current_state.attributes.get(attribute, None)
@ -566,6 +573,7 @@ class LazyEventPartialState:
"_row",
"_event_data",
"_time_fired",
"_attributes",
"event_type",
"entity_id",
"state",
@ -577,6 +585,7 @@ class LazyEventPartialState:
self._row = row
self._event_data = None
self._time_fired = None
self._attributes = None
self.event_type = self._row.event_type
self.entity_id = self._row.entity_id
self.state = self._row.state
@ -587,6 +596,16 @@ class LazyEventPartialState:
"""Context user id of event."""
return self._row.context_user_id
@property
def attributes(self):
"""State attributes, decoded lazily from the row's JSON and cached."""
# Decode at most once; later accesses reuse the cached dict.
# NOTE(review): `if not self._attributes` re-enters this branch when the
# cached value is an empty dict, but it only reassigns {} — harmless.
if not self._attributes:
# Fast path: skip json.loads entirely for NULL or empty-object payloads.
if self._row.attributes is None or self._row.attributes == "{}":
self._attributes = {}
else:
self._attributes = json.loads(self._row.attributes)
return self._attributes
@property
def data(self):
"""Event data."""
@ -615,6 +634,9 @@ class LazyEventPartialState:
@property
def has_old_and_new_state(self):
"""Check the json data to see if new_state and old_state is present without decoding."""
if self._row.event_data == "{}":
return self._row.state_id is not None and self._row.old_state_id is not None
return (
'"old_state": {' in self._row.event_data
and '"new_state": {' in self._row.event_data
@ -623,10 +645,6 @@ class LazyEventPartialState:
@property
def hidden(self):
"""Check the json to see if hidden."""
if '"hidden":' in self._row.event_data:
return (
self.data.get("new_state", {})
.get("attributes", {})
.get(ATTR_HIDDEN, False)
)
if '"hidden":' in self._row.attributes:
return self.attributes.get(ATTR_HIDDEN, False)
return False

View File

@ -242,6 +242,7 @@ class Recorder(threading.Thread):
self._timechanges_seen = 0
self._keepalive_count = 0
self._old_state_ids = {}
self.event_session = None
self.get_session = None
@ -383,6 +384,8 @@ class Recorder(threading.Thread):
try:
dbevent = Events.from_event(event)
if event.event_type == EVENT_STATE_CHANGED:
dbevent.event_data = "{}"
self.event_session.add(dbevent)
self.event_session.flush()
except (TypeError, ValueError):
@ -394,8 +397,10 @@ class Recorder(threading.Thread):
if dbevent and event.event_type == EVENT_STATE_CHANGED:
try:
dbstate = States.from_event(event)
dbstate.old_state_id = self._old_state_ids.get(dbstate.entity_id)
dbstate.event_id = dbevent.event_id
self.event_session.add(dbstate)
self.event_session.flush()
except (TypeError, ValueError):
_LOGGER.warning(
"State is not JSON serializable: %s",
@ -405,6 +410,11 @@ class Recorder(threading.Thread):
# Must catch the exception to prevent the loop from collapsing
_LOGGER.exception("Error adding state change: %s", err)
if "new_state" in event.data:
self._old_state_ids[dbstate.entity_id] = dbstate.state_id
elif dbstate.entity_id in self._old_state_ids:
del self._old_state_ids[dbstate.entity_id]
# If they do not have a commit interval
# then we commit right away
if not self.commit_interval:

View File

@ -249,14 +249,11 @@ def _apply_update(engine, new_version, old_version):
elif new_version == 7:
_create_index(engine, "states", "ix_states_entity_id")
elif new_version == 8:
# Pending migration, want to group a few.
pass
# _add_columns(engine, "events", [
# 'context_parent_id CHARACTER(36)',
# ])
# _add_columns(engine, "states", [
# 'context_parent_id CHARACTER(36)',
# ])
_add_columns(engine, "events", ["context_parent_id CHARACTER(36)"])
_add_columns(engine, "states", ["context_parent_id CHARACTER(36)"])
_add_columns(engine, "states", ["old_state_id INTEGER"])
_create_index(engine, "states", "ix_states_context_parent_id")
_create_index(engine, "events", "ix_events_context_parent_id")
else:
raise ValueError(f"No schema migration defined for version {new_version}")

View File

@ -24,7 +24,7 @@ import homeassistant.util.dt as dt_util
# pylint: disable=invalid-name
Base = declarative_base()
SCHEMA_VERSION = 7
SCHEMA_VERSION = 8
_LOGGER = logging.getLogger(__name__)
@ -43,7 +43,7 @@ class Events(Base): # type: ignore
created = Column(DateTime(timezone=True), default=dt_util.utcnow)
context_id = Column(String(36), index=True)
context_user_id = Column(String(36), index=True)
# context_parent_id = Column(String(36), index=True)
context_parent_id = Column(String(36), index=True)
@staticmethod
def from_event(event):
@ -55,7 +55,7 @@ class Events(Base): # type: ignore
time_fired=event.time_fired,
context_id=event.context.id,
context_user_id=event.context.user_id,
# context_parent_id=event.context.parent_id,
context_parent_id=event.context.parent_id,
)
def to_native(self):
@ -90,7 +90,8 @@ class States(Base): # type: ignore
created = Column(DateTime(timezone=True), default=dt_util.utcnow)
context_id = Column(String(36), index=True)
context_user_id = Column(String(36), index=True)
# context_parent_id = Column(String(36), index=True)
context_parent_id = Column(String(36), index=True)
old_state_id = Column(Integer)
__table_args__ = (
# Used for fetching the state of entities at a specific time
@ -108,7 +109,7 @@ class States(Base): # type: ignore
entity_id=entity_id,
context_id=event.context.id,
context_user_id=event.context.user_id,
# context_parent_id=event.context.parent_id,
context_parent_id=event.context.parent_id,
)
# State got deleted

View File

@ -1215,10 +1215,10 @@ class TestComponentLogbook(unittest.TestCase):
self, entity_id, event_time_fired, old_state, new_state
):
"""Create a state changed event from a old and new state."""
event_data_json = json.dumps(
{"entity_id": entity_id, "old_state": old_state, "new_state": new_state},
cls=JSONEncoder,
)
attributes = {}
if new_state is not None:
attributes = new_state.get("attributes")
attributes_json = json.dumps(attributes, cls=JSONEncoder)
row = collections.namedtuple(
"Row",
[
@ -1230,18 +1230,23 @@ class TestComponentLogbook(unittest.TestCase):
"state"
"entity_id"
"domain"
"attributes"
"state_id",
"old_state_id",
],
)
row.event_type = EVENT_STATE_CHANGED
row.event_data = event_data_json
row.event_data = "{}"
row.attributes = attributes_json
row.time_fired = event_time_fired
row.state = new_state and new_state.get("state")
row.entity_id = entity_id
row.domain = entity_id and ha.split_entity_id(entity_id)[0]
row.context_id = None
row.context_user_id = None
row.old_state_id = old_state and 1
row.state_id = new_state and 1
return logbook.LazyEventPartialState(row)

View File

@ -258,3 +258,29 @@ def test_auto_purge(hass_recorder):
assert len(purge_old_data.mock_calls) == 1
dt_util.set_default_time_zone(original_tz)
def test_saving_sets_old_state(hass_recorder):
"""Test saving sets old state.

Verifies that the recorder links each state-changed row to its
predecessor for the same entity via the new ``old_state_id`` column.
"""
hass = hass_recorder()
# First write for each entity: there is no prior state to link to.
hass.states.set("test.one", "on", {})
hass.states.set("test.two", "on", {})
wait_recording_done(hass)
# Second write: each new row should point back at the first row
# for the same entity through old_state_id.
hass.states.set("test.one", "off", {})
hass.states.set("test.two", "off", {})
wait_recording_done(hass)
with session_scope(hass=hass) as session:
states = list(session.query(States))
assert len(states) == 4
# Rows are expected in insertion order: on/on then off/off.
assert states[0].entity_id == "test.one"
assert states[1].entity_id == "test.two"
assert states[2].entity_id == "test.one"
assert states[3].entity_id == "test.two"
# Initial states have no predecessor.
assert states[0].old_state_id is None
assert states[1].old_state_id is None
# Updated states reference the state_id of the previous row
# for the same entity.
assert states[2].old_state_id == states[0].state_id
assert states[3].old_state_id == states[1].state_id