Accommodate mysql servers with a low wait_timeout (#33638)

Some providers have set their wait_timeout to 60s
in order to pack as many users as they can on a machine.
The mysql default is 28800 seconds (8 hours)

Since mysql connection build and tear down is relativity
expensive, we want to avoid being disconnected.

We now accommodate this scenario with the following:

1. Raise the mysql session wait_timeout 28800 when we connect
2. The event session now does a 30 second keep alive to
   ensure the connection stays open
pull/33665/merge
J. Nick Koston 2020-04-08 14:56:22 -05:00 committed by GitHub
parent 0b715b751d
commit b09b5729a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 40 additions and 6 deletions

View File

@ -5,12 +5,11 @@ import concurrent.futures
from datetime import datetime, timedelta
import logging
import queue
from sqlite3 import Connection
import threading
import time
from typing import Any, Dict, Optional
from sqlalchemy import create_engine, exc
from sqlalchemy import create_engine, exc, select
from sqlalchemy.engine import Engine
from sqlalchemy.event import listens_for
from sqlalchemy.orm import scoped_session, sessionmaker
@ -61,6 +60,7 @@ DEFAULT_URL = "sqlite:///{hass_config_path}"
DEFAULT_DB_FILE = "home-assistant_v2.db"
DEFAULT_DB_MAX_RETRIES = 10
DEFAULT_DB_RETRY_WAIT = 3
KEEPALIVE_TIME = 30
CONF_DB_URL = "db_url"
CONF_DB_MAX_RETRIES = "db_max_retries"
@ -223,6 +223,7 @@ class Recorder(threading.Thread):
self.exclude_t = exclude.get(CONF_EVENT_TYPES, [])
self._timechanges_seen = 0
self._keepalive_count = 0
self.event_session = None
self.get_session = None
@ -353,6 +354,10 @@ class Recorder(threading.Thread):
continue
if event.event_type == EVENT_TIME_CHANGED:
self.queue.task_done()
self._keepalive_count += 1
if self._keepalive_count >= KEEPALIVE_TIME:
self._keepalive_count = 0
self._send_keep_alive()
if self.commit_interval:
self._timechanges_seen += 1
if self._timechanges_seen >= self.commit_interval:
@ -400,6 +405,18 @@ class Recorder(threading.Thread):
self.queue.task_done()
def _send_keep_alive(self):
try:
_LOGGER.debug("Sending keepalive")
self.event_session.connection().scalar(select([1]))
return
except Exception as err: # pylint: disable=broad-except
# Must catch the exception to prevent the loop from collapsing
_LOGGER.error(
"Error in database connectivity during keepalive: %s.", err,
)
self._reopen_event_session()
def _commit_event_session_or_retry(self):
tries = 1
while tries <= self.db_max_retries:
@ -419,7 +436,7 @@ class Recorder(threading.Thread):
)
else:
_LOGGER.error(
"Error in database connectivity: %s. "
"Error in database connectivity during commit: %s. "
"(retrying in %s seconds)",
err,
self.db_retry_wait,
@ -435,6 +452,15 @@ class Recorder(threading.Thread):
"Error in database update. Could not save " "after %d tries. Giving up",
tries,
)
self._reopen_event_session()
def _reopen_event_session(self):
try:
self.event_session.rollback()
except Exception as err: # pylint: disable=broad-except
# Must catch the exception to prevent the loop from collapsing
_LOGGER.exception("Error while rolling back event session: %s", err)
try:
self.event_session.close()
except Exception as err: # pylint: disable=broad-except
@ -470,15 +496,23 @@ class Recorder(threading.Thread):
# pylint: disable=unused-variable
@listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
"""Set sqlite's WAL mode."""
if isinstance(dbapi_connection, Connection):
def setup_connection(dbapi_connection, connection_record):
"""Dbapi specific connection settings."""
# We do not import sqlite3 here so mysql/other
# users do not have to pay for it to be loaded in
# memory
if self.db_url == "sqlite://" or ":memory:" in self.db_url:
old_isolation = dbapi_connection.isolation_level
dbapi_connection.isolation_level = None
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA journal_mode=WAL")
cursor.close()
dbapi_connection.isolation_level = old_isolation
elif self.db_url.startswith("mysql"):
cursor = dbapi_connection.cursor()
cursor.execute("SET session wait_timeout=28800")
cursor.close()
if self.db_url == "sqlite://" or ":memory:" in self.db_url:
kwargs["connect_args"] = {"check_same_thread": False}