Maintain recorder purge schedule (#10279)

* Maintain automated purge schedule

* Updates from review feedback
pull/9319/merge
PeteBa 2017-11-03 15:28:16 +00:00 committed by Paulus Schoutsen
parent 81324806d5
commit 1ffccfc91c
2 changed files with 46 additions and 24 deletions

View File

@ -14,6 +14,7 @@ from os import path
import queue import queue
import threading import threading
import time import time
from collections import namedtuple
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Optional, Dict from typing import Optional, Dict
@ -27,7 +28,6 @@ from homeassistant.const import (
EVENT_STATE_CHANGED, EVENT_TIME_CHANGED, MATCH_ALL) EVENT_STATE_CHANGED, EVENT_TIME_CHANGED, MATCH_ALL)
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entityfilter import generate_filter from homeassistant.helpers.entityfilter import generate_filter
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.typing import ConfigType from homeassistant.helpers.typing import ConfigType
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from homeassistant import config as conf_util from homeassistant import config as conf_util
@ -121,7 +121,7 @@ def run_information(hass, point_in_time: Optional[datetime]=None):
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the recorder.""" """Set up the recorder."""
conf = config.get(DOMAIN, {}) conf = config.get(DOMAIN, {})
purge_days = conf.get(CONF_PURGE_KEEP_DAYS) keep_days = conf.get(CONF_PURGE_KEEP_DAYS)
purge_interval = conf.get(CONF_PURGE_INTERVAL) purge_interval = conf.get(CONF_PURGE_INTERVAL)
db_url = conf.get(CONF_DB_URL, None) db_url = conf.get(CONF_DB_URL, None)
@ -132,28 +132,20 @@ def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
include = conf.get(CONF_INCLUDE, {}) include = conf.get(CONF_INCLUDE, {})
exclude = conf.get(CONF_EXCLUDE, {}) exclude = conf.get(CONF_EXCLUDE, {})
instance = hass.data[DATA_INSTANCE] = Recorder( instance = hass.data[DATA_INSTANCE] = Recorder(
hass, uri=db_url, include=include, exclude=exclude) hass=hass, keep_days=keep_days, purge_interval=purge_interval,
uri=db_url, include=include, exclude=exclude)
instance.async_initialize() instance.async_initialize()
instance.start() instance.start()
@asyncio.coroutine
def async_handle_purge_interval(now):
"""Handle purge interval."""
instance.do_purge(purge_days)
@asyncio.coroutine @asyncio.coroutine
def async_handle_purge_service(service): def async_handle_purge_service(service):
"""Handle calls to the purge service.""" """Handle calls to the purge service."""
instance.do_purge(service.data[ATTR_KEEP_DAYS]) instance.do_adhoc_purge(service.data[ATTR_KEEP_DAYS])
descriptions = yield from hass.async_add_job( descriptions = yield from hass.async_add_job(
conf_util.load_yaml_config_file, path.join( conf_util.load_yaml_config_file, path.join(
path.dirname(__file__), 'services.yaml')) path.dirname(__file__), 'services.yaml'))
if purge_interval and purge_days:
async_track_time_interval(hass, async_handle_purge_interval,
timedelta(days=purge_interval))
hass.services.async_register(DOMAIN, SERVICE_PURGE, hass.services.async_register(DOMAIN, SERVICE_PURGE,
async_handle_purge_service, async_handle_purge_service,
descriptions.get(SERVICE_PURGE), descriptions.get(SERVICE_PURGE),
@ -162,16 +154,21 @@ def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
return (yield from instance.async_db_ready) return (yield from instance.async_db_ready)
PurgeTask = namedtuple('PurgeTask', ['keep_days'])
class Recorder(threading.Thread): class Recorder(threading.Thread):
"""A threaded recorder class.""" """A threaded recorder class."""
def __init__(self, hass: HomeAssistant, uri: str, def __init__(self, hass: HomeAssistant, keep_days: int,
purge_interval: int, uri: str,
include: Dict, exclude: Dict) -> None: include: Dict, exclude: Dict) -> None:
"""Initialize the recorder.""" """Initialize the recorder."""
threading.Thread.__init__(self, name='Recorder') threading.Thread.__init__(self, name='Recorder')
self.hass = hass self.hass = hass
self.purge_days = None self.keep_days = keep_days
self.purge_interval = purge_interval
self.queue = queue.Queue() # type: Any self.queue = queue.Queue() # type: Any
self.recording_start = dt_util.utcnow() self.recording_start = dt_util.utcnow()
self.db_url = uri self.db_url = uri
@ -186,18 +183,16 @@ class Recorder(threading.Thread):
self.exclude_t = exclude.get(CONF_EVENT_TYPES, []) self.exclude_t = exclude.get(CONF_EVENT_TYPES, [])
self.get_session = None self.get_session = None
self.purge_task = object()
@callback @callback
def async_initialize(self): def async_initialize(self):
"""Initialize the recorder.""" """Initialize the recorder."""
self.hass.bus.async_listen(MATCH_ALL, self.event_listener) self.hass.bus.async_listen(MATCH_ALL, self.event_listener)
def do_purge(self, purge_days=None): def do_adhoc_purge(self, keep_days):
"""Event listener for purging data.""" """Trigger an adhoc purge retaining keep_days worth of data."""
if purge_days is not None: if keep_days is not None:
self.purge_days = purge_days self.queue.put(PurgeTask(keep_days))
self.queue.put(self.purge_task)
def run(self): def run(self):
"""Start processing events to save.""" """Start processing events to save."""
@ -264,6 +259,31 @@ class Recorder(threading.Thread):
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START,
notify_hass_started) notify_hass_started)
if self.keep_days and self.purge_interval:
async_track_point_in_time = \
self.hass.helpers.event.async_track_point_in_time
@callback
def async_purge(now):
"""Trigger the purge and schedule the next run."""
self.queue.put(PurgeTask(self.keep_days))
async_track_point_in_time(async_purge, now + timedelta(
days=self.purge_interval))
earliest = dt_util.utcnow() + timedelta(minutes=30)
run = latest = dt_util.utcnow() + \
timedelta(days=self.purge_interval)
with session_scope(session=self.get_session()) as session:
event = session.query(Events).first()
if event is not None:
session.expunge(event)
run = dt_util.UTC.localize(event.time_fired) + \
timedelta(days=self.keep_days+self.purge_interval)
run = min(latest, max(run, earliest))
_LOGGER.debug("Scheduling purge run for %s", run)
async_track_point_in_time(async_purge, run)
self.hass.add_job(register) self.hass.add_job(register)
result = hass_started.result() result = hass_started.result()
@ -279,8 +299,9 @@ class Recorder(threading.Thread):
self._close_connection() self._close_connection()
self.queue.task_done() self.queue.task_done()
return return
elif event is self.purge_task: elif isinstance(event, PurgeTask):
purge.purge_old_data(self, self.purge_days) purge.purge_old_data(self, event.keep_days)
self.queue.task_done()
continue continue
elif event.event_type == EVENT_TIME_CHANGED: elif event.event_type == EVENT_TIME_CHANGED:
self.queue.task_done() self.queue.task_done()

View File

@ -195,7 +195,8 @@ def test_recorder_setup_failure():
with patch.object(Recorder, '_setup_connection') as setup, \ with patch.object(Recorder, '_setup_connection') as setup, \
patch('homeassistant.components.recorder.time.sleep'): patch('homeassistant.components.recorder.time.sleep'):
setup.side_effect = ImportError("driver not found") setup.side_effect = ImportError("driver not found")
rec = Recorder(hass, uri='sqlite://', include={}, exclude={}) rec = Recorder(hass, keep_days=7, purge_interval=2,
uri='sqlite://', include={}, exclude={})
rec.start() rec.start()
rec.join() rec.join()