diff --git a/homeassistant/components/history.py b/homeassistant/components/history.py
index 893ff23df35..5a3002c05f2 100644
--- a/homeassistant/components/history.py
+++ b/homeassistant/components/history.py
@@ -57,6 +57,7 @@ def get_significant_states(hass, start_time, end_time=None, entity_id=None,
     as well as all states from certain domains (for instance
     thermostat so that we get current temperature in our graphs).
     """
+    timer_start = time.perf_counter()
     from homeassistant.components.recorder.models import States
 
     entity_ids = (entity_id.lower(), ) if entity_id is not None else None
@@ -73,12 +74,18 @@ def get_significant_states(hass, start_time, end_time=None, entity_id=None,
         if end_time is not None:
             query = query.filter(States.last_updated < end_time)
 
+        query = query.order_by(States.last_updated)
+
         states = (
-            state for state in execute(
-                query.order_by(States.entity_id, States.last_updated))
+            state for state in execute(query)
             if (_is_significant(state) and
                 not state.attributes.get(ATTR_HIDDEN, False)))
 
+    if _LOGGER.isEnabledFor(logging.DEBUG):
+        elapsed = time.perf_counter() - timer_start
+        _LOGGER.debug(
+            'get_significant_states took %fs', elapsed)
+
     return states_to_json(hass, states, start_time, entity_id, filters)
 
 
@@ -90,7 +97,7 @@ def state_changes_during_period(hass, start_time, end_time=None,
     with session_scope(hass=hass) as session:
         query = session.query(States).filter(
             (States.last_changed == States.last_updated) &
-            (States.last_changed > start_time))
+            (States.last_updated > start_time))
 
         if end_time is not None:
             query = query.filter(States.last_updated < end_time)
@@ -99,7 +106,7 @@ def state_changes_during_period(hass, start_time, end_time=None,
             query = query.filter_by(entity_id=entity_id.lower())
 
         states = execute(
-            query.order_by(States.entity_id, States.last_updated))
+            query.order_by(States.last_updated))
 
     return states_to_json(hass, states, start_time, entity_id)
 
@@ -125,14 +132,10 @@ def get_states(hass, utc_point_in_time, entity_ids=None, run=None,
             most_recent_state_ids = session.query(
                 States.state_id.label('max_state_id')
             ).filter(
-                (States.created < utc_point_in_time) &
+                (States.last_updated < utc_point_in_time) &
                 (States.entity_id.in_(entity_ids))
             ).order_by(
-                States.created.desc())
-
-            if filters:
-                most_recent_state_ids = filters.apply(most_recent_state_ids,
-                                                      entity_ids)
+                States.last_updated.desc())
 
             most_recent_state_ids = most_recent_state_ids.limit(1)
 
@@ -140,24 +143,43 @@ def get_states(hass, utc_point_in_time, entity_ids=None, run=None,
             # We have more than one entity to look at (most commonly we want
             # all entities), so we need to do a search on all states since the
            # last recorder run started.
+
+            most_recent_states_by_date = session.query(
+                States.entity_id.label('max_entity_id'),
+                func.max(States.last_updated).label('max_last_updated')
+            ).filter(
+                (States.last_updated >= run.start) &
+                (States.last_updated < utc_point_in_time)
+            )
+
+            if entity_ids:
+                most_recent_states_by_date = most_recent_states_by_date\
+                    .filter(States.entity_id.in_(entity_ids))
+
+            most_recent_states_by_date = most_recent_states_by_date.group_by(
+                States.entity_id)
+
+            most_recent_states_by_date = most_recent_states_by_date.subquery()
+
             most_recent_state_ids = session.query(
                 func.max(States.state_id).label('max_state_id')
-            ).filter(
-                (States.created >= run.start) &
-                (States.created < utc_point_in_time) &
-                (~States.domain.in_(IGNORE_DOMAINS)))
-
-            if filters:
-                most_recent_state_ids = filters.apply(most_recent_state_ids,
-                                                      entity_ids)
+            ).join(most_recent_states_by_date, and_(
+                States.entity_id == most_recent_states_by_date.c.max_entity_id,
+                States.last_updated == most_recent_states_by_date.c.
+                max_last_updated))
 
             most_recent_state_ids = most_recent_state_ids.group_by(
                 States.entity_id)
 
             most_recent_state_ids = most_recent_state_ids.subquery()
 
-        query = session.query(States).join(most_recent_state_ids, and_(
-            States.state_id == most_recent_state_ids.c.max_state_id))
+        query = session.query(States).join(
+            most_recent_state_ids,
+            States.state_id == most_recent_state_ids.c.max_state_id
+        ).filter((~States.domain.in_(IGNORE_DOMAINS)))
+
+        if filters:
+            query = filters.apply(query, entity_ids)
 
         return [state for state in execute(query)
                 if not state.attributes.get(ATTR_HIDDEN, False)]
@@ -178,11 +200,17 @@ def states_to_json(hass, states, start_time, entity_id, filters=None):
     entity_ids = [entity_id] if entity_id is not None else None
 
     # Get the states at the start time
+    timer_start = time.perf_counter()
     for state in get_states(hass, start_time, entity_ids, filters=filters):
         state.last_changed = start_time
         state.last_updated = start_time
         result[state.entity_id].append(state)
 
+    if _LOGGER.isEnabledFor(logging.DEBUG):
+        elapsed = time.perf_counter() - timer_start
+        _LOGGER.debug(
+            'getting %d first datapoints took %fs', len(result), elapsed)
+
     # Append all changes to it
     for ent_id, group in groupby(states, lambda state: state.entity_id):
         result[ent_id].extend(group)
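Note on the get_states() hunk above: the rewritten multi-entity branch is a classic "greatest-n-per-group" query. An inner subquery finds each entity's newest last_updated, and a join resolves those (entity_id, last_updated) pairs back to full state rows, which the new ix_states_entity_id_last_updated index serves directly. Below is a minimal, self-contained sketch of the same pattern, not part of the patch; it is written against the SQLAlchemy 1.x API the codebase uses, with a stripped-down stand-in States model and invented data.

from datetime import datetime

from sqlalchemy import (
    Column, DateTime, Integer, String, and_, create_engine, func)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class States(Base):
    """Stripped-down stand-in for the recorder States model."""

    __tablename__ = 'states'
    state_id = Column(Integer, primary_key=True)
    entity_id = Column(String(255))
    state = Column(String(255))
    last_updated = Column(DateTime)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([
    States(entity_id='light.porch', state='on',
           last_updated=datetime(2017, 7, 1, 10, 0)),
    States(entity_id='light.porch', state='off',
           last_updated=datetime(2017, 7, 1, 11, 0)),
    States(entity_id='sensor.temp', state='21',
           last_updated=datetime(2017, 7, 1, 9, 0)),
])
session.commit()

utc_point_in_time = datetime(2017, 7, 1, 12, 0)

# Step 1: per entity, the newest last_updated before the cutoff.
most_recent_by_date = session.query(
    States.entity_id.label('max_entity_id'),
    func.max(States.last_updated).label('max_last_updated')
).filter(
    States.last_updated < utc_point_in_time
).group_by(States.entity_id).subquery()

# Step 2: join back to turn each (entity_id, last_updated) pair into a
# full state row.
query = session.query(States).join(most_recent_by_date, and_(
    States.entity_id == most_recent_by_date.c.max_entity_id,
    States.last_updated == most_recent_by_date.c.max_last_updated))

for row in query:
    print(row.entity_id, row.state)  # light.porch off / sensor.temp 21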
diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py
index 043887b7cab..5a68fe43fe0 100644
--- a/homeassistant/components/recorder/migration.py
+++ b/homeassistant/components/recorder/migration.py
@@ -30,7 +30,7 @@ def migrate_schema(instance):
             new_version = version + 1
             _LOGGER.info("Upgrading recorder db schema to version %s",
                          new_version)
-            _apply_update(instance.engine, new_version)
+            _apply_update(instance.engine, new_version, current_version)
             session.add(SchemaChanges(schema_version=new_version))
 
             _LOGGER.info("Upgrade to version %s done", new_version)
@@ -50,11 +50,71 @@ def _create_index(engine, table_name, index_name):
     # Look up the index object by name from the table in the models
     index = next(idx for idx in table.indexes if idx.name == index_name)
     _LOGGER.debug("Creating %s index", index_name)
+    _LOGGER.info("Adding index `%s` to database. Note: this can take several "
+                 "minutes on large databases and slow computers. Please "
+                 "be patient!", index_name)
     index.create(engine)
     _LOGGER.debug("Finished creating %s", index_name)
 
 
-def _apply_update(engine, new_version):
+def _drop_index(engine, table_name, index_name):
+    """Drop an index from a specified table.
+
+    There is no universal way to do something like `DROP INDEX IF EXISTS`,
+    so we will simply execute the DROP command and ignore any exceptions.
+
+    WARNING: Due to some engines (MySQL at least) being unable to use bind
+    parameters in a DROP INDEX statement (at least via SQLAlchemy), the query
+    string here is generated from the method parameters without sanitizing.
+    DO NOT USE THIS FUNCTION IN ANY OPERATION THAT TAKES USER INPUT.
+    """
+    from sqlalchemy import text
+    from sqlalchemy.exc import SQLAlchemyError
+
+    _LOGGER.debug("Dropping index %s from table %s", index_name, table_name)
+    success = False
+
+    # Engines like DB2/Oracle
+    try:
+        engine.execute(text("DROP INDEX {index}".format(
+            index=index_name)))
+    except SQLAlchemyError:
+        pass
+    else:
+        success = True
+
+    # Engines like SQLite, SQL Server
+    if not success:
+        try:
+            engine.execute(text("DROP INDEX {table}.{index}".format(
+                index=index_name,
+                table=table_name)))
+        except SQLAlchemyError:
+            pass
+        else:
+            success = True
+
+    if not success:
+        # Engines like MySQL, MS Access
+        try:
+            engine.execute(text("DROP INDEX {index} ON {table}".format(
+                index=index_name,
+                table=table_name)))
+        except SQLAlchemyError:
+            pass
+        else:
+            success = True
+
+    if success:
+        _LOGGER.debug("Finished dropping index %s from table %s",
+                      index_name, table_name)
+    else:
+        _LOGGER.warning("Failed to drop index %s from table %s. Schema "
+                        "migration will continue; this is not a "
+                        "critical operation.", index_name, table_name)
+
+
+def _apply_update(engine, new_version, old_version):
     """Perform operations to bring schema up to date."""
     if new_version == 1:
         _create_index(engine, "events", "ix_events_time_fired")
@@ -63,9 +123,26 @@ def _apply_update(engine, new_version, old_version):
         _create_index(engine, "recorder_runs", "ix_recorder_runs_start_end")
         # Create indexes for states
         _create_index(engine, "states", "ix_states_last_updated")
-        _create_index(engine, "states", "ix_states_entity_id_created")
     elif new_version == 3:
-        _create_index(engine, "states", "ix_states_created_domain")
+        # There used to be a new index here, but it was removed in version 4.
+        pass
+    elif new_version == 4:
+        # Queries were rewritten in this schema release. Most indexes from
+        # earlier versions of the schema are no longer needed.
+
+        if old_version == 3:
+            # Remove index that was added in version 3
+            _drop_index(engine, "states", "ix_states_created_domain")
+        if old_version == 2:
+            # Remove index that was added in version 2
+            _drop_index(engine, "states", "ix_states_entity_id_created")
+
+        # Remove indexes that were added in version 0
+        _drop_index(engine, "states", "states__state_changes")
+        _drop_index(engine, "states", "states__significant_changes")
+        _drop_index(engine, "states", "ix_states_entity_id_created")
+
+        _create_index(engine, "states", "ix_states_entity_id_last_updated")
     else:
         raise ValueError("No schema migration defined for version {}"
                          .format(new_version))
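A quick way to watch the new _drop_index() fallback chain in action is a manual smoke test against an in-memory SQLite engine, where the plain DROP INDEX <name> form already succeeds, so the later fallbacks never fire. A hypothetical sketch, not part of the patch; it assumes the patched migration module is importable in isolation and uses the same SQLAlchemy 1.x implicit execution the helper itself relies on.

from sqlalchemy import create_engine

from homeassistant.components.recorder import migration

engine = create_engine('sqlite://')

# Build a throwaway table plus the index we are about to drop.
engine.execute('CREATE TABLE states (entity_id TEXT, created TEXT)')
engine.execute('CREATE INDEX ix_states_entity_id_created '
               'ON states (entity_id, created)')

# Should log "Finished dropping index ..." at DEBUG level.
migration._drop_index(engine, 'states', 'ix_states_entity_id_created')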
diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py
index a87eabc44e6..7c29c8045ea 100644
--- a/homeassistant/components/recorder/models.py
+++ b/homeassistant/components/recorder/models.py
@@ -16,7 +16,7 @@ from homeassistant.remote import JSONEncoder
 # pylint: disable=invalid-name
 Base = declarative_base()
 
-SCHEMA_VERSION = 3
+SCHEMA_VERSION = 4
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -70,14 +70,11 @@ class States(Base):  # type: ignore
                           index=True)
     created = Column(DateTime(timezone=True), default=datetime.utcnow)
 
-    __table_args__ = (Index('states__state_changes',
-                            'last_changed', 'last_updated', 'entity_id'),
-                      Index('states__significant_changes',
-                            'domain', 'last_updated', 'entity_id'),
-                      Index('ix_states_entity_id_created',
-                            'entity_id', 'created'),
-                      Index('ix_states_created_domain',
-                            'created', 'domain'),)
+    __table_args__ = (
+        # Used for fetching the state of entities at a specific time
+        # (get_states in history.py)
+        Index(
+            'ix_states_entity_id_last_updated', 'entity_id', 'last_updated'),)
 
     @staticmethod
     def from_event(event):
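After migrating to schema version 4, the new composite index can be sanity-checked by asking SQLite for its query plan on the point-in-time lookup. A hypothetical sketch, not part of the patch; the database path and entity id are invented, and it uses SQLAlchemy 1.x-style engine execution.

from sqlalchemy import create_engine

# Point this at a migrated database; the path here is just an example.
engine = create_engine('sqlite:///home-assistant_v2.db')

plan = engine.execute(
    "EXPLAIN QUERY PLAN "
    "SELECT * FROM states "
    "WHERE entity_id = 'sensor.temp' "
    "AND last_updated < '2017-07-01 00:00:00' "
    "ORDER BY last_updated DESC LIMIT 1").fetchall()

for row in plan:
    # Expect something like:
    # SEARCH TABLE states USING INDEX ix_states_entity_id_last_updated
    print(row)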
diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py
index 2b675e72759..26ddefedf7d 100644
--- a/homeassistant/components/recorder/purge.py
+++ b/homeassistant/components/recorder/purge.py
@@ -16,12 +16,12 @@ def purge_old_data(instance, purge_days):
 
     with session_scope(session=instance.get_session()) as session:
         deleted_rows = session.query(States) \
-            .filter((States.created < purge_before)) \
+            .filter((States.last_updated < purge_before)) \
             .delete(synchronize_session=False)
         _LOGGER.debug("Deleted %s states", deleted_rows)
 
         deleted_rows = session.query(Events) \
-            .filter((Events.created < purge_before)) \
+            .filter((Events.time_fired < purge_before)) \
             .delete(synchronize_session=False)
         _LOGGER.debug("Deleted %s events", deleted_rows)
diff --git a/homeassistant/components/recorder/util.py b/homeassistant/components/recorder/util.py
index 63faf2633b1..c6390e5d8e2 100644
--- a/homeassistant/components/recorder/util.py
+++ b/homeassistant/components/recorder/util.py
@@ -58,10 +58,19 @@ def execute(qry):
 
     for tryno in range(0, RETRIES):
         try:
-            return [
+            timer_start = time.perf_counter()
+            result = [
                 row for row in
                 (row.to_native() for row in qry)
                 if row is not None]
+
+            if _LOGGER.isEnabledFor(logging.DEBUG):
+                elapsed = time.perf_counter() - timer_start
+                _LOGGER.debug('converting %d rows to native objects took %fs',
+                              len(result),
+                              elapsed)
+
+            return result
         except SQLAlchemyError as err:
             _LOGGER.error("Error executing query: %s", err)
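The same three-line timing idiom (perf_counter plus an isEnabledFor guard so the formatting cost is only paid at DEBUG level) now appears in history.py and here in util.py. If it spreads further it could be factored into a context manager; a sketch under that assumption, with debug_timer being an invented name, not part of the patch.

import logging
import time
from contextlib import contextmanager

_LOGGER = logging.getLogger(__name__)


@contextmanager
def debug_timer(label):
    """Log wall-clock time for a block, but only at DEBUG level."""
    timer_start = time.perf_counter()
    yield
    if _LOGGER.isEnabledFor(logging.DEBUG):
        elapsed = time.perf_counter() - timer_start
        _LOGGER.debug('%s took %fs', label, elapsed)


# Usage:
# with debug_timer('converting rows to native objects'):
#     result = [row.to_native() for row in qry]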
diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py
index 4990cbc00eb..7c558f2803d 100644
--- a/tests/components/recorder/test_migrate.py
+++ b/tests/components/recorder/test_migrate.py
@@ -37,7 +37,7 @@ def test_schema_update_calls(hass):
     yield from wait_connection_ready(hass)
 
     update.assert_has_calls([
-        call(hass.data[DATA_INSTANCE].engine, version+1) for version
+        call(hass.data[DATA_INSTANCE].engine, version+1, 0) for version
         in range(0, SCHEMA_VERSION)])
 
 
@@ -64,4 +64,4 @@ def test_schema_migrate(hass):
 def test_invalid_update():
     """Test that an invalid new version raises an exception."""
     with pytest.raises(ValueError):
-        migration._apply_update(None, -1)
+        migration._apply_update(None, -1, 0)
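The updated tests only pin down the new old_version argument; the version-4 branch itself is exercised indirectly. A hypothetical extra test, not part of the patch, could lock in the index drop for a database coming from version 2:

from unittest.mock import call, patch

from homeassistant.components.recorder import migration


def test_apply_update_v4_drops_v2_index():
    """Upgrading 2 -> 4 must drop the old ix_states_entity_id_created."""
    with patch.object(migration, '_drop_index') as drop_index, \
            patch.object(migration, '_create_index'):
        migration._apply_update(None, 4, 2)

    assert call(None, 'states', 'ix_states_entity_id_created') in \
        drop_index.call_args_list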