Bump habluetooth to 0.8.0 (#105109)

pull/105119/head
J. Nick Koston 2023-12-05 21:15:24 -10:00 committed by GitHub
parent 3fda43ef33
commit a528183556
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 96 additions and 201 deletions

View File

@ -20,6 +20,6 @@
"bluetooth-auto-recovery==1.2.3", "bluetooth-auto-recovery==1.2.3",
"bluetooth-data-tools==1.17.0", "bluetooth-data-tools==1.17.0",
"dbus-fast==2.20.0", "dbus-fast==2.20.0",
"habluetooth==0.6.1" "habluetooth==0.8.0"
] ]
} }

View File

@ -23,7 +23,7 @@ dbus-fast==2.20.0
fnv-hash-fast==0.5.0 fnv-hash-fast==0.5.0
ha-av==10.1.1 ha-av==10.1.1
ha-ffmpeg==3.1.0 ha-ffmpeg==3.1.0
habluetooth==0.6.1 habluetooth==0.8.0
hass-nabucasa==0.74.0 hass-nabucasa==0.74.0
hassil==1.5.1 hassil==1.5.1
home-assistant-bluetooth==1.10.4 home-assistant-bluetooth==1.10.4

View File

@ -984,7 +984,7 @@ ha-philipsjs==3.1.1
habitipy==0.2.0 habitipy==0.2.0
# homeassistant.components.bluetooth # homeassistant.components.bluetooth
habluetooth==0.6.1 habluetooth==0.8.0
# homeassistant.components.cloud # homeassistant.components.cloud
hass-nabucasa==0.74.0 hass-nabucasa==0.74.0

View File

@ -783,7 +783,7 @@ ha-philipsjs==3.1.1
habitipy==0.2.0 habitipy==0.2.0
# homeassistant.components.bluetooth # homeassistant.components.bluetooth
habluetooth==0.6.1 habluetooth==0.8.0
# homeassistant.components.cloud # homeassistant.components.cloud
hass-nabucasa==0.74.0 hass-nabucasa==0.74.0

View File

@ -5,11 +5,12 @@ from contextlib import contextmanager
import itertools import itertools
import time import time
from typing import Any from typing import Any
from unittest.mock import MagicMock from unittest.mock import MagicMock, patch
from bleak import BleakClient from bleak import BleakClient
from bleak.backends.scanner import AdvertisementData, BLEDevice from bleak.backends.scanner import AdvertisementData, BLEDevice
from bluetooth_adapters import DEFAULT_ADDRESS from bluetooth_adapters import DEFAULT_ADDRESS
from habluetooth import BaseHaScanner, BluetoothManager
from homeassistant.components.bluetooth import ( from homeassistant.components.bluetooth import (
DOMAIN, DOMAIN,
@ -19,8 +20,6 @@ from homeassistant.components.bluetooth import (
async_get_advertisement_callback, async_get_advertisement_callback,
models, models,
) )
from homeassistant.components.bluetooth.base_scanner import BaseHaScanner
from homeassistant.components.bluetooth.manager import BluetoothManager
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component from homeassistant.setup import async_setup_component
@ -37,6 +36,7 @@ __all__ = (
"generate_advertisement_data", "generate_advertisement_data",
"generate_ble_device", "generate_ble_device",
"MockBleakClient", "MockBleakClient",
"patch_bluetooth_time",
) )
ADVERTISEMENT_DATA_DEFAULTS = { ADVERTISEMENT_DATA_DEFAULTS = {
@ -56,6 +56,22 @@ BLE_DEVICE_DEFAULTS = {
} }
@contextmanager
def patch_bluetooth_time(mock_time: float) -> None:
"""Patch the bluetooth time."""
with patch(
"homeassistant.components.bluetooth.manager.MONOTONIC_TIME",
return_value=mock_time,
), patch(
"homeassistant.components.bluetooth.MONOTONIC_TIME", return_value=mock_time
), patch(
"habluetooth.base_scanner.monotonic_time_coarse", return_value=mock_time
), patch(
"habluetooth.manager.monotonic_time_coarse", return_value=mock_time
), patch("habluetooth.scanner.monotonic_time_coarse", return_value=mock_time):
yield
def generate_advertisement_data(**kwargs: Any) -> AdvertisementData: def generate_advertisement_data(**kwargs: Any) -> AdvertisementData:
"""Generate advertisement data with defaults.""" """Generate advertisement data with defaults."""
new = kwargs.copy() new = kwargs.copy()

View File

@ -35,11 +35,35 @@ from . import (
_get_manager, _get_manager,
generate_advertisement_data, generate_advertisement_data,
generate_ble_device, generate_ble_device,
patch_bluetooth_time,
) )
from tests.common import async_fire_time_changed, load_fixture from tests.common import async_fire_time_changed, load_fixture
class FakeScanner(HomeAssistantRemoteScanner):
"""Fake scanner."""
def inject_advertisement(
self,
device: BLEDevice,
advertisement_data: AdvertisementData,
now: float | None = None,
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
{"scanner_specific_data": "test"},
now or MONOTONIC_TIME(),
)
@pytest.mark.parametrize("name_2", [None, "w"]) @pytest.mark.parametrize("name_2", [None, "w"])
async def test_remote_scanner( async def test_remote_scanner(
hass: HomeAssistant, enable_bluetooth: None, name_2: str | None hass: HomeAssistant, enable_bluetooth: None, name_2: str | None
@ -87,23 +111,6 @@ async def test_remote_scanner(
rssi=-100, rssi=-100,
) )
class FakeScanner(HomeAssistantRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
{"scanner_specific_data": "test"},
MONOTONIC_TIME(),
)
new_info_callback = manager.scanner_adv_received new_info_callback = manager.scanner_adv_received
connector = ( connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False), HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
@ -171,23 +178,6 @@ async def test_remote_scanner_expires_connectable(
rssi=-100, rssi=-100,
) )
class FakeScanner(HomeAssistantRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
{"scanner_specific_data": "test"},
MONOTONIC_TIME(),
)
new_info_callback = manager.scanner_adv_received new_info_callback = manager.scanner_adv_received
connector = ( connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False), HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
@ -212,10 +202,7 @@ async def test_remote_scanner_expires_connectable(
expire_utc = dt_util.utcnow() + timedelta( expire_utc = dt_util.utcnow() + timedelta(
seconds=CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1 seconds=CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
) )
with patch( with patch_bluetooth_time(expire_monotonic):
"habluetooth.base_scanner.MONOTONIC_TIME",
return_value=expire_monotonic,
):
async_fire_time_changed(hass, expire_utc) async_fire_time_changed(hass, expire_utc)
await hass.async_block_till_done() await hass.async_block_till_done()
@ -246,23 +233,6 @@ async def test_remote_scanner_expires_non_connectable(
rssi=-100, rssi=-100,
) )
class FakeScanner(HomeAssistantRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
{"scanner_specific_data": "test"},
MONOTONIC_TIME(),
)
new_info_callback = manager.scanner_adv_received new_info_callback = manager.scanner_adv_received
connector = ( connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False), HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
@ -295,10 +265,7 @@ async def test_remote_scanner_expires_non_connectable(
expire_utc = dt_util.utcnow() + timedelta( expire_utc = dt_util.utcnow() + timedelta(
seconds=CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1 seconds=CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
) )
with patch( with patch_bluetooth_time(expire_monotonic):
"habluetooth.base_scanner.MONOTONIC_TIME",
return_value=expire_monotonic,
):
async_fire_time_changed(hass, expire_utc) async_fire_time_changed(hass, expire_utc)
await hass.async_block_till_done() await hass.async_block_till_done()
@ -311,10 +278,7 @@ async def test_remote_scanner_expires_non_connectable(
expire_utc = dt_util.utcnow() + timedelta( expire_utc = dt_util.utcnow() + timedelta(
seconds=FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1 seconds=FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
) )
with patch( with patch_bluetooth_time(expire_monotonic):
"habluetooth.base_scanner.MONOTONIC_TIME",
return_value=expire_monotonic,
):
async_fire_time_changed(hass, expire_utc) async_fire_time_changed(hass, expire_utc)
await hass.async_block_till_done() await hass.async_block_till_done()
@ -344,23 +308,6 @@ async def test_base_scanner_connecting_behavior(
rssi=-100, rssi=-100,
) )
class FakeScanner(HomeAssistantRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
{"scanner_specific_data": "test"},
MONOTONIC_TIME(),
)
new_info_callback = manager.scanner_adv_received new_info_callback = manager.scanner_adv_received
connector = ( connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False), HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
@ -468,23 +415,6 @@ async def test_device_with_ten_minute_advertising_interval(
rssi=-100, rssi=-100,
) )
class FakeScanner(HomeAssistantRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
{"scanner_specific_data": "test"},
MONOTONIC_TIME(),
)
new_info_callback = manager.scanner_adv_received new_info_callback = manager.scanner_adv_received
connector = ( connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False), HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
@ -512,11 +442,8 @@ async def test_device_with_ten_minute_advertising_interval(
connectable=False, connectable=False,
) )
with patch( with patch_bluetooth_time(new_time):
"habluetooth.base_scanner.MONOTONIC_TIME", scanner.inject_advertisement(bparasite_device, bparasite_device_adv, new_time)
return_value=new_time,
):
scanner.inject_advertisement(bparasite_device, bparasite_device_adv)
original_device = scanner.discovered_devices_and_advertisement_data[ original_device = scanner.discovered_devices_and_advertisement_data[
bparasite_device.address bparasite_device.address
@ -525,11 +452,10 @@ async def test_device_with_ten_minute_advertising_interval(
for _ in range(1, 20): for _ in range(1, 20):
new_time += advertising_interval new_time += advertising_interval
with patch( with patch_bluetooth_time(new_time):
"habluetooth.base_scanner.MONOTONIC_TIME", scanner.inject_advertisement(
return_value=new_time, bparasite_device, bparasite_device_adv, new_time
): )
scanner.inject_advertisement(bparasite_device, bparasite_device_adv)
# Make sure the BLEDevice object gets updated # Make sure the BLEDevice object gets updated
# and not replaced # and not replaced
@ -543,10 +469,7 @@ async def test_device_with_ten_minute_advertising_interval(
bluetooth.async_address_present(hass, bparasite_device.address, False) is True bluetooth.async_address_present(hass, bparasite_device.address, False) is True
) )
assert bparasite_device_went_unavailable is False assert bparasite_device_went_unavailable is False
with patch( with patch_bluetooth_time(new_time):
"homeassistant.components.bluetooth.manager.MONOTONIC_TIME",
return_value=new_time,
):
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=future_time)) async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=future_time))
await hass.async_block_till_done() await hass.async_block_till_done()
@ -556,13 +479,7 @@ async def test_device_with_ten_minute_advertising_interval(
future_time + advertising_interval + TRACKER_BUFFERING_WOBBLE_SECONDS + 1 future_time + advertising_interval + TRACKER_BUFFERING_WOBBLE_SECONDS + 1
) )
with patch( with patch_bluetooth_time(missed_advertisement_future_time):
"homeassistant.components.bluetooth.manager.MONOTONIC_TIME",
return_value=missed_advertisement_future_time,
), patch(
"habluetooth.base_scanner.MONOTONIC_TIME",
return_value=missed_advertisement_future_time,
):
# Fire once for the scanner to expire the device # Fire once for the scanner to expire the device
async_fire_time_changed( async_fire_time_changed(
hass, dt_util.utcnow() + timedelta(seconds=UNAVAILABLE_TRACK_SECONDS) hass, dt_util.utcnow() + timedelta(seconds=UNAVAILABLE_TRACK_SECONDS)
@ -590,25 +507,6 @@ async def test_scanner_stops_responding(
"""Test we mark a scanner are not scanning when it stops responding.""" """Test we mark a scanner are not scanning when it stops responding."""
manager = _get_manager() manager = _get_manager()
class FakeScanner(HomeAssistantRemoteScanner):
"""A fake remote scanner."""
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
{"scanner_specific_data": "test"},
MONOTONIC_TIME(),
)
new_info_callback = manager.scanner_adv_received new_info_callback = manager.scanner_adv_received
connector = ( connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False), HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
@ -626,10 +524,7 @@ async def test_scanner_stops_responding(
+ SCANNER_WATCHDOG_INTERVAL.total_seconds() + SCANNER_WATCHDOG_INTERVAL.total_seconds()
) )
# We hit the timer with no detections, so we reset the adapter and restart the scanner # We hit the timer with no detections, so we reset the adapter and restart the scanner
with patch( with patch_bluetooth_time(failure_reached_time):
"habluetooth.base_scanner.MONOTONIC_TIME",
return_value=failure_reached_time,
):
async_fire_time_changed(hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL) async_fire_time_changed(hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL)
await hass.async_block_till_done() await hass.async_block_till_done()
@ -650,11 +545,10 @@ async def test_scanner_stops_responding(
failure_reached_time += 1 failure_reached_time += 1
with patch( with patch_bluetooth_time(failure_reached_time):
"habluetooth.base_scanner.MONOTONIC_TIME", scanner.inject_advertisement(
return_value=failure_reached_time, bparasite_device, bparasite_device_adv, failure_reached_time
): )
scanner.inject_advertisement(bparasite_device, bparasite_device_adv)
# As soon as we get a detection, we know the scanner is working again # As soon as we get a detection, we know the scanner is working again
assert scanner.scanning is True assert scanner.scanning is True

View File

@ -25,6 +25,7 @@ from . import (
async_setup_with_one_adapter, async_setup_with_one_adapter,
generate_advertisement_data, generate_advertisement_data,
generate_ble_device, generate_ble_device,
patch_bluetooth_time,
) )
from tests.common import MockConfigEntry, async_fire_time_changed from tests.common import MockConfigEntry, async_fire_time_changed
@ -226,9 +227,8 @@ async def test_recovery_from_dbus_restart(
mock_discovered = [MagicMock()] mock_discovered = [MagicMock()]
# Ensure we don't restart the scanner if we don't need to # Ensure we don't restart the scanner if we don't need to
with patch( with patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic + 10,
return_value=start_time_monotonic + 10,
): ):
async_fire_time_changed(hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL) async_fire_time_changed(hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL)
await hass.async_block_till_done() await hass.async_block_till_done()
@ -236,9 +236,8 @@ async def test_recovery_from_dbus_restart(
assert called_start == 1 assert called_start == 1
# Fire a callback to reset the timer # Fire a callback to reset the timer
with patch( with patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic,
return_value=start_time_monotonic,
): ):
_callback( _callback(
generate_ble_device("44:44:33:11:23:42", "any_name"), generate_ble_device("44:44:33:11:23:42", "any_name"),
@ -246,9 +245,8 @@ async def test_recovery_from_dbus_restart(
) )
# Ensure we don't restart the scanner if we don't need to # Ensure we don't restart the scanner if we don't need to
with patch( with patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic + 20,
return_value=start_time_monotonic + 20,
): ):
async_fire_time_changed(hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL) async_fire_time_changed(hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL)
await hass.async_block_till_done() await hass.async_block_till_done()
@ -256,9 +254,8 @@ async def test_recovery_from_dbus_restart(
assert called_start == 1 assert called_start == 1
# We hit the timer, so we restart the scanner # We hit the timer, so we restart the scanner
with patch( with patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic + SCANNER_WATCHDOG_TIMEOUT + 20,
return_value=start_time_monotonic + SCANNER_WATCHDOG_TIMEOUT + 20,
): ):
async_fire_time_changed( async_fire_time_changed(
hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL + timedelta(seconds=20) hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL + timedelta(seconds=20)
@ -301,9 +298,8 @@ async def test_adapter_recovery(hass: HomeAssistant, one_adapter: None) -> None:
scanner = MockBleakScanner() scanner = MockBleakScanner()
start_time_monotonic = time.monotonic() start_time_monotonic = time.monotonic()
with patch( with patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic,
return_value=start_time_monotonic,
), patch( ), patch(
"habluetooth.scanner.OriginalBleakScanner", "habluetooth.scanner.OriginalBleakScanner",
return_value=scanner, return_value=scanner,
@ -316,9 +312,8 @@ async def test_adapter_recovery(hass: HomeAssistant, one_adapter: None) -> None:
mock_discovered = [MagicMock()] mock_discovered = [MagicMock()]
# Ensure we don't restart the scanner if we don't need to # Ensure we don't restart the scanner if we don't need to
with patch( with patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic + 10,
return_value=start_time_monotonic + 10,
): ):
async_fire_time_changed(hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL) async_fire_time_changed(hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL)
await hass.async_block_till_done() await hass.async_block_till_done()
@ -326,9 +321,8 @@ async def test_adapter_recovery(hass: HomeAssistant, one_adapter: None) -> None:
assert called_start == 1 assert called_start == 1
# Ensure we don't restart the scanner if we don't need to # Ensure we don't restart the scanner if we don't need to
with patch( with patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic + 20,
return_value=start_time_monotonic + 20,
): ):
async_fire_time_changed(hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL) async_fire_time_changed(hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL)
await hass.async_block_till_done() await hass.async_block_till_done()
@ -336,9 +330,8 @@ async def test_adapter_recovery(hass: HomeAssistant, one_adapter: None) -> None:
assert called_start == 1 assert called_start == 1
# We hit the timer with no detections, so we reset the adapter and restart the scanner # We hit the timer with no detections, so we reset the adapter and restart the scanner
with patch( with patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic
return_value=start_time_monotonic
+ SCANNER_WATCHDOG_TIMEOUT + SCANNER_WATCHDOG_TIMEOUT
+ SCANNER_WATCHDOG_INTERVAL.total_seconds(), + SCANNER_WATCHDOG_INTERVAL.total_seconds(),
), patch( ), patch(
@ -390,9 +383,8 @@ async def test_adapter_scanner_fails_to_start_first_time(
scanner = MockBleakScanner() scanner = MockBleakScanner()
start_time_monotonic = time.monotonic() start_time_monotonic = time.monotonic()
with patch( with patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic,
return_value=start_time_monotonic,
), patch( ), patch(
"habluetooth.scanner.OriginalBleakScanner", "habluetooth.scanner.OriginalBleakScanner",
return_value=scanner, return_value=scanner,
@ -405,9 +397,8 @@ async def test_adapter_scanner_fails_to_start_first_time(
mock_discovered = [MagicMock()] mock_discovered = [MagicMock()]
# Ensure we don't restart the scanner if we don't need to # Ensure we don't restart the scanner if we don't need to
with patch( with patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic + 10,
return_value=start_time_monotonic + 10,
): ):
async_fire_time_changed(hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL) async_fire_time_changed(hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL)
await hass.async_block_till_done() await hass.async_block_till_done()
@ -415,9 +406,8 @@ async def test_adapter_scanner_fails_to_start_first_time(
assert called_start == 1 assert called_start == 1
# Ensure we don't restart the scanner if we don't need to # Ensure we don't restart the scanner if we don't need to
with patch( with patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic + 20,
return_value=start_time_monotonic + 20,
): ):
async_fire_time_changed(hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL) async_fire_time_changed(hass, dt_util.utcnow() + SCANNER_WATCHDOG_INTERVAL)
await hass.async_block_till_done() await hass.async_block_till_done()
@ -425,9 +415,8 @@ async def test_adapter_scanner_fails_to_start_first_time(
assert called_start == 1 assert called_start == 1
# We hit the timer with no detections, so we reset the adapter and restart the scanner # We hit the timer with no detections, so we reset the adapter and restart the scanner
with patch( with patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic
return_value=start_time_monotonic
+ SCANNER_WATCHDOG_TIMEOUT + SCANNER_WATCHDOG_TIMEOUT
+ SCANNER_WATCHDOG_INTERVAL.total_seconds(), + SCANNER_WATCHDOG_INTERVAL.total_seconds(),
), patch( ), patch(
@ -441,9 +430,8 @@ async def test_adapter_scanner_fails_to_start_first_time(
# We hit the timer again the previous start call failed, make sure # We hit the timer again the previous start call failed, make sure
# we try again # we try again
with patch( with patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic
return_value=start_time_monotonic
+ SCANNER_WATCHDOG_TIMEOUT + SCANNER_WATCHDOG_TIMEOUT
+ SCANNER_WATCHDOG_INTERVAL.total_seconds(), + SCANNER_WATCHDOG_INTERVAL.total_seconds(),
), patch( ), patch(
@ -504,9 +492,8 @@ async def test_adapter_fails_to_start_and_takes_a_bit_to_init(
with patch( with patch(
"habluetooth.scanner.ADAPTER_INIT_TIME", "habluetooth.scanner.ADAPTER_INIT_TIME",
0, 0,
), patch( ), patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic,
return_value=start_time_monotonic,
), patch( ), patch(
"habluetooth.scanner.OriginalBleakScanner", "habluetooth.scanner.OriginalBleakScanner",
return_value=scanner, return_value=scanner,
@ -555,9 +542,8 @@ async def test_restart_takes_longer_than_watchdog_time(
with patch( with patch(
"habluetooth.scanner.ADAPTER_INIT_TIME", "habluetooth.scanner.ADAPTER_INIT_TIME",
0, 0,
), patch( ), patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic,
return_value=start_time_monotonic,
), patch( ), patch(
"habluetooth.scanner.OriginalBleakScanner", "habluetooth.scanner.OriginalBleakScanner",
return_value=scanner, return_value=scanner,
@ -568,9 +554,8 @@ async def test_restart_takes_longer_than_watchdog_time(
# Now force a recover adapter 2x # Now force a recover adapter 2x
for _ in range(2): for _ in range(2):
with patch( with patch_bluetooth_time(
"habluetooth.base_scanner.MONOTONIC_TIME", start_time_monotonic
return_value=start_time_monotonic
+ SCANNER_WATCHDOG_TIMEOUT + SCANNER_WATCHDOG_TIMEOUT
+ SCANNER_WATCHDOG_INTERVAL.total_seconds(), + SCANNER_WATCHDOG_INTERVAL.total_seconds(),
): ):