Switch to config_flow for Environment Canada (#57127)

* Add config_flow to Environment Canada

* Add unique_id

* Remove erroneous directory.

* Tests working!!

* Add back setup.

* First cut of import.

* Temp

* Tweak names.

* Import config.yaml.

* Clean up imports.

* Import working! Some refactor to clean it up.

* Add import test.

* Small optimization.

* Fix comments from code review.

* Remove CONF_NAME and config_flow for it.

* Fixup strings to match new config_flow.

* Fixes for comments from last review.

* Update tests to match new import code.

* Clean up use of CONF_TITLE; fix lint error on push.

* Phew. More cleanup on import. Really streamlined now!

* Update tests.

* Fix lint error.

* Fix lint error, try 2.

* Revert unique_id to use location as part of ID.

* Fix code review comments.

* Fix review comments.
pull/57532/head
Glenn Waters 2021-10-11 11:33:29 -04:00 committed by GitHub
parent 8ee6662cff
commit d0b37229dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 546 additions and 92 deletions

View File

@ -269,7 +269,10 @@ omit =
homeassistant/components/enphase_envoy/__init__.py
homeassistant/components/enphase_envoy/sensor.py
homeassistant/components/entur_public_transport/*
homeassistant/components/environment_canada/*
homeassistant/components/environment_canada/__init__.py
homeassistant/components/environment_canada/camera.py
homeassistant/components/environment_canada/sensor.py
homeassistant/components/environment_canada/weather.py
homeassistant/components/envirophat/sensor.py
homeassistant/components/envisalink/*
homeassistant/components/ephember/climate.py

View File

@ -151,7 +151,7 @@ homeassistant/components/enigma2/* @fbradyirl
homeassistant/components/enocean/* @bdurrer
homeassistant/components/enphase_envoy/* @gtdiehl
homeassistant/components/entur_public_transport/* @hfurubotten
homeassistant/components/environment_canada/* @michaeldavie
homeassistant/components/environment_canada/* @gwww @michaeldavie
homeassistant/components/ephember/* @ttroy50
homeassistant/components/epson/* @pszafer
homeassistant/components/epsonworkforce/* @ThaStealth

View File

@ -1 +1,79 @@
"""A component for Environment Canada weather."""
"""The Environment Canada (EC) component."""
from functools import partial
import logging
from env_canada import ECData, ECRadar
from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.const import CONF_LATITUDE, CONF_LONGITUDE
from .const import CONF_LANGUAGE, CONF_STATION, DOMAIN
PLATFORMS = ["camera", "sensor", "weather"]
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass, config_entry):
"""Set up EC as config entry."""
lat = config_entry.data.get(CONF_LATITUDE)
lon = config_entry.data.get(CONF_LONGITUDE)
station = config_entry.data.get(CONF_STATION)
lang = config_entry.data.get(CONF_LANGUAGE, "English")
weather_api = {}
weather_init = partial(
ECData, station_id=station, coordinates=(lat, lon), language=lang.lower()
)
weather_data = await hass.async_add_executor_job(weather_init)
weather_api["weather_data"] = weather_data
radar_init = partial(ECRadar, coordinates=(lat, lon))
radar_data = await hass.async_add_executor_job(radar_init)
weather_api["radar_data"] = radar_data
await hass.async_add_executor_job(radar_data.get_loop)
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][config_entry.entry_id] = weather_api
hass.config_entries.async_setup_platforms(config_entry, PLATFORMS)
return True
async def async_unload_entry(hass, config_entry):
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(
config_entry, PLATFORMS
)
hass.data[DOMAIN].pop(config_entry.entry_id)
return unload_ok
def trigger_import(hass, config):
"""Trigger a import of YAML config into a config_entry."""
_LOGGER.warning(
"Environment Canada YAML configuration is deprecated; your YAML configuration "
"has been imported into the UI and can be safely removed"
)
if not config.get(CONF_LANGUAGE):
config[CONF_LANGUAGE] = "English"
data = {}
for key in (
CONF_STATION,
CONF_LATITUDE,
CONF_LONGITUDE,
CONF_LANGUAGE,
): # pylint: disable=consider-using-tuple
if config.get(key):
data[key] = config[key]
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data=data
)
)

View File

@ -2,8 +2,10 @@
from __future__ import annotations
import datetime
import logging
from env_canada import ECRadar
from env_canada import get_station_coords
from requests.exceptions import ConnectionError as RequestsConnectionError
import voluptuous as vol
from homeassistant.components.camera import PLATFORM_SCHEMA, Camera
@ -16,15 +18,17 @@ from homeassistant.const import (
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
ATTR_UPDATED = "updated"
from . import trigger_import
from .const import CONF_ATTRIBUTION, CONF_STATION, DOMAIN
CONF_ATTRIBUTION = "Data provided by Environment Canada"
CONF_STATION = "station"
CONF_LOOP = "loop"
CONF_PRECIP_TYPE = "precip_type"
ATTR_UPDATED = "updated"
MIN_TIME_BETWEEN_UPDATES = datetime.timedelta(minutes=10)
_LOGGER = logging.getLogger(__name__)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_LOOP, default=True): cv.boolean,
@ -37,35 +41,47 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
)
def setup_platform(hass, config, add_devices, discovery_info=None):
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up the Environment Canada camera."""
if config.get(CONF_STATION):
radar_object = ECRadar(
station_id=config[CONF_STATION], precip_type=config.get(CONF_PRECIP_TYPE)
lat, lon = await hass.async_add_executor_job(
get_station_coords, config[CONF_STATION]
)
else:
lat = config.get(CONF_LATITUDE, hass.config.latitude)
lon = config.get(CONF_LONGITUDE, hass.config.longitude)
radar_object = ECRadar(
coordinates=(lat, lon), precip_type=config.get(CONF_PRECIP_TYPE)
)
add_devices(
[ECCamera(radar_object, config.get(CONF_NAME), config[CONF_LOOP])], True
config[CONF_LATITUDE] = lat
config[CONF_LONGITUDE] = lon
trigger_import(hass, config)
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Add a weather entity from a config_entry."""
radar_data = hass.data[DOMAIN][config_entry.entry_id]["radar_data"]
async_add_entities(
[
ECCamera(
radar_data,
f"{config_entry.title} Radar",
f"{config_entry.unique_id}-radar",
),
]
)
class ECCamera(Camera):
"""Implementation of an Environment Canada radar camera."""
def __init__(self, radar_object, camera_name, is_loop):
def __init__(self, radar_object, camera_name, unique_id):
"""Initialize the camera."""
super().__init__()
self.radar_object = radar_object
self.camera_name = camera_name
self.is_loop = is_loop
self._attr_name = camera_name
self._attr_unique_id = unique_id
self.content_type = "image/gif"
self.image = None
self.timestamp = None
@ -77,13 +93,6 @@ class ECCamera(Camera):
self.update()
return self.image
@property
def name(self):
"""Return the name of the camera."""
if self.camera_name is not None:
return self.camera_name
return "Environment Canada Radar"
@property
def extra_state_attributes(self):
"""Return the state attributes of the device."""
@ -92,8 +101,10 @@ class ECCamera(Camera):
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
"""Update radar image."""
if self.is_loop:
try:
self.image = self.radar_object.get_loop()
else:
self.image = self.radar_object.get_latest_frame()
except RequestsConnectionError:
_LOGGER.warning("Radar data update failed due to rate limiting")
return
self.timestamp = self.radar_object.timestamp

View File

@ -0,0 +1,108 @@
"""Config flow for Environment Canada integration."""
from functools import partial
import logging
import xml.etree.ElementTree as et
import aiohttp
from env_canada import ECData
import voluptuous as vol
from homeassistant import config_entries, exceptions
from homeassistant.const import CONF_LATITUDE, CONF_LONGITUDE
from homeassistant.helpers import config_validation as cv
from .const import CONF_LANGUAGE, CONF_STATION, CONF_TITLE, DOMAIN
_LOGGER = logging.getLogger(__name__)
async def validate_input(hass, data):
"""Validate the user input allows us to connect."""
lat = data.get(CONF_LATITUDE)
lon = data.get(CONF_LONGITUDE)
station = data.get(CONF_STATION)
lang = data.get(CONF_LANGUAGE)
weather_init = partial(
ECData, station_id=station, coordinates=(lat, lon), language=lang.lower()
)
weather_data = await hass.async_add_executor_job(weather_init)
if weather_data.metadata.get("location") is None:
raise TooManyAttempts
if lat is None or lon is None:
lat = weather_data.lat
lon = weather_data.lon
return {
CONF_TITLE: weather_data.metadata.get("location"),
CONF_STATION: weather_data.station_id,
CONF_LATITUDE: lat,
CONF_LONGITUDE: lon,
}
class EnvironmentCanadaConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Environment Canada weather."""
VERSION = 1
async def async_step_user(self, user_input=None):
"""Handle the initial step."""
errors = {}
if user_input is not None:
try:
info = await validate_input(self.hass, user_input)
except TooManyAttempts:
errors["base"] = "too_many_attempts"
except et.ParseError:
errors["base"] = "bad_station_id"
except aiohttp.ClientConnectionError:
errors["base"] = "cannot_connect"
except aiohttp.ClientResponseError as err:
if err.status == 404:
errors["base"] = "bad_station_id"
else:
errors["base"] = "error_response"
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
if not errors:
user_input[CONF_STATION] = info[CONF_STATION]
user_input[CONF_LATITUDE] = info[CONF_LATITUDE]
user_input[CONF_LONGITUDE] = info[CONF_LONGITUDE]
# The combination of station and language are unique for all EC weather reporting
await self.async_set_unique_id(
f"{user_input[CONF_STATION]}-{user_input[CONF_LANGUAGE].lower()}"
)
self._abort_if_unique_id_configured()
return self.async_create_entry(title=info[CONF_TITLE], data=user_input)
data_schema = vol.Schema(
{
vol.Optional(CONF_STATION): str,
vol.Optional(
CONF_LATITUDE, default=self.hass.config.latitude
): cv.latitude,
vol.Optional(
CONF_LONGITUDE, default=self.hass.config.longitude
): cv.longitude,
vol.Required(CONF_LANGUAGE, default="English"): vol.In(
["English", "French"]
),
}
)
return self.async_show_form(
step_id="user", data_schema=data_schema, errors=errors
)
async def async_step_import(self, import_data):
"""Import entry from configuration.yaml."""
return await self.async_step_user(import_data)
class TooManyAttempts(exceptions.HomeAssistantError):
"""Error to indicate station ID is missing, invalid, or not in EC database."""

View File

@ -0,0 +1,9 @@
"""Constants for EC component."""
ATTR_OBSERVATION_TIME = "observation_time"
ATTR_STATION = "station"
CONF_ATTRIBUTION = "Data provided by Environment Canada"
CONF_LANGUAGE = "language"
CONF_STATION = "station"
CONF_TITLE = "title"
DOMAIN = "environment_canada"

View File

@ -2,7 +2,8 @@
"domain": "environment_canada",
"name": "Environment Canada",
"documentation": "https://www.home-assistant.io/integrations/environment_canada",
"requirements": ["env_canada==0.2.5"],
"codeowners": ["@michaeldavie"],
"requirements": ["env_canada==0.2.7"],
"codeowners": ["@gwww", "@michaeldavie"],
"config_flow": true,
"iot_class": "cloud_polling"
}

View File

@ -3,7 +3,6 @@ from datetime import datetime, timedelta
import logging
import re
from env_canada import ECData
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA, SensorEntity
@ -17,23 +16,20 @@ from homeassistant.const import (
)
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
from . import trigger_import
from .const import ATTR_STATION, CONF_ATTRIBUTION, CONF_LANGUAGE, CONF_STATION, DOMAIN
SCAN_INTERVAL = timedelta(minutes=10)
ATTR_UPDATED = "updated"
ATTR_STATION = "station"
ATTR_TIME = "alert time"
CONF_ATTRIBUTION = "Data provided by Environment Canada"
CONF_STATION = "station"
CONF_LANGUAGE = "language"
_LOGGER = logging.getLogger(__name__)
def validate_station(station):
"""Check that the station ID is well-formed."""
if station is None:
return
return None
if not re.fullmatch(r"[A-Z]{2}/s0000\d{3}", station):
raise vol.error.Invalid('Station ID must be of the form "XX/s0000###"')
return station
@ -49,47 +45,45 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
)
def setup_platform(hass, config, add_entities, discovery_info=None):
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up the Environment Canada sensor."""
trigger_import(hass, config)
if config.get(CONF_STATION):
ec_data = ECData(
station_id=config[CONF_STATION], language=config.get(CONF_LANGUAGE)
)
else:
lat = config.get(CONF_LATITUDE, hass.config.latitude)
lon = config.get(CONF_LONGITUDE, hass.config.longitude)
ec_data = ECData(coordinates=(lat, lon), language=config.get(CONF_LANGUAGE))
sensor_list = list(ec_data.conditions) + list(ec_data.alerts)
add_entities([ECSensor(sensor_type, ec_data) for sensor_type in sensor_list], True)
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Add a weather entity from a config_entry."""
weather_data = hass.data[DOMAIN][config_entry.entry_id]["weather_data"]
sensor_list = list(weather_data.conditions) + list(weather_data.alerts)
async_add_entities(
[
ECSensor(
sensor_type,
f"{config_entry.title} {sensor_type}",
weather_data,
f"{weather_data.metadata['location']}-{sensor_type}",
)
for sensor_type in sensor_list
],
True,
)
class ECSensor(SensorEntity):
"""Implementation of an Environment Canada sensor."""
def __init__(self, sensor_type, ec_data):
def __init__(self, sensor_type, name, ec_data, unique_id):
"""Initialize the sensor."""
self.sensor_type = sensor_type
self.ec_data = ec_data
self._unique_id = None
self._name = None
self._attr_unique_id = unique_id
self._attr_name = name
self._state = None
self._attr = None
self._unit = None
self._device_class = None
@property
def unique_id(self) -> str:
"""Return the unique ID of the sensor."""
return self._unique_id
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def native_value(self):
"""Return the state of the sensor."""
@ -119,9 +113,7 @@ class ECSensor(SensorEntity):
metadata = self.ec_data.metadata
sensor_data = conditions.get(self.sensor_type)
self._unique_id = f"{metadata['location']}-{self.sensor_type}"
self._attr = {}
self._name = sensor_data.get("label")
value = sensor_data.get("value")
if isinstance(value, list):
@ -133,7 +125,9 @@ class ECSensor(SensorEntity):
self._state = str(value).capitalize()
elif value is not None and len(value) > 255:
self._state = value[:255]
_LOGGER.info("Value for %s truncated to 255 characters", self._unique_id)
_LOGGER.info(
"Value for %s truncated to 255 characters", self._attr_unique_id
)
else:
self._state = value

View File

@ -0,0 +1,23 @@
{
"config": {
"step": {
"user": {
"title": "Environment Canada: weather location and language",
"description": "Either a station ID or latitude/longitude must be specified. The default latitude/longitude used are the values configured in your Home Assistant installation. The closest weather station to the coordinates will be used if specifying coordinates. If a station code is used it must follow the format: PP/code, where PP is the two-letter province and code is the station ID. The list of station IDs can be found here: https://dd.weather.gc.ca/citypage_weather/docs/site_list_towns_en.csv. Weather information can be retrieved in either English or French.",
"data": {
"latitude": "[%key:common::config_flow::data::latitude%]",
"longitude": "[%key:common::config_flow::data::longitude%]",
"station": "Weather station ID",
"language": "Weather information language"
}
}
},
"error": {
"bad_station_id": "Station ID is invalid, missing, or not found in the station ID database",
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"error_response": "Response from Environment Canada in error",
"too_many_attempts": "Connections to Environment Canada are rate limited; Try again in 60 seconds",
"unknown": "[%key:common::config_flow::error::unknown%]"
}
}
}

View File

@ -0,0 +1,23 @@
{
"config": {
"error": {
"bad_station_id": "Station ID is invalid, missing, or not found in the station ID database",
"cannot_connect": "Failed to connect",
"error_response": "Response from Environment Canada in error",
"too_many_attempts": "Connections to Environment Canada are rate limited; Try again in 60 seconds",
"unknown": "Unexpected error"
},
"step": {
"user": {
"data": {
"language": "Weather information language",
"latitude": "Latitude",
"longitude": "Longitude",
"station": "Weather station ID"
},
"description": "Either a station ID or latitude/longitude must be specified. The default latitude/longitude used are the values configured in your Home Assistant installation. The closest weather station to the coordinates will be used if specifying coordinates. If a station code is used it must follow the format: PP/code, where PP is the two-letter province and code is the station ID. The list of station IDs can be found here: https://dd.weather.gc.ca/citypage_weather/docs/site_list_towns_en.csv. Weather information can be retrieved in either English or French.",
"title": "Environment Canada: weather location and language"
}
}
}
}

View File

@ -1,8 +1,8 @@
"""Platform for retrieving meteorological data from Environment Canada."""
import datetime
import logging
import re
from env_canada import ECData
import voluptuous as vol
from homeassistant.components.weather import (
@ -30,17 +30,20 @@ from homeassistant.const import CONF_LATITUDE, CONF_LONGITUDE, CONF_NAME, TEMP_C
import homeassistant.helpers.config_validation as cv
from homeassistant.util import dt
from . import trigger_import
from .const import CONF_ATTRIBUTION, CONF_STATION, DOMAIN
CONF_FORECAST = "forecast"
CONF_ATTRIBUTION = "Data provided by Environment Canada"
CONF_STATION = "station"
_LOGGER = logging.getLogger(__name__)
def validate_station(station):
"""Check that the station ID is well-formed."""
if station is None:
return
return None
if not re.fullmatch(r"[A-Z]{2}/s0000\d{3}", station):
raise vol.error.Invalid('Station ID must be of the form "XX/s0000###"')
raise vol.Invalid('Station ID must be of the form "XX/s0000###"')
return station
@ -72,45 +75,59 @@ ICON_CONDITION_MAP = {
}
def setup_platform(hass, config, add_devices, discovery_info=None):
async def async_setup_platform(hass, config, async_add_entries, discovery_info=None):
"""Set up the Environment Canada weather."""
if config.get(CONF_STATION):
ec_data = ECData(station_id=config[CONF_STATION])
else:
lat = config.get(CONF_LATITUDE, hass.config.latitude)
lon = config.get(CONF_LONGITUDE, hass.config.longitude)
ec_data = ECData(coordinates=(lat, lon))
trigger_import(hass, config)
add_devices([ECWeather(ec_data, config)])
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Add a weather entity from a config_entry."""
weather_data = hass.data[DOMAIN][config_entry.entry_id]["weather_data"]
async_add_entities(
[
ECWeather(
weather_data,
f"{config_entry.title}",
config_entry.data,
"daily",
f"{config_entry.unique_id}-daily",
),
ECWeather(
weather_data,
f"{config_entry.title} Hourly",
config_entry.data,
"hourly",
f"{config_entry.unique_id}-hourly",
),
]
)
class ECWeather(WeatherEntity):
"""Representation of a weather condition."""
def __init__(self, ec_data, config):
def __init__(self, ec_data, name, config, forecast_type, unique_id):
"""Initialize Environment Canada weather."""
self.ec_data = ec_data
self.platform_name = config.get(CONF_NAME)
self.forecast_type = config[CONF_FORECAST]
self.config = config
self._attr_name = name
self._attr_unique_id = unique_id
self.forecast_type = forecast_type
@property
def attribution(self):
"""Return the attribution."""
return CONF_ATTRIBUTION
@property
def name(self):
"""Return the name of the weather entity."""
if self.platform_name:
return self.platform_name
return self.ec_data.metadata.get("location")
@property
def temperature(self):
"""Return the temperature."""
if self.ec_data.conditions.get("temperature", {}).get("value"):
return float(self.ec_data.conditions["temperature"]["value"])
if self.ec_data.hourly_forecasts[0].get("temperature"):
if self.ec_data.hourly_forecasts and self.ec_data.hourly_forecasts[0].get(
"temperature"
):
return float(self.ec_data.hourly_forecasts[0]["temperature"])
return None
@ -161,7 +178,9 @@ class ECWeather(WeatherEntity):
if self.ec_data.conditions.get("icon_code", {}).get("value"):
icon_code = self.ec_data.conditions["icon_code"]["value"]
elif self.ec_data.hourly_forecasts[0].get("icon_code"):
elif self.ec_data.hourly_forecasts and self.ec_data.hourly_forecasts[0].get(
"icon_code"
):
icon_code = self.ec_data.hourly_forecasts[0]["icon_code"]
if icon_code:
@ -184,6 +203,8 @@ def get_forecast(ec_data, forecast_type):
if forecast_type == "daily":
half_days = ec_data.daily_forecasts
if not half_days:
return None
today = {
ATTR_FORECAST_TIME: dt.now().isoformat(),

View File

@ -77,6 +77,7 @@ FLOWS = [
"emulated_roku",
"enocean",
"enphase_envoy",
"environment_canada",
"epson",
"esphome",
"ezviz",

View File

@ -591,7 +591,7 @@ enocean==0.50
enturclient==0.2.2
# homeassistant.components.environment_canada
env_canada==0.2.5
env_canada==0.2.7
# homeassistant.components.envirophat
# envirophat==0.0.6

View File

@ -356,6 +356,9 @@ emulated_roku==0.2.1
# homeassistant.components.enocean
enocean==0.50
# homeassistant.components.environment_canada
env_canada==0.2.7
# homeassistant.components.enphase_envoy
envoy_reader==0.20.0

View File

@ -0,0 +1 @@
"""Tests for the Environment Canada integration."""

View File

@ -0,0 +1,178 @@
"""Test the Environment Canada (EC) config flow."""
from unittest.mock import MagicMock, Mock, patch
import xml.etree.ElementTree as et
import aiohttp
import pytest
from homeassistant import config_entries, data_entry_flow
from homeassistant.components.environment_canada.const import (
CONF_LANGUAGE,
CONF_STATION,
DOMAIN,
)
from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.const import CONF_LATITUDE, CONF_LONGITUDE
from tests.common import MockConfigEntry
FAKE_CONFIG = {
CONF_STATION: "ON/s1234567",
CONF_LANGUAGE: "English",
CONF_LATITUDE: 42.42,
CONF_LONGITUDE: -42.42,
}
FAKE_TITLE = "Universal title!"
def mocked_ec(
station_id=FAKE_CONFIG[CONF_STATION],
lat=FAKE_CONFIG[CONF_LATITUDE],
lon=FAKE_CONFIG[CONF_LONGITUDE],
lang=FAKE_CONFIG[CONF_LANGUAGE],
update=None,
metadata={"location": FAKE_TITLE},
):
"""Mock the env_canada library."""
ec_mock = MagicMock()
ec_mock.station_id = station_id
ec_mock.lat = lat
ec_mock.lon = lon
ec_mock.language = lang
ec_mock.metadata = metadata
if update:
ec_mock.update = update
else:
ec_mock.update = Mock()
return patch(
"homeassistant.components.environment_canada.config_flow.ECData",
return_value=ec_mock,
)
async def test_create_entry(hass):
"""Test creating an entry."""
with mocked_ec(), patch(
"homeassistant.components.environment_canada.async_setup_entry",
return_value=True,
):
flow = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
flow["flow_id"], FAKE_CONFIG
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["data"] == FAKE_CONFIG
assert result["title"] == FAKE_TITLE
async def test_create_same_entry_twice(hass):
"""Test duplicate entries."""
entry = MockConfigEntry(
domain=DOMAIN,
data=FAKE_CONFIG,
unique_id="ON/s1234567-english",
)
entry.add_to_hass(hass)
with mocked_ec(), patch(
"homeassistant.components.environment_canada.async_setup_entry",
return_value=True,
):
flow = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
flow["flow_id"], FAKE_CONFIG
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
async def test_too_many_attempts(hass):
"""Test hitting rate limit."""
with mocked_ec(metadata={}), patch(
"homeassistant.components.environment_canada.async_setup_entry",
return_value=True,
):
flow = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
flow["flow_id"],
{},
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {"base": "too_many_attempts"}
@pytest.mark.parametrize(
"error",
[
(aiohttp.ClientResponseError(Mock(), (), status=404), "bad_station_id"),
(aiohttp.ClientResponseError(Mock(), (), status=400), "error_response"),
(aiohttp.ClientConnectionError, "cannot_connect"),
(et.ParseError, "bad_station_id"),
(ValueError, "unknown"),
],
)
async def test_exception_handling(hass, error):
"""Test exception handling."""
exc, base_error = error
with patch(
"homeassistant.components.environment_canada.config_flow.ECData",
side_effect=exc,
):
flow = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
flow["flow_id"],
{},
)
await hass.async_block_till_done()
assert result["type"] == "form"
assert result["errors"] == {"base": base_error}
async def test_lat_or_lon_not_specified(hass):
"""Test that the import step works."""
with mocked_ec(), patch(
"homeassistant.components.environment_canada.async_setup_entry",
return_value=True,
):
fake_config = dict(FAKE_CONFIG)
del fake_config[CONF_LATITUDE]
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data=fake_config
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["data"] == FAKE_CONFIG
assert result["title"] == FAKE_TITLE
async def test_async_step_import(hass):
"""Test that the import step works."""
with mocked_ec(), patch(
"homeassistant.components.environment_canada.async_setup_entry",
return_value=True,
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data=FAKE_CONFIG
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["data"] == FAKE_CONFIG
assert result["title"] == FAKE_TITLE
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data=FAKE_CONFIG
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT