"""Tests for init module.""" from asyncio import Event from collections.abc import Callable from copy import deepcopy from datetime import datetime, time as dt_time, timedelta import http import time from unittest.mock import AsyncMock, patch from aioautomower.exceptions import ( ApiError, AuthError, HusqvarnaTimeoutError, HusqvarnaWSServerHandshakeError, ) from aioautomower.model import Calendar, MowerAttributes, WorkArea from freezegun.api import FrozenDateTimeFactory import pytest from syrupy.assertion import SnapshotAssertion from homeassistant.components.husqvarna_automower.const import DOMAIN, OAUTH2_TOKEN from homeassistant.components.husqvarna_automower.coordinator import SCAN_INTERVAL from homeassistant.config_entries import ConfigEntryState from homeassistant.core import HomeAssistant, callback from homeassistant.helpers import device_registry as dr, entity_registry as er from homeassistant.util import dt as dt_util from . import setup_integration from .const import TEST_MOWER_ID from tests.common import MockConfigEntry, async_fire_time_changed from tests.test_util.aiohttp import AiohttpClientMocker ADDITIONAL_NUMBER_ENTITIES = 1 ADDITIONAL_SENSOR_ENTITIES = 2 ADDITIONAL_SWITCH_ENTITIES = 1 NUMBER_OF_ENTITIES_MOWER_2 = 11 async def test_load_unload_entry( hass: HomeAssistant, mock_automower_client: AsyncMock, mock_config_entry: MockConfigEntry, ) -> None: """Test load and unload entry.""" await setup_integration(hass, mock_config_entry) entry = hass.config_entries.async_entries(DOMAIN)[0] assert entry.state is ConfigEntryState.LOADED await hass.config_entries.async_remove(entry.entry_id) await hass.async_block_till_done() assert entry.state is ConfigEntryState.NOT_LOADED @pytest.mark.parametrize( ("scope"), [ ("iam:read"), ], ) async def test_load_missing_scope( hass: HomeAssistant, mock_automower_client: AsyncMock, mock_config_entry: MockConfigEntry, ) -> None: """Test if the entry starts a reauth with the missing token scope.""" await setup_integration(hass, mock_config_entry) assert mock_config_entry.state is ConfigEntryState.SETUP_ERROR flows = hass.config_entries.flow.async_progress() assert len(flows) == 1 result = flows[0] assert result["step_id"] == "missing_scope" @pytest.mark.parametrize( ("expires_at", "status", "expected_state"), [ ( time.time() - 3600, http.HTTPStatus.UNAUTHORIZED, ConfigEntryState.SETUP_ERROR, ), ( time.time() - 3600, http.HTTPStatus.INTERNAL_SERVER_ERROR, ConfigEntryState.SETUP_RETRY, ), ], ids=["unauthorized", "internal_server_error"], ) async def test_expired_token_refresh_failure( hass: HomeAssistant, mock_config_entry: MockConfigEntry, aioclient_mock: AiohttpClientMocker, status: http.HTTPStatus, expected_state: ConfigEntryState, ) -> None: """Test failure while refreshing token with a transient error.""" aioclient_mock.clear_requests() aioclient_mock.post( OAUTH2_TOKEN, status=status, ) await setup_integration(hass, mock_config_entry) assert mock_config_entry.state is expected_state @pytest.mark.parametrize( ("exception", "entry_state"), [ (ApiError, ConfigEntryState.SETUP_RETRY), (AuthError, ConfigEntryState.SETUP_ERROR), ], ) async def test_update_failed( hass: HomeAssistant, mock_automower_client: AsyncMock, mock_config_entry: MockConfigEntry, exception: Exception, entry_state: ConfigEntryState, ) -> None: """Test update failed.""" mock_automower_client.get_status.side_effect = exception("Test error") await setup_integration(hass, mock_config_entry) entry = hass.config_entries.async_entries(DOMAIN)[0] assert entry.state is entry_state @patch( "homeassistant.components.husqvarna_automower.coordinator.DEFAULT_RECONNECT_TIME", 0 ) @pytest.mark.parametrize( ("method_path", "exception", "error_msg"), [ ( ["auth", "websocket_connect"], HusqvarnaWSServerHandshakeError, "Failed to connect to websocket.", ), ( ["start_listening"], HusqvarnaTimeoutError, "Failed to listen to websocket.", ), ], ) async def test_websocket_not_available( hass: HomeAssistant, mock_automower_client: AsyncMock, mock_config_entry: MockConfigEntry, caplog: pytest.LogCaptureFixture, freezer: FrozenDateTimeFactory, method_path: list[str], exception: type[Exception], error_msg: str, ) -> None: """Test trying to reload the websocket.""" calls = [] mock_called = Event() mock_stall = Event() async def mock_function(): mock_called.set() await mock_stall.wait() # Raise the first time the method is awaited if not calls: calls.append(None) raise exception("Boom") if mock_side_effect: await mock_side_effect() # Find the method to mock mock = mock_automower_client for itm in method_path: mock = getattr(mock, itm) mock_side_effect = mock.side_effect mock.side_effect = mock_function # Setup integration and verify log error message await setup_integration(hass, mock_config_entry) await mock_called.wait() mock_called.clear() # Allow the exception to be raised mock_stall.set() assert mock.call_count == 1 await hass.async_block_till_done() assert f"{error_msg} Trying to reconnect: Boom" in caplog.text # Simulate a successful connection caplog.clear() await mock_called.wait() mock_called.clear() await hass.async_block_till_done() assert mock.call_count == 2 assert "Trying to reconnect: Boom" not in caplog.text # Simulate hass shutting down await hass.async_stop() assert mock.call_count == 2 async def test_device_info( hass: HomeAssistant, mock_automower_client: AsyncMock, mock_config_entry: MockConfigEntry, device_registry: dr.DeviceRegistry, snapshot: SnapshotAssertion, ) -> None: """Test select platform.""" mock_config_entry.add_to_hass(hass) await hass.config_entries.async_setup(mock_config_entry.entry_id) await hass.async_block_till_done() reg_device = device_registry.async_get_device( identifiers={(DOMAIN, TEST_MOWER_ID)}, ) assert reg_device == snapshot async def test_constant_polling( hass: HomeAssistant, mock_automower_client: AsyncMock, mock_config_entry: MockConfigEntry, values: dict[str, MowerAttributes], freezer: FrozenDateTimeFactory, ) -> None: """Verify that receiving a WebSocket update does not interrupt the regular polling cycle. The test simulates a WebSocket update that changes an entity's state, then advances time to trigger a scheduled poll to confirm polled data also arrives. """ test_values = deepcopy(values) callback_holder: dict[str, Callable] = {} @callback def fake_register_websocket_response( cb: Callable[[dict[str, MowerAttributes]], None], ) -> None: callback_holder["cb"] = cb mock_automower_client.register_data_callback.side_effect = ( fake_register_websocket_response ) await setup_integration(hass, mock_config_entry) await hass.async_block_till_done() assert mock_automower_client.register_data_callback.called assert "cb" in callback_holder state = hass.states.get("sensor.test_mower_1_battery") assert state is not None assert state.state == "100" state = hass.states.get("sensor.test_mower_1_front_lawn_progress") assert state is not None assert state.state == "40" test_values[TEST_MOWER_ID].battery.battery_percent = 77 freezer.tick(SCAN_INTERVAL - timedelta(seconds=10)) async_fire_time_changed(hass) await hass.async_block_till_done() callback_holder["cb"](test_values) await hass.async_block_till_done() state = hass.states.get("sensor.test_mower_1_battery") assert state is not None assert state.state == "77" state = hass.states.get("sensor.test_mower_1_front_lawn_progress") assert state is not None assert state.state == "40" test_values[TEST_MOWER_ID].work_areas[123456].progress = 50 mock_automower_client.get_status.return_value = test_values freezer.tick(timedelta(seconds=10)) async_fire_time_changed(hass) await hass.async_block_till_done() mock_automower_client.get_status.assert_awaited() state = hass.states.get("sensor.test_mower_1_battery") assert state is not None assert state.state == "77" state = hass.states.get("sensor.test_mower_1_front_lawn_progress") assert state is not None assert state.state == "50" async def test_coordinator_automatic_registry_cleanup( hass: HomeAssistant, mock_automower_client: AsyncMock, mock_config_entry: MockConfigEntry, device_registry: dr.DeviceRegistry, entity_registry: er.EntityRegistry, values: dict[str, MowerAttributes], freezer: FrozenDateTimeFactory, ) -> None: """Test automatic registry cleanup.""" await setup_integration(hass, mock_config_entry) entry = hass.config_entries.async_entries(DOMAIN)[0] await hass.async_block_till_done() # Count current entitties and devices current_entites = len( er.async_entries_for_config_entry(entity_registry, entry.entry_id) ) current_devices = len( dr.async_entries_for_config_entry(device_registry, entry.entry_id) ) # Remove mower 2 and check if it worked values_copy = deepcopy(values) mower2 = values_copy.pop("1234") mock_automower_client.get_status.return_value = values_copy freezer.tick(SCAN_INTERVAL) async_fire_time_changed(hass) await hass.async_block_till_done() assert ( len(er.async_entries_for_config_entry(entity_registry, entry.entry_id)) == current_entites - NUMBER_OF_ENTITIES_MOWER_2 ) assert ( len(dr.async_entries_for_config_entry(device_registry, entry.entry_id)) == current_devices - 1 ) # Add mower 2 and check if it worked values_copy = deepcopy(values) values_copy["1234"] = mower2 mock_automower_client.get_status.return_value = values_copy freezer.tick(SCAN_INTERVAL) async_fire_time_changed(hass) await hass.async_block_till_done() assert ( len(er.async_entries_for_config_entry(entity_registry, entry.entry_id)) == current_entites ) assert ( len(dr.async_entries_for_config_entry(device_registry, entry.entry_id)) == current_devices ) # Remove mower 1 and check if it worked values_copy = deepcopy(values) mower1 = values_copy.pop(TEST_MOWER_ID) mock_automower_client.get_status.return_value = values_copy freezer.tick(SCAN_INTERVAL) async_fire_time_changed(hass) await hass.async_block_till_done() assert ( len(er.async_entries_for_config_entry(entity_registry, entry.entry_id)) == NUMBER_OF_ENTITIES_MOWER_2 ) assert ( len(dr.async_entries_for_config_entry(device_registry, entry.entry_id)) == current_devices - 1 ) # Add mower 1 and check if it worked values_copy = deepcopy(values) values_copy[TEST_MOWER_ID] = mower1 mock_automower_client.get_status.return_value = values_copy freezer.tick(SCAN_INTERVAL) async_fire_time_changed(hass) await hass.async_block_till_done() assert ( len(dr.async_entries_for_config_entry(device_registry, entry.entry_id)) == current_devices ) assert ( len(er.async_entries_for_config_entry(entity_registry, entry.entry_id)) == current_entites ) async def test_add_and_remove_work_area( hass: HomeAssistant, mock_automower_client: AsyncMock, mock_config_entry: MockConfigEntry, freezer: FrozenDateTimeFactory, entity_registry: er.EntityRegistry, values: dict[str, MowerAttributes], ) -> None: """Test adding a work area in runtime.""" websocket_values = deepcopy(values) callback_holder: dict[str, Callable] = {} @callback def fake_register_websocket_response( cb: Callable[[dict[str, MowerAttributes]], None], ) -> None: callback_holder["cb"] = cb mock_automower_client.register_data_callback.side_effect = ( fake_register_websocket_response ) await setup_integration(hass, mock_config_entry) entry = hass.config_entries.async_entries(DOMAIN)[0] current_entites_start = len( er.async_entries_for_config_entry(entity_registry, entry.entry_id) ) await hass.async_block_till_done() assert mock_automower_client.register_data_callback.called assert "cb" in callback_holder new_task = Calendar( start=dt_time(hour=11), duration=timedelta(60), monday=True, tuesday=True, wednesday=True, thursday=True, friday=True, saturday=True, sunday=True, work_area_id=1, ) websocket_values[TEST_MOWER_ID].calendar.tasks.append(new_task) poll_values = deepcopy(websocket_values) poll_values[TEST_MOWER_ID].work_area_names.append("new work area") poll_values[TEST_MOWER_ID].work_area_dict.update({1: "new work area"}) poll_values[TEST_MOWER_ID].work_areas.update( { 1: WorkArea( name="new work area", cutting_height=12, enabled=True, progress=12, last_time_completed=datetime( 2024, 10, 1, 11, 11, 0, tzinfo=dt_util.get_default_time_zone() ), ) } ) mock_automower_client.get_status.return_value = poll_values callback_holder["cb"](websocket_values) await hass.async_block_till_done() assert mock_automower_client.get_status.called state = hass.states.get("sensor.test_mower_1_new_work_area_progress") assert state is not None assert state.state == "12" current_entites_after_addition = len( er.async_entries_for_config_entry(entity_registry, entry.entry_id) ) assert ( current_entites_after_addition == current_entites_start + ADDITIONAL_NUMBER_ENTITIES + ADDITIONAL_SENSOR_ENTITIES + ADDITIONAL_SWITCH_ENTITIES ) poll_values[TEST_MOWER_ID].work_area_names.remove("new work area") del poll_values[TEST_MOWER_ID].work_area_dict[1] del poll_values[TEST_MOWER_ID].work_areas[1] poll_values[TEST_MOWER_ID].work_area_names.remove("Front lawn") del poll_values[TEST_MOWER_ID].work_area_dict[123456] del poll_values[TEST_MOWER_ID].work_areas[123456] poll_values[TEST_MOWER_ID].calendar.tasks = [ task for task in poll_values[TEST_MOWER_ID].calendar.tasks if task.work_area_id not in [1, 123456] ] poll_values[TEST_MOWER_ID].mower.work_area_id = 654321 mock_automower_client.get_status.return_value = poll_values freezer.tick(SCAN_INTERVAL) async_fire_time_changed(hass) await hass.async_block_till_done() current_entites_after_deletion = len( er.async_entries_for_config_entry(entity_registry, entry.entry_id) ) assert ( current_entites_after_deletion == current_entites_start - ADDITIONAL_SWITCH_ENTITIES - ADDITIONAL_NUMBER_ENTITIES - ADDITIONAL_SENSOR_ENTITIES )