Speed up compiling hourly statistics (#88225)
parent
d46bbcf5a3
commit
5fe8829cf6
|
@ -52,6 +52,7 @@ from homeassistant.util.json import (
|
|||
from .const import ALL_DOMAIN_EXCLUDE_ATTRS, SupportedDialect
|
||||
from .models import (
|
||||
StatisticData,
|
||||
StatisticDataTimestamp,
|
||||
StatisticMetaData,
|
||||
datetime_to_timestamp_or_none,
|
||||
process_timestamp,
|
||||
|
@ -532,7 +533,7 @@ class StatisticsBase:
|
|||
|
||||
@classmethod
|
||||
def from_stats(cls, metadata_id: int, stats: StatisticData) -> Self:
|
||||
"""Create object from a statistics."""
|
||||
"""Create object from a statistics with datatime objects."""
|
||||
return cls( # type: ignore[call-arg]
|
||||
metadata_id=metadata_id,
|
||||
created=None,
|
||||
|
@ -548,6 +549,24 @@ class StatisticsBase:
|
|||
sum=stats.get("sum"),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_stats_ts(cls, metadata_id: int, stats: StatisticDataTimestamp) -> Self:
|
||||
"""Create object from a statistics with timestamps."""
|
||||
return cls( # type: ignore[call-arg]
|
||||
metadata_id=metadata_id,
|
||||
created=None,
|
||||
created_ts=time.time(),
|
||||
start=None,
|
||||
start_ts=stats["start_ts"],
|
||||
mean=stats.get("mean"),
|
||||
min=stats.get("min"),
|
||||
max=stats.get("max"),
|
||||
last_reset=None,
|
||||
last_reset_ts=stats.get("last_reset_ts"),
|
||||
state=stats.get("state"),
|
||||
sum=stats.get("sum"),
|
||||
)
|
||||
|
||||
|
||||
class Statistics(Base, StatisticsBase):
|
||||
"""Long term statistics."""
|
||||
|
|
|
@ -44,21 +44,38 @@ class StatisticResult(TypedDict):
|
|||
stat: StatisticData
|
||||
|
||||
|
||||
class StatisticDataTimestampBase(TypedDict):
|
||||
"""Mandatory fields for statistic data class with a timestamp."""
|
||||
|
||||
start_ts: float
|
||||
|
||||
|
||||
class StatisticDataBase(TypedDict):
|
||||
"""Mandatory fields for statistic data class."""
|
||||
|
||||
start: datetime
|
||||
|
||||
|
||||
class StatisticData(StatisticDataBase, total=False):
|
||||
"""Statistic data class."""
|
||||
class StatisticMixIn(TypedDict, total=False):
|
||||
"""Mandatory fields for statistic data class."""
|
||||
|
||||
mean: float
|
||||
min: float
|
||||
max: float
|
||||
last_reset: datetime | None
|
||||
state: float
|
||||
sum: float
|
||||
min: float
|
||||
max: float
|
||||
mean: float
|
||||
|
||||
|
||||
class StatisticData(StatisticDataBase, StatisticMixIn, total=False):
|
||||
"""Statistic data class."""
|
||||
|
||||
last_reset: datetime | None
|
||||
|
||||
|
||||
class StatisticDataTimestamp(StatisticDataTimestampBase, StatisticMixIn, total=False):
|
||||
"""Statistic data class with a timestamp."""
|
||||
|
||||
last_reset_ts: float | None
|
||||
|
||||
|
||||
class StatisticMetaData(TypedDict):
|
||||
|
|
|
@ -69,10 +69,10 @@ from .db_schema import (
|
|||
)
|
||||
from .models import (
|
||||
StatisticData,
|
||||
StatisticDataTimestamp,
|
||||
StatisticMetaData,
|
||||
StatisticResult,
|
||||
datetime_to_timestamp_or_none,
|
||||
timestamp_to_datetime_or_none,
|
||||
)
|
||||
from .util import (
|
||||
execute,
|
||||
|
@ -644,6 +644,32 @@ def _compile_hourly_statistics_summary_mean_stmt(
|
|||
)
|
||||
|
||||
|
||||
def _compile_hourly_statistics_last_sum_stmt_subquery(
|
||||
start_time_ts: float, end_time_ts: float
|
||||
) -> Subquery:
|
||||
"""Generate the summary mean statement for hourly statistics."""
|
||||
return (
|
||||
select(*QUERY_STATISTICS_SUMMARY_SUM)
|
||||
.filter(StatisticsShortTerm.start_ts >= start_time_ts)
|
||||
.filter(StatisticsShortTerm.start_ts < end_time_ts)
|
||||
.subquery()
|
||||
)
|
||||
|
||||
|
||||
def _compile_hourly_statistics_last_sum_stmt(
|
||||
start_time_ts: float, end_time_ts: float
|
||||
) -> StatementLambdaElement:
|
||||
"""Generate the summary mean statement for hourly statistics."""
|
||||
subquery = _compile_hourly_statistics_last_sum_stmt_subquery(
|
||||
start_time_ts, end_time_ts
|
||||
)
|
||||
return lambda_stmt(
|
||||
lambda: select(subquery)
|
||||
.filter(subquery.c.rownum == 1)
|
||||
.order_by(subquery.c.metadata_id)
|
||||
)
|
||||
|
||||
|
||||
def _compile_hourly_statistics(session: Session, start: datetime) -> None:
|
||||
"""Compile hourly statistics.
|
||||
|
||||
|
@ -657,7 +683,7 @@ def _compile_hourly_statistics(session: Session, start: datetime) -> None:
|
|||
end_time_ts = end_time.timestamp()
|
||||
|
||||
# Compute last hour's average, min, max
|
||||
summary: dict[str, StatisticData] = {}
|
||||
summary: dict[int, StatisticDataTimestamp] = {}
|
||||
stmt = _compile_hourly_statistics_summary_mean_stmt(start_time_ts, end_time_ts)
|
||||
stats = execute_stmt_lambda_element(session, stmt)
|
||||
|
||||
|
@ -665,25 +691,15 @@ def _compile_hourly_statistics(session: Session, start: datetime) -> None:
|
|||
for stat in stats:
|
||||
metadata_id, _mean, _min, _max = stat
|
||||
summary[metadata_id] = {
|
||||
"start": start_time,
|
||||
"start_ts": start_time_ts,
|
||||
"mean": _mean,
|
||||
"min": _min,
|
||||
"max": _max,
|
||||
}
|
||||
|
||||
stmt = _compile_hourly_statistics_last_sum_stmt(start_time_ts, end_time_ts)
|
||||
# Get last hour's last sum
|
||||
subquery = (
|
||||
session.query(*QUERY_STATISTICS_SUMMARY_SUM)
|
||||
.filter(StatisticsShortTerm.start_ts >= bindparam("start_time_ts"))
|
||||
.filter(StatisticsShortTerm.start_ts < bindparam("end_time_ts"))
|
||||
.subquery()
|
||||
)
|
||||
query = (
|
||||
session.query(subquery)
|
||||
.filter(subquery.c.rownum == 1)
|
||||
.order_by(subquery.c.metadata_id)
|
||||
)
|
||||
stats = execute(query.params(start_time_ts=start_time_ts, end_time_ts=end_time_ts))
|
||||
stats = execute_stmt_lambda_element(session, stmt)
|
||||
|
||||
if stats:
|
||||
for stat in stats:
|
||||
|
@ -691,22 +707,24 @@ def _compile_hourly_statistics(session: Session, start: datetime) -> None:
|
|||
if metadata_id in summary:
|
||||
summary[metadata_id].update(
|
||||
{
|
||||
"last_reset": timestamp_to_datetime_or_none(last_reset_ts),
|
||||
"last_reset_ts": last_reset_ts,
|
||||
"state": state,
|
||||
"sum": _sum,
|
||||
}
|
||||
)
|
||||
else:
|
||||
summary[metadata_id] = {
|
||||
"start": start_time,
|
||||
"last_reset": timestamp_to_datetime_or_none(last_reset_ts),
|
||||
"start_ts": start_time_ts,
|
||||
"last_reset_ts": last_reset_ts,
|
||||
"state": state,
|
||||
"sum": _sum,
|
||||
}
|
||||
|
||||
# Insert compiled hourly statistics in the database
|
||||
for metadata_id, summary_item in summary.items():
|
||||
session.add(Statistics.from_stats(metadata_id, summary_item))
|
||||
session.add_all(
|
||||
Statistics.from_stats_ts(metadata_id, summary_item)
|
||||
for metadata_id, summary_item in summary.items()
|
||||
)
|
||||
|
||||
|
||||
@retryable_database_job("statistics")
|
||||
|
|
Loading…
Reference in New Issue