diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py
index a97eed8eff6..62c1213a4a4 100644
--- a/homeassistant/components/recorder/core.py
+++ b/homeassistant/components/recorder/core.py
@@ -1026,7 +1026,9 @@ class Recorder(threading.Thread):
 
     def _post_schema_migration(self, old_version: int, new_version: int) -> None:
         """Run post schema migration tasks."""
-        migration.post_schema_migration(self.event_session, old_version, new_version)
+        migration.post_schema_migration(
+            self.engine, self.event_session, old_version, new_version
+        )
 
     def _send_keep_alive(self) -> None:
         """Send a keep alive to keep the db connection open."""
diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py
index 9bddf11fcad..746adf11c18 100644
--- a/homeassistant/components/recorder/migration.py
+++ b/homeassistant/components/recorder/migration.py
@@ -10,7 +10,7 @@ from typing import TYPE_CHECKING
 
 import sqlalchemy
 from sqlalchemy import ForeignKeyConstraint, MetaData, Table, func, text
-from sqlalchemy.engine import Engine
+from sqlalchemy.engine import CursorResult, Engine
 from sqlalchemy.exc import (
     DatabaseError,
     InternalError,
@@ -43,7 +43,7 @@ from .statistics import (
     get_start_time,
     validate_db_schema as statistics_validate_db_schema,
 )
-from .tasks import PostSchemaMigrationTask
+from .tasks import CommitTask, PostSchemaMigrationTask
 from .util import session_scope
 
 if TYPE_CHECKING:
@@ -166,6 +166,9 @@ def migrate_schema(
 
     if current_version != SCHEMA_VERSION:
         instance.queue_task(PostSchemaMigrationTask(current_version, SCHEMA_VERSION))
+        # Make sure the post schema migration task is committed in case
+        # the next task does not have commit_before = True
+        instance.queue_task(CommitTask())
 
 
 def _create_index(
@@ -846,8 +849,7 @@ def _apply_update(  # noqa: C901
         _create_index(session_maker, "events", "ix_events_event_type_time_fired_ts")
         _create_index(session_maker, "states", "ix_states_entity_id_last_updated_ts")
         _create_index(session_maker, "states", "ix_states_last_updated_ts")
-        with session_scope(session=session_maker()) as session:
-            _migrate_columns_to_timestamp(hass, session, engine)
+        _migrate_columns_to_timestamp(session_maker, engine)
     elif new_version == 32:
         # Migration is done in two steps to ensure we can start using
         # the new columns before we wipe the old ones.
@@ -860,6 +862,7 @@ def _apply_update(  # noqa: C901
 
 
 def post_schema_migration(
+    engine: Engine,
     session: Session,
     old_version: int,
     new_version: int,
@@ -878,62 +881,142 @@ def post_schema_migration(
         # In version 31 we migrated all the time_fired, last_updated, and last_changed
         # columns to be timestamps. In version 32 we need to wipe the old columns
         # since they are no longer used and take up a significant amount of space.
-        _wipe_old_string_time_columns(session)
+        _wipe_old_string_time_columns(engine, session)
 
 
-def _wipe_old_string_time_columns(session: Session) -> None:
+def _wipe_old_string_time_columns(engine: Engine, session: Session) -> None:
     """Wipe old string time columns to save space."""
     # Wipe Events.time_fired since its been replaced by Events.time_fired_ts
     # Wipe States.last_updated since its been replaced by States.last_updated_ts
     # Wipe States.last_changed since its been replaced by States.last_changed_ts
-    session.execute(text("UPDATE events set time_fired=NULL;"))
-    session.execute(text("UPDATE states set last_updated=NULL, last_changed=NULL;"))
-    session.commit()
+    #
+    if engine.dialect.name == SupportedDialect.SQLITE:
+        session.execute(text("UPDATE events set time_fired=NULL;"))
+        session.commit()
+        session.execute(text("UPDATE states set last_updated=NULL, last_changed=NULL;"))
+        session.commit()
+    elif engine.dialect.name == SupportedDialect.MYSQL:
+        #
+        # Since this is only to save space we limit the number of rows we update
+        # to 10,000,000 per table since we do not want to block the database for too long
+        # or run out of innodb_buffer_pool_size on MySQL. The old data will eventually
+        # be cleaned up by the recorder purge if we do not do it now.
+        #
+        session.execute(text("UPDATE events set time_fired=NULL LIMIT 10000000;"))
+        session.commit()
+        session.execute(
+            text(
+                "UPDATE states set last_updated=NULL, last_changed=NULL "
+                " LIMIT 10000000;"
+            )
+        )
+        session.commit()
+    elif engine.dialect.name == SupportedDialect.POSTGRESQL:
+        #
+        # Since this is only to save space we limit the number of rows we update
+        # to 250,000 per table since we do not want to block the database for too long
+        # or run out ram with postgresql. The old data will eventually
+        # be cleaned up by the recorder purge if we do not do it now.
+        #
+        session.execute(
+            text(
+                "UPDATE events set time_fired=NULL "
+                "where event_id in "
+                "(select event_id from events where time_fired_ts is NOT NULL LIMIT 250000);"
+            )
+        )
+        session.commit()
+        session.execute(
+            text(
+                "UPDATE states set last_updated=NULL, last_changed=NULL "
+                "where state_id in "
+                "(select state_id from states where last_updated_ts is NOT NULL LIMIT 250000);"
+            )
+        )
+        session.commit()
 
 
 def _migrate_columns_to_timestamp(
-    hass: HomeAssistant, session: Session, engine: Engine
+    session_maker: Callable[[], Session], engine: Engine
 ) -> None:
     """Migrate columns to use timestamp."""
     # Migrate all data in Events.time_fired to Events.time_fired_ts
     # Migrate all data in States.last_updated to States.last_updated_ts
     # Migrate all data in States.last_changed to States.last_changed_ts
-    connection = session.connection()
+    result: CursorResult | None = None
     if engine.dialect.name == SupportedDialect.SQLITE:
-        connection.execute(
-            text(
-                'UPDATE events set time_fired_ts=strftime("%s",time_fired) + '
-                "cast(substr(time_fired,-7) AS FLOAT);"
+        # With SQLite we do this in one go since it is faster
+        with session_scope(session=session_maker()) as session:
+            connection = session.connection()
+            connection.execute(
+                text(
+                    'UPDATE events set time_fired_ts=strftime("%s",time_fired) + '
+                    "cast(substr(time_fired,-7) AS FLOAT);"
+                )
             )
-        )
-        connection.execute(
-            text(
-                'UPDATE states set last_updated_ts=strftime("%s",last_updated) + '
-                "cast(substr(last_updated,-7) AS FLOAT), "
-                'last_changed_ts=strftime("%s",last_changed) + '
-                "cast(substr(last_changed,-7) AS FLOAT);"
+            connection.execute(
+                text(
+                    'UPDATE states set last_updated_ts=strftime("%s",last_updated) + '
+                    "cast(substr(last_updated,-7) AS FLOAT), "
+                    'last_changed_ts=strftime("%s",last_changed) + '
+                    "cast(substr(last_changed,-7) AS FLOAT);"
+                )
             )
-        )
     elif engine.dialect.name == SupportedDialect.MYSQL:
-        connection.execute(
-            text("UPDATE events set time_fired_ts=UNIX_TIMESTAMP(time_fired);")
-        )
-        connection.execute(
-            text(
-                "UPDATE states set last_updated_ts=UNIX_TIMESTAMP(last_updated), "
-                "last_changed_ts=UNIX_TIMESTAMP(last_changed);"
-            )
-        )
+        # With MySQL we do this in chunks to avoid hitting the `innodb_buffer_pool_size` limit
+        # We also need to do this in a loop since we can't be sure that we have
+        # updated all rows in the table until the rowcount is 0
+        while result is None or result.rowcount > 0:
+            with session_scope(session=session_maker()) as session:
+                result = session.connection().execute(
+                    text(
+                        "UPDATE events set time_fired_ts="
+                        "IF(time_fired is NULL,0,UNIX_TIMESTAMP(time_fired)) "
+                        "where time_fired_ts is NULL "
+                        "LIMIT 250000;"
+                    )
+                )
+        result = None
+        while result is None or result.rowcount > 0:
+            with session_scope(session=session_maker()) as session:
+                result = session.connection().execute(
+                    text(
+                        "UPDATE states set last_updated_ts="
+                        "IF(last_updated is NULL,0,UNIX_TIMESTAMP(last_updated)), "
+                        "last_changed_ts=UNIX_TIMESTAMP(last_changed) "
+                        "where last_updated_ts is NULL "
+                        "LIMIT 250000;"
+                    )
+                )
     elif engine.dialect.name == SupportedDialect.POSTGRESQL:
-        connection.execute(
-            text("UPDATE events set time_fired_ts=EXTRACT(EPOCH FROM time_fired);")
-        )
-        connection.execute(
-            text(
-                "UPDATE states set last_updated_ts=EXTRACT(EPOCH FROM last_updated), "
-                "last_changed_ts=EXTRACT(EPOCH FROM last_changed);"
-            )
-        )
+        # With Postgresql we do this in chunks to avoid using too much memory
+        # We also need to do this in a loop since we can't be sure that we have
+        # updated all rows in the table until the rowcount is 0
+        while result is None or result.rowcount > 0:
+            with session_scope(session=session_maker()) as session:
+                result = session.connection().execute(
+                    text(
+                        "UPDATE events SET "
+                        "time_fired_ts= "
+                        "(case when time_fired is NULL then 0 else EXTRACT(EPOCH FROM time_fired) end) "
+                        "WHERE event_id IN ( "
+                        "SELECT event_id FROM events where time_fired_ts is NULL LIMIT 250000 "
+                        " );"
+                    )
+                )
+        result = None
+        while result is None or result.rowcount > 0:
+            with session_scope(session=session_maker()) as session:
+                result = session.connection().execute(
+                    text(
+                        "UPDATE states set last_updated_ts="
+                        "(case when last_updated is NULL then 0 else EXTRACT(EPOCH FROM last_updated) end), "
+                        "last_changed_ts=EXTRACT(EPOCH FROM last_changed) "
+                        "where state_id IN ( "
+                        "SELECT state_id FROM states where last_updated_ts is NULL LIMIT 250000 "
+                        " );"
+                    )
+                )
 
 
 def _initialize_database(session: Session) -> bool:
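
For reference, below is a minimal standalone sketch of the chunked-update pattern the MySQL and PostgreSQL branches above rely on: run one bounded UPDATE per transaction and keep looping until CursorResult.rowcount reports that no rows were touched, so locks and buffer usage stay bounded and progress survives interruption. The connection URL, session_maker, and migrate_events_time_fired_in_chunks names are illustrative placeholders for this sketch, not part of the recorder code.

# Sketch only: assumes a MySQL/MariaDB database with an `events` table that
# has `time_fired` / `time_fired_ts` columns; URL and helper names are
# placeholders, not Home Assistant APIs.
from sqlalchemy import create_engine, text
from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import sessionmaker

engine = create_engine("mysql+pymysql://user:password@localhost/homeassistant")
session_maker = sessionmaker(bind=engine)


def migrate_events_time_fired_in_chunks(chunk_size: int = 250000) -> None:
    """Backfill time_fired_ts in bounded chunks, committing after each batch."""
    result: CursorResult | None = None
    # Loop until an UPDATE matches zero rows; each iteration runs in its own
    # session/transaction so no single statement grows unbounded.
    while result is None or result.rowcount > 0:
        with session_maker() as session:
            result = session.connection().execute(
                text(
                    "UPDATE events set time_fired_ts="
                    "IF(time_fired is NULL,0,UNIX_TIMESTAMP(time_fired)) "
                    "where time_fired_ts is NULL "
                    f"LIMIT {chunk_size};"
                )
            )
            session.commit()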