Add NextDNS integration (#74150)

* Initial commit

* Update manifest

* Add first test

* Simplify init

* More tests

* Update tests

* More tests

* More tests

* Add tests for sensor platform

* More tests for sensor platform

* Add tests for system_health

* Fix typo

* Improve test coverage

* Improve test coverage

* Add tests for diagnostics

* Add comment

* Run hassfest

* Fix typo

* Run gen_requirements_all

* Fix tests

* Change key name in diagnostics

* Remove diagnostics and system_health platforms

* Bump library
pull/74360/head
Maciej Bieniek 2022-07-03 18:51:50 +02:00 committed by GitHub
parent e7e940afa5
commit 84119eefaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1342 additions and 0 deletions

View File

@ -698,6 +698,8 @@ build.json @home-assistant/supervisor
/homeassistant/components/nextbus/ @vividboarder
/tests/components/nextbus/ @vividboarder
/homeassistant/components/nextcloud/ @meichthys
/homeassistant/components/nextdns/ @bieniu
/tests/components/nextdns/ @bieniu
/homeassistant/components/nfandroidtv/ @tkdrob
/tests/components/nfandroidtv/ @tkdrob
/homeassistant/components/nightscout/ @marciogranzotto

View File

@ -0,0 +1,185 @@
"""The NextDNS component."""
from __future__ import annotations
import asyncio
from datetime import timedelta
import logging
from aiohttp.client_exceptions import ClientConnectorError
from async_timeout import timeout
from nextdns import (
AnalyticsDnssec,
AnalyticsEncryption,
AnalyticsIpVersions,
AnalyticsProtocols,
AnalyticsStatus,
ApiError,
InvalidApiKeyError,
NextDns,
)
from nextdns.model import NextDnsData
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_API_KEY
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
ATTR_DNSSEC,
ATTR_ENCRYPTION,
ATTR_IP_VERSIONS,
ATTR_PROTOCOLS,
ATTR_STATUS,
CONF_PROFILE_ID,
DOMAIN,
UPDATE_INTERVAL_ANALYTICS,
)
class NextDnsUpdateCoordinator(DataUpdateCoordinator):
"""Class to manage fetching NextDNS data API."""
def __init__(
self,
hass: HomeAssistant,
nextdns: NextDns,
profile_id: str,
update_interval: timedelta,
) -> None:
"""Initialize."""
self.nextdns = nextdns
self.profile_id = profile_id
self.profile_name = nextdns.get_profile_name(profile_id)
self.device_info = DeviceInfo(
configuration_url=f"https://my.nextdns.io/{profile_id}/setup",
entry_type=DeviceEntryType.SERVICE,
identifiers={(DOMAIN, str(profile_id))},
manufacturer="NextDNS Inc.",
name=self.profile_name,
)
super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=update_interval)
async def _async_update_data(self) -> NextDnsData:
"""Update data via library."""
raise NotImplementedError("Update method not implemented")
class NextDnsStatusUpdateCoordinator(NextDnsUpdateCoordinator):
"""Class to manage fetching NextDNS analytics status data from API."""
async def _async_update_data(self) -> AnalyticsStatus:
"""Update data via library."""
try:
async with timeout(10):
return await self.nextdns.get_analytics_status(self.profile_id)
except (ApiError, ClientConnectorError, InvalidApiKeyError) as err:
raise UpdateFailed(err) from err
class NextDnsDnssecUpdateCoordinator(NextDnsUpdateCoordinator):
"""Class to manage fetching NextDNS analytics Dnssec data from API."""
async def _async_update_data(self) -> AnalyticsDnssec:
"""Update data via library."""
try:
async with timeout(10):
return await self.nextdns.get_analytics_dnssec(self.profile_id)
except (ApiError, ClientConnectorError, InvalidApiKeyError) as err:
raise UpdateFailed(err) from err
class NextDnsEncryptionUpdateCoordinator(NextDnsUpdateCoordinator):
"""Class to manage fetching NextDNS analytics encryption data from API."""
async def _async_update_data(self) -> AnalyticsEncryption:
"""Update data via library."""
try:
async with timeout(10):
return await self.nextdns.get_analytics_encryption(self.profile_id)
except (ApiError, ClientConnectorError, InvalidApiKeyError) as err:
raise UpdateFailed(err) from err
class NextDnsIpVersionsUpdateCoordinator(NextDnsUpdateCoordinator):
"""Class to manage fetching NextDNS analytics IP versions data from API."""
async def _async_update_data(self) -> AnalyticsIpVersions:
"""Update data via library."""
try:
async with timeout(10):
return await self.nextdns.get_analytics_ip_versions(self.profile_id)
except (ApiError, ClientConnectorError, InvalidApiKeyError) as err:
raise UpdateFailed(err) from err
class NextDnsProtocolsUpdateCoordinator(NextDnsUpdateCoordinator):
"""Class to manage fetching NextDNS analytics protocols data from API."""
async def _async_update_data(self) -> AnalyticsProtocols:
"""Update data via library."""
try:
async with timeout(10):
return await self.nextdns.get_analytics_protocols(self.profile_id)
except (ApiError, ClientConnectorError, InvalidApiKeyError) as err:
raise UpdateFailed(err) from err
_LOGGER = logging.getLogger(__name__)
PLATFORMS = ["sensor"]
COORDINATORS = [
(ATTR_DNSSEC, NextDnsDnssecUpdateCoordinator, UPDATE_INTERVAL_ANALYTICS),
(ATTR_ENCRYPTION, NextDnsEncryptionUpdateCoordinator, UPDATE_INTERVAL_ANALYTICS),
(ATTR_IP_VERSIONS, NextDnsIpVersionsUpdateCoordinator, UPDATE_INTERVAL_ANALYTICS),
(ATTR_PROTOCOLS, NextDnsProtocolsUpdateCoordinator, UPDATE_INTERVAL_ANALYTICS),
(ATTR_STATUS, NextDnsStatusUpdateCoordinator, UPDATE_INTERVAL_ANALYTICS),
]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up NextDNS as config entry."""
api_key = entry.data[CONF_API_KEY]
profile_id = entry.data[CONF_PROFILE_ID]
websession = async_get_clientsession(hass)
try:
async with timeout(10):
nextdns = await NextDns.create(websession, api_key)
except (ApiError, ClientConnectorError, asyncio.TimeoutError) as err:
raise ConfigEntryNotReady from err
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN].setdefault(entry.entry_id, {})
tasks = []
# Independent DataUpdateCoordinator is used for each API endpoint to avoid
# unnecessary requests when entities using this endpoint are disabled.
for coordinator_name, coordinator_class, update_interval in COORDINATORS:
hass.data[DOMAIN][entry.entry_id][coordinator_name] = coordinator_class(
hass, nextdns, profile_id, update_interval
)
tasks.append(
hass.data[DOMAIN][entry.entry_id][coordinator_name].async_refresh()
)
await asyncio.gather(*tasks)
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok: bool = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok

View File

@ -0,0 +1,88 @@
"""Adds config flow for NextDNS."""
from __future__ import annotations
import asyncio
from typing import Any
from aiohttp.client_exceptions import ClientConnectorError
from async_timeout import timeout
from nextdns import ApiError, InvalidApiKeyError, NextDns
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_API_KEY
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import CONF_PROFILE_ID, CONF_PROFILE_NAME, DOMAIN
class NextDnsFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Config flow for NextDNS."""
VERSION = 1
def __init__(self) -> None:
"""Initialize the config flow."""
self.nextdns: NextDns
self.api_key: str
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle a flow initialized by the user."""
errors: dict[str, str] = {}
websession = async_get_clientsession(self.hass)
if user_input is not None:
self.api_key = user_input[CONF_API_KEY]
try:
async with timeout(10):
self.nextdns = await NextDns.create(
websession, user_input[CONF_API_KEY]
)
except InvalidApiKeyError:
errors["base"] = "invalid_api_key"
except (ApiError, ClientConnectorError, asyncio.TimeoutError):
errors["base"] = "cannot_connect"
except Exception: # pylint: disable=broad-except
errors["base"] = "unknown"
else:
return await self.async_step_profiles()
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({vol.Required(CONF_API_KEY): str}),
errors=errors,
)
async def async_step_profiles(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the profiles step."""
errors: dict[str, str] = {}
if user_input is not None:
profile_name = user_input[CONF_PROFILE_NAME]
profile_id = self.nextdns.get_profile_id(profile_name)
await self.async_set_unique_id(profile_id)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=profile_name,
data={CONF_PROFILE_ID: profile_id, CONF_API_KEY: self.api_key},
)
return self.async_show_form(
step_id="profiles",
data_schema=vol.Schema(
{
vol.Required(CONF_PROFILE_NAME): vol.In(
[profile.name for profile in self.nextdns.profiles]
)
}
),
errors=errors,
)

View File

@ -0,0 +1,15 @@
"""Constants for NextDNS integration."""
from datetime import timedelta
ATTR_DNSSEC = "dnssec"
ATTR_ENCRYPTION = "encryption"
ATTR_IP_VERSIONS = "ip_versions"
ATTR_PROTOCOLS = "protocols"
ATTR_STATUS = "status"
CONF_PROFILE_ID = "profile_id"
CONF_PROFILE_NAME = "profile_name"
UPDATE_INTERVAL_ANALYTICS = timedelta(minutes=10)
DOMAIN = "nextdns"

View File

@ -0,0 +1,10 @@
{
"domain": "nextdns",
"name": "NextDNS",
"documentation": "https://www.home-assistant.io/integrations/nextdns",
"codeowners": ["@bieniu"],
"requirements": ["nextdns==1.0.0"],
"config_flow": true,
"iot_class": "cloud_polling",
"loggers": ["nextdns"]
}

View File

@ -0,0 +1,299 @@
"""Support for the NextDNS service."""
from __future__ import annotations
from dataclasses import dataclass
from typing import cast
from homeassistant.components.sensor import (
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import PERCENTAGE
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import NextDnsUpdateCoordinator
from .const import (
ATTR_DNSSEC,
ATTR_ENCRYPTION,
ATTR_IP_VERSIONS,
ATTR_PROTOCOLS,
ATTR_STATUS,
DOMAIN,
)
PARALLEL_UPDATES = 1
@dataclass
class NextDnsSensorRequiredKeysMixin:
"""Class for NextDNS entity required keys."""
coordinator_type: str
@dataclass
class NextDnsSensorEntityDescription(
SensorEntityDescription, NextDnsSensorRequiredKeysMixin
):
"""NextDNS sensor entity description."""
SENSORS = (
NextDnsSensorEntityDescription(
key="all_queries",
coordinator_type=ATTR_STATUS,
entity_category=EntityCategory.DIAGNOSTIC,
icon="mdi:dns",
name="{profile_name} DNS Queries",
native_unit_of_measurement="queries",
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="blocked_queries",
coordinator_type=ATTR_STATUS,
entity_category=EntityCategory.DIAGNOSTIC,
icon="mdi:dns",
name="{profile_name} DNS Queries Blocked",
native_unit_of_measurement="queries",
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="relayed_queries",
coordinator_type=ATTR_STATUS,
entity_category=EntityCategory.DIAGNOSTIC,
icon="mdi:dns",
name="{profile_name} DNS Queries Relayed",
native_unit_of_measurement="queries",
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="blocked_queries_ratio",
coordinator_type=ATTR_STATUS,
entity_category=EntityCategory.DIAGNOSTIC,
icon="mdi:dns",
name="{profile_name} DNS Queries Blocked Ratio",
native_unit_of_measurement=PERCENTAGE,
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="doh_queries",
coordinator_type=ATTR_PROTOCOLS,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
icon="mdi:dns",
name="{profile_name} DNS-over-HTTPS Queries",
native_unit_of_measurement="queries",
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="dot_queries",
coordinator_type=ATTR_PROTOCOLS,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
icon="mdi:dns",
name="{profile_name} DNS-over-TLS Queries",
native_unit_of_measurement="queries",
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="doq_queries",
coordinator_type=ATTR_PROTOCOLS,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
icon="mdi:dns",
name="{profile_name} DNS-over-QUIC Queries",
native_unit_of_measurement="queries",
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="udp_queries",
coordinator_type=ATTR_PROTOCOLS,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
icon="mdi:dns",
name="{profile_name} UDP Queries",
native_unit_of_measurement="queries",
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="doh_queries_ratio",
coordinator_type=ATTR_PROTOCOLS,
entity_registry_enabled_default=False,
icon="mdi:dns",
entity_category=EntityCategory.DIAGNOSTIC,
name="{profile_name} DNS-over-HTTPS Queries Ratio",
native_unit_of_measurement=PERCENTAGE,
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="dot_queries_ratio",
coordinator_type=ATTR_PROTOCOLS,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
icon="mdi:dns",
name="{profile_name} DNS-over-TLS Queries Ratio",
native_unit_of_measurement=PERCENTAGE,
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="doq_queries_ratio",
coordinator_type=ATTR_PROTOCOLS,
entity_registry_enabled_default=False,
icon="mdi:dns",
entity_category=EntityCategory.DIAGNOSTIC,
name="{profile_name} DNS-over-QUIC Queries Ratio",
native_unit_of_measurement=PERCENTAGE,
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="udp_queries_ratio",
coordinator_type=ATTR_PROTOCOLS,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
icon="mdi:dns",
name="{profile_name} UDP Queries Ratio",
native_unit_of_measurement=PERCENTAGE,
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="encrypted_queries",
coordinator_type=ATTR_ENCRYPTION,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
icon="mdi:lock",
name="{profile_name} Encrypted Queries",
native_unit_of_measurement="queries",
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="unencrypted_queries",
coordinator_type=ATTR_ENCRYPTION,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
icon="mdi:lock-open",
name="{profile_name} Unencrypted Queries",
native_unit_of_measurement="queries",
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="encrypted_queries_ratio",
coordinator_type=ATTR_ENCRYPTION,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
icon="mdi:lock",
name="{profile_name} Encrypted Queries Ratio",
native_unit_of_measurement=PERCENTAGE,
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="ipv4_queries",
coordinator_type=ATTR_IP_VERSIONS,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
icon="mdi:ip",
name="{profile_name} IPv4 Queries",
native_unit_of_measurement="queries",
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="ipv6_queries",
coordinator_type=ATTR_IP_VERSIONS,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
icon="mdi:ip",
name="{profile_name} IPv6 Queries",
native_unit_of_measurement="queries",
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="ipv6_queries_ratio",
coordinator_type=ATTR_IP_VERSIONS,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
icon="mdi:ip",
name="{profile_name} IPv6 Queries Ratio",
native_unit_of_measurement=PERCENTAGE,
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="validated_queries",
coordinator_type=ATTR_DNSSEC,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
icon="mdi:lock-check",
name="{profile_name} DNSSEC Validated Queries",
native_unit_of_measurement="queries",
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="not_validated_queries",
coordinator_type=ATTR_DNSSEC,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
icon="mdi:lock-alert",
name="{profile_name} DNSSEC Not Validated Queries",
native_unit_of_measurement="queries",
state_class=SensorStateClass.MEASUREMENT,
),
NextDnsSensorEntityDescription(
key="validated_queries_ratio",
coordinator_type=ATTR_DNSSEC,
entity_category=EntityCategory.DIAGNOSTIC,
entity_registry_enabled_default=False,
icon="mdi:lock-check",
name="{profile_name} DNSSEC Validated Queries Ratio",
native_unit_of_measurement=PERCENTAGE,
state_class=SensorStateClass.MEASUREMENT,
),
)
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Add a NextDNS entities from a config_entry."""
sensors: list[NextDnsSensor] = []
coordinators = hass.data[DOMAIN][entry.entry_id]
for description in SENSORS:
sensors.append(
NextDnsSensor(coordinators[description.coordinator_type], description)
)
async_add_entities(sensors)
class NextDnsSensor(CoordinatorEntity, SensorEntity):
"""Define an NextDNS sensor."""
coordinator: NextDnsUpdateCoordinator
def __init__(
self,
coordinator: NextDnsUpdateCoordinator,
description: SensorEntityDescription,
) -> None:
"""Initialize."""
super().__init__(coordinator)
self._attr_device_info = coordinator.device_info
self._attr_unique_id = f"{coordinator.profile_id}_{description.key}"
self._attr_name = cast(str, description.name).format(
profile_name=coordinator.profile_name
)
self._attr_native_value = getattr(coordinator.data, description.key)
self.entity_description = description
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self._attr_native_value = getattr(
self.coordinator.data, self.entity_description.key
)
self.async_write_ha_state()

View File

@ -0,0 +1,24 @@
{
"config": {
"step": {
"user": {
"data": {
"api_key": "[%key:common::config_flow::data::api_key%]"
}
},
"profiles": {
"data": {
"profile": "Profile"
}
}
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_api_key": "[%key:common::config_flow::error::invalid_api_key%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"already_configured": "This NextDNS profile is already configured."
}
}
}

View File

@ -235,6 +235,7 @@ FLOWS = {
"netatmo",
"netgear",
"nexia",
"nextdns",
"nfandroidtv",
"nightscout",
"nina",

View File

@ -1094,6 +1094,9 @@ nextcloudmonitor==1.1.0
# homeassistant.components.discord
nextcord==2.0.0a8
# homeassistant.components.nextdns
nextdns==1.0.0
# homeassistant.components.niko_home_control
niko-home-control==0.2.1

View File

@ -762,6 +762,9 @@ nexia==2.0.1
# homeassistant.components.discord
nextcord==2.0.0a8
# homeassistant.components.nextdns
nextdns==1.0.0
# homeassistant.components.nfandroidtv
notifications-android-tv==0.1.5

View File

@ -0,0 +1,63 @@
"""Tests for the NextDNS integration."""
from unittest.mock import patch
from nextdns import (
AnalyticsDnssec,
AnalyticsEncryption,
AnalyticsIpVersions,
AnalyticsProtocols,
AnalyticsStatus,
)
from homeassistant.components.nextdns.const import CONF_PROFILE_ID, DOMAIN
from homeassistant.const import CONF_API_KEY
from tests.common import MockConfigEntry
PROFILES = [{"id": "xyz12", "fingerprint": "aabbccdd123", "name": "Fake Profile"}]
STATUS = AnalyticsStatus(
default_queries=40, allowed_queries=30, blocked_queries=20, relayed_queries=10
)
DNSSEC = AnalyticsDnssec(not_validated_queries=25, validated_queries=75)
ENCRYPTION = AnalyticsEncryption(encrypted_queries=60, unencrypted_queries=40)
IP_VERSIONS = AnalyticsIpVersions(ipv4_queries=90, ipv6_queries=10)
PROTOCOLS = AnalyticsProtocols(
doh_queries=20, doq_queries=10, dot_queries=30, udp_queries=40
)
async def init_integration(hass, add_to_hass=True) -> MockConfigEntry:
"""Set up the NextDNS integration in Home Assistant."""
entry = MockConfigEntry(
domain=DOMAIN,
title="Fake Profile",
unique_id="xyz12",
data={CONF_API_KEY: "fake_api_key", CONF_PROFILE_ID: "xyz12"},
)
if not add_to_hass:
return entry
with patch(
"homeassistant.components.nextdns.NextDns.get_profiles", return_value=PROFILES
), patch(
"homeassistant.components.nextdns.NextDns.get_analytics_status",
return_value=STATUS,
), patch(
"homeassistant.components.nextdns.NextDns.get_analytics_encryption",
return_value=ENCRYPTION,
), patch(
"homeassistant.components.nextdns.NextDns.get_analytics_dnssec",
return_value=DNSSEC,
), patch(
"homeassistant.components.nextdns.NextDns.get_analytics_ip_versions",
return_value=IP_VERSIONS,
), patch(
"homeassistant.components.nextdns.NextDns.get_analytics_protocols",
return_value=PROTOCOLS,
):
entry.add_to_hass(hass)
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
return entry

View File

@ -0,0 +1,100 @@
"""Define tests for the NextDNS config flow."""
import asyncio
from unittest.mock import patch
from nextdns import ApiError, InvalidApiKeyError
import pytest
from homeassistant import data_entry_flow
from homeassistant.components.nextdns.const import (
CONF_PROFILE_ID,
CONF_PROFILE_NAME,
DOMAIN,
)
from homeassistant.config_entries import SOURCE_USER
from homeassistant.const import CONF_API_KEY
from . import PROFILES, init_integration
async def test_form_create_entry(hass):
"""Test that the user step works."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == SOURCE_USER
assert result["errors"] == {}
with patch(
"homeassistant.components.nextdns.NextDns.get_profiles", return_value=PROFILES
), patch(
"homeassistant.components.nextdns.async_setup_entry", return_value=True
) as mock_setup_entry:
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_API_KEY: "fake_api_key"},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "profiles"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_PROFILE_NAME: "Fake Profile"}
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "Fake Profile"
assert result["data"][CONF_API_KEY] == "fake_api_key"
assert result["data"][CONF_PROFILE_ID] == "xyz12"
assert len(mock_setup_entry.mock_calls) == 1
@pytest.mark.parametrize(
"error",
[
(ApiError("API Error"), "cannot_connect"),
(InvalidApiKeyError, "invalid_api_key"),
(asyncio.TimeoutError, "cannot_connect"),
(ValueError, "unknown"),
],
)
async def test_form_errors(hass, error):
"""Test we handle errors."""
exc, base_error = error
with patch(
"homeassistant.components.nextdns.NextDns.get_profiles", side_effect=exc
):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
data={CONF_API_KEY: "fake_api_key"},
)
assert result["errors"] == {"base": base_error}
async def test_form_already_configured(hass):
"""Test that errors are shown when duplicates are added."""
entry = await init_integration(hass)
entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
with patch(
"homeassistant.components.nextdns.NextDns.get_profiles", return_value=PROFILES
):
await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_API_KEY: "fake_api_key"},
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_PROFILE_NAME: "Fake Profile"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"

View File

@ -0,0 +1,47 @@
"""Test init of NextDNS integration."""
from unittest.mock import patch
from nextdns import ApiError
from homeassistant.components.nextdns.const import DOMAIN
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import STATE_UNAVAILABLE
from . import init_integration
async def test_async_setup_entry(hass):
"""Test a successful setup entry."""
await init_integration(hass)
state = hass.states.get("sensor.fake_profile_dns_queries_blocked_ratio")
assert state is not None
assert state.state != STATE_UNAVAILABLE
assert state.state == "20.0"
async def test_config_not_ready(hass):
"""Test for setup failure if the connection to the service fails."""
entry = await init_integration(hass, add_to_hass=False)
with patch(
"homeassistant.components.nextdns.NextDns.get_profiles",
side_effect=ApiError("API Error"),
):
entry.add_to_hass(hass)
await hass.config_entries.async_setup(entry.entry_id)
assert entry.state is ConfigEntryState.SETUP_RETRY
async def test_unload_entry(hass):
"""Test successful unload of entry."""
entry = await init_integration(hass)
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
assert entry.state is ConfigEntryState.LOADED
assert await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
assert entry.state is ConfigEntryState.NOT_LOADED
assert not hass.data.get(DOMAIN)

View File

@ -0,0 +1,502 @@
"""Test sensor of NextDNS integration."""
from datetime import timedelta
from unittest.mock import patch
from nextdns import ApiError
from homeassistant.components.nextdns.const import DOMAIN
from homeassistant.components.sensor import (
ATTR_STATE_CLASS,
DOMAIN as SENSOR_DOMAIN,
SensorStateClass,
)
from homeassistant.const import ATTR_UNIT_OF_MEASUREMENT, PERCENTAGE, STATE_UNAVAILABLE
from homeassistant.helpers import entity_registry as er
from homeassistant.util.dt import utcnow
from . import DNSSEC, ENCRYPTION, IP_VERSIONS, PROTOCOLS, STATUS, init_integration
from tests.common import async_fire_time_changed
async def test_sensor(hass):
"""Test states of sensors."""
registry = er.async_get(hass)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_doh_queries",
suggested_object_id="fake_profile_dns_over_https_queries",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_doh_queries_ratio",
suggested_object_id="fake_profile_dns_over_https_queries_ratio",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_doq_queries",
suggested_object_id="fake_profile_dns_over_quic_queries",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_doq_queries_ratio",
suggested_object_id="fake_profile_dns_over_quic_queries_ratio",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_dot_queries",
suggested_object_id="fake_profile_dns_over_tls_queries",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_dot_queries_ratio",
suggested_object_id="fake_profile_dns_over_tls_queries_ratio",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_not_validated_queries",
suggested_object_id="fake_profile_dnssec_not_validated_queries",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_validated_queries",
suggested_object_id="fake_profile_dnssec_validated_queries",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_validated_queries_ratio",
suggested_object_id="fake_profile_dnssec_validated_queries_ratio",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_encrypted_queries",
suggested_object_id="fake_profile_encrypted_queries",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_encrypted_queries_ratio",
suggested_object_id="fake_profile_encrypted_queries_ratio",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_ipv4_queries",
suggested_object_id="fake_profile_ipv4_queries",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_ipv6_queries",
suggested_object_id="fake_profile_ipv6_queries",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_ipv6_queries_ratio",
suggested_object_id="fake_profile_ipv6_queries_ratio",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_udp_queries",
suggested_object_id="fake_profile_udp_queries",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_udp_queries_ratio",
suggested_object_id="fake_profile_udp_queries_ratio",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_unencrypted_queries",
suggested_object_id="fake_profile_unencrypted_queries",
disabled_by=None,
)
await init_integration(hass)
state = hass.states.get("sensor.fake_profile_dns_queries")
assert state
assert state.state == "100"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "queries"
entry = registry.async_get("sensor.fake_profile_dns_queries")
assert entry
assert entry.unique_id == "xyz12_all_queries"
state = hass.states.get("sensor.fake_profile_dns_queries_blocked")
assert state
assert state.state == "20"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "queries"
entry = registry.async_get("sensor.fake_profile_dns_queries_blocked")
assert entry
assert entry.unique_id == "xyz12_blocked_queries"
state = hass.states.get("sensor.fake_profile_dns_queries_blocked_ratio")
assert state
assert state.state == "20.0"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == PERCENTAGE
entry = registry.async_get("sensor.fake_profile_dns_queries_blocked_ratio")
assert entry
assert entry.unique_id == "xyz12_blocked_queries_ratio"
state = hass.states.get("sensor.fake_profile_dns_queries_relayed")
assert state
assert state.state == "10"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "queries"
entry = registry.async_get("sensor.fake_profile_dns_queries_relayed")
assert entry
assert entry.unique_id == "xyz12_relayed_queries"
state = hass.states.get("sensor.fake_profile_dns_over_https_queries")
assert state
assert state.state == "20"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "queries"
entry = registry.async_get("sensor.fake_profile_dns_over_https_queries")
assert entry
assert entry.unique_id == "xyz12_doh_queries"
state = hass.states.get("sensor.fake_profile_dns_over_https_queries_ratio")
assert state
assert state.state == "22.2"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == PERCENTAGE
entry = registry.async_get("sensor.fake_profile_dns_over_https_queries_ratio")
assert entry
assert entry.unique_id == "xyz12_doh_queries_ratio"
state = hass.states.get("sensor.fake_profile_dns_over_quic_queries")
assert state
assert state.state == "10"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "queries"
entry = registry.async_get("sensor.fake_profile_dns_over_quic_queries")
assert entry
assert entry.unique_id == "xyz12_doq_queries"
state = hass.states.get("sensor.fake_profile_dns_over_quic_queries_ratio")
assert state
assert state.state == "11.1"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == PERCENTAGE
entry = registry.async_get("sensor.fake_profile_dns_over_quic_queries_ratio")
assert entry
assert entry.unique_id == "xyz12_doq_queries_ratio"
state = hass.states.get("sensor.fake_profile_dns_over_tls_queries")
assert state
assert state.state == "30"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "queries"
entry = registry.async_get("sensor.fake_profile_dns_over_tls_queries")
assert entry
assert entry.unique_id == "xyz12_dot_queries"
state = hass.states.get("sensor.fake_profile_dns_over_tls_queries_ratio")
assert state
assert state.state == "33.3"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == PERCENTAGE
entry = registry.async_get("sensor.fake_profile_dns_over_tls_queries_ratio")
assert entry
assert entry.unique_id == "xyz12_dot_queries_ratio"
state = hass.states.get("sensor.fake_profile_dnssec_not_validated_queries")
assert state
assert state.state == "25"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "queries"
entry = registry.async_get("sensor.fake_profile_dnssec_not_validated_queries")
assert entry
assert entry.unique_id == "xyz12_not_validated_queries"
state = hass.states.get("sensor.fake_profile_dnssec_validated_queries")
assert state
assert state.state == "75"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "queries"
entry = registry.async_get("sensor.fake_profile_dnssec_validated_queries")
assert entry
assert entry.unique_id == "xyz12_validated_queries"
state = hass.states.get("sensor.fake_profile_dnssec_validated_queries_ratio")
assert state
assert state.state == "75.0"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == PERCENTAGE
entry = registry.async_get("sensor.fake_profile_dnssec_validated_queries_ratio")
assert entry
assert entry.unique_id == "xyz12_validated_queries_ratio"
state = hass.states.get("sensor.fake_profile_encrypted_queries")
assert state
assert state.state == "60"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "queries"
entry = registry.async_get("sensor.fake_profile_encrypted_queries")
assert entry
assert entry.unique_id == "xyz12_encrypted_queries"
state = hass.states.get("sensor.fake_profile_unencrypted_queries")
assert state
assert state.state == "40"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "queries"
entry = registry.async_get("sensor.fake_profile_unencrypted_queries")
assert entry
assert entry.unique_id == "xyz12_unencrypted_queries"
state = hass.states.get("sensor.fake_profile_encrypted_queries_ratio")
assert state
assert state.state == "60.0"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == PERCENTAGE
entry = registry.async_get("sensor.fake_profile_encrypted_queries_ratio")
assert entry
assert entry.unique_id == "xyz12_encrypted_queries_ratio"
state = hass.states.get("sensor.fake_profile_ipv4_queries")
assert state
assert state.state == "90"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "queries"
entry = registry.async_get("sensor.fake_profile_ipv4_queries")
assert entry
assert entry.unique_id == "xyz12_ipv4_queries"
state = hass.states.get("sensor.fake_profile_ipv6_queries")
assert state
assert state.state == "10"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "queries"
entry = registry.async_get("sensor.fake_profile_ipv6_queries")
assert entry
assert entry.unique_id == "xyz12_ipv6_queries"
state = hass.states.get("sensor.fake_profile_ipv6_queries_ratio")
assert state
assert state.state == "10.0"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == PERCENTAGE
entry = registry.async_get("sensor.fake_profile_ipv6_queries_ratio")
assert entry
assert entry.unique_id == "xyz12_ipv6_queries_ratio"
state = hass.states.get("sensor.fake_profile_udp_queries")
assert state
assert state.state == "40"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "queries"
entry = registry.async_get("sensor.fake_profile_udp_queries")
assert entry
assert entry.unique_id == "xyz12_udp_queries"
state = hass.states.get("sensor.fake_profile_udp_queries_ratio")
assert state
assert state.state == "44.4"
assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT
assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == PERCENTAGE
entry = registry.async_get("sensor.fake_profile_udp_queries_ratio")
assert entry
assert entry.unique_id == "xyz12_udp_queries_ratio"
async def test_availability(hass):
"""Ensure that we mark the entities unavailable correctly when service causes an error."""
registry = er.async_get(hass)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_doh_queries",
suggested_object_id="fake_profile_dns_over_https_queries",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_validated_queries",
suggested_object_id="fake_profile_dnssec_validated_queries",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_encrypted_queries",
suggested_object_id="fake_profile_encrypted_queries",
disabled_by=None,
)
registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"xyz12_ipv4_queries",
suggested_object_id="fake_profile_ipv4_queries",
disabled_by=None,
)
await init_integration(hass)
state = hass.states.get("sensor.fake_profile_dns_queries")
assert state
assert state.state != STATE_UNAVAILABLE
assert state.state == "100"
state = hass.states.get("sensor.fake_profile_dns_over_https_queries")
assert state
assert state.state != STATE_UNAVAILABLE
assert state.state == "20"
state = hass.states.get("sensor.fake_profile_dnssec_validated_queries")
assert state
assert state.state != STATE_UNAVAILABLE
assert state.state == "75"
state = hass.states.get("sensor.fake_profile_encrypted_queries")
assert state
assert state.state != STATE_UNAVAILABLE
assert state.state == "60"
state = hass.states.get("sensor.fake_profile_ipv4_queries")
assert state
assert state.state != STATE_UNAVAILABLE
assert state.state == "90"
future = utcnow() + timedelta(minutes=10)
with patch(
"homeassistant.components.nextdns.NextDns.get_analytics_status",
side_effect=ApiError("API Error"),
), patch(
"homeassistant.components.nextdns.NextDns.get_analytics_dnssec",
side_effect=ApiError("API Error"),
), patch(
"homeassistant.components.nextdns.NextDns.get_analytics_encryption",
side_effect=ApiError("API Error"),
), patch(
"homeassistant.components.nextdns.NextDns.get_analytics_ip_versions",
side_effect=ApiError("API Error"),
), patch(
"homeassistant.components.nextdns.NextDns.get_analytics_protocols",
side_effect=ApiError("API Error"),
):
async_fire_time_changed(hass, future)
await hass.async_block_till_done()
state = hass.states.get("sensor.fake_profile_dns_queries")
assert state
assert state.state == STATE_UNAVAILABLE
state = hass.states.get("sensor.fake_profile_dns_over_https_queries")
assert state
assert state.state == STATE_UNAVAILABLE
state = hass.states.get("sensor.fake_profile_dnssec_validated_queries")
assert state
assert state.state == STATE_UNAVAILABLE
state = hass.states.get("sensor.fake_profile_encrypted_queries")
assert state
assert state.state == STATE_UNAVAILABLE
state = hass.states.get("sensor.fake_profile_ipv4_queries")
assert state
assert state.state == STATE_UNAVAILABLE
future = utcnow() + timedelta(minutes=20)
with patch(
"homeassistant.components.nextdns.NextDns.get_analytics_status",
return_value=STATUS,
), patch(
"homeassistant.components.nextdns.NextDns.get_analytics_encryption",
return_value=ENCRYPTION,
), patch(
"homeassistant.components.nextdns.NextDns.get_analytics_dnssec",
return_value=DNSSEC,
), patch(
"homeassistant.components.nextdns.NextDns.get_analytics_ip_versions",
return_value=IP_VERSIONS,
), patch(
"homeassistant.components.nextdns.NextDns.get_analytics_protocols",
return_value=PROTOCOLS,
):
async_fire_time_changed(hass, future)
await hass.async_block_till_done()
state = hass.states.get("sensor.fake_profile_dns_queries")
assert state
assert state.state != STATE_UNAVAILABLE
assert state.state == "100"
state = hass.states.get("sensor.fake_profile_dns_over_https_queries")
assert state
assert state.state != STATE_UNAVAILABLE
assert state.state == "20"
state = hass.states.get("sensor.fake_profile_dnssec_validated_queries")
assert state
assert state.state != STATE_UNAVAILABLE
assert state.state == "75"
state = hass.states.get("sensor.fake_profile_encrypted_queries")
assert state
assert state.state != STATE_UNAVAILABLE
assert state.state == "60"
state = hass.states.get("sensor.fake_profile_ipv4_queries")
assert state
assert state.state != STATE_UNAVAILABLE
assert state.state == "90"