487 lines
15 KiB
Python
487 lines
15 KiB
Python
"""Tests for init module."""
|
|
|
|
from asyncio import Event
|
|
from collections.abc import Callable
|
|
from copy import deepcopy
|
|
from datetime import datetime, time as dt_time, timedelta
|
|
import http
|
|
import time
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
from aioautomower.exceptions import (
|
|
ApiError,
|
|
AuthError,
|
|
HusqvarnaTimeoutError,
|
|
HusqvarnaWSServerHandshakeError,
|
|
)
|
|
from aioautomower.model import Calendar, MowerAttributes, WorkArea
|
|
from freezegun.api import FrozenDateTimeFactory
|
|
import pytest
|
|
from syrupy.assertion import SnapshotAssertion
|
|
|
|
from homeassistant.components.husqvarna_automower.const import DOMAIN, OAUTH2_TOKEN
|
|
from homeassistant.components.husqvarna_automower.coordinator import SCAN_INTERVAL
|
|
from homeassistant.config_entries import ConfigEntryState
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.helpers import device_registry as dr, entity_registry as er
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from . import setup_integration
|
|
from .const import TEST_MOWER_ID
|
|
|
|
from tests.common import MockConfigEntry, async_fire_time_changed
|
|
from tests.test_util.aiohttp import AiohttpClientMocker
|
|
|
|
ADDITIONAL_NUMBER_ENTITIES = 1
|
|
ADDITIONAL_SENSOR_ENTITIES = 2
|
|
ADDITIONAL_SWITCH_ENTITIES = 1
|
|
NUMBER_OF_ENTITIES_MOWER_2 = 11
|
|
|
|
|
|
async def test_load_unload_entry(
|
|
hass: HomeAssistant,
|
|
mock_automower_client: AsyncMock,
|
|
mock_config_entry: MockConfigEntry,
|
|
) -> None:
|
|
"""Test load and unload entry."""
|
|
await setup_integration(hass, mock_config_entry)
|
|
entry = hass.config_entries.async_entries(DOMAIN)[0]
|
|
|
|
assert entry.state is ConfigEntryState.LOADED
|
|
|
|
await hass.config_entries.async_remove(entry.entry_id)
|
|
await hass.async_block_till_done()
|
|
|
|
assert entry.state is ConfigEntryState.NOT_LOADED
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("scope"),
|
|
[
|
|
("iam:read"),
|
|
],
|
|
)
|
|
async def test_load_missing_scope(
|
|
hass: HomeAssistant,
|
|
mock_automower_client: AsyncMock,
|
|
mock_config_entry: MockConfigEntry,
|
|
) -> None:
|
|
"""Test if the entry starts a reauth with the missing token scope."""
|
|
await setup_integration(hass, mock_config_entry)
|
|
assert mock_config_entry.state is ConfigEntryState.SETUP_ERROR
|
|
flows = hass.config_entries.flow.async_progress()
|
|
assert len(flows) == 1
|
|
result = flows[0]
|
|
assert result["step_id"] == "missing_scope"
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("expires_at", "status", "expected_state"),
|
|
[
|
|
(
|
|
time.time() - 3600,
|
|
http.HTTPStatus.UNAUTHORIZED,
|
|
ConfigEntryState.SETUP_ERROR,
|
|
),
|
|
(
|
|
time.time() - 3600,
|
|
http.HTTPStatus.INTERNAL_SERVER_ERROR,
|
|
ConfigEntryState.SETUP_RETRY,
|
|
),
|
|
],
|
|
ids=["unauthorized", "internal_server_error"],
|
|
)
|
|
async def test_expired_token_refresh_failure(
|
|
hass: HomeAssistant,
|
|
mock_config_entry: MockConfigEntry,
|
|
aioclient_mock: AiohttpClientMocker,
|
|
status: http.HTTPStatus,
|
|
expected_state: ConfigEntryState,
|
|
) -> None:
|
|
"""Test failure while refreshing token with a transient error."""
|
|
|
|
aioclient_mock.clear_requests()
|
|
aioclient_mock.post(
|
|
OAUTH2_TOKEN,
|
|
status=status,
|
|
)
|
|
|
|
await setup_integration(hass, mock_config_entry)
|
|
|
|
assert mock_config_entry.state is expected_state
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("exception", "entry_state"),
|
|
[
|
|
(ApiError, ConfigEntryState.SETUP_RETRY),
|
|
(AuthError, ConfigEntryState.SETUP_ERROR),
|
|
],
|
|
)
|
|
async def test_update_failed(
|
|
hass: HomeAssistant,
|
|
mock_automower_client: AsyncMock,
|
|
mock_config_entry: MockConfigEntry,
|
|
exception: Exception,
|
|
entry_state: ConfigEntryState,
|
|
) -> None:
|
|
"""Test update failed."""
|
|
mock_automower_client.get_status.side_effect = exception("Test error")
|
|
await setup_integration(hass, mock_config_entry)
|
|
entry = hass.config_entries.async_entries(DOMAIN)[0]
|
|
assert entry.state is entry_state
|
|
|
|
|
|
@patch(
|
|
"homeassistant.components.husqvarna_automower.coordinator.DEFAULT_RECONNECT_TIME", 0
|
|
)
|
|
@pytest.mark.parametrize(
|
|
("method_path", "exception", "error_msg"),
|
|
[
|
|
(
|
|
["auth", "websocket_connect"],
|
|
HusqvarnaWSServerHandshakeError,
|
|
"Failed to connect to websocket.",
|
|
),
|
|
(
|
|
["start_listening"],
|
|
HusqvarnaTimeoutError,
|
|
"Failed to listen to websocket.",
|
|
),
|
|
],
|
|
)
|
|
async def test_websocket_not_available(
|
|
hass: HomeAssistant,
|
|
mock_automower_client: AsyncMock,
|
|
mock_config_entry: MockConfigEntry,
|
|
caplog: pytest.LogCaptureFixture,
|
|
freezer: FrozenDateTimeFactory,
|
|
method_path: list[str],
|
|
exception: type[Exception],
|
|
error_msg: str,
|
|
) -> None:
|
|
"""Test trying to reload the websocket."""
|
|
calls = []
|
|
mock_called = Event()
|
|
mock_stall = Event()
|
|
|
|
async def mock_function():
|
|
mock_called.set()
|
|
await mock_stall.wait()
|
|
# Raise the first time the method is awaited
|
|
if not calls:
|
|
calls.append(None)
|
|
raise exception("Boom")
|
|
if mock_side_effect:
|
|
await mock_side_effect()
|
|
|
|
# Find the method to mock
|
|
mock = mock_automower_client
|
|
for itm in method_path:
|
|
mock = getattr(mock, itm)
|
|
mock_side_effect = mock.side_effect
|
|
mock.side_effect = mock_function
|
|
|
|
# Setup integration and verify log error message
|
|
await setup_integration(hass, mock_config_entry)
|
|
await mock_called.wait()
|
|
mock_called.clear()
|
|
# Allow the exception to be raised
|
|
mock_stall.set()
|
|
assert mock.call_count == 1
|
|
await hass.async_block_till_done()
|
|
assert f"{error_msg} Trying to reconnect: Boom" in caplog.text
|
|
|
|
# Simulate a successful connection
|
|
caplog.clear()
|
|
await mock_called.wait()
|
|
mock_called.clear()
|
|
await hass.async_block_till_done()
|
|
assert mock.call_count == 2
|
|
assert "Trying to reconnect: Boom" not in caplog.text
|
|
|
|
# Simulate hass shutting down
|
|
await hass.async_stop()
|
|
assert mock.call_count == 2
|
|
|
|
|
|
async def test_device_info(
|
|
hass: HomeAssistant,
|
|
mock_automower_client: AsyncMock,
|
|
mock_config_entry: MockConfigEntry,
|
|
device_registry: dr.DeviceRegistry,
|
|
snapshot: SnapshotAssertion,
|
|
) -> None:
|
|
"""Test select platform."""
|
|
|
|
mock_config_entry.add_to_hass(hass)
|
|
await hass.config_entries.async_setup(mock_config_entry.entry_id)
|
|
await hass.async_block_till_done()
|
|
reg_device = device_registry.async_get_device(
|
|
identifiers={(DOMAIN, TEST_MOWER_ID)},
|
|
)
|
|
assert reg_device == snapshot
|
|
|
|
|
|
async def test_constant_polling(
|
|
hass: HomeAssistant,
|
|
mock_automower_client: AsyncMock,
|
|
mock_config_entry: MockConfigEntry,
|
|
values: dict[str, MowerAttributes],
|
|
freezer: FrozenDateTimeFactory,
|
|
) -> None:
|
|
"""Verify that receiving a WebSocket update does not interrupt the regular polling cycle.
|
|
|
|
The test simulates a WebSocket update that changes an entity's state, then advances time
|
|
to trigger a scheduled poll to confirm polled data also arrives.
|
|
"""
|
|
test_values = deepcopy(values)
|
|
callback_holder: dict[str, Callable] = {}
|
|
|
|
@callback
|
|
def fake_register_websocket_response(
|
|
cb: Callable[[dict[str, MowerAttributes]], None],
|
|
) -> None:
|
|
callback_holder["cb"] = cb
|
|
|
|
mock_automower_client.register_data_callback.side_effect = (
|
|
fake_register_websocket_response
|
|
)
|
|
await setup_integration(hass, mock_config_entry)
|
|
await hass.async_block_till_done()
|
|
|
|
assert mock_automower_client.register_data_callback.called
|
|
assert "cb" in callback_holder
|
|
|
|
state = hass.states.get("sensor.test_mower_1_battery")
|
|
assert state is not None
|
|
assert state.state == "100"
|
|
state = hass.states.get("sensor.test_mower_1_front_lawn_progress")
|
|
assert state is not None
|
|
assert state.state == "40"
|
|
|
|
test_values[TEST_MOWER_ID].battery.battery_percent = 77
|
|
|
|
freezer.tick(SCAN_INTERVAL - timedelta(seconds=10))
|
|
async_fire_time_changed(hass)
|
|
await hass.async_block_till_done()
|
|
|
|
callback_holder["cb"](test_values)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get("sensor.test_mower_1_battery")
|
|
assert state is not None
|
|
assert state.state == "77"
|
|
state = hass.states.get("sensor.test_mower_1_front_lawn_progress")
|
|
assert state is not None
|
|
assert state.state == "40"
|
|
|
|
test_values[TEST_MOWER_ID].work_areas[123456].progress = 50
|
|
mock_automower_client.get_status.return_value = test_values
|
|
freezer.tick(timedelta(seconds=10))
|
|
async_fire_time_changed(hass)
|
|
await hass.async_block_till_done()
|
|
mock_automower_client.get_status.assert_awaited()
|
|
state = hass.states.get("sensor.test_mower_1_battery")
|
|
assert state is not None
|
|
assert state.state == "77"
|
|
state = hass.states.get("sensor.test_mower_1_front_lawn_progress")
|
|
assert state is not None
|
|
assert state.state == "50"
|
|
|
|
|
|
async def test_coordinator_automatic_registry_cleanup(
|
|
hass: HomeAssistant,
|
|
mock_automower_client: AsyncMock,
|
|
mock_config_entry: MockConfigEntry,
|
|
device_registry: dr.DeviceRegistry,
|
|
entity_registry: er.EntityRegistry,
|
|
values: dict[str, MowerAttributes],
|
|
freezer: FrozenDateTimeFactory,
|
|
) -> None:
|
|
"""Test automatic registry cleanup."""
|
|
await setup_integration(hass, mock_config_entry)
|
|
entry = hass.config_entries.async_entries(DOMAIN)[0]
|
|
await hass.async_block_till_done()
|
|
|
|
# Count current entitties and devices
|
|
current_entites = len(
|
|
er.async_entries_for_config_entry(entity_registry, entry.entry_id)
|
|
)
|
|
current_devices = len(
|
|
dr.async_entries_for_config_entry(device_registry, entry.entry_id)
|
|
)
|
|
# Remove mower 2 and check if it worked
|
|
values_copy = deepcopy(values)
|
|
mower2 = values_copy.pop("1234")
|
|
mock_automower_client.get_status.return_value = values_copy
|
|
freezer.tick(SCAN_INTERVAL)
|
|
async_fire_time_changed(hass)
|
|
await hass.async_block_till_done()
|
|
|
|
assert (
|
|
len(er.async_entries_for_config_entry(entity_registry, entry.entry_id))
|
|
== current_entites - NUMBER_OF_ENTITIES_MOWER_2
|
|
)
|
|
assert (
|
|
len(dr.async_entries_for_config_entry(device_registry, entry.entry_id))
|
|
== current_devices - 1
|
|
)
|
|
# Add mower 2 and check if it worked
|
|
values_copy = deepcopy(values)
|
|
values_copy["1234"] = mower2
|
|
mock_automower_client.get_status.return_value = values_copy
|
|
freezer.tick(SCAN_INTERVAL)
|
|
async_fire_time_changed(hass)
|
|
await hass.async_block_till_done()
|
|
assert (
|
|
len(er.async_entries_for_config_entry(entity_registry, entry.entry_id))
|
|
== current_entites
|
|
)
|
|
assert (
|
|
len(dr.async_entries_for_config_entry(device_registry, entry.entry_id))
|
|
== current_devices
|
|
)
|
|
|
|
# Remove mower 1 and check if it worked
|
|
values_copy = deepcopy(values)
|
|
mower1 = values_copy.pop(TEST_MOWER_ID)
|
|
mock_automower_client.get_status.return_value = values_copy
|
|
freezer.tick(SCAN_INTERVAL)
|
|
async_fire_time_changed(hass)
|
|
await hass.async_block_till_done()
|
|
|
|
assert (
|
|
len(er.async_entries_for_config_entry(entity_registry, entry.entry_id))
|
|
== NUMBER_OF_ENTITIES_MOWER_2
|
|
)
|
|
assert (
|
|
len(dr.async_entries_for_config_entry(device_registry, entry.entry_id))
|
|
== current_devices - 1
|
|
)
|
|
# Add mower 1 and check if it worked
|
|
values_copy = deepcopy(values)
|
|
values_copy[TEST_MOWER_ID] = mower1
|
|
mock_automower_client.get_status.return_value = values_copy
|
|
freezer.tick(SCAN_INTERVAL)
|
|
async_fire_time_changed(hass)
|
|
await hass.async_block_till_done()
|
|
assert (
|
|
len(dr.async_entries_for_config_entry(device_registry, entry.entry_id))
|
|
== current_devices
|
|
)
|
|
assert (
|
|
len(er.async_entries_for_config_entry(entity_registry, entry.entry_id))
|
|
== current_entites
|
|
)
|
|
|
|
|
|
async def test_add_and_remove_work_area(
|
|
hass: HomeAssistant,
|
|
mock_automower_client: AsyncMock,
|
|
mock_config_entry: MockConfigEntry,
|
|
freezer: FrozenDateTimeFactory,
|
|
entity_registry: er.EntityRegistry,
|
|
values: dict[str, MowerAttributes],
|
|
) -> None:
|
|
"""Test adding a work area in runtime."""
|
|
websocket_values = deepcopy(values)
|
|
callback_holder: dict[str, Callable] = {}
|
|
|
|
@callback
|
|
def fake_register_websocket_response(
|
|
cb: Callable[[dict[str, MowerAttributes]], None],
|
|
) -> None:
|
|
callback_holder["cb"] = cb
|
|
|
|
mock_automower_client.register_data_callback.side_effect = (
|
|
fake_register_websocket_response
|
|
)
|
|
await setup_integration(hass, mock_config_entry)
|
|
entry = hass.config_entries.async_entries(DOMAIN)[0]
|
|
current_entites_start = len(
|
|
er.async_entries_for_config_entry(entity_registry, entry.entry_id)
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
assert mock_automower_client.register_data_callback.called
|
|
assert "cb" in callback_holder
|
|
|
|
new_task = Calendar(
|
|
start=dt_time(hour=11),
|
|
duration=timedelta(60),
|
|
monday=True,
|
|
tuesday=True,
|
|
wednesday=True,
|
|
thursday=True,
|
|
friday=True,
|
|
saturday=True,
|
|
sunday=True,
|
|
work_area_id=1,
|
|
)
|
|
websocket_values[TEST_MOWER_ID].calendar.tasks.append(new_task)
|
|
poll_values = deepcopy(websocket_values)
|
|
poll_values[TEST_MOWER_ID].work_area_names.append("new work area")
|
|
poll_values[TEST_MOWER_ID].work_area_dict.update({1: "new work area"})
|
|
poll_values[TEST_MOWER_ID].work_areas.update(
|
|
{
|
|
1: WorkArea(
|
|
name="new work area",
|
|
cutting_height=12,
|
|
enabled=True,
|
|
progress=12,
|
|
last_time_completed=datetime(
|
|
2024, 10, 1, 11, 11, 0, tzinfo=dt_util.get_default_time_zone()
|
|
),
|
|
)
|
|
}
|
|
)
|
|
mock_automower_client.get_status.return_value = poll_values
|
|
|
|
callback_holder["cb"](websocket_values)
|
|
await hass.async_block_till_done()
|
|
assert mock_automower_client.get_status.called
|
|
|
|
state = hass.states.get("sensor.test_mower_1_new_work_area_progress")
|
|
assert state is not None
|
|
assert state.state == "12"
|
|
current_entites_after_addition = len(
|
|
er.async_entries_for_config_entry(entity_registry, entry.entry_id)
|
|
)
|
|
assert (
|
|
current_entites_after_addition
|
|
== current_entites_start
|
|
+ ADDITIONAL_NUMBER_ENTITIES
|
|
+ ADDITIONAL_SENSOR_ENTITIES
|
|
+ ADDITIONAL_SWITCH_ENTITIES
|
|
)
|
|
|
|
poll_values[TEST_MOWER_ID].work_area_names.remove("new work area")
|
|
del poll_values[TEST_MOWER_ID].work_area_dict[1]
|
|
del poll_values[TEST_MOWER_ID].work_areas[1]
|
|
poll_values[TEST_MOWER_ID].work_area_names.remove("Front lawn")
|
|
del poll_values[TEST_MOWER_ID].work_area_dict[123456]
|
|
del poll_values[TEST_MOWER_ID].work_areas[123456]
|
|
|
|
poll_values[TEST_MOWER_ID].calendar.tasks = [
|
|
task
|
|
for task in poll_values[TEST_MOWER_ID].calendar.tasks
|
|
if task.work_area_id not in [1, 123456]
|
|
]
|
|
|
|
poll_values[TEST_MOWER_ID].mower.work_area_id = 654321
|
|
mock_automower_client.get_status.return_value = poll_values
|
|
freezer.tick(SCAN_INTERVAL)
|
|
async_fire_time_changed(hass)
|
|
await hass.async_block_till_done()
|
|
current_entites_after_deletion = len(
|
|
er.async_entries_for_config_entry(entity_registry, entry.entry_id)
|
|
)
|
|
assert (
|
|
current_entites_after_deletion
|
|
== current_entites_start
|
|
- ADDITIONAL_SWITCH_ENTITIES
|
|
- ADDITIONAL_NUMBER_ENTITIES
|
|
- ADDITIONAL_SENSOR_ENTITIES
|
|
)
|