Add WS API to adjust incorrect energy statistics (#65147)

Co-authored-by: Paulus Schoutsen <balloob@gmail.com>
Erik Montnemery 2022-03-22 23:18:30 +01:00 committed by GitHub
parent c5a3ba4065
commit b5c5da96ac
5 changed files with 280 additions and 41 deletions
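
This commit adds a `recorder/adjust_sum_statistics` WebSocket command that shifts the long-term `sum` of a statistic from a given point in time onward, for example to correct a bad energy meter reading. A minimal client call, sketched from the command schema added below (the statistic ID and values are illustrative; `client` is assumed to be an authenticated WebSocket connection such as the `hass_ws_client` test fixture used in the tests):

    await client.send_json(
        {
            "id": 1,
            "type": "recorder/adjust_sum_statistics",
            "statistic_id": "sensor.energy",  # hypothetical statistic
            "start_time": "2022-03-01T00:00:00+00:00",  # ISO 8601 string
            "adjustment": 100.0,  # added to `sum` from start_time onward
        }
    )
    response = await client.receive_json()
    assert response["success"]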

homeassistant/components/recorder/__init__.py

@@ -462,6 +462,31 @@ class ExternalStatisticsTask(RecorderTask):
instance.queue.put(ExternalStatisticsTask(self.metadata, self.statistics))
@dataclass
class AdjustStatisticsTask(RecorderTask):
"""An object to insert into the recorder queue to run an adjust statistics task."""
statistic_id: str
start_time: datetime
sum_adjustment: float
def run(self, instance: Recorder) -> None:
"""Run statistics task."""
if statistics.adjust_statistics(
instance,
self.statistic_id,
self.start_time,
self.sum_adjustment,
):
return
# Schedule a new adjust statistics task if this one didn't finish
instance.queue.put(
AdjustStatisticsTask(
self.statistic_id, self.start_time, self.sum_adjustment
)
)
@dataclass
class WaitTask(RecorderTask):
"""An object to insert into the recorder queue to tell it set the _queue_watch event."""
@@ -761,6 +786,11 @@ class Recorder(threading.Thread):
start = statistics.get_start_time()
self.queue.put(StatisticsTask(start))
@callback
def async_adjust_statistics(self, statistic_id, start_time, sum_adjustment):
"""Adjust statistics."""
self.queue.put(AdjustStatisticsTask(statistic_id, start_time, sum_adjustment))
@callback
def async_clear_statistics(self, statistic_ids):
"""Clear statistics for a list of statistic_ids."""

homeassistant/components/recorder/statistics.py

@@ -19,6 +19,7 @@ from sqlalchemy.exc import SQLAlchemyError, StatementError
from sqlalchemy.ext import baked
from sqlalchemy.orm.scoping import scoped_session
from sqlalchemy.sql.expression import literal_column, true
import voluptuous as vol
from homeassistant.const import (
PRESSURE_PA,
@@ -163,6 +164,14 @@ def valid_statistic_id(statistic_id: str) -> bool:
return VALID_STATISTIC_ID.match(statistic_id) is not None
def validate_statistic_id(value: str) -> str:
"""Validate statistic ID."""
if valid_statistic_id(value):
return value
raise vol.Invalid(f"Statistic ID {value} is invalid")
@dataclasses.dataclass
class ValidationIssue:
"""Error or warning message."""
@@ -567,6 +576,30 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool:
return True
def _adjust_sum_statistics(
session: scoped_session,
table: type[Statistics | StatisticsShortTerm],
metadata_id: int,
start_time: datetime,
adj: float,
) -> None:
"""Adjust statistics in the database."""
try:
session.query(table).filter_by(metadata_id=metadata_id).filter(
table.start >= start_time
).update(
{
table.sum: table.sum + adj,
},
synchronize_session=False,
)
except SQLAlchemyError:
_LOGGER.exception(
"Unexpected exception when updating statistics %s",
metadata_id,
)
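
The bulk update above issues a single UPDATE per statistics table; roughly the SQL SQLAlchemy emits, sketched here with the table and column names from the recorder models:

    # UPDATE statistics
    #    SET sum = sum + :adj
    #  WHERE metadata_id = :metadata_id
    #    AND start >= :start_time
    #
    # synchronize_session=False skips reconciling in-memory session state,
    # which is safe here because the affected rows are not loaded elsewhere.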
def _insert_statistics(
session: scoped_session,
table: type[Statistics | StatisticsShortTerm],
@@ -606,7 +639,7 @@ def _update_statistics(
except SQLAlchemyError:
_LOGGER.exception(
"Unexpected exception when updating statistics %s:%s ",
id,
stat_id,
statistic,
)
@@ -1249,7 +1282,7 @@ def add_external_statistics(
metadata: StatisticMetaData,
statistics: Iterable[StatisticData],
) -> bool:
"""Process an add_statistics job."""
"""Process an add_external_statistics job."""
with session_scope(
session=instance.get_session(), # type: ignore[misc]
@@ -1265,3 +1298,35 @@
_insert_statistics(session, Statistics, metadata_id, stat)
return True
@retryable_database_job("adjust_statistics")
def adjust_statistics(
instance: Recorder,
statistic_id: str,
start_time: datetime,
sum_adjustment: float,
) -> bool:
"""Process an add_statistics job."""
with session_scope(session=instance.get_session()) as session: # type: ignore[misc]
metadata = get_metadata_with_session(
instance.hass, session, statistic_ids=(statistic_id,)
)
if statistic_id not in metadata:
return True
tables: tuple[type[Statistics | StatisticsShortTerm], ...] = (
Statistics,
StatisticsShortTerm,
)
for table in tables:
_adjust_sum_statistics(
session,
table,
metadata[statistic_id][0],
start_time,
sum_adjustment,
)
return True
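
Both the hourly `Statistics` table and the five-minute `StatisticsShortTerm` table are shifted so the two series stay consistent: every row whose `start` is at or after `start_time` has its `sum` moved by the adjustment, and earlier rows are untouched. The semantics, modeled in plain Python (mirroring the 5 → 5 and 3 → 1003 sums asserted in the external-statistics test below):

    rows = [{"start": 1, "sum": 5.0}, {"start": 2, "sum": 3.0}]

    def adjust(rows, start_time, adjustment):
        # Shift the sum of every row at or after start_time.
        for row in rows:
            if row["start"] >= start_time:
                row["sum"] += adjustment

    adjust(rows, start_time=2, adjustment=1000.0)
    assert [row["sum"] for row in rows] == [5.0, 1003.0]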

homeassistant/components/recorder/websocket_api.py

@@ -8,6 +8,7 @@ import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.core import HomeAssistant, callback
from homeassistant.util import dt as dt_util
from .const import DATA_INSTANCE, MAX_QUEUE_BACKLOG
from .statistics import list_statistic_ids, validate_statistics
@@ -29,6 +30,7 @@ def async_setup(hass: HomeAssistant) -> None:
websocket_api.async_register_command(hass, ws_info)
websocket_api.async_register_command(hass, ws_backup_start)
websocket_api.async_register_command(hass, ws_backup_end)
websocket_api.async_register_command(hass, ws_adjust_sum_statistics)
@websocket_api.websocket_command(
@@ -105,6 +107,34 @@ def ws_update_statistics_metadata(
connection.send_result(msg["id"])
@websocket_api.require_admin
@websocket_api.websocket_command(
{
vol.Required("type"): "recorder/adjust_sum_statistics",
vol.Required("statistic_id"): str,
vol.Required("start_time"): str,
vol.Required("adjustment"): vol.Any(float, int),
}
)
@callback
def ws_adjust_sum_statistics(
hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
) -> None:
"""Adjust sum statistics."""
start_time_str = msg["start_time"]
if start_time := dt_util.parse_datetime(start_time_str):
start_time = dt_util.as_utc(start_time)
else:
connection.send_error(msg["id"], "invalid_start_time", "Invalid start time")
return
hass.data[DATA_INSTANCE].async_adjust_statistics(
msg["statistic_id"], start_time, msg["adjustment"]
)
connection.send_result(msg["id"])
@websocket_api.websocket_command(
{
vol.Required("type"): "recorder/info",

tests/components/recorder/test_statistics.py

@@ -34,7 +34,13 @@ from homeassistant.exceptions import HomeAssistantError
from homeassistant.setup import setup_component
import homeassistant.util.dt as dt_util
from tests.common import get_test_home_assistant, mock_registry
from .common import async_wait_recording_done_without_instance
from tests.common import (
async_init_recorder_component,
get_test_home_assistant,
mock_registry,
)
from tests.components.recorder.common import wait_recording_done
ORIG_TZ = dt_util.DEFAULT_TIME_ZONE
@@ -327,10 +333,11 @@ def test_statistics_duplicated(hass_recorder, caplog):
caplog.clear()
def test_external_statistics(hass_recorder, caplog):
async def test_external_statistics(hass, hass_ws_client, caplog):
"""Test inserting external statistics."""
hass = hass_recorder()
wait_recording_done(hass)
client = await hass_ws_client()
await async_init_recorder_component(hass)
assert "Compiling statistics for" not in caplog.text
assert "Statistics already compiled" not in caplog.text
@@ -363,7 +370,7 @@ def test_external_statistics(hass_recorder, caplog):
async_add_external_statistics(
hass, external_metadata, (external_statistics1, external_statistics2)
)
wait_recording_done(hass)
await async_wait_recording_done_without_instance(hass)
stats = statistics_during_period(hass, zero, period="hour")
assert stats == {
"test:total_energy_import": [
@@ -439,7 +446,7 @@ def test_external_statistics(hass_recorder, caplog):
"sum": 6,
}
async_add_external_statistics(hass, external_metadata, (external_statistics,))
wait_recording_done(hass)
await async_wait_recording_done_without_instance(hass)
stats = statistics_during_period(hass, zero, period="hour")
assert stats == {
"test:total_energy_import": [
@@ -479,7 +486,7 @@ def test_external_statistics(hass_recorder, caplog):
"sum": 5,
}
async_add_external_statistics(hass, external_metadata, (external_statistics,))
wait_recording_done(hass)
await async_wait_recording_done_without_instance(hass)
stats = statistics_during_period(hass, zero, period="hour")
assert stats == {
"test:total_energy_import": [
@@ -508,6 +515,47 @@ def test_external_statistics(hass_recorder, caplog):
]
}
await client.send_json(
{
"id": 1,
"type": "recorder/adjust_sum_statistics",
"statistic_id": "test:total_energy_import",
"start_time": period2.isoformat(),
"adjustment": 1000.0,
}
)
response = await client.receive_json()
assert response["success"]
await async_wait_recording_done_without_instance(hass)
stats = statistics_during_period(hass, zero, period="hour")
assert stats == {
"test:total_energy_import": [
{
"statistic_id": "test:total_energy_import",
"start": period1.isoformat(),
"end": (period1 + timedelta(hours=1)).isoformat(),
"max": approx(1.0),
"mean": approx(2.0),
"min": approx(3.0),
"last_reset": None,
"state": approx(4.0),
"sum": approx(5.0),
},
{
"statistic_id": "test:total_energy_import",
"start": period2.isoformat(),
"end": (period2 + timedelta(hours=1)).isoformat(),
"max": None,
"mean": None,
"min": None,
"last_reset": None,
"state": approx(1.0),
"sum": approx(1003.0),
},
]
}
def test_external_statistics_errors(hass_recorder, caplog):
"""Test validation of external statistics."""

tests/components/sensor/test_recorder.py

@@ -1,6 +1,7 @@
"""The tests for sensor recorder platform."""
# pylint: disable=protected-access,invalid-name
from datetime import timedelta
from functools import partial
import math
from statistics import mean
from unittest.mock import patch
@@ -26,8 +27,15 @@ from homeassistant.setup import setup_component
import homeassistant.util.dt as dt_util
from homeassistant.util.unit_system import IMPERIAL_SYSTEM, METRIC_SYSTEM
from tests.common import async_setup_component, init_recorder_component
from tests.components.recorder.common import wait_recording_done
from tests.common import (
async_init_recorder_component,
async_setup_component,
init_recorder_component,
)
from tests.components.recorder.common import (
async_wait_recording_done_without_instance,
wait_recording_done,
)
BATTERY_SENSOR_ATTRIBUTES = {
"device_class": "battery",
@@ -307,34 +315,44 @@ def test_compile_hourly_statistics_unsupported(hass_recorder, caplog, attributes
@pytest.mark.parametrize("state_class", ["total"])
@pytest.mark.parametrize(
"units,device_class,unit,display_unit,factor",
"units,device_class,unit,display_unit,factor,factor2",
[
(IMPERIAL_SYSTEM, "energy", "kWh", "kWh", 1),
(IMPERIAL_SYSTEM, "energy", "Wh", "kWh", 1 / 1000),
(IMPERIAL_SYSTEM, "monetary", "EUR", "EUR", 1),
(IMPERIAL_SYSTEM, "monetary", "SEK", "SEK", 1),
(IMPERIAL_SYSTEM, "gas", "", "ft³", 35.314666711),
(IMPERIAL_SYSTEM, "gas", "ft³", "ft³", 1),
(METRIC_SYSTEM, "energy", "kWh", "kWh", 1),
(METRIC_SYSTEM, "energy", "Wh", "kWh", 1 / 1000),
(METRIC_SYSTEM, "monetary", "EUR", "EUR", 1),
(METRIC_SYSTEM, "monetary", "SEK", "SEK", 1),
(METRIC_SYSTEM, "gas", "", "", 1),
(METRIC_SYSTEM, "gas", "ft³", "", 0.0283168466),
(IMPERIAL_SYSTEM, "energy", "kWh", "kWh", 1, 1),
(IMPERIAL_SYSTEM, "energy", "Wh", "kWh", 1 / 1000, 1),
(IMPERIAL_SYSTEM, "monetary", "EUR", "EUR", 1, 1),
(IMPERIAL_SYSTEM, "monetary", "SEK", "SEK", 1, 1),
(IMPERIAL_SYSTEM, "gas", "", "ft³", 35.314666711, 35.314666711),
(IMPERIAL_SYSTEM, "gas", "ft³", "ft³", 1, 35.314666711),
(METRIC_SYSTEM, "energy", "kWh", "kWh", 1, 1),
(METRIC_SYSTEM, "energy", "Wh", "kWh", 1 / 1000, 1),
(METRIC_SYSTEM, "monetary", "EUR", "EUR", 1, 1),
(METRIC_SYSTEM, "monetary", "SEK", "SEK", 1, 1),
(METRIC_SYSTEM, "gas", "", "", 1, 1),
(METRIC_SYSTEM, "gas", "ft³", "", 0.0283168466, 1),
],
)
def test_compile_hourly_sum_statistics_amount(
hass_recorder, caplog, units, state_class, device_class, unit, display_unit, factor
async def test_compile_hourly_sum_statistics_amount(
hass,
hass_ws_client,
caplog,
units,
state_class,
device_class,
unit,
display_unit,
factor,
factor2,
):
"""Test compiling hourly statistics."""
period0 = dt_util.utcnow()
period0_end = period1 = period0 + timedelta(minutes=5)
period1_end = period2 = period0 + timedelta(minutes=10)
period2_end = period0 + timedelta(minutes=15)
hass = hass_recorder()
client = await hass_ws_client()
await async_init_recorder_component(hass)
hass.config.units = units
recorder = hass.data[DATA_INSTANCE]
setup_component(hass, "sensor", {})
await async_setup_component(hass, "sensor", {})
attributes = {
"device_class": device_class,
"state_class": state_class,
@@ -343,21 +361,28 @@ def test_compile_hourly_sum_statistics_amount(
}
seq = [10, 15, 20, 10, 30, 40, 50, 60, 70]
four, eight, states = record_meter_states(
hass, period0, "sensor.test1", attributes, seq
four, eight, states = await hass.async_add_executor_job(
record_meter_states, hass, period0, "sensor.test1", attributes, seq
)
await async_wait_recording_done_without_instance(hass)
hist = history.get_significant_states(
hass, period0 - timedelta.resolution, eight + timedelta.resolution
)
assert dict(states)["sensor.test1"] == dict(hist)["sensor.test1"]
recorder.do_adhoc_statistics(start=period0)
wait_recording_done(hass)
recorder.do_adhoc_statistics(start=period1)
wait_recording_done(hass)
recorder.do_adhoc_statistics(start=period2)
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
await hass.async_add_executor_job(
partial(recorder.do_adhoc_statistics, start=period0)
)
await async_wait_recording_done_without_instance(hass)
await hass.async_add_executor_job(
partial(recorder.do_adhoc_statistics, start=period1)
)
await async_wait_recording_done_without_instance(hass)
await hass.async_add_executor_job(
partial(recorder.do_adhoc_statistics, start=period2)
)
await async_wait_recording_done_without_instance(hass)
statistic_ids = await hass.async_add_executor_job(list_statistic_ids, hass)
assert statistic_ids == [
{
"statistic_id": "sensor.test1",
@@ -416,20 +441,57 @@ def test_compile_hourly_sum_statistics_amount(
stats = statistics_during_period(
hass, period0 + timedelta(minutes=5), period="5minute"
)
expected_stats["sensor.test1"] = expected_stats["sensor.test1"][1:3]
assert stats == expected_stats
assert stats == {"sensor.test1": expected_stats["sensor.test1"][1:3]}
# With an offset of 6 minutes, we expect to get the 2nd and 3rd periods
stats = statistics_during_period(
hass, period0 + timedelta(minutes=6), period="5minute"
)
assert stats == expected_stats
assert stats == {"sensor.test1": expected_stats["sensor.test1"][1:3]}
assert "Error while processing event StatisticsTask" not in caplog.text
assert "Detected new cycle for sensor.test1, last_reset set to" in caplog.text
assert "Compiling initial sum statistics for sensor.test1" in caplog.text
assert "Detected new cycle for sensor.test1, value dropped" not in caplog.text
# Adjust the inserted statistics
await client.send_json(
{
"id": 1,
"type": "recorder/adjust_sum_statistics",
"statistic_id": "sensor.test1",
"start_time": period1.isoformat(),
"adjustment": 100.0,
}
)
response = await client.receive_json()
assert response["success"]
await async_wait_recording_done_without_instance(hass)
expected_stats["sensor.test1"][1]["sum"] = approx(factor * 40.0 + factor2 * 100)
expected_stats["sensor.test1"][2]["sum"] = approx(factor * 70.0 + factor2 * 100)
stats = statistics_during_period(hass, period0, period="5minute")
assert stats == expected_stats
# Adjust the inserted statistics
await client.send_json(
{
"id": 2,
"type": "recorder/adjust_sum_statistics",
"statistic_id": "sensor.test1",
"start_time": period2.isoformat(),
"adjustment": -400.0,
}
)
response = await client.receive_json()
assert response["success"]
await async_wait_recording_done_without_instance(hass)
expected_stats["sensor.test1"][1]["sum"] = approx(factor * 40.0 + factor2 * 100)
expected_stats["sensor.test1"][2]["sum"] = approx(factor * 70.0 - factor2 * 300)
stats = statistics_during_period(hass, period0, period="5minute")
assert stats == expected_stats
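
The new `factor2` column reflects unit normalization: the recorder stores a statistic's `sum` in a normalized unit (for example m³ for gas), and the adjustment is applied directly to that stored sum, so the change visible in the display unit is `adjustment × factor2`. In the imperial gas row with a ft³ sensor, adding 100 therefore shows up as roughly 100 × 35.3147 ≈ 3531 ft³, as the parametrization above implies.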
@pytest.mark.parametrize("state_class", ["total"])
@pytest.mark.parametrize(
@@ -836,6 +898,7 @@ def test_compile_hourly_sum_statistics_total_no_reset(
four, eight, states = record_meter_states(
hass, period0, "sensor.test1", attributes, seq
)
wait_recording_done(hass)
hist = history.get_significant_states(
hass, period0 - timedelta.resolution, eight + timedelta.resolution
)
@@ -927,6 +990,7 @@ def test_compile_hourly_sum_statistics_total_increasing(
four, eight, states = record_meter_states(
hass, period0, "sensor.test1", attributes, seq
)
wait_recording_done(hass)
hist = history.get_significant_states(
hass, period0 - timedelta.resolution, eight + timedelta.resolution
)
@@ -1016,6 +1080,7 @@ def test_compile_hourly_sum_statistics_total_increasing_small_dip(
four, eight, states = record_meter_states(
hass, period0, "sensor.test1", attributes, seq
)
wait_recording_done(hass)
hist = history.get_significant_states(
hass, period0 - timedelta.resolution, eight + timedelta.resolution
)
@@ -1118,6 +1183,7 @@ def test_compile_hourly_energy_statistics_unsupported(hass_recorder, caplog):
states = {**states, **_states}
_, _, _states = record_meter_states(hass, period0, "sensor.test3", sns3_attr, seq3)
states = {**states, **_states}
wait_recording_done(hass)
hist = history.get_significant_states(
hass, period0 - timedelta.resolution, eight + timedelta.resolution
@@ -1207,6 +1273,7 @@ def test_compile_hourly_energy_statistics_multiple(hass_recorder, caplog):
states = {**states, **_states}
_, _, _states = record_meter_states(hass, period0, "sensor.test3", sns3_attr, seq3)
states = {**states, **_states}
wait_recording_done(hass)
hist = history.get_significant_states(
hass, period0 - timedelta.resolution, eight + timedelta.resolution
)
@@ -3164,7 +3231,6 @@ def record_meter_states(hass, zero, entity_id, _attributes, seq):
def set_state(entity_id, state, **kwargs):
"""Set the state."""
hass.states.set(entity_id, state, **kwargs)
wait_recording_done(hass)
return hass.states.get(entity_id)
one = zero + timedelta(seconds=15 * 5) # 00:01:15