core/homeassistant/components/influxdb.py

"""
A component which allows you to send data to an Influx database.

For more details about this component, please refer to the documentation at
https://home-assistant.io/components/influxdb/
"""
import logging
import re
import queue
import threading
import time
import math
import requests.exceptions
import voluptuous as vol
from homeassistant.const import (
    CONF_DOMAINS, CONF_ENTITIES, CONF_EXCLUDE, CONF_HOST, CONF_INCLUDE,
    CONF_PASSWORD, CONF_PORT, CONF_SSL, CONF_USERNAME, CONF_VERIFY_SSL,
    EVENT_STATE_CHANGED, EVENT_HOMEASSISTANT_STOP, STATE_UNAVAILABLE,
    STATE_UNKNOWN)
from homeassistant.helpers import state as state_helper
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_values import EntityValues
REQUIREMENTS = ['influxdb==5.0.0']
_LOGGER = logging.getLogger(__name__)
CONF_DB_NAME = 'database'
CONF_TAGS = 'tags'
CONF_DEFAULT_MEASUREMENT = 'default_measurement'
CONF_OVERRIDE_MEASUREMENT = 'override_measurement'
CONF_TAGS_ATTRIBUTES = 'tags_attributes'
CONF_COMPONENT_CONFIG = 'component_config'
CONF_COMPONENT_CONFIG_GLOB = 'component_config_glob'
CONF_COMPONENT_CONFIG_DOMAIN = 'component_config_domain'
CONF_RETRY_COUNT = 'max_retries'
DEFAULT_DATABASE = 'home_assistant'
DEFAULT_VERIFY_SSL = True
DOMAIN = 'influxdb'
TIMEOUT = 5
RETRY_DELAY = 20
QUEUE_BACKLOG_SECONDS = 30
BATCH_TIMEOUT = 1
BATCH_BUFFER_SIZE = 100

COMPONENT_CONFIG_SCHEMA_ENTRY = vol.Schema({
    vol.Optional(CONF_OVERRIDE_MEASUREMENT): cv.string,
})

CONFIG_SCHEMA = vol.Schema({
    DOMAIN: vol.All(vol.Schema({
        vol.Optional(CONF_HOST): cv.string,
        vol.Inclusive(CONF_USERNAME, 'authentication'): cv.string,
        vol.Inclusive(CONF_PASSWORD, 'authentication'): cv.string,
        vol.Optional(CONF_EXCLUDE, default={}): vol.Schema({
            vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids,
            vol.Optional(CONF_DOMAINS, default=[]):
                vol.All(cv.ensure_list, [cv.string])
        }),
        vol.Optional(CONF_INCLUDE, default={}): vol.Schema({
            vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids,
            vol.Optional(CONF_DOMAINS, default=[]):
                vol.All(cv.ensure_list, [cv.string])
        }),
        vol.Optional(CONF_DB_NAME, default=DEFAULT_DATABASE): cv.string,
        vol.Optional(CONF_PORT): cv.port,
        vol.Optional(CONF_SSL): cv.boolean,
        vol.Optional(CONF_RETRY_COUNT, default=0): cv.positive_int,
        vol.Optional(CONF_DEFAULT_MEASUREMENT): cv.string,
        vol.Optional(CONF_OVERRIDE_MEASUREMENT): cv.string,
        vol.Optional(CONF_TAGS, default={}):
            vol.Schema({cv.string: cv.string}),
        vol.Optional(CONF_TAGS_ATTRIBUTES, default=[]):
            vol.All(cv.ensure_list, [cv.string]),
        vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL): cv.boolean,
        vol.Optional(CONF_COMPONENT_CONFIG, default={}):
            vol.Schema({cv.entity_id: COMPONENT_CONFIG_SCHEMA_ENTRY}),
        vol.Optional(CONF_COMPONENT_CONFIG_GLOB, default={}):
            vol.Schema({cv.string: COMPONENT_CONFIG_SCHEMA_ENTRY}),
        vol.Optional(CONF_COMPONENT_CONFIG_DOMAIN, default={}):
            vol.Schema({cv.string: COMPONENT_CONFIG_SCHEMA_ENTRY}),
    })),
}, extra=vol.ALLOW_EXTRA)
RE_DIGIT_TAIL = re.compile(r'^[^\.]*\d+\.?\d+[^\.]*$')
RE_DECIMAL = re.compile(r'[^\d.]+')


def setup(hass, config):
    """Set up the InfluxDB component."""
    from influxdb import InfluxDBClient, exceptions

    conf = config[DOMAIN]

    kwargs = {
        'database': conf[CONF_DB_NAME],
        'verify_ssl': conf[CONF_VERIFY_SSL],
        'timeout': TIMEOUT
    }

    # Pass optional connection settings only when they were configured,
    # so the client library's own defaults apply otherwise.
    if CONF_HOST in conf:
        kwargs['host'] = conf[CONF_HOST]

    if CONF_PORT in conf:
        kwargs['port'] = conf[CONF_PORT]

    if CONF_USERNAME in conf:
        kwargs['username'] = conf[CONF_USERNAME]

    if CONF_PASSWORD in conf:
        kwargs['password'] = conf[CONF_PASSWORD]

    if CONF_SSL in conf:
        kwargs['ssl'] = conf[CONF_SSL]

    include = conf.get(CONF_INCLUDE, {})
    exclude = conf.get(CONF_EXCLUDE, {})
    whitelist_e = set(include.get(CONF_ENTITIES, []))
    whitelist_d = set(include.get(CONF_DOMAINS, []))
    blacklist_e = set(exclude.get(CONF_ENTITIES, []))
    blacklist_d = set(exclude.get(CONF_DOMAINS, []))
    tags = conf.get(CONF_TAGS)
    tags_attributes = conf.get(CONF_TAGS_ATTRIBUTES)
    default_measurement = conf.get(CONF_DEFAULT_MEASUREMENT)
    override_measurement = conf.get(CONF_OVERRIDE_MEASUREMENT)
    component_config = EntityValues(
        conf[CONF_COMPONENT_CONFIG],
        conf[CONF_COMPONENT_CONFIG_DOMAIN],
        conf[CONF_COMPONENT_CONFIG_GLOB])
    max_tries = conf.get(CONF_RETRY_COUNT)

    # Run a cheap query up front to verify that the database is reachable.
    try:
        influx = InfluxDBClient(**kwargs)
        influx.query("SHOW SERIES LIMIT 1;", database=conf[CONF_DB_NAME])
    except (exceptions.InfluxDBClientError,
            requests.exceptions.ConnectionError) as exc:
        _LOGGER.error("Database host is not accessible due to '%s', please "
                      "check your entries in the configuration file (host, "
                      "port, etc.) and verify that the database exists and is "
                      "READ/WRITE", exc)
        return False

    def event_to_json(event):
        """Add an event to the outgoing Influx list."""
        state = event.data.get('new_state')
        # Skip missing, unknown, unavailable, and excluded states.
        if state is None or state.state in (
                STATE_UNKNOWN, '', STATE_UNAVAILABLE) or \
                state.entity_id in blacklist_e or state.domain in blacklist_d:
            return

        try:
            if (whitelist_e and state.entity_id not in whitelist_e) or \
                    (whitelist_d and state.domain not in whitelist_d):
                return

            _include_state = _include_value = False

            _state_as_value = float(state.state)
            _include_value = True
        except ValueError:
            try:
                _state_as_value = float(state_helper.state_as_number(state))
                _include_state = _include_value = True
            except ValueError:
                _include_state = True

        # Resolve the measurement name in order of precedence: per-entity
        # override, global override, unit of measurement, default
        # measurement, and finally the entity_id.
        include_uom = True
        measurement = component_config.get(state.entity_id).get(
            CONF_OVERRIDE_MEASUREMENT)
        if measurement in (None, ''):
            if override_measurement:
                measurement = override_measurement
            else:
                measurement = state.attributes.get('unit_of_measurement')
                if measurement in (None, ''):
                    if default_measurement:
                        measurement = default_measurement
                    else:
                        measurement = state.entity_id
                else:
                    include_uom = False

        json = {
            'measurement': measurement,
            'tags': {
                'domain': state.domain,
                'entity_id': state.object_id,
            },
            'time': event.time_fired,
            'fields': {}
        }
        if _include_state:
            json['fields']['state'] = state.state
        if _include_value:
            json['fields']['value'] = _state_as_value

        for key, value in state.attributes.items():
            if key in tags_attributes:
                json['tags'][key] = value
            elif key != 'unit_of_measurement' or include_uom:
                # If the key is already in fields, rename it to avoid
                # overwriting the existing field.
                if key in json['fields']:
                    key = key + "_"
                # Prevent column data errors in InfluxDB.
                # Try to cast each value to float. If that fails, store
                # the value as a string under the key with a "_str" suffix.
                try:
                    json['fields'][key] = float(value)
                except (ValueError, TypeError):
                    new_key = "{}_str".format(key)
                    new_value = str(value)
                    json['fields'][new_key] = new_value

                    if RE_DIGIT_TAIL.match(new_value):
                        json['fields'][key] = float(
                            RE_DECIMAL.sub('', new_value))

                # Infinity and NaN are not valid floats in InfluxDB
                try:
                    if not math.isfinite(json['fields'][key]):
                        del json['fields'][key]
                except (KeyError, TypeError):
                    pass

        json['tags'].update(tags)

        return json
2015-11-21 18:01:47 +00:00
instance = hass.data[DOMAIN] = InfluxThread(
hass, influx, event_to_json, max_tries)
instance.start()
InfluxDB send retry after IOError (#10263) * Implement data write retry for InfluxDB This adds an optional max_retries parameter to the InfluxDB component to specify if and how often the component should try to send the data if the connection failed due to an IOError. The sending will be scheduled for a retry in 20 seconds as often as the user specified. This can be handy for flaky getwork connections between the DB and Homeassistant or outages like daily DSL reconnects. Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add unittest for influx write retries Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add RetryOnError as helper decorator in util Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add unittests for RetryOnError Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Use RetryOnError decorator in InfluxDB This replaces the scheduling logic in the InfluxDB component with the RetryOnError decorator from homeassistant.util Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Make the linters happy Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Implement a queue limit for the retry decorator. This adds a queue limit to the RetryOnError handler. It limits the number of calls waiting for be retried. If this number is exceeded, every new call will discard the oldest one in the queue. * influxdb: Add the retry queue limit option. * Make the linter happy. * Make pylint happy * Log exception of dropped retry * Move RetryOnError decorator to influxdb component. * Fix bug in logging usage * Fix imports * Add newlines at the end of files. * Remove blank line * Remove blank line
2017-11-24 00:58:18 +00:00
def shutdown(event):
"""Shut down the thread."""
instance.queue.put(None)
instance.join()
influx.close()
InfluxDB send retry after IOError (#10263) * Implement data write retry for InfluxDB This adds an optional max_retries parameter to the InfluxDB component to specify if and how often the component should try to send the data if the connection failed due to an IOError. The sending will be scheduled for a retry in 20 seconds as often as the user specified. This can be handy for flaky getwork connections between the DB and Homeassistant or outages like daily DSL reconnects. Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add unittest for influx write retries Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add RetryOnError as helper decorator in util Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Add unittests for RetryOnError Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Use RetryOnError decorator in InfluxDB This replaces the scheduling logic in the InfluxDB component with the RetryOnError decorator from homeassistant.util Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Make the linters happy Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de> * Implement a queue limit for the retry decorator. This adds a queue limit to the RetryOnError handler. It limits the number of calls waiting for be retried. If this number is exceeded, every new call will discard the oldest one in the queue. * influxdb: Add the retry queue limit option. * Make the linter happy. * Make pylint happy * Log exception of dropped retry * Move RetryOnError decorator to influxdb component. * Fix bug in logging usage * Fix imports * Add newlines at the end of files. * Remove blank line * Remove blank line
2017-11-24 00:58:18 +00:00
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
    return True
class InfluxThread(threading.Thread):
    """A threaded event handler class."""
    def __init__(self, hass, influx, event_to_json, max_tries):
        """Initialize the listener."""
        threading.Thread.__init__(self, name='InfluxDB')
        self.queue = queue.Queue()
        self.influx = influx
        self.event_to_json = event_to_json
        self.max_tries = max_tries
        self.write_errors = 0
        self.shutdown = False
        hass.bus.listen(EVENT_STATE_CHANGED, self._event_listener)
    def _event_listener(self, event):
        """Listen for new messages on the bus and queue them for Influx."""
        item = (time.monotonic(), event)
        self.queue.put(item)
    @staticmethod
    def batch_timeout():
        """Return number of seconds to wait for more events."""
        return BATCH_TIMEOUT

    def get_events_json(self):
        """Return a batch of events formatted for writing."""
        queue_seconds = QUEUE_BACKLOG_SECONDS + self.max_tries*RETRY_DELAY
        count = 0
        json = []
        dropped = 0
        try:
            while len(json) < BATCH_BUFFER_SIZE and not self.shutdown:
                timeout = None if count == 0 else self.batch_timeout()
                item = self.queue.get(timeout=timeout)
                count += 1
                if item is None:
                    self.shutdown = True
                else:
                    timestamp, event = item
                    age = time.monotonic() - timestamp
                    if age < queue_seconds:
                        event_json = self.event_to_json(event)
                        if event_json:
                            json.append(event_json)
                    else:
                        dropped += 1
        except queue.Empty:
            pass
        if dropped:
            _LOGGER.warning("Catching up, dropped %d old events", dropped)
        return count, json

    def write_to_influxdb(self, json):
        """Write preprocessed events to influxdb, with retry."""
        from influxdb import exceptions
        for retry in range(self.max_tries+1):
            try:
                self.influx.write_points(json)
                if self.write_errors:
                    _LOGGER.error("Resumed, lost %d events", self.write_errors)
                    self.write_errors = 0
                _LOGGER.debug("Wrote %d events", len(json))
                break
            except (exceptions.InfluxDBClientError, IOError):
                if retry < self.max_tries:
                    time.sleep(RETRY_DELAY)
                else:
                    if not self.write_errors:
                        _LOGGER.exception("Write error")
                    self.write_errors += len(json)

    def run(self):
        """Process incoming events."""
        while not self.shutdown:
            count, json = self.get_events_json()
            if json:
                self.write_to_influxdb(json)
            for _ in range(count):
                self.queue.task_done()

    def block_till_done(self):
        """Block till all events processed."""
        self.queue.join()
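

# The batching loop in get_events_json() may be easier to follow in isolation.
# What follows is a minimal standalone sketch of the same pattern, not part of
# the component: block without a timeout for the first item, wait only briefly
# for further items, and treat None as a shutdown sentinel. The names
# drain_batch, pending, max_size and wait_seconds are hypothetical and chosen
# only for this illustration; the age check and JSON conversion are left out.

```python
import queue


def drain_batch(pending, max_size, wait_seconds):
    """Collect up to max_size items from the queue `pending`.

    Returns (taken, batch, stop). `taken` counts every item removed from
    the queue, including a None sentinel; `batch` holds the real items;
    `stop` is True once the None sentinel has been seen.
    """
    taken = 0
    batch = []
    stop = False
    try:
        while len(batch) < max_size and not stop:
            # Block indefinitely for the first item, then only briefly,
            # so a lone event is still flushed soon after it arrives.
            timeout = None if taken == 0 else wait_seconds
            item = pending.get(timeout=timeout)
            taken += 1
            if item is None:
                stop = True
            else:
                batch.append(item)
    except queue.Empty:
        # No further items arrived within wait_seconds; return what we have.
        pass
    return taken, batch, stop
```

# The caller would call task_done() once per `taken` item, as run() does with
# `count`, so that a queue.join() such as the one in block_till_done() returns.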