Replace whois library in Whois integration (#63227)
parent
51c8e041ea
commit
20a277c0ab
|
@ -4,6 +4,13 @@ from __future__ import annotations
|
|||
from typing import Any
|
||||
|
||||
import voluptuous as vol
|
||||
import whois
|
||||
from whois.exceptions import (
|
||||
FailedParsingWhoisOutput,
|
||||
UnknownDateFormat,
|
||||
UnknownTld,
|
||||
WhoisCommandFailed,
|
||||
)
|
||||
|
||||
from homeassistant.config_entries import ConfigFlow
|
||||
from homeassistant.const import CONF_DOMAIN, CONF_NAME
|
||||
|
@ -23,23 +30,44 @@ class WhoisFlowHandler(ConfigFlow, domain=DOMAIN):
|
|||
self, user_input: dict[str, Any] | None = None
|
||||
) -> FlowResult:
|
||||
"""Handle a flow initialized by the user."""
|
||||
errors = {}
|
||||
|
||||
if user_input is not None:
|
||||
await self.async_set_unique_id(user_input[CONF_DOMAIN].lower())
|
||||
domain = user_input[CONF_DOMAIN].lower()
|
||||
|
||||
await self.async_set_unique_id(domain)
|
||||
self._abort_if_unique_id_configured()
|
||||
return self.async_create_entry(
|
||||
title=self.imported_name or user_input[CONF_DOMAIN],
|
||||
data={
|
||||
CONF_DOMAIN: user_input[CONF_DOMAIN].lower(),
|
||||
},
|
||||
)
|
||||
|
||||
try:
|
||||
await self.hass.async_add_executor_job(whois.query, domain)
|
||||
except UnknownTld:
|
||||
errors["base"] = "unknown_tld"
|
||||
except WhoisCommandFailed:
|
||||
errors["base"] = "whois_command_failed"
|
||||
except FailedParsingWhoisOutput:
|
||||
errors["base"] = "unexpected_response"
|
||||
except UnknownDateFormat:
|
||||
errors["base"] = "unknown_date_format"
|
||||
else:
|
||||
return self.async_create_entry(
|
||||
title=self.imported_name or user_input[CONF_DOMAIN],
|
||||
data={
|
||||
CONF_DOMAIN: domain,
|
||||
},
|
||||
)
|
||||
else:
|
||||
user_input = {}
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
data_schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_DOMAIN): str,
|
||||
vol.Required(
|
||||
CONF_DOMAIN, default=user_input.get(CONF_DOMAIN, "")
|
||||
): str,
|
||||
}
|
||||
),
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
async def async_step_import(self, config: dict[str, Any]) -> FlowResult:
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
"domain": "whois",
|
||||
"name": "Whois",
|
||||
"documentation": "https://www.home-assistant.io/integrations/whois",
|
||||
"requirements": ["python-whois==0.7.3"],
|
||||
"requirements": ["whois==0.9.13"],
|
||||
"config_flow": true,
|
||||
"codeowners": ["@frenck"],
|
||||
"iot_class": "cloud_polling"
|
||||
|
|
|
@ -5,6 +5,13 @@ from datetime import timedelta
|
|||
|
||||
import voluptuous as vol
|
||||
import whois
|
||||
from whois import Domain
|
||||
from whois.exceptions import (
|
||||
FailedParsingWhoisOutput,
|
||||
UnknownDateFormat,
|
||||
UnknownTld,
|
||||
WhoisCommandFailed,
|
||||
)
|
||||
|
||||
from homeassistant.components.sensor import PLATFORM_SCHEMA, SensorEntity
|
||||
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
|
||||
|
@ -64,13 +71,12 @@ async def async_setup_entry(
|
|||
"""Set up the platform from config_entry."""
|
||||
domain = entry.data[CONF_DOMAIN]
|
||||
try:
|
||||
info = await hass.async_add_executor_job(whois.whois, domain)
|
||||
except whois.BaseException as ex: # pylint: disable=broad-except
|
||||
LOGGER.error("Exception %s occurred during WHOIS lookup for %s", ex, domain)
|
||||
await hass.async_add_executor_job(whois.query, domain)
|
||||
except UnknownTld:
|
||||
LOGGER.error("Could not set up whois for %s, TLD is unknown", domain)
|
||||
return
|
||||
|
||||
if "expiration_date" not in info:
|
||||
LOGGER.error("WHOIS lookup for %s didn't contain an expiration date", domain)
|
||||
except (FailedParsingWhoisOutput, WhoisCommandFailed, UnknownDateFormat) as ex:
|
||||
LOGGER.error("Exception %s occurred during WHOIS lookup for %s", ex, domain)
|
||||
return
|
||||
|
||||
async_add_entities([WhoisSensor(domain)], True)
|
||||
|
@ -85,7 +91,6 @@ class WhoisSensor(SensorEntity):
|
|||
def __init__(self, domain: str) -> None:
|
||||
"""Initialize the sensor."""
|
||||
self._attr_name = domain
|
||||
self.whois = whois.whois
|
||||
self._domain = domain
|
||||
|
||||
def _empty_value_and_attributes(self) -> None:
|
||||
|
@ -96,50 +101,31 @@ class WhoisSensor(SensorEntity):
|
|||
def update(self) -> None:
|
||||
"""Get the current WHOIS data for the domain."""
|
||||
try:
|
||||
response = self.whois(self._domain)
|
||||
except whois.BaseException as ex: # pylint: disable=broad-except
|
||||
response: Domain | None = whois.query(self._domain)
|
||||
except (FailedParsingWhoisOutput, WhoisCommandFailed, UnknownDateFormat) as ex:
|
||||
LOGGER.error("Exception %s occurred during WHOIS lookup", ex)
|
||||
self._empty_value_and_attributes()
|
||||
return
|
||||
|
||||
if response:
|
||||
if "expiration_date" not in response:
|
||||
LOGGER.error(
|
||||
"Failed to find expiration_date in whois lookup response. "
|
||||
"Did find: %s",
|
||||
", ".join(response.keys()),
|
||||
)
|
||||
self._empty_value_and_attributes()
|
||||
return
|
||||
|
||||
if not response["expiration_date"]:
|
||||
LOGGER.error("Whois response contains empty expiration_date")
|
||||
if not response.expiration_date:
|
||||
LOGGER.error("Failed to find expiration_date in whois lookup response")
|
||||
self._empty_value_and_attributes()
|
||||
return
|
||||
|
||||
attrs = {}
|
||||
attrs[ATTR_EXPIRES] = response.expiration_date.isoformat()
|
||||
|
||||
expiration_date = response["expiration_date"]
|
||||
if isinstance(expiration_date, list):
|
||||
attrs[ATTR_EXPIRES] = expiration_date[0].isoformat()
|
||||
expiration_date = expiration_date[0]
|
||||
else:
|
||||
attrs[ATTR_EXPIRES] = expiration_date.isoformat()
|
||||
if response.name_servers:
|
||||
attrs[ATTR_NAME_SERVERS] = " ".join(response.name_servers)
|
||||
|
||||
if "nameservers" in response:
|
||||
attrs[ATTR_NAME_SERVERS] = " ".join(response["nameservers"])
|
||||
if response.last_updated:
|
||||
attrs[ATTR_UPDATED] = response.last_updated.isoformat()
|
||||
|
||||
if "updated_date" in response:
|
||||
update_date = response["updated_date"]
|
||||
if isinstance(update_date, list):
|
||||
attrs[ATTR_UPDATED] = update_date[0].isoformat()
|
||||
else:
|
||||
attrs[ATTR_UPDATED] = update_date.isoformat()
|
||||
if response.registrar:
|
||||
attrs[ATTR_REGISTRAR] = response.registrar
|
||||
|
||||
if "registrar" in response:
|
||||
attrs[ATTR_REGISTRAR] = response["registrar"]
|
||||
|
||||
time_delta = expiration_date - expiration_date.now()
|
||||
time_delta = response.expiration_date - response.expiration_date.now()
|
||||
|
||||
self._attr_extra_state_attributes = attrs
|
||||
self._attr_native_value = time_delta.days
|
||||
|
|
|
@ -7,6 +7,12 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
"error": {
|
||||
"unexpected_response": "Unexpected response from whois server",
|
||||
"unknown_date_format": "Unknown date format in whois server response",
|
||||
"unknown_tld": "The given TLD is unknown or not available to this integration",
|
||||
"whois_command_failed": "Whois command failed: could not retrieve whois information"
|
||||
},
|
||||
"abort": {
|
||||
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]"
|
||||
}
|
||||
|
|
|
@ -3,6 +3,12 @@
|
|||
"abort": {
|
||||
"already_configured": "Service is already configured"
|
||||
},
|
||||
"error": {
|
||||
"unexpected_response": "Unexpected response from whois server",
|
||||
"unknown_date_format": "Unknown date format in whois server response",
|
||||
"unknown_tld": "The given TLD is unknown or not available to this integration",
|
||||
"whois_command_failed": "Whois command failed; could not retrieve whois information"
|
||||
},
|
||||
"step": {
|
||||
"user": {
|
||||
"data": {
|
||||
|
|
|
@ -1986,9 +1986,6 @@ python-twitch-client==0.6.0
|
|||
# homeassistant.components.vlc
|
||||
python-vlc==1.1.2
|
||||
|
||||
# homeassistant.components.whois
|
||||
python-whois==0.7.3
|
||||
|
||||
# homeassistant.components.awair
|
||||
python_awair==0.2.1
|
||||
|
||||
|
@ -2468,6 +2465,9 @@ webexteamssdk==1.1.1
|
|||
# homeassistant.components.whirlpool
|
||||
whirlpool-sixth-sense==0.15.1
|
||||
|
||||
# homeassistant.components.whois
|
||||
whois==0.9.13
|
||||
|
||||
# homeassistant.components.wiffi
|
||||
wiffi==1.1.0
|
||||
|
||||
|
|
|
@ -1223,9 +1223,6 @@ python-tado==0.12.0
|
|||
# homeassistant.components.twitch
|
||||
python-twitch-client==0.6.0
|
||||
|
||||
# homeassistant.components.whois
|
||||
python-whois==0.7.3
|
||||
|
||||
# homeassistant.components.awair
|
||||
python_awair==0.2.1
|
||||
|
||||
|
@ -1505,6 +1502,9 @@ watchdog==2.1.6
|
|||
# homeassistant.components.whirlpool
|
||||
whirlpool-sixth-sense==0.15.1
|
||||
|
||||
# homeassistant.components.whois
|
||||
whois==0.9.13
|
||||
|
||||
# homeassistant.components.wiffi
|
||||
wiffi==1.1.0
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Generator
|
||||
from unittest.mock import AsyncMock, patch
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
@ -25,6 +25,13 @@ def mock_config_entry() -> MockConfigEntry:
|
|||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_whois_config_flow() -> Generator[MagicMock, None, None]:
|
||||
"""Return a mocked whois."""
|
||||
with patch("homeassistant.components.whois.config_flow.whois.query") as whois_mock:
|
||||
yield whois_mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_setup_entry() -> Generator[AsyncMock, None, None]:
|
||||
"""Mock setting up a config entry."""
|
||||
|
|
|
@ -1,6 +1,13 @@
|
|||
"""Tests for the Whois config flow."""
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
from unittest.mock import AsyncMock
|
||||
import pytest
|
||||
from whois.exceptions import (
|
||||
FailedParsingWhoisOutput,
|
||||
UnknownDateFormat,
|
||||
UnknownTld,
|
||||
WhoisCommandFailed,
|
||||
)
|
||||
|
||||
from homeassistant.components.whois.const import DOMAIN
|
||||
from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER
|
||||
|
@ -18,6 +25,7 @@ from tests.common import MockConfigEntry
|
|||
async def test_full_user_flow(
|
||||
hass: HomeAssistant,
|
||||
mock_setup_entry: AsyncMock,
|
||||
mock_whois_config_flow: MagicMock,
|
||||
) -> None:
|
||||
"""Test the full user configuration flow."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
|
@ -40,10 +48,68 @@ async def test_full_user_flow(
|
|||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"throw,reason",
|
||||
[
|
||||
(UnknownTld, "unknown_tld"),
|
||||
(FailedParsingWhoisOutput, "unexpected_response"),
|
||||
(UnknownDateFormat, "unknown_date_format"),
|
||||
(WhoisCommandFailed, "whois_command_failed"),
|
||||
],
|
||||
)
|
||||
async def test_full_flow_with_error(
|
||||
hass: HomeAssistant,
|
||||
mock_setup_entry: AsyncMock,
|
||||
mock_whois_config_flow: MagicMock,
|
||||
throw: Exception,
|
||||
reason: str,
|
||||
) -> None:
|
||||
"""Test the full user configuration flow with an error.
|
||||
|
||||
This tests tests a full config flow, with an error happening; allowing
|
||||
the user to fix the error and try again.
|
||||
"""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": SOURCE_USER}
|
||||
)
|
||||
|
||||
assert result.get("type") == RESULT_TYPE_FORM
|
||||
assert result.get("step_id") == SOURCE_USER
|
||||
assert "flow_id" in result
|
||||
|
||||
mock_whois_config_flow.side_effect = throw
|
||||
result2 = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
user_input={CONF_DOMAIN: "Example.com"},
|
||||
)
|
||||
|
||||
assert result2.get("type") == RESULT_TYPE_FORM
|
||||
assert result2.get("step_id") == SOURCE_USER
|
||||
assert result2.get("errors") == {"base": reason}
|
||||
assert "flow_id" in result2
|
||||
|
||||
assert len(mock_setup_entry.mock_calls) == 0
|
||||
assert len(mock_whois_config_flow.mock_calls) == 1
|
||||
|
||||
mock_whois_config_flow.side_effect = None
|
||||
result3 = await hass.config_entries.flow.async_configure(
|
||||
result2["flow_id"],
|
||||
user_input={CONF_DOMAIN: "Example.com"},
|
||||
)
|
||||
|
||||
assert result3.get("type") == RESULT_TYPE_CREATE_ENTRY
|
||||
assert result3.get("title") == "Example.com"
|
||||
assert result3.get("data") == {CONF_DOMAIN: "example.com"}
|
||||
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
assert len(mock_whois_config_flow.mock_calls) == 2
|
||||
|
||||
|
||||
async def test_already_configured(
|
||||
hass: HomeAssistant,
|
||||
mock_setup_entry: AsyncMock,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
mock_whois_config_flow: MagicMock,
|
||||
) -> None:
|
||||
"""Test we abort if already configured."""
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
|
@ -63,6 +129,7 @@ async def test_already_configured(
|
|||
async def test_import_flow(
|
||||
hass: HomeAssistant,
|
||||
mock_setup_entry: AsyncMock,
|
||||
mock_whois_config_flow: MagicMock,
|
||||
) -> None:
|
||||
"""Test the import configuration flow."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
|
|
Loading…
Reference in New Issue