core/tests/components/test_graphite.py

246 lines
9.5 KiB
Python

"""The tests for the Graphite component."""
import socket
import unittest
from unittest import mock
from unittest.mock import patch
from homeassistant.bootstrap import setup_component
import homeassistant.core as ha
import homeassistant.components.graphite as graphite
from homeassistant.const import (
EVENT_STATE_CHANGED, EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP,
STATE_ON, STATE_OFF)
from tests.common import get_test_home_assistant
class TestGraphite(unittest.TestCase):
    """Test the Graphite component.

    Every test gets a fresh test Home Assistant instance (``self.hass``) and
    a ``GraphiteFeeder`` (``self.gf``) built with host ``'foo'``, port ``123``
    and prefix ``'ha'``.  Sockets, the feeder queue and ``time.time`` are
    replaced with mocks where needed so no network traffic is produced.
    """

    def setup_method(self, method):
        """Setup things to be run when tests are started."""
        self.hass = get_test_home_assistant()
        self.gf = graphite.GraphiteFeeder(self.hass, 'foo', 123, 'ha')

    def teardown_method(self, method):
        """Stop everything that was started."""
        self.hass.stop()

    @patch('socket.socket')
    def test_setup(self, mock_socket):
        """Test setup with an empty configuration.

        Setup must succeed and must create exactly one TCP/IPv4 socket.
        """
        self.assertTrue(
            setup_component(self.hass, graphite.DOMAIN, {'graphite': {}}))
        self.assertEqual(mock_socket.call_count, 1)
        self.assertEqual(
            mock_socket.call_args,
            mock.call(socket.AF_INET, socket.SOCK_STREAM)
        )

    @patch('socket.socket')
    @patch('homeassistant.components.graphite.GraphiteFeeder')
    def test_full_config(self, mock_gf, mock_socket):
        """Test setup with full configuration.

        Host, port and prefix from the config must be forwarded unchanged
        to the ``GraphiteFeeder`` constructor.
        """
        config = {
            'graphite': {
                'host': 'foo',
                'port': 123,
                'prefix': 'me',
            }
        }

        self.assertTrue(setup_component(self.hass, graphite.DOMAIN, config))
        self.assertEqual(mock_gf.call_count, 1)
        self.assertEqual(
            mock_gf.call_args, mock.call(self.hass, 'foo', 123, 'me')
        )
        self.assertEqual(mock_socket.call_count, 1)
        self.assertEqual(
            mock_socket.call_args,
            mock.call(socket.AF_INET, socket.SOCK_STREAM)
        )

    @patch('socket.socket')
    @patch('homeassistant.components.graphite.GraphiteFeeder')
    def test_config_port(self, mock_gf, mock_socket):
        """Test setup with a host and port but no prefix."""
        config = {
            'graphite': {
                'host': 'foo',
                'port': 2003,
            }
        }

        self.assertTrue(setup_component(self.hass, graphite.DOMAIN, config))
        self.assertTrue(mock_gf.called)
        self.assertEqual(mock_socket.call_count, 1)
        self.assertEqual(
            mock_socket.call_args,
            mock.call(socket.AF_INET, socket.SOCK_STREAM)
        )

    def test_subscribe(self):
        """Test that the feeder subscribes to the bus events it needs."""
        fake_hass = mock.MagicMock()
        gf = graphite.GraphiteFeeder(fake_hass, 'foo', 123, 'ha')

        # NOTE: this must be ``assert_has_calls``.  A plain ``has_calls`` on
        # a MagicMock is just an auto-created child mock (and raises
        # AttributeError on Python 3.12+), so it would verify nothing.
        # any_order=True: this test checks that both handlers are
        # registered, not the order in which they are registered.
        fake_hass.bus.listen_once.assert_has_calls([
            mock.call(EVENT_HOMEASSISTANT_START, gf.start_listen),
            mock.call(EVENT_HOMEASSISTANT_STOP, gf.shutdown),
        ], any_order=True)
        self.assertEqual(fake_hass.bus.listen.call_count, 1)
        self.assertEqual(
            fake_hass.bus.listen.call_args,
            mock.call(EVENT_STATE_CHANGED, gf.event_listener)
        )

    def test_start(self):
        """Test that ``start_listen`` calls ``start`` once, without args."""
        with mock.patch.object(self.gf, 'start') as mock_start:
            self.gf.start_listen('event')
            self.assertEqual(mock_start.call_count, 1)
            self.assertEqual(mock_start.call_args, mock.call())

    def test_shutdown(self):
        """Test that ``shutdown`` enqueues the feeder's quit object."""
        with mock.patch.object(self.gf, '_queue') as mock_queue:
            self.gf.shutdown('event')
            self.assertEqual(mock_queue.put.call_count, 1)
            self.assertEqual(
                mock_queue.put.call_args, mock.call(self.gf._quit_object)
            )

    def test_event_listener(self):
        """Test that the event listener enqueues the event it receives."""
        with mock.patch.object(self.gf, '_queue') as mock_queue:
            self.gf.event_listener('foo')
            self.assertEqual(mock_queue.put.call_count, 1)
            self.assertEqual(mock_queue.put.call_args, mock.call('foo'))

    @patch('time.time')
    def test_report_attributes(self, mock_time):
        """Test the reporting with attributes.

        Int, float and bool attributes are expected as ``%f``-style lines;
        the string attribute ``'bat'`` is expected to be left out.
        """
        mock_time.return_value = 12345
        attrs = {
            'foo': 1,
            'bar': 2.0,
            'baz': True,
            'bat': 'NaN',
        }
        expected = [
            'ha.entity.state 0.000000 12345',
            'ha.entity.foo 1.000000 12345',
            'ha.entity.bar 2.000000 12345',
            'ha.entity.baz 1.000000 12345',
        ]

        state = mock.MagicMock(state=0, attributes=attrs)
        with mock.patch.object(self.gf, '_send_to_graphite') as mock_send:
            self.gf._report_attributes('entity', state)
            # First positional argument of the first call is the payload;
            # compare sorted because the line order is not asserted here.
            actual = mock_send.call_args_list[0][0][0].split('\n')
            self.assertEqual(sorted(expected), sorted(actual))

    @patch('time.time')
    def test_report_with_string_state(self, mock_time):
        """Test the reporting with strings."""
        mock_time.return_value = 12345
        expected = [
            'ha.entity.foo 1.000000 12345',
            'ha.entity.state 1.000000 12345',
        ]

        state = mock.MagicMock(state='above_horizon', attributes={'foo': 1.0})
        with mock.patch.object(self.gf, '_send_to_graphite') as mock_send:
            self.gf._report_attributes('entity', state)
            actual = mock_send.call_args_list[0][0][0].split('\n')
            self.assertEqual(sorted(expected), sorted(actual))

    @patch('time.time')
    def test_report_with_binary_state(self, mock_time):
        """Test the reporting with binary state.

        ``STATE_ON`` is expected to be reported as ``1.000000`` and
        ``STATE_OFF`` as ``0.000000``.
        """
        mock_time.return_value = 12345
        state = ha.State('domain.entity', STATE_ON, {'foo': 1.0})
        with mock.patch.object(self.gf, '_send_to_graphite') as mock_send:
            self.gf._report_attributes('entity', state)
            expected = ['ha.entity.foo 1.000000 12345',
                        'ha.entity.state 1.000000 12345']
            actual = mock_send.call_args_list[0][0][0].split('\n')
            self.assertEqual(sorted(expected), sorted(actual))

        # Flip the same state object to "off" and report again.
        state.state = STATE_OFF
        with mock.patch.object(self.gf, '_send_to_graphite') as mock_send:
            self.gf._report_attributes('entity', state)
            expected = ['ha.entity.foo 1.000000 12345',
                        'ha.entity.state 0.000000 12345']
            actual = mock_send.call_args_list[0][0][0].split('\n')
            self.assertEqual(sorted(expected), sorted(actual))

    @patch('time.time')
    def test_send_to_graphite_errors(self, mock_time):
        """Test the sending with errors.

        There are no explicit assertions: the test passes only if
        ``_report_attributes`` does not let ``socket.error`` or
        ``socket.gaierror`` raised by ``_send_to_graphite`` propagate.
        """
        mock_time.return_value = 12345
        state = ha.State('domain.entity', STATE_ON, {'foo': 1.0})
        with mock.patch.object(self.gf, '_send_to_graphite') as mock_send:
            mock_send.side_effect = socket.error
            self.gf._report_attributes('entity', state)
            mock_send.side_effect = socket.gaierror
            self.gf._report_attributes('entity', state)

    @patch('socket.socket')
    def test_send_to_graphite(self, mock_socket):
        """Test the sending of data.

        Expects one socket to be created, connected to the configured
        ``(host, port)``, fed the ASCII-encoded payload via ``sendall``,
        followed by a newline via ``send``, and then closed.
        """
        self.gf._send_to_graphite('foo')
        self.assertEqual(mock_socket.call_count, 1)
        self.assertEqual(
            mock_socket.call_args,
            mock.call(socket.AF_INET, socket.SOCK_STREAM)
        )
        sock = mock_socket.return_value
        self.assertEqual(sock.connect.call_count, 1)
        self.assertEqual(sock.connect.call_args, mock.call(('foo', 123)))
        self.assertEqual(sock.sendall.call_count, 1)
        self.assertEqual(
            sock.sendall.call_args, mock.call('foo'.encode('ascii'))
        )
        self.assertEqual(sock.send.call_count, 1)
        self.assertEqual(sock.send.call_args, mock.call('\n'.encode('ascii')))
        self.assertEqual(sock.close.call_count, 1)
        self.assertEqual(sock.close.call_args, mock.call())

    def test_run_stops(self):
        """Test that ``run`` returns once it dequeues the quit object."""
        with mock.patch.object(self.gf, '_queue') as mock_queue:
            mock_queue.get.return_value = self.gf._quit_object
            self.assertIsNone(self.gf.run())
            self.assertEqual(mock_queue.get.call_count, 1)
            self.assertEqual(mock_queue.get.call_args, mock.call())
            self.assertEqual(mock_queue.task_done.call_count, 1)
            self.assertEqual(mock_queue.task_done.call_args, mock.call())

    def test_run(self):
        """Test the running.

        The fake queue yields, in order: a state-changed event, an event of
        another type, and finally the quit object.  Only the state-changed
        event is expected to reach ``_report_attributes``.
        """
        runs = []
        event = mock.MagicMock(event_type=EVENT_STATE_CHANGED,
                               data={'entity_id': 'entity',
                                     'new_state': mock.MagicMock()})

        def fake_get():
            """Return the next queued item based on how many were served."""
            if len(runs) >= 2:
                return self.gf._quit_object
            elif runs:
                runs.append(1)
                return mock.MagicMock(event_type='somethingelse',
                                      data={'new_event': None})
            else:
                runs.append(1)
                return event

        with mock.patch.object(self.gf, '_queue') as mock_queue:
            with mock.patch.object(self.gf, '_report_attributes') as mock_r:
                mock_queue.get.side_effect = fake_get
                self.gf.run()
                # Twice for two events, once for the stop
                self.assertEqual(3, mock_queue.task_done.call_count)
                self.assertEqual(mock_r.call_count, 1)
                self.assertEqual(
                    mock_r.call_args,
                    mock.call('entity', event.data['new_state'])
                )