Use SimpleQueue for recorder (#38967)

Now that python 3.7 is the minimum supported version, we can
use the more efficient SimpleQueue in the recorder as it does
not have to use threading.Lock
pull/39059/head
J. Nick Koston 2020-08-21 06:54:13 -05:00 committed by GitHub
parent 3cea3eb6e5
commit 7878d97588
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 10 deletions

View File

@ -212,7 +212,7 @@ class Recorder(threading.Thread):
self.auto_purge = auto_purge
self.keep_days = keep_days
self.commit_interval = commit_interval
self.queue: Any = queue.Queue()
self.queue: Any = queue.SimpleQueue()
self.recording_start = dt_util.utcnow()
self.db_url = uri
self.db_max_retries = db_max_retries
@ -339,16 +339,13 @@ class Recorder(threading.Thread):
if event is None:
self._close_run()
self._close_connection()
self.queue.task_done()
return
if isinstance(event, PurgeTask):
# Schedule a new purge task if this one didn't finish
if not purge.purge_old_data(self, event.keep_days, event.repack):
self.queue.put(PurgeTask(event.keep_days, event.repack))
self.queue.task_done()
continue
if event.event_type == EVENT_TIME_CHANGED:
self.queue.task_done()
self._keepalive_count += 1
if self._keepalive_count >= KEEPALIVE_TIME:
self._keepalive_count = 0
@ -360,13 +357,11 @@ class Recorder(threading.Thread):
self._commit_event_session_or_retry()
continue
if event.event_type in self.exclude_t:
self.queue.task_done()
continue
entity_id = event.data.get(ATTR_ENTITY_ID)
if entity_id is not None:
if not self.entity_filter(entity_id):
self.queue.task_done()
continue
try:
@ -409,8 +404,6 @@ class Recorder(threading.Thread):
if not self.commit_interval:
self._commit_event_session_or_retry()
self.queue.task_done()
def _send_keep_alive(self):
try:
_LOGGER.debug("Sending keepalive")
@ -493,8 +486,19 @@ class Recorder(threading.Thread):
self.queue.put(event)
def block_till_done(self):
"""Block till all events processed."""
self.queue.join()
"""Block till all events processed.
This is only called in tests.
This only blocks until the queue is empty
which does not mean the recorder is done.
Call tests.common's wait_recording_done
after calling this to ensure the data
is in the database.
"""
while not self.queue.empty():
time.sleep(0.025)
def _setup_connection(self):
"""Ensure database is ready to fly."""

View File

@ -10,6 +10,8 @@ from homeassistant.components.recorder.purge import purge_old_data
from homeassistant.components.recorder.util import session_scope
from homeassistant.util import dt as dt_util
from .common import wait_recording_done
from tests.async_mock import patch
from tests.common import get_test_home_assistant, init_recorder_component
@ -37,6 +39,7 @@ class TestRecorderPurge(unittest.TestCase):
self.hass.block_till_done()
self.hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
with recorder.session_scope(hass=self.hass) as session:
for event_id in range(6):
@ -72,6 +75,7 @@ class TestRecorderPurge(unittest.TestCase):
self.hass.block_till_done()
self.hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
with recorder.session_scope(hass=self.hass) as session:
for event_id in range(6):
@ -103,6 +107,7 @@ class TestRecorderPurge(unittest.TestCase):
self.hass.block_till_done()
self.hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
with recorder.session_scope(hass=self.hass) as session:
for rec_id in range(6):
@ -183,6 +188,7 @@ class TestRecorderPurge(unittest.TestCase):
assert recorder_runs.count() == 7
self.hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
# run purge method - no service data, use defaults
self.hass.services.call("recorder", "purge")
@ -190,6 +196,7 @@ class TestRecorderPurge(unittest.TestCase):
# Small wait for recorder thread
self.hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
# only purged old events
assert states.count() == 4
@ -201,6 +208,7 @@ class TestRecorderPurge(unittest.TestCase):
# Small wait for recorder thread
self.hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
# we should only have 2 states left after purging
assert states.count() == 2
@ -223,6 +231,7 @@ class TestRecorderPurge(unittest.TestCase):
self.hass.services.call("recorder", "purge", service_data=service_data)
self.hass.block_till_done()
self.hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
assert (
mock_logger.debug.mock_calls[5][1][0]
== "Vacuuming SQL DB to free space"