Add config flow to Rainforest EAGLE-200 (#54846)

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
pull/54778/head^2
Paulus Schoutsen 2021-08-19 09:22:30 -07:00 committed by GitHub
parent 8103d9ae3c
commit 4ae2a26aa3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 552 additions and 142 deletions

View File

@ -838,6 +838,8 @@ omit =
homeassistant/components/rainmachine/binary_sensor.py
homeassistant/components/rainmachine/sensor.py
homeassistant/components/rainmachine/switch.py
homeassistant/components/rainforest_eagle/__init__.py
homeassistant/components/rainforest_eagle/data.py
homeassistant/components/rainforest_eagle/sensor.py
homeassistant/components/raspihats/*
homeassistant/components/raspyrfm/*

View File

@ -1 +1,28 @@
"""The rainforest_eagle component."""
"""The Rainforest Eagle integration."""
from __future__ import annotations
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from . import data
from .const import DOMAIN
PLATFORMS = ("sensor",)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Rainforest Eagle from a config entry."""
coordinator = data.EagleDataCoordinator(hass, entry)
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok

View File

@ -0,0 +1,69 @@
"""Config flow for Rainforest Eagle integration."""
from __future__ import annotations
import logging
from typing import Any
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_TYPE
from homeassistant.data_entry_flow import FlowResult
from . import data
from .const import CONF_CLOUD_ID, CONF_HARDWARE_ADDRESS, CONF_INSTALL_CODE, DOMAIN
_LOGGER = logging.getLogger(__name__)
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_CLOUD_ID): str,
vol.Required(CONF_INSTALL_CODE): str,
}
)
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Rainforest Eagle."""
VERSION = 1
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the initial step."""
if user_input is None:
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA
)
await self.async_set_unique_id(user_input[CONF_CLOUD_ID])
errors = {}
try:
eagle_type, hardware_address = await data.async_get_type(
self.hass, user_input[CONF_CLOUD_ID], user_input[CONF_INSTALL_CODE]
)
except data.CannotConnect:
errors["base"] = "cannot_connect"
except data.InvalidAuth:
errors["base"] = "invalid_auth"
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
user_input[CONF_TYPE] = eagle_type
user_input[CONF_HARDWARE_ADDRESS] = hardware_address
return self.async_create_entry(
title=user_input[CONF_CLOUD_ID], data=user_input
)
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA, errors=errors
)
async def async_step_import(self, user_input: dict[str, Any]) -> FlowResult:
"""Handle the import step."""
await self.async_set_unique_id(user_input[CONF_CLOUD_ID])
self._abort_if_unique_id_configured()
return await self.async_step_user(user_input)

View File

@ -0,0 +1,9 @@
"""Constants for the Rainforest Eagle integration."""
DOMAIN = "rainforest_eagle"
CONF_CLOUD_ID = "cloud_id"
CONF_INSTALL_CODE = "install_code"
CONF_HARDWARE_ADDRESS = "hardware_address"
TYPE_EAGLE_100 = "eagle-100"
TYPE_EAGLE_200 = "eagle-200"

View File

@ -0,0 +1,173 @@
"""Rainforest data."""
from __future__ import annotations
from datetime import timedelta
import logging
import aioeagle
import aiohttp
import async_timeout
from requests.exceptions import ConnectionError as ConnectError, HTTPError, Timeout
from uEagle import Eagle as Eagle100Reader
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_TYPE
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import aiohttp_client
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
CONF_CLOUD_ID,
CONF_HARDWARE_ADDRESS,
CONF_INSTALL_CODE,
TYPE_EAGLE_100,
TYPE_EAGLE_200,
)
_LOGGER = logging.getLogger(__name__)
UPDATE_100_ERRORS = (ConnectError, HTTPError, Timeout, ValueError)
class RainforestError(HomeAssistantError):
"""Base error."""
class CannotConnect(RainforestError):
"""Error to indicate a request failed."""
class InvalidAuth(RainforestError):
"""Error to indicate bad auth."""
async def async_get_type(hass, cloud_id, install_code):
"""Try API call 'get_network_info' to see if target device is Eagle-100 or Eagle-200."""
reader = Eagle100Reader(cloud_id, install_code)
try:
response = await hass.async_add_executor_job(reader.get_network_info)
except UPDATE_100_ERRORS as error:
_LOGGER.error("Failed to connect during setup: %s", error)
raise CannotConnect from error
# Branch to test if target is Legacy Model
if (
"NetworkInfo" in response
and response["NetworkInfo"].get("ModelId") == "Z109-EAGLE"
):
return TYPE_EAGLE_100, None
# Branch to test if target is not an Eagle-200 Model
if (
"Response" not in response
or response["Response"].get("Command") != "get_network_info"
):
# We don't support this
return None, None
# For EAGLE-200, fetch the hardware address of the meter too.
hub = aioeagle.EagleHub(
aiohttp_client.async_get_clientsession(hass), cloud_id, install_code
)
try:
meters = await hub.get_device_list()
except aioeagle.BadAuth as err:
raise InvalidAuth from err
except aiohttp.ClientError as err:
raise CannotConnect from err
if meters:
hardware_address = meters[0].hardware_address
else:
hardware_address = None
return TYPE_EAGLE_200, hardware_address
class EagleDataCoordinator(DataUpdateCoordinator):
"""Get the latest data from the Eagle device."""
eagle100_reader: Eagle100Reader | None = None
eagle200_meter: aioeagle.ElectricMeter | None = None
def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Initialize the data object."""
self.entry = entry
if self.type == TYPE_EAGLE_100:
self.model = "EAGLE-100"
update_method = self._async_update_data_100
else:
self.model = "EAGLE-200"
update_method = self._async_update_data_200
super().__init__(
hass,
_LOGGER,
name=entry.data[CONF_CLOUD_ID],
update_interval=timedelta(seconds=30),
update_method=update_method,
)
@property
def cloud_id(self):
"""Return the cloud ID."""
return self.entry.data[CONF_CLOUD_ID]
@property
def type(self):
"""Return entry type."""
return self.entry.data[CONF_TYPE]
@property
def hardware_address(self):
"""Return hardware address of meter."""
return self.entry.data[CONF_HARDWARE_ADDRESS]
async def _async_update_data_200(self):
"""Get the latest data from the Eagle-200 device."""
if self.eagle200_meter is None:
hub = aioeagle.EagleHub(
aiohttp_client.async_get_clientsession(self.hass),
self.cloud_id,
self.entry.data[CONF_INSTALL_CODE],
)
self.eagle200_meter = aioeagle.ElectricMeter.create_instance(
hub, self.hardware_address
)
async with async_timeout.timeout(30):
data = await self.eagle200_meter.get_device_query()
_LOGGER.debug("API data: %s", data)
return {var["Name"]: var["Value"] for var in data.values()}
async def _async_update_data_100(self):
"""Get the latest data from the Eagle-100 device."""
try:
data = await self.hass.async_add_executor_job(self._fetch_data)
except UPDATE_100_ERRORS as error:
raise UpdateFailed from error
_LOGGER.debug("API data: %s", data)
return data
def _fetch_data(self):
"""Fetch and return the four sensor values in a dict."""
if self.eagle100_reader is None:
self.eagle100_reader = Eagle100Reader(
self.cloud_id, self.entry.data[CONF_INSTALL_CODE]
)
out = {}
resp = self.eagle100_reader.get_instantaneous_demand()["InstantaneousDemand"]
out["zigbee:InstantaneousDemand"] = resp["Demand"]
resp = self.eagle100_reader.get_current_summation()["CurrentSummation"]
out["zigbee:CurrentSummationDelivered"] = resp["SummationDelivered"]
out["zigbee:CurrentSummationReceived"] = resp["SummationReceived"]
return out

View File

@ -1,10 +1,11 @@
{
"domain": "rainforest_eagle",
"name": "Rainforest Eagle-200",
"name": "Rainforest Eagle",
"documentation": "https://www.home-assistant.io/integrations/rainforest_eagle",
"requirements": ["eagle200_reader==0.2.4", "uEagle==0.0.2"],
"requirements": ["aioeagle==1.1.0", "uEagle==0.0.2"],
"codeowners": ["@gtdiehl", "@jcalbert"],
"iot_class": "local_polling",
"config_flow": true,
"dhcp": [
{
"macaddress": "D8D5B9*"

View File

@ -1,74 +1,60 @@
"""Support for the Rainforest Eagle-200 energy monitor."""
"""Support for the Rainforest Eagle energy monitor."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import timedelta
import logging
from typing import Any
from eagle200_reader import EagleReader
from requests.exceptions import ConnectionError as ConnectError, HTTPError, Timeout
from uEagle import Eagle as LegacyReader
import voluptuous as vol
from homeassistant.components.sensor import (
DEVICE_CLASS_ENERGY,
PLATFORM_SCHEMA,
STATE_CLASS_MEASUREMENT,
STATE_CLASS_TOTAL_INCREASING,
SensorEntity,
SensorEntityDescription,
StateType,
)
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import (
CONF_IP_ADDRESS,
DEVICE_CLASS_POWER,
ENERGY_KILO_WATT_HOUR,
POWER_KILO_WATT,
)
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType
from homeassistant.helpers.update_coordinator import CoordinatorEntity
CONF_CLOUD_ID = "cloud_id"
CONF_INSTALL_CODE = "install_code"
POWER_KILO_WATT = "kW"
from .const import CONF_CLOUD_ID, CONF_INSTALL_CODE, DOMAIN
from .data import EagleDataCoordinator
_LOGGER = logging.getLogger(__name__)
MIN_SCAN_INTERVAL = timedelta(seconds=30)
@dataclass
class SensorType:
"""Rainforest sensor type."""
name: str
unit_of_measurement: str
device_class: str | None = None
state_class: str | None = None
SENSORS = {
"instantanous_demand": SensorType(
name="Eagle-200 Meter Power Demand",
unit_of_measurement=POWER_KILO_WATT,
SENSORS = (
SensorEntityDescription(
key="zigbee:InstantaneousDemand",
name="Meter Power Demand",
native_unit_of_measurement=POWER_KILO_WATT,
device_class=DEVICE_CLASS_POWER,
),
"summation_delivered": SensorType(
name="Eagle-200 Total Meter Energy Delivered",
unit_of_measurement=ENERGY_KILO_WATT_HOUR,
SensorEntityDescription(
key="zigbee:CurrentSummationDelivered",
name="Total Meter Energy Delivered",
native_unit_of_measurement=ENERGY_KILO_WATT_HOUR,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
"summation_received": SensorType(
name="Eagle-200 Total Meter Energy Received",
unit_of_measurement=ENERGY_KILO_WATT_HOUR,
SensorEntityDescription(
key="zigbee:CurrentSummationReceived",
name="Total Meter Energy Received",
native_unit_of_measurement=ENERGY_KILO_WATT_HOUR,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
"summation_total": SensorType(
name="Eagle-200 Net Meter Energy (Delivered minus Received)",
unit_of_measurement=ENERGY_KILO_WATT_HOUR,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_MEASUREMENT,
),
}
)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
@ -79,104 +65,65 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
)
def hwtest(cloud_id, install_code, ip_address):
"""Try API call 'get_network_info' to see if target device is Legacy or Eagle-200."""
reader = LeagleReader(cloud_id, install_code, ip_address)
response = reader.get_network_info()
# Branch to test if target is Legacy Model
if (
"NetworkInfo" in response
and response["NetworkInfo"].get("ModelId", None) == "Z109-EAGLE"
):
return reader
# Branch to test if target is Eagle-200 Model
if (
"Response" in response
and response["Response"].get("Command", None) == "get_network_info"
):
return EagleReader(ip_address, cloud_id, install_code)
# Catch-all if hardware ID tests fail
raise ValueError("Couldn't determine device model.")
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: dict[str, Any] | None = None,
):
"""Import config as config entry."""
_LOGGER.warning(
"Configuration of the rainforest_eagle platform in YAML is deprecated "
"and will be removed in Home Assistant 2021.11; Your existing configuration "
"has been imported into the UI automatically and can be safely removed "
"from your configuration.yaml file"
)
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={
CONF_CLOUD_ID: config[CONF_CLOUD_ID],
CONF_INSTALL_CODE: config[CONF_INSTALL_CODE],
},
)
)
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Create the Eagle-200 sensor."""
ip_address = config[CONF_IP_ADDRESS]
cloud_id = config[CONF_CLOUD_ID]
install_code = config[CONF_INSTALL_CODE]
try:
eagle_reader = hwtest(cloud_id, install_code, ip_address)
except (ConnectError, HTTPError, Timeout, ValueError) as error:
_LOGGER.error("Failed to connect during setup: %s", error)
return
eagle_data = EagleData(eagle_reader)
eagle_data.update()
add_entities(EagleSensor(eagle_data, condition) for condition in SENSORS)
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up a config entry."""
coordinator = hass.data[DOMAIN][entry.entry_id]
async_add_entities(EagleSensor(coordinator, description) for description in SENSORS)
class EagleSensor(SensorEntity):
"""Implementation of the Rainforest Eagle-200 sensor."""
class EagleSensor(CoordinatorEntity, SensorEntity):
"""Implementation of the Rainforest Eagle sensor."""
def __init__(self, eagle_data, sensor_type):
coordinator: EagleDataCoordinator
def __init__(self, coordinator, entity_description):
"""Initialize the sensor."""
self.eagle_data = eagle_data
self._type = sensor_type
sensor_info = SENSORS[sensor_type]
self._attr_name = sensor_info.name
self._attr_native_unit_of_measurement = sensor_info.unit_of_measurement
self._attr_device_class = sensor_info.device_class
self._attr_state_class = sensor_info.state_class
super().__init__(coordinator)
self.entity_description = entity_description
def update(self):
"""Get the energy information from the Rainforest Eagle."""
self.eagle_data.update()
self._attr_native_value = self.eagle_data.get_state(self._type)
@property
def unique_id(self) -> str | None:
"""Return unique ID of entity."""
return f"{self.coordinator.cloud_id}-{self.entity_description.key}"
@property
def native_value(self) -> StateType:
"""Return native value of the sensor."""
return self.coordinator.data.get(self.entity_description.key)
class EagleData:
"""Get the latest data from the Eagle-200 device."""
def __init__(self, eagle_reader):
"""Initialize the data object."""
self._eagle_reader = eagle_reader
self.data = {}
@Throttle(MIN_SCAN_INTERVAL)
def update(self):
"""Get the latest data from the Eagle-200 device."""
try:
self.data = self._eagle_reader.update()
_LOGGER.debug("API data: %s", self.data)
except (ConnectError, HTTPError, Timeout, ValueError) as error:
_LOGGER.error("Unable to connect during update: %s", error)
self.data = {}
def get_state(self, sensor_type):
"""Get the sensor value from the dictionary."""
state = self.data.get(sensor_type)
_LOGGER.debug("Updating: %s - %s", sensor_type, state)
return state
class LeagleReader(LegacyReader, SensorEntity):
"""Wraps uEagle to make it behave like eagle_reader, offering update()."""
def update(self):
"""Fetch and return the four sensor values in a dict."""
out = {}
resp = self.get_instantaneous_demand()["InstantaneousDemand"]
out["instantanous_demand"] = resp["Demand"]
resp = self.get_current_summation()["CurrentSummation"]
out["summation_delivered"] = resp["SummationDelivered"]
out["summation_received"] = resp["SummationReceived"]
out["summation_total"] = out["summation_delivered"] - out["summation_received"]
return out
@property
def device_info(self) -> DeviceInfo | None:
"""Return device info."""
return {
"name": self.coordinator.model,
"identifiers": {(DOMAIN, self.coordinator.cloud_id)},
"manufacturer": "Rainforest Automation",
"model": self.coordinator.model,
}

View File

@ -0,0 +1,20 @@
{
"config": {
"step": {
"user": {
"data": {
"cloud_id": "Cloud ID",
"install_code": "Installation Code"
}
}
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
}
}
}

View File

@ -0,0 +1,20 @@
{
"config": {
"abort": {
"already_configured": "Device is already configured"
},
"error": {
"cannot_connect": "Failed to connect",
"invalid_auth": "Invalid authentication",
"unknown": "Unexpected error"
},
"step": {
"user": {
"data": {
"cloud_id": "Cloud ID",
"install_code": "Installation Code"
}
}
}
}
}

View File

@ -215,6 +215,7 @@ FLOWS = [
"ps4",
"pvpc_hourly_pricing",
"rachio",
"rainforest_eagle",
"rainmachine",
"recollect_waste",
"renault",

View File

@ -161,6 +161,10 @@ DHCP = [
"hostname": "rachio-*",
"macaddress": "74C63B*"
},
{
"domain": "rainforest_eagle",
"macaddress": "D8D5B9*"
},
{
"domain": "ring",
"hostname": "ring*",

View File

@ -254,9 +254,10 @@ class DataUpdateCoordinator(Generic[T]):
finally:
self.logger.debug(
"Finished fetching %s data in %.3f seconds",
"Finished fetching %s data in %.3f seconds (success: %s)",
self.name,
monotonic() - start,
self.last_update_success,
)
if not auth_failed and self._listeners and not self.hass.is_stopping:
self._schedule_refresh()

View File

@ -160,6 +160,9 @@ aiodns==3.0.0
# homeassistant.components.eafm
aioeafm==0.1.2
# homeassistant.components.rainforest_eagle
aioeagle==1.1.0
# homeassistant.components.emonitor
aioemonitor==1.0.5
@ -543,9 +546,6 @@ dweepy==0.3.0
# homeassistant.components.dynalite
dynalite_devices==0.1.46
# homeassistant.components.rainforest_eagle
eagle200_reader==0.2.4
# homeassistant.components.ebusd
ebusdpy==0.0.16

View File

@ -99,6 +99,9 @@ aiodns==3.0.0
# homeassistant.components.eafm
aioeafm==0.1.2
# homeassistant.components.rainforest_eagle
aioeagle==1.1.0
# homeassistant.components.emonitor
aioemonitor==1.0.5
@ -1278,6 +1281,9 @@ twilio==6.32.0
# homeassistant.components.twinkly
twinkly-client==0.0.2
# homeassistant.components.rainforest_eagle
uEagle==0.0.2
# homeassistant.components.upb
upb_lib==0.4.12

View File

@ -15,7 +15,7 @@ async def test_form(hass: HomeAssistant) -> None:
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == RESULT_TYPE_FORM
assert result["errors"] == {}
assert result["errors"] is None
with patch(
"homeassistant.components.NEW_DOMAIN.config_flow.PlaceholderHub.authenticate",

View File

@ -0,0 +1 @@
"""Tests for the Rainforest Eagle integration."""

View File

@ -0,0 +1,129 @@
"""Test the Rainforest Eagle config flow."""
from unittest.mock import patch
from homeassistant import config_entries, setup
from homeassistant.components.rainforest_eagle.const import (
CONF_CLOUD_ID,
CONF_HARDWARE_ADDRESS,
CONF_INSTALL_CODE,
DOMAIN,
TYPE_EAGLE_200,
)
from homeassistant.components.rainforest_eagle.data import CannotConnect, InvalidAuth
from homeassistant.const import CONF_TYPE
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import (
RESULT_TYPE_ABORT,
RESULT_TYPE_CREATE_ENTRY,
RESULT_TYPE_FORM,
)
async def test_form(hass: HomeAssistant) -> None:
"""Test we get the form."""
await setup.async_setup_component(hass, "persistent_notification", {})
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == RESULT_TYPE_FORM
assert result["errors"] is None
with patch(
"homeassistant.components.rainforest_eagle.data.async_get_type",
return_value=(TYPE_EAGLE_200, "mock-hw"),
), patch(
"homeassistant.components.rainforest_eagle.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_CLOUD_ID: "abcdef", CONF_INSTALL_CODE: "123456"},
)
await hass.async_block_till_done()
assert result2["type"] == RESULT_TYPE_CREATE_ENTRY
assert result2["title"] == "abcdef"
assert result2["data"] == {
CONF_TYPE: TYPE_EAGLE_200,
CONF_CLOUD_ID: "abcdef",
CONF_INSTALL_CODE: "123456",
CONF_HARDWARE_ADDRESS: "mock-hw",
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_form_invalid_auth(hass: HomeAssistant) -> None:
"""Test we handle invalid auth."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with patch(
"homeassistant.components.rainforest_eagle.data.Eagle100Reader.get_network_info",
side_effect=InvalidAuth,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_CLOUD_ID: "abcdef", CONF_INSTALL_CODE: "123456"},
)
assert result2["type"] == RESULT_TYPE_FORM
assert result2["errors"] == {"base": "invalid_auth"}
async def test_form_cannot_connect(hass: HomeAssistant) -> None:
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with patch(
"homeassistant.components.rainforest_eagle.data.Eagle100Reader.get_network_info",
side_effect=CannotConnect,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_CLOUD_ID: "abcdef", CONF_INSTALL_CODE: "123456"},
)
assert result2["type"] == RESULT_TYPE_FORM
assert result2["errors"] == {"base": "cannot_connect"}
async def test_import(hass: HomeAssistant) -> None:
"""Test we get the form."""
await setup.async_setup_component(hass, "persistent_notification", {})
with patch(
"homeassistant.components.rainforest_eagle.data.async_get_type",
return_value=(TYPE_EAGLE_200, "mock-hw"),
), patch(
"homeassistant.components.rainforest_eagle.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result = await hass.config_entries.flow.async_init(
DOMAIN,
data={CONF_CLOUD_ID: "abcdef", CONF_INSTALL_CODE: "123456"},
context={"source": config_entries.SOURCE_IMPORT},
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "abcdef"
assert result["data"] == {
CONF_TYPE: TYPE_EAGLE_200,
CONF_CLOUD_ID: "abcdef",
CONF_INSTALL_CODE: "123456",
CONF_HARDWARE_ADDRESS: "mock-hw",
}
assert len(mock_setup_entry.mock_calls) == 1
# Second time we should get already_configured
result2 = await hass.config_entries.flow.async_init(
DOMAIN,
data={CONF_CLOUD_ID: "abcdef", CONF_INSTALL_CODE: "123456"},
context={"source": config_entries.SOURCE_IMPORT},
)
assert result2["type"] == RESULT_TYPE_ABORT
assert result2["reason"] == "already_configured"