Add Elvia integration (#107405)
parent
4f4d79137e
commit
52a692df3e
|
@ -320,6 +320,8 @@ omit =
|
|||
homeassistant/components/elmax/cover.py
|
||||
homeassistant/components/elmax/switch.py
|
||||
homeassistant/components/elv/*
|
||||
homeassistant/components/elvia/__init__.py
|
||||
homeassistant/components/elvia/importer.py
|
||||
homeassistant/components/emby/media_player.py
|
||||
homeassistant/components/emoncms/sensor.py
|
||||
homeassistant/components/emoncms_history/*
|
||||
|
|
|
@ -347,6 +347,8 @@ build.json @home-assistant/supervisor
|
|||
/homeassistant/components/elmax/ @albertogeniola
|
||||
/tests/components/elmax/ @albertogeniola
|
||||
/homeassistant/components/elv/ @majuss
|
||||
/homeassistant/components/elvia/ @ludeeus
|
||||
/tests/components/elvia/ @ludeeus
|
||||
/homeassistant/components/emby/ @mezz64
|
||||
/homeassistant/components/emoncms/ @borpin
|
||||
/homeassistant/components/emonitor/ @bdraco
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
"""The Elvia integration."""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from elvia import error as ElviaError
|
||||
|
||||
from homeassistant.const import CONF_API_TOKEN
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.event import async_track_time_interval
|
||||
|
||||
from .const import CONF_METERING_POINT_ID, LOGGER
|
||||
from .importer import ElviaImporter
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
|
||||
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Set up Elvia from a config entry."""
|
||||
importer = ElviaImporter(
|
||||
hass=hass,
|
||||
api_token=entry.data[CONF_API_TOKEN],
|
||||
metering_point_id=entry.data[CONF_METERING_POINT_ID],
|
||||
)
|
||||
|
||||
async def _import_meter_values(_: datetime | None = None) -> None:
|
||||
"""Import meter values."""
|
||||
try:
|
||||
await importer.import_meter_values()
|
||||
except ElviaError.ElviaException as exception:
|
||||
LOGGER.exception("Unknown error %s", exception)
|
||||
|
||||
try:
|
||||
await importer.import_meter_values()
|
||||
except ElviaError.ElviaException as exception:
|
||||
LOGGER.exception("Unknown error %s", exception)
|
||||
return False
|
||||
|
||||
entry.async_on_unload(
|
||||
async_track_time_interval(
|
||||
hass,
|
||||
_import_meter_values,
|
||||
timedelta(minutes=60),
|
||||
)
|
||||
)
|
||||
|
||||
return True
|
|
@ -0,0 +1,119 @@
|
|||
"""Config flow for Elvia integration."""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import timedelta
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from elvia import Elvia, error as ElviaError
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.const import CONF_API_TOKEN
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
from .const import CONF_METERING_POINT_ID, DOMAIN, LOGGER
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from homeassistant.data_entry_flow import FlowResult
|
||||
|
||||
|
||||
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
"""Handle a config flow for Elvia."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize."""
|
||||
self._api_token: str | None = None
|
||||
self._metering_point_ids: list[str] | None = None
|
||||
|
||||
async def async_step_user(
|
||||
self,
|
||||
user_input: dict[str, Any] | None = None,
|
||||
) -> FlowResult:
|
||||
"""Handle the initial step."""
|
||||
errors: dict[str, str] = {}
|
||||
if user_input is not None:
|
||||
self._api_token = api_token = user_input[CONF_API_TOKEN]
|
||||
client = Elvia(meter_value_token=api_token).meter_value()
|
||||
try:
|
||||
results = await client.get_meter_values(
|
||||
start_time=(dt_util.now() - timedelta(hours=1)).isoformat()
|
||||
)
|
||||
|
||||
except ElviaError.AuthError as exception:
|
||||
LOGGER.error("Authentication error %s", exception)
|
||||
errors["base"] = "invalid_auth"
|
||||
except ElviaError.ElviaException as exception:
|
||||
LOGGER.error("Unknown error %s", exception)
|
||||
errors["base"] = "unknown"
|
||||
else:
|
||||
try:
|
||||
self._metering_point_ids = metering_point_ids = [
|
||||
x["meteringPointId"] for x in results["meteringpoints"]
|
||||
]
|
||||
except KeyError:
|
||||
return self.async_abort(reason="no_metering_points")
|
||||
|
||||
if (meter_count := len(metering_point_ids)) > 1:
|
||||
return await self.async_step_select_meter()
|
||||
if meter_count == 1:
|
||||
return await self._create_config_entry(
|
||||
api_token=api_token,
|
||||
metering_point_id=metering_point_ids[0],
|
||||
)
|
||||
|
||||
return self.async_abort(reason="no_metering_points")
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
data_schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_API_TOKEN): str,
|
||||
}
|
||||
),
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
async def async_step_select_meter(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> FlowResult:
|
||||
"""Handle selecting a metering point ID."""
|
||||
if TYPE_CHECKING:
|
||||
assert self._metering_point_ids is not None
|
||||
assert self._api_token is not None
|
||||
|
||||
if user_input is not None:
|
||||
return await self._create_config_entry(
|
||||
api_token=self._api_token,
|
||||
metering_point_id=user_input[CONF_METERING_POINT_ID],
|
||||
)
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="select_meter",
|
||||
data_schema=vol.Schema(
|
||||
{
|
||||
vol.Required(
|
||||
CONF_METERING_POINT_ID,
|
||||
default=self._metering_point_ids[0],
|
||||
): vol.In(self._metering_point_ids),
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
async def _create_config_entry(
|
||||
self,
|
||||
api_token: str,
|
||||
metering_point_id: str,
|
||||
) -> FlowResult:
|
||||
"""Store metering point ID and API token."""
|
||||
if (await self.async_set_unique_id(metering_point_id)) is not None:
|
||||
return self.async_abort(
|
||||
reason="metering_point_id_already_configured",
|
||||
description_placeholders={"metering_point_id": metering_point_id},
|
||||
)
|
||||
return self.async_create_entry(
|
||||
title=metering_point_id,
|
||||
data={
|
||||
CONF_API_TOKEN: api_token,
|
||||
CONF_METERING_POINT_ID: metering_point_id,
|
||||
},
|
||||
)
|
|
@ -0,0 +1,7 @@
|
|||
"""Constants for the Elvia integration."""
|
||||
from logging import getLogger
|
||||
|
||||
DOMAIN = "elvia"
|
||||
LOGGER = getLogger(__package__)
|
||||
|
||||
CONF_METERING_POINT_ID = "metering_point_id"
|
|
@ -0,0 +1,129 @@
|
|||
"""Importer for the Elvia integration."""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from typing import TYPE_CHECKING, cast
|
||||
|
||||
from elvia import Elvia
|
||||
|
||||
from homeassistant.components.recorder.models import StatisticData, StatisticMetaData
|
||||
from homeassistant.components.recorder.statistics import (
|
||||
async_add_external_statistics,
|
||||
get_last_statistics,
|
||||
statistics_during_period,
|
||||
)
|
||||
from homeassistant.components.recorder.util import get_instance
|
||||
from homeassistant.const import UnitOfEnergy
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
from .const import DOMAIN, LOGGER
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from elvia.types.meter_value_types import MeterValueTimeSeries
|
||||
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
|
||||
class ElviaImporter:
|
||||
"""Class to import data from Elvia."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
api_token: str,
|
||||
metering_point_id: str,
|
||||
) -> None:
|
||||
"""Initialize."""
|
||||
self.hass = hass
|
||||
self.client = Elvia(meter_value_token=api_token).meter_value()
|
||||
self.metering_point_id = metering_point_id
|
||||
|
||||
async def _fetch_hourly_data(self, since: datetime) -> list[MeterValueTimeSeries]:
|
||||
"""Fetch hourly data."""
|
||||
LOGGER.debug("Fetching hourly data since %s", since)
|
||||
all_data = await self.client.get_meter_values(
|
||||
start_time=since.isoformat(),
|
||||
metering_point_ids=[self.metering_point_id],
|
||||
)
|
||||
return all_data["meteringpoints"][0]["metervalue"]["timeSeries"]
|
||||
|
||||
async def import_meter_values(self) -> None:
|
||||
"""Import meter values."""
|
||||
statistics: list[StatisticData] = []
|
||||
statistic_id = f"{DOMAIN}:{self.metering_point_id}_consumption"
|
||||
last_stats = await get_instance(self.hass).async_add_executor_job(
|
||||
get_last_statistics,
|
||||
self.hass,
|
||||
1,
|
||||
statistic_id,
|
||||
True,
|
||||
{"sum"},
|
||||
)
|
||||
|
||||
if not last_stats:
|
||||
# First time we insert 1 years of data (if available)
|
||||
hourly_data = await self._fetch_hourly_data(
|
||||
since=dt_util.now() - timedelta(days=365)
|
||||
)
|
||||
if hourly_data is None or len(hourly_data) == 0:
|
||||
return
|
||||
last_stats_time = None
|
||||
_sum = 0.0
|
||||
else:
|
||||
hourly_data = await self._fetch_hourly_data(
|
||||
since=dt_util.utc_from_timestamp(last_stats[statistic_id][0]["end"])
|
||||
)
|
||||
|
||||
if (
|
||||
hourly_data is None
|
||||
or len(hourly_data) == 0
|
||||
or not hourly_data[-1]["verified"]
|
||||
or (from_time := dt_util.parse_datetime(hourly_data[0]["startTime"]))
|
||||
is None
|
||||
):
|
||||
return
|
||||
|
||||
curr_stat = await get_instance(self.hass).async_add_executor_job(
|
||||
statistics_during_period,
|
||||
self.hass,
|
||||
from_time - timedelta(hours=1),
|
||||
None,
|
||||
{statistic_id},
|
||||
"hour",
|
||||
None,
|
||||
{"sum"},
|
||||
)
|
||||
first_stat = curr_stat[statistic_id][0]
|
||||
_sum = cast(float, first_stat["sum"])
|
||||
last_stats_time = first_stat["start"]
|
||||
|
||||
last_stats_time_dt = (
|
||||
dt_util.utc_from_timestamp(last_stats_time) if last_stats_time else None
|
||||
)
|
||||
|
||||
for entry in hourly_data:
|
||||
from_time = dt_util.parse_datetime(entry["startTime"])
|
||||
if from_time is None or (
|
||||
last_stats_time_dt is not None and from_time <= last_stats_time_dt
|
||||
):
|
||||
continue
|
||||
|
||||
_sum += entry["value"]
|
||||
|
||||
statistics.append(
|
||||
StatisticData(start=from_time, state=entry["value"], sum=_sum)
|
||||
)
|
||||
|
||||
async_add_external_statistics(
|
||||
hass=self.hass,
|
||||
metadata=StatisticMetaData(
|
||||
has_mean=False,
|
||||
has_sum=True,
|
||||
name=f"{self.metering_point_id} Consumption",
|
||||
source=DOMAIN,
|
||||
statistic_id=statistic_id,
|
||||
unit_of_measurement=UnitOfEnergy.KILO_WATT_HOUR,
|
||||
),
|
||||
statistics=statistics,
|
||||
)
|
||||
LOGGER.debug("Imported %s statistics", len(statistics))
|
|
@ -0,0 +1,10 @@
|
|||
{
|
||||
"domain": "elvia",
|
||||
"name": "Elvia",
|
||||
"codeowners": ["@ludeeus"],
|
||||
"config_flow": true,
|
||||
"dependencies": ["recorder"],
|
||||
"documentation": "https://www.home-assistant.io/integrations/elvia",
|
||||
"iot_class": "cloud_polling",
|
||||
"requirements": ["elvia==0.1.0"]
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
{
|
||||
"config": {
|
||||
"step": {
|
||||
"user": {
|
||||
"description": "Enter your meter value API token from Elvia",
|
||||
"data": {
|
||||
"api_token": "[%key:common::config_flow::data::api_token%]"
|
||||
}
|
||||
},
|
||||
"select_meter": {
|
||||
"data": {
|
||||
"metering_point_id": "Select your metering point ID"
|
||||
}
|
||||
}
|
||||
},
|
||||
"error": {
|
||||
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
|
||||
"unknown": "[%key:common::config_flow::error::unknown%]"
|
||||
},
|
||||
"abort": {
|
||||
"metering_point_id_already_configured": "Metering point with ID `{metering_point_id}` is already configured.",
|
||||
"no_metering_points": "The provived API token has no metering points."
|
||||
}
|
||||
}
|
||||
}
|
|
@ -139,6 +139,7 @@ FLOWS = {
|
|||
"elgato",
|
||||
"elkm1",
|
||||
"elmax",
|
||||
"elvia",
|
||||
"emonitor",
|
||||
"emulated_roku",
|
||||
"energyzero",
|
||||
|
|
|
@ -1502,6 +1502,12 @@
|
|||
"config_flow": false,
|
||||
"iot_class": "local_polling"
|
||||
},
|
||||
"elvia": {
|
||||
"name": "Elvia",
|
||||
"integration_type": "hub",
|
||||
"config_flow": true,
|
||||
"iot_class": "cloud_polling"
|
||||
},
|
||||
"emby": {
|
||||
"name": "Emby",
|
||||
"integration_type": "hub",
|
||||
|
|
|
@ -766,6 +766,9 @@ elkm1-lib==2.2.6
|
|||
# homeassistant.components.elmax
|
||||
elmax-api==0.0.4
|
||||
|
||||
# homeassistant.components.elvia
|
||||
elvia==0.1.0
|
||||
|
||||
# homeassistant.components.xmpp
|
||||
emoji==2.8.0
|
||||
|
||||
|
|
|
@ -623,6 +623,9 @@ elkm1-lib==2.2.6
|
|||
# homeassistant.components.elmax
|
||||
elmax-api==0.0.4
|
||||
|
||||
# homeassistant.components.elvia
|
||||
elvia==0.1.0
|
||||
|
||||
# homeassistant.components.emulated_roku
|
||||
emulated-roku==0.2.1
|
||||
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
"""Tests for the Elvia integration."""
|
|
@ -0,0 +1,14 @@
|
|||
"""Common fixtures for the Elvia tests."""
|
||||
from collections.abc import Generator
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_setup_entry() -> Generator[AsyncMock, None, None]:
|
||||
"""Override async_setup_entry."""
|
||||
with patch(
|
||||
"homeassistant.components.elvia.async_setup_entry", return_value=True
|
||||
) as mock_setup_entry:
|
||||
yield mock_setup_entry
|
|
@ -0,0 +1,237 @@
|
|||
"""Test the Elvia config flow."""
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
from elvia import error as ElviaError
|
||||
import pytest
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.components.elvia.const import CONF_METERING_POINT_ID, DOMAIN
|
||||
from homeassistant.components.recorder.core import Recorder
|
||||
from homeassistant.const import CONF_API_TOKEN
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.data_entry_flow import FlowResultType, UnknownFlow
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
TEST_API_TOKEN = "xxx-xxx-xxx-xxx"
|
||||
|
||||
|
||||
async def test_single_metering_point(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
mock_setup_entry: AsyncMock,
|
||||
) -> None:
|
||||
"""Test using the config flow with a single metering point."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["errors"] == {}
|
||||
|
||||
with patch(
|
||||
"elvia.meter_value.MeterValue.get_meter_values",
|
||||
return_value={"meteringpoints": [{"meteringPointId": "1234"}]},
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
CONF_API_TOKEN: TEST_API_TOKEN,
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == FlowResultType.CREATE_ENTRY
|
||||
assert result["title"] == "1234"
|
||||
assert result["data"] == {
|
||||
CONF_API_TOKEN: TEST_API_TOKEN,
|
||||
CONF_METERING_POINT_ID: "1234",
|
||||
}
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_multiple_metering_points(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
mock_setup_entry: AsyncMock,
|
||||
) -> None:
|
||||
"""Test using the config flow with multiple metering points."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["errors"] == {}
|
||||
|
||||
with patch(
|
||||
"elvia.meter_value.MeterValue.get_meter_values",
|
||||
return_value={
|
||||
"meteringpoints": [
|
||||
{"meteringPointId": "1234"},
|
||||
{"meteringPointId": "5678"},
|
||||
]
|
||||
},
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
CONF_API_TOKEN: TEST_API_TOKEN,
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["step_id"] == "select_meter"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
CONF_METERING_POINT_ID: "5678",
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == FlowResultType.CREATE_ENTRY
|
||||
assert result["title"] == "5678"
|
||||
assert result["data"] == {
|
||||
CONF_API_TOKEN: TEST_API_TOKEN,
|
||||
CONF_METERING_POINT_ID: "5678",
|
||||
}
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_no_metering_points(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
mock_setup_entry: AsyncMock,
|
||||
) -> None:
|
||||
"""Test using the config flow with no metering points."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["errors"] == {}
|
||||
|
||||
with patch(
|
||||
"elvia.meter_value.MeterValue.get_meter_values",
|
||||
return_value={"meteringpoints": []},
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
CONF_API_TOKEN: TEST_API_TOKEN,
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == FlowResultType.ABORT
|
||||
assert result["reason"] == "no_metering_points"
|
||||
|
||||
assert len(mock_setup_entry.mock_calls) == 0
|
||||
|
||||
|
||||
async def test_bad_data(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
mock_setup_entry: AsyncMock,
|
||||
) -> None:
|
||||
"""Test using the config flow with no metering points."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["errors"] == {}
|
||||
|
||||
with patch(
|
||||
"elvia.meter_value.MeterValue.get_meter_values",
|
||||
return_value={},
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
CONF_API_TOKEN: TEST_API_TOKEN,
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == FlowResultType.ABORT
|
||||
assert result["reason"] == "no_metering_points"
|
||||
|
||||
assert len(mock_setup_entry.mock_calls) == 0
|
||||
|
||||
|
||||
async def test_abort_when_metering_point_id_exist(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
mock_setup_entry: AsyncMock,
|
||||
) -> None:
|
||||
"""Test that we abort when the metering point ID exist."""
|
||||
entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
unique_id="1234",
|
||||
)
|
||||
entry.add_to_hass(hass)
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["errors"] == {}
|
||||
|
||||
with patch(
|
||||
"elvia.meter_value.MeterValue.get_meter_values",
|
||||
return_value={"meteringpoints": [{"meteringPointId": "1234"}]},
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
CONF_API_TOKEN: TEST_API_TOKEN,
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == FlowResultType.ABORT
|
||||
assert result["reason"] == "metering_point_id_already_configured"
|
||||
|
||||
assert len(mock_setup_entry.mock_calls) == 0
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("side_effect", "base_error"),
|
||||
(
|
||||
(ElviaError.ElviaException("Boom"), "unknown"),
|
||||
(ElviaError.AuthError("Boom", 403, {}, ""), "invalid_auth"),
|
||||
(ElviaError.ElviaServerException("Boom", 500, {}, ""), "unknown"),
|
||||
(ElviaError.ElviaClientException("Boom"), "unknown"),
|
||||
),
|
||||
)
|
||||
async def test_form_exceptions(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
side_effect: Exception,
|
||||
base_error: str,
|
||||
) -> None:
|
||||
"""Test we handle cannot connect error."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
|
||||
with patch(
|
||||
"elvia.meter_value.MeterValue.get_meter_values",
|
||||
side_effect=side_effect,
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
CONF_API_TOKEN: TEST_API_TOKEN,
|
||||
},
|
||||
)
|
||||
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["errors"] == {"base": base_error}
|
||||
|
||||
# Simulate that the user gives up and closes the window...
|
||||
hass.config_entries.flow._async_remove_flow_progress(result["flow_id"])
|
||||
await hass.async_block_till_done()
|
||||
|
||||
with pytest.raises(UnknownFlow):
|
||||
hass.config_entries.flow.async_get(result["flow_id"])
|
Loading…
Reference in New Issue