core/homeassistant/components/recorder.py

437 lines
14 KiB
Python
Raw Normal View History

"""
homeassistant.components.recorder
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Component that records all events and state changes.
Allows other components to query this database.
"""
2015-01-28 08:22:09 +00:00
import logging
import threading
import queue
import sqlite3
2015-03-29 21:43:16 +00:00
from datetime import datetime, date
2015-01-28 08:22:09 +00:00
import json
import atexit
2015-01-28 08:22:09 +00:00
from homeassistant import Event, EventOrigin, State
import homeassistant.util.dt as date_util
2015-01-28 08:22:09 +00:00
from homeassistant.remote import JSONEncoder
from homeassistant.const import (
MATCH_ALL, EVENT_TIME_CHANGED, EVENT_STATE_CHANGED,
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP)
DOMAIN = "recorder"
DEPENDENCIES = []
DB_FILE = 'home-assistant.db'
RETURN_ROWCOUNT = "rowcount"
RETURN_LASTROWID = "lastrowid"
RETURN_ONE_ROW = "one_row"
2015-01-28 08:22:09 +00:00
_INSTANCE = None
_LOGGER = logging.getLogger(__name__)
def query(sql_query, arguments=None):
""" Query the database. """
_verify_instance()
2015-01-28 08:22:09 +00:00
return _INSTANCE.query(sql_query, arguments)
2015-01-28 08:22:09 +00:00
def query_states(state_query, arguments=None):
""" Query the database and return a list of states. """
2015-02-07 20:39:10 +00:00
return [
row for row in
(row_to_state(row) for row in query(state_query, arguments))
2015-02-07 20:39:10 +00:00
if row is not None]
2015-01-28 08:22:09 +00:00
def query_events(event_query, arguments=None):
""" Query the database and return a list of states. """
2015-02-07 20:39:10 +00:00
return [
row for row in
(row_to_event(row) for row in query(event_query, arguments))
2015-02-07 20:39:10 +00:00
if row is not None]
2015-01-28 08:22:09 +00:00
def row_to_state(row):
""" Convert a databsae row to a state. """
try:
return State(
row[1], row[2], json.loads(row[3]),
date_util.utc_from_timestamp(row[4]),
date_util.utc_from_timestamp(row[5]))
except ValueError:
# When json.loads fails
_LOGGER.exception("Error converting row to state: %s", row)
return None
def row_to_event(row):
""" Convert a databse row to an event. """
try:
2015-03-29 21:39:47 +00:00
return Event(row[1], json.loads(row[2]), EventOrigin[row[3].lower()],
date_util.utc_from_timestamp(row[5]))
except ValueError:
2015-03-29 21:39:47 +00:00
# When json.loads fails
_LOGGER.exception("Error converting row to event: %s", row)
return None
2015-01-28 08:22:09 +00:00
2015-02-07 20:39:10 +00:00
def run_information(point_in_time=None):
2015-02-07 23:54:58 +00:00
""" Returns information about current run or the run that
covers point_in_time. """
_verify_instance()
2015-02-10 03:12:12 +00:00
if point_in_time is None or point_in_time > _INSTANCE.recording_start:
2015-02-07 20:39:10 +00:00
return RecorderRun()
2015-02-07 20:39:10 +00:00
run = _INSTANCE.query(
"SELECT * FROM recorder_runs WHERE start<? AND END>?",
2015-02-07 20:39:10 +00:00
(point_in_time, point_in_time), return_value=RETURN_ONE_ROW)
2015-02-07 20:39:10 +00:00
return RecorderRun(run) if run else None
2015-01-28 08:22:09 +00:00
def setup(hass, config):
""" Setup the recorder. """
# pylint: disable=global-statement
2015-01-28 08:22:09 +00:00
global _INSTANCE
_INSTANCE = Recorder(hass)
return True
2015-02-07 20:39:10 +00:00
class RecorderRun(object):
""" Represents a recorder run. """
def __init__(self, row=None):
2015-02-10 03:12:12 +00:00
self.end = None
2015-02-07 20:39:10 +00:00
if row is None:
self.start = _INSTANCE.recording_start
self.closed_incorrect = False
else:
self.start = date_util.utc_from_timestamp(row[1])
2015-02-10 03:12:12 +00:00
if row[2] is not None:
self.end = date_util.utc_from_timestamp(row[2])
2015-02-10 03:12:12 +00:00
2015-02-07 20:39:10 +00:00
self.closed_incorrect = bool(row[3])
def entity_ids(self, point_in_time=None):
"""
Return the entity ids that existed in this run.
Specify point_in_time if you want to know which existed at that point
in time inside the run.
"""
where = self.where_after_start_run
where_data = []
if point_in_time is not None or self.end is not None:
where += "AND created < ? "
where_data.append(point_in_time or self.end)
return [row[0] for row in query(
"SELECT entity_id FROM states WHERE {}"
"GROUP BY entity_id".format(where), where_data)]
@property
def where_after_start_run(self):
2015-02-07 23:54:58 +00:00
""" Returns SQL WHERE clause to select rows
created after the start of the run. """
2015-02-07 20:39:10 +00:00
return "created >= {} ".format(_adapt_datetime(self.start))
@property
def where_limit_to_run(self):
""" Return a SQL WHERE clause to limit results to this run. """
where = self.where_after_start_run
if self.end is not None:
where += "AND created < {} ".format(_adapt_datetime(self.end))
return where
2015-01-28 08:22:09 +00:00
class Recorder(threading.Thread):
"""
Threaded recorder
"""
def __init__(self, hass):
threading.Thread.__init__(self)
self.hass = hass
self.conn = None
self.queue = queue.Queue()
self.quit_object = object()
self.lock = threading.Lock()
self.recording_start = date_util.utcnow()
self.utc_offset = date_util.now().utcoffset().total_seconds()
2015-01-28 08:22:09 +00:00
def start_recording(event):
""" Start recording. """
self.start()
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_recording)
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self.shutdown)
hass.bus.listen(MATCH_ALL, self.event_listener)
def run(self):
""" Start processing events to save. """
self._setup_connection()
self._setup_run()
2015-01-28 08:22:09 +00:00
while True:
event = self.queue.get()
if event == self.quit_object:
self._close_run()
2015-01-28 08:22:09 +00:00
self._close_connection()
2015-04-30 06:21:31 +00:00
self.queue.task_done()
2015-01-28 08:22:09 +00:00
return
elif event.event_type == EVENT_TIME_CHANGED:
2015-04-30 06:21:31 +00:00
self.queue.task_done()
2015-01-28 08:22:09 +00:00
continue
event_id = self.record_event(event)
if event.event_type == EVENT_STATE_CHANGED:
2015-01-28 08:22:09 +00:00
self.record_state(
event.data['entity_id'], event.data.get('new_state'),
event_id)
2015-01-28 08:22:09 +00:00
2015-04-30 06:21:31 +00:00
self.queue.task_done()
2015-01-28 08:22:09 +00:00
def event_listener(self, event):
""" Listens for new events on the EventBus and puts them
in the process queue. """
self.queue.put(event)
def shutdown(self, event):
""" Tells the recorder to shut down. """
self.queue.put(self.quit_object)
def record_state(self, entity_id, state, event_id):
2015-01-28 08:22:09 +00:00
""" Save a state to the database. """
now = date_util.utcnow()
2015-01-28 08:22:09 +00:00
# State got deleted
2015-01-28 08:22:09 +00:00
if state is None:
state_state = ''
state_attr = '{}'
last_changed = last_updated = now
2015-01-28 08:22:09 +00:00
else:
state_state = state.state
state_attr = json.dumps(state.attributes)
last_changed = state.last_changed
last_updated = state.last_updated
info = (
entity_id, state_state, state_attr, last_changed, last_updated,
now, self.utc_offset, event_id)
2015-01-28 08:22:09 +00:00
self.query(
2015-02-02 02:00:30 +00:00
"INSERT INTO states ("
2015-01-28 08:22:09 +00:00
"entity_id, state, attributes, last_changed, last_updated,"
"created, utc_offset, event_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
info)
2015-01-28 08:22:09 +00:00
def record_event(self, event):
""" Save an event to the database. """
info = (
event.event_type, json.dumps(event.data, cls=JSONEncoder),
str(event.origin), date_util.utcnow(), event.time_fired,
self.utc_offset
2015-01-28 08:22:09 +00:00
)
return self.query(
2015-02-02 02:00:30 +00:00
"INSERT INTO events ("
"event_type, event_data, origin, created, time_fired, utc_offset"
") VALUES (?, ?, ?, ?, ?, ?)", info, RETURN_LASTROWID)
2015-01-28 08:22:09 +00:00
def query(self, sql_query, data=None, return_value=None):
2015-01-28 08:22:09 +00:00
""" Query the database. """
try:
with self.conn, self.lock:
_LOGGER.info("Running query %s", sql_query)
2015-01-28 08:22:09 +00:00
cur = self.conn.cursor()
if data is not None:
cur.execute(sql_query, data)
2015-01-28 08:22:09 +00:00
else:
cur.execute(sql_query)
if return_value == RETURN_ROWCOUNT:
return cur.rowcount
elif return_value == RETURN_LASTROWID:
return cur.lastrowid
elif return_value == RETURN_ONE_ROW:
return cur.fetchone()
else:
return cur.fetchall()
2015-01-28 08:22:09 +00:00
except sqlite3.IntegrityError:
_LOGGER.exception(
"Error querying the database using: %s", sql_query)
2015-01-28 08:22:09 +00:00
return []
2015-04-30 06:21:31 +00:00
def block_till_done(self):
""" Blocks till all events processed. """
self.queue.join()
2015-01-28 08:22:09 +00:00
def _setup_connection(self):
""" Ensure database is ready to fly. """
db_path = self.hass.config.path(DB_FILE)
2015-01-28 08:22:09 +00:00
self.conn = sqlite3.connect(db_path, check_same_thread=False)
self.conn.row_factory = sqlite3.Row
# Make sure the database is closed whenever Python exits
# without the STOP event being fired.
atexit.register(self._close_connection)
2015-01-28 08:22:09 +00:00
# Have datetime objects be saved as integers
2015-03-29 21:43:16 +00:00
sqlite3.register_adapter(date, _adapt_datetime)
sqlite3.register_adapter(datetime, _adapt_datetime)
2015-01-28 08:22:09 +00:00
# Validate we are on the correct schema or that we have to migrate
cur = self.conn.cursor()
2015-01-28 08:22:09 +00:00
def save_migration(migration_id):
""" Save and commit a migration to the database. """
cur.execute('INSERT INTO schema_version VALUES (?, ?)',
(migration_id, date_util.utcnow()))
2015-01-28 08:22:09 +00:00
self.conn.commit()
_LOGGER.info("Database migrated to version %d", migration_id)
try:
cur.execute('SELECT max(migration_id) FROM schema_version;')
migration_id = cur.fetchone()[0] or 0
2015-01-28 08:22:09 +00:00
except sqlite3.OperationalError:
# The table does not exist
cur.execute('CREATE TABLE schema_version ('
'migration_id integer primary key, performed integer)')
2015-01-28 08:22:09 +00:00
migration_id = 0
if migration_id < 1:
2015-04-29 05:38:43 +00:00
self.query("""
CREATE TABLE recorder_runs (
run_id integer primary key,
start integer,
end integer,
closed_incorrect integer default 0,
created integer)
""")
2015-04-29 05:38:43 +00:00
self.query("""
CREATE TABLE events (
event_id integer primary key,
event_type text,
event_data text,
origin text,
created integer)
""")
2015-04-29 05:38:43 +00:00
self.query(
'CREATE INDEX events__event_type ON events(event_type)')
2015-04-29 05:38:43 +00:00
self.query("""
CREATE TABLE states (
state_id integer primary key,
entity_id text,
state text,
attributes text,
last_changed integer,
last_updated integer,
created integer)
""")
2015-04-29 05:38:43 +00:00
self.query('CREATE INDEX states__entity_id ON states(entity_id)')
2015-01-28 08:22:09 +00:00
save_migration(1)
2015-03-29 21:39:47 +00:00
if migration_id < 2:
2015-04-29 05:38:43 +00:00
self.query("""
2015-03-29 21:39:47 +00:00
ALTER TABLE events
ADD COLUMN time_fired integer
""")
2015-04-29 05:38:43 +00:00
self.query('UPDATE events SET time_fired=created')
2015-03-29 21:39:47 +00:00
save_migration(2)
if migration_id < 3:
utc_offset = self.utc_offset
2015-04-29 05:38:43 +00:00
self.query("""
ALTER TABLE recorder_runs
ADD COLUMN utc_offset integer
""")
2015-04-29 05:38:43 +00:00
self.query("""
ALTER TABLE events
ADD COLUMN utc_offset integer
""")
2015-04-29 05:38:43 +00:00
self.query("""
ALTER TABLE states
ADD COLUMN utc_offset integer
""")
2015-04-29 05:38:43 +00:00
self.query("UPDATE recorder_runs SET utc_offset=?", [utc_offset])
self.query("UPDATE events SET utc_offset=?", [utc_offset])
self.query("UPDATE states SET utc_offset=?", [utc_offset])
save_migration(3)
if migration_id < 4:
# We had a bug where we did not save utc offset for recorder runs
self.query(
"""UPDATE recorder_runs SET utc_offset=?
WHERE utc_offset IS NULL""", [self.utc_offset])
self.query("""
ALTER TABLE states
ADD COLUMN event_id integer
""")
save_migration(4)
2015-01-28 08:22:09 +00:00
def _close_connection(self):
""" Close connection to the database. """
2015-01-28 08:22:09 +00:00
_LOGGER.info("Closing database")
atexit.unregister(self._close_connection)
2015-01-28 08:22:09 +00:00
self.conn.close()
def _setup_run(self):
""" Log the start of the current run. """
if self.query("""UPDATE recorder_runs SET end=?, closed_incorrect=1
WHERE end IS NULL""", (self.recording_start, ),
return_value=RETURN_ROWCOUNT):
_LOGGER.warning("Found unfinished sessions")
self.query(
"""INSERT INTO recorder_runs (start, created, utc_offset)
VALUES (?, ?, ?)""",
(self.recording_start, date_util.utcnow(), self.utc_offset))
def _close_run(self):
""" Save end time for current run. """
self.query(
"UPDATE recorder_runs SET end=? WHERE start=?",
(date_util.utcnow(), self.recording_start))
def _adapt_datetime(datetimestamp):
""" Turn a datetime into an integer for in the DB. """
2015-04-29 05:38:43 +00:00
return date_util.as_utc(datetimestamp.replace(microsecond=0)).timestamp()
2015-01-28 08:22:09 +00:00
def _verify_instance():
""" throws error if recorder not initialized. """
if _INSTANCE is None:
raise RuntimeError("Recorder not initialized.")