ZHA tests refactoring ()

* Refactor ZHA fixtures.
Patch Zigpy radio libs instead of ZHA when setting up fixtures.
Use new fixtures for binary_sensor.zha platform.

* Update ZHA api tests.
* Update ZHA channels and discovery tests.
* Update ZHA cover tests.
* Update device action/trigger tests.
* Update device_tracker.zha platform tests.
* Update fan.zha platform tests.
* Update ZHA gateway tests.
* Update lock.zha platform tests.
* Update switch.zha platform tests.
* Update sensor.zha platform tests.
* Update light.zha platform tests.
* Use MockConfigEntry.
* Address PR comments.
pull/31772/head
Alexei Chetroi 2020-02-12 16:12:14 -05:00 committed by GitHub
parent 43256ebd83
commit 52fe1328f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 187 additions and 252 deletions

View File

@ -10,27 +10,10 @@ import zigpy.zcl.clusters.general
import zigpy.zcl.foundation as zcl_f
import zigpy.zdo.types
from homeassistant.components.zha.core.const import (
DATA_ZHA,
DATA_ZHA_BRIDGE_ID,
DATA_ZHA_CONFIG,
DATA_ZHA_DISPATCHERS,
)
import homeassistant.components.zha.core.const as zha_const
from homeassistant.util import slugify
class FakeApplication:
"""Fake application for mocking zigpy."""
def __init__(self):
"""Init fake application."""
self.ieee = zigpy.types.EUI64.convert("00:15:8d:00:02:32:4f:32")
self.nwk = 0x087D
APPLICATION = FakeApplication()
class FakeEndpoint:
"""Fake endpoint for moking zigpy."""
@ -71,14 +54,15 @@ def patch_cluster(cluster):
cluster.read_attributes = CoroutineMock()
cluster.read_attributes_raw = Mock()
cluster.unbind = CoroutineMock(return_value=[0])
cluster.write_attributes = CoroutineMock(return_value=[0])
class FakeDevice:
"""Fake device for mocking zigpy."""
def __init__(self, ieee, manufacturer, model, node_desc=None):
def __init__(self, app, ieee, manufacturer, model, node_desc=None):
"""Init fake device."""
self._application = APPLICATION
self._application = app
self.ieee = zigpy.types.EUI64.convert(ieee)
self.nwk = 0xB79C
self.zdo = Mock()
@ -98,6 +82,14 @@ class FakeDevice:
self.node_desc = zigpy.zdo.types.NodeDescriptor.deserialize(node_desc)[0]
def get_zha_gateway(hass):
"""Return ZHA gateway from hass.data."""
try:
return hass.data[zha_const.DATA_ZHA][zha_const.DATA_ZHA_GATEWAY]
except KeyError:
return None
def make_attribute(attrid, value, status=0):
"""Make an attribute."""
attr = zcl_f.Attribute()
@ -107,14 +99,6 @@ def make_attribute(attrid, value, status=0):
return attr
async def async_setup_entry(hass, config_entry):
"""Mock setup entry for zha."""
hass.data[DATA_ZHA][DATA_ZHA_CONFIG] = {}
hass.data[DATA_ZHA][DATA_ZHA_DISPATCHERS] = []
hass.data[DATA_ZHA][DATA_ZHA_BRIDGE_ID] = APPLICATION.ieee
return True
async def find_entity_id(domain, zha_device, hass):
"""Find the entity id under the testing.
@ -133,7 +117,7 @@ async def find_entity_id(domain, zha_device, hass):
return None
async def async_enable_traffic(hass, zha_gateway, zha_devices):
async def async_enable_traffic(hass, zha_devices):
"""Allow traffic to flow through the gateway and the zha device."""
for zha_device in zha_devices:
zha_device.update_available(True)
@ -147,3 +131,25 @@ def make_zcl_header(command_id: int, global_command: bool = True) -> zcl_f.ZCLHe
else:
frc = zcl_f.FrameControl(zcl_f.FrameType.CLUSTER_COMMAND)
return zcl_f.ZCLHeader(frc, tsn=1, command_id=command_id)
def reset_clusters(clusters):
"""Reset mocks on cluster."""
for cluster in clusters:
cluster.bind.reset_mock()
cluster.configure_reporting.reset_mock()
cluster.write_attributes.reset_mock()
async def async_test_rejoin(hass, zigpy_device, clusters, report_counts, ep_id=1):
"""Test device rejoins."""
reset_clusters(clusters)
zha_gateway = get_zha_gateway(hass)
await zha_gateway.async_device_initialized(zigpy_device)
await hass.async_block_till_done()
for cluster, reports in zip(clusters, report_counts):
assert cluster.bind.call_count == 1
assert cluster.bind.await_count == 1
assert cluster.configure_reporting.call_count == reports
assert cluster.configure_reporting.await_count == reports

View File

@ -1,19 +1,18 @@
"""Test configuration for the ZHA component."""
import functools
from unittest import mock
from unittest.mock import patch
import asynctest
import pytest
import zigpy
from zigpy.application import ControllerApplication
import zigpy.group
import zigpy.types
from homeassistant.components.zha.core.const import COMPONENTS, DATA_ZHA, DOMAIN
from homeassistant.components.zha.core.gateway import ZHAGateway
from homeassistant.components.zha.core.store import async_get_registry
from homeassistant.helpers.device_registry import async_get_registry as get_dev_reg
import homeassistant.components.zha.core.const as zha_const
import homeassistant.components.zha.core.registries as zha_regs
from homeassistant.setup import async_setup_component
from .common import FakeDevice, FakeEndpoint, async_setup_entry
from .common import FakeDevice, FakeEndpoint, get_zha_gateway
from tests.common import MockConfigEntry
@ -21,52 +20,64 @@ FIXTURE_GRP_ID = 0x1001
FIXTURE_GRP_NAME = "fixture group"
@pytest.fixture(name="config_entry")
async def config_entry_fixture(hass):
"""Fixture representing a config entry."""
config_entry = MockConfigEntry(domain=DOMAIN)
config_entry.add_to_hass(hass)
return config_entry
@pytest.fixture
def zigpy_app_controller():
"""Zigpy ApplicationController fixture."""
app = mock.MagicMock(spec_set=ControllerApplication)
app.startup = asynctest.CoroutineMock()
app.shutdown = asynctest.CoroutineMock()
groups = zigpy.group.Groups(app)
groups.add_group(FIXTURE_GRP_ID, FIXTURE_GRP_NAME, suppress_event=True)
app.configure_mock(groups=groups)
type(app).ieee = mock.PropertyMock()
app.ieee.return_value = zigpy.types.EUI64.convert("00:15:8d:00:02:32:4f:32")
type(app).nwk = mock.PropertyMock(return_value=zigpy.types.NWK(0x0000))
type(app).devices = mock.PropertyMock(return_value={})
return app
@pytest.fixture
async def setup_zha(hass, config_entry):
"""Load the ZHA component.
This will init the ZHA component. It loads the component in HA so that
we can test the domains that ZHA supports without actually having a zigbee
network running.
"""
# this prevents needing an actual radio and zigbee network available
with patch("homeassistant.components.zha.async_setup_entry", async_setup_entry):
hass.data[DATA_ZHA] = {}
# init ZHA
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
def zigpy_radio():
"""Zigpy radio mock."""
radio = mock.MagicMock()
radio.connect = asynctest.CoroutineMock()
return radio
@pytest.fixture(name="zha_gateway")
async def zha_gateway_fixture(hass, config_entry, setup_zha):
"""Fixture representing a zha gateway.
@pytest.fixture(name="config_entry")
async def config_entry_fixture(hass):
"""Fixture representing a config entry."""
entry = MockConfigEntry(
version=1,
domain=zha_const.DOMAIN,
data={
zha_const.CONF_BAUDRATE: zha_const.DEFAULT_BAUDRATE,
zha_const.CONF_RADIO_TYPE: "MockRadio",
zha_const.CONF_USB_PATH: "/dev/ttyUSB0",
},
)
entry.add_to_hass(hass)
return entry
Create a ZHAGateway object that can be used to interact with as if we
had a real zigbee network running.
"""
for component in COMPONENTS:
hass.data[DATA_ZHA][component] = hass.data[DATA_ZHA].get(component, {})
zha_storage = await async_get_registry(hass)
dev_reg = await get_dev_reg(hass)
gateway = ZHAGateway(hass, {}, config_entry)
gateway.zha_storage = zha_storage
gateway.ha_device_registry = dev_reg
gateway.application_controller = mock.MagicMock(spec_set=ControllerApplication)
groups = zigpy.group.Groups(gateway.application_controller)
groups.add_listener(gateway)
groups.add_group(FIXTURE_GRP_ID, FIXTURE_GRP_NAME, suppress_event=True)
gateway.application_controller.configure_mock(groups=groups)
gateway._initialize_groups()
return gateway
@pytest.fixture
def setup_zha(hass, config_entry, zigpy_app_controller, zigpy_radio):
"""Set up ZHA component."""
zha_config = {zha_const.DOMAIN: {zha_const.CONF_ENABLE_QUIRKS: False}}
radio_details = {
zha_const.ZHA_GW_RADIO: mock.MagicMock(return_value=zigpy_radio),
zha_const.CONTROLLER: mock.MagicMock(return_value=zigpy_app_controller),
zha_const.ZHA_GW_RADIO_DESCRIPTION: "mock radio",
}
async def _setup():
with mock.patch.dict(zha_regs.RADIO_TYPES, {"MockRadio": radio_details}):
status = await async_setup_component(hass, zha_const.DOMAIN, zha_config)
assert status is True
await hass.async_block_till_done()
return _setup
@pytest.fixture
@ -86,7 +97,7 @@ def channel():
@pytest.fixture
def zigpy_device_mock():
def zigpy_device_mock(zigpy_app_controller):
"""Make a fake device using the specified cluster classes."""
def _mock_dev(
@ -94,10 +105,12 @@ def zigpy_device_mock():
ieee="00:0d:6f:00:0a:90:69:e7",
manufacturer="FakeManufacturer",
model="FakeModel",
node_desc=b"\x02@\x807\x10\x7fd\x00\x00*d\x00\x00",
node_descriptor=b"\x02@\x807\x10\x7fd\x00\x00*d\x00\x00",
):
"""Make a fake device using the specified cluster classes."""
device = FakeDevice(ieee, manufacturer, model, node_desc)
device = FakeDevice(
zigpy_app_controller, ieee, manufacturer, model, node_descriptor
)
for epid, ep in endpoints.items():
endpoint = FakeEndpoint(manufacturer, model, epid)
endpoint.device = device
@ -119,19 +132,13 @@ def zigpy_device_mock():
@pytest.fixture
def _zha_device_restored_or_joined(hass, zha_gateway, config_entry):
"""Make a restored or joined ZHA devices."""
def zha_device_joined(hass, setup_zha):
"""Return a newly joined ZHA device."""
async def _zha_device(is_new_join, zigpy_dev):
if is_new_join:
for cmp in COMPONENTS:
await hass.config_entries.async_forward_entry_setup(config_entry, cmp)
await hass.async_block_till_done()
await zha_gateway.async_device_initialized(zigpy_dev)
else:
await zha_gateway.async_device_restored(zigpy_dev)
for cmp in COMPONENTS:
await hass.config_entries.async_forward_entry_setup(config_entry, cmp)
async def _zha_device(zigpy_dev):
await setup_zha()
zha_gateway = get_zha_gateway(hass)
await zha_gateway.async_device_initialized(zigpy_dev)
await hass.async_block_till_done()
return zha_gateway.get_device(zigpy_dev.ieee)
@ -139,17 +146,16 @@ def _zha_device_restored_or_joined(hass, zha_gateway, config_entry):
@pytest.fixture
def zha_device_joined(_zha_device_restored_or_joined):
"""Return a newly joined ZHA device."""
return functools.partial(_zha_device_restored_or_joined, True)
@pytest.fixture
def zha_device_restored(_zha_device_restored_or_joined):
def zha_device_restored(hass, zigpy_app_controller, setup_zha):
"""Return a restored ZHA device."""
return functools.partial(_zha_device_restored_or_joined, False)
async def _zha_device(zigpy_dev):
zigpy_app_controller.devices[zigpy_dev.ieee] = zigpy_dev
await setup_zha()
zha_gateway = hass.data[zha_const.DATA_ZHA][zha_const.DATA_ZHA_GATEWAY]
return zha_gateway.get_device(zigpy_dev.ieee)
return _zha_device
@pytest.fixture(params=["zha_device_joined", "zha_device_restored"])

View File

@ -28,7 +28,7 @@ IEEE_GROUPABLE_DEVICE = "01:2d:6f:00:0a:90:69:e8"
@pytest.fixture
async def device_switch(hass, zha_gateway, zigpy_device_mock, zha_device_joined):
async def device_switch(hass, zigpy_device_mock, zha_device_joined):
"""Test zha switch platform."""
zigpy_device = zigpy_device_mock(
@ -47,7 +47,7 @@ async def device_switch(hass, zha_gateway, zigpy_device_mock, zha_device_joined)
@pytest.fixture
async def device_groupable(hass, zha_gateway, zigpy_device_mock, zha_device_joined):
async def device_groupable(hass, zigpy_device_mock, zha_device_joined):
"""Test zha light platform."""
zigpy_device = zigpy_device_mock(
@ -78,7 +78,7 @@ async def zha_client(hass, hass_ws_client, device_switch, device_groupable):
return await hass_ws_client(hass)
async def test_device_clusters(hass, config_entry, zha_gateway, zha_client):
async def test_device_clusters(hass, zha_client):
"""Test getting device cluster info."""
await zha_client.send_json(
{ID: 5, TYPE: "zha/devices/clusters", ATTR_IEEE: IEEE_SWITCH_DEVICE}

View File

@ -9,6 +9,7 @@ from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE
from .common import (
async_enable_traffic,
async_test_rejoin,
find_entity_id,
make_attribute,
make_zcl_header,
@ -65,13 +66,12 @@ async def async_test_iaszone_on_off(hass, cluster, entity_id):
@pytest.mark.parametrize(
"device, on_off_test, cluster_name, reporting",
[
(DEVICE_IAS, async_test_iaszone_on_off, "ias_zone", False),
(DEVICE_OCCUPANCY, async_test_binary_sensor_on_off, "occupancy", True),
(DEVICE_IAS, async_test_iaszone_on_off, "ias_zone", (0,)),
(DEVICE_OCCUPANCY, async_test_binary_sensor_on_off, "occupancy", (1,)),
],
)
async def test_binary_sensor(
hass,
zha_gateway,
zigpy_device_mock,
zha_device_joined_restored,
device,
@ -89,7 +89,7 @@ async def test_binary_sensor(
# test that the sensors exist and are in the unavailable state
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
await async_enable_traffic(hass, zha_gateway, [zha_device])
await async_enable_traffic(hass, [zha_device])
# test that the sensors exist and are in the off state
assert hass.states.get(entity_id).state == STATE_OFF
@ -99,13 +99,5 @@ async def test_binary_sensor(
await on_off_test(hass, cluster, entity_id)
# test rejoin
cluster.bind.reset_mock()
cluster.configure_reporting.reset_mock()
await zha_gateway.async_device_initialized(zigpy_device)
await hass.async_block_till_done()
await async_test_rejoin(hass, zigpy_device, [cluster], reporting)
assert hass.states.get(entity_id).state == STATE_OFF
assert cluster.bind.call_count == 1
assert cluster.bind.await_count == 1
if reporting:
assert cluster.configure_reporting.call_count > 0
assert cluster.configure_reporting.await_count > 0

View File

@ -6,6 +6,8 @@ import homeassistant.components.zha.core.channels as channels
import homeassistant.components.zha.core.device as zha_device
import homeassistant.components.zha.core.registries as registries
from .common import get_zha_gateway
@pytest.fixture
def ieee():
@ -19,6 +21,13 @@ def nwk():
return t.NWK(0xBEEF)
@pytest.fixture
async def zha_gateway(hass, setup_zha):
"""Return ZhaGateway fixture."""
await setup_zha()
return get_zha_gateway(hass)
@pytest.mark.parametrize(
"cluster_id, bind_count, attrs",
[
@ -63,7 +72,7 @@ def nwk():
],
)
async def test_in_channel_config(
cluster_id, bind_count, attrs, zha_gateway, hass, zigpy_device_mock
cluster_id, bind_count, attrs, hass, zigpy_device_mock, zha_gateway
):
"""Test ZHA core channel configuration for input clusters."""
zigpy_dev = zigpy_device_mock(

View File

@ -12,6 +12,7 @@ from homeassistant.const import STATE_CLOSED, STATE_OPEN, STATE_UNAVAILABLE
from .common import (
async_enable_traffic,
async_test_rejoin,
find_entity_id,
make_attribute,
make_zcl_header,
@ -37,9 +38,7 @@ def zigpy_cover_device(zigpy_device_mock):
@asynctest.patch(
"homeassistant.components.zha.core.channels.closures.WindowCovering.async_initialize"
)
async def test_cover(
m1, hass, zha_gateway, zha_device_joined_restored, zigpy_cover_device
):
async def test_cover(m1, hass, zha_device_joined_restored, zigpy_cover_device):
"""Test zha cover platform."""
async def get_chan_attr(*args, **kwargs):
@ -62,7 +61,7 @@ async def test_cover(
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, zha_gateway, [zha_device])
await async_enable_traffic(hass, [zha_device])
await hass.async_block_till_done()
attr = make_attribute(8, 100)
@ -132,12 +131,5 @@ async def test_cover(
)
# test rejoin
cluster.bind.reset_mock()
cluster.configure_reporting.reset_mock()
await zha_gateway.async_device_initialized(zigpy_cover_device)
await hass.async_block_till_done()
await async_test_rejoin(hass, zigpy_cover_device, [cluster], (1,))
assert hass.states.get(entity_id).state == STATE_OPEN
assert cluster.bind.call_count == 1
assert cluster.bind.await_count == 1
assert cluster.configure_reporting.call_count == 1
assert cluster.configure_reporting.await_count == 1

View File

@ -23,13 +23,7 @@ COMMAND_SINGLE = "single"
@pytest.fixture
def calls(hass):
"""Track calls to a mock service."""
return async_mock_service(hass, "zha", "warning_device_warn")
@pytest.fixture
async def device_ias(hass, zha_gateway, zigpy_device_mock, zha_device_joined_restored):
async def device_ias(hass, zigpy_device_mock, zha_device_joined_restored):
"""IAS device fixture."""
clusters = [general.Basic, security.IasZone, security.IasWd]
@ -67,7 +61,7 @@ async def test_get_actions(hass, device_ias):
assert actions == expected_actions
async def test_action(hass, calls, device_ias):
async def test_action(hass, device_ias):
"""Test for executing a zha device action."""
zigpy_device, zha_device = device_ias
@ -108,6 +102,7 @@ async def test_action(hass, calls, device_ias):
)
await hass.async_block_till_done()
calls = async_mock_service(hass, DOMAIN, "warning_device_warn")
channel = {ch.name: ch for ch in zha_device.all_channels}[CHANNEL_EVENT_RELAY]
channel.zha_send_event(channel.cluster, COMMAND_SINGLE, [])

View File

@ -15,6 +15,7 @@ import homeassistant.util.dt as dt_util
from .common import (
async_enable_traffic,
async_test_rejoin,
find_entity_id,
make_attribute,
make_zcl_header,
@ -42,9 +43,7 @@ def zigpy_device_dt(zigpy_device_mock):
return zigpy_device_mock(endpoints)
async def test_device_tracker(
hass, zha_gateway, zha_device_joined_restored, zigpy_device_dt
):
async def test_device_tracker(hass, zha_device_joined_restored, zigpy_device_dt):
"""Test zha device tracker platform."""
zha_device = await zha_device_joined_restored(zigpy_device_dt)
@ -61,7 +60,7 @@ async def test_device_tracker(
await hass.async_block_till_done()
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, zha_gateway, [zha_device])
await async_enable_traffic(hass, [zha_device])
# test that the state has changed from unavailable to not home
assert hass.states.get(entity_id).state == STATE_NOT_HOME
@ -88,12 +87,5 @@ async def test_device_tracker(
assert entity.battery_level == 100
# test adding device tracker to the network and HA
cluster.bind.reset_mock()
cluster.configure_reporting.reset_mock()
await zha_gateway.async_device_initialized(zigpy_device_dt)
await hass.async_block_till_done()
await async_test_rejoin(hass, zigpy_device_dt, [cluster], (2,))
assert hass.states.get(entity_id).state == STATE_HOME
assert cluster.bind.call_count == 1
assert cluster.bind.await_count == 1
assert cluster.configure_reporting.call_count == 2
assert cluster.configure_reporting.await_count == 2

View File

@ -39,8 +39,8 @@ def calls(hass):
return async_mock_service(hass, "test", "automation")
@pytest.fixture(params=["zha_device_joined", "zha_device_restored"])
async def mock_devices(hass, zha_gateway, zigpy_device_mock, request):
@pytest.fixture
async def mock_devices(hass, zigpy_device_mock, zha_device_joined_restored):
"""IAS device fixture."""
zigpy_device = zigpy_device_mock(
@ -53,8 +53,7 @@ async def mock_devices(hass, zha_gateway, zigpy_device_mock, request):
},
)
join_or_restore = request.getfixturevalue(request.param)
zha_device = await join_or_restore(zigpy_device)
zha_device = await zha_device_joined_restored(zigpy_device)
zha_device.update_available(True)
await hass.async_block_till_done()
return zigpy_device, zha_device

View File

@ -1,6 +1,5 @@
"""Test zha device discovery."""
import asyncio
import re
from unittest import mock
@ -11,6 +10,7 @@ import homeassistant.components.zha.core.discovery as disc
import homeassistant.components.zha.core.gateway as core_zha_gw
import homeassistant.helpers.entity_registry
from .common import get_zha_gateway
from .zha_devices_list import DEVICES
NO_TAIL_ID = re.compile("_\\d$")
@ -18,12 +18,7 @@ NO_TAIL_ID = re.compile("_\\d$")
@pytest.mark.parametrize("device", DEVICES)
async def test_devices(
device,
zha_gateway: core_zha_gw.ZHAGateway,
hass,
config_entry,
zigpy_device_mock,
monkeypatch,
device, hass, zigpy_device_mock, monkeypatch, zha_device_joined_restored
):
"""Test device discovery."""
@ -32,7 +27,7 @@ async def test_devices(
"00:11:22:33:44:55:66:77",
device["manufacturer"],
device["model"],
node_desc=device["node_descriptor"],
node_descriptor=device["node_descriptor"],
)
_dispatch = mock.MagicMock(wraps=disc.async_dispatch_discovery_info)
@ -45,14 +40,7 @@ async def test_devices(
"homeassistant.components.zha.core.discovery._async_create_cluster_channel",
wraps=disc._async_create_cluster_channel,
):
await zha_gateway.async_device_restored(zigpy_device)
await hass.async_block_till_done()
tasks = [
hass.config_entries.async_forward_entry_setup(config_entry, component)
for component in zha_const.COMPONENTS
]
await asyncio.gather(*tasks)
await zha_device_joined_restored(zigpy_device)
await hass.async_block_till_done()
entity_ids = hass.states.async_entity_ids()
@ -61,6 +49,7 @@ async def test_devices(
ent for ent in entity_ids if ent.split(".")[0] in zha_const.COMPONENTS
}
zha_gateway = get_zha_gateway(hass)
zha_dev = zha_gateway.get_device(zigpy_device.ieee)
event_channels = { # pylint: disable=protected-access
ch.id for ch in zha_dev._relay_channels.values()

View File

@ -1,5 +1,5 @@
"""Test zha fan."""
from unittest.mock import call, patch
from unittest.mock import call
import pytest
import zigpy.zcl.clusters.hvac as hvac
@ -18,13 +18,12 @@ from homeassistant.const import (
from .common import (
async_enable_traffic,
async_test_rejoin,
find_entity_id,
make_attribute,
make_zcl_header,
)
from tests.common import mock_coro
@pytest.fixture
def zigpy_device(zigpy_device_mock):
@ -35,7 +34,7 @@ def zigpy_device(zigpy_device_mock):
return zigpy_device_mock(endpoints)
async def test_fan(hass, zha_gateway, zha_device_joined_restored, zigpy_device):
async def test_fan(hass, zha_device_joined_restored, zigpy_device):
"""Test zha fan platform."""
zha_device = await zha_device_joined_restored(zigpy_device)
@ -47,7 +46,7 @@ async def test_fan(hass, zha_gateway, zha_device_joined_restored, zigpy_device):
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, zha_gateway, [zha_device])
await async_enable_traffic(hass, [zha_device])
# test that the state has changed from unavailable to off
assert hass.states.get(entity_id).state == STATE_OFF
@ -66,44 +65,25 @@ async def test_fan(hass, zha_gateway, zha_device_joined_restored, zigpy_device):
assert hass.states.get(entity_id).state == STATE_OFF
# turn on from HA
with patch(
"zigpy.zcl.Cluster.write_attributes",
return_value=mock_coro([zcl_f.Status.SUCCESS, zcl_f.Status.SUCCESS]),
):
# turn on via UI
await async_turn_on(hass, entity_id)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"fan_mode": 2})
cluster.write_attributes.reset_mock()
await async_turn_on(hass, entity_id)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"fan_mode": 2})
# turn off from HA
with patch(
"zigpy.zcl.Cluster.write_attributes",
return_value=mock_coro([zcl_f.Status.SUCCESS, zcl_f.Status.SUCCESS]),
):
# turn off via UI
await async_turn_off(hass, entity_id)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"fan_mode": 0})
cluster.write_attributes.reset_mock()
await async_turn_off(hass, entity_id)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"fan_mode": 0})
# change speed from HA
with patch(
"zigpy.zcl.Cluster.write_attributes",
return_value=mock_coro([zcl_f.Status.SUCCESS, zcl_f.Status.SUCCESS]),
):
# turn on via UI
await async_set_speed(hass, entity_id, speed=fan.SPEED_HIGH)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"fan_mode": 3})
cluster.write_attributes.reset_mock()
await async_set_speed(hass, entity_id, speed=fan.SPEED_HIGH)
assert len(cluster.write_attributes.mock_calls) == 1
assert cluster.write_attributes.call_args == call({"fan_mode": 3})
# test adding new fan to the network and HA
cluster.bind.reset_mock()
cluster.configure_reporting.reset_mock()
await zha_gateway.async_device_initialized(zigpy_device)
await hass.async_block_till_done()
assert cluster.bind.call_count == 1
assert cluster.bind.await_count == 1
assert cluster.configure_reporting.call_count == 1
assert cluster.configure_reporting.await_count == 1
await async_test_rejoin(hass, zigpy_device, [cluster], (1,))
async def async_turn_on(hass, entity_id, speed=None):

View File

@ -2,7 +2,7 @@
import pytest
import zigpy.zcl.clusters.general as general
from .common import async_enable_traffic
from .common import async_enable_traffic, get_zha_gateway
@pytest.fixture
@ -27,13 +27,13 @@ async def zha_dev_basic(hass, zha_device_restored, zigpy_dev_basic):
return zha_device
async def test_device_left(hass, zha_gateway, zigpy_dev_basic, zha_dev_basic):
async def test_device_left(hass, zigpy_dev_basic, zha_dev_basic):
"""Device leaving the network should become unavailable."""
assert zha_dev_basic.available is False
await async_enable_traffic(hass, zha_gateway, [zha_dev_basic])
await async_enable_traffic(hass, [zha_dev_basic])
assert zha_dev_basic.available is True
zha_gateway.device_left(zigpy_dev_basic)
get_zha_gateway(hass).device_left(zigpy_dev_basic)
assert zha_dev_basic.available is False

View File

@ -14,6 +14,7 @@ from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE
from .common import (
async_enable_traffic,
async_test_rejoin,
find_entity_id,
make_attribute,
make_zcl_header,
@ -73,7 +74,7 @@ LIGHT_COLOR = {
[(LIGHT_ON_OFF, (1, 0, 0)), (LIGHT_LEVEL, (1, 1, 0)), (LIGHT_COLOR, (1, 1, 3))],
)
async def test_light(
hass, zha_gateway, zigpy_device_mock, zha_device_joined_restored, device, reporting,
hass, zigpy_device_mock, zha_device_joined_restored, device, reporting,
):
"""Test zha light platform."""
@ -92,7 +93,7 @@ async def test_light(
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, zha_gateway, [zha_device])
await async_enable_traffic(hass, [zha_device])
# test that the lights were created and are off
assert hass.states.get(entity_id).state == STATE_OFF
@ -121,17 +122,7 @@ async def test_light(
clusters.append(cluster_level)
if cluster_color:
clusters.append(cluster_color)
for cluster in clusters:
cluster.bind.reset_mock()
cluster.configure_reporting.reset_mock()
await zha_gateway.async_device_initialized(zigpy_device)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_OFF
for cluster, reporting_count in zip(clusters, reporting):
assert cluster.bind.call_count == 1
assert cluster.bind.await_count == 1
assert cluster.configure_reporting.call_count == reporting_count
assert cluster.configure_reporting.await_count == reporting_count
await async_test_rejoin(hass, zigpy_device, clusters, reporting)
async def async_test_on_off_from_light(hass, cluster, entity_id):

View File

@ -23,8 +23,8 @@ LOCK_DOOR = 0
UNLOCK_DOOR = 1
@pytest.fixture(params=["zha_device_joined", "zha_device_restored"])
async def lock(hass, zha_gateway, zigpy_device_mock, request):
@pytest.fixture
async def lock(hass, zigpy_device_mock, zha_device_joined_restored):
"""Lock cluster fixture."""
zigpy_device = zigpy_device_mock(
@ -37,12 +37,11 @@ async def lock(hass, zha_gateway, zigpy_device_mock, request):
},
)
join_or_restore = request.getfixturevalue(request.param)
zha_device = await join_or_restore(zigpy_device)
zha_device = await zha_device_joined_restored(zigpy_device)
return zha_device, zigpy_device.endpoints[1].door_lock
async def test_lock(hass, zha_gateway, lock):
async def test_lock(hass, lock):
"""Test zha lock platform."""
zha_device, cluster = lock
@ -53,7 +52,7 @@ async def test_lock(hass, zha_gateway, lock):
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, zha_gateway, [zha_device])
await async_enable_traffic(hass, [zha_device])
# test that the state has changed from unavailable to unlocked
assert hass.states.get(entity_id).state == STATE_UNLOCKED

View File

@ -25,6 +25,7 @@ from homeassistant.util import dt as dt_util
from .common import (
async_enable_traffic,
async_test_rejoin,
find_entity_id,
make_attribute,
make_zcl_header,
@ -102,7 +103,6 @@ async def async_test_electrical_measurement(hass, cluster, entity_id):
)
async def test_sensor(
hass,
zha_gateway,
zigpy_device_mock,
zha_device_joined_restored,
cluster_id,
@ -128,7 +128,7 @@ async def test_sensor(
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and devices
await async_enable_traffic(hass, zha_gateway, [zha_device])
await async_enable_traffic(hass, [zha_device])
# test that the sensor now have a state of unknown
assert hass.states.get(entity_id).state == STATE_UNKNOWN
@ -137,15 +137,7 @@ async def test_sensor(
await test_func(hass, cluster, entity_id)
# test rejoin
cluster.bind.reset_mock()
cluster.configure_reporting.reset_mock()
await zha_gateway.async_device_initialized(zigpy_device)
await hass.async_block_till_done()
await test_func(hass, cluster, entity_id)
assert cluster.bind.call_count == 1
assert cluster.bind.await_count == 1
assert cluster.configure_reporting.call_count == report_count
assert cluster.configure_reporting.await_count == report_count
await async_test_rejoin(hass, zigpy_device, [cluster], (report_count,))
async def send_attribute_report(hass, cluster, attrid, value):
@ -232,7 +224,6 @@ async def test_temp_uom(
expected,
restore,
hass_ms,
zha_gateway,
core_rs,
zigpy_device_mock,
zha_device_restored,
@ -267,7 +258,7 @@ async def test_temp_uom(
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and devices
await async_enable_traffic(hass, zha_gateway, [zha_device])
await async_enable_traffic(hass, [zha_device])
# test that the sensors now have a state of unknown
if not restore:

View File

@ -10,6 +10,7 @@ from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE
from .common import (
async_enable_traffic,
async_test_rejoin,
find_entity_id,
make_attribute,
make_zcl_header,
@ -34,7 +35,7 @@ def zigpy_device(zigpy_device_mock):
return zigpy_device_mock(endpoints)
async def test_switch(hass, zha_gateway, zha_device_joined_restored, zigpy_device):
async def test_switch(hass, zha_device_joined_restored, zigpy_device):
"""Test zha switch platform."""
zha_device = await zha_device_joined_restored(zigpy_device)
@ -46,7 +47,7 @@ async def test_switch(hass, zha_gateway, zha_device_joined_restored, zigpy_devic
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, zha_gateway, [zha_device])
await async_enable_traffic(hass, [zha_device])
# test that the state has changed from unavailable to off
assert hass.states.get(entity_id).state == STATE_OFF
@ -93,11 +94,4 @@ async def test_switch(hass, zha_gateway, zha_device_joined_restored, zigpy_devic
)
# test joining a new switch to the network and HA
cluster.bind.reset_mock()
cluster.configure_reporting.reset_mock()
await zha_gateway.async_device_initialized(zigpy_device)
await hass.async_block_till_done()
assert cluster.bind.call_count == 1
assert cluster.bind.await_count == 1
assert cluster.configure_reporting.call_count == 1
assert cluster.configure_reporting.await_count == 1
await async_test_rejoin(hass, zigpy_device, [cluster], (1,))