"""The tests for the Met Office sensor component.""" import datetime from datetime import timedelta import json from unittest.mock import patch from freezegun.api import FrozenDateTimeFactory import pytest import requests_mock from requests_mock.adapter import _Matcher from syrupy.assertion import SnapshotAssertion from homeassistant.components.metoffice.const import DEFAULT_SCAN_INTERVAL, DOMAIN from homeassistant.components.weather import ( DOMAIN as WEATHER_DOMAIN, LEGACY_SERVICE_GET_FORECAST, SERVICE_GET_FORECASTS, ) from homeassistant.const import STATE_UNAVAILABLE from homeassistant.core import HomeAssistant from homeassistant.helpers import entity_registry as er from homeassistant.helpers.device_registry import async_get as get_dev_reg from homeassistant.util import utcnow from .const import ( DEVICE_KEY_KINGSLYNN, DEVICE_KEY_WAVERTREE, METOFFICE_CONFIG_KINGSLYNN, METOFFICE_CONFIG_WAVERTREE, WAVERTREE_SENSOR_RESULTS, ) from tests.common import MockConfigEntry, async_fire_time_changed, load_fixture from tests.typing import WebSocketGenerator @pytest.fixture def no_sensor(): """Remove sensors.""" with patch( "homeassistant.components.metoffice.sensor.async_setup_entry", return_value=True ) as mock_setup_entry: yield mock_setup_entry @pytest.fixture async def wavertree_data(requests_mock: requests_mock.Mocker) -> dict[str, _Matcher]: """Mock data for the Wavertree location.""" # all metoffice test data encapsulated in here mock_json = json.loads(load_fixture("metoffice.json", "metoffice")) all_sites = json.dumps(mock_json["all_sites"]) wavertree_hourly = json.dumps(mock_json["wavertree_hourly"]) wavertree_daily = json.dumps(mock_json["wavertree_daily"]) sitelist_mock = requests_mock.get( "/public/data/val/wxfcs/all/json/sitelist/", text=all_sites ) wavertree_hourly_mock = requests_mock.get( "/public/data/val/wxfcs/all/json/354107?res=3hourly", text=wavertree_hourly, ) wavertree_daily_mock = requests_mock.get( "/public/data/val/wxfcs/all/json/354107?res=daily", text=wavertree_daily, ) return { "sitelist_mock": sitelist_mock, "wavertree_hourly_mock": wavertree_hourly_mock, "wavertree_daily_mock": wavertree_daily_mock, } @pytest.mark.freeze_time(datetime.datetime(2020, 4, 25, 12, tzinfo=datetime.UTC)) async def test_site_cannot_connect( hass: HomeAssistant, requests_mock: requests_mock.Mocker ) -> None: """Test we handle cannot connect error.""" requests_mock.get("/public/data/val/wxfcs/all/json/sitelist/", text="") requests_mock.get("/public/data/val/wxfcs/all/json/354107?res=3hourly", text="") requests_mock.get("/public/data/val/wxfcs/all/json/354107?res=daily", text="") entry = MockConfigEntry( domain=DOMAIN, data=METOFFICE_CONFIG_WAVERTREE, ) entry.add_to_hass(hass) await hass.config_entries.async_setup(entry.entry_id) await hass.async_block_till_done() dev_reg = get_dev_reg(hass) assert len(dev_reg.devices) == 0 assert hass.states.get("weather.met_office_wavertree_3hourly") is None assert hass.states.get("weather.met_office_wavertree_daily") is None for sensor_id in WAVERTREE_SENSOR_RESULTS: sensor_name, _ = WAVERTREE_SENSOR_RESULTS[sensor_id] sensor = hass.states.get(f"sensor.wavertree_{sensor_name}") assert sensor is None @pytest.mark.freeze_time(datetime.datetime(2020, 4, 25, 12, tzinfo=datetime.UTC)) async def test_site_cannot_update( hass: HomeAssistant, entity_registry: er.EntityRegistry, requests_mock: requests_mock.Mocker, wavertree_data, ) -> None: """Test we handle cannot connect error.""" entry = MockConfigEntry( domain=DOMAIN, data=METOFFICE_CONFIG_WAVERTREE, ) entry.add_to_hass(hass) await hass.config_entries.async_setup(entry.entry_id) await hass.async_block_till_done() weather = hass.states.get("weather.met_office_wavertree_daily") assert weather requests_mock.get("/public/data/val/wxfcs/all/json/354107?res=3hourly", text="") requests_mock.get("/public/data/val/wxfcs/all/json/354107?res=daily", text="") future_time = utcnow() + timedelta(minutes=20) async_fire_time_changed(hass, future_time) await hass.async_block_till_done(wait_background_tasks=True) weather = hass.states.get("weather.met_office_wavertree_daily") assert weather.state == STATE_UNAVAILABLE @pytest.mark.freeze_time(datetime.datetime(2020, 4, 25, 12, tzinfo=datetime.UTC)) async def test_one_weather_site_running( hass: HomeAssistant, entity_registry: er.EntityRegistry, requests_mock: requests_mock.Mocker, wavertree_data, ) -> None: """Test the Met Office weather platform.""" entry = MockConfigEntry( domain=DOMAIN, data=METOFFICE_CONFIG_WAVERTREE, ) entry.add_to_hass(hass) await hass.config_entries.async_setup(entry.entry_id) await hass.async_block_till_done() dev_reg = get_dev_reg(hass) assert len(dev_reg.devices) == 1 device_wavertree = dev_reg.async_get_device(identifiers=DEVICE_KEY_WAVERTREE) assert device_wavertree.name == "Met Office Wavertree" # Wavertree daily weather platform expected results weather = hass.states.get("weather.met_office_wavertree_daily") assert weather assert weather.state == "sunny" assert weather.attributes.get("temperature") == 19 assert weather.attributes.get("wind_speed") == 14.48 assert weather.attributes.get("wind_bearing") == "SSE" assert weather.attributes.get("humidity") == 50 @pytest.mark.freeze_time(datetime.datetime(2020, 4, 25, 12, tzinfo=datetime.UTC)) async def test_two_weather_sites_running( hass: HomeAssistant, entity_registry: er.EntityRegistry, requests_mock: requests_mock.Mocker, wavertree_data, ) -> None: """Test we handle two different weather sites both running.""" # all metoffice test data encapsulated in here mock_json = json.loads(load_fixture("metoffice.json", "metoffice")) kingslynn_hourly = json.dumps(mock_json["kingslynn_hourly"]) kingslynn_daily = json.dumps(mock_json["kingslynn_daily"]) requests_mock.get( "/public/data/val/wxfcs/all/json/322380?res=3hourly", text=kingslynn_hourly ) requests_mock.get( "/public/data/val/wxfcs/all/json/322380?res=daily", text=kingslynn_daily ) entry = MockConfigEntry( domain=DOMAIN, data=METOFFICE_CONFIG_WAVERTREE, ) entry.add_to_hass(hass) await hass.config_entries.async_setup(entry.entry_id) entry2 = MockConfigEntry( domain=DOMAIN, data=METOFFICE_CONFIG_KINGSLYNN, ) entry2.add_to_hass(hass) await hass.config_entries.async_setup(entry2.entry_id) await hass.async_block_till_done() dev_reg = get_dev_reg(hass) assert len(dev_reg.devices) == 2 device_kingslynn = dev_reg.async_get_device(identifiers=DEVICE_KEY_KINGSLYNN) assert device_kingslynn.name == "Met Office King's Lynn" device_wavertree = dev_reg.async_get_device(identifiers=DEVICE_KEY_WAVERTREE) assert device_wavertree.name == "Met Office Wavertree" # Wavertree daily weather platform expected results weather = hass.states.get("weather.met_office_wavertree_daily") assert weather assert weather.state == "sunny" assert weather.attributes.get("temperature") == 19 assert weather.attributes.get("wind_speed") == 14.48 assert weather.attributes.get("wind_speed_unit") == "km/h" assert weather.attributes.get("wind_bearing") == "SSE" assert weather.attributes.get("humidity") == 50 # King's Lynn daily weather platform expected results weather = hass.states.get("weather.met_office_king_s_lynn_daily") assert weather assert weather.state == "cloudy" assert weather.attributes.get("temperature") == 9 assert weather.attributes.get("wind_speed") == 6.44 assert weather.attributes.get("wind_speed_unit") == "km/h" assert weather.attributes.get("wind_bearing") == "ESE" assert weather.attributes.get("humidity") == 75 @pytest.mark.freeze_time(datetime.datetime(2020, 4, 25, 12, tzinfo=datetime.UTC)) async def test_new_config_entry( hass: HomeAssistant, entity_registry: er.EntityRegistry, no_sensor, wavertree_data ) -> None: """Test the expected entities are created.""" entry = MockConfigEntry( domain=DOMAIN, data=METOFFICE_CONFIG_WAVERTREE, ) entry.add_to_hass(hass) await hass.config_entries.async_setup(entry.entry_id) await hass.async_block_till_done() assert len(hass.states.async_entity_ids(WEATHER_DOMAIN)) == 1 entry = hass.config_entries.async_entries()[0] assert len(er.async_entries_for_config_entry(entity_registry, entry.entry_id)) == 1 @pytest.mark.freeze_time(datetime.datetime(2020, 4, 25, 12, tzinfo=datetime.UTC)) @pytest.mark.parametrize( ("service"), [ SERVICE_GET_FORECASTS, LEGACY_SERVICE_GET_FORECAST, ], ) async def test_forecast_service( hass: HomeAssistant, freezer: FrozenDateTimeFactory, requests_mock: requests_mock.Mocker, snapshot: SnapshotAssertion, no_sensor, wavertree_data: dict[str, _Matcher], service: str, ) -> None: """Test multiple forecast.""" entry = MockConfigEntry( domain=DOMAIN, data=METOFFICE_CONFIG_WAVERTREE, ) entry.add_to_hass(hass) await hass.config_entries.async_setup(entry.entry_id) await hass.async_block_till_done() assert wavertree_data["wavertree_daily_mock"].call_count == 1 assert wavertree_data["wavertree_hourly_mock"].call_count == 1 for forecast_type in ("daily", "hourly"): response = await hass.services.async_call( WEATHER_DOMAIN, service, { "entity_id": "weather.met_office_wavertree_daily", "type": forecast_type, }, blocking=True, return_response=True, ) assert response == snapshot # Calling the services should use cached data assert wavertree_data["wavertree_daily_mock"].call_count == 1 assert wavertree_data["wavertree_hourly_mock"].call_count == 1 # Trigger data refetch freezer.tick(DEFAULT_SCAN_INTERVAL + timedelta(seconds=1)) async_fire_time_changed(hass) await hass.async_block_till_done(wait_background_tasks=True) assert wavertree_data["wavertree_daily_mock"].call_count == 2 assert wavertree_data["wavertree_hourly_mock"].call_count == 1 for forecast_type in ("daily", "hourly"): response = await hass.services.async_call( WEATHER_DOMAIN, service, { "entity_id": "weather.met_office_wavertree_daily", "type": forecast_type, }, blocking=True, return_response=True, ) assert response == snapshot # Calling the services should update the hourly forecast assert wavertree_data["wavertree_daily_mock"].call_count == 2 assert wavertree_data["wavertree_hourly_mock"].call_count == 2 # Update fails requests_mock.get("/public/data/val/wxfcs/all/json/354107?res=3hourly", text="") freezer.tick(DEFAULT_SCAN_INTERVAL + timedelta(seconds=1)) async_fire_time_changed(hass) await hass.async_block_till_done(wait_background_tasks=True) response = await hass.services.async_call( WEATHER_DOMAIN, service, { "entity_id": "weather.met_office_wavertree_daily", "type": "hourly", }, blocking=True, return_response=True, ) assert response == snapshot @pytest.mark.freeze_time(datetime.datetime(2020, 4, 25, 12, tzinfo=datetime.UTC)) async def test_legacy_config_entry_is_removed( hass: HomeAssistant, entity_registry: er.EntityRegistry, no_sensor, wavertree_data ) -> None: """Test the expected entities are created.""" # Pre-create the hourly entity entity_registry.async_get_or_create( WEATHER_DOMAIN, DOMAIN, "53.38374_-2.90929", suggested_object_id="met_office_wavertree_3_hourly", ) entry = MockConfigEntry( domain=DOMAIN, data=METOFFICE_CONFIG_WAVERTREE, ) entry.add_to_hass(hass) await hass.config_entries.async_setup(entry.entry_id) await hass.async_block_till_done() assert len(hass.states.async_entity_ids("weather")) == 1 entry = hass.config_entries.async_entries()[0] assert len(er.async_entries_for_config_entry(entity_registry, entry.entry_id)) == 1 @pytest.mark.freeze_time(datetime.datetime(2020, 4, 25, 12, tzinfo=datetime.UTC)) @pytest.mark.parametrize("forecast_type", ["daily", "hourly"]) async def test_forecast_subscription( hass: HomeAssistant, hass_ws_client: WebSocketGenerator, entity_registry: er.EntityRegistry, freezer: FrozenDateTimeFactory, snapshot: SnapshotAssertion, no_sensor, wavertree_data: dict[str, _Matcher], forecast_type: str, ) -> None: """Test multiple forecast.""" client = await hass_ws_client(hass) entry = MockConfigEntry( domain=DOMAIN, data=METOFFICE_CONFIG_WAVERTREE, ) entry.add_to_hass(hass) await hass.config_entries.async_setup(entry.entry_id) await hass.async_block_till_done() await client.send_json_auto_id( { "type": "weather/subscribe_forecast", "forecast_type": forecast_type, "entity_id": "weather.met_office_wavertree_daily", } ) msg = await client.receive_json() assert msg["success"] assert msg["result"] is None subscription_id = msg["id"] msg = await client.receive_json() assert msg["id"] == subscription_id assert msg["type"] == "event" forecast1 = msg["event"]["forecast"] assert forecast1 != [] assert forecast1 == snapshot freezer.tick(DEFAULT_SCAN_INTERVAL + timedelta(seconds=1)) async_fire_time_changed(hass) await hass.async_block_till_done(wait_background_tasks=True) msg = await client.receive_json() assert msg["id"] == subscription_id assert msg["type"] == "event" forecast2 = msg["event"]["forecast"] assert forecast2 != [] assert forecast2 == snapshot await client.send_json_auto_id( { "type": "unsubscribe_events", "subscription": subscription_id, } ) freezer.tick(timedelta(seconds=1)) async_fire_time_changed(hass) await hass.async_block_till_done(wait_background_tasks=True) msg = await client.receive_json() assert msg["success"]