[recorder] Index events time_fired to improve logbook performance (#5633)
* Index events time_fired to improve logbook perf. * Updated implementation to track schema versions * Added tests for schema migration support logic * Rename check_schema to migrate_schemapull/5729/head
parent
dfb991ce19
commit
6a64e79d7b
|
@ -305,8 +305,82 @@ class Recorder(threading.Thread):
|
|||
models.Base.metadata.create_all(self.engine)
|
||||
session_factory = sessionmaker(bind=self.engine)
|
||||
Session = scoped_session(session_factory)
|
||||
self._migrate_schema()
|
||||
self.db_ready.set()
|
||||
|
||||
def _migrate_schema(self):
|
||||
"""Check if the schema needs to be upgraded."""
|
||||
import homeassistant.components.recorder.models as models
|
||||
schema_changes = models.SchemaChanges
|
||||
current_version = getattr(Session.query(schema_changes).order_by(
|
||||
schema_changes.change_id.desc()).first(), 'schema_version', None)
|
||||
|
||||
if current_version == models.SCHEMA_VERSION:
|
||||
return
|
||||
_LOGGER.debug("Schema version incorrect: %d", current_version)
|
||||
|
||||
if current_version is None:
|
||||
current_version = self._inspect_schema_version()
|
||||
_LOGGER.debug("No schema version found. Inspected version: %d",
|
||||
current_version)
|
||||
|
||||
for version in range(current_version, models.SCHEMA_VERSION):
|
||||
new_version = version + 1
|
||||
_LOGGER.info(
|
||||
"Upgrading recorder db schema to version %d", new_version)
|
||||
self._apply_update(new_version)
|
||||
self._commit(schema_changes(schema_version=new_version))
|
||||
_LOGGER.info(
|
||||
"Upgraded recorder db schema to version %d", new_version)
|
||||
|
||||
def _apply_update(self, new_version):
|
||||
"""Perform operations to bring schema up to date."""
|
||||
from sqlalchemy import Index, Table
|
||||
import homeassistant.components.recorder.models as models
|
||||
|
||||
if new_version == 1:
|
||||
def create_index(table_name, column_name):
|
||||
"""Create an index for the specified table and column."""
|
||||
table = Table(table_name, models.Base.metadata)
|
||||
index_name = "_".join(("ix", table_name, column_name))
|
||||
index = Index(index_name, getattr(table.c, column_name))
|
||||
_LOGGER.debug("Creating index for table %s column %s",
|
||||
table_name, column_name)
|
||||
index.create(self.engine)
|
||||
_LOGGER.debug("Index creation done for table %s column %s",
|
||||
table_name, column_name)
|
||||
|
||||
create_index("events", "time_fired")
|
||||
else:
|
||||
raise ValueError("No schema migration defined for version {}"
|
||||
.format(new_version))
|
||||
|
||||
def _inspect_schema_version(self):
|
||||
"""Determine the schema version by inspecting the db structure.
|
||||
|
||||
When the schema verison is not present in the db, either db was just
|
||||
created with the correct schema, or this is a db created before schema
|
||||
versions were tracked. For now, we'll test if the changes for schema
|
||||
version 1 are present to make the determination. Eventually this logic
|
||||
can be removed and we can assume a new db is being created.
|
||||
"""
|
||||
from sqlalchemy.engine import reflection
|
||||
import homeassistant.components.recorder.models as models
|
||||
inspector = reflection.Inspector.from_engine(self.engine)
|
||||
indexes = inspector.get_indexes("events")
|
||||
for index in indexes:
|
||||
if index['column_names'] == ["time_fired"]:
|
||||
# Schema addition from version 1 detected. This is a new db.
|
||||
current_version = models.SchemaChanges(
|
||||
schema_version=models.SCHEMA_VERSION)
|
||||
self._commit(current_version)
|
||||
return models.SCHEMA_VERSION
|
||||
|
||||
# Version 1 schema changes not found, this db needs to be migrated.
|
||||
current_version = models.SchemaChanges(schema_version=0)
|
||||
self._commit(current_version)
|
||||
return current_version.schema_version
|
||||
|
||||
def _close_connection(self):
|
||||
"""Close the connection."""
|
||||
global Session # pylint: disable=global-statement
|
||||
|
|
|
@ -16,6 +16,8 @@ from homeassistant.remote import JSONEncoder
|
|||
# pylint: disable=invalid-name
|
||||
Base = declarative_base()
|
||||
|
||||
SCHEMA_VERSION = 1
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -27,7 +29,7 @@ class Events(Base): # type: ignore
|
|||
event_type = Column(String(32), index=True)
|
||||
event_data = Column(Text)
|
||||
origin = Column(String(32))
|
||||
time_fired = Column(DateTime(timezone=True))
|
||||
time_fired = Column(DateTime(timezone=True), index=True)
|
||||
created = Column(DateTime(timezone=True), default=datetime.utcnow)
|
||||
|
||||
@staticmethod
|
||||
|
@ -149,6 +151,15 @@ class RecorderRuns(Base): # type: ignore
|
|||
return self
|
||||
|
||||
|
||||
class SchemaChanges(Base): # type: ignore
|
||||
"""Representation of schema version changes."""
|
||||
|
||||
__tablename__ = 'schema_changes'
|
||||
change_id = Column(Integer, primary_key=True)
|
||||
schema_version = Column(Integer)
|
||||
changed = Column(DateTime(timezone=True), default=datetime.utcnow)
|
||||
|
||||
|
||||
def _process_timestamp(ts):
|
||||
"""Process a timestamp into datetime object."""
|
||||
if ts is None:
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
import json
|
||||
from datetime import datetime, timedelta
|
||||
import unittest
|
||||
from unittest.mock import patch, call
|
||||
|
||||
import pytest
|
||||
from homeassistant.core import callback
|
||||
|
@ -190,6 +191,29 @@ class TestRecorder(unittest.TestCase):
|
|||
self.assertEqual(states.count(), 5)
|
||||
self.assertEqual(events.count(), 5)
|
||||
|
||||
def test_schema_no_recheck(self):
|
||||
"""Test that schema is not double-checked when up-to-date."""
|
||||
with patch.object(recorder._INSTANCE, '_apply_update') as update, \
|
||||
patch.object(recorder._INSTANCE, '_inspect_schema_version') \
|
||||
as inspect:
|
||||
recorder._INSTANCE._migrate_schema()
|
||||
self.assertEqual(update.call_count, 0)
|
||||
self.assertEqual(inspect.call_count, 0)
|
||||
|
||||
def test_invalid_update(self):
|
||||
"""Test that an invalid new version raises an exception."""
|
||||
with self.assertRaises(ValueError):
|
||||
recorder._INSTANCE._apply_update(-1)
|
||||
|
||||
def test_schema_update_calls(self):
|
||||
"""Test that schema migrations occurr in correct order."""
|
||||
test_version = recorder.models.SchemaChanges(schema_version=0)
|
||||
self.session.add(test_version)
|
||||
with patch.object(recorder._INSTANCE, '_apply_update') as update:
|
||||
recorder._INSTANCE._migrate_schema()
|
||||
update.assert_has_calls([call(version+1) for version in range(
|
||||
0, recorder.models.SCHEMA_VERSION)])
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def hass_recorder():
|
||||
|
|
Loading…
Reference in New Issue