diff --git a/homeassistant/components/graphite.py b/homeassistant/components/graphite.py new file mode 100644 index 00000000000..e6675df2f80 --- /dev/null +++ b/homeassistant/components/graphite.py @@ -0,0 +1,125 @@ +""" +homeassistant.components.graphite +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Component that records all events and state changes and feeds the data to +a graphite installation. + +Example configuration: + + graphite: + host: foobar + port: 2003 + prefix: ha + +All config elements are optional, and assumed to be on localhost at the +default port if not specified. Prefix is the metric prefix in graphite, +and defaults to 'ha'. +""" +import logging +import queue +import socket +import threading +import time + +from homeassistant.const import ( + EVENT_STATE_CHANGED, + EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP, + STATE_ON, STATE_OFF) + +DOMAIN = "graphite" +_LOGGER = logging.getLogger(__name__) + + +def setup(hass, config): + """ Setup graphite feeder. """ + graphite_config = config.get('graphite', {}) + host = graphite_config.get('host', 'localhost') + prefix = graphite_config.get('prefix', 'ha') + try: + port = int(graphite_config.get('port', 2003)) + except ValueError: + _LOGGER.error('Invalid port specified') + return False + + GraphiteFeeder(hass, host, port, prefix) + return True + + +class GraphiteFeeder(threading.Thread): + """ Feeds data to graphite. """ + def __init__(self, hass, host, port, prefix): + super(GraphiteFeeder, self).__init__(daemon=True) + self._hass = hass + self._host = host + self._port = port + # rstrip any trailing dots in case they think they + # need it + self._prefix = prefix.rstrip('.') + self._queue = queue.Queue() + self._quit_object = object() + + hass.bus.listen_once(EVENT_HOMEASSISTANT_START, + self.start_listen) + hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, + self.shutdown) + hass.bus.listen(EVENT_STATE_CHANGED, self.event_listener) + + def start_listen(self, event): + """ Start event-processing thread. """ + self.start() + + def shutdown(self, event): + """ Tell the thread that we are done. + + This does not block because there is nothing to + clean up (and no penalty for killing in-process + connections to graphite. + """ + self._queue.put(self._quit_object) + + def event_listener(self, event): + """ Queue an event for processing. """ + self._queue.put(event) + + def _send_to_graphite(self, data): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(10) + sock.connect((self._host, self._port)) + sock.sendall(data.encode('ascii')) + sock.send('\n'.encode('ascii')) + sock.close() + + def _report_attributes(self, entity_id, new_state): + now = time.time() + things = dict(new_state.attributes) + state = new_state.state + if state in (STATE_ON, STATE_OFF): + state = float(state == STATE_ON) + else: + state = None + if state is not None: + things['state'] = state + lines = ['%s.%s.%s %f %i' % (self._prefix, + entity_id, key.replace(' ', '_'), + value, now) + for key, value in things.items() + if isinstance(value, (float, int))] + if not lines: + return + _LOGGER.debug('Sending to graphite: %s', lines) + try: + self._send_to_graphite('\n'.join(lines)) + except socket.error: + _LOGGER.exception('Failed to send data to graphite') + + def run(self): + while True: + event = self._queue.get() + if event == self._quit_object: + self._queue.task_done() + return + elif (event.event_type == EVENT_STATE_CHANGED and + 'new_state' in event.data): + self._report_attributes(event.data['entity_id'], + event.data['new_state']) + self._queue.task_done() diff --git a/tests/components/test_graphite.py b/tests/components/test_graphite.py new file mode 100644 index 00000000000..2a3c8750d40 --- /dev/null +++ b/tests/components/test_graphite.py @@ -0,0 +1,171 @@ +""" +tests.components.test_graphite +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Tests graphite feeder. +""" +import socket +import unittest +from unittest import mock + +import homeassistant.core as ha +import homeassistant.components.graphite as graphite +from homeassistant.const import ( + EVENT_STATE_CHANGED, + EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP, + STATE_ON, STATE_OFF) + + +class TestGraphite(unittest.TestCase): + def setup_method(self, method): + self.hass = ha.HomeAssistant() + self.hass.config.latitude = 32.87336 + self.hass.config.longitude = 117.22743 + self.gf = graphite.GraphiteFeeder(self.hass, 'foo', 123, 'ha') + + def teardown_method(self, method): + """ Stop down stuff we started. """ + self.hass.stop() + + @mock.patch('homeassistant.components.graphite.GraphiteFeeder') + def test_minimal_config(self, mock_gf): + self.assertTrue(graphite.setup(self.hass, {})) + mock_gf.assert_called_once_with(self.hass, 'localhost', 2003, 'ha') + + @mock.patch('homeassistant.components.graphite.GraphiteFeeder') + def test_full_config(self, mock_gf): + config = { + 'graphite': { + 'host': 'foo', + 'port': 123, + 'prefix': 'me', + } + } + self.assertTrue(graphite.setup(self.hass, config)) + mock_gf.assert_called_once_with(self.hass, 'foo', 123, 'me') + + @mock.patch('homeassistant.components.graphite.GraphiteFeeder') + def test_config_bad_port(self, mock_gf): + config = { + 'graphite': { + 'host': 'foo', + 'port': 'wrong', + } + } + self.assertFalse(graphite.setup(self.hass, config)) + self.assertFalse(mock_gf.called) + + def test_subscribe(self): + fake_hass = mock.MagicMock() + gf = graphite.GraphiteFeeder(fake_hass, 'foo', 123, 'ha') + fake_hass.bus.listen_once.has_calls([ + mock.call(EVENT_HOMEASSISTANT_START, gf.start_listen), + mock.call(EVENT_HOMEASSISTANT_STOP, gf.shutdown), + ]) + fake_hass.bus.listen.assert_called_once_with( + EVENT_STATE_CHANGED, gf.event_listener) + + def test_start(self): + with mock.patch.object(self.gf, 'start') as mock_start: + self.gf.start_listen('event') + mock_start.assert_called_once_with() + + def test_shutdown(self): + with mock.patch.object(self.gf, '_queue') as mock_queue: + self.gf.shutdown('event') + mock_queue.put.assert_called_once_with(self.gf._quit_object) + + def test_event_listener(self): + with mock.patch.object(self.gf, '_queue') as mock_queue: + self.gf.event_listener('foo') + mock_queue.put.assert_called_once_with('foo') + + @mock.patch('time.time') + def test_report_attributes(self, mock_time): + mock_time.return_value = 12345 + attrs = {'foo': 1, + 'bar': 2.0, + 'baz': True, + 'bat': 'NaN', + } + expected = [ + 'ha.entity.foo 1.000000 12345', + 'ha.entity.bar 2.000000 12345', + 'ha.entity.baz 1.000000 12345', + ] + state = mock.MagicMock(state=0, attributes=attrs) + with mock.patch.object(self.gf, '_send_to_graphite') as mock_send: + self.gf._report_attributes('entity', state) + actual = mock_send.call_args_list[0][0][0].split('\n') + self.assertEqual(sorted(expected), sorted(actual)) + + @mock.patch('time.time') + def test_report_with_string_state(self, mock_time): + mock_time.return_value = 12345 + state = mock.MagicMock(state='above_horizon', attributes={'foo': 1.0}) + with mock.patch.object(self.gf, '_send_to_graphite') as mock_send: + self.gf._report_attributes('entity', state) + mock_send.assert_called_once_with('ha.entity.foo 1.000000 12345') + + @mock.patch('time.time') + def test_report_with_binary_state(self, mock_time): + mock_time.return_value = 12345 + state = mock.MagicMock(state=STATE_ON, attributes={'foo': 1.0}) + with mock.patch.object(self.gf, '_send_to_graphite') as mock_send: + self.gf._report_attributes('entity', state) + expected = ['ha.entity.foo 1.000000 12345', + 'ha.entity.state 1.000000 12345'] + actual = mock_send.call_args_list[0][0][0].split('\n') + self.assertEqual(sorted(expected), sorted(actual)) + + state.state = STATE_OFF + with mock.patch.object(self.gf, '_send_to_graphite') as mock_send: + self.gf._report_attributes('entity', state) + expected = ['ha.entity.foo 1.000000 12345', + 'ha.entity.state 0.000000 12345'] + actual = mock_send.call_args_list[0][0][0].split('\n') + self.assertEqual(sorted(expected), sorted(actual)) + + @mock.patch('socket.socket') + def test_send_to_graphite(self, mock_socket): + self.gf._send_to_graphite('foo') + mock_socket.assert_called_once_with(socket.AF_INET, + socket.SOCK_STREAM) + sock = mock_socket.return_value + sock.connect.assert_called_once_with(('foo', 123)) + sock.sendall.assert_called_once_with('foo'.encode('ascii')) + sock.send.assert_called_once_with('\n'.encode('ascii')) + sock.close.assert_called_once_with() + + def test_run_stops(self): + with mock.patch.object(self.gf, '_queue') as mock_queue: + mock_queue.get.return_value = self.gf._quit_object + self.assertEqual(None, self.gf.run()) + mock_queue.get.assert_called_once_with() + mock_queue.task_done.assert_called_once_with() + + def test_run(self): + runs = [] + event = mock.MagicMock(event_type=EVENT_STATE_CHANGED, + data={'entity_id': 'entity', + 'new_state': mock.MagicMock()}) + + def fake_get(): + if len(runs) >= 2: + return self.gf._quit_object + elif runs: + runs.append(1) + return mock.MagicMock(event_type='somethingelse') + else: + runs.append(1) + return event + + with mock.patch.object(self.gf, '_queue') as mock_queue: + with mock.patch.object(self.gf, '_report_attributes') as mock_r: + mock_queue.get.side_effect = fake_get + self.gf.run() + # Twice for two events, once for the stop + self.assertEqual(3, mock_queue.task_done.call_count) + mock_r.assert_called_once_with( + 'entity', + event.data['new_state'])