diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index a285486437e..df19f0125ef 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -14,6 +14,7 @@ from os import path import queue import threading import time +from collections import namedtuple from datetime import datetime, timedelta from typing import Optional, Dict @@ -27,7 +28,6 @@ from homeassistant.const import ( EVENT_STATE_CHANGED, EVENT_TIME_CHANGED, MATCH_ALL) import homeassistant.helpers.config_validation as cv from homeassistant.helpers.entityfilter import generate_filter -from homeassistant.helpers.event import async_track_time_interval from homeassistant.helpers.typing import ConfigType import homeassistant.util.dt as dt_util from homeassistant import config as conf_util @@ -121,7 +121,7 @@ def run_information(hass, point_in_time: Optional[datetime]=None): def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the recorder.""" conf = config.get(DOMAIN, {}) - purge_days = conf.get(CONF_PURGE_KEEP_DAYS) + keep_days = conf.get(CONF_PURGE_KEEP_DAYS) purge_interval = conf.get(CONF_PURGE_INTERVAL) db_url = conf.get(CONF_DB_URL, None) @@ -132,28 +132,20 @@ def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: include = conf.get(CONF_INCLUDE, {}) exclude = conf.get(CONF_EXCLUDE, {}) instance = hass.data[DATA_INSTANCE] = Recorder( - hass, uri=db_url, include=include, exclude=exclude) + hass=hass, keep_days=keep_days, purge_interval=purge_interval, + uri=db_url, include=include, exclude=exclude) instance.async_initialize() instance.start() - @asyncio.coroutine - def async_handle_purge_interval(now): - """Handle purge interval.""" - instance.do_purge(purge_days) - @asyncio.coroutine def async_handle_purge_service(service): """Handle calls to the purge service.""" - instance.do_purge(service.data[ATTR_KEEP_DAYS]) + instance.do_adhoc_purge(service.data[ATTR_KEEP_DAYS]) descriptions = yield from hass.async_add_job( conf_util.load_yaml_config_file, path.join( path.dirname(__file__), 'services.yaml')) - if purge_interval and purge_days: - async_track_time_interval(hass, async_handle_purge_interval, - timedelta(days=purge_interval)) - hass.services.async_register(DOMAIN, SERVICE_PURGE, async_handle_purge_service, descriptions.get(SERVICE_PURGE), @@ -162,16 +154,21 @@ def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: return (yield from instance.async_db_ready) +PurgeTask = namedtuple('PurgeTask', ['keep_days']) + + class Recorder(threading.Thread): """A threaded recorder class.""" - def __init__(self, hass: HomeAssistant, uri: str, + def __init__(self, hass: HomeAssistant, keep_days: int, + purge_interval: int, uri: str, include: Dict, exclude: Dict) -> None: """Initialize the recorder.""" threading.Thread.__init__(self, name='Recorder') self.hass = hass - self.purge_days = None + self.keep_days = keep_days + self.purge_interval = purge_interval self.queue = queue.Queue() # type: Any self.recording_start = dt_util.utcnow() self.db_url = uri @@ -186,18 +183,16 @@ class Recorder(threading.Thread): self.exclude_t = exclude.get(CONF_EVENT_TYPES, []) self.get_session = None - self.purge_task = object() @callback def async_initialize(self): """Initialize the recorder.""" self.hass.bus.async_listen(MATCH_ALL, self.event_listener) - def do_purge(self, purge_days=None): - """Event listener for purging data.""" - if purge_days is not None: - self.purge_days = purge_days - self.queue.put(self.purge_task) + def do_adhoc_purge(self, keep_days): + """Trigger an adhoc purge retaining keep_days worth of data.""" + if keep_days is not None: + self.queue.put(PurgeTask(keep_days)) def run(self): """Start processing events to save.""" @@ -264,6 +259,31 @@ class Recorder(threading.Thread): self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, notify_hass_started) + if self.keep_days and self.purge_interval: + async_track_point_in_time = \ + self.hass.helpers.event.async_track_point_in_time + + @callback + def async_purge(now): + """Trigger the purge and schedule the next run.""" + self.queue.put(PurgeTask(self.keep_days)) + async_track_point_in_time(async_purge, now + timedelta( + days=self.purge_interval)) + + earliest = dt_util.utcnow() + timedelta(minutes=30) + run = latest = dt_util.utcnow() + \ + timedelta(days=self.purge_interval) + with session_scope(session=self.get_session()) as session: + event = session.query(Events).first() + if event is not None: + session.expunge(event) + run = dt_util.UTC.localize(event.time_fired) + \ + timedelta(days=self.keep_days+self.purge_interval) + run = min(latest, max(run, earliest)) + + _LOGGER.debug("Scheduling purge run for %s", run) + async_track_point_in_time(async_purge, run) + self.hass.add_job(register) result = hass_started.result() @@ -279,8 +299,9 @@ class Recorder(threading.Thread): self._close_connection() self.queue.task_done() return - elif event is self.purge_task: - purge.purge_old_data(self, self.purge_days) + elif isinstance(event, PurgeTask): + purge.purge_old_data(self, event.keep_days) + self.queue.task_done() continue elif event.event_type == EVENT_TIME_CHANGED: self.queue.task_done() diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index ed04e96a43c..58b8dc1f839 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -195,7 +195,8 @@ def test_recorder_setup_failure(): with patch.object(Recorder, '_setup_connection') as setup, \ patch('homeassistant.components.recorder.time.sleep'): setup.side_effect = ImportError("driver not found") - rec = Recorder(hass, uri='sqlite://', include={}, exclude={}) + rec = Recorder(hass, keep_days=7, purge_interval=2, + uri='sqlite://', include={}, exclude={}) rec.start() rec.join()