From 6c3a0890c7bbf39d7b01d88cbaa3f8888127295a Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 22 Oct 2024 05:53:02 -1000 Subject: [PATCH] Add support for fetching bindkey from Mi cloud (#128394) --- .../components/xiaomi_ble/config_flow.py | 81 ++- .../components/xiaomi_ble/strings.json | 21 +- .../components/xiaomi_ble/test_config_flow.py | 516 +++++++++++++++--- 3 files changed, 521 insertions(+), 97 deletions(-) diff --git a/homeassistant/components/xiaomi_ble/config_flow.py b/homeassistant/components/xiaomi_ble/config_flow.py index 7a24763c011..df2de381d39 100644 --- a/homeassistant/components/xiaomi_ble/config_flow.py +++ b/homeassistant/components/xiaomi_ble/config_flow.py @@ -4,10 +4,16 @@ from __future__ import annotations from collections.abc import Mapping import dataclasses +import logging from typing import Any import voluptuous as vol -from xiaomi_ble import XiaomiBluetoothDeviceData as DeviceData +from xiaomi_ble import ( + XiaomiBluetoothDeviceData as DeviceData, + XiaomiCloudException, + XiaomiCloudInvalidAuthenticationException, + XiaomiCloudTokenFetch, +) from xiaomi_ble.parser import EncryptionScheme from homeassistant.components import onboarding @@ -18,13 +24,17 @@ from homeassistant.components.bluetooth import ( async_process_advertisements, ) from homeassistant.config_entries import SOURCE_REAUTH, ConfigFlow, ConfigFlowResult -from homeassistant.const import CONF_ADDRESS +from homeassistant.const import CONF_ADDRESS, CONF_PASSWORD, CONF_USERNAME +from homeassistant.data_entry_flow import AbortFlow +from homeassistant.helpers.aiohttp_client import async_get_clientsession from .const import DOMAIN # How long to wait for additional advertisement packets if we don't have the right ones ADDITIONAL_DISCOVERY_TIMEOUT = 60 +_LOGGER = logging.getLogger(__name__) + @dataclasses.dataclass class Discovery: @@ -104,7 +114,7 @@ class XiaomiConfigFlow(ConfigFlow, domain=DOMAIN): if device.encryption_scheme == EncryptionScheme.MIBEACON_LEGACY: return await self.async_step_get_encryption_key_legacy() if device.encryption_scheme == EncryptionScheme.MIBEACON_4_5: - return await self.async_step_get_encryption_key_4_5() + return await self.async_step_get_encryption_key_4_5_choose_method() return await self.async_step_bluetooth_confirm() async def async_step_get_encryption_key_legacy( @@ -175,6 +185,67 @@ class XiaomiConfigFlow(ConfigFlow, domain=DOMAIN): errors=errors, ) + async def async_step_cloud_auth( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Handle the cloud auth step.""" + assert self._discovery_info + + errors: dict[str, str] = {} + description_placeholders: dict[str, str] = {} + if user_input is not None: + session = async_get_clientsession(self.hass) + fetcher = XiaomiCloudTokenFetch( + user_input[CONF_USERNAME], user_input[CONF_PASSWORD], session + ) + try: + device_details = await fetcher.get_device_info( + self._discovery_info.address + ) + except XiaomiCloudInvalidAuthenticationException as ex: + _LOGGER.debug("Authentication failed: %s", ex, exc_info=True) + errors = {"base": "auth_failed"} + description_placeholders = {"error_detail": str(ex)} + except XiaomiCloudException as ex: + _LOGGER.debug("Failed to connect to MI API: %s", ex, exc_info=True) + raise AbortFlow( + "api_error", description_placeholders={"error_detail": str(ex)} + ) from ex + else: + if device_details: + return await self.async_step_get_encryption_key_4_5( + {"bindkey": device_details.bindkey} + ) + errors = {"base": "api_device_not_found"} + + user_input = user_input or {} + return self.async_show_form( + step_id="cloud_auth", + errors=errors, + data_schema=vol.Schema( + { + vol.Required( + CONF_USERNAME, default=user_input.get(CONF_USERNAME) + ): str, + vol.Required(CONF_PASSWORD): str, + } + ), + description_placeholders={ + **self.context["title_placeholders"], + **description_placeholders, + }, + ) + + async def async_step_get_encryption_key_4_5_choose_method( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Choose method to get the bind key for a version 4/5 device.""" + return self.async_show_menu( + step_id="get_encryption_key_4_5_choose_method", + menu_options=["cloud_auth", "get_encryption_key_4_5"], + description_placeholders=self.context["title_placeholders"], + ) + async def async_step_bluetooth_confirm( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: @@ -231,7 +302,7 @@ class XiaomiConfigFlow(ConfigFlow, domain=DOMAIN): return await self.async_step_get_encryption_key_legacy() if discovery.device.encryption_scheme == EncryptionScheme.MIBEACON_4_5: - return await self.async_step_get_encryption_key_4_5() + return await self.async_step_get_encryption_key_4_5_choose_method() return self._async_get_or_create_entry() @@ -273,7 +344,7 @@ class XiaomiConfigFlow(ConfigFlow, domain=DOMAIN): return await self.async_step_get_encryption_key_legacy() if device.encryption_scheme == EncryptionScheme.MIBEACON_4_5: - return await self.async_step_get_encryption_key_4_5() + return await self.async_step_get_encryption_key_4_5_choose_method() # Otherwise there wasn't actually encryption so abort return self.async_abort(reason="reauth_successful") diff --git a/homeassistant/components/xiaomi_ble/strings.json b/homeassistant/components/xiaomi_ble/strings.json index 048c9bd92e2..4ea4a47c61e 100644 --- a/homeassistant/components/xiaomi_ble/strings.json +++ b/homeassistant/components/xiaomi_ble/strings.json @@ -25,18 +25,35 @@ "data": { "bindkey": "Bindkey" } + }, + "cloud_auth": { + "description": "Please provide your Mi app username and password. This data won't be saved and only used to retrieve the device encryption key. Usernames and passwords are case sensitive.", + "data": { + "username": "[%key:common::config_flow::data::username%]", + "password": "[%key:common::config_flow::data::password%]" + } + }, + "get_encryption_key_4_5_choose_method": { + "description": "A Mi device can be set up in Home Assistant in two different ways.\n\nYou can enter the bindkey yourself, or Home Assistant can import them from your Mi account.", + "menu_options": { + "cloud_auth": "Mi account (recommended)", + "get_encryption_key_4_5": "Enter encryption key manually" + } } }, "error": { "decryption_failed": "The provided bindkey did not work, sensor data could not be decrypted. Please check it and try again.", "expected_24_characters": "Expected a 24 character hexadecimal bindkey.", - "expected_32_characters": "Expected a 32 character hexadecimal bindkey." + "expected_32_characters": "Expected a 32 character hexadecimal bindkey.", + "auth_failed": "Authentication failed: {error_detail}", + "api_device_not_found": "The device was not found in your Mi account." }, "abort": { "reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]", "no_devices_found": "[%key:common::config_flow::abort::no_devices_found%]", "already_in_progress": "[%key:common::config_flow::abort::already_in_progress%]", - "already_configured": "[%key:common::config_flow::abort::already_configured_device%]" + "already_configured": "[%key:common::config_flow::abort::already_configured_device%]", + "api_error": "Error while communicating with Mi API: {error_detail}" } }, "device_automation": { diff --git a/tests/components/xiaomi_ble/test_config_flow.py b/tests/components/xiaomi_ble/test_config_flow.py index f690665608b..e25ac939a53 100644 --- a/tests/components/xiaomi_ble/test_config_flow.py +++ b/tests/components/xiaomi_ble/test_config_flow.py @@ -2,7 +2,12 @@ from unittest.mock import patch -from xiaomi_ble import XiaomiBluetoothDeviceData as DeviceData +from xiaomi_ble import ( + XiaomiBluetoothDeviceData as DeviceData, + XiaomiCloudBLEDevice, + XiaomiCloudException, + XiaomiCloudInvalidAuthenticationException, +) from homeassistant import config_entries from homeassistant.components.bluetooth import BluetoothChange @@ -96,20 +101,25 @@ async def test_async_step_bluetooth_valid_device_but_missing_payload_then_full( context={"source": config_entries.SOURCE_BLUETOOTH}, data=MISSING_PAYLOAD_ENCRYPTED, ) - assert result["type"] is FlowResultType.FORM - assert result["step_id"] == "get_encryption_key_4_5" + assert result["type"] is FlowResultType.MENU + assert result["step_id"] == "get_encryption_key_4_5_choose_method" + + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={"next_step_id": "get_encryption_key_4_5"}, + ) with patch( "homeassistant.components.xiaomi_ble.async_setup_entry", return_value=True ): - result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], user_input={"bindkey": "a115210eed7a88e50ad52662e732a9fb"}, ) - assert result2["type"] is FlowResultType.CREATE_ENTRY - assert result2["data"] == {"bindkey": "a115210eed7a88e50ad52662e732a9fb"} - assert result2["result"].unique_id == "A4:C1:38:56:53:84" + assert result3["type"] is FlowResultType.CREATE_ENTRY + assert result3["data"] == {"bindkey": "a115210eed7a88e50ad52662e732a9fb"} + assert result3["result"].unique_id == "A4:C1:38:56:53:84" async def test_async_step_bluetooth_during_onboarding(hass: HomeAssistant) -> None: @@ -239,21 +249,244 @@ async def test_async_step_bluetooth_valid_device_v4_encryption( context={"source": config_entries.SOURCE_BLUETOOTH}, data=JTYJGD03MI_SERVICE_INFO, ) - assert result["type"] is FlowResultType.FORM - assert result["step_id"] == "get_encryption_key_4_5" + assert result["type"] is FlowResultType.MENU + assert result["step_id"] == "get_encryption_key_4_5_choose_method" + + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={"next_step_id": "get_encryption_key_4_5"}, + ) with patch( "homeassistant.components.xiaomi_ble.async_setup_entry", return_value=True ): - result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], user_input={"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"}, ) - assert result2["type"] is FlowResultType.CREATE_ENTRY - assert result2["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" - assert result2["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} - assert result2["result"].unique_id == "54:EF:44:E3:9C:BC" + assert result3["type"] is FlowResultType.CREATE_ENTRY + assert result3["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" + assert result3["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} + assert result3["result"].unique_id == "54:EF:44:E3:9C:BC" + + +async def test_bluetooth_discovery_device_v4_encryption_from_cloud( + hass: HomeAssistant, +) -> None: + """Test discovery via bluetooth with a valid v4 device, with auth from cloud.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_BLUETOOTH}, + data=JTYJGD03MI_SERVICE_INFO, + ) + assert result["type"] is FlowResultType.MENU + assert result["step_id"] == "get_encryption_key_4_5_choose_method" + + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={"next_step_id": "cloud_auth"}, + ) + device = XiaomiCloudBLEDevice( + name="x", + mac="54:EF:44:E3:9C:BC", + bindkey="5b51a7c91cde6707c9ef18dfda143a58", + ) + with patch( + "homeassistant.components.xiaomi_ble.config_flow.XiaomiCloudTokenFetch.get_device_info", + return_value=device, + ): + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], + user_input={"username": "x@x.x", "password": "x"}, + ) + + assert result3["type"] is FlowResultType.CREATE_ENTRY + assert result3["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" + assert result3["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} + assert result3["result"].unique_id == "54:EF:44:E3:9C:BC" + + +async def test_bluetooth_discovery_device_v4_encryption_from_cloud_wrong_key( + hass: HomeAssistant, +) -> None: + """Test discovery via bluetooth with a valid v4 device, with wrong auth from cloud.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_BLUETOOTH}, + data=JTYJGD03MI_SERVICE_INFO, + ) + assert result["type"] is FlowResultType.MENU + assert result["step_id"] == "get_encryption_key_4_5_choose_method" + + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={"next_step_id": "cloud_auth"}, + ) + + device = XiaomiCloudBLEDevice( + name="x", + mac="54:EF:44:E3:9C:BC", + bindkey="aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + ) + with patch( + "homeassistant.components.xiaomi_ble.config_flow.XiaomiCloudTokenFetch.get_device_info", + return_value=device, + ): + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], + user_input={"username": "x@x.x", "password": "x"}, + ) + + assert result3["type"] is FlowResultType.FORM + assert result3["step_id"] == "get_encryption_key_4_5" + assert result3["errors"]["bindkey"] == "decryption_failed" + + # Verify we can fallback to manual key + with patch( + "homeassistant.components.xiaomi_ble.async_setup_entry", return_value=True + ): + result4 = await hass.config_entries.flow.async_configure( + result3["flow_id"], + user_input={"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"}, + ) + + assert result4["type"] is FlowResultType.CREATE_ENTRY + assert result4["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" + assert result4["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} + assert result4["result"].unique_id == "54:EF:44:E3:9C:BC" + + +async def test_bluetooth_discovery_incorrect_cloud_account( + hass: HomeAssistant, +) -> None: + """Test discovery via bluetooth with incorrect cloud account.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_BLUETOOTH}, + data=JTYJGD03MI_SERVICE_INFO, + ) + assert result["type"] is FlowResultType.MENU + assert result["step_id"] == "get_encryption_key_4_5_choose_method" + + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={"next_step_id": "cloud_auth"}, + ) + + with patch( + "homeassistant.components.xiaomi_ble.config_flow.XiaomiCloudTokenFetch.get_device_info", + return_value=None, + ): + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], + user_input={"username": "wrong@wrong.wrong", "password": "correct"}, + ) + + assert result3["type"] is FlowResultType.FORM + assert result3["step_id"] == "cloud_auth" + assert result3["errors"]["base"] == "api_device_not_found" + + device = XiaomiCloudBLEDevice( + name="x", + mac="54:EF:44:E3:9C:BC", + bindkey="5b51a7c91cde6707c9ef18dfda143a58", + ) + # Verify we can try again with the correct account + with patch( + "homeassistant.components.xiaomi_ble.config_flow.XiaomiCloudTokenFetch.get_device_info", + return_value=device, + ): + result4 = await hass.config_entries.flow.async_configure( + result3["flow_id"], + user_input={"username": "correct@correct.correct", "password": "correct"}, + ) + + assert result4["type"] is FlowResultType.CREATE_ENTRY + assert result4["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" + assert result4["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} + assert result4["result"].unique_id == "54:EF:44:E3:9C:BC" + + +async def test_bluetooth_discovery_incorrect_cloud_auth( + hass: HomeAssistant, +) -> None: + """Test discovery via bluetooth with incorrect cloud auth.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_BLUETOOTH}, + data=JTYJGD03MI_SERVICE_INFO, + ) + assert result["type"] is FlowResultType.MENU + assert result["step_id"] == "get_encryption_key_4_5_choose_method" + + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={"next_step_id": "cloud_auth"}, + ) + + with patch( + "homeassistant.components.xiaomi_ble.config_flow.XiaomiCloudTokenFetch.get_device_info", + side_effect=XiaomiCloudInvalidAuthenticationException, + ): + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], + user_input={"username": "x@x.x", "password": "wrong"}, + ) + + assert result3["type"] is FlowResultType.FORM + assert result3["step_id"] == "cloud_auth" + assert result3["errors"]["base"] == "auth_failed" + + device = XiaomiCloudBLEDevice( + name="x", + mac="54:EF:44:E3:9C:BC", + bindkey="5b51a7c91cde6707c9ef18dfda143a58", + ) + # Verify we can try again with the correct password + with patch( + "homeassistant.components.xiaomi_ble.config_flow.XiaomiCloudTokenFetch.get_device_info", + return_value=device, + ): + result4 = await hass.config_entries.flow.async_configure( + result3["flow_id"], + user_input={"username": "x@x.x", "password": "correct"}, + ) + + assert result4["type"] is FlowResultType.CREATE_ENTRY + assert result4["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" + assert result4["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} + assert result4["result"].unique_id == "54:EF:44:E3:9C:BC" + + +async def test_bluetooth_discovery_cloud_offline( + hass: HomeAssistant, +) -> None: + """Test discovery via bluetooth when the cloud is offline.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_BLUETOOTH}, + data=JTYJGD03MI_SERVICE_INFO, + ) + assert result["type"] is FlowResultType.MENU + assert result["step_id"] == "get_encryption_key_4_5_choose_method" + + result2 = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={"next_step_id": "cloud_auth"}, + ) + + with patch( + "homeassistant.components.xiaomi_ble.config_flow.XiaomiCloudTokenFetch.get_device_info", + side_effect=XiaomiCloudException, + ): + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], + user_input={"username": "x@x.x", "password": "wrong"}, + ) + + assert result3["type"] is FlowResultType.ABORT + assert result3["reason"] == "api_error" async def test_async_step_bluetooth_valid_device_v4_encryption_wrong_key( @@ -265,31 +498,36 @@ async def test_async_step_bluetooth_valid_device_v4_encryption_wrong_key( context={"source": config_entries.SOURCE_BLUETOOTH}, data=JTYJGD03MI_SERVICE_INFO, ) - assert result["type"] is FlowResultType.FORM - assert result["step_id"] == "get_encryption_key_4_5" + assert result["type"] is FlowResultType.MENU + assert result["step_id"] == "get_encryption_key_4_5_choose_method" result2 = await hass.config_entries.flow.async_configure( result["flow_id"], + user_input={"next_step_id": "get_encryption_key_4_5"}, + ) + + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], user_input={"bindkey": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}, ) - assert result2["type"] is FlowResultType.FORM - assert result2["step_id"] == "get_encryption_key_4_5" - assert result2["errors"]["bindkey"] == "decryption_failed" + assert result3["type"] is FlowResultType.FORM + assert result3["step_id"] == "get_encryption_key_4_5" + assert result3["errors"]["bindkey"] == "decryption_failed" # Test can finish flow with patch( "homeassistant.components.xiaomi_ble.async_setup_entry", return_value=True ): - result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], + result4 = await hass.config_entries.flow.async_configure( + result3["flow_id"], user_input={"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"}, ) - assert result2["type"] is FlowResultType.CREATE_ENTRY - assert result2["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" - assert result2["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} - assert result2["result"].unique_id == "54:EF:44:E3:9C:BC" + assert result4["type"] is FlowResultType.CREATE_ENTRY + assert result4["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" + assert result4["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} + assert result4["result"].unique_id == "54:EF:44:E3:9C:BC" async def test_async_step_bluetooth_valid_device_v4_encryption_wrong_key_length( @@ -301,31 +539,36 @@ async def test_async_step_bluetooth_valid_device_v4_encryption_wrong_key_length( context={"source": config_entries.SOURCE_BLUETOOTH}, data=JTYJGD03MI_SERVICE_INFO, ) - assert result["type"] is FlowResultType.FORM - assert result["step_id"] == "get_encryption_key_4_5" + assert result["type"] is FlowResultType.MENU + assert result["step_id"] == "get_encryption_key_4_5_choose_method" result2 = await hass.config_entries.flow.async_configure( result["flow_id"], + user_input={"next_step_id": "get_encryption_key_4_5"}, + ) + + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], user_input={"bindkey": "5b51a7c91cde6707c9ef18fda143a58"}, ) - assert result2["type"] is FlowResultType.FORM - assert result2["step_id"] == "get_encryption_key_4_5" - assert result2["errors"]["bindkey"] == "expected_32_characters" + assert result3["type"] is FlowResultType.FORM + assert result3["step_id"] == "get_encryption_key_4_5" + assert result3["errors"]["bindkey"] == "expected_32_characters" # Test can finish flow with patch( "homeassistant.components.xiaomi_ble.async_setup_entry", return_value=True ): - result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], + result4 = await hass.config_entries.flow.async_configure( + result3["flow_id"], user_input={"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"}, ) - assert result2["type"] is FlowResultType.CREATE_ENTRY - assert result2["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" - assert result2["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} - assert result2["result"].unique_id == "54:EF:44:E3:9C:BC" + assert result4["type"] is FlowResultType.CREATE_ENTRY + assert result4["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" + assert result4["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} + assert result4["result"].unique_id == "54:EF:44:E3:9C:BC" async def test_async_step_bluetooth_not_xiaomi(hass: HomeAssistant) -> None: @@ -457,20 +700,25 @@ async def test_async_step_user_short_payload_then_full(hass: HomeAssistant) -> N result["flow_id"], user_input={"address": "A4:C1:38:56:53:84"}, ) - assert result1["type"] is FlowResultType.FORM - assert result1["step_id"] == "get_encryption_key_4_5" + assert result1["type"] is FlowResultType.MENU + assert result1["step_id"] == "get_encryption_key_4_5_choose_method" + + result2 = await hass.config_entries.flow.async_configure( + result1["flow_id"], + user_input={"next_step_id": "get_encryption_key_4_5"}, + ) with patch( "homeassistant.components.xiaomi_ble.async_setup_entry", return_value=True ): - result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], user_input={"bindkey": "a115210eed7a88e50ad52662e732a9fb"}, ) - assert result2["type"] is FlowResultType.CREATE_ENTRY - assert result2["title"] == "Temperature/Humidity Sensor 5384 (LYWSD03MMC)" - assert result2["data"] == {"bindkey": "a115210eed7a88e50ad52662e732a9fb"} + assert result3["type"] is FlowResultType.CREATE_ENTRY + assert result3["title"] == "Temperature/Humidity Sensor 5384 (LYWSD03MMC)" + assert result3["data"] == {"bindkey": "a115210eed7a88e50ad52662e732a9fb"} async def test_async_step_user_with_found_devices_v4_encryption( @@ -492,21 +740,26 @@ async def test_async_step_user_with_found_devices_v4_encryption( result["flow_id"], user_input={"address": "54:EF:44:E3:9C:BC"}, ) - assert result1["type"] is FlowResultType.FORM - assert result1["step_id"] == "get_encryption_key_4_5" + assert result1["type"] is FlowResultType.MENU + assert result1["step_id"] == "get_encryption_key_4_5_choose_method" + + result2 = await hass.config_entries.flow.async_configure( + result1["flow_id"], + user_input={"next_step_id": "get_encryption_key_4_5"}, + ) with patch( "homeassistant.components.xiaomi_ble.async_setup_entry", return_value=True ): - result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], user_input={"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"}, ) - assert result2["type"] is FlowResultType.CREATE_ENTRY - assert result2["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" - assert result2["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} - assert result2["result"].unique_id == "54:EF:44:E3:9C:BC" + assert result3["type"] is FlowResultType.CREATE_ENTRY + assert result3["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" + assert result3["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} + assert result3["result"].unique_id == "54:EF:44:E3:9C:BC" async def test_async_step_user_with_found_devices_v4_encryption_wrong_key( @@ -530,31 +783,36 @@ async def test_async_step_user_with_found_devices_v4_encryption_wrong_key( result["flow_id"], user_input={"address": "54:EF:44:E3:9C:BC"}, ) - assert result1["type"] is FlowResultType.FORM - assert result1["step_id"] == "get_encryption_key_4_5" + assert result1["type"] is FlowResultType.MENU + assert result1["step_id"] == "get_encryption_key_4_5_choose_method" + + result2 = await hass.config_entries.flow.async_configure( + result1["flow_id"], + user_input={"next_step_id": "get_encryption_key_4_5"}, + ) # Try an incorrect key - result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], user_input={"bindkey": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}, ) - assert result2["type"] is FlowResultType.FORM - assert result2["step_id"] == "get_encryption_key_4_5" - assert result2["errors"]["bindkey"] == "decryption_failed" + assert result3["type"] is FlowResultType.FORM + assert result3["step_id"] == "get_encryption_key_4_5" + assert result3["errors"]["bindkey"] == "decryption_failed" # Check can still finish flow with patch( "homeassistant.components.xiaomi_ble.async_setup_entry", return_value=True ): - result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], + result4 = await hass.config_entries.flow.async_configure( + result3["flow_id"], user_input={"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"}, ) - assert result2["type"] is FlowResultType.CREATE_ENTRY - assert result2["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" - assert result2["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} - assert result2["result"].unique_id == "54:EF:44:E3:9C:BC" + assert result4["type"] is FlowResultType.CREATE_ENTRY + assert result4["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" + assert result4["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} + assert result4["result"].unique_id == "54:EF:44:E3:9C:BC" async def test_async_step_user_with_found_devices_v4_encryption_wrong_key_length( @@ -578,33 +836,38 @@ async def test_async_step_user_with_found_devices_v4_encryption_wrong_key_length result["flow_id"], user_input={"address": "54:EF:44:E3:9C:BC"}, ) - assert result1["type"] is FlowResultType.FORM - assert result1["step_id"] == "get_encryption_key_4_5" + assert result1["type"] is FlowResultType.MENU + assert result1["step_id"] == "get_encryption_key_4_5_choose_method" + + result2 = await hass.config_entries.flow.async_configure( + result1["flow_id"], + user_input={"next_step_id": "get_encryption_key_4_5"}, + ) # Try an incorrect key - result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], user_input={"bindkey": "5b51a7c91cde6707c9ef1dfda143a58"}, ) - assert result2["type"] is FlowResultType.FORM - assert result2["type"] is FlowResultType.FORM - assert result2["step_id"] == "get_encryption_key_4_5" - assert result2["errors"]["bindkey"] == "expected_32_characters" + assert result3["type"] is FlowResultType.FORM + assert result3["type"] is FlowResultType.FORM + assert result3["step_id"] == "get_encryption_key_4_5" + assert result3["errors"]["bindkey"] == "expected_32_characters" # Check can still finish flow with patch( "homeassistant.components.xiaomi_ble.async_setup_entry", return_value=True ): - result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], + result4 = await hass.config_entries.flow.async_configure( + result3["flow_id"], user_input={"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"}, ) - assert result2["type"] is FlowResultType.CREATE_ENTRY - assert result2["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" - assert result2["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} - assert result2["result"].unique_id == "54:EF:44:E3:9C:BC" + assert result4["type"] is FlowResultType.CREATE_ENTRY + assert result4["title"] == "Smoke Detector 9CBC (JTYJGD03MI)" + assert result4["data"] == {"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"} + assert result4["result"].unique_id == "54:EF:44:E3:9C:BC" async def test_async_step_user_with_found_devices_legacy_encryption( @@ -1003,14 +1266,19 @@ async def test_async_step_reauth_v4(hass: HomeAssistant) -> None: assert len(results) == 1 result = results[0] - assert result["step_id"] == "get_encryption_key_4_5" + assert result["step_id"] == "get_encryption_key_4_5_choose_method" result2 = await hass.config_entries.flow.async_configure( result["flow_id"], + user_input={"next_step_id": "get_encryption_key_4_5"}, + ) + + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], user_input={"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"}, ) - assert result2["type"] is FlowResultType.ABORT - assert result2["reason"] == "reauth_successful" + assert result3["type"] is FlowResultType.ABORT + assert result3["reason"] == "reauth_successful" async def test_async_step_reauth_v4_wrong_key(hass: HomeAssistant) -> None: @@ -1052,22 +1320,90 @@ async def test_async_step_reauth_v4_wrong_key(hass: HomeAssistant) -> None: assert len(results) == 1 result = results[0] - assert result["step_id"] == "get_encryption_key_4_5" + assert result["step_id"] == "get_encryption_key_4_5_choose_method" result2 = await hass.config_entries.flow.async_configure( result["flow_id"], + user_input={"next_step_id": "get_encryption_key_4_5"}, + ) + + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], user_input={"bindkey": "5b51a7c91cde6707c9ef18dada143a58"}, ) - assert result2["type"] is FlowResultType.FORM - assert result2["step_id"] == "get_encryption_key_4_5" - assert result2["errors"]["bindkey"] == "decryption_failed" + assert result3["type"] is FlowResultType.FORM + assert result3["step_id"] == "get_encryption_key_4_5" + assert result3["errors"]["bindkey"] == "decryption_failed" + + result4 = await hass.config_entries.flow.async_configure( + result3["flow_id"], + user_input={"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"}, + ) + assert result4["type"] is FlowResultType.ABORT + assert result4["reason"] == "reauth_successful" + + +async def test_async_step_reauth_v4_from_cloud(hass: HomeAssistant) -> None: + """Test reauth with a v4 key from the cloud.""" + entry = MockConfigEntry( + domain=DOMAIN, + unique_id="54:EF:44:E3:9C:BC", + ) + entry.add_to_hass(hass) + saved_callback = None + + def _async_register_callback(_hass, _callback, _matcher, _mode): + nonlocal saved_callback + saved_callback = _callback + return lambda: None + + with patch( + "homeassistant.components.bluetooth.update_coordinator.async_register_callback", + _async_register_callback, + ): + assert await hass.config_entries.async_setup(entry.entry_id) + await hass.async_block_till_done() + + assert len(hass.states.async_all()) == 0 + + # WARNING: This test data is synthetic, rather than captured from a real device + # obj type is 0x1310, payload len is 0x2 and payload is 0x6000 + saved_callback( + make_advertisement( + "54:EF:44:E3:9C:BC", + b"XY\x97\tf\xbc\x9c\xe3D\xefT\x01\x08\x12\x05\x00\x00\x00q^\xbe\x90", + ), + BluetoothChange.ADVERTISEMENT, + ) + + await hass.async_block_till_done() + + results = hass.config_entries.flow.async_progress() + assert len(results) == 1 + result = results[0] + + assert result["step_id"] == "get_encryption_key_4_5_choose_method" result2 = await hass.config_entries.flow.async_configure( result["flow_id"], - user_input={"bindkey": "5b51a7c91cde6707c9ef18dfda143a58"}, + user_input={"next_step_id": "cloud_auth"}, ) - assert result2["type"] is FlowResultType.ABORT - assert result2["reason"] == "reauth_successful" + device = XiaomiCloudBLEDevice( + name="x", + mac="54:EF:44:E3:9C:BC", + bindkey="5b51a7c91cde6707c9ef18dfda143a58", + ) + with patch( + "homeassistant.components.xiaomi_ble.config_flow.XiaomiCloudTokenFetch.get_device_info", + return_value=device, + ): + result3 = await hass.config_entries.flow.async_configure( + result2["flow_id"], + user_input={"username": "x@x.x", "password": "x"}, + ) + + assert result3["type"] is FlowResultType.ABORT + assert result3["reason"] == "reauth_successful" async def test_async_step_reauth_abort_early(hass: HomeAssistant) -> None: