From b5c5da96acbb8d16f9a0974cf45c92e5da516a81 Mon Sep 17 00:00:00 2001 From: Erik Montnemery Date: Tue, 22 Mar 2022 23:18:30 +0100 Subject: [PATCH] Add WS API to adjust incorrect energy statistics (#65147) Co-authored-by: Paulus Schoutsen --- homeassistant/components/recorder/__init__.py | 30 ++++ .../components/recorder/statistics.py | 69 +++++++++- .../components/recorder/websocket_api.py | 30 ++++ tests/components/recorder/test_statistics.py | 62 ++++++++- tests/components/sensor/test_recorder.py | 130 +++++++++++++----- 5 files changed, 280 insertions(+), 41 deletions(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index c188dd6c4b6..aae6576c225 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -462,6 +462,31 @@ class ExternalStatisticsTask(RecorderTask): instance.queue.put(ExternalStatisticsTask(self.metadata, self.statistics)) +@dataclass +class AdjustStatisticsTask(RecorderTask): + """An object to insert into the recorder queue to run an adjust statistics task.""" + + statistic_id: str + start_time: datetime + sum_adjustment: float + + def run(self, instance: Recorder) -> None: + """Run statistics task.""" + if statistics.adjust_statistics( + instance, + self.statistic_id, + self.start_time, + self.sum_adjustment, + ): + return + # Schedule a new adjust statistics task if this one didn't finish + instance.queue.put( + AdjustStatisticsTask( + self.statistic_id, self.start_time, self.sum_adjustment + ) + ) + + @dataclass class WaitTask(RecorderTask): """An object to insert into the recorder queue to tell it set the _queue_watch event.""" @@ -761,6 +786,11 @@ class Recorder(threading.Thread): start = statistics.get_start_time() self.queue.put(StatisticsTask(start)) + @callback + def async_adjust_statistics(self, statistic_id, start_time, sum_adjustment): + """Adjust statistics.""" + self.queue.put(AdjustStatisticsTask(statistic_id, start_time, sum_adjustment)) + @callback def async_clear_statistics(self, statistic_ids): """Clear statistics for a list of statistic_ids.""" diff --git a/homeassistant/components/recorder/statistics.py b/homeassistant/components/recorder/statistics.py index df53bd55307..b27a08f489c 100644 --- a/homeassistant/components/recorder/statistics.py +++ b/homeassistant/components/recorder/statistics.py @@ -19,6 +19,7 @@ from sqlalchemy.exc import SQLAlchemyError, StatementError from sqlalchemy.ext import baked from sqlalchemy.orm.scoping import scoped_session from sqlalchemy.sql.expression import literal_column, true +import voluptuous as vol from homeassistant.const import ( PRESSURE_PA, @@ -163,6 +164,14 @@ def valid_statistic_id(statistic_id: str) -> bool: return VALID_STATISTIC_ID.match(statistic_id) is not None +def validate_statistic_id(value: str) -> str: + """Validate statistic ID.""" + if valid_statistic_id(value): + return value + + raise vol.Invalid(f"Statistics ID {value} is an invalid statistic ID") + + @dataclasses.dataclass class ValidationIssue: """Error or warning message.""" @@ -567,6 +576,30 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool: return True +def _adjust_sum_statistics( + session: scoped_session, + table: type[Statistics | StatisticsShortTerm], + metadata_id: int, + start_time: datetime, + adj: float, +) -> None: + """Adjust statistics in the database.""" + try: + session.query(table).filter_by(metadata_id=metadata_id).filter( + table.start >= start_time + ).update( + { + table.sum: table.sum + adj, + }, + synchronize_session=False, + ) + except SQLAlchemyError: + _LOGGER.exception( + "Unexpected exception when updating statistics %s", + id, + ) + + def _insert_statistics( session: scoped_session, table: type[Statistics | StatisticsShortTerm], @@ -606,7 +639,7 @@ def _update_statistics( except SQLAlchemyError: _LOGGER.exception( "Unexpected exception when updating statistics %s:%s ", - id, + stat_id, statistic, ) @@ -1249,7 +1282,7 @@ def add_external_statistics( metadata: StatisticMetaData, statistics: Iterable[StatisticData], ) -> bool: - """Process an add_statistics job.""" + """Process an add_external_statistics job.""" with session_scope( session=instance.get_session(), # type: ignore[misc] @@ -1265,3 +1298,35 @@ def add_external_statistics( _insert_statistics(session, Statistics, metadata_id, stat) return True + + +@retryable_database_job("adjust_statistics") +def adjust_statistics( + instance: Recorder, + statistic_id: str, + start_time: datetime, + sum_adjustment: float, +) -> bool: + """Process an add_statistics job.""" + + with session_scope(session=instance.get_session()) as session: # type: ignore[misc] + metadata = get_metadata_with_session( + instance.hass, session, statistic_ids=(statistic_id,) + ) + if statistic_id not in metadata: + return True + + tables: tuple[type[Statistics | StatisticsShortTerm], ...] = ( + Statistics, + StatisticsShortTerm, + ) + for table in tables: + _adjust_sum_statistics( + session, + table, + metadata[statistic_id][0], + start_time, + sum_adjustment, + ) + + return True diff --git a/homeassistant/components/recorder/websocket_api.py b/homeassistant/components/recorder/websocket_api.py index a480439eaac..241dca9026c 100644 --- a/homeassistant/components/recorder/websocket_api.py +++ b/homeassistant/components/recorder/websocket_api.py @@ -8,6 +8,7 @@ import voluptuous as vol from homeassistant.components import websocket_api from homeassistant.core import HomeAssistant, callback +from homeassistant.util import dt as dt_util from .const import DATA_INSTANCE, MAX_QUEUE_BACKLOG from .statistics import list_statistic_ids, validate_statistics @@ -29,6 +30,7 @@ def async_setup(hass: HomeAssistant) -> None: websocket_api.async_register_command(hass, ws_info) websocket_api.async_register_command(hass, ws_backup_start) websocket_api.async_register_command(hass, ws_backup_end) + websocket_api.async_register_command(hass, ws_adjust_sum_statistics) @websocket_api.websocket_command( @@ -105,6 +107,34 @@ def ws_update_statistics_metadata( connection.send_result(msg["id"]) +@websocket_api.require_admin +@websocket_api.websocket_command( + { + vol.Required("type"): "recorder/adjust_sum_statistics", + vol.Required("statistic_id"): str, + vol.Required("start_time"): str, + vol.Required("adjustment"): vol.Any(float, int), + } +) +@callback +def ws_adjust_sum_statistics( + hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict +) -> None: + """Adjust sum statistics.""" + start_time_str = msg["start_time"] + + if start_time := dt_util.parse_datetime(start_time_str): + start_time = dt_util.as_utc(start_time) + else: + connection.send_error(msg["id"], "invalid_start_time", "Invalid start time") + return + + hass.data[DATA_INSTANCE].async_adjust_statistics( + msg["statistic_id"], start_time, msg["adjustment"] + ) + connection.send_result(msg["id"]) + + @websocket_api.websocket_command( { vol.Required("type"): "recorder/info", diff --git a/tests/components/recorder/test_statistics.py b/tests/components/recorder/test_statistics.py index fe05dbc25ab..29853c2cc0e 100644 --- a/tests/components/recorder/test_statistics.py +++ b/tests/components/recorder/test_statistics.py @@ -34,7 +34,13 @@ from homeassistant.exceptions import HomeAssistantError from homeassistant.setup import setup_component import homeassistant.util.dt as dt_util -from tests.common import get_test_home_assistant, mock_registry +from .common import async_wait_recording_done_without_instance + +from tests.common import ( + async_init_recorder_component, + get_test_home_assistant, + mock_registry, +) from tests.components.recorder.common import wait_recording_done ORIG_TZ = dt_util.DEFAULT_TIME_ZONE @@ -327,10 +333,11 @@ def test_statistics_duplicated(hass_recorder, caplog): caplog.clear() -def test_external_statistics(hass_recorder, caplog): +async def test_external_statistics(hass, hass_ws_client, caplog): """Test inserting external statistics.""" - hass = hass_recorder() - wait_recording_done(hass) + client = await hass_ws_client() + await async_init_recorder_component(hass) + assert "Compiling statistics for" not in caplog.text assert "Statistics already compiled" not in caplog.text @@ -363,7 +370,7 @@ def test_external_statistics(hass_recorder, caplog): async_add_external_statistics( hass, external_metadata, (external_statistics1, external_statistics2) ) - wait_recording_done(hass) + await async_wait_recording_done_without_instance(hass) stats = statistics_during_period(hass, zero, period="hour") assert stats == { "test:total_energy_import": [ @@ -439,7 +446,7 @@ def test_external_statistics(hass_recorder, caplog): "sum": 6, } async_add_external_statistics(hass, external_metadata, (external_statistics,)) - wait_recording_done(hass) + await async_wait_recording_done_without_instance(hass) stats = statistics_during_period(hass, zero, period="hour") assert stats == { "test:total_energy_import": [ @@ -479,7 +486,7 @@ def test_external_statistics(hass_recorder, caplog): "sum": 5, } async_add_external_statistics(hass, external_metadata, (external_statistics,)) - wait_recording_done(hass) + await async_wait_recording_done_without_instance(hass) stats = statistics_during_period(hass, zero, period="hour") assert stats == { "test:total_energy_import": [ @@ -508,6 +515,47 @@ def test_external_statistics(hass_recorder, caplog): ] } + await client.send_json( + { + "id": 1, + "type": "recorder/adjust_sum_statistics", + "statistic_id": "test:total_energy_import", + "start_time": period2.isoformat(), + "adjustment": 1000.0, + } + ) + response = await client.receive_json() + assert response["success"] + + await async_wait_recording_done_without_instance(hass) + stats = statistics_during_period(hass, zero, period="hour") + assert stats == { + "test:total_energy_import": [ + { + "statistic_id": "test:total_energy_import", + "start": period1.isoformat(), + "end": (period1 + timedelta(hours=1)).isoformat(), + "max": approx(1.0), + "mean": approx(2.0), + "min": approx(3.0), + "last_reset": None, + "state": approx(4.0), + "sum": approx(5.0), + }, + { + "statistic_id": "test:total_energy_import", + "start": period2.isoformat(), + "end": (period2 + timedelta(hours=1)).isoformat(), + "max": None, + "mean": None, + "min": None, + "last_reset": None, + "state": approx(1.0), + "sum": approx(1003.0), + }, + ] + } + def test_external_statistics_errors(hass_recorder, caplog): """Test validation of external statistics.""" diff --git a/tests/components/sensor/test_recorder.py b/tests/components/sensor/test_recorder.py index af1494b381e..f26b5e7ead3 100644 --- a/tests/components/sensor/test_recorder.py +++ b/tests/components/sensor/test_recorder.py @@ -1,6 +1,7 @@ """The tests for sensor recorder platform.""" # pylint: disable=protected-access,invalid-name from datetime import timedelta +from functools import partial import math from statistics import mean from unittest.mock import patch @@ -26,8 +27,15 @@ from homeassistant.setup import setup_component import homeassistant.util.dt as dt_util from homeassistant.util.unit_system import IMPERIAL_SYSTEM, METRIC_SYSTEM -from tests.common import async_setup_component, init_recorder_component -from tests.components.recorder.common import wait_recording_done +from tests.common import ( + async_init_recorder_component, + async_setup_component, + init_recorder_component, +) +from tests.components.recorder.common import ( + async_wait_recording_done_without_instance, + wait_recording_done, +) BATTERY_SENSOR_ATTRIBUTES = { "device_class": "battery", @@ -307,34 +315,44 @@ def test_compile_hourly_statistics_unsupported(hass_recorder, caplog, attributes @pytest.mark.parametrize("state_class", ["total"]) @pytest.mark.parametrize( - "units,device_class,unit,display_unit,factor", + "units,device_class,unit,display_unit,factor,factor2", [ - (IMPERIAL_SYSTEM, "energy", "kWh", "kWh", 1), - (IMPERIAL_SYSTEM, "energy", "Wh", "kWh", 1 / 1000), - (IMPERIAL_SYSTEM, "monetary", "EUR", "EUR", 1), - (IMPERIAL_SYSTEM, "monetary", "SEK", "SEK", 1), - (IMPERIAL_SYSTEM, "gas", "m³", "ft³", 35.314666711), - (IMPERIAL_SYSTEM, "gas", "ft³", "ft³", 1), - (METRIC_SYSTEM, "energy", "kWh", "kWh", 1), - (METRIC_SYSTEM, "energy", "Wh", "kWh", 1 / 1000), - (METRIC_SYSTEM, "monetary", "EUR", "EUR", 1), - (METRIC_SYSTEM, "monetary", "SEK", "SEK", 1), - (METRIC_SYSTEM, "gas", "m³", "m³", 1), - (METRIC_SYSTEM, "gas", "ft³", "m³", 0.0283168466), + (IMPERIAL_SYSTEM, "energy", "kWh", "kWh", 1, 1), + (IMPERIAL_SYSTEM, "energy", "Wh", "kWh", 1 / 1000, 1), + (IMPERIAL_SYSTEM, "monetary", "EUR", "EUR", 1, 1), + (IMPERIAL_SYSTEM, "monetary", "SEK", "SEK", 1, 1), + (IMPERIAL_SYSTEM, "gas", "m³", "ft³", 35.314666711, 35.314666711), + (IMPERIAL_SYSTEM, "gas", "ft³", "ft³", 1, 35.314666711), + (METRIC_SYSTEM, "energy", "kWh", "kWh", 1, 1), + (METRIC_SYSTEM, "energy", "Wh", "kWh", 1 / 1000, 1), + (METRIC_SYSTEM, "monetary", "EUR", "EUR", 1, 1), + (METRIC_SYSTEM, "monetary", "SEK", "SEK", 1, 1), + (METRIC_SYSTEM, "gas", "m³", "m³", 1, 1), + (METRIC_SYSTEM, "gas", "ft³", "m³", 0.0283168466, 1), ], ) -def test_compile_hourly_sum_statistics_amount( - hass_recorder, caplog, units, state_class, device_class, unit, display_unit, factor +async def test_compile_hourly_sum_statistics_amount( + hass, + hass_ws_client, + caplog, + units, + state_class, + device_class, + unit, + display_unit, + factor, + factor2, ): """Test compiling hourly statistics.""" period0 = dt_util.utcnow() period0_end = period1 = period0 + timedelta(minutes=5) period1_end = period2 = period0 + timedelta(minutes=10) period2_end = period0 + timedelta(minutes=15) - hass = hass_recorder() + client = await hass_ws_client() + await async_init_recorder_component(hass) hass.config.units = units recorder = hass.data[DATA_INSTANCE] - setup_component(hass, "sensor", {}) + await async_setup_component(hass, "sensor", {}) attributes = { "device_class": device_class, "state_class": state_class, @@ -343,21 +361,28 @@ def test_compile_hourly_sum_statistics_amount( } seq = [10, 15, 20, 10, 30, 40, 50, 60, 70] - four, eight, states = record_meter_states( - hass, period0, "sensor.test1", attributes, seq + four, eight, states = await hass.async_add_executor_job( + record_meter_states, hass, period0, "sensor.test1", attributes, seq ) + await async_wait_recording_done_without_instance(hass) hist = history.get_significant_states( hass, period0 - timedelta.resolution, eight + timedelta.resolution ) assert dict(states)["sensor.test1"] == dict(hist)["sensor.test1"] - recorder.do_adhoc_statistics(start=period0) - wait_recording_done(hass) - recorder.do_adhoc_statistics(start=period1) - wait_recording_done(hass) - recorder.do_adhoc_statistics(start=period2) - wait_recording_done(hass) - statistic_ids = list_statistic_ids(hass) + await hass.async_add_executor_job( + partial(recorder.do_adhoc_statistics, start=period0) + ) + await async_wait_recording_done_without_instance(hass) + await hass.async_add_executor_job( + partial(recorder.do_adhoc_statistics, start=period1) + ) + await async_wait_recording_done_without_instance(hass) + await hass.async_add_executor_job( + partial(recorder.do_adhoc_statistics, start=period2) + ) + await async_wait_recording_done_without_instance(hass) + statistic_ids = await hass.async_add_executor_job(list_statistic_ids, hass) assert statistic_ids == [ { "statistic_id": "sensor.test1", @@ -416,20 +441,57 @@ def test_compile_hourly_sum_statistics_amount( stats = statistics_during_period( hass, period0 + timedelta(minutes=5), period="5minute" ) - expected_stats["sensor.test1"] = expected_stats["sensor.test1"][1:3] - assert stats == expected_stats + assert stats == {"sensor.test1": expected_stats["sensor.test1"][1:3]} # With an offset of 6 minutes, we expect to get the 2nd and 3rd periods stats = statistics_during_period( hass, period0 + timedelta(minutes=6), period="5minute" ) - assert stats == expected_stats + assert stats == {"sensor.test1": expected_stats["sensor.test1"][1:3]} assert "Error while processing event StatisticsTask" not in caplog.text assert "Detected new cycle for sensor.test1, last_reset set to" in caplog.text assert "Compiling initial sum statistics for sensor.test1" in caplog.text assert "Detected new cycle for sensor.test1, value dropped" not in caplog.text + # Adjust the inserted statistics + await client.send_json( + { + "id": 1, + "type": "recorder/adjust_sum_statistics", + "statistic_id": "sensor.test1", + "start_time": period1.isoformat(), + "adjustment": 100.0, + } + ) + response = await client.receive_json() + assert response["success"] + await async_wait_recording_done_without_instance(hass) + + expected_stats["sensor.test1"][1]["sum"] = approx(factor * 40.0 + factor2 * 100) + expected_stats["sensor.test1"][2]["sum"] = approx(factor * 70.0 + factor2 * 100) + stats = statistics_during_period(hass, period0, period="5minute") + assert stats == expected_stats + + # Adjust the inserted statistics + await client.send_json( + { + "id": 2, + "type": "recorder/adjust_sum_statistics", + "statistic_id": "sensor.test1", + "start_time": period2.isoformat(), + "adjustment": -400.0, + } + ) + response = await client.receive_json() + assert response["success"] + await async_wait_recording_done_without_instance(hass) + + expected_stats["sensor.test1"][1]["sum"] = approx(factor * 40.0 + factor2 * 100) + expected_stats["sensor.test1"][2]["sum"] = approx(factor * 70.0 - factor2 * 300) + stats = statistics_during_period(hass, period0, period="5minute") + assert stats == expected_stats + @pytest.mark.parametrize("state_class", ["total"]) @pytest.mark.parametrize( @@ -836,6 +898,7 @@ def test_compile_hourly_sum_statistics_total_no_reset( four, eight, states = record_meter_states( hass, period0, "sensor.test1", attributes, seq ) + wait_recording_done(hass) hist = history.get_significant_states( hass, period0 - timedelta.resolution, eight + timedelta.resolution ) @@ -927,6 +990,7 @@ def test_compile_hourly_sum_statistics_total_increasing( four, eight, states = record_meter_states( hass, period0, "sensor.test1", attributes, seq ) + wait_recording_done(hass) hist = history.get_significant_states( hass, period0 - timedelta.resolution, eight + timedelta.resolution ) @@ -1016,6 +1080,7 @@ def test_compile_hourly_sum_statistics_total_increasing_small_dip( four, eight, states = record_meter_states( hass, period0, "sensor.test1", attributes, seq ) + wait_recording_done(hass) hist = history.get_significant_states( hass, period0 - timedelta.resolution, eight + timedelta.resolution ) @@ -1118,6 +1183,7 @@ def test_compile_hourly_energy_statistics_unsupported(hass_recorder, caplog): states = {**states, **_states} _, _, _states = record_meter_states(hass, period0, "sensor.test3", sns3_attr, seq3) states = {**states, **_states} + wait_recording_done(hass) hist = history.get_significant_states( hass, period0 - timedelta.resolution, eight + timedelta.resolution @@ -1207,6 +1273,7 @@ def test_compile_hourly_energy_statistics_multiple(hass_recorder, caplog): states = {**states, **_states} _, _, _states = record_meter_states(hass, period0, "sensor.test3", sns3_attr, seq3) states = {**states, **_states} + wait_recording_done(hass) hist = history.get_significant_states( hass, period0 - timedelta.resolution, eight + timedelta.resolution ) @@ -3164,7 +3231,6 @@ def record_meter_states(hass, zero, entity_id, _attributes, seq): def set_state(entity_id, state, **kwargs): """Set the state.""" hass.states.set(entity_id, state, **kwargs) - wait_recording_done(hass) return hass.states.get(entity_id) one = zero + timedelta(seconds=15 * 5) # 00:01:15