"""Tests for the Z-Wave init.""" import asyncio from collections import OrderedDict from datetime import datetime from pytz import utc import unittest from unittest.mock import patch, MagicMock from homeassistant.bootstrap import async_setup_component from homeassistant.const import ATTR_ENTITY_ID, EVENT_HOMEASSISTANT_START from homeassistant.components import zwave from homeassistant.components.binary_sensor.zwave import get_device from homeassistant.components.zwave import ( const, CONFIG_SCHEMA, CONF_DEVICE_CONFIG_GLOB, DATA_NETWORK) from homeassistant.setup import setup_component from tests.common import mock_registry import pytest from tests.common import ( get_test_home_assistant, async_fire_time_changed, mock_coro) from tests.mock.zwave import MockNetwork, MockNode, MockValue, MockEntityValues @asyncio.coroutine def test_valid_device_config(hass, mock_openzwave): """Test valid device config.""" device_config = { 'light.kitchen': { 'ignored': 'true' } } result = yield from async_setup_component(hass, 'zwave', { 'zwave': { 'device_config': device_config }}) yield from hass.async_block_till_done() assert result @asyncio.coroutine def test_invalid_device_config(hass, mock_openzwave): """Test invalid device config.""" device_config = { 'light.kitchen': { 'some_ignored': 'true' } } result = yield from async_setup_component(hass, 'zwave', { 'zwave': { 'device_config': device_config }}) yield from hass.async_block_till_done() assert not result def test_config_access_error(): """Test threading error accessing config values.""" node = MagicMock() def side_effect(): raise RuntimeError node.values.values.side_effect = side_effect result = zwave.get_config_value(node, 1) assert result is None @asyncio.coroutine def test_network_options(hass, mock_openzwave): """Test network options.""" result = yield from async_setup_component(hass, 'zwave', { 'zwave': { 'usb_path': 'mock_usb_path', 'config_path': 'mock_config_path', }}) yield from hass.async_block_till_done() assert result network = hass.data[zwave.DATA_NETWORK] assert network.options.device == 'mock_usb_path' assert network.options.config_path == 'mock_config_path' @asyncio.coroutine def test_auto_heal_midnight(hass, mock_openzwave): """Test network auto-heal at midnight.""" yield from async_setup_component(hass, 'zwave', { 'zwave': { 'autoheal': True, }}) yield from hass.async_block_till_done() network = hass.data[zwave.DATA_NETWORK] assert not network.heal.called time = utc.localize(datetime(2017, 5, 6, 0, 0, 0)) async_fire_time_changed(hass, time) yield from hass.async_block_till_done() assert network.heal.called assert len(network.heal.mock_calls) == 1 @asyncio.coroutine def test_auto_heal_disabled(hass, mock_openzwave): """Test network auto-heal disabled.""" yield from async_setup_component(hass, 'zwave', { 'zwave': { 'autoheal': False, }}) yield from hass.async_block_till_done() network = hass.data[zwave.DATA_NETWORK] assert not network.heal.called time = utc.localize(datetime(2017, 5, 6, 0, 0, 0)) async_fire_time_changed(hass, time) yield from hass.async_block_till_done() assert not network.heal.called @asyncio.coroutine def test_setup_platform(hass, mock_openzwave): """Test invalid device config.""" mock_device = MagicMock() hass.data[DATA_NETWORK] = MagicMock() hass.data[zwave.DATA_DEVICES] = {456: mock_device} async_add_entities = MagicMock() result = yield from zwave.async_setup_platform( hass, None, async_add_entities, None) assert not result assert not async_add_entities.called result = yield from zwave.async_setup_platform( hass, None, async_add_entities, {const.DISCOVERY_DEVICE: 123}) assert not result assert not async_add_entities.called result = yield from zwave.async_setup_platform( hass, None, async_add_entities, {const.DISCOVERY_DEVICE: 456}) assert result assert async_add_entities.called assert len(async_add_entities.mock_calls) == 1 assert async_add_entities.mock_calls[0][1][0] == [mock_device] @asyncio.coroutine def test_zwave_ready_wait(hass, mock_openzwave): """Test that zwave continues after waiting for network ready.""" # Initialize zwave yield from async_setup_component(hass, 'zwave', {'zwave': {}}) yield from hass.async_block_till_done() sleeps = [] def utcnow(): return datetime.fromtimestamp(len(sleeps)) asyncio_sleep = asyncio.sleep @asyncio.coroutine def sleep(duration, loop=None): if duration > 0: sleeps.append(duration) yield from asyncio_sleep(0) with patch('homeassistant.components.zwave.dt_util.utcnow', new=utcnow): with patch('asyncio.sleep', new=sleep): with patch.object(zwave, '_LOGGER') as mock_logger: hass.data[DATA_NETWORK].state = MockNetwork.STATE_STARTED hass.bus.async_fire(EVENT_HOMEASSISTANT_START) yield from hass.async_block_till_done() assert len(sleeps) == const.NETWORK_READY_WAIT_SECS assert mock_logger.warning.called assert len(mock_logger.warning.mock_calls) == 1 assert mock_logger.warning.mock_calls[0][1][1] == \ const.NETWORK_READY_WAIT_SECS @asyncio.coroutine def test_device_entity(hass, mock_openzwave): """Test device entity base class.""" node = MockNode(node_id='10', name='Mock Node') value = MockValue(data=False, node=node, instance=2, object_id='11', label='Sensor', command_class=const.COMMAND_CLASS_SENSOR_BINARY) power_value = MockValue(data=50.123456, node=node, precision=3, command_class=const.COMMAND_CLASS_METER) values = MockEntityValues(primary=value, power=power_value) device = zwave.ZWaveDeviceEntity(values, 'zwave') device.hass = hass device.value_added() device.update_properties() yield from hass.async_block_till_done() assert not device.should_poll assert device.unique_id == "10-11" assert device.name == 'Mock Node Sensor' assert device.device_state_attributes[zwave.ATTR_POWER] == 50.123 @asyncio.coroutine def test_node_discovery(hass, mock_openzwave): """Test discovery of a node.""" mock_receivers = [] def mock_connect(receiver, signal, *args, **kwargs): if signal == MockNetwork.SIGNAL_NODE_ADDED: mock_receivers.append(receiver) with patch('pydispatch.dispatcher.connect', new=mock_connect): yield from async_setup_component(hass, 'zwave', {'zwave': {}}) yield from hass.async_block_till_done() assert len(mock_receivers) == 1 node = MockNode(node_id=14) hass.async_add_job(mock_receivers[0], node) yield from hass.async_block_till_done() assert hass.states.get('zwave.mock_node').state is 'unknown' async def test_unparsed_node_discovery(hass, mock_openzwave): """Test discovery of a node.""" mock_receivers = [] def mock_connect(receiver, signal, *args, **kwargs): if signal == MockNetwork.SIGNAL_NODE_ADDED: mock_receivers.append(receiver) with patch('pydispatch.dispatcher.connect', new=mock_connect): await async_setup_component(hass, 'zwave', {'zwave': {}}) await hass.async_block_till_done() assert len(mock_receivers) == 1 node = MockNode( node_id=14, manufacturer_name=None, name=None, is_ready=False) sleeps = [] def utcnow(): return datetime.fromtimestamp(len(sleeps)) asyncio_sleep = asyncio.sleep async def sleep(duration, loop=None): if duration > 0: sleeps.append(duration) await asyncio_sleep(0) with patch('homeassistant.components.zwave.dt_util.utcnow', new=utcnow): with patch('asyncio.sleep', new=sleep): with patch.object(zwave, '_LOGGER') as mock_logger: hass.async_add_job(mock_receivers[0], node) await hass.async_block_till_done() assert len(sleeps) == const.NODE_READY_WAIT_SECS assert mock_logger.warning.called assert len(mock_logger.warning.mock_calls) == 1 assert mock_logger.warning.mock_calls[0][1][1:] == \ (14, const.NODE_READY_WAIT_SECS) assert hass.states.get('zwave.unknown_node_14').state is 'unknown' @asyncio.coroutine def test_node_ignored(hass, mock_openzwave): """Test discovery of a node.""" mock_receivers = [] def mock_connect(receiver, signal, *args, **kwargs): if signal == MockNetwork.SIGNAL_NODE_ADDED: mock_receivers.append(receiver) with patch('pydispatch.dispatcher.connect', new=mock_connect): yield from async_setup_component(hass, 'zwave', {'zwave': { 'device_config': { 'zwave.mock_node': { 'ignored': True, }}}}) yield from hass.async_block_till_done() assert len(mock_receivers) == 1 node = MockNode(node_id=14) hass.async_add_job(mock_receivers[0], node) yield from hass.async_block_till_done() assert hass.states.get('zwave.mock_node') is None @asyncio.coroutine def test_value_discovery(hass, mock_openzwave): """Test discovery of a node.""" mock_receivers = [] def mock_connect(receiver, signal, *args, **kwargs): if signal == MockNetwork.SIGNAL_VALUE_ADDED: mock_receivers.append(receiver) with patch('pydispatch.dispatcher.connect', new=mock_connect): yield from async_setup_component(hass, 'zwave', {'zwave': {}}) yield from hass.async_block_till_done() assert len(mock_receivers) == 1 node = MockNode(node_id=11, generic=const.GENERIC_TYPE_SENSOR_BINARY) value = MockValue(data=False, node=node, index=12, instance=13, command_class=const.COMMAND_CLASS_SENSOR_BINARY, type=const.TYPE_BOOL, genre=const.GENRE_USER) hass.async_add_job(mock_receivers[0], node, value) yield from hass.async_block_till_done() assert hass.states.get( 'binary_sensor.mock_node_mock_value').state is 'off' @asyncio.coroutine def test_value_discovery_existing_entity(hass, mock_openzwave): """Test discovery of a node.""" mock_receivers = [] def mock_connect(receiver, signal, *args, **kwargs): if signal == MockNetwork.SIGNAL_VALUE_ADDED: mock_receivers.append(receiver) with patch('pydispatch.dispatcher.connect', new=mock_connect): yield from async_setup_component(hass, 'zwave', {'zwave': {}}) yield from hass.async_block_till_done() assert len(mock_receivers) == 1 node = MockNode(node_id=11, generic=const.GENERIC_TYPE_THERMOSTAT) setpoint = MockValue( data=22.0, node=node, index=12, instance=13, command_class=const.COMMAND_CLASS_THERMOSTAT_SETPOINT, genre=const.GENRE_USER, units='C') hass.async_add_job(mock_receivers[0], node, setpoint) yield from hass.async_block_till_done() assert hass.states.get('climate.mock_node_mock_value').attributes[ 'temperature'] == 22.0 assert hass.states.get('climate.mock_node_mock_value').attributes[ 'current_temperature'] is None def mock_update(self): self.hass.add_job(self.async_update_ha_state) with patch.object(zwave.node_entity.ZWaveBaseEntity, 'maybe_schedule_update', new=mock_update): temperature = MockValue( data=23.5, node=node, index=1, instance=13, command_class=const.COMMAND_CLASS_SENSOR_MULTILEVEL, genre=const.GENRE_USER, units='C') hass.async_add_job(mock_receivers[0], node, temperature) yield from hass.async_block_till_done() assert hass.states.get('climate.mock_node_mock_value').attributes[ 'temperature'] == 22.0 assert hass.states.get('climate.mock_node_mock_value').attributes[ 'current_temperature'] == 23.5 @asyncio.coroutine def test_power_schemes(hass, mock_openzwave): """Test power attribute.""" mock_receivers = [] def mock_connect(receiver, signal, *args, **kwargs): if signal == MockNetwork.SIGNAL_VALUE_ADDED: mock_receivers.append(receiver) with patch('pydispatch.dispatcher.connect', new=mock_connect): yield from async_setup_component(hass, 'zwave', {'zwave': {}}) yield from hass.async_block_till_done() assert len(mock_receivers) == 1 node = MockNode(node_id=11, generic=const.GENERIC_TYPE_SWITCH_BINARY) switch = MockValue( data=True, node=node, index=12, instance=13, command_class=const.COMMAND_CLASS_SWITCH_BINARY, genre=const.GENRE_USER, type=const.TYPE_BOOL) hass.async_add_job(mock_receivers[0], node, switch) yield from hass.async_block_till_done() assert hass.states.get('switch.mock_node_mock_value').state == 'on' assert 'power_consumption' not in hass.states.get( 'switch.mock_node_mock_value').attributes def mock_update(self): self.hass.add_job(self.async_update_ha_state) with patch.object(zwave.node_entity.ZWaveBaseEntity, 'maybe_schedule_update', new=mock_update): power = MockValue( data=23.5, node=node, index=const.INDEX_SENSOR_MULTILEVEL_POWER, instance=13, command_class=const.COMMAND_CLASS_SENSOR_MULTILEVEL) hass.async_add_job(mock_receivers[0], node, power) yield from hass.async_block_till_done() assert hass.states.get('switch.mock_node_mock_value').attributes[ 'power_consumption'] == 23.5 @asyncio.coroutine def test_network_ready(hass, mock_openzwave): """Test Node network ready event.""" mock_receivers = [] def mock_connect(receiver, signal, *args, **kwargs): if signal == MockNetwork.SIGNAL_ALL_NODES_QUERIED: mock_receivers.append(receiver) with patch('pydispatch.dispatcher.connect', new=mock_connect): yield from async_setup_component(hass, 'zwave', {'zwave': {}}) yield from hass.async_block_till_done() assert len(mock_receivers) == 1 events = [] def listener(event): events.append(event) hass.bus.async_listen(const.EVENT_NETWORK_COMPLETE, listener) hass.async_add_job(mock_receivers[0]) yield from hass.async_block_till_done() assert len(events) == 1 @asyncio.coroutine def test_network_complete(hass, mock_openzwave): """Test Node network complete event.""" mock_receivers = [] def mock_connect(receiver, signal, *args, **kwargs): if signal == MockNetwork.SIGNAL_AWAKE_NODES_QUERIED: mock_receivers.append(receiver) with patch('pydispatch.dispatcher.connect', new=mock_connect): yield from async_setup_component(hass, 'zwave', {'zwave': {}}) yield from hass.async_block_till_done() assert len(mock_receivers) == 1 events = [] def listener(event): events.append(event) hass.bus.async_listen(const.EVENT_NETWORK_READY, listener) hass.async_add_job(mock_receivers[0]) yield from hass.async_block_till_done() assert len(events) == 1 @asyncio.coroutine def test_network_complete_some_dead(hass, mock_openzwave): """Test Node network complete some dead event.""" mock_receivers = [] def mock_connect(receiver, signal, *args, **kwargs): if signal == MockNetwork.SIGNAL_ALL_NODES_QUERIED_SOME_DEAD: mock_receivers.append(receiver) with patch('pydispatch.dispatcher.connect', new=mock_connect): yield from async_setup_component(hass, 'zwave', {'zwave': {}}) yield from hass.async_block_till_done() assert len(mock_receivers) == 1 events = [] def listener(event): events.append(event) hass.bus.async_listen(const.EVENT_NETWORK_COMPLETE_SOME_DEAD, listener) hass.async_add_job(mock_receivers[0]) yield from hass.async_block_till_done() assert len(events) == 1 class TestZWaveDeviceEntityValues(unittest.TestCase): """Tests for the ZWaveDeviceEntityValues helper.""" @pytest.fixture(autouse=True) def set_mock_openzwave(self, mock_openzwave): """Use the mock_openzwave fixture for this class.""" self.mock_openzwave = mock_openzwave def setUp(self): """Initialize values for this testcase class.""" self.hass = get_test_home_assistant() self.hass.start() self.registry = mock_registry(self.hass) setup_component(self.hass, 'zwave', {'zwave': {}}) self.hass.block_till_done() self.node = MockNode() self.mock_schema = { const.DISC_COMPONENT: 'mock_component', const.DISC_VALUES: { const.DISC_PRIMARY: { const.DISC_COMMAND_CLASS: ['mock_primary_class'], }, 'secondary': { const.DISC_COMMAND_CLASS: ['mock_secondary_class'], }, 'optional': { const.DISC_COMMAND_CLASS: ['mock_optional_class'], const.DISC_OPTIONAL: True, }}} self.primary = MockValue( command_class='mock_primary_class', node=self.node, value_id=1000) self.secondary = MockValue( command_class='mock_secondary_class', node=self.node) self.duplicate_secondary = MockValue( command_class='mock_secondary_class', node=self.node) self.optional = MockValue( command_class='mock_optional_class', node=self.node) self.no_match_value = MockValue( command_class='mock_bad_class', node=self.node) self.entity_id = 'mock_component.mock_node_mock_value' self.zwave_config = {'zwave': {}} self.device_config = {self.entity_id: {}} def tearDown(self): # pylint: disable=invalid-name """Stop everything that was started.""" self.hass.stop() @patch.object(zwave, 'get_platform') @patch.object(zwave, 'discovery') def test_entity_discovery(self, discovery, get_platform): """Test the creation of a new entity.""" discovery.async_load_platform.return_value = mock_coro() mock_platform = MagicMock() get_platform.return_value = mock_platform mock_device = MagicMock() mock_device.name = 'test_device' mock_platform.get_device.return_value = mock_device values = zwave.ZWaveDeviceEntityValues( hass=self.hass, schema=self.mock_schema, primary_value=self.primary, zwave_config=self.zwave_config, device_config=self.device_config, registry=self.registry ) assert values.primary is self.primary assert len(list(values)) == 3 assert sorted(list(values), key=lambda a: id(a)) == \ sorted([self.primary, None, None], key=lambda a: id(a)) assert not discovery.async_load_platform.called values.check_value(self.secondary) self.hass.block_till_done() assert values.secondary is self.secondary assert len(list(values)) == 3 assert sorted(list(values), key=lambda a: id(a)) == \ sorted([self.primary, self.secondary, None], key=lambda a: id(a)) assert discovery.async_load_platform.called assert len(discovery.async_load_platform.mock_calls) == 1 args = discovery.async_load_platform.mock_calls[0][1] assert args[0] == self.hass assert args[1] == 'mock_component' assert args[2] == 'zwave' assert args[3] == {const.DISCOVERY_DEVICE: mock_device.unique_id} assert args[4] == self.zwave_config discovery.async_load_platform.reset_mock() values.check_value(self.optional) values.check_value(self.duplicate_secondary) values.check_value(self.no_match_value) self.hass.block_till_done() assert values.optional is self.optional assert len(list(values)) == 3 assert sorted(list(values), key=lambda a: id(a)) == \ sorted([self.primary, self.secondary, self.optional], key=lambda a: id(a)) assert not discovery.async_load_platform.called assert values._entity.value_added.called assert len(values._entity.value_added.mock_calls) == 1 assert values._entity.value_changed.called assert len(values._entity.value_changed.mock_calls) == 1 @patch.object(zwave, 'get_platform') @patch.object(zwave, 'discovery') def test_entity_existing_values(self, discovery, get_platform): """Test the loading of already discovered values.""" discovery.async_load_platform.return_value = mock_coro() mock_platform = MagicMock() get_platform.return_value = mock_platform mock_device = MagicMock() mock_device.name = 'test_device' mock_platform.get_device.return_value = mock_device self.node.values = { self.primary.value_id: self.primary, self.secondary.value_id: self.secondary, self.optional.value_id: self.optional, self.no_match_value.value_id: self.no_match_value, } values = zwave.ZWaveDeviceEntityValues( hass=self.hass, schema=self.mock_schema, primary_value=self.primary, zwave_config=self.zwave_config, device_config=self.device_config, registry=self.registry ) self.hass.block_till_done() assert values.primary is self.primary assert values.secondary is self.secondary assert values.optional is self.optional assert len(list(values)) == 3 assert sorted(list(values), key=lambda a: id(a)) == \ sorted([self.primary, self.secondary, self.optional], key=lambda a: id(a)) assert discovery.async_load_platform.called assert len(discovery.async_load_platform.mock_calls) == 1 args = discovery.async_load_platform.mock_calls[0][1] assert args[0] == self.hass assert args[1] == 'mock_component' assert args[2] == 'zwave' assert args[3] == {const.DISCOVERY_DEVICE: mock_device.unique_id} assert args[4] == self.zwave_config assert not self.primary.enable_poll.called @patch.object(zwave, 'get_platform') @patch.object(zwave, 'discovery') def test_node_schema_mismatch(self, discovery, get_platform): """Test node schema mismatch.""" self.node.generic = 'no_match' self.node.values = { self.primary.value_id: self.primary, self.secondary.value_id: self.secondary, } self.mock_schema[const.DISC_GENERIC_DEVICE_CLASS] = ['generic_match'] values = zwave.ZWaveDeviceEntityValues( hass=self.hass, schema=self.mock_schema, primary_value=self.primary, zwave_config=self.zwave_config, device_config=self.device_config, registry=self.registry ) values._check_entity_ready() self.hass.block_till_done() assert not discovery.async_load_platform.called @patch.object(zwave, 'get_platform') @patch.object(zwave, 'discovery') def test_entity_workaround_component(self, discovery, get_platform): """Test component workaround.""" discovery.async_load_platform.return_value = mock_coro() mock_platform = MagicMock() get_platform.return_value = mock_platform mock_device = MagicMock() mock_device.name = 'test_device' mock_platform.get_device.return_value = mock_device self.node.manufacturer_id = '010f' self.node.product_type = '0b00' self.primary.command_class = const.COMMAND_CLASS_SENSOR_ALARM self.entity_id = 'binary_sensor.mock_node_mock_value' self.device_config = {self.entity_id: {}} self.mock_schema = { const.DISC_COMPONENT: 'mock_component', const.DISC_VALUES: { const.DISC_PRIMARY: { const.DISC_COMMAND_CLASS: [ const.COMMAND_CLASS_SWITCH_BINARY], }}} with patch.object(zwave, 'async_dispatcher_send') as \ mock_dispatch_send: values = zwave.ZWaveDeviceEntityValues( hass=self.hass, schema=self.mock_schema, primary_value=self.primary, zwave_config=self.zwave_config, device_config=self.device_config, registry=self.registry ) values._check_entity_ready() self.hass.block_till_done() assert mock_dispatch_send.called assert len(mock_dispatch_send.mock_calls) == 1 args = mock_dispatch_send.mock_calls[0][1] assert args[1] == 'zwave_new_binary_sensor' @patch.object(zwave, 'get_platform') @patch.object(zwave, 'discovery') def test_entity_workaround_ignore(self, discovery, get_platform): """Test ignore workaround.""" self.node.manufacturer_id = '010f' self.node.product_type = '0301' self.primary.command_class = const.COMMAND_CLASS_SWITCH_BINARY self.mock_schema = { const.DISC_COMPONENT: 'mock_component', const.DISC_VALUES: { const.DISC_PRIMARY: { const.DISC_COMMAND_CLASS: [ const.COMMAND_CLASS_SWITCH_BINARY], }}} values = zwave.ZWaveDeviceEntityValues( hass=self.hass, schema=self.mock_schema, primary_value=self.primary, zwave_config=self.zwave_config, device_config=self.device_config, registry=self.registry ) values._check_entity_ready() self.hass.block_till_done() assert not discovery.async_load_platform.called @patch.object(zwave, 'get_platform') @patch.object(zwave, 'discovery') def test_entity_config_ignore(self, discovery, get_platform): """Test ignore config.""" self.node.values = { self.primary.value_id: self.primary, self.secondary.value_id: self.secondary, } self.device_config = {self.entity_id: { zwave.CONF_IGNORED: True }} values = zwave.ZWaveDeviceEntityValues( hass=self.hass, schema=self.mock_schema, primary_value=self.primary, zwave_config=self.zwave_config, device_config=self.device_config, registry=self.registry ) values._check_entity_ready() self.hass.block_till_done() assert not discovery.async_load_platform.called @patch.object(zwave, 'get_platform') @patch.object(zwave, 'discovery') def test_entity_config_ignore_with_registry(self, discovery, get_platform): """Test ignore config. The case when the device is in entity registry. """ self.node.values = { self.primary.value_id: self.primary, self.secondary.value_id: self.secondary, } self.device_config = {'mock_component.registry_id': { zwave.CONF_IGNORED: True }} with patch.object(self.registry, 'async_schedule_save'): self.registry.async_get_or_create( 'mock_component', zwave.DOMAIN, '567-1000', suggested_object_id='registry_id') zwave.ZWaveDeviceEntityValues( hass=self.hass, schema=self.mock_schema, primary_value=self.primary, zwave_config=self.zwave_config, device_config=self.device_config, registry=self.registry ) self.hass.block_till_done() assert not discovery.async_load_platform.called @patch.object(zwave, 'get_platform') @patch.object(zwave, 'discovery') def test_entity_platform_ignore(self, discovery, get_platform): """Test platform ignore device.""" self.node.values = { self.primary.value_id: self.primary, self.secondary.value_id: self.secondary, } platform = MagicMock() get_platform.return_value = platform platform.get_device.return_value = None zwave.ZWaveDeviceEntityValues( hass=self.hass, schema=self.mock_schema, primary_value=self.primary, zwave_config=self.zwave_config, device_config=self.device_config, registry=self.registry ) self.hass.block_till_done() assert not discovery.async_load_platform.called @patch.object(zwave, 'get_platform') @patch.object(zwave, 'discovery') def test_config_polling_intensity(self, discovery, get_platform): """Test polling intensity.""" mock_platform = MagicMock() get_platform.return_value = mock_platform mock_device = MagicMock() mock_device.name = 'test_device' mock_platform.get_device.return_value = mock_device self.node.values = { self.primary.value_id: self.primary, self.secondary.value_id: self.secondary, } self.device_config = {self.entity_id: { zwave.CONF_POLLING_INTENSITY: 123, }} values = zwave.ZWaveDeviceEntityValues( hass=self.hass, schema=self.mock_schema, primary_value=self.primary, zwave_config=self.zwave_config, device_config=self.device_config, registry=self.registry ) values._check_entity_ready() self.hass.block_till_done() assert discovery.async_load_platform.called assert self.primary.enable_poll.called assert len(self.primary.enable_poll.mock_calls) == 1 assert self.primary.enable_poll.mock_calls[0][1][0] == 123 class TestZwave(unittest.TestCase): """Test zwave init.""" def test_device_config_glob_is_ordered(self): """Test that device_config_glob preserves order.""" conf = CONFIG_SCHEMA( {'zwave': {CONF_DEVICE_CONFIG_GLOB: OrderedDict()}}) assert isinstance(conf['zwave'][CONF_DEVICE_CONFIG_GLOB], OrderedDict) class TestZWaveServices(unittest.TestCase): """Tests for zwave services.""" @pytest.fixture(autouse=True) def set_mock_openzwave(self, mock_openzwave): """Use the mock_openzwave fixture for this class.""" self.mock_openzwave = mock_openzwave def setUp(self): """Initialize values for this testcase class.""" self.hass = get_test_home_assistant() self.hass.start() # Initialize zwave setup_component(self.hass, 'zwave', {'zwave': {}}) self.hass.block_till_done() self.zwave_network = self.hass.data[DATA_NETWORK] self.zwave_network.state = MockNetwork.STATE_READY self.hass.bus.fire(EVENT_HOMEASSISTANT_START) self.hass.block_till_done() def tearDown(self): # pylint: disable=invalid-name """Stop everything that was started.""" self.hass.services.call('zwave', 'stop_network', {}) self.hass.block_till_done() self.hass.stop() def test_add_node(self): """Test zwave add_node service.""" self.hass.services.call('zwave', 'add_node', {}) self.hass.block_till_done() assert self.zwave_network.controller.add_node.called assert len(self.zwave_network.controller .add_node.mock_calls) == 1 assert len(self.zwave_network.controller .add_node.mock_calls[0][1]) == 0 def test_add_node_secure(self): """Test zwave add_node_secure service.""" self.hass.services.call('zwave', 'add_node_secure', {}) self.hass.block_till_done() assert self.zwave_network.controller.add_node.called assert len(self.zwave_network.controller.add_node.mock_calls) == 1 assert (self.zwave_network.controller .add_node.mock_calls[0][1][0] is True) def test_remove_node(self): """Test zwave remove_node service.""" self.hass.services.call('zwave', 'remove_node', {}) self.hass.block_till_done() assert self.zwave_network.controller.remove_node.called assert len(self.zwave_network.controller.remove_node.mock_calls) == 1 def test_cancel_command(self): """Test zwave cancel_command service.""" self.hass.services.call('zwave', 'cancel_command', {}) self.hass.block_till_done() assert self.zwave_network.controller.cancel_command.called assert len(self.zwave_network.controller .cancel_command.mock_calls) == 1 def test_heal_network(self): """Test zwave heal_network service.""" self.hass.services.call('zwave', 'heal_network', {}) self.hass.block_till_done() assert self.zwave_network.heal.called assert len(self.zwave_network.heal.mock_calls) == 1 def test_soft_reset(self): """Test zwave soft_reset service.""" self.hass.services.call('zwave', 'soft_reset', {}) self.hass.block_till_done() assert self.zwave_network.controller.soft_reset.called assert len(self.zwave_network.controller.soft_reset.mock_calls) == 1 def test_test_network(self): """Test zwave test_network service.""" self.hass.services.call('zwave', 'test_network', {}) self.hass.block_till_done() assert self.zwave_network.test.called assert len(self.zwave_network.test.mock_calls) == 1 def test_stop_network(self): """Test zwave stop_network service.""" with patch.object(self.hass.bus, 'fire') as mock_fire: self.hass.services.call('zwave', 'stop_network', {}) self.hass.block_till_done() assert self.zwave_network.stop.called assert len(self.zwave_network.stop.mock_calls) == 1 assert mock_fire.called assert len(mock_fire.mock_calls) == 2 assert mock_fire.mock_calls[0][1][0] == const.EVENT_NETWORK_STOP def test_rename_node(self): """Test zwave rename_node service.""" self.zwave_network.nodes = {11: MagicMock()} self.hass.services.call('zwave', 'rename_node', { const.ATTR_NODE_ID: 11, const.ATTR_NAME: 'test_name', }) self.hass.block_till_done() assert self.zwave_network.nodes[11].name == 'test_name' def test_rename_value(self): """Test zwave rename_value service.""" node = MockNode(node_id=14) value = MockValue(index=12, value_id=123456, label="Old Label") node.values = {123456: value} self.zwave_network.nodes = {11: node} assert value.label == "Old Label" self.hass.services.call('zwave', 'rename_value', { const.ATTR_NODE_ID: 11, const.ATTR_VALUE_ID: 123456, const.ATTR_NAME: "New Label", }) self.hass.block_till_done() assert value.label == "New Label" def test_set_poll_intensity_enable(self): """Test zwave set_poll_intensity service, successful set.""" node = MockNode(node_id=14) value = MockValue(index=12, value_id=123456, poll_intensity=0) node.values = {123456: value} self.zwave_network.nodes = {11: node} assert value.poll_intensity == 0 self.hass.services.call('zwave', 'set_poll_intensity', { const.ATTR_NODE_ID: 11, const.ATTR_VALUE_ID: 123456, const.ATTR_POLL_INTENSITY: 4, }) self.hass.block_till_done() enable_poll = value.enable_poll assert value.enable_poll.called assert len(enable_poll.mock_calls) == 2 assert enable_poll.mock_calls[0][1][0] == 4 def test_set_poll_intensity_enable_failed(self): """Test zwave set_poll_intensity service, failed set.""" node = MockNode(node_id=14) value = MockValue(index=12, value_id=123456, poll_intensity=0) value.enable_poll.return_value = False node.values = {123456: value} self.zwave_network.nodes = {11: node} assert value.poll_intensity == 0 self.hass.services.call('zwave', 'set_poll_intensity', { const.ATTR_NODE_ID: 11, const.ATTR_VALUE_ID: 123456, const.ATTR_POLL_INTENSITY: 4, }) self.hass.block_till_done() enable_poll = value.enable_poll assert value.enable_poll.called assert len(enable_poll.mock_calls) == 1 def test_set_poll_intensity_disable(self): """Test zwave set_poll_intensity service, successful disable.""" node = MockNode(node_id=14) value = MockValue(index=12, value_id=123456, poll_intensity=4) node.values = {123456: value} self.zwave_network.nodes = {11: node} assert value.poll_intensity == 4 self.hass.services.call('zwave', 'set_poll_intensity', { const.ATTR_NODE_ID: 11, const.ATTR_VALUE_ID: 123456, const.ATTR_POLL_INTENSITY: 0, }) self.hass.block_till_done() disable_poll = value.disable_poll assert value.disable_poll.called assert len(disable_poll.mock_calls) == 2 def test_set_poll_intensity_disable_failed(self): """Test zwave set_poll_intensity service, failed disable.""" node = MockNode(node_id=14) value = MockValue(index=12, value_id=123456, poll_intensity=4) value.disable_poll.return_value = False node.values = {123456: value} self.zwave_network.nodes = {11: node} assert value.poll_intensity == 4 self.hass.services.call('zwave', 'set_poll_intensity', { const.ATTR_NODE_ID: 11, const.ATTR_VALUE_ID: 123456, const.ATTR_POLL_INTENSITY: 0, }) self.hass.block_till_done() disable_poll = value.disable_poll assert value.disable_poll.called assert len(disable_poll.mock_calls) == 1 def test_remove_failed_node(self): """Test zwave remove_failed_node service.""" self.hass.services.call('zwave', 'remove_failed_node', { const.ATTR_NODE_ID: 12, }) self.hass.block_till_done() remove_failed_node = self.zwave_network.controller.remove_failed_node assert remove_failed_node.called assert len(remove_failed_node.mock_calls) == 1 assert remove_failed_node.mock_calls[0][1][0] == 12 def test_replace_failed_node(self): """Test zwave replace_failed_node service.""" self.hass.services.call('zwave', 'replace_failed_node', { const.ATTR_NODE_ID: 13, }) self.hass.block_till_done() replace_failed_node = self.zwave_network.controller.replace_failed_node assert replace_failed_node.called assert len(replace_failed_node.mock_calls) == 1 assert replace_failed_node.mock_calls[0][1][0] == 13 def test_set_config_parameter(self): """Test zwave set_config_parameter service.""" value = MockValue( index=12, command_class=const.COMMAND_CLASS_CONFIGURATION, type=const.TYPE_BYTE, ) value_list = MockValue( index=13, command_class=const.COMMAND_CLASS_CONFIGURATION, type=const.TYPE_LIST, data_items=['item1', 'item2', 'item3'], ) value_list_int = MockValue( index=15, command_class=const.COMMAND_CLASS_CONFIGURATION, type=const.TYPE_LIST, data_items=['1', '2', '3'], ) value_button = MockValue( index=14, command_class=const.COMMAND_CLASS_CONFIGURATION, type=const.TYPE_BUTTON, ) node = MockNode(node_id=14) node.get_values.return_value = {12: value, 13: value_list, 14: value_button, 15: value_list_int} self.zwave_network.nodes = {14: node} self.hass.services.call('zwave', 'set_config_parameter', { const.ATTR_NODE_ID: 14, const.ATTR_CONFIG_PARAMETER: 13, const.ATTR_CONFIG_VALUE: 'item3', }) self.hass.block_till_done() assert value_list.data == 'item3' self.hass.services.call('zwave', 'set_config_parameter', { const.ATTR_NODE_ID: 14, const.ATTR_CONFIG_PARAMETER: 15, const.ATTR_CONFIG_VALUE: 3, }) self.hass.block_till_done() assert value_list_int.data == '3' self.hass.services.call('zwave', 'set_config_parameter', { const.ATTR_NODE_ID: 14, const.ATTR_CONFIG_PARAMETER: 12, const.ATTR_CONFIG_VALUE: 7, }) self.hass.block_till_done() assert value.data == 7 self.hass.services.call('zwave', 'set_config_parameter', { const.ATTR_NODE_ID: 14, const.ATTR_CONFIG_PARAMETER: 14, const.ATTR_CONFIG_VALUE: True, }) self.hass.block_till_done() assert self.zwave_network.manager.pressButton.called assert self.zwave_network.manager.releaseButton.called self.hass.services.call('zwave', 'set_config_parameter', { const.ATTR_NODE_ID: 14, const.ATTR_CONFIG_PARAMETER: 19, const.ATTR_CONFIG_VALUE: 0x01020304, const.ATTR_CONFIG_SIZE: 4 }) self.hass.block_till_done() assert node.set_config_param.called assert len(node.set_config_param.mock_calls) == 1 assert node.set_config_param.mock_calls[0][1][0] == 19 assert node.set_config_param.mock_calls[0][1][1] == 0x01020304 assert node.set_config_param.mock_calls[0][1][2] == 4 node.set_config_param.reset_mock() def test_print_config_parameter(self): """Test zwave print_config_parameter service.""" value1 = MockValue( index=12, command_class=const.COMMAND_CLASS_CONFIGURATION, data=1234, ) value2 = MockValue( index=13, command_class=const.COMMAND_CLASS_CONFIGURATION, data=2345, ) node = MockNode(node_id=14) node.values = {12: value1, 13: value2} self.zwave_network.nodes = {14: node} with patch.object(zwave, '_LOGGER') as mock_logger: self.hass.services.call('zwave', 'print_config_parameter', { const.ATTR_NODE_ID: 14, const.ATTR_CONFIG_PARAMETER: 13, }) self.hass.block_till_done() assert mock_logger.info.called assert len(mock_logger.info.mock_calls) == 1 assert mock_logger.info.mock_calls[0][1][1] == 13 assert mock_logger.info.mock_calls[0][1][2] == 14 assert mock_logger.info.mock_calls[0][1][3] == 2345 def test_print_node(self): """Test zwave print_node_parameter service.""" node = MockNode(node_id=14) self.zwave_network.nodes = {14: node} with self.assertLogs(level='DEBUG') as mock_logger: self.hass.services.call('zwave', 'print_node', { const.ATTR_NODE_ID: 14 }) self.hass.block_till_done() assert "FOUND NODE " in mock_logger.output[1] def test_set_wakeup(self): """Test zwave set_wakeup service.""" value = MockValue( index=12, command_class=const.COMMAND_CLASS_WAKE_UP, ) node = MockNode(node_id=14) node.values = {12: value} node.get_values.return_value = node.values self.zwave_network.nodes = {14: node} self.hass.services.call('zwave', 'set_wakeup', { const.ATTR_NODE_ID: 14, const.ATTR_CONFIG_VALUE: 15, }) self.hass.block_till_done() assert value.data == 15 node.can_wake_up_value = False self.hass.services.call('zwave', 'set_wakeup', { const.ATTR_NODE_ID: 14, const.ATTR_CONFIG_VALUE: 20, }) self.hass.block_till_done() assert value.data == 15 def test_reset_node_meters(self): """Test zwave reset_node_meters service.""" value = MockValue( instance=1, index=8, data=99.5, command_class=const.COMMAND_CLASS_METER, ) reset_value = MockValue( instance=1, index=33, command_class=const.COMMAND_CLASS_METER, ) node = MockNode(node_id=14) node.values = {8: value, 33: reset_value} node.get_values.return_value = node.values self.zwave_network.nodes = {14: node} self.hass.services.call('zwave', 'reset_node_meters', { const.ATTR_NODE_ID: 14, const.ATTR_INSTANCE: 2, }) self.hass.block_till_done() assert not self.zwave_network.manager.pressButton.called assert not self.zwave_network.manager.releaseButton.called self.hass.services.call('zwave', 'reset_node_meters', { const.ATTR_NODE_ID: 14, }) self.hass.block_till_done() assert self.zwave_network.manager.pressButton.called value_id, = self.zwave_network.manager.pressButton.mock_calls.pop(0)[1] assert value_id == reset_value.value_id assert self.zwave_network.manager.releaseButton.called value_id, = ( self.zwave_network.manager.releaseButton.mock_calls.pop(0)[1]) assert value_id == reset_value.value_id def test_add_association(self): """Test zwave change_association service.""" ZWaveGroup = self.mock_openzwave.group.ZWaveGroup group = MagicMock() ZWaveGroup.return_value = group value = MockValue( index=12, command_class=const.COMMAND_CLASS_WAKE_UP, ) node = MockNode(node_id=14) node.values = {12: value} node.get_values.return_value = node.values self.zwave_network.nodes = {14: node} self.hass.services.call('zwave', 'change_association', { const.ATTR_ASSOCIATION: 'add', const.ATTR_NODE_ID: 14, const.ATTR_TARGET_NODE_ID: 24, const.ATTR_GROUP: 3, const.ATTR_INSTANCE: 5, }) self.hass.block_till_done() assert ZWaveGroup.called assert len(ZWaveGroup.mock_calls) == 2 assert ZWaveGroup.mock_calls[0][1][0] == 3 assert ZWaveGroup.mock_calls[0][1][2] == 14 assert group.add_association.called assert len(group.add_association.mock_calls) == 1 assert group.add_association.mock_calls[0][1][0] == 24 assert group.add_association.mock_calls[0][1][1] == 5 def test_remove_association(self): """Test zwave change_association service.""" ZWaveGroup = self.mock_openzwave.group.ZWaveGroup group = MagicMock() ZWaveGroup.return_value = group value = MockValue( index=12, command_class=const.COMMAND_CLASS_WAKE_UP, ) node = MockNode(node_id=14) node.values = {12: value} node.get_values.return_value = node.values self.zwave_network.nodes = {14: node} self.hass.services.call('zwave', 'change_association', { const.ATTR_ASSOCIATION: 'remove', const.ATTR_NODE_ID: 14, const.ATTR_TARGET_NODE_ID: 24, const.ATTR_GROUP: 3, const.ATTR_INSTANCE: 5, }) self.hass.block_till_done() assert ZWaveGroup.called assert len(ZWaveGroup.mock_calls) == 2 assert ZWaveGroup.mock_calls[0][1][0] == 3 assert ZWaveGroup.mock_calls[0][1][2] == 14 assert group.remove_association.called assert len(group.remove_association.mock_calls) == 1 assert group.remove_association.mock_calls[0][1][0] == 24 assert group.remove_association.mock_calls[0][1][1] == 5 def test_refresh_entity(self): """Test zwave refresh_entity service.""" node = MockNode() value = MockValue(data=False, node=node, command_class=const.COMMAND_CLASS_SENSOR_BINARY) power_value = MockValue(data=50, node=node, command_class=const.COMMAND_CLASS_METER) values = MockEntityValues(primary=value, power=power_value) device = get_device(node=node, values=values, node_config={}) device.hass = self.hass device.entity_id = 'binary_sensor.mock_entity_id' self.hass.add_job(device.async_added_to_hass()) self.hass.block_till_done() self.hass.services.call('zwave', 'refresh_entity', { ATTR_ENTITY_ID: 'binary_sensor.mock_entity_id', }) self.hass.block_till_done() assert node.refresh_value.called assert len(node.refresh_value.mock_calls) == 2 assert sorted([node.refresh_value.mock_calls[0][1][0], node.refresh_value.mock_calls[1][1][0]]) == \ sorted([value.value_id, power_value.value_id]) def test_refresh_node(self): """Test zwave refresh_node service.""" node = MockNode(node_id=14) self.zwave_network.nodes = {14: node} self.hass.services.call('zwave', 'refresh_node', { const.ATTR_NODE_ID: 14, }) self.hass.block_till_done() assert node.refresh_info.called assert len(node.refresh_info.mock_calls) == 1 def test_set_node_value(self): """Test zwave set_node_value service.""" value = MockValue( index=12, command_class=const.COMMAND_CLASS_INDICATOR, data=4 ) node = MockNode(node_id=14, command_classes=[const.COMMAND_CLASS_INDICATOR]) node.values = {12: value} node.get_values.return_value = node.values self.zwave_network.nodes = {14: node} self.hass.services.call('zwave', 'set_node_value', { const.ATTR_NODE_ID: 14, const.ATTR_VALUE_ID: 12, const.ATTR_CONFIG_VALUE: 2, }) self.hass.block_till_done() assert self.zwave_network.nodes[14].values[12].data == 2 def test_refresh_node_value(self): """Test zwave refresh_node_value service.""" node = MockNode(node_id=14, command_classes=[const.COMMAND_CLASS_INDICATOR], network=self.zwave_network) value = MockValue( node=node, index=12, command_class=const.COMMAND_CLASS_INDICATOR, data=2 ) value.refresh = MagicMock() node.values = {12: value} node.get_values.return_value = node.values self.zwave_network.nodes = {14: node} self.hass.services.call('zwave', 'refresh_node_value', { const.ATTR_NODE_ID: 14, const.ATTR_VALUE_ID: 12 }) self.hass.block_till_done() assert value.refresh.called def test_heal_node(self): """Test zwave heal_node service.""" node = MockNode(node_id=19) self.zwave_network.nodes = {19: node} self.hass.services.call('zwave', 'heal_node', { const.ATTR_NODE_ID: 19, }) self.hass.block_till_done() assert node.heal.called assert len(node.heal.mock_calls) == 1 def test_test_node(self): """Test the zwave test_node service.""" node = MockNode(node_id=19) self.zwave_network.nodes = {19: node} self.hass.services.call('zwave', 'test_node', { const.ATTR_NODE_ID: 19, }) self.hass.block_till_done() assert node.test.called assert len(node.test.mock_calls) == 1