core/tests/components/influxdb/test_init.py

1114 lines
34 KiB
Python
Raw Normal View History

2016-03-09 09:25:50 +00:00
"""The tests for the InfluxDB component."""
import datetime
import pytest
import homeassistant.components.influxdb as influxdb
from homeassistant.const import (
EVENT_STATE_CHANGED,
STATE_OFF,
STATE_ON,
STATE_STANDBY,
UNIT_PERCENTAGE,
)
from homeassistant.setup import async_setup_component
from tests.async_mock import MagicMock, Mock, call, patch
BASE_V1_CONFIG = {}
BASE_V2_CONFIG = {
"api_version": influxdb.API_VERSION_2,
"organization": "org",
"token": "token",
}
@pytest.fixture(autouse=True)
def mock_batch_timeout(hass, monkeypatch):
"""Mock the event bus listener and the batch timeout for tests."""
hass.bus.listen = MagicMock()
monkeypatch.setattr(
"homeassistant.components.influxdb.InfluxThread.batch_timeout",
Mock(return_value=0),
)
@pytest.fixture(name="mock_client")
def mock_client_fixture(request):
"""Patch the InfluxDBClient object with mock for version under test."""
if request.param == influxdb.API_VERSION_2:
client_target = "homeassistant.components.influxdb.InfluxDBClientV2"
else:
client_target = "homeassistant.components.influxdb.InfluxDBClient"
with patch(client_target) as client:
yield client
@pytest.fixture(name="get_mock_call")
def get_mock_call_fixture(request):
"""Get version specific lambda to make write API call mock."""
if request.param == influxdb.API_VERSION_2:
return lambda body: call(bucket=influxdb.DEFAULT_BUCKET, record=body)
# pylint: disable=unnecessary-lambda
return lambda body: call(body)
def _get_write_api_mock_v1(mock_influx_client):
"""Return the write api mock for the V1 client."""
return mock_influx_client.return_value.write_points
def _get_write_api_mock_v2(mock_influx_client):
"""Return the write api mock for the V2 client."""
return mock_influx_client.return_value.write_api.return_value.write
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api",
[
(
influxdb.DEFAULT_API_VERSION,
{
"api_version": influxdb.DEFAULT_API_VERSION,
2019-07-31 19:25:30 +00:00
"username": "user",
"password": "password",
"verify_ssl": "False",
},
_get_write_api_mock_v1,
),
(
influxdb.API_VERSION_2,
{
"api_version": influxdb.API_VERSION_2,
"token": "token",
"organization": "organization",
"bucket": "bucket",
},
_get_write_api_mock_v2,
),
],
indirect=["mock_client"],
)
async def test_setup_config_full(hass, mock_client, config_ext, get_write_api):
"""Test the setup with full configuration."""
config = {
"influxdb": {
"host": "host",
"port": 123,
"database": "db",
"max_retries": 4,
"ssl": "False",
}
}
config["influxdb"].update(config_ext)
assert await async_setup_component(hass, influxdb.DOMAIN, config)
await hass.async_block_till_done()
assert hass.bus.listen.called
assert EVENT_STATE_CHANGED == hass.bus.listen.call_args_list[0][0][0]
assert get_write_api(mock_client).call_count == 1
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api",
[
(influxdb.DEFAULT_API_VERSION, BASE_V1_CONFIG, _get_write_api_mock_v1),
(influxdb.API_VERSION_2, BASE_V2_CONFIG, _get_write_api_mock_v2),
],
indirect=["mock_client"],
)
async def test_setup_minimal_config(hass, mock_client, config_ext, get_write_api):
"""Test the setup with minimal configuration and defaults."""
config = {"influxdb": {}}
config["influxdb"].update(config_ext)
assert await async_setup_component(hass, influxdb.DOMAIN, config)
await hass.async_block_till_done()
assert hass.bus.listen.called
assert EVENT_STATE_CHANGED == hass.bus.listen.call_args_list[0][0][0]
assert get_write_api(mock_client).call_count == 1
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api",
[
(influxdb.DEFAULT_API_VERSION, {"username": "user"}, _get_write_api_mock_v1),
(influxdb.DEFAULT_API_VERSION, {"token": "token"}, _get_write_api_mock_v1),
(
influxdb.API_VERSION_2,
{"api_version": influxdb.API_VERSION_2, "organization": "organization"},
_get_write_api_mock_v2,
),
(
influxdb.API_VERSION_2,
{
"api_version": influxdb.API_VERSION_2,
"token": "token",
"organization": "organization",
2019-07-31 19:25:30 +00:00
"username": "user",
"password": "pass",
},
_get_write_api_mock_v2,
),
],
indirect=["mock_client"],
)
async def test_invalid_config(hass, mock_client, config_ext, get_write_api):
"""Test the setup with invalid config or config options specified for wrong version."""
config = {"influxdb": {}}
config["influxdb"].update(config_ext)
assert not await async_setup_component(hass, influxdb.DOMAIN, config)
async def _setup(hass, mock_influx_client, config_ext, get_write_api):
"""Prepare client for next test and return event handler method."""
config = {
"influxdb": {
"host": "host",
"exclude": {"entities": ["fake.blacklisted"], "domains": ["another_fake"]},
}
}
config["influxdb"].update(config_ext)
assert await async_setup_component(hass, influxdb.DOMAIN, config)
await hass.async_block_till_done()
# A call is made to the write API during setup to test the connection.
# Therefore we reset the write API mock here before the test begins.
get_write_api(mock_influx_client).reset_mock()
return hass.bus.listen.call_args_list[0][0][1]
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener."""
handler_method = await _setup(hass, mock_client, config_ext, get_write_api)
# map of HA State to valid influxdb [state, value] fields
valid = {
"1": [None, 1],
"1.0": [None, 1.0],
STATE_ON: [STATE_ON, 1],
STATE_OFF: [STATE_OFF, 0],
STATE_STANDBY: [STATE_STANDBY, None],
"foo": ["foo", None],
}
for in_, out in valid.items():
attrs = {
"unit_of_measurement": "foobars",
"longitude": "1.1",
"latitude": "2.2",
"battery_level": f"99{UNIT_PERCENTAGE}",
"temperature": "20c",
"last_seen": "Last seen 23 minutes ago",
"updated_at": datetime.datetime(2017, 1, 1, 0, 0),
"multi_periods": "0.120.240.2023873",
}
state = MagicMock(
state=in_,
domain="fake",
entity_id="fake.entity-id",
object_id="entity",
attributes=attrs,
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
body = [
{
"measurement": "foobars",
"tags": {"domain": "fake", "entity_id": "entity"},
"time": 12345,
"fields": {
"longitude": 1.1,
"latitude": 2.2,
"battery_level_str": f"99{UNIT_PERCENTAGE}",
"battery_level": 99.0,
"temperature_str": "20c",
"temperature": 20.0,
"last_seen_str": "Last seen 23 minutes ago",
"last_seen": 23.0,
"updated_at_str": "2017-01-01 00:00:00",
"updated_at": 20170101000000,
"multi_periods_str": "0.120.240.2023873",
},
}
]
if out[0] is not None:
body[0]["fields"]["state"] = out[0]
if out[1] is not None:
body[0]["fields"]["value"] = out[1]
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
assert write_api.call_args == get_mock_call(body)
write_api.reset_mock()
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener_no_units(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener for missing units."""
handler_method = await _setup(hass, mock_client, config_ext, get_write_api)
for unit in (None, ""):
if unit:
attrs = {"unit_of_measurement": unit}
else:
attrs = {}
state = MagicMock(
state=1,
2019-07-31 19:25:30 +00:00
domain="fake",
entity_id="fake.entity-id",
object_id="entity",
attributes=attrs,
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
2019-07-31 19:25:30 +00:00
body = [
{
"measurement": "fake.entity-id",
"tags": {"domain": "fake", "entity_id": "entity"},
"time": 12345,
"fields": {"value": 1},
2019-07-31 19:25:30 +00:00
}
]
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
assert write_api.call_args == get_mock_call(body)
write_api.reset_mock()
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener_inf(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener with large or invalid numbers."""
handler_method = await _setup(hass, mock_client, config_ext, get_write_api)
attrs = {"bignumstring": "9" * 999, "nonumstring": "nan"}
state = MagicMock(
state=8,
domain="fake",
entity_id="fake.entity-id",
object_id="entity",
attributes=attrs,
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
body = [
{
"measurement": "fake.entity-id",
"tags": {"domain": "fake", "entity_id": "entity"},
"time": 12345,
"fields": {"value": 8},
}
]
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
assert write_api.call_args == get_mock_call(body)
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener_states(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener against ignored states."""
handler_method = await _setup(hass, mock_client, config_ext, get_write_api)
for state_state in (1, "unknown", "", "unavailable"):
state = MagicMock(
state=state_state,
2019-07-31 19:25:30 +00:00
domain="fake",
entity_id="fake.entity-id",
object_id="entity",
attributes={},
2019-07-31 19:25:30 +00:00
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
2019-07-31 19:25:30 +00:00
body = [
{
"measurement": "fake.entity-id",
2019-07-31 19:25:30 +00:00
"tags": {"domain": "fake", "entity_id": "entity"},
"time": 12345,
"fields": {"value": 1},
2019-07-31 19:25:30 +00:00
}
]
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
InfluxDB component improvements (#8633) * Allow reporting some state attributes as tags to InfluxDB Some state attributes should really be tags in InfluxDB. E.g. it is helpful to be able to group by friendly_name, or add a custom attribute like "location" and group by that. Graphs in Grafana are much easier to read when friendly names are used, and not node ids. This commit adds an optional setting to InfluxDB config: 'tags_attributes'. Any attribute on this list will be reported as tag and not as field to InfluxDB. * Allow overriding InfluxDB measurement for each reported item separately Bundling all items with the same "unit of measurement" together does not always makes sense. For example, both "relatively humidity" and "battery level" are reported as "%", but I'd rather see them as separate measurements in InfluxDB. This commit allows for 'influxdb_measurement' attribute. When set on node, it will take precedence over the global 'override_measurement' and component-specific 'unit_of_measurement'. * Minor updates to InfluxDB component improvements, as suggested by @MartinHjelmare. * Moved per-component config from 'customize' into 'influxdb' configuration section. The following three sub-sections were added: 'component_config', 'component_config_domain' and 'component_config_glob'. The sole supported per-component attribute at this point is 'override_measurement'. * Lint * Fixed mocked entity_ids in InfluxDB tests to be in domain.entity_id format, to satisfy EntityValues requirements. * Added tests for new InfluxDB configuration parameters * Fixes to some docstrings
2017-08-03 14:26:01 +00:00
write_api = get_write_api(mock_client)
if state_state == 1:
assert write_api.call_count == 1
assert write_api.call_args == get_mock_call(body)
else:
assert not write_api.called
write_api.reset_mock()
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener_blacklist(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener against a blacklist."""
handler_method = await _setup(hass, mock_client, config_ext, get_write_api)
for entity_id in ("ok", "blacklisted"):
state = MagicMock(
2019-07-31 19:25:30 +00:00
state=1,
domain="fake",
entity_id=f"fake.{entity_id}",
object_id=entity_id,
attributes={},
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
body = [
{
"measurement": f"fake.{entity_id}",
"tags": {"domain": "fake", "entity_id": entity_id},
"time": 12345,
"fields": {"value": 1},
}
]
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
write_api = get_write_api(mock_client)
if entity_id == "ok":
assert write_api.call_count == 1
assert write_api.call_args == get_mock_call(body)
else:
assert not write_api.called
write_api.reset_mock()
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener_blacklist_domain(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener against a domain blacklist."""
handler_method = await _setup(hass, mock_client, config_ext, get_write_api)
for domain in ("ok", "another_fake"):
state = MagicMock(
state=1,
domain=domain,
entity_id=f"{domain}.something",
2019-07-31 19:25:30 +00:00
object_id="something",
attributes={},
2019-07-31 19:25:30 +00:00
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
2019-07-31 19:25:30 +00:00
body = [
{
"measurement": f"{domain}.something",
"tags": {"domain": domain, "entity_id": "something"},
2019-07-31 19:25:30 +00:00
"time": 12345,
"fields": {"value": 1},
2019-07-31 19:25:30 +00:00
}
]
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
write_api = get_write_api(mock_client)
if domain == "ok":
assert write_api.call_count == 1
assert write_api.call_args == get_mock_call(body)
else:
assert not write_api.called
write_api.reset_mock()
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener_whitelist(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener against a whitelist."""
config = {"include": {"entities": ["fake.included"]}}
config.update(config_ext)
handler_method = await _setup(hass, mock_client, config, get_write_api)
for entity_id in ("included", "default"):
state = MagicMock(
state=1,
domain="fake",
entity_id=f"fake.{entity_id}",
object_id=entity_id,
attributes={},
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
body = [
{
"measurement": f"fake.{entity_id}",
"tags": {"domain": "fake", "entity_id": entity_id},
"time": 12345,
"fields": {"value": 1},
InfluxDB component improvements (#8633) * Allow reporting some state attributes as tags to InfluxDB Some state attributes should really be tags in InfluxDB. E.g. it is helpful to be able to group by friendly_name, or add a custom attribute like "location" and group by that. Graphs in Grafana are much easier to read when friendly names are used, and not node ids. This commit adds an optional setting to InfluxDB config: 'tags_attributes'. Any attribute on this list will be reported as tag and not as field to InfluxDB. * Allow overriding InfluxDB measurement for each reported item separately Bundling all items with the same "unit of measurement" together does not always makes sense. For example, both "relatively humidity" and "battery level" are reported as "%", but I'd rather see them as separate measurements in InfluxDB. This commit allows for 'influxdb_measurement' attribute. When set on node, it will take precedence over the global 'override_measurement' and component-specific 'unit_of_measurement'. * Minor updates to InfluxDB component improvements, as suggested by @MartinHjelmare. * Moved per-component config from 'customize' into 'influxdb' configuration section. The following three sub-sections were added: 'component_config', 'component_config_domain' and 'component_config_glob'. The sole supported per-component attribute at this point is 'override_measurement'. * Lint * Fixed mocked entity_ids in InfluxDB tests to be in domain.entity_id format, to satisfy EntityValues requirements. * Added tests for new InfluxDB configuration parameters * Fixes to some docstrings
2017-08-03 14:26:01 +00:00
}
]
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
write_api = get_write_api(mock_client)
if entity_id == "included":
assert write_api.call_count == 1
assert write_api.call_args == get_mock_call(body)
else:
assert not write_api.called
write_api.reset_mock()
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener_whitelist_domain(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener against a domain whitelist."""
config = {"include": {"domains": ["fake"]}}
config.update(config_ext)
handler_method = await _setup(hass, mock_client, config, get_write_api)
for domain in ("fake", "another_fake"):
state = MagicMock(
state=1,
domain=domain,
entity_id=f"{domain}.something",
object_id="something",
attributes={},
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
body = [
{
"measurement": f"{domain}.something",
"tags": {"domain": domain, "entity_id": "something"},
"time": 12345,
"fields": {"value": 1},
}
]
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
write_api = get_write_api(mock_client)
if domain == "fake":
assert write_api.call_count == 1
assert write_api.call_args == get_mock_call(body)
else:
assert not write_api.called
write_api.reset_mock()
InfluxDB send retry after IOError (#10263) * Implement data write retry for InfluxDB This adds an optional max_retries parameter to the InfluxDB component to specify if and how often the component should try to send the data if the connection failed due to an IOError. The sending will be scheduled for a retry in 20 seconds as often as the user specified. This can be handy for flaky getwork connections between the DB and Homeassistant or outages like daily DSL reconnects. Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add unittest for influx write retries Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add RetryOnError as helper decorator in util Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add unittests for RetryOnError Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Use RetryOnError decorator in InfluxDB This replaces the scheduling logic in the InfluxDB component with the RetryOnError decorator from homeassistant.util Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Make the linters happy Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Implement a queue limit for the retry decorator. This adds a queue limit to the RetryOnError handler. It limits the number of calls waiting for be retried. If this number is exceeded, every new call will discard the oldest one in the queue. * influxdb: Add the retry queue limit option. * Make the linter happy. * Make pylint happy * Log exception of dropped retry * Move RetryOnError decorator to influxdb component. * Fix bug in logging usage * Fix imports * Add newlines at the end of files. * Remove blank line * Remove blank line
2017-11-24 00:58:18 +00:00
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener_whitelist_domain_and_entities(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener against a domain and entity whitelist."""
config = {"include": {"domains": ["fake"], "entities": ["other.one"]}}
config.update(config_ext)
handler_method = await _setup(hass, mock_client, config, get_write_api)
for domain in ("fake", "another_fake"):
state = MagicMock(
2019-07-31 19:25:30 +00:00
state=1,
domain=domain,
entity_id=f"{domain}.something",
object_id="something",
2019-07-31 19:25:30 +00:00
attributes={},
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
body = [
{
"measurement": f"{domain}.something",
"tags": {"domain": domain, "entity_id": "something"},
"time": 12345,
"fields": {"value": 1},
}
]
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
write_api = get_write_api(mock_client)
if domain == "fake":
assert write_api.call_count == 1
assert write_api.call_args == get_mock_call(body)
else:
assert not write_api.called
write_api.reset_mock()
for entity_id in ("one", "two"):
state = MagicMock(
2019-07-31 19:25:30 +00:00
state=1,
domain="other",
entity_id=f"other.{entity_id}",
object_id=entity_id,
attributes={},
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
body = [
{
"measurement": f"other.{entity_id}",
"tags": {"domain": "other", "entity_id": entity_id},
"time": 12345,
"fields": {"value": 1},
}
]
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
write_api = get_write_api(mock_client)
if entity_id == "one":
assert write_api.call_count == 1
assert write_api.call_args == get_mock_call(body)
else:
assert not write_api.called
write_api.reset_mock()
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener_invalid_type(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener when an attribute has an invalid type."""
handler_method = await _setup(hass, mock_client, config_ext, get_write_api)
# map of HA State to valid influxdb [state, value] fields
valid = {
"1": [None, 1],
"1.0": [None, 1.0],
STATE_ON: [STATE_ON, 1],
STATE_OFF: [STATE_OFF, 0],
STATE_STANDBY: [STATE_STANDBY, None],
"foo": ["foo", None],
}
for in_, out in valid.items():
attrs = {
"unit_of_measurement": "foobars",
"longitude": "1.1",
"latitude": "2.2",
"invalid_attribute": ["value1", "value2"],
}
state = MagicMock(
state=in_,
2019-07-31 19:25:30 +00:00
domain="fake",
entity_id="fake.entity-id",
2019-07-31 19:25:30 +00:00
object_id="entity",
attributes=attrs,
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
body = [
{
"measurement": "foobars",
"tags": {"domain": "fake", "entity_id": "entity"},
"time": 12345,
"fields": {
"longitude": 1.1,
"latitude": 2.2,
"invalid_attribute_str": "['value1', 'value2']",
},
}
]
if out[0] is not None:
body[0]["fields"]["state"] = out[0]
if out[1] is not None:
body[0]["fields"]["value"] = out[1]
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
assert write_api.call_args == get_mock_call(body)
write_api.reset_mock()
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener_default_measurement(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener with a default measurement."""
config = {"default_measurement": "state"}
config.update(config_ext)
handler_method = await _setup(hass, mock_client, config, get_write_api)
state = MagicMock(
state=1, domain="fake", entity_id="fake.ok", object_id="ok", attributes={},
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
body = [
{
"measurement": "state",
"tags": {"domain": "fake", "entity_id": "ok"},
"time": 12345,
"fields": {"value": 1},
}
]
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
assert write_api.call_args == get_mock_call(body)
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener_unit_of_measurement_field(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener for unit of measurement field."""
config = {"override_measurement": "state"}
config.update(config_ext)
handler_method = await _setup(hass, mock_client, config, get_write_api)
attrs = {"unit_of_measurement": "foobars"}
state = MagicMock(
state="foo",
domain="fake",
entity_id="fake.entity-id",
object_id="entity",
attributes=attrs,
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
body = [
{
"measurement": "state",
"tags": {"domain": "fake", "entity_id": "entity"},
"time": 12345,
"fields": {"state": "foo", "unit_of_measurement_str": "foobars"},
}
]
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
assert write_api.call_args == get_mock_call(body)
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener_tags_attributes(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener when some attributes should be tags."""
config = {"tags_attributes": ["friendly_fake"]}
config.update(config_ext)
handler_method = await _setup(hass, mock_client, config, get_write_api)
attrs = {"friendly_fake": "tag_str", "field_fake": "field_str"}
state = MagicMock(
state=1,
domain="fake",
entity_id="fake.something",
object_id="something",
attributes=attrs,
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
body = [
{
"measurement": "fake.something",
"tags": {
"domain": "fake",
"entity_id": "something",
"friendly_fake": "tag_str",
},
"time": 12345,
"fields": {"value": 1, "field_fake_str": "field_str"},
}
]
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
assert write_api.call_args == get_mock_call(body)
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener_component_override_measurement(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener with overridden measurements."""
config = {
"component_config": {
"sensor.fake_humidity": {"override_measurement": "humidity"}
},
"component_config_glob": {
"binary_sensor.*motion": {"override_measurement": "motion"}
},
"component_config_domain": {"climate": {"override_measurement": "hvac"}},
}
config.update(config_ext)
handler_method = await _setup(hass, mock_client, config, get_write_api)
test_components = [
{"domain": "sensor", "id": "fake_humidity", "res": "humidity"},
{"domain": "binary_sensor", "id": "fake_motion", "res": "motion"},
{"domain": "climate", "id": "fake_thermostat", "res": "hvac"},
{"domain": "other", "id": "just_fake", "res": "other.just_fake"},
]
for comp in test_components:
state = MagicMock(
state=1,
domain=comp["domain"],
entity_id=f"{comp['domain']}.{comp['id']}",
object_id=comp["id"],
2019-07-31 19:25:30 +00:00
attributes={},
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
body = [
{
"measurement": comp["res"],
"tags": {"domain": comp["domain"], "entity_id": comp["id"]},
"time": 12345,
"fields": {"value": 1},
}
]
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
assert write_api.call_args == get_mock_call(body)
write_api.reset_mock()
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener_scheduled_write(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener retries after a write failure."""
config = {"max_retries": 1}
config.update(config_ext)
handler_method = await _setup(hass, mock_client, config, get_write_api)
state = MagicMock(
state=1,
domain="fake",
entity_id="entity.id",
object_id="entity",
attributes={},
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
write_api = get_write_api(mock_client)
write_api.side_effect = IOError("foo")
# Write fails
with patch.object(influxdb.time, "sleep") as mock_sleep:
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
assert mock_sleep.called
assert write_api.call_count == 2
# Write works again
write_api.side_effect = None
with patch.object(influxdb.time, "sleep") as mock_sleep:
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
assert not mock_sleep.called
assert write_api.call_count == 3
@pytest.mark.parametrize(
"mock_client, config_ext, get_write_api, get_mock_call",
[
(
influxdb.DEFAULT_API_VERSION,
BASE_V1_CONFIG,
_get_write_api_mock_v1,
influxdb.DEFAULT_API_VERSION,
),
(
influxdb.API_VERSION_2,
BASE_V2_CONFIG,
_get_write_api_mock_v2,
influxdb.API_VERSION_2,
),
],
indirect=["mock_client", "get_mock_call"],
)
async def test_event_listener_backlog_full(
hass, mock_client, config_ext, get_write_api, get_mock_call
):
"""Test the event listener drops old events when backlog gets full."""
handler_method = await _setup(hass, mock_client, config_ext, get_write_api)
InfluxDB send retry after IOError (#10263) * Implement data write retry for InfluxDB This adds an optional max_retries parameter to the InfluxDB component to specify if and how often the component should try to send the data if the connection failed due to an IOError. The sending will be scheduled for a retry in 20 seconds as often as the user specified. This can be handy for flaky getwork connections between the DB and Homeassistant or outages like daily DSL reconnects. Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add unittest for influx write retries Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add RetryOnError as helper decorator in util Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add unittests for RetryOnError Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Use RetryOnError decorator in InfluxDB This replaces the scheduling logic in the InfluxDB component with the RetryOnError decorator from homeassistant.util Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Make the linters happy Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Implement a queue limit for the retry decorator. This adds a queue limit to the RetryOnError handler. It limits the number of calls waiting for be retried. If this number is exceeded, every new call will discard the oldest one in the queue. * influxdb: Add the retry queue limit option. * Make the linter happy. * Make pylint happy * Log exception of dropped retry * Move RetryOnError decorator to influxdb component. * Fix bug in logging usage * Fix imports * Add newlines at the end of files. * Remove blank line * Remove blank line
2017-11-24 00:58:18 +00:00
state = MagicMock(
state=1,
domain="fake",
entity_id="entity.id",
object_id="entity",
attributes={},
)
event = MagicMock(data={"new_state": state}, time_fired=12345)
InfluxDB send retry after IOError (#10263) * Implement data write retry for InfluxDB This adds an optional max_retries parameter to the InfluxDB component to specify if and how often the component should try to send the data if the connection failed due to an IOError. The sending will be scheduled for a retry in 20 seconds as often as the user specified. This can be handy for flaky getwork connections between the DB and Homeassistant or outages like daily DSL reconnects. Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add unittest for influx write retries Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add RetryOnError as helper decorator in util Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add unittests for RetryOnError Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Use RetryOnError decorator in InfluxDB This replaces the scheduling logic in the InfluxDB component with the RetryOnError decorator from homeassistant.util Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Make the linters happy Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Implement a queue limit for the retry decorator. This adds a queue limit to the RetryOnError handler. It limits the number of calls waiting for be retried. If this number is exceeded, every new call will discard the oldest one in the queue. * influxdb: Add the retry queue limit option. * Make the linter happy. * Make pylint happy * Log exception of dropped retry * Move RetryOnError decorator to influxdb component. * Fix bug in logging usage * Fix imports * Add newlines at the end of files. * Remove blank line * Remove blank line
2017-11-24 00:58:18 +00:00
monotonic_time = 0
def fast_monotonic():
"""Monotonic time that ticks fast enough to cause a timeout."""
nonlocal monotonic_time
monotonic_time += 60
return monotonic_time
InfluxDB send retry after IOError (#10263) * Implement data write retry for InfluxDB This adds an optional max_retries parameter to the InfluxDB component to specify if and how often the component should try to send the data if the connection failed due to an IOError. The sending will be scheduled for a retry in 20 seconds as often as the user specified. This can be handy for flaky getwork connections between the DB and Homeassistant or outages like daily DSL reconnects. Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add unittest for influx write retries Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add RetryOnError as helper decorator in util Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add unittests for RetryOnError Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Use RetryOnError decorator in InfluxDB This replaces the scheduling logic in the InfluxDB component with the RetryOnError decorator from homeassistant.util Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Make the linters happy Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Implement a queue limit for the retry decorator. This adds a queue limit to the RetryOnError handler. It limits the number of calls waiting for be retried. If this number is exceeded, every new call will discard the oldest one in the queue. * influxdb: Add the retry queue limit option. * Make the linter happy. * Make pylint happy * Log exception of dropped retry * Move RetryOnError decorator to influxdb component. * Fix bug in logging usage * Fix imports * Add newlines at the end of files. * Remove blank line * Remove blank line
2017-11-24 00:58:18 +00:00
with patch("homeassistant.components.influxdb.time.monotonic", new=fast_monotonic):
handler_method(event)
hass.data[influxdb.DOMAIN].block_till_done()
assert get_write_api(mock_client).call_count == 0