core/tests/components/bluetooth/test_base_scanner.py

523 lines
18 KiB
Python

"""Tests for the Bluetooth base scanner models."""
from __future__ import annotations
from datetime import timedelta
import time
from unittest.mock import patch
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from homeassistant.components import bluetooth
from homeassistant.components.bluetooth import (
BaseHaRemoteScanner,
HaBluetoothConnector,
storage,
)
from homeassistant.components.bluetooth.advertisement_tracker import (
TRACKER_BUFFERING_WOBBLE_SECONDS,
)
from homeassistant.components.bluetooth.const import (
CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS,
FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS,
UNAVAILABLE_TRACK_SECONDS,
)
from homeassistant.core import callback
from homeassistant.helpers.json import json_loads
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from . import MockBleakClient, _get_manager, generate_advertisement_data
from tests.common import async_fire_time_changed, load_fixture
async def test_remote_scanner(hass, enable_bluetooth):
"""Test the remote scanner base class merges advertisement_data."""
manager = _get_manager()
switchbot_device = BLEDevice(
"44:44:33:11:23:45",
"wohand",
{},
rssi=-100,
)
switchbot_device_adv = generate_advertisement_data(
local_name="wohand",
service_uuids=["050a021a-0000-1000-8000-00805f9b34fb"],
service_data={"050a021a-0000-1000-8000-00805f9b34fb": b"\n\xff"},
manufacturer_data={1: b"\x01"},
rssi=-100,
)
switchbot_device_2 = BLEDevice(
"44:44:33:11:23:45",
"w",
{},
rssi=-100,
)
switchbot_device_adv_2 = generate_advertisement_data(
local_name="wohand",
service_uuids=["00000001-0000-1000-8000-00805f9b34fb"],
service_data={"00000001-0000-1000-8000-00805f9b34fb": b"\n\xff"},
manufacturer_data={1: b"\x01", 2: b"\x02"},
rssi=-100,
)
class FakeScanner(BaseHaRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
{"scanner_specific_data": "test"},
)
new_info_callback = manager.scanner_adv_received
connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
)
scanner = FakeScanner(hass, "esp32", "esp32", new_info_callback, connector, True)
unsetup = scanner.async_setup()
cancel = manager.async_register_scanner(scanner, True)
scanner.inject_advertisement(switchbot_device, switchbot_device_adv)
data = scanner.discovered_devices_and_advertisement_data
discovered_device, discovered_adv_data = data[switchbot_device.address]
assert discovered_device.address == switchbot_device.address
assert discovered_device.name == switchbot_device.name
assert (
discovered_adv_data.manufacturer_data == switchbot_device_adv.manufacturer_data
)
assert discovered_adv_data.service_data == switchbot_device_adv.service_data
assert discovered_adv_data.service_uuids == switchbot_device_adv.service_uuids
scanner.inject_advertisement(switchbot_device_2, switchbot_device_adv_2)
data = scanner.discovered_devices_and_advertisement_data
discovered_device, discovered_adv_data = data[switchbot_device.address]
assert discovered_device.address == switchbot_device.address
assert discovered_device.name == switchbot_device.name
assert discovered_adv_data.manufacturer_data == {1: b"\x01", 2: b"\x02"}
assert discovered_adv_data.service_data == {
"050a021a-0000-1000-8000-00805f9b34fb": b"\n\xff",
"00000001-0000-1000-8000-00805f9b34fb": b"\n\xff",
}
assert set(discovered_adv_data.service_uuids) == {
"050a021a-0000-1000-8000-00805f9b34fb",
"00000001-0000-1000-8000-00805f9b34fb",
}
cancel()
unsetup()
async def test_remote_scanner_expires_connectable(hass, enable_bluetooth):
"""Test the remote scanner expires stale connectable data."""
manager = _get_manager()
switchbot_device = BLEDevice(
"44:44:33:11:23:45",
"wohand",
{},
rssi=-100,
)
switchbot_device_adv = generate_advertisement_data(
local_name="wohand",
service_uuids=[],
manufacturer_data={1: b"\x01"},
rssi=-100,
)
class FakeScanner(BaseHaRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
{"scanner_specific_data": "test"},
)
new_info_callback = manager.scanner_adv_received
connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
)
scanner = FakeScanner(hass, "esp32", "esp32", new_info_callback, connector, True)
unsetup = scanner.async_setup()
cancel = manager.async_register_scanner(scanner, True)
start_time_monotonic = time.monotonic()
scanner.inject_advertisement(switchbot_device, switchbot_device_adv)
devices = scanner.discovered_devices
assert len(scanner.discovered_devices) == 1
assert len(scanner.discovered_devices_and_advertisement_data) == 1
assert devices[0].name == "wohand"
expire_monotonic = (
start_time_monotonic
+ CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
+ 1
)
expire_utc = dt_util.utcnow() + timedelta(
seconds=CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
)
with patch(
"homeassistant.components.bluetooth.base_scanner.MONOTONIC_TIME",
return_value=expire_monotonic,
):
async_fire_time_changed(hass, expire_utc)
await hass.async_block_till_done()
devices = scanner.discovered_devices
assert len(scanner.discovered_devices) == 0
assert len(scanner.discovered_devices_and_advertisement_data) == 0
cancel()
unsetup()
async def test_remote_scanner_expires_non_connectable(hass, enable_bluetooth):
"""Test the remote scanner expires stale non connectable data."""
manager = _get_manager()
switchbot_device = BLEDevice(
"44:44:33:11:23:45",
"wohand",
{},
rssi=-100,
)
switchbot_device_adv = generate_advertisement_data(
local_name="wohand",
service_uuids=[],
manufacturer_data={1: b"\x01"},
rssi=-100,
)
class FakeScanner(BaseHaRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
{"scanner_specific_data": "test"},
)
new_info_callback = manager.scanner_adv_received
connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
)
scanner = FakeScanner(hass, "esp32", "esp32", new_info_callback, connector, False)
unsetup = scanner.async_setup()
cancel = manager.async_register_scanner(scanner, True)
start_time_monotonic = time.monotonic()
scanner.inject_advertisement(switchbot_device, switchbot_device_adv)
devices = scanner.discovered_devices
assert len(scanner.discovered_devices) == 1
assert len(scanner.discovered_devices_and_advertisement_data) == 1
assert devices[0].name == "wohand"
assert (
FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
> CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
)
# The connectable timeout is used for all devices
# as the manager takes care of availability and the scanner
# if only concerned about making a connection
expire_monotonic = (
start_time_monotonic
+ CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
+ 1
)
expire_utc = dt_util.utcnow() + timedelta(
seconds=CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
)
with patch(
"homeassistant.components.bluetooth.base_scanner.MONOTONIC_TIME",
return_value=expire_monotonic,
):
async_fire_time_changed(hass, expire_utc)
await hass.async_block_till_done()
assert len(scanner.discovered_devices) == 0
assert len(scanner.discovered_devices_and_advertisement_data) == 0
expire_monotonic = (
start_time_monotonic + FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
)
expire_utc = dt_util.utcnow() + timedelta(
seconds=FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
)
with patch(
"homeassistant.components.bluetooth.base_scanner.MONOTONIC_TIME",
return_value=expire_monotonic,
):
async_fire_time_changed(hass, expire_utc)
await hass.async_block_till_done()
assert len(scanner.discovered_devices) == 0
assert len(scanner.discovered_devices_and_advertisement_data) == 0
cancel()
unsetup()
async def test_base_scanner_connecting_behavior(hass, enable_bluetooth):
"""Test that the default behavior is to mark the scanner as not scanning when connecting."""
manager = _get_manager()
switchbot_device = BLEDevice(
"44:44:33:11:23:45",
"wohand",
{},
rssi=-100,
)
switchbot_device_adv = generate_advertisement_data(
local_name="wohand",
service_uuids=[],
manufacturer_data={1: b"\x01"},
rssi=-100,
)
class FakeScanner(BaseHaRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
{"scanner_specific_data": "test"},
)
new_info_callback = manager.scanner_adv_received
connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
)
scanner = FakeScanner(hass, "esp32", "esp32", new_info_callback, connector, False)
unsetup = scanner.async_setup()
cancel = manager.async_register_scanner(scanner, True)
with scanner.connecting():
assert scanner.scanning is False
# We should still accept new advertisements while connecting
# since advertisements are delivered asynchronously and
# we don't want to miss any even when we are willing to
# accept advertisements from another scanner in the brief window
# between when we start connecting and when we stop scanning
scanner.inject_advertisement(switchbot_device, switchbot_device_adv)
devices = scanner.discovered_devices
assert len(scanner.discovered_devices) == 1
assert len(scanner.discovered_devices_and_advertisement_data) == 1
assert devices[0].name == "wohand"
cancel()
unsetup()
async def test_restore_history_remote_adapter(
hass, hass_storage, disable_new_discovery_flows
):
"""Test we can restore history for a remote adapter."""
data = hass_storage[storage.REMOTE_SCANNER_STORAGE_KEY] = json_loads(
load_fixture("bluetooth.remote_scanners", bluetooth.DOMAIN)
)
now = time.time()
timestamps = data["data"]["atom-bluetooth-proxy-ceaac4"][
"discovered_device_timestamps"
]
for address in timestamps:
if address != "E3:A5:63:3E:5E:23":
timestamps[address] = now
with patch(
"bluetooth_adapters.systems.linux.LinuxAdapters.history",
{},
), patch(
"bluetooth_adapters.systems.linux.LinuxAdapters.refresh",
):
assert await async_setup_component(hass, bluetooth.DOMAIN, {})
await hass.async_block_till_done()
connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
)
scanner = BaseHaRemoteScanner(
hass,
"atom-bluetooth-proxy-ceaac4",
"atom-bluetooth-proxy-ceaac4",
lambda adv: None,
connector,
True,
)
unsetup = scanner.async_setup()
cancel = _get_manager().async_register_scanner(scanner, True)
assert "EB:0B:36:35:6F:A4" in scanner.discovered_devices_and_advertisement_data
assert "E3:A5:63:3E:5E:23" not in scanner.discovered_devices_and_advertisement_data
cancel()
unsetup()
scanner = BaseHaRemoteScanner(
hass,
"atom-bluetooth-proxy-ceaac4",
"atom-bluetooth-proxy-ceaac4",
lambda adv: None,
connector,
True,
)
unsetup = scanner.async_setup()
cancel = _get_manager().async_register_scanner(scanner, True)
assert "EB:0B:36:35:6F:A4" in scanner.discovered_devices_and_advertisement_data
assert "E3:A5:63:3E:5E:23" not in scanner.discovered_devices_and_advertisement_data
cancel()
unsetup()
async def test_device_with_ten_minute_advertising_interval(
hass, caplog, enable_bluetooth
):
"""Test a device with a 10 minute advertising interval."""
manager = _get_manager()
bparasite_device = BLEDevice(
"44:44:33:11:23:45",
"bparasite",
{},
rssi=-100,
)
bparasite_device_adv = generate_advertisement_data(
local_name="bparasite",
service_uuids=[],
manufacturer_data={1: b"\x01"},
rssi=-100,
)
class FakeScanner(BaseHaRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
{"scanner_specific_data": "test"},
)
new_info_callback = manager.scanner_adv_received
connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
)
scanner = FakeScanner(hass, "esp32", "esp32", new_info_callback, connector, False)
unsetup = scanner.async_setup()
cancel = manager.async_register_scanner(scanner, True)
monotonic_now = time.monotonic()
new_time = monotonic_now
bparasite_device_went_unavailable = False
@callback
def _bparasite_device_unavailable_callback(_address: str) -> None:
"""Barasite device unavailable callback."""
nonlocal bparasite_device_went_unavailable
bparasite_device_went_unavailable = True
advertising_interval = 60 * 10
bparasite_device_unavailable_cancel = bluetooth.async_track_unavailable(
hass,
_bparasite_device_unavailable_callback,
bparasite_device.address,
connectable=False,
)
for _ in range(0, 20):
new_time += advertising_interval
with patch(
"homeassistant.components.bluetooth.base_scanner.MONOTONIC_TIME",
return_value=new_time,
):
scanner.inject_advertisement(bparasite_device, bparasite_device_adv)
future_time = new_time
assert (
bluetooth.async_address_present(hass, bparasite_device.address, False) is True
)
assert bparasite_device_went_unavailable is False
with patch(
"homeassistant.components.bluetooth.manager.MONOTONIC_TIME",
return_value=new_time,
):
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=future_time))
await hass.async_block_till_done()
assert bparasite_device_went_unavailable is False
missed_advertisement_future_time = (
future_time + advertising_interval + TRACKER_BUFFERING_WOBBLE_SECONDS + 1
)
with patch(
"homeassistant.components.bluetooth.manager.MONOTONIC_TIME",
return_value=missed_advertisement_future_time,
), patch(
"homeassistant.components.bluetooth.base_scanner.MONOTONIC_TIME",
return_value=missed_advertisement_future_time,
):
# Fire once for the scanner to expire the device
async_fire_time_changed(
hass, dt_util.utcnow() + timedelta(seconds=UNAVAILABLE_TRACK_SECONDS)
)
await hass.async_block_till_done()
# Fire again for the manager to expire the device
async_fire_time_changed(
hass, dt_util.utcnow() + timedelta(seconds=missed_advertisement_future_time)
)
await hass.async_block_till_done()
assert (
bluetooth.async_address_present(hass, bparasite_device.address, False) is False
)
assert bparasite_device_went_unavailable is True
bparasite_device_unavailable_cancel()
cancel()
unsetup()