Optimize fetching statistics by avoiding recalculating time boundaries (#87859)
* predictive * fix conversion error * fix conversion error * fix conversion error * convert day to use new algo * reduce
pull/87896/head
parent
34e2751f14
commit
86a93e9fce
|
@ -5,7 +5,7 @@ from collections import defaultdict
|
|||
from collections.abc import Callable, Iterable, Mapping, Sequence
|
||||
import contextlib
|
||||
import dataclasses
|
||||
from datetime import date, datetime, timedelta
|
||||
from datetime import datetime, timedelta
|
||||
from functools import lru_cache, partial
|
||||
from itertools import chain, groupby
|
||||
import json
|
||||
|
@ -1088,32 +1088,34 @@ def reduce_day_ts_factory() -> (
|
|||
]
|
||||
):
|
||||
"""Return functions to match same day and day start end."""
|
||||
_boundries: tuple[float, float] = (0, 0)
|
||||
|
||||
# We have to recreate _local_from_timestamp in the closure in case the timezone changes
|
||||
_local_from_timestamp = partial(
|
||||
datetime.fromtimestamp, tz=dt_util.DEFAULT_TIME_ZONE
|
||||
)
|
||||
# We create _as_local_cached in the closure in case the timezone changes
|
||||
_as_local_cached = lru_cache(maxsize=6)(_local_from_timestamp)
|
||||
|
||||
def _as_local_date(time: float) -> date:
|
||||
"""Return the local date of a datetime."""
|
||||
return _local_from_timestamp(time).date()
|
||||
|
||||
_as_local_date_cached = lru_cache(maxsize=6)(_as_local_date)
|
||||
|
||||
def _same_day_ts(time1: float, time2: float) -> bool:
|
||||
"""Return True if time1 and time2 are in the same date."""
|
||||
return _as_local_date_cached(time1) == _as_local_date_cached(time2)
|
||||
nonlocal _boundries
|
||||
if not _boundries[0] <= time1 < _boundries[1]:
|
||||
_boundries = _day_start_end_ts_cached(time1)
|
||||
return _boundries[0] <= time2 < _boundries[1]
|
||||
|
||||
def _day_start_end_ts(time: float) -> tuple[float, float]:
|
||||
"""Return the start and end of the period (day) time is within."""
|
||||
start = dt_util.as_utc(
|
||||
_as_local_cached(time).replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
start_local = _local_from_timestamp(time).replace(
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
)
|
||||
return (
|
||||
start_local.astimezone(dt_util.UTC).timestamp(),
|
||||
(start_local + timedelta(days=1)).astimezone(dt_util.UTC).timestamp(),
|
||||
)
|
||||
end = start + timedelta(days=1)
|
||||
return (start.timestamp(), end.timestamp())
|
||||
|
||||
return _same_day_ts, _day_start_end_ts
|
||||
# We create _day_start_end_ts_cached in the closure in case the timezone changes
|
||||
_day_start_end_ts_cached = lru_cache(maxsize=6)(_day_start_end_ts)
|
||||
|
||||
return _same_day_ts, _day_start_end_ts_cached
|
||||
|
||||
|
||||
def _reduce_statistics_per_day(
|
||||
|
@ -1134,38 +1136,36 @@ def reduce_week_ts_factory() -> (
|
|||
]
|
||||
):
|
||||
"""Return functions to match same week and week start end."""
|
||||
_boundries: tuple[float, float] = (0, 0)
|
||||
|
||||
# We have to recreate _local_from_timestamp in the closure in case the timezone changes
|
||||
_local_from_timestamp = partial(
|
||||
datetime.fromtimestamp, tz=dt_util.DEFAULT_TIME_ZONE
|
||||
)
|
||||
# We create _as_local_cached in the closure in case the timezone changes
|
||||
_as_local_cached = lru_cache(maxsize=6)(_local_from_timestamp)
|
||||
|
||||
def _as_local_isocalendar(
|
||||
time: float,
|
||||
) -> tuple: # Need python3.11 for isocalendar typing
|
||||
"""Return the local isocalendar of a datetime."""
|
||||
return _local_from_timestamp(time).isocalendar()
|
||||
|
||||
_as_local_isocalendar_cached = lru_cache(maxsize=6)(_as_local_isocalendar)
|
||||
|
||||
def _same_week_ts(time1: float, time2: float) -> bool:
|
||||
"""Return True if time1 and time2 are in the same year and week."""
|
||||
date1 = _as_local_isocalendar_cached(time1)
|
||||
date2 = _as_local_isocalendar_cached(time2)
|
||||
return (date1.year, date1.week) == (date2.year, date2.week) # type: ignore[attr-defined]
|
||||
nonlocal _boundries
|
||||
if not _boundries[0] <= time1 < _boundries[1]:
|
||||
_boundries = _week_start_end_ts_cached(time1)
|
||||
return _boundries[0] <= time2 < _boundries[1]
|
||||
|
||||
def _week_start_end_ts(time: float) -> tuple[float, float]:
|
||||
"""Return the start and end of the period (week) time is within."""
|
||||
time_local = _as_local_cached(time)
|
||||
nonlocal _boundries
|
||||
time_local = _local_from_timestamp(time)
|
||||
start_local = time_local.replace(
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
) - timedelta(days=time_local.weekday())
|
||||
start = dt_util.as_utc(start_local)
|
||||
end = dt_util.as_utc(start_local + timedelta(days=7))
|
||||
return (start.timestamp(), end.timestamp())
|
||||
return (
|
||||
start_local.astimezone(dt_util.UTC).timestamp(),
|
||||
(start_local + timedelta(days=7)).astimezone(dt_util.UTC).timestamp(),
|
||||
)
|
||||
|
||||
return _same_week_ts, _week_start_end_ts
|
||||
# We create _week_start_end_ts_cached in the closure in case the timezone changes
|
||||
_week_start_end_ts_cached = lru_cache(maxsize=6)(_week_start_end_ts)
|
||||
|
||||
return _same_week_ts, _week_start_end_ts_cached
|
||||
|
||||
|
||||
def _reduce_statistics_per_week(
|
||||
|
@ -1186,30 +1186,38 @@ def reduce_month_ts_factory() -> (
|
|||
]
|
||||
):
|
||||
"""Return functions to match same month and month start end."""
|
||||
_boundries: tuple[float, float] = (0, 0)
|
||||
|
||||
# We have to recreate _local_from_timestamp in the closure in case the timezone changes
|
||||
_local_from_timestamp = partial(
|
||||
datetime.fromtimestamp, tz=dt_util.DEFAULT_TIME_ZONE
|
||||
)
|
||||
# We create _as_local_cached in the closure in case the timezone changes
|
||||
_as_local_cached = lru_cache(maxsize=6)(_local_from_timestamp)
|
||||
|
||||
def _same_month_ts(time1: float, time2: float) -> bool:
|
||||
"""Return True if time1 and time2 are in the same year and month."""
|
||||
date1 = _as_local_cached(time1)
|
||||
date2 = _as_local_cached(time2)
|
||||
return (date1.year, date1.month) == (date2.year, date2.month)
|
||||
nonlocal _boundries
|
||||
if not _boundries[0] <= time1 < _boundries[1]:
|
||||
_boundries = _month_start_end_ts_cached(time1)
|
||||
return _boundries[0] <= time2 < _boundries[1]
|
||||
|
||||
def _month_start_end_ts(time: float) -> tuple[float, float]:
|
||||
"""Return the start and end of the period (month) time is within."""
|
||||
start_local = _as_local_cached(time).replace(
|
||||
start_local = _local_from_timestamp(time).replace(
|
||||
day=1, hour=0, minute=0, second=0, microsecond=0
|
||||
)
|
||||
start = dt_util.as_utc(start_local)
|
||||
end_local = (start_local + timedelta(days=31)).replace(day=1)
|
||||
end = dt_util.as_utc(end_local)
|
||||
return (start.timestamp(), end.timestamp())
|
||||
# We add 4 days to the end to make sure we are in the next month
|
||||
end_local = (start_local.replace(day=28) + timedelta(days=4)).replace(
|
||||
day=1, hour=0, minute=0, second=0, microsecond=0
|
||||
)
|
||||
return (
|
||||
start_local.astimezone(dt_util.UTC).timestamp(),
|
||||
end_local.astimezone(dt_util.UTC).timestamp(),
|
||||
)
|
||||
|
||||
return _same_month_ts, _month_start_end_ts
|
||||
# We create _month_start_end_ts_cached in the closure in case the timezone changes
|
||||
_month_start_end_ts_cached = lru_cache(maxsize=6)(_month_start_end_ts)
|
||||
|
||||
return _same_month_ts, _month_start_end_ts_cached
|
||||
|
||||
|
||||
def _reduce_statistics_per_month(
|
||||
|
|
Loading…
Reference in New Issue