Code styling tweaks to the recorder integration (#86030)
parent 5b2c1d701a
commit c5dedb7a79
@@ -39,7 +39,11 @@ class DBInterruptibleThreadPoolExecutor(InterruptibleThreadPoolExecutor):
 
         # When the executor gets lost, the weakref callback will wake up
         # the worker threads.
-        def weakref_cb(_: Any, q=self._work_queue) -> None:  # type: ignore[no-untyped-def] # pylint: disable=invalid-name
+        # pylint: disable=invalid-name
+        def weakref_cb(  # type: ignore[no-untyped-def]
+            _: Any,
+            q=self._work_queue,
+        ) -> None:
             q.put(None)
 
         num_threads = len(self._threads)
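The weakref callback above follows the same pattern CPython's ThreadPoolExecutor uses to wake idle worker threads once the executor is garbage collected. A minimal, self-contained sketch of that pattern; Owner and main are illustrative stand-ins, not recorder code.

import queue
import weakref


class Owner:
    """Stand-in for the executor object that owns the work queue."""


def main() -> None:
    work_queue: queue.SimpleQueue = queue.SimpleQueue()
    owner = Owner()

    # The callback receives the (now dead) weak reference; putting None on the
    # queue is the conventional "wake up and shut down" signal for the workers.
    def weakref_cb(_ref: weakref.ref, q: queue.SimpleQueue = work_queue) -> None:
        q.put(None)

    ref = weakref.ref(owner, weakref_cb)  # keep the weakref alive or the callback never fires
    del owner  # last strong reference gone, so the callback runs (immediately on CPython)
    print(work_queue.get())  # None
    assert ref() is None


if __name__ == "__main__":
    main()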
@@ -93,10 +93,13 @@ class Filters:
         """Return human readable excludes/includes."""
         return (
             "<Filters"
-            f" excluded_entities={self.excluded_entities} excluded_domains={self.excluded_domains} "
-            f"excluded_entity_globs={self.excluded_entity_globs} "
-            f"included_entities={self.included_entities} included_domains={self.included_domains} "
-            f"included_entity_globs={self.included_entity_globs}>"
+            f" excluded_entities={self.excluded_entities}"
+            f" excluded_domains={self.excluded_domains}"
+            f" excluded_entity_globs={self.excluded_entity_globs}"
+            f" included_entities={self.included_entities}"
+            f" included_domains={self.included_domains}"
+            f" included_entity_globs={self.included_entity_globs}"
+            ">"
         )
 
     @property
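The rewritten __repr__ relies on implicit concatenation of adjacent f-string literals, so splitting the string over several source lines changes only the layout, not the resulting value. A stand-alone sketch of the same technique, with FiltersSketch as a hypothetical stand-in for the recorder's Filters class:

from dataclasses import dataclass, field


@dataclass
class FiltersSketch:
    """Hypothetical stand-in for the recorder Filters class."""

    excluded_entities: list[str] = field(default_factory=list)
    included_domains: list[str] = field(default_factory=list)

    def __repr__(self) -> str:
        # Adjacent string literals are joined by the parser, so this builds one
        # single-line string even though the source spans several lines.
        return (
            "<FiltersSketch"
            f" excluded_entities={self.excluded_entities}"
            f" included_domains={self.included_domains}"
            ">"
        )


print(FiltersSketch(excluded_entities=["light.porch"], included_domains=["sensor"]))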
@@ -398,7 +398,11 @@ def get_full_significant_states_with_session(
     significant_changes_only: bool = True,
     no_attributes: bool = False,
 ) -> MutableMapping[str, list[State]]:
-    """Variant of get_significant_states_with_session that does not return minimal responses."""
+    """Variant of get_significant_states_with_session.
+
+    Difference with get_significant_states_with_session is that it does not
+    return minimal responses.
+    """
     return cast(
         MutableMapping[str, list[State]],
         get_significant_states_with_session(
@@ -55,7 +55,7 @@ _LOGGER = logging.getLogger(__name__)
 
 
 def raise_if_exception_missing_str(ex: Exception, match_substrs: Iterable[str]) -> None:
-    """Raise an exception if the exception and cause do not contain the match substrs."""
+    """Raise if the exception and cause do not contain the match substrs."""
     lower_ex_strs = [str(ex).lower(), str(ex.__cause__).lower()]
     for str_sub in match_substrs:
         for exc_str in lower_ex_strs:
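Only the head of raise_if_exception_missing_str is visible in this hunk. A plausible, hedged completion of the idea is sketched below: swallow the exception when any substring matches the exception text or its __cause__, and re-raise otherwise. The sketch is an assumption for illustration, not the function's verbatim body.

from collections.abc import Iterable


def raise_if_exception_missing_str_sketch(
    ex: Exception, match_substrs: Iterable[str]
) -> None:
    """Assumed behaviour: return on the first substring match, else re-raise."""
    lower_ex_strs = [str(ex).lower(), str(ex.__cause__).lower()]
    for str_sub in match_substrs:
        for exc_str in lower_ex_strs:
            if exc_str and str_sub in exc_str:
                return
    raise ex


try:
    raise_if_exception_missing_str_sketch(
        ValueError("database disk image is malformed"), ["malformed"]
    )
    print("matched: exception swallowed")
except ValueError:
    print("no match: exception re-raised")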
@@ -665,7 +665,8 @@ def _apply_update( # noqa: C901
         with session_scope(session=session_maker()) as session:
             connection = session.connection()
             connection.execute(
-                # Using LOCK=EXCLUSIVE to prevent the database from corrupting
+                # Using LOCK=EXCLUSIVE to prevent
+                # the database from corrupting
                 # https://github.com/home-assistant/core/issues/56104
                 text(
                     f"ALTER TABLE {table} CONVERT TO CHARACTER SET utf8mb4"
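The migration code above funnels raw DDL through SQLAlchemy's text() construct. A minimal sketch of the same call shape against a throwaway SQLite database; the LOCK=EXCLUSIVE and CHARACTER SET clauses in the diff are MySQL/MariaDB specific, so a generic ALTER TABLE stands in here.

from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///:memory:")
with engine.begin() as connection:
    # begin() yields a connection inside a transaction, roughly what
    # session.connection() provides in the migration code.
    connection.execute(text("CREATE TABLE demo (id INTEGER PRIMARY KEY)"))
    connection.execute(text("ALTER TABLE demo ADD COLUMN name TEXT"))
    print(connection.execute(text("PRAGMA table_info(demo)")).fetchall())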
@@ -806,7 +807,8 @@ def _apply_update( # noqa: C901
         with contextlib.suppress(SQLAlchemyError):
             with session_scope(session=session_maker()) as session:
                 connection = session.connection()
-                # This is safe to run multiple times and fast since the table is small
+                # This is safe to run multiple times and fast
+                # since the table is small.
                 connection.execute(
                     text("ALTER TABLE statistics_meta ROW_FORMAT=DYNAMIC")
                 )
@@ -935,7 +937,7 @@ def _migrate_columns_to_timestamp(
 
 
 def _initialize_database(session: Session) -> bool:
-    """Initialize a new database, or a database created before introducing schema changes.
+    """Initialize a new database.
 
     The function determines the schema version by inspecting the db structure.
 
@@ -962,7 +964,7 @@ def _initialize_database(session: Session) -> bool:
 
 
 def initialize_database(session_maker: Callable[[], Session]) -> bool:
-    """Initialize a new database, or a database created before introducing schema changes."""
+    """Initialize a new database."""
     try:
         with session_scope(session=session_maker()) as session:
             if _get_schema_version(session) is not None:
@@ -269,21 +269,22 @@ def _select_event_data_ids_to_purge(
 def _select_unused_attributes_ids(
     session: Session, attributes_ids: set[int], using_sqlite: bool
 ) -> set[int]:
-    """Return a set of attributes ids that are not used by any states in the database."""
+    """Return a set of attributes ids that are not used by any states in the db."""
     if not attributes_ids:
         return set()
 
     if using_sqlite:
         #
-        # SQLite has a superior query optimizer for the distinct query below as it uses the
-        # covering index without having to examine the rows directly for both of the queries
-        # below.
+        # SQLite has a superior query optimizer for the distinct query below as it uses
+        # the covering index without having to examine the rows directly for both of the
+        # queries below.
         #
         # We use the distinct query for SQLite since the query in the other branch can
         # generate more than 500 unions which SQLite does not support.
         #
         # How MariaDB's query optimizer handles this query:
-        # > explain select distinct attributes_id from states where attributes_id in (136723);
+        # > explain select distinct attributes_id from states where attributes_id in
+        # (136723);
         # ...Using index
         #
         seen_ids = {
@@ -294,15 +295,16 @@ def _select_unused_attributes_ids(
         }
     else:
         #
-        # This branch is for DBMS that cannot optimize the distinct query well and has to examine
-        # all the rows that match.
+        # This branch is for DBMS that cannot optimize the distinct query well and has
+        # to examine all the rows that match.
         #
-        # This branch uses a union of simple queries, as each query is optimized away as the answer
-        # to the query can be found in the index.
+        # This branch uses a union of simple queries, as each query is optimized away
+        # as the answer to the query can be found in the index.
         #
-        # The below query works for SQLite as long as there are no more than 500 attributes_id
-        # to be selected. We currently do not have MySQL or PostgreSQL servers running in the
-        # test suite; we test this path using SQLite when there are less than 500 attributes_id.
+        # The below query works for SQLite as long as there are no more than 500
+        # attributes_id to be selected. We currently do not have MySQL or PostgreSQL
+        # servers running in the test suite; we test this path using SQLite when there
+        # are less than 500 attributes_id.
         #
         # How MariaDB's query optimizer handles this query:
         # > explain select min(attributes_id) from states where attributes_id = 136723;
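Taken together, the two comment blocks describe two shapes of the same question: one DISTINCT scan for SQLite and a union of per-id MIN() lookups for other engines. A self-contained sketch of both forms against a made-up two-column states table; the schema and data are illustrative only, not the recorder's models.

from sqlalchemy import Column, Integer, create_engine, distinct, func, select, union_all
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class States(Base):
    """Hypothetical, minimal stand-in for the recorder's states table."""

    __tablename__ = "states"
    state_id = Column(Integer, primary_key=True)
    attributes_id = Column(Integer, index=True)


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

attributes_ids = {1, 2, 3}
with Session(engine) as session:
    session.add_all([States(attributes_id=1), States(attributes_id=3)])
    session.commit()

    # SQLite branch: a single DISTINCT query answered from the covering index.
    seen_ids = {
        row[0]
        for row in session.execute(
            select(distinct(States.attributes_id)).filter(
                States.attributes_id.in_(attributes_ids)
            )
        ).all()
        if row[0] is not None
    }

    # Other-DBMS branch: a union of per-id MIN() lookups, each answered from the index.
    union_stmt = union_all(
        *(
            select(func.min(States.attributes_id)).where(States.attributes_id == aid)
            for aid in attributes_ids
        )
    )
    seen_union = {row[0] for row in session.execute(union_stmt).all() if row[0] is not None}

    print(attributes_ids - seen_ids)    # {2}: unused according to the DISTINCT query
    print(attributes_ids - seen_union)  # {2}: same answer from the union of MIN()s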
@@ -349,7 +351,7 @@ def _purge_unused_attributes_ids(
 def _select_unused_event_data_ids(
     session: Session, data_ids: set[int], using_sqlite: bool
 ) -> set[int]:
-    """Return a set of event data ids that are not used by any events in the database."""
+    """Return a set of event data ids that are not used by any events in the db."""
     if not data_ids:
         return set()
 
@@ -391,7 +393,10 @@ def _purge_unused_data_ids(
 def _select_statistics_runs_to_purge(
     session: Session, purge_before: datetime
 ) -> list[int]:
-    """Return a list of statistic runs to purge, but take care to keep the newest run."""
+    """Return a list of statistic runs to purge.
+
+    Takes care to keep the newest run.
+    """
     statistic_runs = session.execute(find_statistics_runs_to_purge(purge_before)).all()
     statistic_runs_list = [run.run_id for run in statistic_runs]
     # Exclude the newest statistics run
@@ -418,7 +423,7 @@ def _select_short_term_statistics_to_purge(
 def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
     session: Session, purge_before: datetime
 ) -> tuple[set[int], set[int], set[int], set[int]]:
-    """Return a list of event, state, and attribute ids to purge that are linked by the event_id.
+    """Return a list of event, state, and attribute ids to purge linked by the event_id.
 
     We do not link these anymore since state_change events
     do not exist in the events table anymore, however we
@@ -676,7 +681,8 @@ def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool])
         ]
         _LOGGER.debug("Purging entity data for %s", selected_entity_ids)
         if len(selected_entity_ids) > 0:
-            # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
+            # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states
+            # or events record.
             _purge_filtered_states(instance, session, selected_entity_ids, using_sqlite)
             _LOGGER.debug("Purging entity data hasn't fully completed yet")
             return False
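The comment describes the batching contract used by the purge path: delete at most MAX_ROWS_TO_PURGE rows per pass and return False so the caller schedules another pass. A toy sketch of that contract; the constant's value below is an assumption, not the recorder's real setting.

MAX_ROWS_TO_PURGE = 1000  # assumed value, for illustration only


def purge_batch(pending_rows: list[int]) -> bool:
    """Delete at most MAX_ROWS_TO_PURGE rows; return True only when nothing is left."""
    del pending_rows[:MAX_ROWS_TO_PURGE]
    return not pending_rows


pending = list(range(2500))
passes = 0
while not purge_batch(pending):
    passes += 1
print(f"finished after {passes + 1} passes")  # 3 passes for 2500 rows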
@@ -1672,7 +1672,10 @@ def _get_last_statistics_short_term_stmt(
     metadata_id: int,
     number_of_stats: int,
 ) -> StatementLambdaElement:
-    """Generate a statement for number_of_stats short term statistics for a given statistic_id."""
+    """Generate a statement for number_of_stats short term statistics.
+
+    For a given statistic_id.
+    """
     return lambda_stmt(
         lambda: select(*QUERY_STATISTICS_SHORT_TERM)
         .filter_by(metadata_id=metadata_id)
@@ -1881,7 +1884,10 @@ def _sorted_statistics_to_dict(
         result[stat_id] = []
 
     # Identify metadata IDs for which no data was available at the requested start time
-    for meta_id, group in groupby(stats, lambda stat: stat.metadata_id):  # type: ignore[no-any-return]
+    for meta_id, group in groupby(
+        stats,
+        lambda stat: stat.metadata_id,  # type: ignore[no-any-return]
+    ):
         first_start_time = process_timestamp(next(group).start)
         if start_time and first_start_time > start_time:
             need_stat_at_start_time.add(meta_id)
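This loop (and the identical one in the next hunk) groups statistics rows with itertools.groupby keyed on metadata_id. groupby only merges adjacent items, so it assumes the rows are already ordered by that key. A small stand-alone sketch with a made-up StatRow:

from dataclasses import dataclass
from itertools import groupby


@dataclass
class StatRow:
    """Made-up stand-in for a short-term statistics row."""

    metadata_id: int
    start: str


# Rows must already be sorted/grouped by metadata_id; groupby never reorders its input.
stats = [
    StatRow(1, "2023-01-01T00:00"),
    StatRow(1, "2023-01-01T00:05"),
    StatRow(2, "2023-01-01T00:00"),
]

for meta_id, group in groupby(
    stats,
    lambda stat: stat.metadata_id,
):
    rows = list(group)
    print(meta_id, len(rows), rows[0].start)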
@@ -1897,7 +1903,10 @@ def _sorted_statistics_to_dict(
             stats_at_start_time[stat.metadata_id] = (stat,)
 
     # Append all statistic entries, and optionally do unit conversion
-    for meta_id, group in groupby(stats, lambda stat: stat.metadata_id):  # type: ignore[no-any-return]
+    for meta_id, group in groupby(
+        stats,
+        lambda stat: stat.metadata_id,  # type: ignore[no-any-return]
+    ):
         state_unit = unit = metadata[meta_id]["unit_of_measurement"]
         statistic_id = metadata[meta_id]["statistic_id"]
         if state := hass.states.get(statistic_id):
@@ -1964,7 +1973,7 @@ def _async_import_statistics(
     metadata: StatisticMetaData,
     statistics: Iterable[StatisticData],
 ) -> None:
-    """Validate timestamps and insert an import_statistics job in the recorder's queue."""
+    """Validate timestamps and insert an import_statistics job in the queue."""
     for statistic in statistics:
         start = statistic["start"]
         if start.tzinfo is None or start.tzinfo.utcoffset(start) is None:
@@ -121,7 +121,10 @@ class PurgeEntitiesTask(RecorderTask):
 
 @dataclass
 class PerodicCleanupTask(RecorderTask):
-    """An object to insert into the recorder to trigger cleanup tasks when auto purge is disabled."""
+    """An object to insert into the recorder to trigger cleanup tasks.
+
+    Trigger cleanup tasks when auto purge is disabled.
+    """
 
     def run(self, instance: Recorder) -> None:
         """Handle the task."""
@@ -195,7 +198,10 @@ class AdjustStatisticsTask(RecorderTask):
 
 @dataclass
 class WaitTask(RecorderTask):
-    """An object to insert into the recorder queue to tell it set the _queue_watch event."""
+    """An object to insert into the recorder queue.
+
+    Tell it set the _queue_watch event.
+    """
 
     commit_before = False
 
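PerodicCleanupTask and WaitTask follow the recorder's task pattern: small dataclasses placed on a queue, each exposing a run() hook that the recorder thread invokes. A stand-alone sketch of that pattern; RecorderStub and the task body are illustrative, not the real classes.

from dataclasses import dataclass
from queue import SimpleQueue


class RecorderStub:
    """Illustrative stand-in for the Recorder instance a task runs against."""

    def __init__(self) -> None:
        self.log: list[str] = []


@dataclass
class WaitTaskSketch:
    """A queued task: a tiny dataclass with a run() hook."""

    commit_before: bool = False

    def run(self, instance: RecorderStub) -> None:
        instance.log.append("_queue_watch event would be set here")


task_queue: SimpleQueue = SimpleQueue()
task_queue.put(WaitTaskSketch())

recorder = RecorderStub()
task_queue.get().run(recorder)
print(recorder.log)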
@@ -196,9 +196,11 @@ def execute_stmt_lambda_element(
     """
     executed = session.execute(stmt)
     use_all = not start_time or ((end_time or dt_util.utcnow()) - start_time).days <= 1
-    for tryno in range(0, RETRIES):
+    for tryno in range(RETRIES):
         try:
-            return executed.all() if use_all else executed.yield_per(yield_per)  # type: ignore[no-any-return]
+            if use_all:
+                return executed.all()  # type: ignore[no-any-return]
+            return executed.yield_per(yield_per)  # type: ignore[no-any-return]
         except SQLAlchemyError as err:
             _LOGGER.error("Error executing query: %s", err)
             if tryno == RETRIES - 1:
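The restructured loop is a bounded-retry pattern: attempt the query a fixed number of times, log each failure, and re-raise on the final attempt. A dependency-free sketch of the same control flow; RuntimeError stands in for SQLAlchemyError, and the retry count and wait below are assumptions.

import logging
import time

_LOGGER = logging.getLogger(__name__)
RETRIES = 3  # assumed retry count
QUERY_RETRY_WAIT = 0.1  # assumed back-off between attempts


def run_with_retries(query):
    """Execute `query` with bounded retries, re-raising on the final attempt."""
    for tryno in range(RETRIES):
        try:
            return query()
        except RuntimeError as err:  # the recorder catches SQLAlchemyError here
            _LOGGER.error("Error executing query: %s", err)
            if tryno == RETRIES - 1:
                raise
            time.sleep(QUERY_RETRY_WAIT)


print(run_with_retries(lambda: [1, 2, 3]))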
@@ -400,12 +402,11 @@ def _datetime_or_none(value: str) -> datetime | None:
 def build_mysqldb_conv() -> dict:
     """Build a MySQLDB conv dict that uses cisco8601 to parse datetimes."""
     # Late imports since we only call this if they are using mysqldb
-    from MySQLdb.constants import (  # pylint: disable=import-outside-toplevel,import-error
-        FIELD_TYPE,
-    )
-    from MySQLdb.converters import (  # pylint: disable=import-outside-toplevel,import-error
-        conversions,
-    )
+    # pylint: disable=import-outside-toplevel,import-error
+    from MySQLdb.constants import FIELD_TYPE
+
+    # pylint: disable=import-outside-toplevel,import-error
+    from MySQLdb.converters import conversions
 
     return {**conversions, FIELD_TYPE.DATETIME: _datetime_or_none}
 
@@ -444,7 +445,8 @@ def setup_connection_for_dialect(
         # or NORMAL if they do not.
         #
         # https://sqlite.org/pragma.html#pragma_synchronous
-        # The synchronous=NORMAL setting is a good choice for most applications running in WAL mode.
+        # The synchronous=NORMAL setting is a good choice for most applications
+        # running in WAL mode.
         #
         synchronous = "NORMAL" if instance.commit_interval else "FULL"
         execute_on_connection(dbapi_connection, f"PRAGMA synchronous={synchronous}")
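The pragma chosen above can be exercised directly with the standard-library sqlite3 module; commit_interval below is a stand-in for the recorder's setting.

import sqlite3

commit_interval = 1  # stand-in: non-zero means the recorder batches commits
synchronous = "NORMAL" if commit_interval else "FULL"

conn = sqlite3.connect(":memory:")
conn.execute(f"PRAGMA synchronous={synchronous}")
# PRAGMA synchronous reports 1 for NORMAL and 2 for FULL.
print(conn.execute("PRAGMA synchronous").fetchone())
conn.close()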
@@ -242,7 +242,10 @@ def _ws_get_list_statistic_ids(
     msg_id: int,
     statistic_type: Literal["mean"] | Literal["sum"] | None = None,
 ) -> str:
-    """Fetch a list of available statistic_id and convert them to json in the executor."""
+    """Fetch a list of available statistic_id and convert them to JSON.
+
+    Runs in the executor.
+    """
     return JSON_DUMP(
         messages.result_message(msg_id, list_statistic_ids(hass, None, statistic_type))
     )