diff --git a/homeassistant/components/sensor/statistics.py b/homeassistant/components/sensor/statistics.py new file mode 100644 index 00000000000..0c413ab9263 --- /dev/null +++ b/homeassistant/components/sensor/statistics.py @@ -0,0 +1,151 @@ +""" +Support for statistics for sensor values. + +For more details about this platform, please refer to the documentation at +https://home-assistant.io/components/sensor.statistics/ +""" +import logging +import statistics +from collections import deque + +import voluptuous as vol + +from homeassistant.components.sensor import PLATFORM_SCHEMA +from homeassistant.const import ( + CONF_NAME, CONF_ENTITY_ID, STATE_UNKNOWN, ATTR_UNIT_OF_MEASUREMENT) +import homeassistant.helpers.config_validation as cv +from homeassistant.helpers.entity import Entity +from homeassistant.helpers.event import track_state_change + +_LOGGER = logging.getLogger(__name__) + +ATTR_MIN_VALUE = 'min_value' +ATTR_MAX_VALUE = 'max_value' +ATTR_COUNT = 'count' +ATTR_MEAN = 'mean' +ATTR_MEDIAN = 'median' +ATTR_VARIANCE = 'variance' +ATTR_STANDARD_DEVIATION = 'standard_deviation' +ATTR_SAMPLING_SIZE = 'sampling_size' +ATTR_TOTAL = 'total' + +CONF_SAMPLING_SIZE = 'sampling_size' +DEFAULT_NAME = 'Stats' +DEFAULT_SIZE = 20 +ICON = 'mdi:calculator' + +PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ + vol.Required(CONF_ENTITY_ID): cv.entity_id, + vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string, + vol.Optional(CONF_SAMPLING_SIZE, default=DEFAULT_SIZE): cv.positive_int, +}) + + +def setup_platform(hass, config, add_devices, discovery_info=None): + """Setup the Statistics sensor.""" + entity_id = config.get(CONF_ENTITY_ID) + name = config.get(CONF_NAME) + sampling_size = config.get(CONF_SAMPLING_SIZE) + + add_devices([StatisticsSensor(hass, entity_id, name, sampling_size)]) + + +# pylint: disable=too-many-instance-attributes +class StatisticsSensor(Entity): + """Representation of a Statistics sensor.""" + + def __init__(self, hass, entity_id, name, sampling_size): + """Initialize the Statistics sensor.""" + self._hass = hass + self._entity_id = entity_id + self.is_binary = True if self._entity_id.split('.')[0] == \ + 'binary_sensor' else False + if not self.is_binary: + self._name = '{} {}'.format(name, ATTR_MEAN) + else: + self._name = '{} {}'.format(name, ATTR_COUNT) + self._sampling_size = sampling_size + self._unit_of_measurement = None + if self._sampling_size == 0: + self.states = deque() + else: + self.states = deque(maxlen=self._sampling_size) + self.median = self.mean = self.variance = self.stdev = 0 + self.min = self.max = self.total = self.count = 0 + self.update() + + def calculate_sensor_state_listener(entity, old_state, new_state): + """Called when the sensor changes state.""" + self._unit_of_measurement = new_state.attributes.get( + ATTR_UNIT_OF_MEASUREMENT) + + try: + self.states.append(float(new_state.state)) + self.count = self.count + 1 + except ValueError: + self.count = self.count + 1 + + self.update_ha_state(True) + + track_state_change(hass, entity_id, calculate_sensor_state_listener) + + @property + def name(self): + """Return the name of the sensor.""" + return self._name + + @property + def state(self): + """Return the state of the sensor.""" + return self.mean if not self.is_binary else self.count + + @property + def unit_of_measurement(self): + """Return the unit the value is expressed in.""" + return self._unit_of_measurement if not self.is_binary else None + + @property + def should_poll(self): + """No polling needed.""" + return False + + @property + def state_attributes(self): + """Return the state attributes of the sensor.""" + if not self.is_binary: + return { + ATTR_MEAN: self.mean, + ATTR_COUNT: self.count, + ATTR_MAX_VALUE: self.max, + ATTR_MEDIAN: self.median, + ATTR_MIN_VALUE: self.min, + ATTR_SAMPLING_SIZE: 'unlimited' if self._sampling_size is + 0 else self._sampling_size, + ATTR_STANDARD_DEVIATION: self.stdev, + ATTR_TOTAL: self.total, + ATTR_VARIANCE: self.variance, + } + + @property + def icon(self): + """Return the icon to use in the frontend, if any.""" + return ICON + + def update(self): + """Get the latest data and updates the states.""" + if not self.is_binary: + try: + self.mean = round(statistics.mean(self.states), 2) + self.median = round(statistics.median(self.states), 2) + self.stdev = round(statistics.stdev(self.states), 2) + self.variance = round(statistics.variance(self.states), 2) + except statistics.StatisticsError as err: + _LOGGER.warning(err) + self.mean = self.median = STATE_UNKNOWN + self.stdev = self.variance = STATE_UNKNOWN + if self.states: + self.total = round(sum(self.states), 2) + self.min = min(self.states) + self.max = max(self.states) + else: + self.min = self.max = self.total = STATE_UNKNOWN diff --git a/tests/components/sensor/test_statistics.py b/tests/components/sensor/test_statistics.py new file mode 100644 index 00000000000..75649a0c140 --- /dev/null +++ b/tests/components/sensor/test_statistics.py @@ -0,0 +1,97 @@ +"""The test for the statistics sensor platform.""" +import unittest +import statistics + +from homeassistant.bootstrap import setup_component +from homeassistant.const import (ATTR_UNIT_OF_MEASUREMENT, TEMP_CELSIUS) +from tests.common import get_test_home_assistant + + +class TestStatisticsSensor(unittest.TestCase): + """Test the Statistics sensor.""" + + def setup_method(self, method): + """Setup things to be run when tests are started.""" + self.hass = get_test_home_assistant() + self.values = [17, 20, 15.2, 5, 3.8, 9.2, 6.7, 14, 6] + self.count = len(self.values) + self.min = min(self.values) + self.max = max(self.values) + self.total = sum(self.values) + self.mean = round(sum(self.values) / len(self.values), 2) + self.median = round(statistics.median(self.values), 2) + self.deviation = round(statistics.stdev(self.values), 2) + self.variance = round(statistics.variance(self.values), 2) + + def teardown_method(self, method): + """Stop everything that was started.""" + self.hass.stop() + + def test_binary_sensor_source(self): + """Test if source is a sensor.""" + values = [1, 0, 1, 0, 1, 0, 1] + assert setup_component(self.hass, 'sensor', { + 'sensor': { + 'platform': 'statistics', + 'name': 'test', + 'entity_id': 'binary_sensor.test_monitored', + } + }) + + for value in values: + self.hass.states.set('binary_sensor.test_monitored', value) + self.hass.block_till_done() + + state = self.hass.states.get('sensor.test_count') + + self.assertEqual(str(len(values)), state.state) + + def test_sensor_source(self): + """Test if source is a sensor.""" + assert setup_component(self.hass, 'sensor', { + 'sensor': { + 'platform': 'statistics', + 'name': 'test', + 'entity_id': 'sensor.test_monitored', + } + }) + + for value in self.values: + self.hass.states.set('sensor.test_monitored', value, + {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}) + self.hass.block_till_done() + + state = self.hass.states.get('sensor.test_mean') + + self.assertEqual(str(self.mean), state.state) + self.assertEqual(self.min, state.attributes.get('min_value')) + self.assertEqual(self.max, state.attributes.get('max_value')) + self.assertEqual(self.variance, state.attributes.get('variance')) + self.assertEqual(self.median, state.attributes.get('median')) + self.assertEqual(self.deviation, + state.attributes.get('standard_deviation')) + self.assertEqual(self.mean, state.attributes.get('mean')) + self.assertEqual(self.count, state.attributes.get('count')) + self.assertEqual(self.total, state.attributes.get('total')) + self.assertEqual('°C', state.attributes.get('unit_of_measurement')) + + def test_sampling_size(self): + """Test rotation.""" + assert setup_component(self.hass, 'sensor', { + 'sensor': { + 'platform': 'statistics', + 'name': 'test', + 'entity_id': 'sensor.test_monitored', + 'sampling_size': 5, + } + }) + + for value in self.values: + self.hass.states.set('sensor.test_monitored', value, + {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}) + self.hass.block_till_done() + + state = self.hass.states.get('sensor.test_mean') + + self.assertEqual(3.8, state.attributes.get('min_value')) + self.assertEqual(14, state.attributes.get('max_value'))