Purge recorder database at night (#33646)
parent
374fe47809
commit
6e2cf9663a
|
@ -2,7 +2,7 @@
|
|||
import asyncio
|
||||
from collections import namedtuple
|
||||
import concurrent.futures
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
import logging
|
||||
import queue
|
||||
import threading
|
||||
|
@ -62,6 +62,7 @@ DEFAULT_DB_MAX_RETRIES = 10
|
|||
DEFAULT_DB_RETRY_WAIT = 3
|
||||
KEEPALIVE_TIME = 30
|
||||
|
||||
CONF_AUTO_PURGE = "auto_purge"
|
||||
CONF_DB_URL = "db_url"
|
||||
CONF_DB_MAX_RETRIES = "db_max_retries"
|
||||
CONF_DB_RETRY_WAIT = "db_retry_wait"
|
||||
|
@ -90,25 +91,29 @@ FILTER_SCHEMA = vol.Schema(
|
|||
|
||||
CONFIG_SCHEMA = vol.Schema(
|
||||
{
|
||||
vol.Optional(DOMAIN, default=dict): FILTER_SCHEMA.extend(
|
||||
{
|
||||
vol.Optional(CONF_PURGE_KEEP_DAYS, default=10): vol.All(
|
||||
vol.Coerce(int), vol.Range(min=1)
|
||||
),
|
||||
vol.Optional(CONF_PURGE_INTERVAL, default=1): vol.All(
|
||||
vol.Coerce(int), vol.Range(min=0)
|
||||
),
|
||||
vol.Optional(CONF_DB_URL): cv.string,
|
||||
vol.Optional(CONF_COMMIT_INTERVAL, default=1): vol.All(
|
||||
vol.Coerce(int), vol.Range(min=0)
|
||||
),
|
||||
vol.Optional(
|
||||
CONF_DB_MAX_RETRIES, default=DEFAULT_DB_MAX_RETRIES
|
||||
): cv.positive_int,
|
||||
vol.Optional(
|
||||
CONF_DB_RETRY_WAIT, default=DEFAULT_DB_RETRY_WAIT
|
||||
): cv.positive_int,
|
||||
}
|
||||
vol.Optional(DOMAIN, default=dict): vol.All(
|
||||
cv.deprecated(CONF_PURGE_INTERVAL),
|
||||
FILTER_SCHEMA.extend(
|
||||
{
|
||||
vol.Optional(CONF_AUTO_PURGE, default=True): cv.boolean,
|
||||
vol.Optional(CONF_PURGE_KEEP_DAYS, default=10): vol.All(
|
||||
vol.Coerce(int), vol.Range(min=1)
|
||||
),
|
||||
vol.Optional(CONF_PURGE_INTERVAL, default=1): vol.All(
|
||||
vol.Coerce(int), vol.Range(min=0)
|
||||
),
|
||||
vol.Optional(CONF_DB_URL): cv.string,
|
||||
vol.Optional(CONF_COMMIT_INTERVAL, default=1): vol.All(
|
||||
vol.Coerce(int), vol.Range(min=0)
|
||||
),
|
||||
vol.Optional(
|
||||
CONF_DB_MAX_RETRIES, default=DEFAULT_DB_MAX_RETRIES
|
||||
): cv.positive_int,
|
||||
vol.Optional(
|
||||
CONF_DB_RETRY_WAIT, default=DEFAULT_DB_RETRY_WAIT
|
||||
): cv.positive_int,
|
||||
}
|
||||
),
|
||||
)
|
||||
},
|
||||
extra=vol.ALLOW_EXTRA,
|
||||
|
@ -143,8 +148,8 @@ def run_information(hass, point_in_time: Optional[datetime] = None):
|
|||
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
"""Set up the recorder."""
|
||||
conf = config[DOMAIN]
|
||||
keep_days = conf.get(CONF_PURGE_KEEP_DAYS)
|
||||
purge_interval = conf.get(CONF_PURGE_INTERVAL)
|
||||
auto_purge = conf[CONF_AUTO_PURGE]
|
||||
keep_days = conf[CONF_PURGE_KEEP_DAYS]
|
||||
commit_interval = conf[CONF_COMMIT_INTERVAL]
|
||||
db_max_retries = conf[CONF_DB_MAX_RETRIES]
|
||||
db_retry_wait = conf[CONF_DB_RETRY_WAIT]
|
||||
|
@ -157,8 +162,8 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
|||
exclude = conf.get(CONF_EXCLUDE, {})
|
||||
instance = hass.data[DATA_INSTANCE] = Recorder(
|
||||
hass=hass,
|
||||
auto_purge=auto_purge,
|
||||
keep_days=keep_days,
|
||||
purge_interval=purge_interval,
|
||||
commit_interval=commit_interval,
|
||||
uri=db_url,
|
||||
db_max_retries=db_max_retries,
|
||||
|
@ -189,8 +194,8 @@ class Recorder(threading.Thread):
|
|||
def __init__(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
auto_purge: bool,
|
||||
keep_days: int,
|
||||
purge_interval: int,
|
||||
commit_interval: int,
|
||||
uri: str,
|
||||
db_max_retries: int,
|
||||
|
@ -202,8 +207,8 @@ class Recorder(threading.Thread):
|
|||
threading.Thread.__init__(self, name="Recorder")
|
||||
|
||||
self.hass = hass
|
||||
self.auto_purge = auto_purge
|
||||
self.keep_days = keep_days
|
||||
self.purge_interval = purge_interval
|
||||
self.commit_interval = commit_interval
|
||||
self.queue: Any = queue.Queue()
|
||||
self.recording_start = dt_util.utcnow()
|
||||
|
@ -314,28 +319,17 @@ class Recorder(threading.Thread):
|
|||
return
|
||||
|
||||
# Start periodic purge
|
||||
if self.keep_days and self.purge_interval:
|
||||
if self.auto_purge:
|
||||
|
||||
@callback
|
||||
def async_purge(now):
|
||||
"""Trigger the purge and schedule the next run."""
|
||||
"""Trigger the purge."""
|
||||
self.queue.put(PurgeTask(self.keep_days, repack=False))
|
||||
self.hass.helpers.event.async_track_point_in_time(
|
||||
async_purge, now + timedelta(days=self.purge_interval)
|
||||
)
|
||||
|
||||
earliest = dt_util.utcnow() + timedelta(minutes=30)
|
||||
run = latest = dt_util.utcnow() + timedelta(days=self.purge_interval)
|
||||
with session_scope(session=self.get_session()) as session:
|
||||
event = session.query(Events).first()
|
||||
if event is not None:
|
||||
session.expunge(event)
|
||||
run = dt_util.as_utc(event.time_fired) + timedelta(
|
||||
days=self.keep_days + self.purge_interval
|
||||
)
|
||||
run = min(latest, max(run, earliest))
|
||||
|
||||
self.hass.helpers.event.track_point_in_time(async_purge, run)
|
||||
# Purge every night at 4:12am
|
||||
self.hass.helpers.event.track_time_change(
|
||||
async_purge, hour=4, minute=12, second=0
|
||||
)
|
||||
|
||||
self.event_session = self.get_session()
|
||||
# Use a session for the event read loop
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
"""The tests for the Recorder component."""
|
||||
# pylint: disable=protected-access
|
||||
from datetime import datetime, timedelta
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
|
@ -10,8 +11,9 @@ from homeassistant.components.recorder.const import DATA_INSTANCE
|
|||
from homeassistant.components.recorder.models import Events, States
|
||||
from homeassistant.components.recorder.util import session_scope
|
||||
from homeassistant.const import MATCH_ALL
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.core import ATTR_NOW, EVENT_TIME_CHANGED, callback
|
||||
from homeassistant.setup import async_setup_component
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
from .common import wait_recording_done
|
||||
|
||||
|
@ -198,8 +200,8 @@ def test_recorder_setup_failure():
|
|||
setup.side_effect = ImportError("driver not found")
|
||||
rec = Recorder(
|
||||
hass,
|
||||
auto_purge=True,
|
||||
keep_days=7,
|
||||
purge_interval=2,
|
||||
commit_interval=1,
|
||||
uri="sqlite://",
|
||||
db_max_retries=10,
|
||||
|
@ -227,5 +229,31 @@ async def test_defaults_set(hass):
|
|||
assert await async_setup_component(hass, "history", {})
|
||||
|
||||
assert recorder_config is not None
|
||||
assert recorder_config["auto_purge"]
|
||||
assert recorder_config["purge_keep_days"] == 10
|
||||
assert recorder_config["purge_interval"] == 1
|
||||
|
||||
|
||||
def test_auto_purge(hass_recorder):
|
||||
"""Test saving and restoring a state."""
|
||||
hass = hass_recorder()
|
||||
|
||||
original_tz = dt_util.DEFAULT_TIME_ZONE
|
||||
|
||||
tz = dt_util.get_time_zone("Europe/Copenhagen")
|
||||
dt_util.set_default_time_zone(tz)
|
||||
|
||||
test_time = tz.localize(datetime(2020, 1, 1, 4, 12, 0))
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.recorder.purge.purge_old_data"
|
||||
) as purge_old_data:
|
||||
for delta in (-1, 0, 1):
|
||||
hass.bus.fire(
|
||||
EVENT_TIME_CHANGED, {ATTR_NOW: test_time + timedelta(seconds=delta)}
|
||||
)
|
||||
hass.block_till_done()
|
||||
hass.data[DATA_INSTANCE].block_till_done()
|
||||
|
||||
assert len(purge_old_data.mock_calls) == 1
|
||||
|
||||
dt_util.set_default_time_zone(original_tz)
|
||||
|
|
|
@ -26,7 +26,10 @@ async def test_schema_update_calls(hass):
|
|||
"""Test that schema migrations occur in correct order."""
|
||||
with patch(
|
||||
"homeassistant.components.recorder.create_engine", new=create_engine_test
|
||||
), patch("homeassistant.components.recorder.migration._apply_update") as update:
|
||||
), patch(
|
||||
"homeassistant.components.recorder.migration._apply_update",
|
||||
wraps=migration._apply_update,
|
||||
) as update:
|
||||
await async_setup_component(
|
||||
hass, "recorder", {"recorder": {"db_url": "sqlite://"}}
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue