From 6a64e79d7b780418f1af39b895d37d5c1a8fecab Mon Sep 17 00:00:00 2001 From: Adam Mills Date: Thu, 2 Feb 2017 22:04:14 -0500 Subject: [PATCH] [recorder] Index events time_fired to improve logbook performance (#5633) * Index events time_fired to improve logbook perf. * Updated implementation to track schema versions * Added tests for schema migration support logic * Rename check_schema to migrate_schema --- homeassistant/components/recorder/__init__.py | 74 +++++++++++++++++++ homeassistant/components/recorder/models.py | 13 +++- tests/components/recorder/test_init.py | 24 ++++++ 3 files changed, 110 insertions(+), 1 deletion(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 3d8d1357b0f..9040d1f9fde 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -305,8 +305,82 @@ class Recorder(threading.Thread): models.Base.metadata.create_all(self.engine) session_factory = sessionmaker(bind=self.engine) Session = scoped_session(session_factory) + self._migrate_schema() self.db_ready.set() + def _migrate_schema(self): + """Check if the schema needs to be upgraded.""" + import homeassistant.components.recorder.models as models + schema_changes = models.SchemaChanges + current_version = getattr(Session.query(schema_changes).order_by( + schema_changes.change_id.desc()).first(), 'schema_version', None) + + if current_version == models.SCHEMA_VERSION: + return + _LOGGER.debug("Schema version incorrect: %d", current_version) + + if current_version is None: + current_version = self._inspect_schema_version() + _LOGGER.debug("No schema version found. Inspected version: %d", + current_version) + + for version in range(current_version, models.SCHEMA_VERSION): + new_version = version + 1 + _LOGGER.info( + "Upgrading recorder db schema to version %d", new_version) + self._apply_update(new_version) + self._commit(schema_changes(schema_version=new_version)) + _LOGGER.info( + "Upgraded recorder db schema to version %d", new_version) + + def _apply_update(self, new_version): + """Perform operations to bring schema up to date.""" + from sqlalchemy import Index, Table + import homeassistant.components.recorder.models as models + + if new_version == 1: + def create_index(table_name, column_name): + """Create an index for the specified table and column.""" + table = Table(table_name, models.Base.metadata) + index_name = "_".join(("ix", table_name, column_name)) + index = Index(index_name, getattr(table.c, column_name)) + _LOGGER.debug("Creating index for table %s column %s", + table_name, column_name) + index.create(self.engine) + _LOGGER.debug("Index creation done for table %s column %s", + table_name, column_name) + + create_index("events", "time_fired") + else: + raise ValueError("No schema migration defined for version {}" + .format(new_version)) + + def _inspect_schema_version(self): + """Determine the schema version by inspecting the db structure. + + When the schema verison is not present in the db, either db was just + created with the correct schema, or this is a db created before schema + versions were tracked. For now, we'll test if the changes for schema + version 1 are present to make the determination. Eventually this logic + can be removed and we can assume a new db is being created. + """ + from sqlalchemy.engine import reflection + import homeassistant.components.recorder.models as models + inspector = reflection.Inspector.from_engine(self.engine) + indexes = inspector.get_indexes("events") + for index in indexes: + if index['column_names'] == ["time_fired"]: + # Schema addition from version 1 detected. This is a new db. + current_version = models.SchemaChanges( + schema_version=models.SCHEMA_VERSION) + self._commit(current_version) + return models.SCHEMA_VERSION + + # Version 1 schema changes not found, this db needs to be migrated. + current_version = models.SchemaChanges(schema_version=0) + self._commit(current_version) + return current_version.schema_version + def _close_connection(self): """Close the connection.""" global Session # pylint: disable=global-statement diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py index 3b7b5aca1cb..4bc044a51bd 100644 --- a/homeassistant/components/recorder/models.py +++ b/homeassistant/components/recorder/models.py @@ -16,6 +16,8 @@ from homeassistant.remote import JSONEncoder # pylint: disable=invalid-name Base = declarative_base() +SCHEMA_VERSION = 1 + _LOGGER = logging.getLogger(__name__) @@ -27,7 +29,7 @@ class Events(Base): # type: ignore event_type = Column(String(32), index=True) event_data = Column(Text) origin = Column(String(32)) - time_fired = Column(DateTime(timezone=True)) + time_fired = Column(DateTime(timezone=True), index=True) created = Column(DateTime(timezone=True), default=datetime.utcnow) @staticmethod @@ -149,6 +151,15 @@ class RecorderRuns(Base): # type: ignore return self +class SchemaChanges(Base): # type: ignore + """Representation of schema version changes.""" + + __tablename__ = 'schema_changes' + change_id = Column(Integer, primary_key=True) + schema_version = Column(Integer) + changed = Column(DateTime(timezone=True), default=datetime.utcnow) + + def _process_timestamp(ts): """Process a timestamp into datetime object.""" if ts is None: diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index e8a73e347ff..f729303c685 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -3,6 +3,7 @@ import json from datetime import datetime, timedelta import unittest +from unittest.mock import patch, call import pytest from homeassistant.core import callback @@ -190,6 +191,29 @@ class TestRecorder(unittest.TestCase): self.assertEqual(states.count(), 5) self.assertEqual(events.count(), 5) + def test_schema_no_recheck(self): + """Test that schema is not double-checked when up-to-date.""" + with patch.object(recorder._INSTANCE, '_apply_update') as update, \ + patch.object(recorder._INSTANCE, '_inspect_schema_version') \ + as inspect: + recorder._INSTANCE._migrate_schema() + self.assertEqual(update.call_count, 0) + self.assertEqual(inspect.call_count, 0) + + def test_invalid_update(self): + """Test that an invalid new version raises an exception.""" + with self.assertRaises(ValueError): + recorder._INSTANCE._apply_update(-1) + + def test_schema_update_calls(self): + """Test that schema migrations occurr in correct order.""" + test_version = recorder.models.SchemaChanges(schema_version=0) + self.session.add(test_version) + with patch.object(recorder._INSTANCE, '_apply_update') as update: + recorder._INSTANCE._migrate_schema() + update.assert_has_calls([call(version+1) for version in range( + 0, recorder.models.SCHEMA_VERSION)]) + @pytest.fixture def hass_recorder():