Prefer IPv4 locations over IPv6 locations for upnp devices/component (#103792)
parent
1e57bc5415
commit
39c81cb4b1
|
@ -117,6 +117,7 @@ class SsdpServiceInfo(BaseServiceInfo):
|
||||||
ssdp_ext: str | None = None
|
ssdp_ext: str | None = None
|
||||||
ssdp_server: str | None = None
|
ssdp_server: str | None = None
|
||||||
ssdp_headers: Mapping[str, Any] = field(default_factory=dict)
|
ssdp_headers: Mapping[str, Any] = field(default_factory=dict)
|
||||||
|
ssdp_all_locations: set[str] = field(default_factory=set)
|
||||||
x_homeassistant_matching_domains: set[str] = field(default_factory=set)
|
x_homeassistant_matching_domains: set[str] = field(default_factory=set)
|
||||||
|
|
||||||
|
|
||||||
|
@ -283,6 +284,7 @@ class Scanner:
|
||||||
self.hass = hass
|
self.hass = hass
|
||||||
self._cancel_scan: Callable[[], None] | None = None
|
self._cancel_scan: Callable[[], None] | None = None
|
||||||
self._ssdp_listeners: list[SsdpListener] = []
|
self._ssdp_listeners: list[SsdpListener] = []
|
||||||
|
self._device_tracker = SsdpDeviceTracker()
|
||||||
self._callbacks: list[tuple[SsdpCallback, dict[str, str]]] = []
|
self._callbacks: list[tuple[SsdpCallback, dict[str, str]]] = []
|
||||||
self._description_cache: DescriptionCache | None = None
|
self._description_cache: DescriptionCache | None = None
|
||||||
self.integration_matchers = integration_matchers
|
self.integration_matchers = integration_matchers
|
||||||
|
@ -290,21 +292,7 @@ class Scanner:
|
||||||
@property
|
@property
|
||||||
def _ssdp_devices(self) -> list[SsdpDevice]:
|
def _ssdp_devices(self) -> list[SsdpDevice]:
|
||||||
"""Get all seen devices."""
|
"""Get all seen devices."""
|
||||||
return [
|
return list(self._device_tracker.devices.values())
|
||||||
ssdp_device
|
|
||||||
for ssdp_listener in self._ssdp_listeners
|
|
||||||
for ssdp_device in ssdp_listener.devices.values()
|
|
||||||
]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def _all_headers_from_ssdp_devices(
|
|
||||||
self,
|
|
||||||
) -> dict[tuple[str, str], CaseInsensitiveDict]:
|
|
||||||
return {
|
|
||||||
(ssdp_device.udn, dst): headers
|
|
||||||
for ssdp_device in self._ssdp_devices
|
|
||||||
for dst, headers in ssdp_device.all_combined_headers.items()
|
|
||||||
}
|
|
||||||
|
|
||||||
async def async_register_callback(
|
async def async_register_callback(
|
||||||
self, callback: SsdpCallback, match_dict: None | dict[str, str] = None
|
self, callback: SsdpCallback, match_dict: None | dict[str, str] = None
|
||||||
|
@ -317,13 +305,16 @@ class Scanner:
|
||||||
|
|
||||||
# Make sure any entries that happened
|
# Make sure any entries that happened
|
||||||
# before the callback was registered are fired
|
# before the callback was registered are fired
|
||||||
for headers in self._all_headers_from_ssdp_devices.values():
|
for ssdp_device in self._ssdp_devices:
|
||||||
if _async_headers_match(headers, lower_match_dict):
|
for headers in ssdp_device.all_combined_headers.values():
|
||||||
await _async_process_callbacks(
|
if _async_headers_match(headers, lower_match_dict):
|
||||||
[callback],
|
await _async_process_callbacks(
|
||||||
await self._async_headers_to_discovery_info(headers),
|
[callback],
|
||||||
SsdpChange.ALIVE,
|
await self._async_headers_to_discovery_info(
|
||||||
)
|
ssdp_device, headers
|
||||||
|
),
|
||||||
|
SsdpChange.ALIVE,
|
||||||
|
)
|
||||||
|
|
||||||
callback_entry = (callback, lower_match_dict)
|
callback_entry = (callback, lower_match_dict)
|
||||||
self._callbacks.append(callback_entry)
|
self._callbacks.append(callback_entry)
|
||||||
|
@ -386,7 +377,6 @@ class Scanner:
|
||||||
async def _async_start_ssdp_listeners(self) -> None:
|
async def _async_start_ssdp_listeners(self) -> None:
|
||||||
"""Start the SSDP Listeners."""
|
"""Start the SSDP Listeners."""
|
||||||
# Devices are shared between all sources.
|
# Devices are shared between all sources.
|
||||||
device_tracker = SsdpDeviceTracker()
|
|
||||||
for source_ip in await async_build_source_set(self.hass):
|
for source_ip in await async_build_source_set(self.hass):
|
||||||
source_ip_str = str(source_ip)
|
source_ip_str = str(source_ip)
|
||||||
if source_ip.version == 6:
|
if source_ip.version == 6:
|
||||||
|
@ -405,7 +395,7 @@ class Scanner:
|
||||||
callback=self._ssdp_listener_callback,
|
callback=self._ssdp_listener_callback,
|
||||||
source=source,
|
source=source,
|
||||||
target=target,
|
target=target,
|
||||||
device_tracker=device_tracker,
|
device_tracker=self._device_tracker,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
results = await asyncio.gather(
|
results = await asyncio.gather(
|
||||||
|
@ -454,14 +444,16 @@ class Scanner:
|
||||||
if info_desc is None:
|
if info_desc is None:
|
||||||
# Fetch info desc in separate task and process from there.
|
# Fetch info desc in separate task and process from there.
|
||||||
self.hass.async_create_task(
|
self.hass.async_create_task(
|
||||||
self._ssdp_listener_process_with_lookup(ssdp_device, dst, source)
|
self._ssdp_listener_process_callback_with_lookup(
|
||||||
|
ssdp_device, dst, source
|
||||||
|
)
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Info desc known, process directly.
|
# Info desc known, process directly.
|
||||||
self._ssdp_listener_process(ssdp_device, dst, source, info_desc)
|
self._ssdp_listener_process_callback(ssdp_device, dst, source, info_desc)
|
||||||
|
|
||||||
async def _ssdp_listener_process_with_lookup(
|
async def _ssdp_listener_process_callback_with_lookup(
|
||||||
self,
|
self,
|
||||||
ssdp_device: SsdpDevice,
|
ssdp_device: SsdpDevice,
|
||||||
dst: DeviceOrServiceType,
|
dst: DeviceOrServiceType,
|
||||||
|
@ -469,14 +461,14 @@ class Scanner:
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Handle a device/service change."""
|
"""Handle a device/service change."""
|
||||||
location = ssdp_device.location
|
location = ssdp_device.location
|
||||||
self._ssdp_listener_process(
|
self._ssdp_listener_process_callback(
|
||||||
ssdp_device,
|
ssdp_device,
|
||||||
dst,
|
dst,
|
||||||
source,
|
source,
|
||||||
await self._async_get_description_dict(location),
|
await self._async_get_description_dict(location),
|
||||||
)
|
)
|
||||||
|
|
||||||
def _ssdp_listener_process(
|
def _ssdp_listener_process_callback(
|
||||||
self,
|
self,
|
||||||
ssdp_device: SsdpDevice,
|
ssdp_device: SsdpDevice,
|
||||||
dst: DeviceOrServiceType,
|
dst: DeviceOrServiceType,
|
||||||
|
@ -502,7 +494,7 @@ class Scanner:
|
||||||
return
|
return
|
||||||
|
|
||||||
discovery_info = discovery_info_from_headers_and_description(
|
discovery_info = discovery_info_from_headers_and_description(
|
||||||
combined_headers, info_desc
|
ssdp_device, combined_headers, info_desc
|
||||||
)
|
)
|
||||||
discovery_info.x_homeassistant_matching_domains = matching_domains
|
discovery_info.x_homeassistant_matching_domains = matching_domains
|
||||||
|
|
||||||
|
@ -557,7 +549,7 @@ class Scanner:
|
||||||
return await self._description_cache.async_get_description_dict(location) or {}
|
return await self._description_cache.async_get_description_dict(location) or {}
|
||||||
|
|
||||||
async def _async_headers_to_discovery_info(
|
async def _async_headers_to_discovery_info(
|
||||||
self, headers: CaseInsensitiveDict
|
self, ssdp_device: SsdpDevice, headers: CaseInsensitiveDict
|
||||||
) -> SsdpServiceInfo:
|
) -> SsdpServiceInfo:
|
||||||
"""Combine the headers and description into discovery_info.
|
"""Combine the headers and description into discovery_info.
|
||||||
|
|
||||||
|
@ -567,34 +559,42 @@ class Scanner:
|
||||||
|
|
||||||
location = headers["location"]
|
location = headers["location"]
|
||||||
info_desc = await self._async_get_description_dict(location)
|
info_desc = await self._async_get_description_dict(location)
|
||||||
return discovery_info_from_headers_and_description(headers, info_desc)
|
return discovery_info_from_headers_and_description(
|
||||||
|
ssdp_device, headers, info_desc
|
||||||
|
)
|
||||||
|
|
||||||
async def async_get_discovery_info_by_udn_st(
|
async def async_get_discovery_info_by_udn_st(
|
||||||
self, udn: str, st: str
|
self, udn: str, st: str
|
||||||
) -> SsdpServiceInfo | None:
|
) -> SsdpServiceInfo | None:
|
||||||
"""Return discovery_info for a udn and st."""
|
"""Return discovery_info for a udn and st."""
|
||||||
if headers := self._all_headers_from_ssdp_devices.get((udn, st)):
|
for ssdp_device in self._ssdp_devices:
|
||||||
return await self._async_headers_to_discovery_info(headers)
|
if ssdp_device.udn == udn:
|
||||||
|
if headers := ssdp_device.combined_headers(st):
|
||||||
|
return await self._async_headers_to_discovery_info(
|
||||||
|
ssdp_device, headers
|
||||||
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def async_get_discovery_info_by_st(self, st: str) -> list[SsdpServiceInfo]:
|
async def async_get_discovery_info_by_st(self, st: str) -> list[SsdpServiceInfo]:
|
||||||
"""Return matching discovery_infos for a st."""
|
"""Return matching discovery_infos for a st."""
|
||||||
return [
|
return [
|
||||||
await self._async_headers_to_discovery_info(headers)
|
await self._async_headers_to_discovery_info(ssdp_device, headers)
|
||||||
for udn_st, headers in self._all_headers_from_ssdp_devices.items()
|
for ssdp_device in self._ssdp_devices
|
||||||
if udn_st[1] == st
|
if (headers := ssdp_device.combined_headers(st))
|
||||||
]
|
]
|
||||||
|
|
||||||
async def async_get_discovery_info_by_udn(self, udn: str) -> list[SsdpServiceInfo]:
|
async def async_get_discovery_info_by_udn(self, udn: str) -> list[SsdpServiceInfo]:
|
||||||
"""Return matching discovery_infos for a udn."""
|
"""Return matching discovery_infos for a udn."""
|
||||||
return [
|
return [
|
||||||
await self._async_headers_to_discovery_info(headers)
|
await self._async_headers_to_discovery_info(ssdp_device, headers)
|
||||||
for udn_st, headers in self._all_headers_from_ssdp_devices.items()
|
for ssdp_device in self._ssdp_devices
|
||||||
if udn_st[0] == udn
|
for headers in ssdp_device.all_combined_headers.values()
|
||||||
|
if ssdp_device.udn == udn
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def discovery_info_from_headers_and_description(
|
def discovery_info_from_headers_and_description(
|
||||||
|
ssdp_device: SsdpDevice,
|
||||||
combined_headers: CaseInsensitiveDict,
|
combined_headers: CaseInsensitiveDict,
|
||||||
info_desc: Mapping[str, Any],
|
info_desc: Mapping[str, Any],
|
||||||
) -> SsdpServiceInfo:
|
) -> SsdpServiceInfo:
|
||||||
|
@ -627,6 +627,7 @@ def discovery_info_from_headers_and_description(
|
||||||
ssdp_nt=combined_headers.get_lower("nt"),
|
ssdp_nt=combined_headers.get_lower("nt"),
|
||||||
ssdp_headers=combined_headers,
|
ssdp_headers=combined_headers,
|
||||||
upnp=upnp_info,
|
upnp=upnp_info,
|
||||||
|
ssdp_all_locations=set(ssdp_device.locations),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@ from .const import (
|
||||||
LOGGER,
|
LOGGER,
|
||||||
)
|
)
|
||||||
from .coordinator import UpnpDataUpdateCoordinator
|
from .coordinator import UpnpDataUpdateCoordinator
|
||||||
from .device import async_create_device
|
from .device import async_create_device, get_preferred_location
|
||||||
|
|
||||||
NOTIFICATION_ID = "upnp_notification"
|
NOTIFICATION_ID = "upnp_notification"
|
||||||
NOTIFICATION_TITLE = "UPnP/IGD Setup"
|
NOTIFICATION_TITLE = "UPnP/IGD Setup"
|
||||||
|
@ -57,7 +57,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
return
|
return
|
||||||
|
|
||||||
nonlocal discovery_info
|
nonlocal discovery_info
|
||||||
LOGGER.debug("Device discovered: %s, at: %s", usn, headers.ssdp_location)
|
LOGGER.debug("Device discovered: %s, at: %s", usn, headers.ssdp_all_locations)
|
||||||
discovery_info = headers
|
discovery_info = headers
|
||||||
device_discovered_event.set()
|
device_discovered_event.set()
|
||||||
|
|
||||||
|
@ -79,8 +79,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
|
|
||||||
# Create device.
|
# Create device.
|
||||||
assert discovery_info is not None
|
assert discovery_info is not None
|
||||||
assert discovery_info.ssdp_location is not None
|
assert discovery_info.ssdp_all_locations
|
||||||
location = discovery_info.ssdp_location
|
location = get_preferred_location(discovery_info.ssdp_all_locations)
|
||||||
try:
|
try:
|
||||||
device = await async_create_device(hass, location)
|
device = await async_create_device(hass, location)
|
||||||
except UpnpConnectionError as err:
|
except UpnpConnectionError as err:
|
||||||
|
|
|
@ -3,6 +3,7 @@ from __future__ import annotations
|
||||||
|
|
||||||
from collections.abc import Mapping
|
from collections.abc import Mapping
|
||||||
from typing import Any, cast
|
from typing import Any, cast
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
|
||||||
|
@ -25,7 +26,7 @@ from .const import (
|
||||||
ST_IGD_V1,
|
ST_IGD_V1,
|
||||||
ST_IGD_V2,
|
ST_IGD_V2,
|
||||||
)
|
)
|
||||||
from .device import async_get_mac_address_from_host
|
from .device import async_get_mac_address_from_host, get_preferred_location
|
||||||
|
|
||||||
|
|
||||||
def _friendly_name_from_discovery(discovery_info: ssdp.SsdpServiceInfo) -> str:
|
def _friendly_name_from_discovery(discovery_info: ssdp.SsdpServiceInfo) -> str:
|
||||||
|
@ -43,7 +44,7 @@ def _is_complete_discovery(discovery_info: ssdp.SsdpServiceInfo) -> bool:
|
||||||
return bool(
|
return bool(
|
||||||
ssdp.ATTR_UPNP_UDN in discovery_info.upnp
|
ssdp.ATTR_UPNP_UDN in discovery_info.upnp
|
||||||
and discovery_info.ssdp_st
|
and discovery_info.ssdp_st
|
||||||
and discovery_info.ssdp_location
|
and discovery_info.ssdp_all_locations
|
||||||
and discovery_info.ssdp_usn
|
and discovery_info.ssdp_usn
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -61,7 +62,9 @@ async def _async_mac_address_from_discovery(
|
||||||
hass: HomeAssistant, discovery: SsdpServiceInfo
|
hass: HomeAssistant, discovery: SsdpServiceInfo
|
||||||
) -> str | None:
|
) -> str | None:
|
||||||
"""Get the mac address from a discovery."""
|
"""Get the mac address from a discovery."""
|
||||||
host = discovery.ssdp_headers["_host"]
|
location = get_preferred_location(discovery.ssdp_all_locations)
|
||||||
|
host = urlparse(location).hostname
|
||||||
|
assert host is not None
|
||||||
return await async_get_mac_address_from_host(hass, host)
|
return await async_get_mac_address_from_host(hass, host)
|
||||||
|
|
||||||
|
|
||||||
|
@ -178,7 +181,9 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||||
# when the location changes, the entry is reloaded.
|
# when the location changes, the entry is reloaded.
|
||||||
updates={
|
updates={
|
||||||
CONFIG_ENTRY_MAC_ADDRESS: mac_address,
|
CONFIG_ENTRY_MAC_ADDRESS: mac_address,
|
||||||
CONFIG_ENTRY_LOCATION: discovery_info.ssdp_location,
|
CONFIG_ENTRY_LOCATION: get_preferred_location(
|
||||||
|
discovery_info.ssdp_all_locations
|
||||||
|
),
|
||||||
CONFIG_ENTRY_HOST: host,
|
CONFIG_ENTRY_HOST: host,
|
||||||
CONFIG_ENTRY_ST: discovery_info.ssdp_st,
|
CONFIG_ENTRY_ST: discovery_info.ssdp_st,
|
||||||
},
|
},
|
||||||
|
@ -249,7 +254,7 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||||
CONFIG_ENTRY_ORIGINAL_UDN: discovery.upnp[ssdp.ATTR_UPNP_UDN],
|
CONFIG_ENTRY_ORIGINAL_UDN: discovery.upnp[ssdp.ATTR_UPNP_UDN],
|
||||||
CONFIG_ENTRY_MAC_ADDRESS: mac_address,
|
CONFIG_ENTRY_MAC_ADDRESS: mac_address,
|
||||||
CONFIG_ENTRY_HOST: discovery.ssdp_headers["_host"],
|
CONFIG_ENTRY_HOST: discovery.ssdp_headers["_host"],
|
||||||
CONFIG_ENTRY_LOCATION: discovery.ssdp_location,
|
CONFIG_ENTRY_LOCATION: get_preferred_location(discovery.ssdp_all_locations),
|
||||||
}
|
}
|
||||||
|
|
||||||
await self.async_set_unique_id(user_input["unique_id"], raise_on_progress=False)
|
await self.async_set_unique_id(user_input["unique_id"], raise_on_progress=False)
|
||||||
|
@ -271,7 +276,7 @@ class UpnpFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||||
CONFIG_ENTRY_UDN: discovery.upnp[ssdp.ATTR_UPNP_UDN],
|
CONFIG_ENTRY_UDN: discovery.upnp[ssdp.ATTR_UPNP_UDN],
|
||||||
CONFIG_ENTRY_ST: discovery.ssdp_st,
|
CONFIG_ENTRY_ST: discovery.ssdp_st,
|
||||||
CONFIG_ENTRY_ORIGINAL_UDN: discovery.upnp[ssdp.ATTR_UPNP_UDN],
|
CONFIG_ENTRY_ORIGINAL_UDN: discovery.upnp[ssdp.ATTR_UPNP_UDN],
|
||||||
CONFIG_ENTRY_LOCATION: discovery.ssdp_location,
|
CONFIG_ENTRY_LOCATION: get_preferred_location(discovery.ssdp_all_locations),
|
||||||
CONFIG_ENTRY_MAC_ADDRESS: mac_address,
|
CONFIG_ENTRY_MAC_ADDRESS: mac_address,
|
||||||
CONFIG_ENTRY_HOST: discovery.ssdp_headers["_host"],
|
CONFIG_ENTRY_HOST: discovery.ssdp_headers["_host"],
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,22 @@ from .const import (
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_preferred_location(locations: set[str]) -> str:
|
||||||
|
"""Get the preferred location (an IPv4 location) from a set of locations."""
|
||||||
|
# Prefer IPv4 over IPv6.
|
||||||
|
for location in locations:
|
||||||
|
if location.startswith("http://[") or location.startswith("https://["):
|
||||||
|
continue
|
||||||
|
|
||||||
|
return location
|
||||||
|
|
||||||
|
# Fallback to any.
|
||||||
|
for location in locations:
|
||||||
|
return location
|
||||||
|
|
||||||
|
raise ValueError("No location found")
|
||||||
|
|
||||||
|
|
||||||
async def async_get_mac_address_from_host(hass: HomeAssistant, host: str) -> str | None:
|
async def async_get_mac_address_from_host(hass: HomeAssistant, host: str) -> str | None:
|
||||||
"""Get mac address from host."""
|
"""Get mac address from host."""
|
||||||
ip_addr = ip_address(host)
|
ip_addr = ip_address(host)
|
||||||
|
@ -47,13 +63,13 @@ async def async_get_mac_address_from_host(hass: HomeAssistant, host: str) -> str
|
||||||
return mac_address
|
return mac_address
|
||||||
|
|
||||||
|
|
||||||
async def async_create_device(hass: HomeAssistant, ssdp_location: str) -> Device:
|
async def async_create_device(hass: HomeAssistant, location: str) -> Device:
|
||||||
"""Create UPnP/IGD device."""
|
"""Create UPnP/IGD device."""
|
||||||
session = async_get_clientsession(hass, verify_ssl=False)
|
session = async_get_clientsession(hass, verify_ssl=False)
|
||||||
requester = AiohttpSessionRequester(session, with_sleep=True, timeout=20)
|
requester = AiohttpSessionRequester(session, with_sleep=True, timeout=20)
|
||||||
|
|
||||||
factory = UpnpFactory(requester, non_strict=True)
|
factory = UpnpFactory(requester, non_strict=True)
|
||||||
upnp_device = await factory.async_create_device(ssdp_location)
|
upnp_device = await factory.async_create_device(location)
|
||||||
|
|
||||||
# Create profile wrapper.
|
# Create profile wrapper.
|
||||||
igd_device = IgdDevice(upnp_device, None)
|
igd_device = IgdDevice(upnp_device, None)
|
||||||
|
@ -119,8 +135,7 @@ class Device:
|
||||||
@property
|
@property
|
||||||
def host(self) -> str | None:
|
def host(self) -> str | None:
|
||||||
"""Get the hostname."""
|
"""Get the hostname."""
|
||||||
url = self._igd_device.device.device_url
|
parsed = urlparse(self.device_url)
|
||||||
parsed = urlparse(url)
|
|
||||||
return parsed.hostname
|
return parsed.hostname
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
"""Configuration for SSDP tests."""
|
"""Configuration for SSDP tests."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import copy
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from unittest.mock import AsyncMock, MagicMock, PropertyMock, create_autospec, patch
|
from unittest.mock import AsyncMock, MagicMock, PropertyMock, create_autospec, patch
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
@ -26,6 +27,7 @@ TEST_UDN = "uuid:device"
|
||||||
TEST_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
|
TEST_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
|
||||||
TEST_USN = f"{TEST_UDN}::{TEST_ST}"
|
TEST_USN = f"{TEST_UDN}::{TEST_ST}"
|
||||||
TEST_LOCATION = "http://192.168.1.1/desc.xml"
|
TEST_LOCATION = "http://192.168.1.1/desc.xml"
|
||||||
|
TEST_LOCATION6 = "http://[fe80::1%2]/desc.xml"
|
||||||
TEST_HOST = urlparse(TEST_LOCATION).hostname
|
TEST_HOST = urlparse(TEST_LOCATION).hostname
|
||||||
TEST_FRIENDLY_NAME = "mock-name"
|
TEST_FRIENDLY_NAME = "mock-name"
|
||||||
TEST_MAC_ADDRESS = "00:11:22:33:44:55"
|
TEST_MAC_ADDRESS = "00:11:22:33:44:55"
|
||||||
|
@ -48,11 +50,23 @@ TEST_DISCOVERY = ssdp.SsdpServiceInfo(
|
||||||
ssdp_headers={
|
ssdp_headers={
|
||||||
"_host": TEST_HOST,
|
"_host": TEST_HOST,
|
||||||
},
|
},
|
||||||
|
ssdp_all_locations={
|
||||||
|
TEST_LOCATION,
|
||||||
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_async_create_device():
|
||||||
|
"""Mock async_upnp_client create device."""
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.upnp.device.UpnpFactory.async_create_device"
|
||||||
|
) as mock_create:
|
||||||
|
yield mock_create
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def mock_igd_device() -> IgdDevice:
|
def mock_igd_device(mock_async_create_device) -> IgdDevice:
|
||||||
"""Mock async_upnp_client device."""
|
"""Mock async_upnp_client device."""
|
||||||
mock_upnp_device = create_autospec(UpnpDevice, instance=True)
|
mock_upnp_device = create_autospec(UpnpDevice, instance=True)
|
||||||
mock_upnp_device.device_url = TEST_DISCOVERY.ssdp_location
|
mock_upnp_device.device_url = TEST_DISCOVERY.ssdp_location
|
||||||
|
@ -85,8 +99,6 @@ def mock_igd_device() -> IgdDevice:
|
||||||
)
|
)
|
||||||
|
|
||||||
with patch(
|
with patch(
|
||||||
"homeassistant.components.upnp.device.UpnpFactory.async_create_device"
|
|
||||||
), patch(
|
|
||||||
"homeassistant.components.upnp.device.IgdDevice.__new__",
|
"homeassistant.components.upnp.device.IgdDevice.__new__",
|
||||||
return_value=mock_igd_device,
|
return_value=mock_igd_device,
|
||||||
):
|
):
|
||||||
|
@ -140,7 +152,7 @@ async def silent_ssdp_scanner(hass):
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
async def ssdp_instant_discovery():
|
async def ssdp_instant_discovery():
|
||||||
"""Instance discovery."""
|
"""Instant discovery."""
|
||||||
|
|
||||||
# Set up device discovery callback.
|
# Set up device discovery callback.
|
||||||
async def register_callback(hass, callback, match_dict):
|
async def register_callback(hass, callback, match_dict):
|
||||||
|
@ -158,6 +170,30 @@ async def ssdp_instant_discovery():
|
||||||
yield (mock_register, mock_get_info)
|
yield (mock_register, mock_get_info)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
async def ssdp_instant_discovery_multi_location():
|
||||||
|
"""Instant discovery."""
|
||||||
|
|
||||||
|
test_discovery = copy.deepcopy(TEST_DISCOVERY)
|
||||||
|
test_discovery.ssdp_location = TEST_LOCATION6 # "Default" location is IPv6.
|
||||||
|
test_discovery.ssdp_all_locations = {TEST_LOCATION6, TEST_LOCATION}
|
||||||
|
|
||||||
|
# Set up device discovery callback.
|
||||||
|
async def register_callback(hass, callback, match_dict):
|
||||||
|
"""Immediately do callback."""
|
||||||
|
await callback(test_discovery, ssdp.SsdpChange.ALIVE)
|
||||||
|
return MagicMock()
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.ssdp.async_register_callback",
|
||||||
|
side_effect=register_callback,
|
||||||
|
) as mock_register, patch(
|
||||||
|
"homeassistant.components.ssdp.async_get_discovery_info_by_st",
|
||||||
|
return_value=[test_discovery],
|
||||||
|
) as mock_get_info:
|
||||||
|
yield (mock_register, mock_get_info)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
async def ssdp_no_discovery():
|
async def ssdp_no_discovery():
|
||||||
"""No discovery."""
|
"""No discovery."""
|
||||||
|
@ -197,6 +233,8 @@ async def mock_config_entry(
|
||||||
CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS,
|
CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Store igd_device for binary_sensor/sensor tests.
|
||||||
entry.igd_device = mock_igd_device
|
entry.igd_device = mock_igd_device
|
||||||
|
|
||||||
# Load config_entry.
|
# Load config_entry.
|
||||||
|
|
|
@ -134,6 +134,7 @@ async def test_flow_ssdp_non_igd_device(hass: HomeAssistant) -> None:
|
||||||
ssdp_usn=TEST_USN,
|
ssdp_usn=TEST_USN,
|
||||||
ssdp_st=TEST_ST,
|
ssdp_st=TEST_ST,
|
||||||
ssdp_location=TEST_LOCATION,
|
ssdp_location=TEST_LOCATION,
|
||||||
|
ssdp_all_locations=[TEST_LOCATION],
|
||||||
upnp={
|
upnp={
|
||||||
ssdp.ATTR_UPNP_DEVICE_TYPE: "urn:schemas-upnp-org:device:WFADevice:1", # Non-IGD
|
ssdp.ATTR_UPNP_DEVICE_TYPE: "urn:schemas-upnp-org:device:WFADevice:1", # Non-IGD
|
||||||
ssdp.ATTR_UPNP_UDN: TEST_UDN,
|
ssdp.ATTR_UPNP_UDN: TEST_UDN,
|
||||||
|
@ -324,6 +325,7 @@ async def test_flow_ssdp_discovery_changed_location(hass: HomeAssistant) -> None
|
||||||
new_location = TEST_DISCOVERY.ssdp_location + "2"
|
new_location = TEST_DISCOVERY.ssdp_location + "2"
|
||||||
new_discovery = deepcopy(TEST_DISCOVERY)
|
new_discovery = deepcopy(TEST_DISCOVERY)
|
||||||
new_discovery.ssdp_location = new_location
|
new_discovery.ssdp_location = new_location
|
||||||
|
new_discovery.ssdp_all_locations = {new_location}
|
||||||
result = await hass.config_entries.flow.async_init(
|
result = await hass.config_entries.flow.async_init(
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
context={"source": config_entries.SOURCE_SSDP},
|
context={"source": config_entries.SOURCE_SSDP},
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
"""Test UPnP/IGD setup process."""
|
"""Test UPnP/IGD setup process."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from unittest.mock import AsyncMock
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from homeassistant.components.upnp.const import (
|
from homeassistant.components.upnp.const import (
|
||||||
|
@ -60,3 +62,35 @@ async def test_async_setup_entry_default_no_mac_address(hass: HomeAssistant) ->
|
||||||
# Load config_entry.
|
# Load config_entry.
|
||||||
entry.add_to_hass(hass)
|
entry.add_to_hass(hass)
|
||||||
assert await hass.config_entries.async_setup(entry.entry_id) is True
|
assert await hass.config_entries.async_setup(entry.entry_id) is True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures(
|
||||||
|
"ssdp_instant_discovery_multi_location",
|
||||||
|
"mock_get_source_ip",
|
||||||
|
"mock_mac_address_from_host",
|
||||||
|
)
|
||||||
|
async def test_async_setup_entry_multi_location(
|
||||||
|
hass: HomeAssistant, mock_async_create_device: AsyncMock
|
||||||
|
) -> None:
|
||||||
|
"""Test async_setup_entry for a device both seen via IPv4 and IPv6.
|
||||||
|
|
||||||
|
The resulting IPv4 location is preferred/stored.
|
||||||
|
"""
|
||||||
|
entry = MockConfigEntry(
|
||||||
|
domain=DOMAIN,
|
||||||
|
unique_id=TEST_USN,
|
||||||
|
data={
|
||||||
|
CONFIG_ENTRY_ST: TEST_ST,
|
||||||
|
CONFIG_ENTRY_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_ORIGINAL_UDN: TEST_UDN,
|
||||||
|
CONFIG_ENTRY_LOCATION: TEST_LOCATION,
|
||||||
|
CONFIG_ENTRY_MAC_ADDRESS: TEST_MAC_ADDRESS,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Load config_entry.
|
||||||
|
entry.add_to_hass(hass)
|
||||||
|
assert await hass.config_entries.async_setup(entry.entry_id) is True
|
||||||
|
|
||||||
|
# Ensure that the IPv4 location is used.
|
||||||
|
mock_async_create_device.assert_called_once_with(TEST_LOCATION)
|
||||||
|
|
Loading…
Reference in New Issue