Chunk MariaDB and PostgreSQL data migration to avoid running out of buffer space (#86680)

* Chunk MariaDB data migration to avoid running out of buffer space

This will make the migration slower, but since innodb_buffer_pool_size
defaults to 128M and is not tuned to the database size, there is a risk
of running out of buffer space for large databases.
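
The fix is the standard chunked-UPDATE pattern: re-run a bounded UPDATE until
the driver reports zero affected rows, committing between chunks so locks and
buffer pages are released. A minimal sketch of the loop, assuming a SQLAlchemy
session factory (`chunked_update` and `stmt` are illustrative names, not
actual recorder code):

# Minimal sketch of the chunked-UPDATE loop this PR introduces
# (illustrative helper; the real code is in migration.py below).
from collections.abc import Callable

from sqlalchemy import text
from sqlalchemy.orm import Session


def chunked_update(session_maker: Callable[[], Session], stmt: str) -> None:
    """Re-run a LIMITed UPDATE until no rows are affected.

    Committing after every chunk keeps the working set small, so the
    engine never holds a whole table's worth of pages at once (the
    failure mode with the default 128M innodb_buffer_pool_size).
    """
    while True:
        with session_maker() as session:
            result = session.connection().execute(text(stmt))
            session.commit()
        if result.rowcount == 0:  # nothing left to migrate
            break

The per-dialect statements in the diff below plug into exactly this shape of
loop, with a fresh session per chunk so lock pressure is released between
iterations.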

* Update homeassistant/components/recorder/migration.py

* hard code since bandit thinks it's an injection

* Update homeassistant/components/recorder/migration.py

* guard against manually modified data/corrupt db

* adjust to 10k per chunk

* adjust to 50k per chunk

* memory still just fine at 250k

* but slower

* commit after each chunk to reduce lock pressure

* adjust

* set to 0 if null so we do not loop forever (this should only happen if the data is missing)

* set to 0 if null so we do not loop forever (this should only happen if the data is missing)

* tweak

* tweak

* limit cleanup

* lower limit to give some more buffer

* lower limit to give some more buffer

* where required for sqlite

* sqlite can wipe as many as needed with no limit

* limit on mysql only

* chunk postgres

* fix limit

* tweak

* fix reference

* fix

* tweak for ram

* postgres memory reduction

* defer cleanup

* fix

* same order
J. Nick Koston 2023-01-27 17:39:45 -10:00 committed by Paulus Schoutsen
parent 6397cc5d04
commit 0a6ce35e30
2 changed files with 127 additions and 42 deletions


@@ -1026,7 +1026,9 @@ class Recorder(threading.Thread):
     def _post_schema_migration(self, old_version: int, new_version: int) -> None:
         """Run post schema migration tasks."""
-        migration.post_schema_migration(self.event_session, old_version, new_version)
+        migration.post_schema_migration(
+            self.engine, self.event_session, old_version, new_version
+        )
 
     def _send_keep_alive(self) -> None:
         """Send a keep alive to keep the db connection open."""

homeassistant/components/recorder/migration.py

@@ -10,7 +10,7 @@ from typing import TYPE_CHECKING
 import sqlalchemy
 from sqlalchemy import ForeignKeyConstraint, MetaData, Table, func, text
-from sqlalchemy.engine import Engine
+from sqlalchemy.engine import CursorResult, Engine
 from sqlalchemy.exc import (
     DatabaseError,
     InternalError,
@@ -43,7 +43,7 @@ from .statistics import (
     get_start_time,
     validate_db_schema as statistics_validate_db_schema,
 )
-from .tasks import PostSchemaMigrationTask
+from .tasks import CommitTask, PostSchemaMigrationTask
 from .util import session_scope
 
 if TYPE_CHECKING:
@@ -166,6 +166,9 @@ def migrate_schema(
     if current_version != SCHEMA_VERSION:
         instance.queue_task(PostSchemaMigrationTask(current_version, SCHEMA_VERSION))
+        # Make sure the post schema migration task is committed in case
+        # the next task does not have commit_before = True
+        instance.queue_task(CommitTask())
 
 
 def _create_index(
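
For context, the CommitTask queued above is a small recorder task whose only
job is to flush the event session. A sketch of the assumed shape (the real
class is defined in homeassistant/components/recorder/tasks.py; the method
called in run() is a stand-in, not a confirmed API):

# Assumed shape of a CommitTask-style recorder task.
from dataclasses import dataclass
from typing import Any


@dataclass
class CommitTask:
    """Commit the event session so earlier queued work is persisted."""

    commit_before = False  # nothing has to be flushed before this task runs

    def run(self, instance: Any) -> None:
        # Forces a commit so the PostSchemaMigrationTask's changes are
        # persisted even if the next queued task does not set
        # commit_before = True.
        instance._commit_event_session_or_retry()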
@@ -846,8 +849,7 @@ def _apply_update(  # noqa: C901
         _create_index(session_maker, "events", "ix_events_event_type_time_fired_ts")
         _create_index(session_maker, "states", "ix_states_entity_id_last_updated_ts")
         _create_index(session_maker, "states", "ix_states_last_updated_ts")
-        with session_scope(session=session_maker()) as session:
-            _migrate_columns_to_timestamp(hass, session, engine)
+        _migrate_columns_to_timestamp(session_maker, engine)
     elif new_version == 32:
         # Migration is done in two steps to ensure we can start using
         # the new columns before we wipe the old ones.
@@ -860,6 +862,7 @@
 def post_schema_migration(
+    engine: Engine,
     session: Session,
     old_version: int,
     new_version: int,
@@ -878,62 +881,142 @@ def post_schema_migration(
         # In version 31 we migrated all the time_fired, last_updated, and last_changed
         # columns to be timestamps. In version 32 we need to wipe the old columns
         # since they are no longer used and take up a significant amount of space.
-        _wipe_old_string_time_columns(session)
+        _wipe_old_string_time_columns(engine, session)
 
 
-def _wipe_old_string_time_columns(session: Session) -> None:
+def _wipe_old_string_time_columns(engine: Engine, session: Session) -> None:
     """Wipe old string time columns to save space."""
     # Wipe Events.time_fired since it's been replaced by Events.time_fired_ts
     # Wipe States.last_updated since it's been replaced by States.last_updated_ts
     # Wipe States.last_changed since it's been replaced by States.last_changed_ts
-    session.execute(text("UPDATE events set time_fired=NULL;"))
-    session.execute(text("UPDATE states set last_updated=NULL, last_changed=NULL;"))
-    session.commit()
+    #
+    if engine.dialect.name == SupportedDialect.SQLITE:
+        session.execute(text("UPDATE events set time_fired=NULL;"))
+        session.commit()
+        session.execute(text("UPDATE states set last_updated=NULL, last_changed=NULL;"))
+        session.commit()
+    elif engine.dialect.name == SupportedDialect.MYSQL:
+        #
+        # Since this is only to save space we limit the number of rows we update
+        # to 10,000,000 per table since we do not want to block the database for too long
+        # or run out of innodb_buffer_pool_size on MySQL. The old data will eventually
+        # be cleaned up by the recorder purge if we do not do it now.
+        #
+        session.execute(text("UPDATE events set time_fired=NULL LIMIT 10000000;"))
+        session.commit()
+        session.execute(
+            text(
+                "UPDATE states set last_updated=NULL, last_changed=NULL "
+                " LIMIT 10000000;"
+            )
+        )
+        session.commit()
+    elif engine.dialect.name == SupportedDialect.POSTGRESQL:
+        #
+        # Since this is only to save space we limit the number of rows we update
+        # to 250,000 per table since we do not want to block the database for too long
+        # or run out of RAM with PostgreSQL. The old data will eventually
+        # be cleaned up by the recorder purge if we do not do it now.
+        #
+        session.execute(
+            text(
+                "UPDATE events set time_fired=NULL "
+                "where event_id in "
+                "(select event_id from events where time_fired_ts is NOT NULL LIMIT 250000);"
+            )
+        )
+        session.commit()
+        session.execute(
+            text(
+                "UPDATE states set last_updated=NULL, last_changed=NULL "
+                "where state_id in "
+                "(select state_id from states where last_updated_ts is NOT NULL LIMIT 250000);"
+            )
+        )
+        session.commit()
 
 
 def _migrate_columns_to_timestamp(
-    hass: HomeAssistant, session: Session, engine: Engine
+    session_maker: Callable[[], Session], engine: Engine
 ) -> None:
     """Migrate columns to use timestamp."""
     # Migrate all data in Events.time_fired to Events.time_fired_ts
     # Migrate all data in States.last_updated to States.last_updated_ts
     # Migrate all data in States.last_changed to States.last_changed_ts
-    connection = session.connection()
+    result: CursorResult | None = None
     if engine.dialect.name == SupportedDialect.SQLITE:
-        connection.execute(
-            text(
-                'UPDATE events set time_fired_ts=strftime("%s",time_fired) + '
-                "cast(substr(time_fired,-7) AS FLOAT);"
-            )
-        )
-        connection.execute(
-            text(
-                'UPDATE states set last_updated_ts=strftime("%s",last_updated) + '
-                "cast(substr(last_updated,-7) AS FLOAT), "
-                'last_changed_ts=strftime("%s",last_changed) + '
-                "cast(substr(last_changed,-7) AS FLOAT);"
-            )
-        )
+        # With SQLite we do this in one go since it is faster
+        with session_scope(session=session_maker()) as session:
+            connection = session.connection()
+            connection.execute(
+                text(
+                    'UPDATE events set time_fired_ts=strftime("%s",time_fired) + '
+                    "cast(substr(time_fired,-7) AS FLOAT);"
+                )
+            )
+            connection.execute(
+                text(
+                    'UPDATE states set last_updated_ts=strftime("%s",last_updated) + '
+                    "cast(substr(last_updated,-7) AS FLOAT), "
+                    'last_changed_ts=strftime("%s",last_changed) + '
+                    "cast(substr(last_changed,-7) AS FLOAT);"
+                )
+            )
     elif engine.dialect.name == SupportedDialect.MYSQL:
-        connection.execute(
-            text("UPDATE events set time_fired_ts=UNIX_TIMESTAMP(time_fired);")
-        )
-        connection.execute(
-            text(
-                "UPDATE states set last_updated_ts=UNIX_TIMESTAMP(last_updated), "
-                "last_changed_ts=UNIX_TIMESTAMP(last_changed);"
-            )
-        )
+        # With MySQL we do this in chunks to avoid hitting the `innodb_buffer_pool_size` limit
+        # We also need to do this in a loop since we can't be sure that we have
+        # updated all rows in the table until the rowcount is 0
+        while result is None or result.rowcount > 0:
+            with session_scope(session=session_maker()) as session:
+                result = session.connection().execute(
+                    text(
+                        "UPDATE events set time_fired_ts="
+                        "IF(time_fired is NULL,0,UNIX_TIMESTAMP(time_fired)) "
+                        "where time_fired_ts is NULL "
+                        "LIMIT 250000;"
+                    )
+                )
+        result = None
+        while result is None or result.rowcount > 0:
+            with session_scope(session=session_maker()) as session:
+                result = session.connection().execute(
+                    text(
+                        "UPDATE states set last_updated_ts="
+                        "IF(last_updated is NULL,0,UNIX_TIMESTAMP(last_updated)), "
+                        "last_changed_ts=UNIX_TIMESTAMP(last_changed) "
+                        "where last_updated_ts is NULL "
+                        "LIMIT 250000;"
+                    )
+                )
     elif engine.dialect.name == SupportedDialect.POSTGRESQL:
-        connection.execute(
-            text("UPDATE events set time_fired_ts=EXTRACT(EPOCH FROM time_fired);")
-        )
-        connection.execute(
-            text(
-                "UPDATE states set last_updated_ts=EXTRACT(EPOCH FROM last_updated), "
-                "last_changed_ts=EXTRACT(EPOCH FROM last_changed);"
-            )
-        )
+        # With PostgreSQL we do this in chunks to avoid using too much memory
+        # We also need to do this in a loop since we can't be sure that we have
+        # updated all rows in the table until the rowcount is 0
+        while result is None or result.rowcount > 0:
+            with session_scope(session=session_maker()) as session:
+                result = session.connection().execute(
+                    text(
+                        "UPDATE events SET "
+                        "time_fired_ts= "
+                        "(case when time_fired is NULL then 0 else EXTRACT(EPOCH FROM time_fired) end) "
+                        "WHERE event_id IN ( "
+                        "SELECT event_id FROM events where time_fired_ts is NULL LIMIT 250000 "
+                        " );"
+                    )
+                )
+        result = None
+        while result is None or result.rowcount > 0:
+            with session_scope(session=session_maker()) as session:
+                result = session.connection().execute(
+                    text(
+                        "UPDATE states set last_updated_ts="
+                        "(case when last_updated is NULL then 0 else EXTRACT(EPOCH FROM last_updated) end), "
+                        "last_changed_ts=EXTRACT(EPOCH FROM last_changed) "
+                        "where state_id IN ( "
+                        "SELECT state_id FROM states where last_updated_ts is NULL LIMIT 250000 "
+                        " );"
+                    )
+                )
 
 
 def _initialize_database(session: Session) -> bool:
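
Two dialect details in the hunk above are easy to miss. MariaDB/MySQL accept
LIMIT directly on UPDATE; PostgreSQL does not, so its chunks are bounded with
a LIMITed primary-key subquery. And because each loop only exits when
rowcount reaches zero, the IF()/CASE WHEN guards coerce NULL source columns
to 0 so a manually modified or corrupt row cannot match the
`where ... is NULL` filter forever. Condensed from the statements above
(constant names are illustrative):

# Condensed comparison of the two chunking idioms used in the diff.

# MySQL/MariaDB: LIMIT is valid directly on UPDATE, and IF() coerces a
# NULL source column to 0 so the row stops matching the WHERE filter.
MYSQL_STATES_CHUNK = (
    "UPDATE states set last_updated_ts="
    "IF(last_updated is NULL,0,UNIX_TIMESTAMP(last_updated)), "
    "last_changed_ts=UNIX_TIMESTAMP(last_changed) "
    "where last_updated_ts is NULL LIMIT 250000;"
)

# PostgreSQL: UPDATE has no LIMIT clause, so the chunk is bounded by a
# LIMITed primary-key subquery; CASE WHEN plays the role of IF().
POSTGRES_STATES_CHUNK = (
    "UPDATE states set last_updated_ts="
    "(case when last_updated is NULL then 0 "
    "else EXTRACT(EPOCH FROM last_updated) end), "
    "last_changed_ts=EXTRACT(EPOCH FROM last_changed) "
    "where state_id IN "
    "(SELECT state_id FROM states where last_updated_ts is NULL LIMIT 250000);"
)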