Break apart recorder into tasks and core modules (#71222)

pull/71231/head
J. Nick Koston 2022-05-02 23:53:56 -05:00 committed by GitHub
parent c594de25f7
commit 29bda196b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1583 additions and 1404 deletions

View File

@ -179,6 +179,7 @@ homeassistant.components.rdw.*
homeassistant.components.recollect_waste.*
homeassistant.components.recorder
homeassistant.components.recorder.const
homeassistant.components.recorder.core
homeassistant.components.recorder.backup
homeassistant.components.recorder.executor
homeassistant.components.recorder.history
@ -189,6 +190,7 @@ homeassistant.components.recorder.repack
homeassistant.components.recorder.run_history
homeassistant.components.recorder.statistics
homeassistant.components.recorder.system_health
homeassistant.components.recorder.tasks
homeassistant.components.recorder.util
homeassistant.components.recorder.websocket_api
homeassistant.components.remote.*

File diff suppressed because it is too large Load Diff

View File

@ -28,3 +28,12 @@ DB_WORKER_PREFIX = "DbWorker"
JSON_DUMP: Final = partial(json.dumps, cls=JSONEncoder, separators=(",", ":"))
ALL_DOMAIN_EXCLUDE_ATTRS = {ATTR_ATTRIBUTION, ATTR_RESTORED, ATTR_SUPPORTED_FEATURES}
ATTR_KEEP_DAYS = "keep_days"
ATTR_REPACK = "repack"
ATTR_APPLY_FILTER = "apply_filter"
KEEPALIVE_TIME = 30
EXCLUDE_ATTRIBUTES = f"{DOMAIN}_exclude_attributes_by_domain"

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,250 @@
"""Support for recording details."""
from __future__ import annotations
import abc
import asyncio
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from datetime import datetime
import threading
from typing import TYPE_CHECKING, Any
from homeassistant.core import Event
from . import purge, statistics
from .const import DOMAIN, EXCLUDE_ATTRIBUTES
from .models import StatisticData, StatisticMetaData
from .util import periodic_db_cleanups
if TYPE_CHECKING:
from .core import Recorder
class RecorderTask(abc.ABC):
    """ABC for recorder tasks."""

    # When True, the recorder commits the pending event session before
    # running this task; tasks that must not force a commit set this False.
    commit_before: bool = True

    @abc.abstractmethod
    def run(self, instance: Recorder) -> None:
        """Handle the task."""
@dataclass
class ClearStatisticsTask(RecorderTask):
    """Object to store the statistic_ids for which to remove statistics."""

    # Statistic ids whose statistics rows should be deleted.
    statistic_ids: list[str]

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        statistics.clear_statistics(instance, self.statistic_ids)
@dataclass
class UpdateStatisticsMetadataTask(RecorderTask):
    """Object to store statistics_id and unit for update of statistics metadata."""

    # Statistic id whose metadata row should be updated.
    statistic_id: str
    # New unit of measurement (None clears the unit).
    unit_of_measurement: str | None

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        new_unit = self.unit_of_measurement
        statistics.update_statistics_metadata(instance, self.statistic_id, new_unit)
@dataclass
class PurgeTask(RecorderTask):
    """Object to store information about purge task."""

    purge_before: datetime
    repack: bool
    apply_filter: bool

    def run(self, instance: Recorder) -> None:
        """Purge the database."""
        assert instance.get_session is not None

        finished = purge.purge_old_data(
            instance, self.purge_before, self.repack, self.apply_filter
        )
        if not finished:
            # Schedule a new purge task since this one did not finish.
            instance.queue.put(
                PurgeTask(self.purge_before, self.repack, self.apply_filter)
            )
            return
        # Reload the run history now that rows may have been removed.
        with instance.get_session() as session:
            instance.run_history.load_from_db(session)
        # The db cleanups must always run after a purge is finished to
        # ensure the WAL checkpoint and other tasks happen after a vacuum.
        periodic_db_cleanups(instance)
@dataclass
class PurgeEntitiesTask(RecorderTask):
    """Object to store entity information about purge task."""

    # Predicate selecting which entity_ids should be purged.
    entity_filter: Callable[[str], bool]

    def run(self, instance: Recorder) -> None:
        """Purge entities from the database."""
        finished = purge.purge_entity_data(instance, self.entity_filter)
        if not finished:
            # Schedule a new purge task since this one did not finish.
            instance.queue.put(PurgeEntitiesTask(self.entity_filter))
@dataclass
class PerodicCleanupTask(RecorderTask):
    """An object to insert into the recorder to trigger cleanup tasks when auto purge is disabled."""

    # NOTE(review): class name is missing an "i" (should be PeriodicCleanupTask);
    # renaming would break external references, so it is kept as-is here.

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        periodic_db_cleanups(instance)
@dataclass
class StatisticsTask(RecorderTask):
    """An object to insert into the recorder queue to run a statistics task."""

    # Start of the period to compile statistics for.
    start: datetime

    def run(self, instance: Recorder) -> None:
        """Run statistics task."""
        if not statistics.compile_statistics(instance, self.start):
            # Schedule a new statistics task since this one did not finish.
            instance.queue.put(StatisticsTask(self.start))
@dataclass
class ExternalStatisticsTask(RecorderTask):
    """An object to insert into the recorder queue to run an external statistics task."""

    metadata: StatisticMetaData
    statistics: Iterable[StatisticData]

    def run(self, instance: Recorder) -> None:
        """Run statistics task."""
        if not statistics.add_external_statistics(
            instance, self.metadata, self.statistics
        ):
            # Schedule a new statistics task since this one did not finish.
            instance.queue.put(ExternalStatisticsTask(self.metadata, self.statistics))
@dataclass
class AdjustStatisticsTask(RecorderTask):
    """An object to insert into the recorder queue to run an adjust statistics task."""

    statistic_id: str
    start_time: datetime
    sum_adjustment: float

    def run(self, instance: Recorder) -> None:
        """Run statistics task."""
        done = statistics.adjust_statistics(
            instance, self.statistic_id, self.start_time, self.sum_adjustment
        )
        if done:
            return
        # Schedule a new adjust statistics task since this one did not finish.
        instance.queue.put(
            AdjustStatisticsTask(
                self.statistic_id, self.start_time, self.sum_adjustment
            )
        )
@dataclass
class WaitTask(RecorderTask):
    """An object to insert into the recorder queue to tell it to set the _queue_watch event."""

    # No commit is forced before this runs; it only signals waiters.
    commit_before = False

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        instance._queue_watch.set()  # pylint: disable=[protected-access]
@dataclass
class DatabaseLockTask(RecorderTask):
    """An object to insert into the recorder queue to prevent writes to the database."""

    # Presumably set once the lock is acquired — see Recorder._lock_database.
    database_locked: asyncio.Event
    # Presumably signaled to release the lock from another thread — confirm in core.
    database_unlock: threading.Event
    # Whether the recorder queue overflowed while the database was locked.
    queue_overflow: bool

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        instance._lock_database(self)  # pylint: disable=[protected-access]
@dataclass
class StopTask(RecorderTask):
    """An object to insert into the recorder queue to stop the event handler."""

    # Must not force a commit; stopping takes effect immediately.
    commit_before = False

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        instance.stop_requested = True
@dataclass
class EventTask(RecorderTask):
    """An event to be processed."""

    # The Home Assistant event to record.
    event: Event
    # Events are batched; no commit is forced per event.
    commit_before = False

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        # pylint: disable-next=[protected-access]
        instance._process_one_event(self.event)
@dataclass
class KeepAliveTask(RecorderTask):
    """A keep alive to be sent."""

    # A keepalive must not force a commit of the event session.
    commit_before = False

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        # pylint: disable-next=[protected-access]
        instance._send_keep_alive()
@dataclass
class CommitTask(RecorderTask):
    """Commit the event session."""

    # The task itself performs the commit, so no commit beforehand.
    commit_before = False

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        # pylint: disable-next=[protected-access]
        instance._commit_event_session_or_retry()
@dataclass
class AddRecorderPlatformTask(RecorderTask):
    """Add a recorder platform."""

    domain: str
    platform: Any
    commit_before = False

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        hass = instance.hass
        domain = self.domain
        platform = self.platform

        # Register the platform under its domain.
        platforms: dict[str, Any] = hass.data[DOMAIN]
        platforms[domain] = platform

        # Platforms may opt out of recording selected state attributes.
        if hasattr(platform, "exclude_attributes"):
            hass.data[EXCLUDE_ATTRIBUTES][domain] = platform.exclude_attributes(hass)

View File

@ -1732,6 +1732,17 @@ no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.recorder.core]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.recorder.backup]
check_untyped_defs = true
disallow_incomplete_defs = true
@ -1842,6 +1853,17 @@ no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.recorder.tasks]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.recorder.util]
check_untyped_defs = true
disallow_incomplete_defs = true

View File

@ -459,23 +459,28 @@ def test_get_significant_states_only(hass_history):
points.append(start + timedelta(minutes=i))
states = []
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=start):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=start
):
set_state("123", attributes={"attribute": 10.64})
with patch(
"homeassistant.components.recorder.dt_util.utcnow", return_value=points[0]
"homeassistant.components.recorder.core.dt_util.utcnow",
return_value=points[0],
):
# Attributes are different, state not
states.append(set_state("123", attributes={"attribute": 21.42}))
with patch(
"homeassistant.components.recorder.dt_util.utcnow", return_value=points[1]
"homeassistant.components.recorder.core.dt_util.utcnow",
return_value=points[1],
):
# state is different, attributes not
states.append(set_state("32", attributes={"attribute": 21.42}))
with patch(
"homeassistant.components.recorder.dt_util.utcnow", return_value=points[2]
"homeassistant.components.recorder.core.dt_util.utcnow",
return_value=points[2],
):
# everything is different
states.append(set_state("412", attributes={"attribute": 54.23}))
@ -536,7 +541,9 @@ def record_states(hass):
four = three + timedelta(seconds=1)
states = {therm: [], therm2: [], mp: [], mp2: [], mp3: [], script_c: []}
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=one
):
states[mp].append(
set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)})
)
@ -553,7 +560,9 @@ def record_states(hass):
set_state(therm, 20, attributes={"current_temperature": 19.5})
)
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=two
):
# This state will be skipped only different in time
set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt3)})
# This state will be skipped because domain is excluded
@ -568,7 +577,9 @@ def record_states(hass):
set_state(therm2, 20, attributes={"current_temperature": 19})
)
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=three
):
states[mp].append(
set_state(mp, "Netflix", attributes={"media_title": str(sentinel.mt4)})
)

View File

@ -52,7 +52,7 @@ async def _async_get_states(
def _add_db_entries(
hass: ha.HomeAssistant, point: datetime, entity_ids: list[str]
) -> None:
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
for idx, entity_id in enumerate(entity_ids):
session.add(
Events(
@ -87,7 +87,9 @@ def _setup_get_states(hass):
"""Set up for testing get_states."""
states = []
now = dt_util.utcnow()
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=now):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=now
):
for i in range(5):
state = ha.State(
f"test.point_in_time_{i % 5}",
@ -102,7 +104,9 @@ def _setup_get_states(hass):
wait_recording_done(hass)
future = now + timedelta(seconds=1)
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=future):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=future
):
for i in range(5):
state = ha.State(
f"test.point_in_time_{i % 5}",
@ -122,7 +126,7 @@ def test_get_full_significant_states_with_session_entity_no_matches(hass_recorde
hass = hass_recorder()
now = dt_util.utcnow()
time_before_recorder_ran = now - timedelta(days=1000)
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
assert (
history.get_full_significant_states_with_session(
hass, session, time_before_recorder_ran, now, entity_ids=["demo.id"]
@ -148,7 +152,7 @@ def test_significant_states_with_session_entity_minimal_response_no_matches(
hass = hass_recorder()
now = dt_util.utcnow()
time_before_recorder_ran = now - timedelta(days=1000)
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
assert (
history.get_significant_states_with_session(
hass,
@ -197,11 +201,15 @@ def test_state_changes_during_period(hass_recorder, attributes, no_attributes, l
point = start + timedelta(seconds=1)
end = point + timedelta(seconds=1)
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=start):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=start
):
set_state("idle")
set_state("YouTube")
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=point):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=point
):
states = [
set_state("idle"),
set_state("Netflix"),
@ -209,7 +217,9 @@ def test_state_changes_during_period(hass_recorder, attributes, no_attributes, l
set_state("YouTube"),
]
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=end):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=end
):
set_state("Netflix")
set_state("Plex")
@ -235,11 +245,15 @@ def test_state_changes_during_period_descending(hass_recorder):
point = start + timedelta(seconds=1)
end = point + timedelta(seconds=1)
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=start):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=start
):
set_state("idle")
set_state("YouTube")
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=point):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=point
):
states = [
set_state("idle"),
set_state("Netflix"),
@ -247,7 +261,9 @@ def test_state_changes_during_period_descending(hass_recorder):
set_state("YouTube"),
]
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=end):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=end
):
set_state("Netflix")
set_state("Plex")
@ -277,14 +293,20 @@ def test_get_last_state_changes(hass_recorder):
point = start + timedelta(minutes=1)
point2 = point + timedelta(minutes=1)
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=start):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=start
):
set_state("1")
states = []
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=point):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=point
):
states.append(set_state("2"))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=point2):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=point2
):
states.append(set_state("3"))
hist = history.get_last_state_changes(hass, 2, entity_id)
@ -310,10 +332,14 @@ def test_ensure_state_can_be_copied(hass_recorder):
start = dt_util.utcnow() - timedelta(minutes=2)
point = start + timedelta(minutes=1)
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=start):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=start
):
set_state("1")
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=point):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=point
):
set_state("2")
hist = history.get_last_state_changes(hass, 2, entity_id)
@ -486,23 +512,28 @@ def test_get_significant_states_only(hass_recorder):
points.append(start + timedelta(minutes=i))
states = []
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=start):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=start
):
set_state("123", attributes={"attribute": 10.64})
with patch(
"homeassistant.components.recorder.dt_util.utcnow", return_value=points[0]
"homeassistant.components.recorder.core.dt_util.utcnow",
return_value=points[0],
):
# Attributes are different, state not
states.append(set_state("123", attributes={"attribute": 21.42}))
with patch(
"homeassistant.components.recorder.dt_util.utcnow", return_value=points[1]
"homeassistant.components.recorder.core.dt_util.utcnow",
return_value=points[1],
):
# state is different, attributes not
states.append(set_state("32", attributes={"attribute": 21.42}))
with patch(
"homeassistant.components.recorder.dt_util.utcnow", return_value=points[2]
"homeassistant.components.recorder.core.dt_util.utcnow",
return_value=points[2],
):
# everything is different
states.append(set_state("412", attributes={"attribute": 54.23}))
@ -547,7 +578,9 @@ def record_states(hass):
four = three + timedelta(seconds=1)
states = {therm: [], therm2: [], mp: [], mp2: [], mp3: [], script_c: []}
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=one
):
states[mp].append(
set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)})
)
@ -564,7 +597,9 @@ def record_states(hass):
set_state(therm, 20, attributes={"current_temperature": 19.5})
)
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=two
):
# This state will be skipped only different in time
set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt3)})
# This state will be skipped because domain is excluded
@ -579,7 +614,9 @@ def record_states(hass):
set_state(therm2, 20, attributes={"current_temperature": 19})
)
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=three
):
states[mp].append(
set_state(mp, "Netflix", attributes={"media_title": str(sentinel.mt4)})
)

View File

@ -20,7 +20,6 @@ from homeassistant.components.recorder import (
CONF_DB_URL,
CONFIG_SCHEMA,
DOMAIN,
KEEPALIVE_TIME,
SERVICE_DISABLE,
SERVICE_ENABLE,
SERVICE_PURGE,
@ -29,7 +28,7 @@ from homeassistant.components.recorder import (
Recorder,
get_instance,
)
from homeassistant.components.recorder.const import DATA_INSTANCE
from homeassistant.components.recorder.const import DATA_INSTANCE, KEEPALIVE_TIME
from homeassistant.components.recorder.models import (
EventData,
Events,
@ -196,7 +195,7 @@ async def test_saving_many_states(
with patch.object(
hass.data[DATA_INSTANCE].event_session, "expire_all"
) as expire_all, patch.object(recorder, "EXPIRE_AFTER_COMMITS", 2):
) as expire_all, patch.object(recorder.core, "EXPIRE_AFTER_COMMITS", 2):
for _ in range(3):
hass.states.async_set(entity_id, "on", attributes)
await async_wait_recording_done(hass)
@ -611,7 +610,7 @@ def test_saving_state_and_removing_entity(hass, hass_recorder):
def test_recorder_setup_failure(hass):
"""Test some exceptions."""
with patch.object(Recorder, "_setup_connection") as setup, patch(
"homeassistant.components.recorder.time.sleep"
"homeassistant.components.recorder.core.time.sleep"
):
setup.side_effect = ImportError("driver not found")
rec = _default_recorder(hass)
@ -625,7 +624,7 @@ def test_recorder_setup_failure(hass):
def test_recorder_setup_failure_without_event_listener(hass):
"""Test recorder setup failure when the event listener is not setup."""
with patch.object(Recorder, "_setup_connection") as setup, patch(
"homeassistant.components.recorder.time.sleep"
"homeassistant.components.recorder.core.time.sleep"
):
setup.side_effect = ImportError("driver not found")
rec = _default_recorder(hass)
@ -685,7 +684,7 @@ def test_auto_purge(hass_recorder):
with patch(
"homeassistant.components.recorder.purge.purge_old_data", return_value=True
) as purge_old_data, patch(
"homeassistant.components.recorder.periodic_db_cleanups"
"homeassistant.components.recorder.tasks.periodic_db_cleanups"
) as periodic_db_cleanups:
# Advance one day, and the purge task should run
test_time = test_time + timedelta(days=1)
@ -741,11 +740,11 @@ def test_auto_purge_auto_repack_on_second_sunday(hass_recorder):
run_tasks_at_time(hass, test_time)
with patch(
"homeassistant.components.recorder.is_second_sunday", return_value=True
"homeassistant.components.recorder.core.is_second_sunday", return_value=True
), patch(
"homeassistant.components.recorder.purge.purge_old_data", return_value=True
) as purge_old_data, patch(
"homeassistant.components.recorder.periodic_db_cleanups"
"homeassistant.components.recorder.tasks.periodic_db_cleanups"
) as periodic_db_cleanups:
# Advance one day, and the purge task should run
test_time = test_time + timedelta(days=1)
@ -779,11 +778,11 @@ def test_auto_purge_auto_repack_disabled_on_second_sunday(hass_recorder):
run_tasks_at_time(hass, test_time)
with patch(
"homeassistant.components.recorder.is_second_sunday", return_value=True
"homeassistant.components.recorder.core.is_second_sunday", return_value=True
), patch(
"homeassistant.components.recorder.purge.purge_old_data", return_value=True
) as purge_old_data, patch(
"homeassistant.components.recorder.periodic_db_cleanups"
"homeassistant.components.recorder.tasks.periodic_db_cleanups"
) as periodic_db_cleanups:
# Advance one day, and the purge task should run
test_time = test_time + timedelta(days=1)
@ -817,11 +816,12 @@ def test_auto_purge_no_auto_repack_on_not_second_sunday(hass_recorder):
run_tasks_at_time(hass, test_time)
with patch(
"homeassistant.components.recorder.is_second_sunday", return_value=False
"homeassistant.components.recorder.core.is_second_sunday",
return_value=False,
), patch(
"homeassistant.components.recorder.purge.purge_old_data", return_value=True
) as purge_old_data, patch(
"homeassistant.components.recorder.periodic_db_cleanups"
"homeassistant.components.recorder.tasks.periodic_db_cleanups"
) as periodic_db_cleanups:
# Advance one day, and the purge task should run
test_time = test_time + timedelta(days=1)
@ -856,7 +856,7 @@ def test_auto_purge_disabled(hass_recorder):
with patch(
"homeassistant.components.recorder.purge.purge_old_data", return_value=True
) as purge_old_data, patch(
"homeassistant.components.recorder.periodic_db_cleanups"
"homeassistant.components.recorder.tasks.periodic_db_cleanups"
) as periodic_db_cleanups:
# Advance one day, and the purge task should run
test_time = test_time + timedelta(days=1)
@ -924,7 +924,9 @@ def test_auto_statistics(hass_recorder):
def test_statistics_runs_initiated(hass_recorder):
"""Test statistics_runs is initiated when DB is created."""
now = dt_util.utcnow()
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=now):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=now
):
hass = hass_recorder()
wait_recording_done(hass)
@ -944,7 +946,9 @@ def test_compile_missing_statistics(tmpdir):
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=now):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=now
):
hass = get_test_home_assistant()
setup_component(hass, DOMAIN, {DOMAIN: {CONF_DB_URL: dburl}})
@ -963,7 +967,7 @@ def test_compile_missing_statistics(tmpdir):
hass.stop()
with patch(
"homeassistant.components.recorder.dt_util.utcnow",
"homeassistant.components.recorder.core.dt_util.utcnow",
return_value=now + timedelta(hours=1),
):
@ -1356,8 +1360,8 @@ async def test_database_lock_and_overflow(
instance: Recorder = hass.data[DATA_INSTANCE]
with patch.object(recorder, "MAX_QUEUE_BACKLOG", 1), patch.object(
recorder, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.1
with patch.object(recorder.core, "MAX_QUEUE_BACKLOG", 1), patch.object(
recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.1
):
await instance.lock_database()
@ -1382,7 +1386,7 @@ async def test_database_lock_timeout(hass, recorder_mock):
instance: Recorder = hass.data[DATA_INSTANCE]
class BlockQueue(recorder.RecorderTask):
class BlockQueue(recorder.tasks.RecorderTask):
event: threading.Event = threading.Event()
def run(self, instance: Recorder) -> None:
@ -1390,7 +1394,7 @@ async def test_database_lock_timeout(hass, recorder_mock):
block_task = BlockQueue()
instance.queue.put(block_task)
with patch.object(recorder, "DB_LOCK_TIMEOUT", 0.1):
with patch.object(recorder.core, "DB_LOCK_TIMEOUT", 0.1):
try:
with pytest.raises(TimeoutError):
await instance.lock_database()
@ -1435,7 +1439,7 @@ async def test_database_connection_keep_alive(
await instance.async_recorder_ready.wait()
async_fire_time_changed(
hass, dt_util.utcnow() + timedelta(seconds=recorder.KEEPALIVE_TIME)
hass, dt_util.utcnow() + timedelta(seconds=recorder.core.KEEPALIVE_TIME)
)
await async_wait_recording_done(hass)
assert "Sending keepalive" in caplog.text
@ -1452,7 +1456,7 @@ async def test_database_connection_keep_alive_disabled_on_sqlite(
await instance.async_recorder_ready.wait()
async_fire_time_changed(
hass, dt_util.utcnow() + timedelta(seconds=recorder.KEEPALIVE_TIME)
hass, dt_util.utcnow() + timedelta(seconds=recorder.core.KEEPALIVE_TIME)
)
await async_wait_recording_done(hass)
assert "Sending keepalive" not in caplog.text
@ -1482,7 +1486,7 @@ def test_deduplication_event_data_inside_commit_interval(hass_recorder, caplog):
# Patch STATE_ATTRIBUTES_ID_CACHE_SIZE since otherwise
# the CI can fail because the test takes too long to run
@patch("homeassistant.components.recorder.STATE_ATTRIBUTES_ID_CACHE_SIZE", 5)
@patch("homeassistant.components.recorder.core.STATE_ATTRIBUTES_ID_CACHE_SIZE", 5)
def test_deduplication_state_attributes_inside_commit_interval(hass_recorder, caplog):
"""Test deduplication of state attributes inside the commit interval."""
hass = hass_recorder()

View File

@ -44,7 +44,8 @@ async def test_schema_update_calls(hass):
assert recorder.util.async_migration_in_progress(hass) is False
with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch(
"homeassistant.components.recorder.create_engine", new=create_engine_test
"homeassistant.components.recorder.core.create_engine",
new=create_engine_test,
), patch(
"homeassistant.components.recorder.migration._apply_update",
wraps=migration._apply_update,
@ -67,10 +68,10 @@ async def test_migration_in_progress(hass):
"""Test that we can check for migration in progress."""
assert recorder.util.async_migration_in_progress(hass) is False
with patch(
"homeassistant.components.recorder.ALLOW_IN_MEMORY_DB",
True,
), patch("homeassistant.components.recorder.create_engine", new=create_engine_test):
with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True,), patch(
"homeassistant.components.recorder.core.create_engine",
new=create_engine_test,
):
await async_setup_component(
hass, "recorder", {"recorder": {"db_url": "sqlite://"}}
)
@ -86,7 +87,8 @@ async def test_database_migration_failed(hass):
assert recorder.util.async_migration_in_progress(hass) is False
with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch(
"homeassistant.components.recorder.create_engine", new=create_engine_test
"homeassistant.components.recorder.core.create_engine",
new=create_engine_test,
), patch(
"homeassistant.components.recorder.migration._apply_update",
side_effect=ValueError,
@ -125,7 +127,7 @@ async def test_database_migration_encounters_corruption(hass):
"homeassistant.components.recorder.migration.migrate_schema",
side_effect=sqlite3_exception,
), patch(
"homeassistant.components.recorder.move_away_broken_database"
"homeassistant.components.recorder.core.move_away_broken_database"
) as move_away:
await async_setup_component(
hass, "recorder", {"recorder": {"db_url": "sqlite://"}}
@ -149,7 +151,7 @@ async def test_database_migration_encounters_corruption_not_sqlite(hass):
"homeassistant.components.recorder.migration.migrate_schema",
side_effect=DatabaseError("statement", {}, []),
), patch(
"homeassistant.components.recorder.move_away_broken_database"
"homeassistant.components.recorder.core.move_away_broken_database"
) as move_away, patch(
"homeassistant.components.persistent_notification.create", side_effect=pn.create
) as mock_create, patch(
@ -176,10 +178,10 @@ async def test_events_during_migration_are_queued(hass):
assert recorder.util.async_migration_in_progress(hass) is False
with patch(
"homeassistant.components.recorder.ALLOW_IN_MEMORY_DB",
True,
), patch("homeassistant.components.recorder.create_engine", new=create_engine_test):
with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True,), patch(
"homeassistant.components.recorder.core.create_engine",
new=create_engine_test,
):
await async_setup_component(
hass,
"recorder",
@ -207,8 +209,9 @@ async def test_events_during_migration_queue_exhausted(hass):
assert recorder.util.async_migration_in_progress(hass) is False
with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch(
"homeassistant.components.recorder.create_engine", new=create_engine_test
), patch.object(recorder, "MAX_QUEUE_BACKLOG", 1):
"homeassistant.components.recorder.core.create_engine",
new=create_engine_test,
), patch.object(recorder.core, "MAX_QUEUE_BACKLOG", 1):
await async_setup_component(
hass,
"recorder",
@ -296,7 +299,8 @@ async def test_schema_migrate(hass, start_version):
migration_done.set()
with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch(
"homeassistant.components.recorder.create_engine", new=_create_engine_test
"homeassistant.components.recorder.core.create_engine",
new=_create_engine_test,
), patch(
"homeassistant.components.recorder.Recorder._setup_run",
side_effect=_mock_setup_run,

View File

@ -9,7 +9,6 @@ from sqlalchemy.exc import DatabaseError, OperationalError
from sqlalchemy.orm.session import Session
from homeassistant.components import recorder
from homeassistant.components.recorder import PurgeTask
from homeassistant.components.recorder.const import MAX_ROWS_TO_PURGE
from homeassistant.components.recorder.models import (
Events,
@ -20,6 +19,7 @@ from homeassistant.components.recorder.models import (
StatisticsShortTerm,
)
from homeassistant.components.recorder.purge import purge_old_data
from homeassistant.components.recorder.tasks import PurgeTask
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import EVENT_STATE_CHANGED, STATE_ON
from homeassistant.core import HomeAssistant
@ -128,7 +128,7 @@ async def test_purge_old_states_encouters_database_corruption(
sqlite3_exception.__cause__ = sqlite3.DatabaseError()
with patch(
"homeassistant.components.recorder.move_away_broken_database"
"homeassistant.components.recorder.core.move_away_broken_database"
) as move_away, patch(
"homeassistant.components.recorder.purge.purge_old_data",
side_effect=sqlite3_exception,
@ -406,7 +406,7 @@ async def test_purge_edge_case(
"""Test states and events are purged even if they occurred shortly before purge_before."""
async def _add_db_entries(hass: HomeAssistant, timestamp: datetime) -> None:
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
session.add(
Events(
event_id=1001,
@ -477,7 +477,7 @@ async def test_purge_cutoff_date(
timestamp_keep = cutoff
timestamp_purge = cutoff - timedelta(microseconds=1)
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
session.add(
Events(
event_id=1000,
@ -626,7 +626,7 @@ async def test_purge_filtered_states(
assert instance.entity_filter("sensor.excluded") is False
def _add_db_entries(hass: HomeAssistant) -> None:
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
# Add states and state_changed events that should be purged
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
@ -820,7 +820,7 @@ async def test_purge_filtered_states_to_empty(
assert instance.entity_filter("sensor.excluded") is False
def _add_db_entries(hass: HomeAssistant) -> None:
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
# Add states and state_changed events that should be purged
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
@ -877,7 +877,7 @@ async def test_purge_without_state_attributes_filtered_states_to_empty(
assert instance.entity_filter("sensor.old_format") is False
def _add_db_entries(hass: HomeAssistant) -> None:
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
# Add states and state_changed events that should be purged
# in the legacy format
timestamp = dt_util.utcnow() - timedelta(days=5)
@ -944,7 +944,7 @@ async def test_purge_filtered_events(
await async_setup_recorder_instance(hass, config)
def _add_db_entries(hass: HomeAssistant) -> None:
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
# Add events that should be purged
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
@ -1038,7 +1038,7 @@ async def test_purge_filtered_events_state_changed(
assert instance.entity_filter("sensor.excluded") is True
def _add_db_entries(hass: HomeAssistant) -> None:
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
# Add states and state_changed events that should be purged
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
@ -1154,7 +1154,7 @@ async def test_purge_entities(
await async_wait_purge_done(hass)
def _add_purge_records(hass: HomeAssistant) -> None:
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
# Add states and state_changed events that should be purged
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
@ -1186,7 +1186,7 @@ async def test_purge_entities(
)
def _add_keep_records(hass: HomeAssistant) -> None:
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
# Add states and state_changed events that should be kept
timestamp = dt_util.utcnow() - timedelta(days=2)
for event_id in range(200, 210):
@ -1289,7 +1289,8 @@ async def _add_test_states(hass: HomeAssistant):
attributes = {"dontpurgeme": True, **base_attributes}
with patch(
"homeassistant.components.recorder.dt_util.utcnow", return_value=timestamp
"homeassistant.components.recorder.core.dt_util.utcnow",
return_value=timestamp,
):
await set_state("test.recorder2", state, attributes=attributes)
@ -1304,7 +1305,7 @@ async def _add_test_events(hass: HomeAssistant):
await hass.async_block_till_done()
await async_wait_recording_done(hass)
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
@ -1335,7 +1336,7 @@ async def _add_test_statistics(hass: HomeAssistant):
await hass.async_block_till_done()
await async_wait_recording_done(hass)
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
@ -1364,7 +1365,7 @@ async def _add_test_recorder_runs(hass: HomeAssistant):
await hass.async_block_till_done()
await async_wait_recording_done(hass)
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
for rec_id in range(6):
if rec_id < 2:
timestamp = eleven_days_ago
@ -1391,7 +1392,7 @@ async def _add_test_statistics_runs(hass: HomeAssistant):
await hass.async_block_till_done()
await async_wait_recording_done(hass)
with recorder.session_scope(hass=hass) as session:
with session_scope(hass=hass) as session:
for rec_id in range(6):
if rec_id < 2:
timestamp = eleven_days_ago

View File

@ -839,7 +839,9 @@ def record_states(hass):
four = three + timedelta(seconds=15 * 5)
states = {mp: [], sns1: [], sns2: [], sns3: [], sns4: []}
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=one
):
states[mp].append(
set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)})
)
@ -851,13 +853,17 @@ def record_states(hass):
states[sns3].append(set_state(sns3, "10", attributes=sns3_attr))
states[sns4].append(set_state(sns4, "10", attributes=sns4_attr))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=two
):
states[sns1].append(set_state(sns1, "15", attributes=sns1_attr))
states[sns2].append(set_state(sns2, "15", attributes=sns2_attr))
states[sns3].append(set_state(sns3, "15", attributes=sns3_attr))
states[sns4].append(set_state(sns4, "15", attributes=sns4_attr))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=three
):
states[sns1].append(set_state(sns1, "20", attributes=sns1_attr))
states[sns2].append(set_state(sns2, "20", attributes=sns2_attr))
states[sns3].append(set_state(sns3, "20", attributes=sns3_attr))

View File

@ -45,7 +45,7 @@ def test_recorder_bad_commit(hass_recorder):
session.execute(text("select * from notthere"))
with patch(
"homeassistant.components.recorder.time.sleep"
"homeassistant.components.recorder.core.time.sleep"
) as e_mock, util.session_scope(hass=hass) as session:
res = util.commit(session, work)
assert res is False
@ -66,7 +66,7 @@ def test_recorder_bad_execute(hass_recorder):
mck1.to_native = to_native
with pytest.raises(SQLAlchemyError), patch(
"homeassistant.components.recorder.time.sleep"
"homeassistant.components.recorder.core.time.sleep"
) as e_mock:
util.execute((mck1,), to_native=True)
@ -148,7 +148,7 @@ async def test_last_run_was_recently_clean(
"homeassistant.components.recorder.util.last_run_was_recently_clean",
wraps=_last_run_was_recently_clean,
) as last_run_was_recently_clean_mock, patch(
"homeassistant.components.recorder.dt_util.utcnow",
"homeassistant.components.recorder.core.dt_util.utcnow",
return_value=thirty_min_future_time,
):
hass = await async_test_home_assistant(None)

View File

@ -323,9 +323,10 @@ async def test_recorder_info_migration_queue_exhausted(hass, hass_ws_client):
with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch(
"homeassistant.components.recorder.Recorder.async_periodic_statistics"
), patch(
"homeassistant.components.recorder.create_engine", new=create_engine_test
"homeassistant.components.recorder.core.create_engine",
new=create_engine_test,
), patch.object(
recorder, "MAX_QUEUE_BACKLOG", 1
recorder.core, "MAX_QUEUE_BACKLOG", 1
), patch(
"homeassistant.components.recorder.migration.migrate_schema",
wraps=stalled_migration,
@ -384,7 +385,7 @@ async def test_backup_start_timeout(
# Ensure there are no queued events
await async_wait_recording_done(hass)
with patch.object(recorder, "DB_LOCK_TIMEOUT", 0):
with patch.object(recorder.core, "DB_LOCK_TIMEOUT", 0):
try:
await client.send_json({"id": 1, "type": "backup/start"})
response = await client.receive_json()

View File

@ -172,7 +172,9 @@ def test_compile_hourly_statistics_purged_state_changes(
mean = min = max = float(hist["sensor.test1"][-1].state)
# Purge all states from the database
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=four):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=four
):
hass.services.call("recorder", "purge", {"keep_days": 0})
hass.block_till_done()
wait_recording_done(hass)
@ -2747,17 +2749,23 @@ def record_states(hass, zero, entity_id, attributes, seq=None):
four = three + timedelta(seconds=10 * 5)
states = {entity_id: []}
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=one
):
states[entity_id].append(
set_state(entity_id, str(seq[0]), attributes=attributes)
)
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=two
):
states[entity_id].append(
set_state(entity_id, str(seq[1]), attributes=attributes)
)
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=three
):
states[entity_id].append(
set_state(entity_id, str(seq[2]), attributes=attributes)
)
@ -3339,35 +3347,53 @@ def record_meter_states(hass, zero, entity_id, _attributes, seq):
attributes["last_reset"] = zero.isoformat()
states = {entity_id: []}
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=zero):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=zero
):
states[entity_id].append(set_state(entity_id, seq[0], attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=one
):
states[entity_id].append(set_state(entity_id, seq[1], attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=two
):
states[entity_id].append(set_state(entity_id, seq[2], attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=three
):
states[entity_id].append(set_state(entity_id, seq[3], attributes=attributes))
attributes = dict(_attributes)
if "last_reset" in _attributes:
attributes["last_reset"] = four.isoformat()
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=four):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=four
):
states[entity_id].append(set_state(entity_id, seq[4], attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=five):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=five
):
states[entity_id].append(set_state(entity_id, seq[5], attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=six):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=six
):
states[entity_id].append(set_state(entity_id, seq[6], attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=seven):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=seven
):
states[entity_id].append(set_state(entity_id, seq[7], attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=eight):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=eight
):
states[entity_id].append(set_state(entity_id, seq[8], attributes=attributes))
return four, eight, states
@ -3386,7 +3412,9 @@ def record_meter_state(hass, zero, entity_id, attributes, seq):
return hass.states.get(entity_id)
states = {entity_id: []}
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=zero):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=zero
):
states[entity_id].append(set_state(entity_id, seq[0], attributes=attributes))
return states
@ -3410,13 +3438,19 @@ def record_states_partially_unavailable(hass, zero, entity_id, attributes):
four = three + timedelta(seconds=15 * 5)
states = {entity_id: []}
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=one
):
states[entity_id].append(set_state(entity_id, "10", attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=two
):
states[entity_id].append(set_state(entity_id, "25", attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=three
):
states[entity_id].append(
set_state(entity_id, STATE_UNAVAILABLE, attributes=attributes)
)