Use floats instead of datetime in statistics (#132746)

* Use floats instead of datetime in statistics

* check if debug log
pull/132869/head
G Johansson 2024-12-10 19:03:43 +01:00 committed by GitHub
parent 7fb5b17ac5
commit 76b73fa9b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 120 additions and 116 deletions

View File

@ -9,6 +9,7 @@ from datetime import datetime, timedelta
import logging
import math
import statistics
import time
from typing import Any, cast
import voluptuous as vol
@ -100,9 +101,7 @@ STAT_VARIANCE = "variance"
def _callable_characteristic_fn(
characteristic: str, binary: bool
) -> Callable[
[deque[bool | float], deque[datetime], int], float | int | datetime | None
]:
) -> Callable[[deque[bool | float], deque[float], int], float | int | datetime | None]:
"""Return the function callable of one characteristic function."""
Callable[[deque[bool | float], deque[datetime], int], datetime | int | float | None]
if binary:
@ -114,45 +113,41 @@ def _callable_characteristic_fn(
def _stat_average_linear(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) == 1:
return states[0]
if len(states) >= 2:
area: float = 0
for i in range(1, len(states)):
area += (
0.5
* (states[i] + states[i - 1])
* (ages[i] - ages[i - 1]).total_seconds()
)
age_range_seconds = (ages[-1] - ages[0]).total_seconds()
area += 0.5 * (states[i] + states[i - 1]) * (ages[i] - ages[i - 1])
age_range_seconds = ages[-1] - ages[0]
return area / age_range_seconds
return None
def _stat_average_step(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) == 1:
return states[0]
if len(states) >= 2:
area: float = 0
for i in range(1, len(states)):
area += states[i - 1] * (ages[i] - ages[i - 1]).total_seconds()
age_range_seconds = (ages[-1] - ages[0]).total_seconds()
area += states[i - 1] * (ages[i] - ages[i - 1])
age_range_seconds = ages[-1] - ages[0]
return area / age_range_seconds
return None
def _stat_average_timeless(
    states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
    """Return the plain (unweighted) mean of the states; alias for mean."""
    return _stat_mean(states, ages, percentile)
def _stat_change(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) > 0:
return states[-1] - states[0]
@ -160,7 +155,7 @@ def _stat_change(
def _stat_change_sample(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) > 1:
return (states[-1] - states[0]) / (len(states) - 1)
@ -168,55 +163,55 @@ def _stat_change_sample(
def _stat_change_second(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) > 1:
age_range_seconds = (ages[-1] - ages[0]).total_seconds()
age_range_seconds = ages[-1] - ages[0]
if age_range_seconds > 0:
return (states[-1] - states[0]) / age_range_seconds
return None
def _stat_count(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> int | None:
return len(states)
def _stat_datetime_newest(
    states: deque[bool | float], ages: deque[float], percentile: int
) -> datetime | None:
    """Return the timestamp of the newest sample as an aware UTC datetime.

    Ages are stored as POSIX floats; convert only at the output boundary.
    """
    if len(states) > 0:
        return dt_util.utc_from_timestamp(ages[-1])
    return None
def _stat_datetime_oldest(
    states: deque[bool | float], ages: deque[float], percentile: int
) -> datetime | None:
    """Return the timestamp of the oldest sample as an aware UTC datetime.

    Ages are stored as POSIX floats; convert only at the output boundary.
    """
    if len(states) > 0:
        return dt_util.utc_from_timestamp(ages[0])
    return None
def _stat_datetime_value_max(
    states: deque[bool | float], ages: deque[float], percentile: int
) -> datetime | None:
    """Return the UTC datetime at which the maximum state value was recorded.

    On ties, `states.index(...)` picks the earliest occurrence.
    """
    if len(states) > 0:
        return dt_util.utc_from_timestamp(ages[states.index(max(states))])
    return None
def _stat_datetime_value_min(
    states: deque[bool | float], ages: deque[float], percentile: int
) -> datetime | None:
    """Return the UTC datetime at which the minimum state value was recorded.

    On ties, `states.index(...)` picks the earliest occurrence.
    """
    if len(states) > 0:
        return dt_util.utc_from_timestamp(ages[states.index(min(states))])
    return None
def _stat_distance_95_percent_of_values(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) >= 1:
return (
@ -226,7 +221,7 @@ def _stat_distance_95_percent_of_values(
def _stat_distance_99_percent_of_values(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) >= 1:
return (
@ -236,7 +231,7 @@ def _stat_distance_99_percent_of_values(
def _stat_distance_absolute(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) > 0:
return max(states) - min(states)
@ -244,7 +239,7 @@ def _stat_distance_absolute(
def _stat_mean(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) > 0:
return statistics.mean(states)
@ -252,7 +247,7 @@ def _stat_mean(
def _stat_mean_circular(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) > 0:
sin_sum = sum(math.sin(math.radians(x)) for x in states)
@ -262,7 +257,7 @@ def _stat_mean_circular(
def _stat_median(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) > 0:
return statistics.median(states)
@ -270,7 +265,7 @@ def _stat_median(
def _stat_noisiness(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) == 1:
return 0.0
@ -282,7 +277,7 @@ def _stat_noisiness(
def _stat_percentile(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) == 1:
return states[0]
@ -293,7 +288,7 @@ def _stat_percentile(
def _stat_standard_deviation(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) == 1:
return 0.0
@ -303,7 +298,7 @@ def _stat_standard_deviation(
def _stat_sum(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) > 0:
return sum(states)
@ -311,7 +306,7 @@ def _stat_sum(
def _stat_sum_differences(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) == 1:
return 0.0
@ -323,7 +318,7 @@ def _stat_sum_differences(
def _stat_sum_differences_nonnegative(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) == 1:
return 0.0
@ -336,13 +331,13 @@ def _stat_sum_differences_nonnegative(
def _stat_total(
    states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
    """Return the sum of all states; alias for the sum characteristic."""
    return _stat_sum(states, ages, percentile)
def _stat_value_max(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) > 0:
return max(states)
@ -350,7 +345,7 @@ def _stat_value_max(
def _stat_value_min(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) > 0:
return min(states)
@ -358,7 +353,7 @@ def _stat_value_min(
def _stat_variance(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) == 1:
return 0.0
@ -371,7 +366,7 @@ def _stat_variance(
def _stat_binary_average_step(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) == 1:
return 100.0 * int(states[0] is True)
@ -379,50 +374,50 @@ def _stat_binary_average_step(
on_seconds: float = 0
for i in range(1, len(states)):
if states[i - 1] is True:
on_seconds += (ages[i] - ages[i - 1]).total_seconds()
age_range_seconds = (ages[-1] - ages[0]).total_seconds()
on_seconds += ages[i] - ages[i - 1]
age_range_seconds = ages[-1] - ages[0]
return 100 / age_range_seconds * on_seconds
return None
def _stat_binary_average_timeless(
    states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
    """Return the unweighted on-percentage; alias for the binary mean."""
    return _stat_binary_mean(states, ages, percentile)
def _stat_binary_count(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> int | None:
return len(states)
def _stat_binary_count_on(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> int | None:
return states.count(True)
def _stat_binary_count_off(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> int | None:
return states.count(False)
def _stat_binary_datetime_newest(
    states: deque[bool | float], ages: deque[float], percentile: int
) -> datetime | None:
    """Return the newest sample's UTC datetime; delegates to the numeric variant."""
    return _stat_datetime_newest(states, ages, percentile)
def _stat_binary_datetime_oldest(
    states: deque[bool | float], ages: deque[float], percentile: int
) -> datetime | None:
    """Return the oldest sample's UTC datetime; delegates to the numeric variant."""
    return _stat_datetime_oldest(states, ages, percentile)
def _stat_binary_mean(
states: deque[bool | float], ages: deque[datetime], percentile: int
states: deque[bool | float], ages: deque[float], percentile: int
) -> float | None:
if len(states) > 0:
return 100.0 / len(states) * states.count(True)
@ -630,12 +625,8 @@ async def async_setup_entry(
sampling_size = int(sampling_size)
max_age = None
if max_age_input := entry.options.get(CONF_MAX_AGE):
max_age = timedelta(
hours=max_age_input["hours"],
minutes=max_age_input["minutes"],
seconds=max_age_input["seconds"],
)
if max_age := entry.options.get(CONF_MAX_AGE):
max_age = timedelta(**max_age)
async_add_entities(
[
@ -688,20 +679,22 @@ class StatisticsSensor(SensorEntity):
)
self._state_characteristic: str = state_characteristic
self._samples_max_buffer_size: int | None = samples_max_buffer_size
self._samples_max_age: timedelta | None = samples_max_age
self._samples_max_age: float | None = (
samples_max_age.total_seconds() if samples_max_age else None
)
self.samples_keep_last: bool = samples_keep_last
self._precision: int = precision
self._percentile: int = percentile
self._attr_available: bool = False
self.states: deque[float | bool] = deque(maxlen=self._samples_max_buffer_size)
self.ages: deque[datetime] = deque(maxlen=self._samples_max_buffer_size)
self.states: deque[float | bool] = deque(maxlen=samples_max_buffer_size)
self.ages: deque[float] = deque(maxlen=samples_max_buffer_size)
self._attr_extra_state_attributes = {}
self._state_characteristic_fn: Callable[
[deque[bool | float], deque[datetime], int],
[deque[bool | float], deque[float], int],
float | int | datetime | None,
] = _callable_characteristic_fn(self._state_characteristic, self.is_binary)
] = _callable_characteristic_fn(state_characteristic, self.is_binary)
self._update_listener: CALLBACK_TYPE | None = None
self._preview_callback: Callable[[str, Mapping[str, Any]], None] | None = None
@ -807,7 +800,7 @@ class StatisticsSensor(SensorEntity):
self.states.append(new_state.state == "on")
else:
self.states.append(float(new_state.state))
self.ages.append(new_state.last_reported)
self.ages.append(new_state.last_reported_timestamp)
self._attr_extra_state_attributes[STAT_SOURCE_VALUE_VALID] = True
except ValueError:
self._attr_extra_state_attributes[STAT_SOURCE_VALUE_VALID] = False
@ -840,27 +833,24 @@ class StatisticsSensor(SensorEntity):
base_unit: str | None = new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
unit: str | None = None
if self.is_binary and self._state_characteristic in STATS_BINARY_PERCENTAGE:
stat_type = self._state_characteristic
if self.is_binary and stat_type in STATS_BINARY_PERCENTAGE:
unit = PERCENTAGE
elif not base_unit:
unit = None
elif self._state_characteristic in STATS_NUMERIC_RETAIN_UNIT:
elif stat_type in STATS_NUMERIC_RETAIN_UNIT:
unit = base_unit
elif (
self._state_characteristic in STATS_NOT_A_NUMBER
or self._state_characteristic
in (
STAT_COUNT,
STAT_COUNT_BINARY_ON,
STAT_COUNT_BINARY_OFF,
)
elif stat_type in STATS_NOT_A_NUMBER or stat_type in (
STAT_COUNT,
STAT_COUNT_BINARY_ON,
STAT_COUNT_BINARY_OFF,
):
unit = None
elif self._state_characteristic == STAT_VARIANCE:
elif stat_type == STAT_VARIANCE:
unit = base_unit + "²"
elif self._state_characteristic == STAT_CHANGE_SAMPLE:
elif stat_type == STAT_CHANGE_SAMPLE:
unit = base_unit + "/sample"
elif self._state_characteristic == STAT_CHANGE_SECOND:
elif stat_type == STAT_CHANGE_SECOND:
unit = base_unit + "/s"
return unit
@ -876,9 +866,10 @@ class StatisticsSensor(SensorEntity):
"""
device_class: SensorDeviceClass | None = None
if self._state_characteristic in STATS_DATETIME:
stat_type = self._state_characteristic
if stat_type in STATS_DATETIME:
return SensorDeviceClass.TIMESTAMP
if self._state_characteristic in STATS_NUMERIC_RETAIN_UNIT:
if stat_type in STATS_NUMERIC_RETAIN_UNIT:
device_class = new_state.attributes.get(ATTR_DEVICE_CLASS)
if device_class is None:
return None
@ -917,55 +908,60 @@ class StatisticsSensor(SensorEntity):
return None
return SensorStateClass.MEASUREMENT
def _purge_old_states(self, max_age: float) -> None:
    """Remove states which are older than a given age.

    `max_age` is an age in seconds; self.ages holds POSIX float timestamps.
    When samples_keep_last is set, the final remaining sample is preserved
    even if expired. Debug log formatting (timestamp -> datetime conversion)
    is gated behind an isEnabledFor check so it costs nothing when disabled.
    """
    now_timestamp = time.time()
    debug = _LOGGER.isEnabledFor(logging.DEBUG)

    if debug:
        _LOGGER.debug(
            "%s: purging records older then %s(%s)(keep_last_sample: %s)",
            self.entity_id,
            dt_util.as_local(dt_util.utc_from_timestamp(now_timestamp - max_age)),
            self._samples_max_age,
            self.samples_keep_last,
        )

    while self.ages and (now_timestamp - self.ages[0]) > max_age:
        if self.samples_keep_last and len(self.ages) == 1:
            # Under normal circumstance this will not be executed, as a purge will not
            # be scheduled for the last value if samples_keep_last is enabled.
            # If this happens to be called outside normal scheduling logic or a
            # source sensor update, this ensures the last value is preserved.
            if debug:
                _LOGGER.debug(
                    "%s: preserving expired record with datetime %s(%s)",
                    self.entity_id,
                    dt_util.as_local(dt_util.utc_from_timestamp(self.ages[0])),
                    dt_util.utc_from_timestamp(now_timestamp - self.ages[0]),
                )
            break

        if debug:
            _LOGGER.debug(
                "%s: purging record with datetime %s(%s)",
                self.entity_id,
                dt_util.as_local(dt_util.utc_from_timestamp(self.ages[0])),
                dt_util.utc_from_timestamp(now_timestamp - self.ages[0]),
            )
        # ages and states are parallel deques; always pop them together.
        self.ages.popleft()
        self.states.popleft()
@callback
def _async_next_to_purge_timestamp(self) -> datetime | None:
def _async_next_to_purge_timestamp(self) -> float | None:
"""Find the timestamp when the next purge would occur."""
if self.ages and self._samples_max_age:
if self.samples_keep_last and len(self.ages) == 1:
# Preserve the most recent entry if it is the only value.
# Do not schedule another purge. When a new source
# value is inserted it will restart purge cycle.
_LOGGER.debug(
"%s: skipping purge cycle for last record with datetime %s(%s)",
self.entity_id,
dt_util.as_local(self.ages[0]),
(dt_util.utcnow() - self.ages[0]),
)
if _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.debug(
"%s: skipping purge cycle for last record with datetime %s(%s)",
self.entity_id,
dt_util.as_local(dt_util.utc_from_timestamp(self.ages[0])),
(dt_util.utcnow() - dt_util.utc_from_timestamp(self.ages[0])),
)
return None
# Take the oldest entry from the ages list and add the configured max_age.
# If executed after purging old states, the result is the next timestamp
@ -990,10 +986,17 @@ class StatisticsSensor(SensorEntity):
# By basing updates off the timestamps of sampled data we avoid updating
# when none of the observed entities change.
if timestamp := self._async_next_to_purge_timestamp():
_LOGGER.debug("%s: scheduling update at %s", self.entity_id, timestamp)
if _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.debug(
"%s: scheduling update at %s",
self.entity_id,
dt_util.utc_from_timestamp(timestamp),
)
self._async_cancel_update_listener()
self._update_listener = async_track_point_in_utc_time(
self.hass, self._async_scheduled_update, timestamp
self.hass,
self._async_scheduled_update,
dt_util.utc_from_timestamp(timestamp),
)
@callback
@ -1017,9 +1020,11 @@ class StatisticsSensor(SensorEntity):
"""Fetch the states from the database."""
_LOGGER.debug("%s: initializing values from the database", self.entity_id)
lower_entity_id = self._source_entity_id.lower()
if self._samples_max_age is not None:
if (max_age := self._samples_max_age) is not None:
start_date = (
dt_util.utcnow() - self._samples_max_age - timedelta(microseconds=1)
dt_util.utcnow()
- timedelta(seconds=max_age)
- timedelta(microseconds=1)
)
_LOGGER.debug(
"%s: retrieve records not older then %s",
@ -1071,11 +1076,10 @@ class StatisticsSensor(SensorEntity):
len(self.states) / self._samples_max_buffer_size, 2
)
if self._samples_max_age is not None:
if (max_age := self._samples_max_age) is not None:
if len(self.states) >= 1:
self._attr_extra_state_attributes[STAT_AGE_COVERAGE_RATIO] = round(
(self.ages[-1] - self.ages[0]).total_seconds()
/ self._samples_max_age.total_seconds(),
(self.ages[-1] - self.ages[0]) / max_age,
2,
)
else: