2017-02-26 22:38:06 +00:00
""" The tests for the Recorder component. """
2021-01-01 21:31:56 +00:00
# pylint: disable=protected-access
2021-04-12 06:43:54 +00:00
import datetime
2021-11-05 03:21:38 +00:00
import importlib
2021-04-12 06:43:54 +00:00
import sqlite3
2021-11-05 03:21:38 +00:00
import sys
import threading
2021-04-22 06:29:36 +00:00
from unittest . mock import ANY , Mock , PropertyMock , call , patch
2021-01-01 21:31:56 +00:00
2017-02-26 22:38:06 +00:00
import pytest
2021-04-22 06:29:36 +00:00
from sqlalchemy import create_engine , text
2021-04-12 06:43:54 +00:00
from sqlalchemy . exc import (
DatabaseError ,
InternalError ,
OperationalError ,
ProgrammingError ,
)
2021-04-22 06:29:36 +00:00
from sqlalchemy . orm import Session
2018-08-19 15:22:09 +00:00
from sqlalchemy . pool import StaticPool
2017-02-26 22:38:06 +00:00
from homeassistant . bootstrap import async_setup_component
2021-10-07 10:58:00 +00:00
from homeassistant . components import persistent_notification as pn , recorder
2021-04-12 06:43:54 +00:00
from homeassistant . components . recorder import RecorderRuns , migration , models
from homeassistant . components . recorder . const import DATA_INSTANCE
from homeassistant . components . recorder . models import States
from homeassistant . components . recorder . util import session_scope
2021-03-12 22:17:27 +00:00
import homeassistant . util . dt as dt_util
2019-12-08 17:48:18 +00:00
2021-11-04 15:46:45 +00:00
from . common import async_wait_recording_done_without_instance , create_engine_test
2021-04-12 06:43:54 +00:00
2021-10-07 10:58:00 +00:00
from tests . common import async_fire_time_changed
2017-02-26 22:38:06 +00:00
2021-04-12 06:43:54 +00:00
def _get_native_states ( hass , entity_id ) :
with session_scope ( hass = hass ) as session :
return [
state . to_native ( )
for state in session . query ( States ) . filter ( States . entity_id == entity_id )
]
2018-11-28 12:16:43 +00:00
async def test_schema_update_calls ( hass ) :
2018-01-27 19:58:27 +00:00
""" Test that schema migrations occur in correct order. """
2021-04-13 00:18:38 +00:00
assert await recorder . async_migration_in_progress ( hass ) is False
2021-10-07 10:58:00 +00:00
2019-10-18 17:14:54 +00:00
with patch (
" homeassistant.components.recorder.create_engine " , new = create_engine_test
2020-04-16 22:11:36 +00:00
) , patch (
" homeassistant.components.recorder.migration._apply_update " ,
wraps = migration . _apply_update ,
) as update :
2019-07-31 19:25:30 +00:00
await async_setup_component (
hass , " recorder " , { " recorder " : { " db_url " : " sqlite:// " } }
)
2021-04-12 06:43:54 +00:00
await async_wait_recording_done_without_instance ( hass )
2017-02-26 22:38:06 +00:00
2021-04-13 00:18:38 +00:00
assert await recorder . async_migration_in_progress ( hass ) is False
2019-07-31 19:25:30 +00:00
update . assert_has_calls (
[
2021-09-24 07:19:22 +00:00
call ( hass . data [ DATA_INSTANCE ] , ANY , version + 1 , 0 )
2019-07-31 19:25:30 +00:00
for version in range ( 0 , models . SCHEMA_VERSION )
]
)
2017-02-26 22:38:06 +00:00
2021-04-13 00:18:38 +00:00
async def test_migration_in_progress ( hass ) :
""" Test that we can check for migration in progress. """
assert await recorder . async_migration_in_progress ( hass ) is False
with patch (
" homeassistant.components.recorder.create_engine " , new = create_engine_test
) :
await async_setup_component (
hass , " recorder " , { " recorder " : { " db_url " : " sqlite:// " } }
)
await hass . data [ DATA_INSTANCE ] . async_migration_event . wait ( )
assert await recorder . async_migration_in_progress ( hass ) is True
await async_wait_recording_done_without_instance ( hass )
assert await recorder . async_migration_in_progress ( hass ) is False
2021-04-12 06:43:54 +00:00
async def test_database_migration_failed ( hass ) :
""" Test we notify if the migration fails. """
2021-04-13 00:18:38 +00:00
assert await recorder . async_migration_in_progress ( hass ) is False
2021-04-12 06:43:54 +00:00
with patch (
" homeassistant.components.recorder.create_engine " , new = create_engine_test
) , patch (
" homeassistant.components.recorder.migration._apply_update " ,
side_effect = ValueError ,
2021-10-07 10:58:00 +00:00
) , patch (
" homeassistant.components.persistent_notification.create " , side_effect = pn . create
) as mock_create , patch (
" homeassistant.components.persistent_notification.dismiss " ,
side_effect = pn . dismiss ,
) as mock_dismiss :
2021-04-12 06:43:54 +00:00
await async_setup_component (
hass , " recorder " , { " recorder " : { " db_url " : " sqlite:// " } }
)
hass . states . async_set ( " my.entity " , " on " , { } )
hass . states . async_set ( " my.entity " , " off " , { } )
await hass . async_block_till_done ( )
await hass . async_add_executor_job ( hass . data [ DATA_INSTANCE ] . join )
await hass . async_block_till_done ( )
2021-04-13 00:18:38 +00:00
assert await recorder . async_migration_in_progress ( hass ) is False
2021-10-07 10:58:00 +00:00
assert len ( mock_create . mock_calls ) == 2
assert len ( mock_dismiss . mock_calls ) == 1
2021-04-12 06:43:54 +00:00
async def test_database_migration_encounters_corruption ( hass ) :
""" Test we move away the database if its corrupt. """
2021-10-07 10:58:00 +00:00
2021-04-13 00:18:38 +00:00
assert await recorder . async_migration_in_progress ( hass ) is False
2021-04-12 06:43:54 +00:00
sqlite3_exception = DatabaseError ( " statement " , { } , [ ] )
sqlite3_exception . __cause__ = sqlite3 . DatabaseError ( )
with patch (
" homeassistant.components.recorder.migration.schema_is_current " ,
side_effect = [ False , True ] ,
) , patch (
" homeassistant.components.recorder.migration.migrate_schema " ,
side_effect = sqlite3_exception ,
) , patch (
" homeassistant.components.recorder.move_away_broken_database "
) as move_away :
await async_setup_component (
hass , " recorder " , { " recorder " : { " db_url " : " sqlite:// " } }
)
hass . states . async_set ( " my.entity " , " on " , { } )
hass . states . async_set ( " my.entity " , " off " , { } )
await async_wait_recording_done_without_instance ( hass )
2021-04-13 00:18:38 +00:00
assert await recorder . async_migration_in_progress ( hass ) is False
2021-04-12 06:43:54 +00:00
assert move_away . called
async def test_database_migration_encounters_corruption_not_sqlite ( hass ) :
""" Test we fail on database error when we cannot recover. """
2021-04-13 00:18:38 +00:00
assert await recorder . async_migration_in_progress ( hass ) is False
2021-04-12 06:43:54 +00:00
with patch (
" homeassistant.components.recorder.migration.schema_is_current " ,
side_effect = [ False , True ] ,
) , patch (
" homeassistant.components.recorder.migration.migrate_schema " ,
side_effect = DatabaseError ( " statement " , { } , [ ] ) ,
) , patch (
" homeassistant.components.recorder.move_away_broken_database "
2021-10-07 10:58:00 +00:00
) as move_away , patch (
" homeassistant.components.persistent_notification.create " , side_effect = pn . create
) as mock_create , patch (
" homeassistant.components.persistent_notification.dismiss " ,
side_effect = pn . dismiss ,
) as mock_dismiss :
2021-04-12 06:43:54 +00:00
await async_setup_component (
hass , " recorder " , { " recorder " : { " db_url " : " sqlite:// " } }
)
hass . states . async_set ( " my.entity " , " on " , { } )
hass . states . async_set ( " my.entity " , " off " , { } )
await hass . async_block_till_done ( )
await hass . async_add_executor_job ( hass . data [ DATA_INSTANCE ] . join )
await hass . async_block_till_done ( )
2021-04-13 00:18:38 +00:00
assert await recorder . async_migration_in_progress ( hass ) is False
2021-04-12 06:43:54 +00:00
assert not move_away . called
2021-10-07 10:58:00 +00:00
assert len ( mock_create . mock_calls ) == 2
assert len ( mock_dismiss . mock_calls ) == 1
2021-04-12 06:43:54 +00:00
async def test_events_during_migration_are_queued ( hass ) :
""" Test that events during migration are queued. """
2021-04-13 00:18:38 +00:00
assert await recorder . async_migration_in_progress ( hass ) is False
2021-10-07 10:58:00 +00:00
2021-04-12 06:43:54 +00:00
with patch (
" homeassistant.components.recorder.create_engine " , new = create_engine_test
) :
await async_setup_component (
hass , " recorder " , { " recorder " : { " db_url " : " sqlite:// " } }
)
hass . states . async_set ( " my.entity " , " on " , { } )
hass . states . async_set ( " my.entity " , " off " , { } )
await hass . async_block_till_done ( )
async_fire_time_changed ( hass , dt_util . utcnow ( ) + datetime . timedelta ( hours = 2 ) )
await hass . async_block_till_done ( )
async_fire_time_changed ( hass , dt_util . utcnow ( ) + datetime . timedelta ( hours = 4 ) )
await hass . data [ DATA_INSTANCE ] . async_recorder_ready . wait ( )
await async_wait_recording_done_without_instance ( hass )
2021-04-13 00:18:38 +00:00
assert await recorder . async_migration_in_progress ( hass ) is False
2021-04-12 06:43:54 +00:00
db_states = await hass . async_add_executor_job ( _get_native_states , hass , " my.entity " )
assert len ( db_states ) == 2
async def test_events_during_migration_queue_exhausted ( hass ) :
""" Test that events during migration takes so long the queue is exhausted. """
2021-10-07 10:58:00 +00:00
2021-04-13 00:18:38 +00:00
assert await recorder . async_migration_in_progress ( hass ) is False
2021-04-12 06:43:54 +00:00
with patch (
" homeassistant.components.recorder.create_engine " , new = create_engine_test
) , patch . object ( recorder , " MAX_QUEUE_BACKLOG " , 1 ) :
await async_setup_component (
hass , " recorder " , { " recorder " : { " db_url " : " sqlite:// " } }
)
hass . states . async_set ( " my.entity " , " on " , { } )
await hass . async_block_till_done ( )
async_fire_time_changed ( hass , dt_util . utcnow ( ) + datetime . timedelta ( hours = 2 ) )
await hass . async_block_till_done ( )
async_fire_time_changed ( hass , dt_util . utcnow ( ) + datetime . timedelta ( hours = 4 ) )
await hass . async_block_till_done ( )
hass . states . async_set ( " my.entity " , " off " , { } )
await hass . data [ DATA_INSTANCE ] . async_recorder_ready . wait ( )
await async_wait_recording_done_without_instance ( hass )
2021-04-13 00:18:38 +00:00
assert await recorder . async_migration_in_progress ( hass ) is False
2021-04-12 06:43:54 +00:00
db_states = await hass . async_add_executor_job ( _get_native_states , hass , " my.entity " )
assert len ( db_states ) == 1
hass . states . async_set ( " my.entity " , " on " , { } )
await async_wait_recording_done_without_instance ( hass )
db_states = await hass . async_add_executor_job ( _get_native_states , hass , " my.entity " )
assert len ( db_states ) == 2
2021-11-05 03:21:38 +00:00
@pytest.mark.parametrize ( " start_version " , [ 0 , 16 , 18 , 22 ] )
async def test_schema_migrate ( hass , start_version ) :
2017-02-26 22:38:06 +00:00
""" Test the full schema migration logic.
We ' re just testing that the logic can execute successfully here without
throwing exceptions . Maintaining a set of assertions based on schema
inspection could quickly become quite cumbersome .
"""
2021-03-12 22:17:27 +00:00
2021-11-05 03:21:38 +00:00
migration_done = threading . Event ( )
migration_stall = threading . Event ( )
migration_version = None
real_migration = recorder . migration . migrate_schema
def _create_engine_test ( * args , * * kwargs ) :
""" Test version of create_engine that initializes with old schema.
This simulates an existing db with the old schema .
"""
module = f " tests.components.recorder.models_schema_ { str ( start_version ) } "
importlib . import_module ( module )
old_models = sys . modules [ module ]
engine = create_engine ( * args , * * kwargs )
old_models . Base . metadata . create_all ( engine )
if start_version > 0 :
with Session ( engine ) as session :
session . add ( recorder . models . SchemaChanges ( schema_version = start_version ) )
session . commit ( )
return engine
2021-03-12 22:17:27 +00:00
def _mock_setup_run ( self ) :
self . run_info = RecorderRuns (
start = self . recording_start , created = dt_util . utcnow ( )
)
2021-11-05 03:21:38 +00:00
def _instrument_migration ( * args ) :
""" Control migration progress and check results. """
nonlocal migration_done
nonlocal migration_version
nonlocal migration_stall
migration_stall . wait ( )
try :
real_migration ( * args )
except Exception :
migration_done . set ( )
raise
# Check and report the outcome of the migration; if migration fails
# the recorder will silently create a new database.
with session_scope ( hass = hass ) as session :
res = (
session . query ( models . SchemaChanges )
. order_by ( models . SchemaChanges . change_id . desc ( ) )
. first ( )
)
migration_version = res . schema_version
migration_done . set ( )
with patch (
" homeassistant.components.recorder.create_engine " , new = _create_engine_test
) , patch (
2021-03-12 22:17:27 +00:00
" homeassistant.components.recorder.Recorder._setup_run " ,
side_effect = _mock_setup_run ,
autospec = True ,
2021-11-05 03:21:38 +00:00
) as setup_run , patch (
" homeassistant.components.recorder.migration.migrate_schema " ,
wraps = _instrument_migration ,
) :
2019-07-31 19:25:30 +00:00
await async_setup_component (
hass , " recorder " , { " recorder " : { " db_url " : " sqlite:// " } }
)
2021-11-05 03:21:38 +00:00
assert await recorder . async_migration_in_progress ( hass ) is True
migration_stall . set ( )
2018-11-28 12:16:43 +00:00
await hass . async_block_till_done ( )
2021-11-05 03:21:38 +00:00
migration_done . wait ( )
await async_wait_recording_done_without_instance ( hass )
assert migration_version == models . SCHEMA_VERSION
2017-02-26 22:38:06 +00:00
assert setup_run . called
2021-11-05 03:21:38 +00:00
assert await recorder . async_migration_in_progress ( hass ) is not True
2017-02-26 22:38:06 +00:00
def test_invalid_update ( ) :
""" Test that an invalid new version raises an exception. """
with pytest . raises ( ValueError ) :
2021-04-22 06:29:36 +00:00
migration . _apply_update ( Mock ( ) , Mock ( ) , - 1 , 0 )
2018-08-19 15:22:09 +00:00
2021-04-08 17:08:49 +00:00
@pytest.mark.parametrize (
[ " engine_type " , " substr " ] ,
[
( " postgresql " , " ALTER event_type TYPE VARCHAR(64) " ) ,
( " mssql " , " ALTER COLUMN event_type VARCHAR(64) " ) ,
( " mysql " , " MODIFY event_type VARCHAR(64) " ) ,
( " sqlite " , None ) ,
] ,
)
def test_modify_column ( engine_type , substr ) :
""" Test that modify column generates the expected query. """
2021-04-22 06:29:36 +00:00
connection = Mock ( )
2021-04-08 17:08:49 +00:00
engine = Mock ( )
engine . dialect . name = engine_type
2021-04-22 06:29:36 +00:00
migration . _modify_columns ( connection , engine , " events " , [ " event_type VARCHAR(64) " ] )
2021-04-08 17:08:49 +00:00
if substr :
2021-04-22 06:29:36 +00:00
assert substr in connection . execute . call_args [ 0 ] [ 0 ] . text
2021-04-08 17:08:49 +00:00
else :
2021-04-22 06:29:36 +00:00
assert not connection . execute . called
2021-04-08 17:08:49 +00:00
2018-08-19 15:22:09 +00:00
def test_forgiving_add_column ( ) :
""" Test that add column will continue if column exists. """
2019-07-31 19:25:30 +00:00
engine = create_engine ( " sqlite:// " , poolclass = StaticPool )
2021-04-22 06:29:36 +00:00
with Session ( engine ) as session :
session . execute ( text ( " CREATE TABLE hello (id int) " ) )
migration . _add_columns ( session , " hello " , [ " context_id CHARACTER(36) " ] )
migration . _add_columns ( session , " hello " , [ " context_id CHARACTER(36) " ] )
2018-08-21 09:41:52 +00:00
def test_forgiving_add_index ( ) :
""" Test that add index will continue if index exists. """
2019-07-31 19:25:30 +00:00
engine = create_engine ( " sqlite:// " , poolclass = StaticPool )
2018-08-21 09:41:52 +00:00
models . Base . metadata . create_all ( engine )
2021-04-22 06:29:36 +00:00
with Session ( engine ) as session :
migration . _create_index ( session , " states " , " ix_states_context_id " )
2021-02-08 22:22:38 +00:00
@pytest.mark.parametrize (
" exception_type " , [ OperationalError , ProgrammingError , InternalError ]
)
def test_forgiving_add_index_with_other_db_types ( caplog , exception_type ) :
""" Test that add index will continue if index exists on mysql and postgres. """
mocked_index = Mock ( )
type ( mocked_index ) . name = " ix_states_context_id "
mocked_index . create = Mock (
side_effect = exception_type (
" CREATE INDEX ix_states_old_state_id ON states (old_state_id); " ,
[ ] ,
' relation " ix_states_old_state_id " already exists ' ,
)
)
mocked_table = Mock ( )
type ( mocked_table ) . indexes = PropertyMock ( return_value = [ mocked_index ] )
with patch (
" homeassistant.components.recorder.migration.Table " , return_value = mocked_table
) :
migration . _create_index ( Mock ( ) , " states " , " ix_states_context_id " )
assert " already exists on states " in caplog . text
assert " continuing " in caplog . text
2021-05-03 03:57:42 +00:00
class MockPyODBCProgrammingError ( Exception ) :
""" A mock pyodbc error. """
def test_raise_if_exception_missing_str ( ) :
""" Test we raise an exception if strings are not present. """
programming_exc = ProgrammingError ( " select * from; " , Mock ( ) , Mock ( ) )
programming_exc . __cause__ = MockPyODBCProgrammingError (
" [42S11] [FreeTDS][SQL Server]The operation failed because an index or statistics with name ' ix_states_old_state_id ' already exists on table ' states ' . (1913) (SQLExecDirectW) "
)
migration . raise_if_exception_missing_str (
programming_exc , [ " already exists " , " duplicate " ]
)
with pytest . raises ( ProgrammingError ) :
migration . raise_if_exception_missing_str ( programming_exc , [ " not present " ] )
def test_raise_if_exception_missing_empty_cause_str ( ) :
""" Test we raise an exception if strings are not present with an empty cause. """
programming_exc = ProgrammingError ( " select * from; " , Mock ( ) , Mock ( ) )
programming_exc . __cause__ = MockPyODBCProgrammingError ( )
with pytest . raises ( ProgrammingError ) :
migration . raise_if_exception_missing_str (
programming_exc , [ " already exists " , " duplicate " ]
)
with pytest . raises ( ProgrammingError ) :
migration . raise_if_exception_missing_str ( programming_exc , [ " not present " ] )