"""
Support for recording details.

Component that records all events and state changes. Allows other components
to query this database.

For more details about this component, please refer to the documentation at
https://home-assistant.io/components/recorder/
"""
|
|
import atexit
|
|
import json
|
|
import logging
|
|
import queue
|
|
import sqlite3
|
|
import threading
|
|
from datetime import date, datetime
|
|
|
|
import homeassistant.util.dt as dt_util
|
|
from homeassistant.const import (
|
|
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED,
|
|
EVENT_TIME_CHANGED, MATCH_ALL)
|
|
from homeassistant.core import Event, EventOrigin, State
|
|
from homeassistant.remote import JSONEncoder
|
|
|
|
DOMAIN = "recorder"

# Name of the SQLite database file inside the Home Assistant config dir.
DB_FILE = 'home-assistant.db'

# Accepted values for the `return_value` argument of Recorder.query().
RETURN_ROWCOUNT = "rowcount"
RETURN_LASTROWID = "lastrowid"
RETURN_ONE_ROW = "one_row"

# Singleton Recorder instance; populated by setup().
_INSTANCE = None
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
def query(sql_query, arguments=None):
    """Run a query against the recorder database and return the result.

    Raises RuntimeError if the recorder has not been initialized.
    """
    _verify_instance()
    return _INSTANCE.query(sql_query, arguments)
|
|
|
|
|
|
def query_states(state_query, arguments=None):
    """Query the database and return a list of states.

    Rows that cannot be converted to a State are skipped.
    """
    states = []
    for row in query(state_query, arguments):
        state = row_to_state(row)
        if state is not None:
            states.append(state)
    return states
|
|
|
|
|
|
def query_events(event_query, arguments=None):
    """Query the database and return a list of events.

    Rows that cannot be converted to an Event are skipped.
    """
    # Fix: docstring previously claimed this returned states.
    return [
        row for row in
        (row_to_event(row) for row in query(event_query, arguments))
        if row is not None]
|
|
|
|
|
|
def row_to_state(row):
    """Convert a database row into a State object.

    Returns None (and logs the error) when the stored attribute JSON
    cannot be parsed.
    """
    try:
        state = State(
            row[1], row[2], json.loads(row[3]),
            dt_util.utc_from_timestamp(row[4]),
            dt_util.utc_from_timestamp(row[5]))
    except ValueError:
        # json.loads raises ValueError on malformed attribute data.
        _LOGGER.exception("Error converting row to state: %s", row)
        state = None
    return state
|
|
|
|
|
|
def row_to_event(row):
    """Convert a database row to an event.

    Returns None (and logs the error) when the stored event data
    cannot be parsed.
    """
    # Fix: docstring typo "databse" corrected.
    try:
        return Event(row[1], json.loads(row[2]), EventOrigin(row[3]),
                     dt_util.utc_from_timestamp(row[5]))
    except ValueError:
        # When json.loads fails
        _LOGGER.exception("Error converting row to event: %s", row)
        return None
|
|
|
|
|
|
def run_information(point_in_time=None):
    """Return information about current run.

    There is also the run that covers point_in_time.
    """
    _verify_instance()

    # No argument, or a time inside the current recording session:
    # describe the live run.
    if point_in_time is None or point_in_time > _INSTANCE.recording_start:
        return RecorderRun()

    row = _INSTANCE.query(
        "SELECT * FROM recorder_runs WHERE start<? AND END>?",
        (point_in_time, point_in_time), return_value=RETURN_ONE_ROW)

    if not row:
        return None
    return RecorderRun(row)
|
|
|
|
|
|
def setup(hass, config):
    """Setup the recorder component and store the singleton instance."""
    # pylint: disable=global-statement
    global _INSTANCE

    _INSTANCE = Recorder(hass)
    return True
|
|
|
|
|
|
class RecorderRun(object):
    """Representation of a recorder run."""
    # Fix: class docstring typo "arecorder run" corrected.

    def __init__(self, row=None):
        """Initialize the recorder run.

        Without a row this represents the currently active run; with a
        `recorder_runs` database row it represents a historical run.
        """
        # End timestamp; None while the run is (or was left) open.
        self.end = None

        if row is None:
            self.start = _INSTANCE.recording_start
            self.closed_incorrect = False
        else:
            self.start = dt_util.utc_from_timestamp(row[1])

            if row[2] is not None:
                self.end = dt_util.utc_from_timestamp(row[2])

            # Set when a previous run was found unfinished at startup.
            self.closed_incorrect = bool(row[3])

    def entity_ids(self, point_in_time=None):
        """Return the entity ids that existed in this run.

        Specify point_in_time if you want to know which existed at that point
        in time inside the run.
        """
        where = self.where_after_start_run
        where_data = []

        if point_in_time is not None or self.end is not None:
            where += "AND created < ? "
            where_data.append(point_in_time or self.end)

        return [row[0] for row in query(
            "SELECT entity_id FROM states WHERE {}"
            "GROUP BY entity_id".format(where), where_data)]

    @property
    def where_after_start_run(self):
        """Return SQL WHERE clause.

        Selection of the rows created after the start of the run.
        Note: trailing space is significant for clause concatenation.
        """
        return "created >= {} ".format(_adapt_datetime(self.start))

    @property
    def where_limit_to_run(self):
        """Return a SQL WHERE clause.

        For limiting the results to this run.
        """
        where = self.where_after_start_run

        if self.end is not None:
            where += "AND created < {} ".format(_adapt_datetime(self.end))

        return where
|
|
|
|
|
|
class Recorder(threading.Thread):
    """A threaded recorder class.

    Worker thread that drains an event queue and persists every event
    (and every state change) into a local SQLite database.
    """

    def __init__(self, hass):
        """Initialize the recorder."""
        threading.Thread.__init__(self)

        self.hass = hass
        # sqlite3 connection; created lazily on the worker thread in run().
        self.conn = None
        # Work queue of bus events; also carries the quit sentinel.
        self.queue = queue.Queue()
        # Sentinel: putting this on the queue tells run() to stop.
        self.quit_object = object()
        # Serializes access to the shared sqlite3 connection.
        self.lock = threading.Lock()
        self.recording_start = dt_util.utcnow()
        # Local UTC offset in seconds; stored alongside every row.
        self.utc_offset = dt_util.now().utcoffset().total_seconds()

        def start_recording(event):
            """Start recording."""
            self.start()

        # Only start the worker thread once Home Assistant itself is up.
        hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_recording)
        hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self.shutdown)
        # Record every event on the bus.
        hass.bus.listen(MATCH_ALL, self.event_listener)

    def run(self):
        """Start processing events to save."""
        self._setup_connection()
        self._setup_run()

        while True:
            event = self.queue.get()

            # Quit sentinel received: finalize the run, close the
            # database, and stop the thread.
            if event == self.quit_object:
                self._close_run()
                self._close_connection()
                self.queue.task_done()
                return

            # Time-changed events fire constantly; never persist them.
            elif event.event_type == EVENT_TIME_CHANGED:
                self.queue.task_done()
                continue

            event_id = self.record_event(event)

            # State changes additionally get a row in the states table,
            # linked back to the originating event row.
            if event.event_type == EVENT_STATE_CHANGED:
                self.record_state(
                    event.data['entity_id'], event.data.get('new_state'),
                    event_id)

            self.queue.task_done()

    def event_listener(self, event):
        """Listen for new events and put them in the process queue."""
        self.queue.put(event)

    def shutdown(self, event):
        """Tell the recorder to shut down."""
        self.queue.put(self.quit_object)
        # Wait until the worker has drained the queue (including the quit
        # sentinel) so the database is closed cleanly before returning.
        self.block_till_done()

    def record_state(self, entity_id, state, event_id):
        """Save a state to the database.

        A None state means the entity was removed; an empty placeholder
        row is written so the deletion is recorded.
        """
        now = dt_util.utcnow()

        # State got deleted
        if state is None:
            state_state = ''
            state_domain = ''
            state_attr = '{}'
            last_changed = last_updated = now
        else:
            state_domain = state.domain
            state_state = state.state
            state_attr = json.dumps(dict(state.attributes))
            last_changed = state.last_changed
            last_updated = state.last_updated

        info = (
            entity_id, state_domain, state_state, state_attr,
            last_changed, last_updated,
            now, self.utc_offset, event_id)

        self.query(
            """
            INSERT INTO states (
            entity_id, domain, state, attributes, last_changed, last_updated,
            created, utc_offset, event_id)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            info)

    def record_event(self, event):
        """Save an event to the database and return its row id."""
        info = (
            event.event_type, json.dumps(event.data, cls=JSONEncoder),
            str(event.origin), dt_util.utcnow(), event.time_fired,
            self.utc_offset
        )

        return self.query(
            "INSERT INTO events ("
            "event_type, event_data, origin, created, time_fired, utc_offset"
            ") VALUES (?, ?, ?, ?, ?, ?)", info, RETURN_LASTROWID)

    def query(self, sql_query, data=None, return_value=None):
        """Query the database.

        return_value selects the result: RETURN_ROWCOUNT, RETURN_LASTROWID,
        RETURN_ONE_ROW, or (default) all fetched rows. On a sqlite error the
        exception is logged and an empty list is returned.
        """
        try:
            # `with self.conn` commits on success / rolls back on error;
            # the lock makes the shared connection safe across threads.
            with self.conn, self.lock:
                _LOGGER.debug("Running query %s", sql_query)

                cur = self.conn.cursor()

                if data is not None:
                    cur.execute(sql_query, data)
                else:
                    cur.execute(sql_query)

                if return_value == RETURN_ROWCOUNT:
                    return cur.rowcount
                elif return_value == RETURN_LASTROWID:
                    return cur.lastrowid
                elif return_value == RETURN_ONE_ROW:
                    return cur.fetchone()
                else:
                    return cur.fetchall()

        except (sqlite3.IntegrityError, sqlite3.OperationalError,
                sqlite3.ProgrammingError):
            _LOGGER.exception(
                "Error querying the database using: %s", sql_query)
            return []

    def block_till_done(self):
        """Block till all events processed."""
        self.queue.join()

    def _setup_connection(self):
        """Ensure database is ready to fly.

        Opens the connection and applies any pending schema migrations,
        in order, committing after each one.
        """
        db_path = self.hass.config.path(DB_FILE)
        # check_same_thread=False: the connection is created on this worker
        # thread but query() may run on other threads (guarded by self.lock).
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.conn.row_factory = sqlite3.Row

        # Make sure the database is closed whenever Python exits
        # without the STOP event being fired.
        atexit.register(self._close_connection)

        # Have datetime objects be saved as integers.
        sqlite3.register_adapter(date, _adapt_datetime)
        sqlite3.register_adapter(datetime, _adapt_datetime)

        # Validate we are on the correct schema or that we have to migrate.
        cur = self.conn.cursor()

        def save_migration(migration_id):
            """Save and commit a migration to the database."""
            cur.execute('INSERT INTO schema_version VALUES (?, ?)',
                        (migration_id, dt_util.utcnow()))
            self.conn.commit()
            _LOGGER.info("Database migrated to version %d", migration_id)

        try:
            cur.execute('SELECT max(migration_id) FROM schema_version;')
            migration_id = cur.fetchone()[0] or 0

        except sqlite3.OperationalError:
            # The table does not exist.
            cur.execute('CREATE TABLE schema_version ('
                        'migration_id integer primary key, performed integer)')
            migration_id = 0

        # Migration 1: initial schema (runs, events, states + indexes).
        if migration_id < 1:
            cur.execute("""
                CREATE TABLE recorder_runs (
                    run_id integer primary key,
                    start integer,
                    end integer,
                    closed_incorrect integer default 0,
                    created integer)
            """)

            cur.execute("""
                CREATE TABLE events (
                    event_id integer primary key,
                    event_type text,
                    event_data text,
                    origin text,
                    created integer)
            """)
            cur.execute(
                'CREATE INDEX events__event_type ON events(event_type)')

            cur.execute("""
                CREATE TABLE states (
                    state_id integer primary key,
                    entity_id text,
                    state text,
                    attributes text,
                    last_changed integer,
                    last_updated integer,
                    created integer)
            """)
            cur.execute('CREATE INDEX states__entity_id ON states(entity_id)')

            save_migration(1)

        # Migration 2: track when an event fired, separate from row creation.
        if migration_id < 2:
            cur.execute("""
                ALTER TABLE events
                ADD COLUMN time_fired integer
            """)

            cur.execute('UPDATE events SET time_fired=created')

            save_migration(2)

        # Migration 3: store the local UTC offset with every row.
        if migration_id < 3:
            utc_offset = self.utc_offset

            cur.execute("""
                ALTER TABLE recorder_runs
                ADD COLUMN utc_offset integer
            """)

            cur.execute("""
                ALTER TABLE events
                ADD COLUMN utc_offset integer
            """)

            cur.execute("""
                ALTER TABLE states
                ADD COLUMN utc_offset integer
            """)

            cur.execute("UPDATE recorder_runs SET utc_offset=?", [utc_offset])
            cur.execute("UPDATE events SET utc_offset=?", [utc_offset])
            cur.execute("UPDATE states SET utc_offset=?", [utc_offset])

            save_migration(3)

        # Migration 4: backfill missing run offsets; link states to events.
        if migration_id < 4:
            # We had a bug where we did not save utc offset for recorder runs.
            cur.execute(
                """UPDATE recorder_runs SET utc_offset=?
                   WHERE utc_offset IS NULL""", [self.utc_offset])

            cur.execute("""
                ALTER TABLE states
                ADD COLUMN event_id integer
            """)

            save_migration(4)

        # Migration 5: denormalize domain onto states and add query indexes.
        if migration_id < 5:
            # Add domain so that thermostat graphs look right.
            try:
                cur.execute("""
                    ALTER TABLE states
                    ADD COLUMN domain text
                """)
            except sqlite3.OperationalError:
                # We had a bug in this migration for a while on dev.
                # Without this, dev-users will have to throw away their db.
                pass

            # TravisCI has Python compiled against an old version of SQLite3
            # which misses the instr method.
            self.conn.create_function(
                "instr", 2,
                lambda string, substring: string.find(substring) + 1)

            # Populate domain with defaults.
            cur.execute("""
                UPDATE states
                set domain=substr(entity_id, 0, instr(entity_id, '.'))
            """)

            # Add indexes we are going to use a lot on selects.
            cur.execute("""
                CREATE INDEX states__state_changes ON
                states (last_changed, last_updated, entity_id)""")
            cur.execute("""
                CREATE INDEX states__significant_changes ON
                states (domain, last_updated, entity_id)""")
            save_migration(5)

    def _close_connection(self):
        """Close connection to the database."""
        _LOGGER.info("Closing database")
        # Unregister so a clean shutdown does not close a second time at exit.
        atexit.unregister(self._close_connection)
        self.conn.close()

    def _setup_run(self):
        """Log the start of the current run.

        Any run still marked open was not shut down cleanly; it is closed
        here and flagged as closed_incorrect before the new run is inserted.
        """
        if self.query("""UPDATE recorder_runs SET end=?, closed_incorrect=1
                         WHERE end IS NULL""", (self.recording_start, ),
                      return_value=RETURN_ROWCOUNT):

            _LOGGER.warning("Found unfinished sessions")

        self.query(
            """INSERT INTO recorder_runs (start, created, utc_offset)
               VALUES (?, ?, ?)""",
            (self.recording_start, dt_util.utcnow(), self.utc_offset))

    def _close_run(self):
        """Save end time for current run."""
        self.query(
            "UPDATE recorder_runs SET end=? WHERE start=?",
            (dt_util.utcnow(), self.recording_start))
|
|
|
|
|
|
def _adapt_datetime(datetimestamp):
    """Turn a datetime into an integer for in the DB."""
    # Drop sub-second precision, then store as a UTC Unix timestamp.
    truncated = datetimestamp.replace(microsecond=0)
    return dt_util.as_utc(truncated).timestamp()
|
|
|
|
|
|
def _verify_instance():
    """Throw error if recorder not initialized."""
    if _INSTANCE is not None:
        return
    raise RuntimeError("Recorder not initialized.")
|