diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py
index 3d8d1357b0f..9040d1f9fde 100644
--- a/homeassistant/components/recorder/__init__.py
+++ b/homeassistant/components/recorder/__init__.py
@@ -305,8 +305,82 @@ class Recorder(threading.Thread):
         models.Base.metadata.create_all(self.engine)
         session_factory = sessionmaker(bind=self.engine)
         Session = scoped_session(session_factory)
+        self._migrate_schema()
         self.db_ready.set()
 
+    def _migrate_schema(self):
+        """Check if the schema needs to be upgraded."""
+        import homeassistant.components.recorder.models as models
+        schema_changes = models.SchemaChanges
+        current_version = getattr(Session.query(schema_changes).order_by(
+            schema_changes.change_id.desc()).first(), 'schema_version', None)
+
+        if current_version == models.SCHEMA_VERSION:
+            return
+        _LOGGER.debug("Schema version incorrect: %d", current_version)
+
+        if current_version is None:
+            current_version = self._inspect_schema_version()
+            _LOGGER.debug("No schema version found. Inspected version: %d",
+                          current_version)
+
+        for version in range(current_version, models.SCHEMA_VERSION):
+            new_version = version + 1
+            _LOGGER.info(
+                "Upgrading recorder db schema to version %d", new_version)
+            self._apply_update(new_version)
+            self._commit(schema_changes(schema_version=new_version))
+            _LOGGER.info(
+                "Upgraded recorder db schema to version %d", new_version)
+
+    def _apply_update(self, new_version):
+        """Perform operations to bring schema up to date."""
+        from sqlalchemy import Index, Table
+        import homeassistant.components.recorder.models as models
+
+        if new_version == 1:
+            def create_index(table_name, column_name):
+                """Create an index for the specified table and column."""
+                table = Table(table_name, models.Base.metadata)
+                index_name = "_".join(("ix", table_name, column_name))
+                index = Index(index_name, getattr(table.c, column_name))
+                _LOGGER.debug("Creating index for table %s column %s",
+                              table_name, column_name)
+                index.create(self.engine)
+                _LOGGER.debug("Index creation done for table %s column %s",
+                              table_name, column_name)
+
+            create_index("events", "time_fired")
+        else:
+            raise ValueError("No schema migration defined for version {}"
+                             .format(new_version))
+
+    def _inspect_schema_version(self):
+        """Determine the schema version by inspecting the db structure.
+
+        When the schema verison is not present in the db, either db was just
+        created with the correct schema, or this is a db created before schema
+        versions were tracked. For now, we'll test if the changes for schema
+        version 1 are present to make the determination. Eventually this logic
+        can be removed and we can assume a new db is being created.
+        """
+        from sqlalchemy.engine import reflection
+        import homeassistant.components.recorder.models as models
+        inspector = reflection.Inspector.from_engine(self.engine)
+        indexes = inspector.get_indexes("events")
+        for index in indexes:
+            if index['column_names'] == ["time_fired"]:
+                # Schema addition from version 1 detected. This is a new db.
+                current_version = models.SchemaChanges(
+                    schema_version=models.SCHEMA_VERSION)
+                self._commit(current_version)
+                return models.SCHEMA_VERSION
+
+        # Version 1 schema changes not found, this db needs to be migrated.
+        current_version = models.SchemaChanges(schema_version=0)
+        self._commit(current_version)
+        return current_version.schema_version
+
     def _close_connection(self):
         """Close the connection."""
         global Session  # pylint: disable=global-statement
diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py
index 3b7b5aca1cb..4bc044a51bd 100644
--- a/homeassistant/components/recorder/models.py
+++ b/homeassistant/components/recorder/models.py
@@ -16,6 +16,8 @@ from homeassistant.remote import JSONEncoder
 # pylint: disable=invalid-name
 Base = declarative_base()
 
+SCHEMA_VERSION = 1
+
 _LOGGER = logging.getLogger(__name__)
 
 
@@ -27,7 +29,7 @@ class Events(Base):  # type: ignore
     event_type = Column(String(32), index=True)
     event_data = Column(Text)
     origin = Column(String(32))
-    time_fired = Column(DateTime(timezone=True))
+    time_fired = Column(DateTime(timezone=True), index=True)
     created = Column(DateTime(timezone=True), default=datetime.utcnow)
 
     @staticmethod
@@ -149,6 +151,15 @@ class RecorderRuns(Base):   # type: ignore
         return self
 
 
+class SchemaChanges(Base):   # type: ignore
+    """Representation of schema version changes."""
+
+    __tablename__ = 'schema_changes'
+    change_id = Column(Integer, primary_key=True)
+    schema_version = Column(Integer)
+    changed = Column(DateTime(timezone=True), default=datetime.utcnow)
+
+
 def _process_timestamp(ts):
     """Process a timestamp into datetime object."""
     if ts is None:
diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py
index e8a73e347ff..f729303c685 100644
--- a/tests/components/recorder/test_init.py
+++ b/tests/components/recorder/test_init.py
@@ -3,6 +3,7 @@
 import json
 from datetime import datetime, timedelta
 import unittest
+from unittest.mock import patch, call
 
 import pytest
 from homeassistant.core import callback
@@ -190,6 +191,29 @@ class TestRecorder(unittest.TestCase):
         self.assertEqual(states.count(), 5)
         self.assertEqual(events.count(), 5)
 
+    def test_schema_no_recheck(self):
+        """Test that schema is not double-checked when up-to-date."""
+        with patch.object(recorder._INSTANCE, '_apply_update') as update, \
+                patch.object(recorder._INSTANCE, '_inspect_schema_version') \
+                as inspect:
+            recorder._INSTANCE._migrate_schema()
+            self.assertEqual(update.call_count, 0)
+            self.assertEqual(inspect.call_count, 0)
+
+    def test_invalid_update(self):
+        """Test that an invalid new version raises an exception."""
+        with self.assertRaises(ValueError):
+            recorder._INSTANCE._apply_update(-1)
+
+    def test_schema_update_calls(self):
+        """Test that schema migrations occurr in correct order."""
+        test_version = recorder.models.SchemaChanges(schema_version=0)
+        self.session.add(test_version)
+        with patch.object(recorder._INSTANCE, '_apply_update') as update:
+            recorder._INSTANCE._migrate_schema()
+            update.assert_has_calls([call(version+1) for version in range(
+                0, recorder.models.SCHEMA_VERSION)])
+
 
 @pytest.fixture
 def hass_recorder():