Speedtestdotnet code cleanup and type hints (#52533)

pull/53337/head
Rami Mosleh 2021-07-22 13:25:54 +03:00 committed by GitHub
parent 7768f53281
commit 1a450c2084
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 287 additions and 237 deletions

View File

@ -969,7 +969,6 @@ omit =
homeassistant/components/sonos/*
homeassistant/components/sony_projector/switch.py
homeassistant/components/spc/*
homeassistant/components/speedtestdotnet/*
homeassistant/components/spider/*
homeassistant/components/splunk/*
homeassistant/components/spotify/__init__.py

View File

@ -1,19 +1,22 @@
"""Support for testing internet speed via Speedtest.net."""
from __future__ import annotations
from datetime import timedelta
import logging
import speedtest
import voluptuous as vol
from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import (
CONF_MONITORED_CONDITIONS,
CONF_SCAN_INTERVAL,
EVENT_HOMEASSISTANT_STARTED,
)
from homeassistant.core import CoreState, callback
from homeassistant.core import CoreState, HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import ConfigType
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
@ -22,6 +25,7 @@ from .const import (
DEFAULT_SCAN_INTERVAL,
DEFAULT_SERVER,
DOMAIN,
PLATFORMS,
SENSOR_TYPES,
SPEED_TEST_SERVICE,
)
@ -51,10 +55,8 @@ CONFIG_SCHEMA = vol.Schema(
extra=vol.ALLOW_EXTRA,
)
PLATFORMS = ["sensor"]
def server_id_valid(server_id):
def server_id_valid(server_id: str) -> bool:
"""Check if server_id is valid."""
try:
api = speedtest.Speedtest()
@ -65,7 +67,7 @@ def server_id_valid(server_id):
return True
async def async_setup(hass, config):
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Import integration from config."""
if DOMAIN in config:
hass.async_create_task(
@ -76,7 +78,7 @@ async def async_setup(hass, config):
return True
async def async_setup_entry(hass, config_entry):
async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Set up the Speedtest.net component."""
coordinator = SpeedTestDataCoordinator(hass, config_entry)
await coordinator.async_setup()
@ -88,11 +90,9 @@ async def async_setup_entry(hass, config_entry):
)
await coordinator.async_refresh()
if not config_entry.options[CONF_MANUAL]:
if not config_entry.options.get(CONF_MANUAL, False):
if hass.state == CoreState.running:
await _enable_scheduled_speedtests()
if not coordinator.last_update_success:
raise ConfigEntryNotReady
else:
# Running a speed test during startup can prevent
# integrations from being able to setup because it
@ -108,12 +108,10 @@ async def async_setup_entry(hass, config_entry):
return True
async def async_unload_entry(hass, config_entry):
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Unload SpeedTest Entry from config_entry."""
hass.services.async_remove(DOMAIN, SPEED_TEST_SERVICE)
hass.data[DOMAIN].async_unload()
unload_ok = await hass.config_entries.async_unload_platforms(
config_entry, PLATFORMS
)
@ -125,13 +123,12 @@ async def async_unload_entry(hass, config_entry):
class SpeedTestDataCoordinator(DataUpdateCoordinator):
"""Get the latest data from speedtest.net."""
def __init__(self, hass, config_entry):
def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None:
"""Initialize the data object."""
self.hass = hass
self.config_entry = config_entry
self.api = None
self.servers = {}
self._unsub_update_listener = None
self.config_entry: ConfigEntry = config_entry
self.api: speedtest.Speedtest | None = None
self.servers: dict[str, dict] = {DEFAULT_SERVER: {}}
super().__init__(
self.hass,
_LOGGER,
@ -141,51 +138,49 @@ class SpeedTestDataCoordinator(DataUpdateCoordinator):
def update_servers(self):
"""Update list of test servers."""
try:
server_list = self.api.get_servers()
except speedtest.ConfigRetrievalError:
_LOGGER.debug("Error retrieving server list")
return
self.servers[DEFAULT_SERVER] = {}
for server in sorted(
server_list.values(),
key=lambda server: server[0]["country"] + server[0]["sponsor"],
):
self.servers[
f"{server[0]['country']} - {server[0]['sponsor']} - {server[0]['name']}"
] = server[0]
test_servers = self.api.get_servers()
test_servers_list = []
for servers in test_servers.values():
for server in servers:
test_servers_list.append(server)
if test_servers_list:
for server in sorted(
test_servers_list,
key=lambda server: (
server["country"],
server["name"],
server["sponsor"],
),
):
self.servers[
f"{server['country']} - {server['sponsor']} - {server['name']}"
] = server
def update_data(self):
"""Get the latest data from speedtest.net."""
self.update_servers()
self.api.closest.clear()
if self.config_entry.options.get(CONF_SERVER_ID):
server_id = self.config_entry.options.get(CONF_SERVER_ID)
self.api.get_servers(servers=[server_id])
try:
self.api.get_best_server()
except speedtest.SpeedtestBestServerFailure as err:
raise UpdateFailed(
"Failed to retrieve best server for speedtest", err
) from err
best_server = self.api.get_best_server()
_LOGGER.debug(
"Executing speedtest.net speed test with server_id: %s",
self.api.best["id"],
best_server["id"],
)
self.api.download()
self.api.upload()
return self.api.results.dict()
async def async_update(self, *_):
async def async_update(self) -> dict[str, str]:
"""Update Speedtest data."""
try:
return await self.hass.async_add_executor_job(self.update_data)
except (speedtest.ConfigRetrievalError, speedtest.NoMatchedServers) as err:
raise UpdateFailed from err
except speedtest.NoMatchedServers as err:
raise UpdateFailed("Selected server is not found.") from err
except speedtest.SpeedtestException as err:
raise UpdateFailed(err) from err
async def async_set_options(self):
"""Set options for entry."""
@ -200,11 +195,12 @@ class SpeedTestDataCoordinator(DataUpdateCoordinator):
self.config_entry, data=data, options=options
)
async def async_setup(self):
async def async_setup(self) -> None:
"""Set up SpeedTest."""
try:
self.api = await self.hass.async_add_executor_job(speedtest.Speedtest)
except speedtest.ConfigRetrievalError as err:
await self.hass.async_add_executor_job(self.update_servers)
except speedtest.SpeedtestException as err:
raise ConfigEntryNotReady from err
async def request_update(call):
@ -213,24 +209,14 @@ class SpeedTestDataCoordinator(DataUpdateCoordinator):
await self.async_set_options()
await self.hass.async_add_executor_job(self.update_servers)
self.hass.services.async_register(DOMAIN, SPEED_TEST_SERVICE, request_update)
self._unsub_update_listener = self.config_entry.add_update_listener(
options_updated_listener
self.config_entry.async_on_unload(
self.config_entry.add_update_listener(options_updated_listener)
)
@callback
def async_unload(self):
"""Unload the coordinator."""
if not self._unsub_update_listener:
return
self._unsub_update_listener()
self._unsub_update_listener = None
async def options_updated_listener(hass, entry):
async def options_updated_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update."""
if entry.options[CONF_MANUAL]:
hass.data[DOMAIN].update_interval = None

View File

@ -1,9 +1,14 @@
"""Config flow for Speedtest.net."""
from __future__ import annotations
from typing import Any
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_MONITORED_CONDITIONS, CONF_SCAN_INTERVAL
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
from . import server_id_valid
from .const import (
@ -24,11 +29,15 @@ class SpeedTestFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
@staticmethod
@callback
def async_get_options_flow(config_entry):
def async_get_options_flow(
config_entry: config_entries.ConfigEntry,
) -> config_entries.OptionsFlow:
"""Get the options flow for this handler."""
return SpeedTestOptionsFlowHandler(config_entry)
async def async_step_user(self, user_input=None):
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle a flow initialized by the user."""
if self._async_current_entries():
return self.async_abort(reason="single_instance_allowed")
@ -59,14 +68,16 @@ class SpeedTestFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
class SpeedTestOptionsFlowHandler(config_entries.OptionsFlow):
"""Handle SpeedTest options."""
def __init__(self, config_entry):
def __init__(self, config_entry: config_entries.ConfigEntry) -> None:
"""Initialize options flow."""
self.config_entry = config_entry
self._servers = {}
self._servers: dict = {}
async def async_step_init(self, user_input=None):
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Manage the options."""
errors = {}
errors: dict[str, str] = {}
if user_input is not None:
server_name = user_input[CONF_SERVER_NAME]

View File

@ -1,32 +1,35 @@
"""Consts used by Speedtest.net."""
from typing import Final
from homeassistant.const import DATA_RATE_MEGABITS_PER_SECOND, TIME_MILLISECONDS
DOMAIN = "speedtestdotnet"
DOMAIN: Final = "speedtestdotnet"
SPEED_TEST_SERVICE = "speedtest"
DATA_UPDATED = f"{DOMAIN}_data_updated"
SPEED_TEST_SERVICE: Final = "speedtest"
SENSOR_TYPES = {
SENSOR_TYPES: Final = {
"ping": ["Ping", TIME_MILLISECONDS],
"download": ["Download", DATA_RATE_MEGABITS_PER_SECOND],
"upload": ["Upload", DATA_RATE_MEGABITS_PER_SECOND],
}
CONF_SERVER_NAME = "server_name"
CONF_SERVER_ID = "server_id"
CONF_MANUAL = "manual"
CONF_SERVER_NAME: Final = "server_name"
CONF_SERVER_ID: Final = "server_id"
CONF_MANUAL: Final = "manual"
ATTR_BYTES_RECEIVED = "bytes_received"
ATTR_BYTES_SENT = "bytes_sent"
ATTR_SERVER_COUNTRY = "server_country"
ATTR_SERVER_ID = "server_id"
ATTR_SERVER_NAME = "server_name"
ATTR_BYTES_RECEIVED: Final = "bytes_received"
ATTR_BYTES_SENT: Final = "bytes_sent"
ATTR_SERVER_COUNTRY: Final = "server_country"
ATTR_SERVER_ID: Final = "server_id"
ATTR_SERVER_NAME: Final = "server_name"
DEFAULT_NAME = "SpeedTest"
DEFAULT_SCAN_INTERVAL = 60
DEFAULT_SERVER = "*Auto Detect"
DEFAULT_NAME: Final = "SpeedTest"
DEFAULT_SCAN_INTERVAL: Final = 60
DEFAULT_SERVER: Final = "*Auto Detect"
ATTRIBUTION = "Data retrieved from Speedtest.net by Ookla"
ATTRIBUTION: Final = "Data retrieved from Speedtest.net by Ookla"
ICON = "mdi:speedometer"
ICON: Final = "mdi:speedometer"
PLATFORMS: Final = ["sensor"]

View File

@ -54,26 +54,28 @@ class SpeedtestSensor(CoordinatorEntity, RestoreEntity, SensorEntity):
self._attr_name = f"{DEFAULT_NAME} {SENSOR_TYPES[sensor_type][0]}"
self._attr_unit_of_measurement = SENSOR_TYPES[self.type][1]
self._attr_unique_id = sensor_type
self._attrs = {ATTR_ATTRIBUTION: ATTRIBUTION}
@property
def extra_state_attributes(self) -> dict[str, Any] | None:
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the state attributes."""
if not self.coordinator.data:
return None
if self.coordinator.data:
self._attrs.update(
{
ATTR_SERVER_NAME: self.coordinator.data["server"]["name"],
ATTR_SERVER_COUNTRY: self.coordinator.data["server"]["country"],
ATTR_SERVER_ID: self.coordinator.data["server"]["id"],
}
)
attributes = {
ATTR_ATTRIBUTION: ATTRIBUTION,
ATTR_SERVER_NAME: self.coordinator.data["server"]["name"],
ATTR_SERVER_COUNTRY: self.coordinator.data["server"]["country"],
ATTR_SERVER_ID: self.coordinator.data["server"]["id"],
}
if self.type == "download":
self._attrs[ATTR_BYTES_RECEIVED] = self.coordinator.data[
"bytes_received"
]
elif self.type == "upload":
self._attrs[ATTR_BYTES_SENT] = self.coordinator.data["bytes_sent"]
if self.type == "download":
attributes[ATTR_BYTES_RECEIVED] = self.coordinator.data["bytes_received"]
elif self.type == "upload":
attributes[ATTR_BYTES_SENT] = self.coordinator.data["bytes_sent"]
return attributes
return self._attrs
async def async_added_to_hass(self) -> None:
"""Handle entity which will be added."""
@ -91,14 +93,12 @@ class SpeedtestSensor(CoordinatorEntity, RestoreEntity, SensorEntity):
self.async_on_remove(self.coordinator.async_add_listener(update))
self._update_state()
def _update_state(self) -> None:
def _update_state(self):
"""Update sensors state."""
if not self.coordinator.data:
return
if self.type == "ping":
self._attr_state = self.coordinator.data["ping"]
elif self.type == "download":
self._attr_state = round(self.coordinator.data["download"] / 10 ** 6, 2)
elif self.type == "upload":
self._attr_state = round(self.coordinator.data["upload"] / 10 ** 6, 2)
if self.coordinator.data:
if self.type == "ping":
self._attr_state = self.coordinator.data["ping"]
elif self.type == "download":
self._attr_state = round(self.coordinator.data["download"] / 10 ** 6, 2)
elif self.type == "upload":
self._attr_state = round(self.coordinator.data["upload"] / 10 ** 6, 2)

View File

@ -6,8 +6,7 @@
}
},
"abort": {
"single_instance_allowed": "[%key:common::config_flow::abort::single_instance_allowed%]",
"wrong_server_id": "Server ID is not valid"
"single_instance_allowed": "[%key:common::config_flow::abort::single_instance_allowed%]"
}
},
"options": {
@ -21,4 +20,4 @@
}
}
}
}
}

View File

@ -1,8 +1,7 @@
{
"config": {
"abort": {
"single_instance_allowed": "Already configured. Only a single configuration possible.",
"wrong_server_id": "Server ID is not valid"
"single_instance_allowed": "Already configured. Only a single configuration possible."
},
"step": {
"user": {

View File

@ -0,0 +1,16 @@
"""Conftest for speedtestdotnet."""
from unittest.mock import patch
import pytest
from tests.components.speedtestdotnet import MOCK_RESULTS, MOCK_SERVERS
@pytest.fixture(autouse=True)
def mock_api():
"""Mock entry setup."""
with patch("speedtest.Speedtest") as mock_api:
mock_api.return_value.get_servers.return_value = MOCK_SERVERS
mock_api.return_value.get_best_server.return_value = MOCK_SERVERS[1][0]
mock_api.return_value.results.dict.return_value = MOCK_RESULTS
yield mock_api

View File

@ -1,8 +1,7 @@
"""Tests for SpeedTest config flow."""
from datetime import timedelta
from unittest.mock import patch
from unittest.mock import MagicMock
import pytest
from speedtest import NoMatchedServers
from homeassistant import config_entries, data_entry_flow
@ -15,23 +14,12 @@ from homeassistant.components.speedtestdotnet.const import (
SENSOR_TYPES,
)
from homeassistant.const import CONF_MONITORED_CONDITIONS, CONF_SCAN_INTERVAL
from . import MOCK_SERVERS
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
@pytest.fixture(name="mock_setup")
def mock_setup():
"""Mock entry setup."""
with patch(
"homeassistant.components.speedtestdotnet.async_setup_entry",
return_value=True,
):
yield
async def test_flow_works(hass, mock_setup):
async def test_flow_works(hass: HomeAssistant) -> None:
"""Test user config."""
result = await hass.config_entries.flow.async_init(
speedtestdotnet.DOMAIN, context={"source": config_entries.SOURCE_USER}
@ -43,92 +31,104 @@ async def test_flow_works(hass, mock_setup):
result["flow_id"], user_input={}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "SpeedTest"
async def test_import_fails(hass, mock_setup):
async def test_import_fails(hass: HomeAssistant, mock_api: MagicMock) -> None:
"""Test import step fails if server_id is not valid."""
with patch("speedtest.Speedtest") as mock_api:
mock_api.return_value.get_servers.side_effect = NoMatchedServers
result = await hass.config_entries.flow.async_init(
speedtestdotnet.DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
CONF_SERVER_ID: "223",
CONF_MANUAL: True,
CONF_SCAN_INTERVAL: timedelta(minutes=1),
CONF_MONITORED_CONDITIONS: list(SENSOR_TYPES),
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "wrong_server_id"
mock_api.return_value.get_servers.side_effect = NoMatchedServers
result = await hass.config_entries.flow.async_init(
speedtestdotnet.DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
CONF_SERVER_ID: "223",
CONF_MANUAL: True,
CONF_SCAN_INTERVAL: timedelta(minutes=1),
CONF_MONITORED_CONDITIONS: list(SENSOR_TYPES),
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "wrong_server_id"
async def test_import_success(hass, mock_setup):
async def test_import_success(hass):
"""Test import step is successful if server_id is valid."""
with patch("speedtest.Speedtest"):
result = await hass.config_entries.flow.async_init(
speedtestdotnet.DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
CONF_SERVER_ID: "1",
CONF_MANUAL: True,
CONF_SCAN_INTERVAL: timedelta(minutes=1),
CONF_MONITORED_CONDITIONS: list(SENSOR_TYPES),
},
)
result = await hass.config_entries.flow.async_init(
speedtestdotnet.DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
CONF_SERVER_ID: "1",
CONF_MANUAL: True,
CONF_SCAN_INTERVAL: timedelta(minutes=1),
CONF_MONITORED_CONDITIONS: list(SENSOR_TYPES),
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "SpeedTest"
assert result["data"][CONF_SERVER_ID] == "1"
assert result["data"][CONF_MANUAL] is True
assert result["data"][CONF_SCAN_INTERVAL] == 1
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "SpeedTest"
assert result["data"][CONF_SERVER_ID] == "1"
assert result["data"][CONF_MANUAL] is True
assert result["data"][CONF_SCAN_INTERVAL] == 1
async def test_options(hass):
async def test_options(hass: HomeAssistant, mock_api: MagicMock) -> None:
"""Test updating options."""
entry = MockConfigEntry(
domain=DOMAIN,
title="SpeedTest",
data={},
options={},
)
entry.add_to_hass(hass)
with patch("speedtest.Speedtest") as mock_api:
mock_api.return_value.get_servers.return_value = MOCK_SERVERS
await hass.config_entries.async_setup(entry.entry_id)
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
result = await hass.config_entries.options.async_init(entry.entry_id)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_init(entry.entry_id)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_SERVER_NAME: "Country1 - Sponsor1 - Server1",
CONF_SCAN_INTERVAL: 30,
CONF_MANUAL: False,
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["data"] == {
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_SERVER_NAME: "Country1 - Sponsor1 - Server1",
CONF_SCAN_INTERVAL: 30,
CONF_MANUAL: True,
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["data"] == {
CONF_SERVER_NAME: "Country1 - Sponsor1 - Server1",
CONF_SERVER_ID: "1",
CONF_SCAN_INTERVAL: 30,
CONF_MANUAL: True,
}
await hass.async_block_till_done()
assert hass.data[DOMAIN].update_interval is None
# test setting the option to update periodically
result2 = await hass.config_entries.options.async_init(entry.entry_id)
assert result2["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result2["step_id"] == "init"
result2 = await hass.config_entries.options.async_configure(
result2["flow_id"],
user_input={
CONF_SERVER_NAME: "Country1 - Sponsor1 - Server1",
CONF_SERVER_ID: "1",
CONF_SCAN_INTERVAL: 30,
CONF_MANUAL: False,
}
},
)
await hass.async_block_till_done()
assert hass.data[DOMAIN].update_interval == timedelta(minutes=30)
async def test_integration_already_configured(hass):
async def test_integration_already_configured(hass: HomeAssistant) -> None:
"""Test integration is already configured."""
entry = MockConfigEntry(
domain=DOMAIN,
data={},
options={},
)
entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(

View File

@ -1,79 +1,113 @@
"""Tests for SpeedTest integration."""
from unittest.mock import patch
from unittest.mock import MagicMock
import speedtest
from homeassistant import config_entries
from homeassistant.components import speedtestdotnet
from homeassistant.setup import async_setup_component
from homeassistant.components.speedtestdotnet.const import (
CONF_MANUAL,
CONF_SERVER_ID,
CONF_SERVER_NAME,
DOMAIN,
SPEED_TEST_SERVICE,
)
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import CONF_SCAN_INTERVAL, STATE_UNAVAILABLE
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
async def test_setup_with_config(hass):
"""Test that we import the config and setup the integration."""
config = {
speedtestdotnet.DOMAIN: {
speedtestdotnet.CONF_SERVER_ID: "1",
speedtestdotnet.CONF_MANUAL: True,
speedtestdotnet.CONF_SCAN_INTERVAL: "00:01:00",
}
}
with patch("speedtest.Speedtest"):
assert await async_setup_component(hass, speedtestdotnet.DOMAIN, config)
async def test_successful_config_entry(hass):
async def test_successful_config_entry(hass: HomeAssistant) -> None:
"""Test that SpeedTestDotNet is configured successfully."""
entry = MockConfigEntry(
domain=speedtestdotnet.DOMAIN,
domain=DOMAIN,
data={},
options={
CONF_SERVER_NAME: "Country1 - Sponsor1 - Server1",
CONF_SERVER_ID: "1",
CONF_SCAN_INTERVAL: 30,
CONF_MANUAL: False,
},
)
entry.add_to_hass(hass)
with patch("speedtest.Speedtest"), patch(
"homeassistant.config_entries.ConfigEntries.async_forward_entry_setup",
return_value=True,
) as forward_entry_setup:
await hass.config_entries.async_setup(entry.entry_id)
await hass.config_entries.async_setup(entry.entry_id)
assert entry.state is config_entries.ConfigEntryState.LOADED
assert forward_entry_setup.mock_calls[0][1] == (
entry,
"sensor",
)
assert entry.state == ConfigEntryState.LOADED
assert hass.data[DOMAIN]
assert hass.services.has_service(DOMAIN, SPEED_TEST_SERVICE)
async def test_setup_failed(hass):
async def test_setup_failed(hass: HomeAssistant, mock_api: MagicMock) -> None:
"""Test SpeedTestDotNet failed due to an error."""
entry = MockConfigEntry(
domain=speedtestdotnet.DOMAIN,
data={},
domain=DOMAIN,
)
entry.add_to_hass(hass)
with patch("speedtest.Speedtest", side_effect=speedtest.ConfigRetrievalError):
await hass.config_entries.async_setup(entry.entry_id)
assert entry.state is config_entries.ConfigEntryState.SETUP_RETRY
mock_api.side_effect = speedtest.ConfigRetrievalError
await hass.config_entries.async_setup(entry.entry_id)
assert entry.state is ConfigEntryState.SETUP_RETRY
async def test_unload_entry(hass):
async def test_unload_entry(hass: HomeAssistant) -> None:
"""Test removing SpeedTestDotNet."""
entry = MockConfigEntry(
domain=speedtestdotnet.DOMAIN,
data={},
domain=DOMAIN,
)
entry.add_to_hass(hass)
with patch("speedtest.Speedtest"):
await hass.config_entries.async_setup(entry.entry_id)
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
assert entry.state is config_entries.ConfigEntryState.NOT_LOADED
assert speedtestdotnet.DOMAIN not in hass.data
assert entry.state is ConfigEntryState.NOT_LOADED
assert DOMAIN not in hass.data
async def test_server_not_found(hass: HomeAssistant, mock_api: MagicMock) -> None:
"""Test configured server id is not found."""
entry = MockConfigEntry(
domain=DOMAIN,
)
entry.add_to_hass(hass)
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert hass.data[DOMAIN]
mock_api.return_value.get_servers.side_effect = speedtest.NoMatchedServers
await hass.data[DOMAIN].async_refresh()
await hass.async_block_till_done()
state = hass.states.get("sensor.speedtest_ping")
assert state is not None
assert state.state == STATE_UNAVAILABLE
async def test_get_best_server_error(hass: HomeAssistant, mock_api: MagicMock) -> None:
"""Test configured server id is not found."""
entry = MockConfigEntry(
domain=DOMAIN,
)
entry.add_to_hass(hass)
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert hass.data[DOMAIN]
mock_api.return_value.get_best_server.side_effect = (
speedtest.SpeedtestBestServerFailure(
"Unable to connect to servers to test latency."
)
)
await hass.data[DOMAIN].async_refresh()
await hass.async_block_till_done()
state = hass.states.get("sensor.speedtest_ping")
assert state is not None
assert state.state == STATE_UNAVAILABLE

View File

@ -1,26 +1,28 @@
"""Tests for SpeedTest sensors."""
from unittest.mock import patch
from unittest.mock import MagicMock
from homeassistant.components import speedtestdotnet
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.components.speedtestdotnet.const import DEFAULT_NAME, SENSOR_TYPES
from homeassistant.core import HomeAssistant
from . import MOCK_RESULTS, MOCK_SERVERS, MOCK_STATES
from tests.common import MockConfigEntry
async def test_speedtestdotnet_sensors(hass):
async def test_speedtestdotnet_sensors(
hass: HomeAssistant, mock_api: MagicMock
) -> None:
"""Test sensors created for speedtestdotnet integration."""
entry = MockConfigEntry(domain=speedtestdotnet.DOMAIN, data={})
entry.add_to_hass(hass)
with patch("speedtest.Speedtest") as mock_api:
mock_api.return_value.get_best_server.return_value = MOCK_SERVERS[1][0]
mock_api.return_value.results.dict.return_value = MOCK_RESULTS
mock_api.return_value.get_best_server.return_value = MOCK_SERVERS[1][0]
mock_api.return_value.results.dict.return_value = MOCK_RESULTS
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert len(hass.states.async_entity_ids(SENSOR_DOMAIN)) == 3
@ -28,4 +30,5 @@ async def test_speedtestdotnet_sensors(hass):
sensor = hass.states.get(
f"sensor.{DEFAULT_NAME}_{SENSOR_TYPES[sensor_type][0]}"
)
assert sensor
assert sensor.state == MOCK_STATES[sensor_type]