Add support for bleak 0.21 (#97212)

pull/97255/head
J. Nick Koston 2023-07-25 12:30:54 -05:00 committed by GitHub
parent 213a1690f3
commit 6ae79524bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 80 additions and 3 deletions

View File

@ -6,13 +6,18 @@ from collections.abc import Callable
import contextlib
from dataclasses import dataclass
from functools import partial
import inspect
import logging
from typing import TYPE_CHECKING, Any, Final
from bleak import BleakClient, BleakError
from bleak.backends.client import BaseBleakClient, get_platform_client_backend_type
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementDataCallback, BaseBleakScanner
from bleak.backends.scanner import (
AdvertisementData,
AdvertisementDataCallback,
BaseBleakScanner,
)
from bleak_retry_connector import (
NO_RSSI_VALUE,
ble_device_description,
@ -58,6 +63,7 @@ class HaBleakScannerWrapper(BaseBleakScanner):
self._detection_cancel: CALLBACK_TYPE | None = None
self._mapped_filters: dict[str, set[str]] = {}
self._advertisement_data_callback: AdvertisementDataCallback | None = None
self._background_tasks: set[asyncio.Task] = set()
remapped_kwargs = {
"detection_callback": detection_callback,
"service_uuids": service_uuids or [],
@ -128,12 +134,24 @@ class HaBleakScannerWrapper(BaseBleakScanner):
"""Set up the detection callback."""
if self._advertisement_data_callback is None:
return
callback = self._advertisement_data_callback
self._cancel_callback()
super().register_detection_callback(self._advertisement_data_callback)
assert models.MANAGER is not None
assert self._callback is not None
if not inspect.iscoroutinefunction(callback):
detection_callback = callback
else:
def detection_callback(
ble_device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
task = asyncio.create_task(callback(ble_device, advertisement_data))
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
self._detection_cancel = models.MANAGER.async_register_bleak_callback(
self._callback, self._mapped_filters
detection_callback, self._mapped_filters
)
def __del__(self) -> None:

View File

@ -2386,6 +2386,65 @@ async def test_wrapped_instance_with_service_uuids(
assert len(detected) == 2
async def test_wrapped_instance_with_service_uuids_with_coro_callback(
hass: HomeAssistant, mock_bleak_scanner_start: MagicMock, enable_bluetooth: None
) -> None:
"""Test consumers can use the wrapped instance with a service_uuids list as if it was normal BleakScanner.
Verify that coro callbacks are supported.
"""
with patch(
"homeassistant.components.bluetooth.async_get_bluetooth", return_value=[]
):
await async_setup_with_default_adapter(hass)
with patch.object(hass.config_entries.flow, "async_init"):
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
detected = []
async def _device_detected(
device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Handle a detected device."""
detected.append((device, advertisement_data))
switchbot_device = generate_ble_device("44:44:33:11:23:45", "wohand")
switchbot_adv = generate_advertisement_data(
local_name="wohand",
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"],
manufacturer_data={89: b"\xd8.\xad\xcd\r\x85"},
service_data={"00000d00-0000-1000-8000-00805f9b34fb": b"H\x10c"},
)
switchbot_adv_2 = generate_advertisement_data(
local_name="wohand",
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"],
manufacturer_data={89: b"\xd8.\xad\xcd\r\x84"},
service_data={"00000d00-0000-1000-8000-00805f9b34fb": b"H\x10c"},
)
empty_device = generate_ble_device("11:22:33:44:55:66", "empty")
empty_adv = generate_advertisement_data(local_name="empty")
assert _get_manager() is not None
scanner = HaBleakScannerWrapper(
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"]
)
scanner.register_detection_callback(_device_detected)
inject_advertisement(hass, switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv_2)
await hass.async_block_till_done()
assert len(detected) == 2
# The UUIDs list we created in the wrapped scanner with should be respected
# and we should not get another callback
inject_advertisement(hass, empty_device, empty_adv)
assert len(detected) == 2
async def test_wrapped_instance_with_broken_callbacks(
hass: HomeAssistant, mock_bleak_scanner_start: MagicMock, enable_bluetooth: None
) -> None: