Ensure recorder event loop recovers if the database server dis… (#33253)

If the database server disconnects there were exceptions
that were not trapped which would cause the recorder event
loop to collapse.  As we never want the loop to end
we trap exceptions broadly.

Fix a bug in the new commit interval setting which caused
it to always commit after 1s
pull/33262/head
J. Nick Koston 2020-03-25 15:08:20 -05:00 committed by GitHub
parent 4bbc0a03ca
commit 6d311a31dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 33 additions and 14 deletions

View File

@ -342,7 +342,6 @@ class Recorder(threading.Thread):
# has changed. This reduces the disk io. # has changed. This reduces the disk io.
while True: while True:
event = self.queue.get() event = self.queue.get()
if event is None: if event is None:
self._close_run() self._close_run()
self._close_connection() self._close_connection()
@ -356,7 +355,7 @@ class Recorder(threading.Thread):
self.queue.task_done() self.queue.task_done()
if self.commit_interval: if self.commit_interval:
self._timechanges_seen += 1 self._timechanges_seen += 1
if self.commit_interval >= self._timechanges_seen: if self._timechanges_seen >= self.commit_interval:
self._timechanges_seen = 0 self._timechanges_seen = 0
self._commit_event_session_or_retry() self._commit_event_session_or_retry()
continue continue
@ -376,6 +375,9 @@ class Recorder(threading.Thread):
self.event_session.flush() self.event_session.flush()
except (TypeError, ValueError): except (TypeError, ValueError):
_LOGGER.warning("Event is not JSON serializable: %s", event) _LOGGER.warning("Event is not JSON serializable: %s", event)
except Exception as err: # pylint: disable=broad-except
# Must catch the exception to prevent the loop from collapsing
_LOGGER.exception("Error adding event: %s", err)
if dbevent and event.event_type == EVENT_STATE_CHANGED: if dbevent and event.event_type == EVENT_STATE_CHANGED:
try: try:
@ -387,6 +389,9 @@ class Recorder(threading.Thread):
"State is not JSON serializable: %s", "State is not JSON serializable: %s",
event.data.get("new_state"), event.data.get("new_state"),
) )
except Exception as err: # pylint: disable=broad-except
# Must catch the exception to prevent the loop from collapsing
_LOGGER.exception("Error adding state change: %s", err)
# If they do not have a commit interval # If they do not have a commit interval
# than we commit right away # than we commit right away
@ -404,17 +409,26 @@ class Recorder(threading.Thread):
try: try:
self._commit_event_session() self._commit_event_session()
return return
except (exc.InternalError, exc.OperationalError) as err:
except exc.OperationalError as err: if err.connection_invalidated:
_LOGGER.error( _LOGGER.error(
"Error in database connectivity: %s. " "(retrying in %s seconds)", "Database connection invalidated: %s. "
err, "(retrying in %s seconds)",
self.db_retry_wait, err,
) self.db_retry_wait,
)
else:
_LOGGER.error(
"Error in database connectivity: %s. "
"(retrying in %s seconds)",
err,
self.db_retry_wait,
)
tries += 1 tries += 1
except exc.SQLAlchemyError: except Exception as err: # pylint: disable=broad-except
_LOGGER.exception("Error saving events") # Must catch the exception to prevent the loop from collapsing
_LOGGER.exception("Error saving events: %s", err)
return return
_LOGGER.error( _LOGGER.error(
@ -423,10 +437,15 @@ class Recorder(threading.Thread):
) )
try: try:
self.event_session.close() self.event_session.close()
except exc.SQLAlchemyError: except Exception as err: # pylint: disable=broad-except
_LOGGER.exception("Failed to close event session.") # Must catch the exception to prevent the loop from collapsing
_LOGGER.exception("Error while closing event session: %s", err)
self.event_session = self.get_session() try:
self.event_session = self.get_session()
except Exception as err: # pylint: disable=broad-except
# Must catch the exception to prevent the loop from collapsing
_LOGGER.exception("Error while creating new event session: %s", err)
def _commit_event_session(self): def _commit_event_session(self):
try: try: