Fix Xiaomi BLE not detecting encryption for some devices (#75851)
parent
4ed0463438
commit
10356b9379
|
@ -1,6 +1,7 @@
|
||||||
"""The bluetooth integration."""
|
"""The bluetooth integration."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from asyncio import Future
|
||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
@ -8,6 +9,7 @@ from enum import Enum
|
||||||
import logging
|
import logging
|
||||||
from typing import Final, Union
|
from typing import Final, Union
|
||||||
|
|
||||||
|
import async_timeout
|
||||||
from bleak import BleakError
|
from bleak import BleakError
|
||||||
from bleak.backends.device import BLEDevice
|
from bleak.backends.device import BLEDevice
|
||||||
from bleak.backends.scanner import AdvertisementData
|
from bleak.backends.scanner import AdvertisementData
|
||||||
|
@ -95,6 +97,9 @@ BluetoothChange = Enum("BluetoothChange", "ADVERTISEMENT")
|
||||||
BluetoothCallback = Callable[
|
BluetoothCallback = Callable[
|
||||||
[Union[BluetoothServiceInfoBleak, BluetoothServiceInfo], BluetoothChange], None
|
[Union[BluetoothServiceInfoBleak, BluetoothServiceInfo], BluetoothChange], None
|
||||||
]
|
]
|
||||||
|
ProcessAdvertisementCallback = Callable[
|
||||||
|
[Union[BluetoothServiceInfoBleak, BluetoothServiceInfo]], bool
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
@hass_callback
|
@hass_callback
|
||||||
|
@ -159,6 +164,31 @@ def async_register_callback(
|
||||||
return manager.async_register_callback(callback, match_dict)
|
return manager.async_register_callback(callback, match_dict)
|
||||||
|
|
||||||
|
|
||||||
|
async def async_process_advertisements(
|
||||||
|
hass: HomeAssistant,
|
||||||
|
callback: ProcessAdvertisementCallback,
|
||||||
|
match_dict: BluetoothCallbackMatcher,
|
||||||
|
timeout: int,
|
||||||
|
) -> BluetoothServiceInfo:
|
||||||
|
"""Process advertisements until callback returns true or timeout expires."""
|
||||||
|
done: Future[BluetoothServiceInfo] = Future()
|
||||||
|
|
||||||
|
@hass_callback
|
||||||
|
def _async_discovered_device(
|
||||||
|
service_info: BluetoothServiceInfo, change: BluetoothChange
|
||||||
|
) -> None:
|
||||||
|
if callback(service_info):
|
||||||
|
done.set_result(service_info)
|
||||||
|
|
||||||
|
unload = async_register_callback(hass, _async_discovered_device, match_dict)
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with async_timeout.timeout(timeout):
|
||||||
|
return await done
|
||||||
|
finally:
|
||||||
|
unload()
|
||||||
|
|
||||||
|
|
||||||
@hass_callback
|
@hass_callback
|
||||||
def async_track_unavailable(
|
def async_track_unavailable(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
"""Config flow for Xiaomi Bluetooth integration."""
|
"""Config flow for Xiaomi Bluetooth integration."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import dataclasses
|
import dataclasses
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
@ -12,6 +13,7 @@ from homeassistant.components import onboarding
|
||||||
from homeassistant.components.bluetooth import (
|
from homeassistant.components.bluetooth import (
|
||||||
BluetoothServiceInfo,
|
BluetoothServiceInfo,
|
||||||
async_discovered_service_info,
|
async_discovered_service_info,
|
||||||
|
async_process_advertisements,
|
||||||
)
|
)
|
||||||
from homeassistant.config_entries import ConfigFlow
|
from homeassistant.config_entries import ConfigFlow
|
||||||
from homeassistant.const import CONF_ADDRESS
|
from homeassistant.const import CONF_ADDRESS
|
||||||
|
@ -19,6 +21,9 @@ from homeassistant.data_entry_flow import FlowResult
|
||||||
|
|
||||||
from .const import DOMAIN
|
from .const import DOMAIN
|
||||||
|
|
||||||
|
# How long to wait for additional advertisement packets if we don't have the right ones
|
||||||
|
ADDITIONAL_DISCOVERY_TIMEOUT = 5
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
@dataclasses.dataclass
|
||||||
class Discovery:
|
class Discovery:
|
||||||
|
@ -44,6 +49,24 @@ class XiaomiConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||||
self._discovered_device: DeviceData | None = None
|
self._discovered_device: DeviceData | None = None
|
||||||
self._discovered_devices: dict[str, Discovery] = {}
|
self._discovered_devices: dict[str, Discovery] = {}
|
||||||
|
|
||||||
|
async def _async_wait_for_full_advertisement(
|
||||||
|
self, discovery_info: BluetoothServiceInfo, device: DeviceData
|
||||||
|
) -> BluetoothServiceInfo:
|
||||||
|
"""Sometimes first advertisement we receive is blank or incomplete. Wait until we get a useful one."""
|
||||||
|
if not device.pending:
|
||||||
|
return discovery_info
|
||||||
|
|
||||||
|
def _process_more_advertisements(service_info: BluetoothServiceInfo) -> bool:
|
||||||
|
device.update(service_info)
|
||||||
|
return not device.pending
|
||||||
|
|
||||||
|
return await async_process_advertisements(
|
||||||
|
self.hass,
|
||||||
|
_process_more_advertisements,
|
||||||
|
{"address": discovery_info.address},
|
||||||
|
ADDITIONAL_DISCOVERY_TIMEOUT,
|
||||||
|
)
|
||||||
|
|
||||||
async def async_step_bluetooth(
|
async def async_step_bluetooth(
|
||||||
self, discovery_info: BluetoothServiceInfo
|
self, discovery_info: BluetoothServiceInfo
|
||||||
) -> FlowResult:
|
) -> FlowResult:
|
||||||
|
@ -53,6 +76,16 @@ class XiaomiConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||||
device = DeviceData()
|
device = DeviceData()
|
||||||
if not device.supported(discovery_info):
|
if not device.supported(discovery_info):
|
||||||
return self.async_abort(reason="not_supported")
|
return self.async_abort(reason="not_supported")
|
||||||
|
|
||||||
|
# Wait until we have received enough information about this device to detect its encryption type
|
||||||
|
try:
|
||||||
|
discovery_info = await self._async_wait_for_full_advertisement(
|
||||||
|
discovery_info, device
|
||||||
|
)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
# If we don't see a valid packet within the timeout then this device is not supported.
|
||||||
|
return self.async_abort(reason="not_supported")
|
||||||
|
|
||||||
self._discovery_info = discovery_info
|
self._discovery_info = discovery_info
|
||||||
self._discovered_device = device
|
self._discovered_device = device
|
||||||
|
|
||||||
|
@ -161,13 +194,20 @@ class XiaomiConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||||
await self.async_set_unique_id(address, raise_on_progress=False)
|
await self.async_set_unique_id(address, raise_on_progress=False)
|
||||||
discovery = self._discovered_devices[address]
|
discovery = self._discovered_devices[address]
|
||||||
|
|
||||||
|
# Wait until we have received enough information about this device to detect its encryption type
|
||||||
|
try:
|
||||||
|
self._discovery_info = await self._async_wait_for_full_advertisement(
|
||||||
|
discovery.discovery_info, discovery.device
|
||||||
|
)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
# If we don't see a valid packet within the timeout then this device is not supported.
|
||||||
|
return self.async_abort(reason="not_supported")
|
||||||
|
|
||||||
if discovery.device.encryption_scheme == EncryptionScheme.MIBEACON_LEGACY:
|
if discovery.device.encryption_scheme == EncryptionScheme.MIBEACON_LEGACY:
|
||||||
self._discovery_info = discovery.discovery_info
|
|
||||||
self.context["title_placeholders"] = {"name": discovery.title}
|
self.context["title_placeholders"] = {"name": discovery.title}
|
||||||
return await self.async_step_get_encryption_key_legacy()
|
return await self.async_step_get_encryption_key_legacy()
|
||||||
|
|
||||||
if discovery.device.encryption_scheme == EncryptionScheme.MIBEACON_4_5:
|
if discovery.device.encryption_scheme == EncryptionScheme.MIBEACON_4_5:
|
||||||
self._discovery_info = discovery.discovery_info
|
|
||||||
self.context["title_placeholders"] = {"name": discovery.title}
|
self.context["title_placeholders"] = {"name": discovery.title}
|
||||||
return await self.async_step_get_encryption_key_4_5()
|
return await self.async_step_get_encryption_key_4_5()
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
"service_uuid": "0000fe95-0000-1000-8000-00805f9b34fb"
|
"service_uuid": "0000fe95-0000-1000-8000-00805f9b34fb"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"requirements": ["xiaomi-ble==0.6.1"],
|
"requirements": ["xiaomi-ble==0.6.2"],
|
||||||
"dependencies": ["bluetooth"],
|
"dependencies": ["bluetooth"],
|
||||||
"codeowners": ["@Jc2k", "@Ernst79"],
|
"codeowners": ["@Jc2k", "@Ernst79"],
|
||||||
"iot_class": "local_push"
|
"iot_class": "local_push"
|
||||||
|
|
|
@ -2473,7 +2473,7 @@ xbox-webapi==2.0.11
|
||||||
xboxapi==2.0.1
|
xboxapi==2.0.1
|
||||||
|
|
||||||
# homeassistant.components.xiaomi_ble
|
# homeassistant.components.xiaomi_ble
|
||||||
xiaomi-ble==0.6.1
|
xiaomi-ble==0.6.2
|
||||||
|
|
||||||
# homeassistant.components.knx
|
# homeassistant.components.knx
|
||||||
xknx==0.22.0
|
xknx==0.22.0
|
||||||
|
|
|
@ -1665,7 +1665,7 @@ wolf_smartset==0.1.11
|
||||||
xbox-webapi==2.0.11
|
xbox-webapi==2.0.11
|
||||||
|
|
||||||
# homeassistant.components.xiaomi_ble
|
# homeassistant.components.xiaomi_ble
|
||||||
xiaomi-ble==0.6.1
|
xiaomi-ble==0.6.2
|
||||||
|
|
||||||
# homeassistant.components.knx
|
# homeassistant.components.knx
|
||||||
xknx==0.22.0
|
xknx==0.22.0
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
"""Tests for the Bluetooth integration."""
|
"""Tests for the Bluetooth integration."""
|
||||||
|
import asyncio
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from unittest.mock import MagicMock, patch
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
@ -12,6 +13,7 @@ from homeassistant.components.bluetooth import (
|
||||||
UNAVAILABLE_TRACK_SECONDS,
|
UNAVAILABLE_TRACK_SECONDS,
|
||||||
BluetoothChange,
|
BluetoothChange,
|
||||||
BluetoothServiceInfo,
|
BluetoothServiceInfo,
|
||||||
|
async_process_advertisements,
|
||||||
async_track_unavailable,
|
async_track_unavailable,
|
||||||
models,
|
models,
|
||||||
)
|
)
|
||||||
|
@ -21,7 +23,7 @@ from homeassistant.components.bluetooth.const import (
|
||||||
)
|
)
|
||||||
from homeassistant.config_entries import ConfigEntryState
|
from homeassistant.config_entries import ConfigEntryState
|
||||||
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP
|
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP
|
||||||
from homeassistant.core import callback
|
from homeassistant.core import HomeAssistant, callback
|
||||||
from homeassistant.setup import async_setup_component
|
from homeassistant.setup import async_setup_component
|
||||||
from homeassistant.util import dt as dt_util
|
from homeassistant.util import dt as dt_util
|
||||||
|
|
||||||
|
@ -791,6 +793,92 @@ async def test_register_callback_by_address(
|
||||||
assert service_info.manufacturer_id == 89
|
assert service_info.manufacturer_id == 89
|
||||||
|
|
||||||
|
|
||||||
|
async def test_process_advertisements_bail_on_good_advertisement(
|
||||||
|
hass: HomeAssistant, mock_bleak_scanner_start, enable_bluetooth
|
||||||
|
):
|
||||||
|
"""Test as soon as we see a 'good' advertisement we return it."""
|
||||||
|
done = asyncio.Future()
|
||||||
|
|
||||||
|
def _callback(service_info: BluetoothServiceInfo) -> bool:
|
||||||
|
done.set_result(None)
|
||||||
|
return len(service_info.service_data) > 0
|
||||||
|
|
||||||
|
handle = hass.async_create_task(
|
||||||
|
async_process_advertisements(
|
||||||
|
hass, _callback, {"address": "aa:44:33:11:23:45"}, 5
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
while not done.done():
|
||||||
|
device = BLEDevice("aa:44:33:11:23:45", "wohand")
|
||||||
|
adv = AdvertisementData(
|
||||||
|
local_name="wohand",
|
||||||
|
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51a"],
|
||||||
|
manufacturer_data={89: b"\xd8.\xad\xcd\r\x85"},
|
||||||
|
service_data={"00000d00-0000-1000-8000-00805f9b34fa": b"H\x10c"},
|
||||||
|
)
|
||||||
|
|
||||||
|
_get_underlying_scanner()._callback(device, adv)
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
result = await handle
|
||||||
|
assert result.name == "wohand"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_process_advertisements_ignore_bad_advertisement(
|
||||||
|
hass: HomeAssistant, mock_bleak_scanner_start, enable_bluetooth
|
||||||
|
):
|
||||||
|
"""Check that we ignore bad advertisements."""
|
||||||
|
done = asyncio.Event()
|
||||||
|
return_value = asyncio.Event()
|
||||||
|
|
||||||
|
device = BLEDevice("aa:44:33:11:23:45", "wohand")
|
||||||
|
adv = AdvertisementData(
|
||||||
|
local_name="wohand",
|
||||||
|
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51a"],
|
||||||
|
manufacturer_data={89: b"\xd8.\xad\xcd\r\x85"},
|
||||||
|
service_data={"00000d00-0000-1000-8000-00805f9b34fa": b""},
|
||||||
|
)
|
||||||
|
|
||||||
|
def _callback(service_info: BluetoothServiceInfo) -> bool:
|
||||||
|
done.set()
|
||||||
|
return return_value.is_set()
|
||||||
|
|
||||||
|
handle = hass.async_create_task(
|
||||||
|
async_process_advertisements(
|
||||||
|
hass, _callback, {"address": "aa:44:33:11:23:45"}, 5
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# The goal of this loop is to make sure that async_process_advertisements sees at least one
|
||||||
|
# callback that returns False
|
||||||
|
while not done.is_set():
|
||||||
|
_get_underlying_scanner()._callback(device, adv)
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
# Set the return value and mutate the advertisement
|
||||||
|
# Check that scan ends and correct advertisement data is returned
|
||||||
|
return_value.set()
|
||||||
|
adv.service_data["00000d00-0000-1000-8000-00805f9b34fa"] = b"H\x10c"
|
||||||
|
_get_underlying_scanner()._callback(device, adv)
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
result = await handle
|
||||||
|
assert result.service_data["00000d00-0000-1000-8000-00805f9b34fa"] == b"H\x10c"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_process_advertisements_timeout(
|
||||||
|
hass, mock_bleak_scanner_start, enable_bluetooth
|
||||||
|
):
|
||||||
|
"""Test we timeout if no advertisements at all."""
|
||||||
|
|
||||||
|
def _callback(service_info: BluetoothServiceInfo) -> bool:
|
||||||
|
return False
|
||||||
|
|
||||||
|
with pytest.raises(asyncio.TimeoutError):
|
||||||
|
await async_process_advertisements(hass, _callback, {}, 0)
|
||||||
|
|
||||||
|
|
||||||
async def test_wrapped_instance_with_filter(
|
async def test_wrapped_instance_with_filter(
|
||||||
hass, mock_bleak_scanner_start, enable_bluetooth
|
hass, mock_bleak_scanner_start, enable_bluetooth
|
||||||
):
|
):
|
||||||
|
|
|
@ -61,6 +61,18 @@ YLKG07YL_SERVICE_INFO = BluetoothServiceInfo(
|
||||||
source="local",
|
source="local",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
MISSING_PAYLOAD_ENCRYPTED = BluetoothServiceInfo(
|
||||||
|
name="LYWSD02MMC",
|
||||||
|
address="A4:C1:38:56:53:84",
|
||||||
|
rssi=-56,
|
||||||
|
manufacturer_data={},
|
||||||
|
service_data={
|
||||||
|
"0000fe95-0000-1000-8000-00805f9b34fb": b"0X[\x05\x02\x84\x53\x568\xc1\xa4\x08",
|
||||||
|
},
|
||||||
|
service_uuids=["0000fe95-0000-1000-8000-00805f9b34fb"],
|
||||||
|
source="local",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def make_advertisement(address: str, payload: bytes) -> BluetoothServiceInfo:
|
def make_advertisement(address: str, payload: bytes) -> BluetoothServiceInfo:
|
||||||
"""Make a dummy advertisement."""
|
"""Make a dummy advertisement."""
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
"""Test the Xiaomi config flow."""
|
"""Test the Xiaomi config flow."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
from homeassistant import config_entries
|
from homeassistant import config_entries
|
||||||
|
@ -9,9 +10,11 @@ from homeassistant.data_entry_flow import FlowResultType
|
||||||
from . import (
|
from . import (
|
||||||
JTYJGD03MI_SERVICE_INFO,
|
JTYJGD03MI_SERVICE_INFO,
|
||||||
LYWSDCGQ_SERVICE_INFO,
|
LYWSDCGQ_SERVICE_INFO,
|
||||||
|
MISSING_PAYLOAD_ENCRYPTED,
|
||||||
MMC_T201_1_SERVICE_INFO,
|
MMC_T201_1_SERVICE_INFO,
|
||||||
NOT_SENSOR_PUSH_SERVICE_INFO,
|
NOT_SENSOR_PUSH_SERVICE_INFO,
|
||||||
YLKG07YL_SERVICE_INFO,
|
YLKG07YL_SERVICE_INFO,
|
||||||
|
make_advertisement,
|
||||||
)
|
)
|
||||||
|
|
||||||
from tests.common import MockConfigEntry
|
from tests.common import MockConfigEntry
|
||||||
|
@ -38,6 +41,57 @@ async def test_async_step_bluetooth_valid_device(hass):
|
||||||
assert result2["result"].unique_id == "00:81:F9:DD:6F:C1"
|
assert result2["result"].unique_id == "00:81:F9:DD:6F:C1"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_async_step_bluetooth_valid_device_but_missing_payload(hass):
|
||||||
|
"""Test discovery via bluetooth with a valid device but missing payload."""
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.xiaomi_ble.config_flow.async_process_advertisements",
|
||||||
|
side_effect=asyncio.TimeoutError(),
|
||||||
|
):
|
||||||
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN,
|
||||||
|
context={"source": config_entries.SOURCE_BLUETOOTH},
|
||||||
|
data=MISSING_PAYLOAD_ENCRYPTED,
|
||||||
|
)
|
||||||
|
assert result["type"] == FlowResultType.ABORT
|
||||||
|
assert result["reason"] == "not_supported"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_async_step_bluetooth_valid_device_but_missing_payload_then_full(hass):
|
||||||
|
"""Test discovering a valid device. Payload is too short, but later we get full one."""
|
||||||
|
|
||||||
|
async def _async_process_advertisements(_hass, _callback, _matcher, _timeout):
|
||||||
|
service_info = make_advertisement(
|
||||||
|
"A4:C1:38:56:53:84",
|
||||||
|
b"XX\xe4\x16,\x84SV8\xc1\xa4+n\xf2\xe9\x12\x00\x00l\x88M\x9e",
|
||||||
|
)
|
||||||
|
assert _callback(service_info)
|
||||||
|
return service_info
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.xiaomi_ble.config_flow.async_process_advertisements",
|
||||||
|
_async_process_advertisements,
|
||||||
|
):
|
||||||
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN,
|
||||||
|
context={"source": config_entries.SOURCE_BLUETOOTH},
|
||||||
|
data=MISSING_PAYLOAD_ENCRYPTED,
|
||||||
|
)
|
||||||
|
assert result["type"] == FlowResultType.FORM
|
||||||
|
assert result["step_id"] == "get_encryption_key_4_5"
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.xiaomi_ble.async_setup_entry", return_value=True
|
||||||
|
):
|
||||||
|
result2 = await hass.config_entries.flow.async_configure(
|
||||||
|
result["flow_id"],
|
||||||
|
user_input={"bindkey": "a115210eed7a88e50ad52662e732a9fb"},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result2["type"] == FlowResultType.CREATE_ENTRY
|
||||||
|
assert result2["data"] == {"bindkey": "a115210eed7a88e50ad52662e732a9fb"}
|
||||||
|
assert result2["result"].unique_id == "A4:C1:38:56:53:84"
|
||||||
|
|
||||||
|
|
||||||
async def test_async_step_bluetooth_during_onboarding(hass):
|
async def test_async_step_bluetooth_during_onboarding(hass):
|
||||||
"""Test discovery via bluetooth during onboarding."""
|
"""Test discovery via bluetooth during onboarding."""
|
||||||
with patch(
|
with patch(
|
||||||
|
@ -287,6 +341,75 @@ async def test_async_step_user_with_found_devices(hass):
|
||||||
assert result2["result"].unique_id == "58:2D:34:35:93:21"
|
assert result2["result"].unique_id == "58:2D:34:35:93:21"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_async_step_user_short_payload(hass):
|
||||||
|
"""Test setup from service info cache with devices found but short payloads."""
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.xiaomi_ble.config_flow.async_discovered_service_info",
|
||||||
|
return_value=[MISSING_PAYLOAD_ENCRYPTED],
|
||||||
|
):
|
||||||
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN,
|
||||||
|
context={"source": config_entries.SOURCE_USER},
|
||||||
|
)
|
||||||
|
assert result["type"] == FlowResultType.FORM
|
||||||
|
assert result["step_id"] == "user"
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.xiaomi_ble.config_flow.async_process_advertisements",
|
||||||
|
side_effect=asyncio.TimeoutError(),
|
||||||
|
):
|
||||||
|
result2 = await hass.config_entries.flow.async_configure(
|
||||||
|
result["flow_id"],
|
||||||
|
user_input={"address": "A4:C1:38:56:53:84"},
|
||||||
|
)
|
||||||
|
assert result2["type"] == FlowResultType.ABORT
|
||||||
|
assert result2["reason"] == "not_supported"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_async_step_user_short_payload_then_full(hass):
|
||||||
|
"""Test setup from service info cache with devices found."""
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.xiaomi_ble.config_flow.async_discovered_service_info",
|
||||||
|
return_value=[MISSING_PAYLOAD_ENCRYPTED],
|
||||||
|
):
|
||||||
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN,
|
||||||
|
context={"source": config_entries.SOURCE_USER},
|
||||||
|
)
|
||||||
|
assert result["type"] == FlowResultType.FORM
|
||||||
|
assert result["step_id"] == "user"
|
||||||
|
|
||||||
|
async def _async_process_advertisements(_hass, _callback, _matcher, _timeout):
|
||||||
|
service_info = make_advertisement(
|
||||||
|
"A4:C1:38:56:53:84",
|
||||||
|
b"XX\xe4\x16,\x84SV8\xc1\xa4+n\xf2\xe9\x12\x00\x00l\x88M\x9e",
|
||||||
|
)
|
||||||
|
assert _callback(service_info)
|
||||||
|
return service_info
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.xiaomi_ble.config_flow.async_process_advertisements",
|
||||||
|
_async_process_advertisements,
|
||||||
|
):
|
||||||
|
result1 = await hass.config_entries.flow.async_configure(
|
||||||
|
result["flow_id"],
|
||||||
|
user_input={"address": "A4:C1:38:56:53:84"},
|
||||||
|
)
|
||||||
|
assert result1["type"] == FlowResultType.FORM
|
||||||
|
assert result1["step_id"] == "get_encryption_key_4_5"
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.xiaomi_ble.async_setup_entry", return_value=True
|
||||||
|
):
|
||||||
|
result2 = await hass.config_entries.flow.async_configure(
|
||||||
|
result["flow_id"],
|
||||||
|
user_input={"bindkey": "a115210eed7a88e50ad52662e732a9fb"},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result2["type"] == FlowResultType.CREATE_ENTRY
|
||||||
|
assert result2["title"] == "LYWSD02MMC"
|
||||||
|
assert result2["data"] == {"bindkey": "a115210eed7a88e50ad52662e732a9fb"}
|
||||||
|
|
||||||
|
|
||||||
async def test_async_step_user_with_found_devices_v4_encryption(hass):
|
async def test_async_step_user_with_found_devices_v4_encryption(hass):
|
||||||
"""Test setup from service info cache with devices found, with v4 encryption."""
|
"""Test setup from service info cache with devices found, with v4 encryption."""
|
||||||
with patch(
|
with patch(
|
||||||
|
|
Loading…
Reference in New Issue