Add comments to recorder statistics code (#56545)
* Add comments to recorder statistics code * Revert accidental change of list_statistic_idspull/56569/head
parent
63610eadc9
commit
f0a4a89d21
|
@ -7,7 +7,7 @@ import dataclasses
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from itertools import groupby
|
from itertools import groupby
|
||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, Any, Callable
|
from typing import TYPE_CHECKING, Any, Callable, Literal
|
||||||
|
|
||||||
from sqlalchemy import bindparam, func
|
from sqlalchemy import bindparam, func
|
||||||
from sqlalchemy.exc import SQLAlchemyError
|
from sqlalchemy.exc import SQLAlchemyError
|
||||||
|
@ -205,7 +205,13 @@ def _update_or_add_metadata(
|
||||||
session: scoped_session,
|
session: scoped_session,
|
||||||
new_metadata: StatisticMetaData,
|
new_metadata: StatisticMetaData,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Get metadata_id for a statistic_id, add if it doesn't exist."""
|
"""Get metadata_id for a statistic_id.
|
||||||
|
|
||||||
|
If the statistic_id is previously unknown, add it. If it's already known, update
|
||||||
|
metadata if needed.
|
||||||
|
|
||||||
|
Updating metadata source is not possible.
|
||||||
|
"""
|
||||||
statistic_id = new_metadata["statistic_id"]
|
statistic_id = new_metadata["statistic_id"]
|
||||||
old_metadata_dict = _get_metadata(hass, session, [statistic_id], None)
|
old_metadata_dict = _get_metadata(hass, session, [statistic_id], None)
|
||||||
if not old_metadata_dict:
|
if not old_metadata_dict:
|
||||||
|
@ -250,10 +256,16 @@ def _update_or_add_metadata(
|
||||||
def compile_hourly_statistics(
|
def compile_hourly_statistics(
|
||||||
instance: Recorder, session: scoped_session, start: datetime
|
instance: Recorder, session: scoped_session, start: datetime
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Compile hourly statistics."""
|
"""Compile hourly statistics.
|
||||||
|
|
||||||
|
This will summarize 5-minute statistics for one hour:
|
||||||
|
- average, min max is computed by a database query
|
||||||
|
- sum is taken from the last 5-minute entry during the hour
|
||||||
|
"""
|
||||||
start_time = start.replace(minute=0)
|
start_time = start.replace(minute=0)
|
||||||
end_time = start_time + timedelta(hours=1)
|
end_time = start_time + timedelta(hours=1)
|
||||||
# Get last hour's average, min, max
|
|
||||||
|
# Compute last hour's average, min, max
|
||||||
summary: dict[str, StatisticData] = {}
|
summary: dict[str, StatisticData] = {}
|
||||||
baked_query = instance.hass.data[STATISTICS_SHORT_TERM_BAKERY](
|
baked_query = instance.hass.data[STATISTICS_SHORT_TERM_BAKERY](
|
||||||
lambda session: session.query(*QUERY_STATISTICS_SUMMARY_MEAN)
|
lambda session: session.query(*QUERY_STATISTICS_SUMMARY_MEAN)
|
||||||
|
@ -280,7 +292,7 @@ def compile_hourly_statistics(
|
||||||
"max": _max,
|
"max": _max,
|
||||||
}
|
}
|
||||||
|
|
||||||
# Get last hour's sum
|
# Get last hour's last sum
|
||||||
subquery = (
|
subquery = (
|
||||||
session.query(*QUERY_STATISTICS_SUMMARY_SUM)
|
session.query(*QUERY_STATISTICS_SUMMARY_SUM)
|
||||||
.filter(StatisticsShortTerm.start >= bindparam("start_time"))
|
.filter(StatisticsShortTerm.start >= bindparam("start_time"))
|
||||||
|
@ -315,16 +327,21 @@ def compile_hourly_statistics(
|
||||||
"sum_increase": sum_increase,
|
"sum_increase": sum_increase,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Insert compiled hourly statistics in the database
|
||||||
for metadata_id, stat in summary.items():
|
for metadata_id, stat in summary.items():
|
||||||
session.add(Statistics.from_stats(metadata_id, stat))
|
session.add(Statistics.from_stats(metadata_id, stat))
|
||||||
|
|
||||||
|
|
||||||
@retryable_database_job("statistics")
|
@retryable_database_job("statistics")
|
||||||
def compile_statistics(instance: Recorder, start: datetime) -> bool:
|
def compile_statistics(instance: Recorder, start: datetime) -> bool:
|
||||||
"""Compile statistics."""
|
"""Compile 5-minute statistics for all integrations with a recorder platform.
|
||||||
|
|
||||||
|
The actual calculation is delegated to the platforms.
|
||||||
|
"""
|
||||||
start = dt_util.as_utc(start)
|
start = dt_util.as_utc(start)
|
||||||
end = start + timedelta(minutes=5)
|
end = start + timedelta(minutes=5)
|
||||||
|
|
||||||
|
# Return if we already have 5-minute statistics for the requested period
|
||||||
with session_scope(session=instance.get_session()) as session: # type: ignore
|
with session_scope(session=instance.get_session()) as session: # type: ignore
|
||||||
if session.query(StatisticsRuns).filter_by(start=start).first():
|
if session.query(StatisticsRuns).filter_by(start=start).first():
|
||||||
_LOGGER.debug("Statistics already compiled for %s-%s", start, end)
|
_LOGGER.debug("Statistics already compiled for %s-%s", start, end)
|
||||||
|
@ -332,6 +349,7 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool:
|
||||||
|
|
||||||
_LOGGER.debug("Compiling statistics for %s-%s", start, end)
|
_LOGGER.debug("Compiling statistics for %s-%s", start, end)
|
||||||
platform_stats: list[StatisticResult] = []
|
platform_stats: list[StatisticResult] = []
|
||||||
|
# Collect statistics from all platforms implementing support
|
||||||
for domain, platform in instance.hass.data[DOMAIN].items():
|
for domain, platform in instance.hass.data[DOMAIN].items():
|
||||||
if not hasattr(platform, "compile_statistics"):
|
if not hasattr(platform, "compile_statistics"):
|
||||||
continue
|
continue
|
||||||
|
@ -341,6 +359,7 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool:
|
||||||
)
|
)
|
||||||
platform_stats.extend(platform_stat)
|
platform_stats.extend(platform_stat)
|
||||||
|
|
||||||
|
# Insert collected statistics in the database
|
||||||
with session_scope(session=instance.get_session()) as session: # type: ignore
|
with session_scope(session=instance.get_session()) as session: # type: ignore
|
||||||
for stats in platform_stats:
|
for stats in platform_stats:
|
||||||
metadata_id = _update_or_add_metadata(instance.hass, session, stats["meta"])
|
metadata_id = _update_or_add_metadata(instance.hass, session, stats["meta"])
|
||||||
|
@ -367,9 +386,13 @@ def _get_metadata(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
session: scoped_session,
|
session: scoped_session,
|
||||||
statistic_ids: list[str] | None,
|
statistic_ids: list[str] | None,
|
||||||
statistic_type: str | None,
|
statistic_type: Literal["mean"] | Literal["sum"] | None,
|
||||||
) -> dict[str, StatisticMetaData]:
|
) -> dict[str, StatisticMetaData]:
|
||||||
"""Fetch meta data."""
|
"""Fetch meta data, returns a dict of StatisticMetaData indexed by statistic_id.
|
||||||
|
|
||||||
|
If statistic_ids is given, fetch metadata only for the listed statistics_ids.
|
||||||
|
If statistic_type is given, fetch metadata only for statistic_ids supporting it.
|
||||||
|
"""
|
||||||
|
|
||||||
def _meta(metas: list, wanted_metadata_id: str) -> StatisticMetaData | None:
|
def _meta(metas: list, wanted_metadata_id: str) -> StatisticMetaData | None:
|
||||||
meta: StatisticMetaData | None = None
|
meta: StatisticMetaData | None = None
|
||||||
|
@ -383,6 +406,7 @@ def _get_metadata(
|
||||||
}
|
}
|
||||||
return meta
|
return meta
|
||||||
|
|
||||||
|
# Fetch metatadata from the database
|
||||||
baked_query = hass.data[STATISTICS_META_BAKERY](
|
baked_query = hass.data[STATISTICS_META_BAKERY](
|
||||||
lambda session: session.query(*QUERY_STATISTIC_META)
|
lambda session: session.query(*QUERY_STATISTIC_META)
|
||||||
)
|
)
|
||||||
|
@ -394,13 +418,12 @@ def _get_metadata(
|
||||||
baked_query += lambda q: q.filter(StatisticsMeta.has_mean.isnot(False))
|
baked_query += lambda q: q.filter(StatisticsMeta.has_mean.isnot(False))
|
||||||
elif statistic_type == "sum":
|
elif statistic_type == "sum":
|
||||||
baked_query += lambda q: q.filter(StatisticsMeta.has_sum.isnot(False))
|
baked_query += lambda q: q.filter(StatisticsMeta.has_sum.isnot(False))
|
||||||
elif statistic_type is not None:
|
|
||||||
return {}
|
|
||||||
result = execute(baked_query(session).params(statistic_ids=statistic_ids))
|
result = execute(baked_query(session).params(statistic_ids=statistic_ids))
|
||||||
if not result:
|
if not result:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
metadata_ids = [metadata[0] for metadata in result]
|
metadata_ids = [metadata[0] for metadata in result]
|
||||||
|
# Prepare the result dict
|
||||||
metadata: dict[str, StatisticMetaData] = {}
|
metadata: dict[str, StatisticMetaData] = {}
|
||||||
for _id in metadata_ids:
|
for _id in metadata_ids:
|
||||||
meta = _meta(result, _id)
|
meta = _meta(result, _id)
|
||||||
|
@ -436,17 +459,26 @@ def _configured_unit(unit: str, units: UnitSystem) -> str:
|
||||||
|
|
||||||
|
|
||||||
def list_statistic_ids(
|
def list_statistic_ids(
|
||||||
hass: HomeAssistant, statistic_type: str | None = None
|
hass: HomeAssistant,
|
||||||
|
statistic_type: Literal["mean"] | Literal["sum"] | None = None,
|
||||||
) -> list[dict | None]:
|
) -> list[dict | None]:
|
||||||
"""Return statistic_ids and meta data."""
|
"""Return all statistic_ids and unit of measurement.
|
||||||
|
|
||||||
|
Queries the database for existing statistic_ids, as well as integrations with
|
||||||
|
a recorder platform for statistic_ids which will be added in the next statistics
|
||||||
|
period.
|
||||||
|
"""
|
||||||
units = hass.config.units
|
units = hass.config.units
|
||||||
statistic_ids = {}
|
statistic_ids = {}
|
||||||
|
|
||||||
|
# Query the database
|
||||||
with session_scope(hass=hass) as session:
|
with session_scope(hass=hass) as session:
|
||||||
metadata = _get_metadata(hass, session, None, statistic_type)
|
metadata = _get_metadata(hass, session, None, statistic_type)
|
||||||
|
|
||||||
for meta in metadata.values():
|
for meta in metadata.values():
|
||||||
unit = meta["unit_of_measurement"]
|
unit = meta["unit_of_measurement"]
|
||||||
if unit is not None:
|
if unit is not None:
|
||||||
|
# Display unit according to user settings
|
||||||
unit = _configured_unit(unit, units)
|
unit = _configured_unit(unit, units)
|
||||||
meta["unit_of_measurement"] = unit
|
meta["unit_of_measurement"] = unit
|
||||||
|
|
||||||
|
@ -455,6 +487,7 @@ def list_statistic_ids(
|
||||||
for meta in metadata.values()
|
for meta in metadata.values()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Query all integrations with a registered recorder platform
|
||||||
for platform in hass.data[DOMAIN].values():
|
for platform in hass.data[DOMAIN].values():
|
||||||
if not hasattr(platform, "list_statistic_ids"):
|
if not hasattr(platform, "list_statistic_ids"):
|
||||||
continue
|
continue
|
||||||
|
@ -462,11 +495,13 @@ def list_statistic_ids(
|
||||||
|
|
||||||
for statistic_id, unit in platform_statistic_ids.items():
|
for statistic_id, unit in platform_statistic_ids.items():
|
||||||
if unit is not None:
|
if unit is not None:
|
||||||
|
# Display unit according to user settings
|
||||||
unit = _configured_unit(unit, units)
|
unit = _configured_unit(unit, units)
|
||||||
platform_statistic_ids[statistic_id] = unit
|
platform_statistic_ids[statistic_id] = unit
|
||||||
|
|
||||||
statistic_ids = {**statistic_ids, **platform_statistic_ids}
|
statistic_ids = {**statistic_ids, **platform_statistic_ids}
|
||||||
|
|
||||||
|
# Return a map of statistic_id to unit_of_measurement
|
||||||
return [
|
return [
|
||||||
{"statistic_id": _id, "unit_of_measurement": unit}
|
{"statistic_id": _id, "unit_of_measurement": unit}
|
||||||
for _id, unit in statistic_ids.items()
|
for _id, unit in statistic_ids.items()
|
||||||
|
@ -481,6 +516,10 @@ def _statistics_during_period_query(
|
||||||
base_query: Iterable,
|
base_query: Iterable,
|
||||||
table: type[Statistics | StatisticsShortTerm],
|
table: type[Statistics | StatisticsShortTerm],
|
||||||
) -> Callable:
|
) -> Callable:
|
||||||
|
"""Prepare a database query for statistics during a given period.
|
||||||
|
|
||||||
|
This prepares a baked query, so we don't insert the parameters yet.
|
||||||
|
"""
|
||||||
baked_query = hass.data[bakery](lambda session: session.query(*base_query))
|
baked_query = hass.data[bakery](lambda session: session.query(*base_query))
|
||||||
|
|
||||||
baked_query += lambda q: q.filter(table.start >= bindparam("start_time"))
|
baked_query += lambda q: q.filter(table.start >= bindparam("start_time"))
|
||||||
|
@ -502,11 +541,16 @@ def statistics_during_period(
|
||||||
start_time: datetime,
|
start_time: datetime,
|
||||||
end_time: datetime | None = None,
|
end_time: datetime | None = None,
|
||||||
statistic_ids: list[str] | None = None,
|
statistic_ids: list[str] | None = None,
|
||||||
period: str = "hour",
|
period: Literal["hour"] | Literal["5minute"] = "hour",
|
||||||
) -> dict[str, list[dict[str, str]]]:
|
) -> dict[str, list[dict[str, str]]]:
|
||||||
"""Return states changes during UTC period start_time - end_time."""
|
"""Return statistics during UTC period start_time - end_time for the statistic_ids.
|
||||||
|
|
||||||
|
If end_time is omitted, returns statistics newer than or equal to start_time.
|
||||||
|
If statistic_ids is omitted, returns statistics for all statistics ids.
|
||||||
|
"""
|
||||||
metadata = None
|
metadata = None
|
||||||
with session_scope(hass=hass) as session:
|
with session_scope(hass=hass) as session:
|
||||||
|
# Fetch metadata for the given (or all) statistic_ids
|
||||||
metadata = _get_metadata(hass, session, statistic_ids, None)
|
metadata = _get_metadata(hass, session, statistic_ids, None)
|
||||||
if not metadata:
|
if not metadata:
|
||||||
return {}
|
return {}
|
||||||
|
@ -535,6 +579,7 @@ def statistics_during_period(
|
||||||
)
|
)
|
||||||
if not stats:
|
if not stats:
|
||||||
return {}
|
return {}
|
||||||
|
# Return statistics combined with metadata
|
||||||
return _sorted_statistics_to_dict(
|
return _sorted_statistics_to_dict(
|
||||||
hass, stats, statistic_ids, metadata, True, table.duration
|
hass, stats, statistic_ids, metadata, True, table.duration
|
||||||
)
|
)
|
||||||
|
@ -543,9 +588,10 @@ def statistics_during_period(
|
||||||
def get_last_statistics(
|
def get_last_statistics(
|
||||||
hass: HomeAssistant, number_of_stats: int, statistic_id: str, convert_units: bool
|
hass: HomeAssistant, number_of_stats: int, statistic_id: str, convert_units: bool
|
||||||
) -> dict[str, list[dict]]:
|
) -> dict[str, list[dict]]:
|
||||||
"""Return the last number_of_stats statistics for a statistic_id."""
|
"""Return the last number_of_stats statistics for a given statistic_id."""
|
||||||
statistic_ids = [statistic_id]
|
statistic_ids = [statistic_id]
|
||||||
with session_scope(hass=hass) as session:
|
with session_scope(hass=hass) as session:
|
||||||
|
# Fetch metadata for the given statistic_id
|
||||||
metadata = _get_metadata(hass, session, statistic_ids, None)
|
metadata = _get_metadata(hass, session, statistic_ids, None)
|
||||||
if not metadata:
|
if not metadata:
|
||||||
return {}
|
return {}
|
||||||
|
@ -571,6 +617,7 @@ def get_last_statistics(
|
||||||
if not stats:
|
if not stats:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
# Return statistics combined with metadata
|
||||||
return _sorted_statistics_to_dict(
|
return _sorted_statistics_to_dict(
|
||||||
hass,
|
hass,
|
||||||
stats,
|
stats,
|
||||||
|
@ -602,7 +649,7 @@ def _sorted_statistics_to_dict(
|
||||||
for stat_id in statistic_ids:
|
for stat_id in statistic_ids:
|
||||||
result[stat_id] = []
|
result[stat_id] = []
|
||||||
|
|
||||||
# Append all statistic entries, and do unit conversion
|
# Append all statistic entries, and optionally do unit conversion
|
||||||
for meta_id, group in groupby(stats, lambda stat: stat.metadata_id): # type: ignore
|
for meta_id, group in groupby(stats, lambda stat: stat.metadata_id): # type: ignore
|
||||||
unit = metadata[meta_id]["unit_of_measurement"]
|
unit = metadata[meta_id]["unit_of_measurement"]
|
||||||
statistic_id = metadata[meta_id]["statistic_id"]
|
statistic_id = metadata[meta_id]["statistic_id"]
|
||||||
|
|
Loading…
Reference in New Issue