Add week period to recorder statistics api (#80784)

* add week period to get statistics api

* add test
pull/80970/head
Michael 2022-10-25 20:07:28 +02:00 committed by GitHub
parent c197e1765a
commit 8175dab7ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 175 additions and 5 deletions

View File

@ -1014,6 +1014,35 @@ def _reduce_statistics_per_day(
return _reduce_statistics(stats, same_day, day_start_end, timedelta(days=1))
def same_week(time1: datetime, time2: datetime) -> bool:
"""Return True if time1 and time2 are in the same year and week."""
date1 = dt_util.as_local(time1).date()
date2 = dt_util.as_local(time2).date()
return (date1.year, date1.isocalendar().week) == (
date2.year,
date2.isocalendar().week,
)
def week_start_end(time: datetime) -> tuple[datetime, datetime]:
"""Return the start and end of the period (week) time is within."""
time_local = dt_util.as_local(time)
start_local = time_local.replace(
hour=0, minute=0, second=0, microsecond=0
) - timedelta(days=time_local.weekday())
start = dt_util.as_utc(start_local)
end = dt_util.as_utc(start_local + timedelta(days=7))
return (start, end)
def _reduce_statistics_per_week(
stats: dict[str, list[dict[str, Any]]],
) -> dict[str, list[dict[str, Any]]]:
"""Reduce hourly statistics to weekly statistics."""
return _reduce_statistics(stats, same_week, week_start_end, timedelta(days=7))
def same_month(time1: datetime, time2: datetime) -> bool:
"""Return True if time1 and time2 are in the same year and month."""
date1 = dt_util.as_local(time1).date()
@ -1089,7 +1118,7 @@ def statistics_during_period(
start_time: datetime,
end_time: datetime | None = None,
statistic_ids: list[str] | None = None,
period: Literal["5minute", "day", "hour", "month"] = "hour",
period: Literal["5minute", "day", "hour", "week", "month"] = "hour",
start_time_as_datetime: bool = False,
units: dict[str, str] | None = None,
) -> dict[str, list[dict[str, Any]]]:
@ -1122,7 +1151,7 @@ def statistics_during_period(
if not stats:
return {}
# Return statistics combined with metadata
if period not in ("day", "month"):
if period not in ("day", "week", "month"):
return _sorted_statistics_to_dict(
hass,
session,
@ -1152,6 +1181,9 @@ def statistics_during_period(
if period == "day":
return _reduce_statistics_per_day(result)
if period == "week":
return _reduce_statistics_per_week(result)
return _reduce_statistics_per_month(result)

View File

@ -62,7 +62,7 @@ def _ws_get_statistics_during_period(
start_time: dt,
end_time: dt | None,
statistic_ids: list[str] | None,
period: Literal["5minute", "day", "hour", "month"],
period: Literal["5minute", "day", "hour", "week", "month"],
units: dict[str, str],
) -> str:
"""Fetch statistics and convert them to json in the executor."""
@ -118,7 +118,7 @@ async def ws_handle_get_statistics_during_period(
vol.Required("start_time"): str,
vol.Optional("end_time"): str,
vol.Optional("statistic_ids"): [str],
vol.Required("period"): vol.Any("5minute", "hour", "day", "month"),
vol.Required("period"): vol.Any("5minute", "hour", "day", "week", "month"),
vol.Optional("units"): vol.Schema(
{
vol.Optional("distance"): vol.In(DistanceConverter.VALID_UNITS),

View File

@ -885,10 +885,148 @@ def test_import_statistics_errors(hass_recorder, caplog):
assert get_metadata(hass, statistic_ids=("sensor.total_energy_import",)) == {}
@pytest.mark.parametrize("timezone", ["America/Regina", "Europe/Vienna", "UTC"])
@pytest.mark.freeze_time("2022-10-01 00:00:00+00:00")
def test_weekly_statistics(hass_recorder, caplog, timezone):
"""Test weekly statistics."""
dt_util.set_default_time_zone(dt_util.get_time_zone(timezone))
hass = hass_recorder()
wait_recording_done(hass)
assert "Compiling statistics for" not in caplog.text
assert "Statistics already compiled" not in caplog.text
zero = dt_util.utcnow()
period1 = dt_util.as_utc(dt_util.parse_datetime("2022-10-03 00:00:00"))
period2 = dt_util.as_utc(dt_util.parse_datetime("2022-10-09 23:00:00"))
period3 = dt_util.as_utc(dt_util.parse_datetime("2022-10-10 00:00:00"))
period4 = dt_util.as_utc(dt_util.parse_datetime("2022-10-16 23:00:00"))
external_statistics = (
{
"start": period1,
"last_reset": None,
"state": 0,
"sum": 2,
},
{
"start": period2,
"last_reset": None,
"state": 1,
"sum": 3,
},
{
"start": period3,
"last_reset": None,
"state": 2,
"sum": 4,
},
{
"start": period4,
"last_reset": None,
"state": 3,
"sum": 5,
},
)
external_metadata = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "test",
"statistic_id": "test:total_energy_import",
"unit_of_measurement": "kWh",
}
async_add_external_statistics(hass, external_metadata, external_statistics)
wait_recording_done(hass)
stats = statistics_during_period(hass, zero, period="week")
week1_start = dt_util.as_utc(dt_util.parse_datetime("2022-10-03 00:00:00"))
week1_end = dt_util.as_utc(dt_util.parse_datetime("2022-10-10 00:00:00"))
week2_start = dt_util.as_utc(dt_util.parse_datetime("2022-10-10 00:00:00"))
week2_end = dt_util.as_utc(dt_util.parse_datetime("2022-10-17 00:00:00"))
assert stats == {
"test:total_energy_import": [
{
"statistic_id": "test:total_energy_import",
"start": week1_start.isoformat(),
"end": week1_end.isoformat(),
"max": None,
"mean": None,
"min": None,
"last_reset": None,
"state": 1.0,
"sum": 3.0,
},
{
"statistic_id": "test:total_energy_import",
"start": week2_start.isoformat(),
"end": week2_end.isoformat(),
"max": None,
"mean": None,
"min": None,
"last_reset": None,
"state": 3.0,
"sum": 5.0,
},
]
}
stats = statistics_during_period(
hass,
start_time=zero,
statistic_ids=["not", "the", "same", "test:total_energy_import"],
period="week",
)
assert stats == {
"test:total_energy_import": [
{
"statistic_id": "test:total_energy_import",
"start": week1_start.isoformat(),
"end": week1_end.isoformat(),
"max": None,
"mean": None,
"min": None,
"last_reset": None,
"state": 1.0,
"sum": 3.0,
},
{
"statistic_id": "test:total_energy_import",
"start": week2_start.isoformat(),
"end": week2_end.isoformat(),
"max": None,
"mean": None,
"min": None,
"last_reset": None,
"state": 3.0,
"sum": 5.0,
},
]
}
# Use 5minute to ensure table switch works
stats = statistics_during_period(
hass,
start_time=zero,
statistic_ids=["test:total_energy_import", "with_other"],
period="5minute",
)
assert stats == {}
# Ensure future date has not data
future = dt_util.as_utc(dt_util.parse_datetime("2221-11-01 00:00:00"))
stats = statistics_during_period(
hass, start_time=future, end_time=future, period="month"
)
assert stats == {}
dt_util.set_default_time_zone(dt_util.get_time_zone("UTC"))
@pytest.mark.parametrize("timezone", ["America/Regina", "Europe/Vienna", "UTC"])
@pytest.mark.freeze_time("2021-08-01 00:00:00+00:00")
def test_monthly_statistics(hass_recorder, caplog, timezone):
"""Test inserting external statistics."""
"""Test monthly statistics."""
dt_util.set_default_time_zone(dt_util.get_time_zone(timezone))
hass = hass_recorder()