Add config flow to Rova (#113596)
* Add Config Flow for Rova component * Add tests for Rova config flow * Fix data type * Add rova to requirements for tests * Removed seperate function for area check and global variable * Add unique name and id to rova entities * Add support for multiple rova entries * Fix correct error after connection timeout or http error * Revert SENSOR_TYPES update * Add existing rova configuration from yaml as new entity * Add tests for import configuration.yaml flow * Cleanup code * Update valid rova area check in config flow * Changed abort keys and messages * Updated using self.add_suggested_values_to_schema * Update to pass tests * Added missing strings * Update sensor unique_ids * Fix service name formatting * Update tests for Rova entry * Update tests to recover after error * Update test name * Apply suggestions from code review --------- Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>pull/113753/head
parent
815d120645
commit
1ed8232b02
|
@ -1145,6 +1145,7 @@ omit =
|
|||
homeassistant/components/roon/media_player.py
|
||||
homeassistant/components/roon/server.py
|
||||
homeassistant/components/route53/*
|
||||
homeassistant/components/rova/__init__.py
|
||||
homeassistant/components/rova/sensor.py
|
||||
homeassistant/components/rpi_camera/*
|
||||
homeassistant/components/rtorrent/sensor.py
|
||||
|
|
|
@ -1 +1,46 @@
|
|||
"""The rova component."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from requests.exceptions import ConnectTimeout, HTTPError
|
||||
from rova.rova import Rova
|
||||
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import Platform
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
|
||||
|
||||
from .const import CONF_HOUSE_NUMBER, CONF_HOUSE_NUMBER_SUFFIX, CONF_ZIP_CODE, DOMAIN
|
||||
|
||||
PLATFORMS: list[Platform] = [Platform.SENSOR]
|
||||
|
||||
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Set up ROVA from a config entry."""
|
||||
|
||||
api = Rova(
|
||||
entry.data[CONF_ZIP_CODE],
|
||||
entry.data[CONF_HOUSE_NUMBER],
|
||||
entry.data[CONF_HOUSE_NUMBER_SUFFIX],
|
||||
)
|
||||
|
||||
try:
|
||||
rova_area = await hass.async_add_executor_job(api.is_rova_area)
|
||||
except (ConnectTimeout, HTTPError) as ex:
|
||||
raise ConfigEntryNotReady from ex
|
||||
|
||||
if not rova_area:
|
||||
raise ConfigEntryError("Rova does not collect garbage in this area")
|
||||
|
||||
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = api
|
||||
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
||||
return True
|
||||
|
||||
|
||||
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Unload ROVA config entry."""
|
||||
|
||||
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
|
||||
hass.data[DOMAIN].pop(entry.entry_id)
|
||||
|
||||
return unload_ok
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
"""Config flow for the Rova platform."""
|
||||
|
||||
from typing import Any
|
||||
|
||||
from requests.exceptions import ConnectTimeout, HTTPError
|
||||
from rova.rova import Rova
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant import config_entries
|
||||
|
||||
from .const import CONF_HOUSE_NUMBER, CONF_HOUSE_NUMBER_SUFFIX, CONF_ZIP_CODE, DOMAIN
|
||||
|
||||
|
||||
class RovaConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
"""Handle Rova config flow."""
|
||||
|
||||
VERSION = 1
|
||||
|
||||
async def async_step_user(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> config_entries.ConfigFlowResult:
|
||||
"""Step when user initializes a integration."""
|
||||
errors: dict[str, str] = {}
|
||||
|
||||
if user_input is not None:
|
||||
# generate unique name for rova integration
|
||||
zip_code = user_input[CONF_ZIP_CODE]
|
||||
number = user_input[CONF_HOUSE_NUMBER]
|
||||
suffix = user_input[CONF_HOUSE_NUMBER_SUFFIX]
|
||||
|
||||
await self.async_set_unique_id(f"{zip_code}{number}{suffix}".strip())
|
||||
self._abort_if_unique_id_configured()
|
||||
|
||||
api = Rova(zip_code, number, suffix)
|
||||
|
||||
try:
|
||||
if not await self.hass.async_add_executor_job(api.is_rova_area):
|
||||
errors = {"base": "invalid_rova_area"}
|
||||
except (ConnectTimeout, HTTPError):
|
||||
errors = {"base": "cannot_connect"}
|
||||
|
||||
if not errors:
|
||||
return self.async_create_entry(
|
||||
title=f"{zip_code} {number} {suffix}".strip(),
|
||||
data=user_input,
|
||||
)
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
data_schema=self.add_suggested_values_to_schema(
|
||||
vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_ZIP_CODE): str,
|
||||
vol.Required(CONF_HOUSE_NUMBER): str,
|
||||
vol.Optional(CONF_HOUSE_NUMBER_SUFFIX, default=""): str,
|
||||
}
|
||||
),
|
||||
user_input,
|
||||
),
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
async def async_step_import(
|
||||
self, user_input: dict[str, Any]
|
||||
) -> config_entries.ConfigFlowResult:
|
||||
"""Import the yaml config."""
|
||||
zip_code = user_input[CONF_ZIP_CODE]
|
||||
number = user_input[CONF_HOUSE_NUMBER]
|
||||
suffix = user_input[CONF_HOUSE_NUMBER_SUFFIX]
|
||||
|
||||
await self.async_set_unique_id(f"{zip_code}{number}{suffix}".strip())
|
||||
self._abort_if_unique_id_configured()
|
||||
|
||||
api = Rova(zip_code, number, suffix)
|
||||
|
||||
try:
|
||||
result = await self.hass.async_add_executor_job(api.is_rova_area)
|
||||
|
||||
if result:
|
||||
return self.async_create_entry(
|
||||
title=f"{zip_code} {number} {suffix}".strip(),
|
||||
data={
|
||||
CONF_ZIP_CODE: zip_code,
|
||||
CONF_HOUSE_NUMBER: number,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: suffix,
|
||||
},
|
||||
)
|
||||
return self.async_abort(reason="invalid_rova_area")
|
||||
|
||||
except (ConnectTimeout, HTTPError):
|
||||
return self.async_abort(reason="cannot_connect")
|
|
@ -4,6 +4,10 @@ import logging
|
|||
|
||||
LOGGER = logging.getLogger(__package__)
|
||||
|
||||
DEFAULT_NAME = "Rova"
|
||||
|
||||
CONF_ZIP_CODE = "zip_code"
|
||||
CONF_HOUSE_NUMBER = "house_number"
|
||||
CONF_HOUSE_NUMBER_SUFFIX = "house_number_suffix"
|
||||
|
||||
DOMAIN = "rova"
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
"domain": "rova",
|
||||
"name": "ROVA",
|
||||
"codeowners": [],
|
||||
"config_flow": true,
|
||||
"documentation": "https://www.home-assistant.io/integrations/rova",
|
||||
"iot_class": "cloud_polling",
|
||||
"loggers": ["rova"],
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
from requests.exceptions import ConnectTimeout, HTTPError
|
||||
from rova.rova import Rova
|
||||
|
@ -14,21 +15,31 @@ from homeassistant.components.sensor import (
|
|||
SensorEntity,
|
||||
SensorEntityDescription,
|
||||
)
|
||||
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
|
||||
from homeassistant.const import CONF_MONITORED_CONDITIONS, CONF_NAME
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
|
||||
from homeassistant.data_entry_flow import FlowResultType
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
|
||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||
from homeassistant.util import Throttle
|
||||
from homeassistant.util.dt import get_time_zone
|
||||
|
||||
from .const import CONF_HOUSE_NUMBER, CONF_HOUSE_NUMBER_SUFFIX, CONF_ZIP_CODE, LOGGER
|
||||
from .const import (
|
||||
CONF_HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX,
|
||||
CONF_ZIP_CODE,
|
||||
DOMAIN,
|
||||
LOGGER,
|
||||
)
|
||||
|
||||
ISSUE_PLACEHOLDER = {"url": "/config/integrations/dashboard/add?domain=rova"}
|
||||
|
||||
UPDATE_DELAY = timedelta(hours=12)
|
||||
SCAN_INTERVAL = timedelta(hours=12)
|
||||
|
||||
|
||||
SENSOR_TYPES: dict[str, SensorEntityDescription] = {
|
||||
SENSOR_TYPES = {
|
||||
"bio": SensorEntityDescription(
|
||||
key="gft",
|
||||
name="bio",
|
||||
|
@ -64,39 +75,71 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
|
|||
)
|
||||
|
||||
|
||||
def setup_platform(
|
||||
async def async_setup_platform(
|
||||
hass: HomeAssistant,
|
||||
config: ConfigType,
|
||||
add_entities: AddEntitiesCallback,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
discovery_info: DiscoveryInfoType | None = None,
|
||||
) -> None:
|
||||
"""Create the Rova data service and sensors."""
|
||||
"""Set up the rova sensor platform through yaml configuration."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_IMPORT},
|
||||
data=config,
|
||||
)
|
||||
if (
|
||||
result["type"] == FlowResultType.CREATE_ENTRY
|
||||
or result["reason"] == "already_configured"
|
||||
):
|
||||
async_create_issue(
|
||||
hass,
|
||||
HOMEASSISTANT_DOMAIN,
|
||||
f"deprecated_yaml_{DOMAIN}",
|
||||
breaks_in_ha_version="2024.10.0",
|
||||
is_fixable=False,
|
||||
issue_domain=DOMAIN,
|
||||
severity=IssueSeverity.WARNING,
|
||||
translation_key="deprecated_yaml",
|
||||
translation_placeholders={
|
||||
"domain": DOMAIN,
|
||||
"integration_title": "Rova",
|
||||
},
|
||||
)
|
||||
else:
|
||||
async_create_issue(
|
||||
hass,
|
||||
DOMAIN,
|
||||
f"deprecated_yaml_import_issue_${result['reason']}",
|
||||
breaks_in_ha_version="2024.10.0",
|
||||
is_fixable=False,
|
||||
issue_domain=DOMAIN,
|
||||
severity=IssueSeverity.WARNING,
|
||||
translation_key=f"deprecated_yaml_import_issue_${result['reason']}",
|
||||
translation_placeholders=ISSUE_PLACEHOLDER,
|
||||
)
|
||||
|
||||
zip_code = config[CONF_ZIP_CODE]
|
||||
house_number = config[CONF_HOUSE_NUMBER]
|
||||
house_number_suffix = config[CONF_HOUSE_NUMBER_SUFFIX]
|
||||
platform_name = config[CONF_NAME]
|
||||
|
||||
# Create new Rova object to retrieve data
|
||||
api = Rova(zip_code, house_number, house_number_suffix)
|
||||
|
||||
try:
|
||||
if not api.is_rova_area():
|
||||
LOGGER.error("ROVA does not collect garbage in this area")
|
||||
return
|
||||
except (ConnectTimeout, HTTPError):
|
||||
LOGGER.error("Could not retrieve details from ROVA API")
|
||||
return
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant,
|
||||
entry: ConfigEntry,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
) -> None:
|
||||
"""Add Rova entry."""
|
||||
# get api from hass
|
||||
api: Rova = hass.data[DOMAIN][entry.entry_id]
|
||||
|
||||
# Create rova data service which will retrieve and update the data.
|
||||
data_service = RovaData(api)
|
||||
|
||||
# generate unique name for rova integration
|
||||
name = f"{entry.data[CONF_ZIP_CODE]}{entry.data[CONF_HOUSE_NUMBER]}{entry.data[CONF_HOUSE_NUMBER_SUFFIX]}"
|
||||
|
||||
# Create a new sensor for each garbage type.
|
||||
entities = [
|
||||
RovaSensor(platform_name, SENSOR_TYPES[sensor_key], data_service)
|
||||
for sensor_key in config[CONF_MONITORED_CONDITIONS]
|
||||
RovaSensor(name, description, data_service)
|
||||
for key, description in SENSOR_TYPES.items()
|
||||
]
|
||||
add_entities(entities, True)
|
||||
async_add_entities(entities, True)
|
||||
|
||||
|
||||
class RovaSensor(SensorEntity):
|
||||
|
@ -109,7 +152,8 @@ class RovaSensor(SensorEntity):
|
|||
self.entity_description = description
|
||||
self.data_service = data_service
|
||||
|
||||
self._attr_name = f"{platform_name}_{description.name}"
|
||||
self._attr_name = f"{platform_name}_{description.key}"
|
||||
self._attr_unique_id = f"{platform_name}_{description.key}"
|
||||
self._attr_device_class = SensorDeviceClass.TIMESTAMP
|
||||
|
||||
def update(self) -> None:
|
||||
|
@ -123,10 +167,10 @@ class RovaSensor(SensorEntity):
|
|||
class RovaData:
|
||||
"""Get and update the latest data from the Rova API."""
|
||||
|
||||
def __init__(self, api):
|
||||
def __init__(self, api) -> None:
|
||||
"""Initialize the data object."""
|
||||
self.api = api
|
||||
self.data = {}
|
||||
self.data: dict[str, Any] = {}
|
||||
|
||||
@Throttle(UPDATE_DELAY)
|
||||
def update(self):
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
{
|
||||
"config": {
|
||||
"step": {
|
||||
"user": {
|
||||
"title": "Provide your address details",
|
||||
"data": {
|
||||
"zip_code": "Your zip code",
|
||||
"house_number": "Your house number",
|
||||
"house_number_suffix": "A suffix for your house number"
|
||||
}
|
||||
}
|
||||
},
|
||||
"error": {
|
||||
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]",
|
||||
"invalid_rova_area": "Rova does not collect at this address"
|
||||
},
|
||||
"abort": {
|
||||
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]",
|
||||
"cannot_connect": "Could not connect to the Rova API",
|
||||
"invalid_rova_area": "Rova does not collect at this address"
|
||||
}
|
||||
},
|
||||
"issues": {
|
||||
"deprecated_yaml_import_issue_cannot_connect": {
|
||||
"title": "The Rova YAML configuration import failed",
|
||||
"description": "Configuring Rova using YAML is being removed but there was an connection error importing your YAML configuration.\n\nEnsure connection to Rova works and restart Home Assistant to try again or remove the Rova YAML configuration from your configuration.yaml file and continue to [set up the integration]({url}) manually."
|
||||
},
|
||||
"deprecated_yaml_import_issue_invalid_rova_area": {
|
||||
"title": "The Rova YAML configuration import failed",
|
||||
"description": "There was an error when trying to import your Rova YAML configuration.\n\nRova does not collect at this address.\n\nEnsure the imported configuration is correct and remove the Rova YAML configuration from your configuration.yaml file and continue to [set up the integration]({url}) manually."
|
||||
}
|
||||
}
|
||||
}
|
|
@ -441,6 +441,7 @@ FLOWS = {
|
|||
"romy",
|
||||
"roomba",
|
||||
"roon",
|
||||
"rova",
|
||||
"rpi_power",
|
||||
"rtsp_to_webrtc",
|
||||
"ruckus_unleashed",
|
||||
|
|
|
@ -5042,7 +5042,7 @@
|
|||
"rova": {
|
||||
"name": "ROVA",
|
||||
"integration_type": "hub",
|
||||
"config_flow": false,
|
||||
"config_flow": true,
|
||||
"iot_class": "cloud_polling"
|
||||
},
|
||||
"rss_feed_template": {
|
||||
|
|
|
@ -1895,6 +1895,9 @@ roombapy==1.6.13
|
|||
# homeassistant.components.roon
|
||||
roonapi==0.1.6
|
||||
|
||||
# homeassistant.components.rova
|
||||
rova==0.4.1
|
||||
|
||||
# homeassistant.components.rpi_power
|
||||
rpi-bad-power==0.1.0
|
||||
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
"""Tests for the Rova component."""
|
|
@ -0,0 +1,18 @@
|
|||
"""Common fixtures for Rova tests."""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_rova():
|
||||
"""Mock a successful Rova API."""
|
||||
api = MagicMock()
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.rova.config_flow.Rova",
|
||||
return_value=api,
|
||||
) as api, patch("homeassistant.components.rova.Rova", return_value=api):
|
||||
api.is_rova_area.return_value = True
|
||||
yield api
|
|
@ -0,0 +1,269 @@
|
|||
"""Tests for the Rova config flow."""
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
from requests.exceptions import ConnectTimeout, HTTPError
|
||||
|
||||
from homeassistant import data_entry_flow
|
||||
from homeassistant.components.rova.const import (
|
||||
CONF_HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX,
|
||||
CONF_ZIP_CODE,
|
||||
DOMAIN,
|
||||
)
|
||||
from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
ZIP_CODE = "7991AD"
|
||||
HOUSE_NUMBER = "10"
|
||||
HOUSE_NUMBER_SUFFIX = "a"
|
||||
|
||||
|
||||
async def test_user(hass: HomeAssistant, mock_rova: MagicMock) -> None:
|
||||
"""Test user config."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": SOURCE_USER}
|
||||
)
|
||||
assert result.get("type") == data_entry_flow.FlowResultType.FORM
|
||||
assert result.get("step_id") == "user"
|
||||
|
||||
# test with all information provided
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_USER},
|
||||
data={
|
||||
CONF_ZIP_CODE: ZIP_CODE,
|
||||
CONF_HOUSE_NUMBER: HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: HOUSE_NUMBER_SUFFIX,
|
||||
},
|
||||
)
|
||||
assert result.get("type") == data_entry_flow.FlowResultType.CREATE_ENTRY
|
||||
|
||||
data = result.get("data")
|
||||
assert data
|
||||
assert data[CONF_ZIP_CODE] == ZIP_CODE
|
||||
assert data[CONF_HOUSE_NUMBER] == HOUSE_NUMBER
|
||||
assert data[CONF_HOUSE_NUMBER_SUFFIX] == HOUSE_NUMBER_SUFFIX
|
||||
|
||||
|
||||
async def test_abort_if_not_rova_area(
|
||||
hass: HomeAssistant, mock_rova: MagicMock
|
||||
) -> None:
|
||||
"""Test we abort if rova does not collect at the given address."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": SOURCE_USER}
|
||||
)
|
||||
|
||||
# test with area where rova does not collect
|
||||
mock_rova.return_value.is_rova_area.return_value = False
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
CONF_ZIP_CODE: ZIP_CODE,
|
||||
CONF_HOUSE_NUMBER: HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: HOUSE_NUMBER_SUFFIX,
|
||||
},
|
||||
)
|
||||
|
||||
assert result.get("type") == data_entry_flow.FlowResultType.FORM
|
||||
assert result.get("errors") == {"base": "invalid_rova_area"}
|
||||
|
||||
# now reset the return value and test if we can recover
|
||||
mock_rova.return_value.is_rova_area.return_value = True
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
CONF_ZIP_CODE: ZIP_CODE,
|
||||
CONF_HOUSE_NUMBER: HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: HOUSE_NUMBER_SUFFIX,
|
||||
},
|
||||
)
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
|
||||
assert result["title"] == f"{ZIP_CODE} {HOUSE_NUMBER} {HOUSE_NUMBER_SUFFIX}"
|
||||
assert result["data"] == {
|
||||
CONF_ZIP_CODE: ZIP_CODE,
|
||||
CONF_HOUSE_NUMBER: HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: HOUSE_NUMBER_SUFFIX,
|
||||
}
|
||||
|
||||
|
||||
async def test_abort_if_already_setup(hass: HomeAssistant) -> None:
|
||||
"""Test we abort if rova is already setup."""
|
||||
MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
unique_id=f"{ZIP_CODE}{HOUSE_NUMBER}{HOUSE_NUMBER_SUFFIX}",
|
||||
data={
|
||||
CONF_ZIP_CODE: ZIP_CODE,
|
||||
CONF_HOUSE_NUMBER: HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: HOUSE_NUMBER_SUFFIX,
|
||||
},
|
||||
).add_to_hass(hass)
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_USER},
|
||||
data={
|
||||
CONF_ZIP_CODE: ZIP_CODE,
|
||||
CONF_HOUSE_NUMBER: HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: HOUSE_NUMBER_SUFFIX,
|
||||
},
|
||||
)
|
||||
assert result["type"] == data_entry_flow.FlowResultType.ABORT
|
||||
assert result["reason"] == "already_configured"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("exception", "error"),
|
||||
[
|
||||
(ConnectTimeout(), "cannot_connect"),
|
||||
(HTTPError(), "cannot_connect"),
|
||||
],
|
||||
)
|
||||
async def test_abort_if_api_throws_exception(
|
||||
hass: HomeAssistant, exception: Exception, error: str, mock_rova: MagicMock
|
||||
) -> None:
|
||||
"""Test different exceptions for the Rova entity."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_USER},
|
||||
)
|
||||
|
||||
# test with exception
|
||||
mock_rova.return_value.is_rova_area.side_effect = exception
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
CONF_ZIP_CODE: ZIP_CODE,
|
||||
CONF_HOUSE_NUMBER: HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: HOUSE_NUMBER_SUFFIX,
|
||||
},
|
||||
)
|
||||
assert result.get("type") == data_entry_flow.FlowResultType.FORM
|
||||
assert result.get("errors") == {"base": error}
|
||||
|
||||
# now reset the side effect to see if we can recover
|
||||
mock_rova.return_value.is_rova_area.side_effect = None
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
CONF_ZIP_CODE: ZIP_CODE,
|
||||
CONF_HOUSE_NUMBER: HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: HOUSE_NUMBER_SUFFIX,
|
||||
},
|
||||
)
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
|
||||
assert result["title"] == f"{ZIP_CODE} {HOUSE_NUMBER} {HOUSE_NUMBER_SUFFIX}"
|
||||
assert result["data"] == {
|
||||
CONF_ZIP_CODE: ZIP_CODE,
|
||||
CONF_HOUSE_NUMBER: HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: HOUSE_NUMBER_SUFFIX,
|
||||
}
|
||||
|
||||
|
||||
async def test_import(hass: HomeAssistant, mock_rova: MagicMock) -> None:
|
||||
"""Test import flow."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_IMPORT},
|
||||
data={
|
||||
CONF_ZIP_CODE: ZIP_CODE,
|
||||
CONF_HOUSE_NUMBER: HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: HOUSE_NUMBER_SUFFIX,
|
||||
},
|
||||
)
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
|
||||
assert result["title"] == f"{ZIP_CODE} {HOUSE_NUMBER} {HOUSE_NUMBER_SUFFIX}"
|
||||
assert result["data"] == {
|
||||
CONF_ZIP_CODE: ZIP_CODE,
|
||||
CONF_HOUSE_NUMBER: HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: HOUSE_NUMBER_SUFFIX,
|
||||
}
|
||||
|
||||
|
||||
async def test_import_already_configured(
|
||||
hass: HomeAssistant, mock_rova: MagicMock
|
||||
) -> None:
|
||||
"""Test we abort import flow when entry is already configured."""
|
||||
entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
unique_id=f"{ZIP_CODE}{HOUSE_NUMBER}{HOUSE_NUMBER_SUFFIX}",
|
||||
data={
|
||||
CONF_ZIP_CODE: ZIP_CODE,
|
||||
CONF_HOUSE_NUMBER: HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: HOUSE_NUMBER_SUFFIX,
|
||||
},
|
||||
)
|
||||
entry.add_to_hass(hass)
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_IMPORT},
|
||||
data={
|
||||
CONF_ZIP_CODE: ZIP_CODE,
|
||||
CONF_HOUSE_NUMBER: HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: HOUSE_NUMBER_SUFFIX,
|
||||
},
|
||||
)
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.ABORT
|
||||
assert result["reason"] == "already_configured"
|
||||
|
||||
|
||||
async def test_import_if_not_rova_area(
|
||||
hass: HomeAssistant, mock_rova: MagicMock
|
||||
) -> None:
|
||||
"""Test we abort if rova does not collect at the given address."""
|
||||
|
||||
# test with area where rova does not collect
|
||||
mock_rova.return_value.is_rova_area.return_value = False
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_IMPORT},
|
||||
data={
|
||||
CONF_ZIP_CODE: ZIP_CODE,
|
||||
CONF_HOUSE_NUMBER: HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: HOUSE_NUMBER_SUFFIX,
|
||||
},
|
||||
)
|
||||
|
||||
assert result.get("type") == data_entry_flow.FlowResultType.ABORT
|
||||
assert result.get("reason") == "invalid_rova_area"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("exception", "error"),
|
||||
[
|
||||
(ConnectTimeout(), "cannot_connect"),
|
||||
(HTTPError(), "cannot_connect"),
|
||||
],
|
||||
)
|
||||
async def test_import_connection_errors(
|
||||
hass: HomeAssistant, exception: Exception, error: str, mock_rova: MagicMock
|
||||
) -> None:
|
||||
"""Test import connection errors flow."""
|
||||
|
||||
# test with HTTPError
|
||||
mock_rova.return_value.is_rova_area.side_effect = exception
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_IMPORT},
|
||||
data={
|
||||
CONF_ZIP_CODE: ZIP_CODE,
|
||||
CONF_HOUSE_NUMBER: HOUSE_NUMBER,
|
||||
CONF_HOUSE_NUMBER_SUFFIX: HOUSE_NUMBER_SUFFIX,
|
||||
},
|
||||
)
|
||||
|
||||
assert result.get("type") == data_entry_flow.FlowResultType.ABORT
|
||||
assert result.get("reason") == error
|
Loading…
Reference in New Issue