Add assumed_state to group

pull/1353/head
Paulus Schoutsen 2016-02-20 19:11:02 -08:00
parent 443b39bccd
commit 1eae74be58
2 changed files with 144 additions and 53 deletions

View File

@ -9,7 +9,8 @@ https://home-assistant.io/components/group/
import homeassistant.core as ha
from homeassistant.const import (
ATTR_ENTITY_ID, CONF_ICON, CONF_NAME, STATE_CLOSED, STATE_HOME,
STATE_NOT_HOME, STATE_OFF, STATE_ON, STATE_OPEN, STATE_UNKNOWN)
STATE_NOT_HOME, STATE_OFF, STATE_ON, STATE_OPEN, STATE_UNKNOWN,
ATTR_ASSUMED_STATE, )
from homeassistant.helpers.entity import (
Entity, generate_entity_id, split_entity_id)
from homeassistant.helpers.event import track_state_change
@ -144,6 +145,7 @@ class Group(Entity):
self.tracking = []
self.group_on = None
self.group_off = None
self._assumed_state = False
if entity_ids is not None:
self.update_tracked_entity_ids(entity_ids)
@ -182,6 +184,11 @@ class Group(Entity):
data[ATTR_VIEW] = True
return data
@property
def assumed_state(self):
"""Return True if unable to access real state of entity."""
return self._assumed_state
def update_tracked_entity_ids(self, entity_ids):
""" Update the tracked entity IDs. """
self.stop()
@ -207,47 +214,77 @@ class Group(Entity):
def update(self):
""" Query all the tracked states and determine current group state. """
self._state = STATE_UNKNOWN
self._update_group_state()
def _state_changed_listener(self, entity_id, old_state, new_state):
""" Listener to receive state changes of tracked entities. """
self._update_group_state(new_state)
self.update_ha_state()
@property
def _tracking_states(self):
"""States that the group is tracking."""
states = []
for entity_id in self.tracking:
state = self.hass.states.get(entity_id)
if state is not None:
self._process_tracked_state(state)
states.append(state)
def _state_changed_listener(self, entity_id, old_state, new_state):
""" Listener to receive state changes of tracked entities. """
self._process_tracked_state(new_state)
self.update_ha_state()
return states
def _process_tracked_state(self, tr_state):
""" Updates group state based on a new state of a tracked entity. """
def _update_group_state(self, tr_state=None):
"""Update group state.
Optionally you can provide the only state changed since last update
allowing this method to take shortcuts.
"""
# pylint: disable=too-many-branches
# To store current states of group entities. Might not be needed.
states = None
gr_state, gr_on, gr_off = self._state, self.group_on, self.group_off
# We have not determined type of group yet
if self.group_on is None:
self.group_on, self.group_off = _get_group_on_off(tr_state.state)
if gr_on is None:
if tr_state is None:
states = self._tracking_states
if self.group_on is not None:
# New state of the group is going to be based on the first
# state that we can recognize
self._state = tr_state.state
for state in states:
gr_on, gr_off = \
_get_group_on_off(state.state)
if gr_on is not None:
break
else:
gr_on, gr_off = _get_group_on_off(tr_state.state)
if gr_on is not None:
self.group_on, self.group_off = gr_on, gr_off
# We cannot determine state of the group
if gr_on is None:
return
# There is already a group state
cur_gr_state = self._state
group_on, group_off = self.group_on, self.group_off
if tr_state is None or (gr_state == gr_on and
tr_state.state == gr_off):
if states is None:
states = self._tracking_states
# if cur_gr_state = OFF and tr_state = ON: set ON
# if cur_gr_state = ON and tr_state = OFF: research
# else: ignore
if any(state.state == gr_on for state in states):
self._state = gr_on
else:
self._state = gr_off
if cur_gr_state == group_off and tr_state.state == group_on:
self._state = group_on
elif tr_state.state in (gr_on, gr_off):
self._state = tr_state.state
elif cur_gr_state == group_on and tr_state.state == group_off:
if tr_state is None or self._assumed_state and \
not tr_state.attributes.get(ATTR_ASSUMED_STATE):
if states is None:
states = self._tracking_states
# Set to off if no other states are on
if not any(self.hass.states.is_state(ent_id, group_on)
for ent_id in self.tracking
if tr_state.entity_id != ent_id):
self._state = group_off
self._assumed_state = any(state.attributes.get(ATTR_ASSUMED_STATE)
for state in states)
elif tr_state.attributes.get(ATTR_ASSUMED_STATE):
self._assumed_state = True

View File

@ -8,7 +8,8 @@ Tests the group compoments.
import unittest
from homeassistant.const import (
STATE_ON, STATE_OFF, STATE_HOME, STATE_UNKNOWN, ATTR_ICON, ATTR_HIDDEN)
STATE_ON, STATE_OFF, STATE_HOME, STATE_UNKNOWN, ATTR_ICON, ATTR_HIDDEN,
ATTR_ASSUMED_STATE, )
import homeassistant.components.group as group
from tests.common import get_test_home_assistant
@ -21,19 +22,13 @@ class TestComponentsGroup(unittest.TestCase):
""" Init needed objects. """
self.hass = get_test_home_assistant()
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
self.group_entity_id = test_group.entity_id
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass.stop()
def test_setup_group_with_mixed_groupable_states(self):
""" Try to setup a group with mixed groupable states """
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('device_tracker.Paulus', STATE_HOME)
group.Group(
self.hass, 'person_and_light',
@ -46,6 +41,8 @@ class TestComponentsGroup(unittest.TestCase):
def test_setup_group_with_a_non_existing_state(self):
""" Try to setup a group with a non existing state """
self.hass.states.set('light.Bowl', STATE_ON)
grp = group.Group(
self.hass, 'light_and_nothing',
['light.Bowl', 'non.existing'])
@ -70,11 +67,15 @@ class TestComponentsGroup(unittest.TestCase):
def test_monitor_group(self):
""" Test if the group keeps track of states. """
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
# Test if group setup in our init mode is ok
self.assertIn(self.group_entity_id, self.hass.states.entity_ids())
self.assertIn(test_group.entity_id, self.hass.states.entity_ids())
group_state = self.hass.states.get(self.group_entity_id)
group_state = self.hass.states.get(test_group.entity_id)
self.assertEqual(STATE_ON, group_state.state)
self.assertTrue(group_state.attributes.get(group.ATTR_AUTO))
@ -83,54 +84,73 @@ class TestComponentsGroup(unittest.TestCase):
Test if the group turns off if the last device that was on turns off.
"""
self.hass.states.set('light.Bowl', STATE_OFF)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
self.hass.pool.block_till_done()
group_state = self.hass.states.get(self.group_entity_id)
group_state = self.hass.states.get(test_group.entity_id)
self.assertEqual(STATE_OFF, group_state.state)
def test_group_turns_on_if_all_are_off_and_one_turns_on(self):
"""
Test if group turns on if all devices were turned off and one turns on.
"""
# Make sure all are off.
self.hass.states.set('light.Bowl', STATE_OFF)
self.hass.pool.block_till_done()
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
# Turn one on
self.hass.states.set('light.Ceiling', STATE_ON)
self.hass.pool.block_till_done()
group_state = self.hass.states.get(self.group_entity_id)
group_state = self.hass.states.get(test_group.entity_id)
self.assertEqual(STATE_ON, group_state.state)
def test_is_on(self):
""" Test is_on method. """
self.assertTrue(group.is_on(self.hass, self.group_entity_id))
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
self.assertTrue(group.is_on(self.hass, test_group.entity_id))
self.hass.states.set('light.Bowl', STATE_OFF)
self.hass.pool.block_till_done()
self.assertFalse(group.is_on(self.hass, self.group_entity_id))
self.assertFalse(group.is_on(self.hass, test_group.entity_id))
# Try on non existing state
self.assertFalse(group.is_on(self.hass, 'non.existing'))
def test_expand_entity_ids(self):
""" Test expand_entity_ids method. """
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
self.assertEqual(sorted(['light.ceiling', 'light.bowl']),
sorted(group.expand_entity_ids(
self.hass, [self.group_entity_id])))
self.hass, [test_group.entity_id])))
def test_expand_entity_ids_does_not_return_duplicates(self):
""" Test that expand_entity_ids does not return duplicates. """
self.assertEqual(
['light.bowl', 'light.ceiling'],
sorted(group.expand_entity_ids(
self.hass, [self.group_entity_id, 'light.Ceiling'])))
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
self.assertEqual(
['light.bowl', 'light.ceiling'],
sorted(group.expand_entity_ids(
self.hass, ['light.bowl', self.group_entity_id])))
self.hass, [test_group.entity_id, 'light.Ceiling'])))
self.assertEqual(
['light.bowl', 'light.ceiling'],
sorted(group.expand_entity_ids(
self.hass, ['light.bowl', test_group.entity_id])))
def test_expand_entity_ids_ignores_non_strings(self):
""" Test that non string elements in lists are ignored. """
@ -138,9 +158,14 @@ class TestComponentsGroup(unittest.TestCase):
def test_get_entity_ids(self):
""" Test get_entity_ids method. """
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
self.assertEqual(
['light.bowl', 'light.ceiling'],
sorted(group.get_entity_ids(self.hass, self.group_entity_id)))
sorted(group.get_entity_ids(self.hass, test_group.entity_id)))
def test_get_entity_ids_with_domain_filter(self):
""" Test if get_entity_ids works with a domain_filter. """
@ -190,13 +215,18 @@ class TestComponentsGroup(unittest.TestCase):
def test_setup(self):
""" Test setup method. """
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
self.assertTrue(
group.setup(
self.hass,
{
group.DOMAIN: {
'second_group': {
'entities': 'light.Bowl, ' + self.group_entity_id,
'entities': 'light.Bowl, ' + test_group.entity_id,
'icon': 'mdi:work',
'view': True,
},
@ -207,7 +237,7 @@ class TestComponentsGroup(unittest.TestCase):
group_state = self.hass.states.get(
group.ENTITY_ID_FORMAT.format('second_group'))
self.assertEqual(STATE_ON, group_state.state)
self.assertEqual(set((self.group_entity_id, 'light.bowl')),
self.assertEqual(set((test_group.entity_id, 'light.bowl')),
set(group_state.attributes['entity_id']))
self.assertIsNone(group_state.attributes.get(group.ATTR_AUTO))
self.assertEqual('mdi:work',
@ -242,3 +272,27 @@ class TestComponentsGroup(unittest.TestCase):
['light.test_1', 'light.test_2', 'switch.test_1', 'switch.test_2'],
sorted(group.expand_entity_ids(self.hass,
['group.group_of_groups'])))
def test_set_assumed_state_based_on_tracked(self):
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group',
['light.Bowl', 'light.Ceiling', 'sensor.no_exist'])
state = self.hass.states.get(test_group.entity_id)
self.assertIsNone(state.attributes.get(ATTR_ASSUMED_STATE))
self.hass.states.set('light.Bowl', STATE_ON, {
ATTR_ASSUMED_STATE: True
})
self.hass.pool.block_till_done()
state = self.hass.states.get(test_group.entity_id)
self.assertTrue(state.attributes.get(ATTR_ASSUMED_STATE))
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.pool.block_till_done()
state = self.hass.states.get(test_group.entity_id)
self.assertIsNone(state.attributes.get(ATTR_ASSUMED_STATE))