Add graphite feeder component

Like recorder, this component listens to all events and reports any
that it can to a graphite installation. This makes it easy to use
graphite for all your data collection and analysis. If you run
carbon-cache (the backend for graphite) on the local machine, no
configuration is required other than enabling the component.

For more info on graphite: http://graphite.wikidot.com/
pull/1194/head
Dan Smith 2016-02-10 18:54:06 +00:00
parent 70a528c04b
commit 7478c36b27
2 changed files with 296 additions and 0 deletions

View File

@ -0,0 +1,125 @@
"""
homeassistant.components.graphite
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Component that records all events and state changes and feeds the data to
a graphite installation.
Example configuration:
graphite:
host: foobar
port: 2003
prefix: ha
All config elements are optional, and assumed to be on localhost at the
default port if not specified. Prefix is the metric prefix in graphite,
and defaults to 'ha'.
"""
import logging
import queue
import socket
import threading
import time
from homeassistant.const import (
EVENT_STATE_CHANGED,
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP,
STATE_ON, STATE_OFF)
DOMAIN = "graphite"
_LOGGER = logging.getLogger(__name__)
def setup(hass, config):
""" Setup graphite feeder. """
graphite_config = config.get('graphite', {})
host = graphite_config.get('host', 'localhost')
prefix = graphite_config.get('prefix', 'ha')
try:
port = int(graphite_config.get('port', 2003))
except ValueError:
_LOGGER.error('Invalid port specified')
return False
GraphiteFeeder(hass, host, port, prefix)
return True
class GraphiteFeeder(threading.Thread):
""" Feeds data to graphite. """
def __init__(self, hass, host, port, prefix):
super(GraphiteFeeder, self).__init__(daemon=True)
self._hass = hass
self._host = host
self._port = port
# rstrip any trailing dots in case they think they
# need it
self._prefix = prefix.rstrip('.')
self._queue = queue.Queue()
self._quit_object = object()
hass.bus.listen_once(EVENT_HOMEASSISTANT_START,
self.start_listen)
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP,
self.shutdown)
hass.bus.listen(EVENT_STATE_CHANGED, self.event_listener)
def start_listen(self, event):
""" Start event-processing thread. """
self.start()
def shutdown(self, event):
""" Tell the thread that we are done.
This does not block because there is nothing to
clean up (and no penalty for killing in-process
connections to graphite.
"""
self._queue.put(self._quit_object)
def event_listener(self, event):
""" Queue an event for processing. """
self._queue.put(event)
def _send_to_graphite(self, data):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
sock.connect((self._host, self._port))
sock.sendall(data.encode('ascii'))
sock.send('\n'.encode('ascii'))
sock.close()
def _report_attributes(self, entity_id, new_state):
now = time.time()
things = dict(new_state.attributes)
state = new_state.state
if state in (STATE_ON, STATE_OFF):
state = float(state == STATE_ON)
else:
state = None
if state is not None:
things['state'] = state
lines = ['%s.%s.%s %f %i' % (self._prefix,
entity_id, key.replace(' ', '_'),
value, now)
for key, value in things.items()
if isinstance(value, (float, int))]
if not lines:
return
_LOGGER.debug('Sending to graphite: %s', lines)
try:
self._send_to_graphite('\n'.join(lines))
except socket.error:
_LOGGER.exception('Failed to send data to graphite')
def run(self):
while True:
event = self._queue.get()
if event == self._quit_object:
self._queue.task_done()
return
elif (event.event_type == EVENT_STATE_CHANGED and
'new_state' in event.data):
self._report_attributes(event.data['entity_id'],
event.data['new_state'])
self._queue.task_done()

View File

@ -0,0 +1,171 @@
"""
tests.components.test_graphite
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests graphite feeder.
"""
import socket
import unittest
from unittest import mock
import homeassistant.core as ha
import homeassistant.components.graphite as graphite
from homeassistant.const import (
EVENT_STATE_CHANGED,
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP,
STATE_ON, STATE_OFF)
class TestGraphite(unittest.TestCase):
def setup_method(self, method):
self.hass = ha.HomeAssistant()
self.hass.config.latitude = 32.87336
self.hass.config.longitude = 117.22743
self.gf = graphite.GraphiteFeeder(self.hass, 'foo', 123, 'ha')
def teardown_method(self, method):
""" Stop down stuff we started. """
self.hass.stop()
@mock.patch('homeassistant.components.graphite.GraphiteFeeder')
def test_minimal_config(self, mock_gf):
self.assertTrue(graphite.setup(self.hass, {}))
mock_gf.assert_called_once_with(self.hass, 'localhost', 2003, 'ha')
@mock.patch('homeassistant.components.graphite.GraphiteFeeder')
def test_full_config(self, mock_gf):
config = {
'graphite': {
'host': 'foo',
'port': 123,
'prefix': 'me',
}
}
self.assertTrue(graphite.setup(self.hass, config))
mock_gf.assert_called_once_with(self.hass, 'foo', 123, 'me')
@mock.patch('homeassistant.components.graphite.GraphiteFeeder')
def test_config_bad_port(self, mock_gf):
config = {
'graphite': {
'host': 'foo',
'port': 'wrong',
}
}
self.assertFalse(graphite.setup(self.hass, config))
self.assertFalse(mock_gf.called)
def test_subscribe(self):
fake_hass = mock.MagicMock()
gf = graphite.GraphiteFeeder(fake_hass, 'foo', 123, 'ha')
fake_hass.bus.listen_once.has_calls([
mock.call(EVENT_HOMEASSISTANT_START, gf.start_listen),
mock.call(EVENT_HOMEASSISTANT_STOP, gf.shutdown),
])
fake_hass.bus.listen.assert_called_once_with(
EVENT_STATE_CHANGED, gf.event_listener)
def test_start(self):
with mock.patch.object(self.gf, 'start') as mock_start:
self.gf.start_listen('event')
mock_start.assert_called_once_with()
def test_shutdown(self):
with mock.patch.object(self.gf, '_queue') as mock_queue:
self.gf.shutdown('event')
mock_queue.put.assert_called_once_with(self.gf._quit_object)
def test_event_listener(self):
with mock.patch.object(self.gf, '_queue') as mock_queue:
self.gf.event_listener('foo')
mock_queue.put.assert_called_once_with('foo')
@mock.patch('time.time')
def test_report_attributes(self, mock_time):
mock_time.return_value = 12345
attrs = {'foo': 1,
'bar': 2.0,
'baz': True,
'bat': 'NaN',
}
expected = [
'ha.entity.foo 1.000000 12345',
'ha.entity.bar 2.000000 12345',
'ha.entity.baz 1.000000 12345',
]
state = mock.MagicMock(state=0, attributes=attrs)
with mock.patch.object(self.gf, '_send_to_graphite') as mock_send:
self.gf._report_attributes('entity', state)
actual = mock_send.call_args_list[0][0][0].split('\n')
self.assertEqual(sorted(expected), sorted(actual))
@mock.patch('time.time')
def test_report_with_string_state(self, mock_time):
mock_time.return_value = 12345
state = mock.MagicMock(state='above_horizon', attributes={'foo': 1.0})
with mock.patch.object(self.gf, '_send_to_graphite') as mock_send:
self.gf._report_attributes('entity', state)
mock_send.assert_called_once_with('ha.entity.foo 1.000000 12345')
@mock.patch('time.time')
def test_report_with_binary_state(self, mock_time):
mock_time.return_value = 12345
state = mock.MagicMock(state=STATE_ON, attributes={'foo': 1.0})
with mock.patch.object(self.gf, '_send_to_graphite') as mock_send:
self.gf._report_attributes('entity', state)
expected = ['ha.entity.foo 1.000000 12345',
'ha.entity.state 1.000000 12345']
actual = mock_send.call_args_list[0][0][0].split('\n')
self.assertEqual(sorted(expected), sorted(actual))
state.state = STATE_OFF
with mock.patch.object(self.gf, '_send_to_graphite') as mock_send:
self.gf._report_attributes('entity', state)
expected = ['ha.entity.foo 1.000000 12345',
'ha.entity.state 0.000000 12345']
actual = mock_send.call_args_list[0][0][0].split('\n')
self.assertEqual(sorted(expected), sorted(actual))
@mock.patch('socket.socket')
def test_send_to_graphite(self, mock_socket):
self.gf._send_to_graphite('foo')
mock_socket.assert_called_once_with(socket.AF_INET,
socket.SOCK_STREAM)
sock = mock_socket.return_value
sock.connect.assert_called_once_with(('foo', 123))
sock.sendall.assert_called_once_with('foo'.encode('ascii'))
sock.send.assert_called_once_with('\n'.encode('ascii'))
sock.close.assert_called_once_with()
def test_run_stops(self):
with mock.patch.object(self.gf, '_queue') as mock_queue:
mock_queue.get.return_value = self.gf._quit_object
self.assertEqual(None, self.gf.run())
mock_queue.get.assert_called_once_with()
mock_queue.task_done.assert_called_once_with()
def test_run(self):
runs = []
event = mock.MagicMock(event_type=EVENT_STATE_CHANGED,
data={'entity_id': 'entity',
'new_state': mock.MagicMock()})
def fake_get():
if len(runs) >= 2:
return self.gf._quit_object
elif runs:
runs.append(1)
return mock.MagicMock(event_type='somethingelse')
else:
runs.append(1)
return event
with mock.patch.object(self.gf, '_queue') as mock_queue:
with mock.patch.object(self.gf, '_report_attributes') as mock_r:
mock_queue.get.side_effect = fake_get
self.gf.run()
# Twice for two events, once for the stop
self.assertEqual(3, mock_queue.task_done.call_count)
mock_r.assert_called_once_with(
'entity',
event.data['new_state'])