Minor refactoring of periodic statistics (#56492)
parent
d8d34fdd3b
commit
92253f5192
|
@ -1,6 +1,7 @@
|
|||
"""Models for SQLAlchemy."""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterable
|
||||
from datetime import datetime, timedelta
|
||||
import json
|
||||
import logging
|
||||
|
@ -223,7 +224,23 @@ class States(Base): # type: ignore
|
|||
return None
|
||||
|
||||
|
||||
class StatisticData(TypedDict, total=False):
|
||||
class StatisticResult(TypedDict):
|
||||
"""Statistic result data class.
|
||||
|
||||
Allows multiple datapoints for the same statistic_id.
|
||||
"""
|
||||
|
||||
meta: StatisticMetaData
|
||||
stat: Iterable[StatisticData]
|
||||
|
||||
|
||||
class StatisticDataBase(TypedDict):
|
||||
"""Mandatory fields for statistic data class."""
|
||||
|
||||
start: datetime
|
||||
|
||||
|
||||
class StatisticData(StatisticDataBase, total=False):
|
||||
"""Statistic data class."""
|
||||
|
||||
mean: float
|
||||
|
@ -260,11 +277,10 @@ class StatisticsBase:
|
|||
sum_increase = Column(DOUBLE_TYPE)
|
||||
|
||||
@classmethod
|
||||
def from_stats(cls, metadata_id: str, start: datetime, stats: StatisticData):
|
||||
def from_stats(cls, metadata_id: str, stats: StatisticData):
|
||||
"""Create object from a statistics."""
|
||||
return cls( # type: ignore
|
||||
metadata_id=metadata_id,
|
||||
start=start,
|
||||
**stats,
|
||||
)
|
||||
|
||||
|
@ -293,7 +309,7 @@ class StatisticsShortTerm(Base, StatisticsBase): # type: ignore
|
|||
__tablename__ = TABLE_STATISTICS_SHORT_TERM
|
||||
|
||||
|
||||
class StatisticMetaData(TypedDict, total=False):
|
||||
class StatisticMetaData(TypedDict):
|
||||
"""Statistic meta data class."""
|
||||
|
||||
statistic_id: str
|
||||
|
|
|
@ -30,7 +30,9 @@ import homeassistant.util.volume as volume_util
|
|||
|
||||
from .const import DOMAIN
|
||||
from .models import (
|
||||
StatisticData,
|
||||
StatisticMetaData,
|
||||
StatisticResult,
|
||||
Statistics,
|
||||
StatisticsMeta,
|
||||
StatisticsRuns,
|
||||
|
@ -201,10 +203,10 @@ def _get_metadata_ids(
|
|||
def _update_or_add_metadata(
|
||||
hass: HomeAssistant,
|
||||
session: scoped_session,
|
||||
statistic_id: str,
|
||||
new_metadata: StatisticMetaData,
|
||||
) -> str:
|
||||
"""Get metadata_id for a statistic_id, add if it doesn't exist."""
|
||||
statistic_id = new_metadata["statistic_id"]
|
||||
old_metadata_dict = _get_metadata(hass, session, [statistic_id], None)
|
||||
if not old_metadata_dict:
|
||||
unit = new_metadata["unit_of_measurement"]
|
||||
|
@ -252,7 +254,7 @@ def compile_hourly_statistics(
|
|||
start_time = start.replace(minute=0)
|
||||
end_time = start_time + timedelta(hours=1)
|
||||
# Get last hour's average, min, max
|
||||
summary = {}
|
||||
summary: dict[str, StatisticData] = {}
|
||||
baked_query = instance.hass.data[STATISTICS_SHORT_TERM_BAKERY](
|
||||
lambda session: session.query(*QUERY_STATISTICS_SUMMARY_MEAN)
|
||||
)
|
||||
|
@ -272,7 +274,7 @@ def compile_hourly_statistics(
|
|||
for stat in stats:
|
||||
metadata_id, _mean, _min, _max = stat
|
||||
summary[metadata_id] = {
|
||||
"metadata_id": metadata_id,
|
||||
"start": start_time,
|
||||
"mean": _mean,
|
||||
"min": _min,
|
||||
"max": _max,
|
||||
|
@ -295,19 +297,26 @@ def compile_hourly_statistics(
|
|||
if stats:
|
||||
for stat in stats:
|
||||
metadata_id, start, last_reset, state, _sum, sum_increase, _ = stat
|
||||
summary[metadata_id] = {
|
||||
**summary.get(metadata_id, {}),
|
||||
**{
|
||||
"metadata_id": metadata_id,
|
||||
if metadata_id in summary:
|
||||
summary[metadata_id].update(
|
||||
{
|
||||
"last_reset": process_timestamp(last_reset),
|
||||
"state": state,
|
||||
"sum": _sum,
|
||||
"sum_increase": sum_increase,
|
||||
}
|
||||
)
|
||||
else:
|
||||
summary[metadata_id] = {
|
||||
"start": start_time,
|
||||
"last_reset": process_timestamp(last_reset),
|
||||
"state": state,
|
||||
"sum": _sum,
|
||||
"sum_increase": sum_increase,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
for stat in summary.values():
|
||||
session.add(Statistics.from_stats(stat.pop("metadata_id"), start_time, stat))
|
||||
for metadata_id, stat in summary.items():
|
||||
session.add(Statistics.from_stats(metadata_id, stat))
|
||||
|
||||
|
||||
@retryable_database_job("statistics")
|
||||
|
@ -322,30 +331,27 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool:
|
|||
return True
|
||||
|
||||
_LOGGER.debug("Compiling statistics for %s-%s", start, end)
|
||||
platform_stats = []
|
||||
platform_stats: list[StatisticResult] = []
|
||||
for domain, platform in instance.hass.data[DOMAIN].items():
|
||||
if not hasattr(platform, "compile_statistics"):
|
||||
continue
|
||||
platform_stats.append(platform.compile_statistics(instance.hass, start, end))
|
||||
platform_stat = platform.compile_statistics(instance.hass, start, end)
|
||||
_LOGGER.debug(
|
||||
"Statistics for %s during %s-%s: %s", domain, start, end, platform_stats[-1]
|
||||
"Statistics for %s during %s-%s: %s", domain, start, end, platform_stat
|
||||
)
|
||||
platform_stats.extend(platform_stat)
|
||||
|
||||
with session_scope(session=instance.get_session()) as session: # type: ignore
|
||||
for stats in platform_stats:
|
||||
for entity_id, stat in stats.items():
|
||||
metadata_id = _update_or_add_metadata(
|
||||
instance.hass, session, entity_id, stat["meta"]
|
||||
)
|
||||
metadata_id = _update_or_add_metadata(instance.hass, session, stats["meta"])
|
||||
for stat in stats["stat"]:
|
||||
try:
|
||||
session.add(
|
||||
StatisticsShortTerm.from_stats(metadata_id, start, stat["stat"])
|
||||
)
|
||||
session.add(StatisticsShortTerm.from_stats(metadata_id, stat))
|
||||
except SQLAlchemyError:
|
||||
_LOGGER.exception(
|
||||
"Unexpected exception when inserting statistics %s:%s ",
|
||||
metadata_id,
|
||||
stat,
|
||||
stats,
|
||||
)
|
||||
|
||||
if start.minute == 55:
|
||||
|
@ -431,7 +437,7 @@ def _configured_unit(unit: str, units: UnitSystem) -> str:
|
|||
|
||||
def list_statistic_ids(
|
||||
hass: HomeAssistant, statistic_type: str | None = None
|
||||
) -> list[StatisticMetaData | None]:
|
||||
) -> list[dict | None]:
|
||||
"""Return statistic_ids and meta data."""
|
||||
units = hass.config.units
|
||||
statistic_ids = {}
|
||||
|
|
|
@ -9,6 +9,11 @@ import math
|
|||
from typing import Callable
|
||||
|
||||
from homeassistant.components.recorder import history, statistics
|
||||
from homeassistant.components.recorder.models import (
|
||||
StatisticData,
|
||||
StatisticMetaData,
|
||||
StatisticResult,
|
||||
)
|
||||
from homeassistant.components.sensor import (
|
||||
ATTR_STATE_CLASS,
|
||||
DEVICE_CLASS_ENERGY,
|
||||
|
@ -309,12 +314,12 @@ def _wanted_statistics(
|
|||
|
||||
def compile_statistics( # noqa: C901
|
||||
hass: HomeAssistant, start: datetime.datetime, end: datetime.datetime
|
||||
) -> dict:
|
||||
) -> list[StatisticResult]:
|
||||
"""Compile statistics for all entities during start-end.
|
||||
|
||||
Note: This will query the database and must not be run in the event loop
|
||||
"""
|
||||
result: dict = {}
|
||||
result: list[StatisticResult] = []
|
||||
|
||||
entities = _get_entities(hass)
|
||||
|
||||
|
@ -375,21 +380,20 @@ def compile_statistics( # noqa: C901
|
|||
)
|
||||
continue
|
||||
|
||||
result[entity_id] = {}
|
||||
|
||||
# Set meta data
|
||||
result[entity_id]["meta"] = {
|
||||
meta: StatisticMetaData = {
|
||||
"statistic_id": entity_id,
|
||||
"unit_of_measurement": unit,
|
||||
"has_mean": "mean" in wanted_statistics[entity_id],
|
||||
"has_sum": "sum" in wanted_statistics[entity_id],
|
||||
}
|
||||
|
||||
# Make calculations
|
||||
stat: dict = {}
|
||||
stat: StatisticData = {"start": start}
|
||||
if "max" in wanted_statistics[entity_id]:
|
||||
stat["max"] = max(*itertools.islice(zip(*fstates), 1))
|
||||
stat["max"] = max(*itertools.islice(zip(*fstates), 1)) # type: ignore[typeddict-item]
|
||||
if "min" in wanted_statistics[entity_id]:
|
||||
stat["min"] = min(*itertools.islice(zip(*fstates), 1))
|
||||
stat["min"] = min(*itertools.islice(zip(*fstates), 1)) # type: ignore[typeddict-item]
|
||||
|
||||
if "mean" in wanted_statistics[entity_id]:
|
||||
stat["mean"] = _time_weighted_average(fstates, start, end)
|
||||
|
@ -480,12 +484,10 @@ def compile_statistics( # noqa: C901
|
|||
# Deprecated, will be removed in Home Assistant 2021.11
|
||||
if last_reset is None and state_class == STATE_CLASS_MEASUREMENT:
|
||||
# No valid updates
|
||||
result.pop(entity_id)
|
||||
continue
|
||||
|
||||
if new_state is None or old_state is None:
|
||||
# No valid updates
|
||||
result.pop(entity_id)
|
||||
continue
|
||||
|
||||
# Update the sum with the last state
|
||||
|
@ -497,7 +499,7 @@ def compile_statistics( # noqa: C901
|
|||
stat["sum_increase"] = sum_increase
|
||||
stat["state"] = new_state
|
||||
|
||||
result[entity_id]["stat"] = stat
|
||||
result.append({"meta": meta, "stat": (stat,)})
|
||||
|
||||
return result
|
||||
|
||||
|
|
|
@ -38,6 +38,14 @@ async def setup_integration(hass):
|
|||
await hass.async_block_till_done()
|
||||
|
||||
|
||||
def get_statistics_for_entity(statistics_results, entity_id):
|
||||
"""Get statistics for a certain entity, or None if there is none."""
|
||||
for statistics_result in statistics_results:
|
||||
if statistics_result["meta"]["statistic_id"] == entity_id:
|
||||
return statistics_result
|
||||
return None
|
||||
|
||||
|
||||
async def test_cost_sensor_no_states(hass, hass_storage) -> None:
|
||||
"""Test sensors are created."""
|
||||
energy_data = data.EnergyManager.default_preferences()
|
||||
|
@ -222,9 +230,9 @@ async def test_cost_sensor_price_entity_total_increasing(
|
|||
|
||||
# Check generated statistics
|
||||
await async_wait_recording_done_without_instance(hass)
|
||||
statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
|
||||
assert cost_sensor_entity_id in statistics
|
||||
assert statistics[cost_sensor_entity_id]["stat"]["sum"] == 19.0
|
||||
all_statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
|
||||
statistics = get_statistics_for_entity(all_statistics, cost_sensor_entity_id)
|
||||
assert statistics["stat"][0]["sum"] == 19.0
|
||||
|
||||
# Energy sensor has a small dip, no reset should be detected
|
||||
hass.states.async_set(
|
||||
|
@ -262,9 +270,9 @@ async def test_cost_sensor_price_entity_total_increasing(
|
|||
|
||||
# Check generated statistics
|
||||
await async_wait_recording_done_without_instance(hass)
|
||||
statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
|
||||
assert cost_sensor_entity_id in statistics
|
||||
assert statistics[cost_sensor_entity_id]["stat"]["sum"] == 38.0
|
||||
all_statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
|
||||
statistics = get_statistics_for_entity(all_statistics, cost_sensor_entity_id)
|
||||
assert statistics["stat"][0]["sum"] == 38.0
|
||||
|
||||
|
||||
@pytest.mark.parametrize("initial_energy,initial_cost", [(0, "0.0"), (None, "unknown")])
|
||||
|
@ -427,9 +435,9 @@ async def test_cost_sensor_price_entity_total(
|
|||
|
||||
# Check generated statistics
|
||||
await async_wait_recording_done_without_instance(hass)
|
||||
statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
|
||||
assert cost_sensor_entity_id in statistics
|
||||
assert statistics[cost_sensor_entity_id]["stat"]["sum"] == 19.0
|
||||
all_statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
|
||||
statistics = get_statistics_for_entity(all_statistics, cost_sensor_entity_id)
|
||||
assert statistics["stat"][0]["sum"] == 19.0
|
||||
|
||||
# Energy sensor has a small dip
|
||||
hass.states.async_set(
|
||||
|
@ -468,9 +476,9 @@ async def test_cost_sensor_price_entity_total(
|
|||
|
||||
# Check generated statistics
|
||||
await async_wait_recording_done_without_instance(hass)
|
||||
statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
|
||||
assert cost_sensor_entity_id in statistics
|
||||
assert statistics[cost_sensor_entity_id]["stat"]["sum"] == 38.0
|
||||
all_statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
|
||||
statistics = get_statistics_for_entity(all_statistics, cost_sensor_entity_id)
|
||||
assert statistics["stat"][0]["sum"] == 38.0
|
||||
|
||||
|
||||
@pytest.mark.parametrize("initial_energy,initial_cost", [(0, "0.0"), (None, "unknown")])
|
||||
|
@ -632,9 +640,9 @@ async def test_cost_sensor_price_entity_total_no_reset(
|
|||
|
||||
# Check generated statistics
|
||||
await async_wait_recording_done_without_instance(hass)
|
||||
statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
|
||||
assert cost_sensor_entity_id in statistics
|
||||
assert statistics[cost_sensor_entity_id]["stat"]["sum"] == 19.0
|
||||
all_statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
|
||||
statistics = get_statistics_for_entity(all_statistics, cost_sensor_entity_id)
|
||||
assert statistics["stat"][0]["sum"] == 19.0
|
||||
|
||||
# Energy sensor has a small dip
|
||||
hass.states.async_set(
|
||||
|
@ -649,9 +657,9 @@ async def test_cost_sensor_price_entity_total_no_reset(
|
|||
|
||||
# Check generated statistics
|
||||
await async_wait_recording_done_without_instance(hass)
|
||||
statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
|
||||
assert cost_sensor_entity_id in statistics
|
||||
assert statistics[cost_sensor_entity_id]["stat"]["sum"] == 18.0
|
||||
all_statistics = await hass.loop.run_in_executor(None, _compile_statistics, hass)
|
||||
statistics = get_statistics_for_entity(all_statistics, cost_sensor_entity_id)
|
||||
assert statistics["stat"][0]["sum"] == 18.0
|
||||
|
||||
|
||||
async def test_cost_sensor_handle_wh(hass, hass_storage) -> None:
|
||||
|
|
|
@ -111,21 +111,29 @@ def test_compile_hourly_statistics(hass_recorder):
|
|||
@pytest.fixture
|
||||
def mock_sensor_statistics():
|
||||
"""Generate some fake statistics."""
|
||||
sensor_stats = {
|
||||
"meta": {"unit_of_measurement": "dogs", "has_mean": True, "has_sum": False},
|
||||
"stat": {},
|
||||
}
|
||||
|
||||
def get_fake_stats():
|
||||
def sensor_stats(entity_id, start):
|
||||
"""Generate fake statistics."""
|
||||
return {
|
||||
"sensor.test1": sensor_stats,
|
||||
"sensor.test2": sensor_stats,
|
||||
"sensor.test3": sensor_stats,
|
||||
"meta": {
|
||||
"statistic_id": entity_id,
|
||||
"unit_of_measurement": "dogs",
|
||||
"has_mean": True,
|
||||
"has_sum": False,
|
||||
},
|
||||
"stat": ({"start": start},),
|
||||
}
|
||||
|
||||
def get_fake_stats(_hass, start, _end):
|
||||
return [
|
||||
sensor_stats("sensor.test1", start),
|
||||
sensor_stats("sensor.test2", start),
|
||||
sensor_stats("sensor.test3", start),
|
||||
]
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.sensor.recorder.compile_statistics",
|
||||
return_value=get_fake_stats(),
|
||||
side_effect=get_fake_stats,
|
||||
):
|
||||
yield
|
||||
|
||||
|
@ -136,12 +144,12 @@ def mock_from_stats():
|
|||
counter = 0
|
||||
real_from_stats = StatisticsShortTerm.from_stats
|
||||
|
||||
def from_stats(metadata_id, start, stats):
|
||||
def from_stats(metadata_id, stats):
|
||||
nonlocal counter
|
||||
if counter == 0 and metadata_id == 2:
|
||||
counter += 1
|
||||
return None
|
||||
return real_from_stats(metadata_id, start, stats)
|
||||
return real_from_stats(metadata_id, stats)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.recorder.statistics.StatisticsShortTerm.from_stats",
|
||||
|
@ -156,9 +164,6 @@ def test_compile_periodic_statistics_exception(
|
|||
):
|
||||
"""Test exception handling when compiling periodic statistics."""
|
||||
|
||||
def mock_from_stats():
|
||||
raise ValueError
|
||||
|
||||
hass = hass_recorder()
|
||||
recorder = hass.data[DATA_INSTANCE]
|
||||
setup_component(hass, "sensor", {})
|
||||
|
|
Loading…
Reference in New Issue