655 lines
26 KiB
Python
655 lines
26 KiB
Python
"""Bluetooth client for esphome."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import Callable, Coroutine
|
|
import contextlib
|
|
import logging
|
|
from typing import Any, TypeVar, cast
|
|
import uuid
|
|
|
|
from aioesphomeapi import (
|
|
ESP_CONNECTION_ERROR_DESCRIPTION,
|
|
ESPHOME_GATT_ERRORS,
|
|
BLEConnectionError,
|
|
)
|
|
from aioesphomeapi.connection import APIConnectionError, TimeoutAPIError
|
|
from aioesphomeapi.core import BluetoothGATTAPIError
|
|
import async_timeout
|
|
from bleak.backends.characteristic import BleakGATTCharacteristic
|
|
from bleak.backends.client import BaseBleakClient, NotifyCallback
|
|
from bleak.backends.device import BLEDevice
|
|
from bleak.backends.service import BleakGATTServiceCollection
|
|
from bleak.exc import BleakError
|
|
|
|
from homeassistant.components.bluetooth import async_scanner_by_source
|
|
from homeassistant.core import CALLBACK_TYPE, HomeAssistant
|
|
|
|
from ..domain_data import DomainData
|
|
from .characteristic import BleakGATTCharacteristicESPHome
|
|
from .descriptor import BleakGATTDescriptorESPHome
|
|
from .service import BleakGATTServiceESPHome
|
|
|
|
DEFAULT_MTU = 23
|
|
GATT_HEADER_SIZE = 3
|
|
DISCONNECT_TIMEOUT = 5.0
|
|
CONNECT_FREE_SLOT_TIMEOUT = 2.0
|
|
GATT_READ_TIMEOUT = 30.0
|
|
|
|
# CCCD (Characteristic Client Config Descriptor)
|
|
CCCD_UUID = "00002902-0000-1000-8000-00805f9b34fb"
|
|
CCCD_NOTIFY_BYTES = b"\x01\x00"
|
|
CCCD_INDICATE_BYTES = b"\x02\x00"
|
|
|
|
MIN_BLUETOOTH_PROXY_VERSION_HAS_CACHE = 3
|
|
|
|
DEFAULT_MAX_WRITE_WITHOUT_RESPONSE = DEFAULT_MTU - GATT_HEADER_SIZE
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
_WrapFuncType = TypeVar( # pylint: disable=invalid-name
|
|
"_WrapFuncType", bound=Callable[..., Any]
|
|
)
|
|
|
|
|
|
def mac_to_int(address: str) -> int:
|
|
"""Convert a mac address to an integer."""
|
|
return int(address.replace(":", ""), 16)
|
|
|
|
|
|
def verify_connected(func: _WrapFuncType) -> _WrapFuncType:
|
|
"""Define a wrapper throw BleakError if not connected."""
|
|
|
|
async def _async_wrap_bluetooth_connected_operation(
|
|
self: "ESPHomeClient", *args: Any, **kwargs: Any
|
|
) -> Any:
|
|
disconnected_event = (
|
|
self._disconnected_event # pylint: disable=protected-access
|
|
)
|
|
if not disconnected_event:
|
|
raise BleakError("Not connected")
|
|
task = asyncio.create_task(func(self, *args, **kwargs))
|
|
done, _ = await asyncio.wait(
|
|
(task, disconnected_event.wait()),
|
|
return_when=asyncio.FIRST_COMPLETED,
|
|
)
|
|
if disconnected_event.is_set():
|
|
task.cancel()
|
|
with contextlib.suppress(asyncio.CancelledError):
|
|
await task
|
|
raise BleakError(
|
|
f"{self._source}: {self._ble_device.name} - {self._ble_device.address}: " # pylint: disable=protected-access
|
|
"Disconnected during operation"
|
|
)
|
|
return next(iter(done)).result()
|
|
|
|
return cast(_WrapFuncType, _async_wrap_bluetooth_connected_operation)
|
|
|
|
|
|
def api_error_as_bleak_error(func: _WrapFuncType) -> _WrapFuncType:
|
|
"""Define a wrapper throw esphome api errors as BleakErrors."""
|
|
|
|
async def _async_wrap_bluetooth_operation(
|
|
self: "ESPHomeClient", *args: Any, **kwargs: Any
|
|
) -> Any:
|
|
try:
|
|
return await func(self, *args, **kwargs)
|
|
except TimeoutAPIError as err:
|
|
raise asyncio.TimeoutError(str(err)) from err
|
|
except BluetoothGATTAPIError as ex:
|
|
# If the device disconnects in the middle of an operation
|
|
# be sure to mark it as disconnected so any library using
|
|
# the proxy knows to reconnect.
|
|
#
|
|
# Because callbacks are delivered asynchronously it's possible
|
|
# that we find out about the disconnection during the operation
|
|
# before the callback is delivered.
|
|
|
|
# pylint: disable=protected-access
|
|
if ex.error.error == -1:
|
|
_LOGGER.debug(
|
|
"%s: %s - %s: BLE device disconnected during %s operation",
|
|
self._source,
|
|
self._ble_device.name,
|
|
self._ble_device.address,
|
|
func.__name__,
|
|
)
|
|
self._async_ble_device_disconnected()
|
|
raise BleakError(str(ex)) from ex
|
|
except APIConnectionError as err:
|
|
raise BleakError(str(err)) from err
|
|
|
|
return cast(_WrapFuncType, _async_wrap_bluetooth_operation)
|
|
|
|
|
|
class ESPHomeClient(BaseBleakClient):
|
|
"""ESPHome Bleak Client."""
|
|
|
|
def __init__(
|
|
self, address_or_ble_device: BLEDevice | str, *args: Any, **kwargs: Any
|
|
) -> None:
|
|
"""Initialize the ESPHomeClient."""
|
|
assert isinstance(address_or_ble_device, BLEDevice)
|
|
super().__init__(address_or_ble_device, *args, **kwargs)
|
|
self._hass: HomeAssistant = kwargs["hass"]
|
|
self._ble_device = address_or_ble_device
|
|
self._address_as_int = mac_to_int(self._ble_device.address)
|
|
assert self._ble_device.details is not None
|
|
self._source = self._ble_device.details["source"]
|
|
self.domain_data = DomainData.get(self._hass)
|
|
config_entry = self.domain_data.get_by_unique_id(self._source)
|
|
self.entry_data = self.domain_data.get_entry_data(config_entry)
|
|
self._client = self.entry_data.client
|
|
self._is_connected = False
|
|
self._mtu: int | None = None
|
|
self._cancel_connection_state: CALLBACK_TYPE | None = None
|
|
self._notify_cancels: dict[
|
|
int, tuple[Callable[[], Coroutine[Any, Any, None]], Callable[[], None]]
|
|
] = {}
|
|
self._disconnected_event: asyncio.Event | None = None
|
|
device_info = self.entry_data.device_info
|
|
assert device_info is not None
|
|
self._connection_version = device_info.bluetooth_proxy_version
|
|
self._address_type = address_or_ble_device.details["address_type"]
|
|
|
|
def __str__(self) -> str:
|
|
"""Return the string representation of the client."""
|
|
return f"ESPHomeClient ({self.address})"
|
|
|
|
def _unsubscribe_connection_state(self) -> None:
|
|
"""Unsubscribe from connection state updates."""
|
|
if not self._cancel_connection_state:
|
|
return
|
|
try:
|
|
self._cancel_connection_state()
|
|
except (AssertionError, ValueError) as ex:
|
|
_LOGGER.debug(
|
|
"%s: %s - %s: Failed to unsubscribe from connection state (likely connection dropped): %s",
|
|
self._source,
|
|
self._ble_device.name,
|
|
self._ble_device.address,
|
|
ex,
|
|
)
|
|
self._cancel_connection_state = None
|
|
|
|
def _async_disconnected_cleanup(self) -> None:
|
|
"""Clean up on disconnect."""
|
|
self.services = BleakGATTServiceCollection() # type: ignore[no-untyped-call]
|
|
self._is_connected = False
|
|
for _, notify_abort in self._notify_cancels.values():
|
|
notify_abort()
|
|
self._notify_cancels.clear()
|
|
if self._disconnected_event:
|
|
self._disconnected_event.set()
|
|
self._disconnected_event = None
|
|
self._unsubscribe_connection_state()
|
|
|
|
def _async_ble_device_disconnected(self) -> None:
|
|
"""Handle the BLE device disconnecting from the ESP."""
|
|
was_connected = self._is_connected
|
|
self._async_disconnected_cleanup()
|
|
if was_connected:
|
|
_LOGGER.debug(
|
|
"%s: %s - %s: BLE device disconnected",
|
|
self._source,
|
|
self._ble_device.name,
|
|
self._ble_device.address,
|
|
)
|
|
self._async_call_bleak_disconnected_callback()
|
|
|
|
def _async_esp_disconnected(self) -> None:
|
|
"""Handle the esp32 client disconnecting from hass."""
|
|
_LOGGER.debug(
|
|
"%s: %s - %s: ESP device disconnected",
|
|
self._source,
|
|
self._ble_device.name,
|
|
self._ble_device.address,
|
|
)
|
|
self.entry_data.disconnect_callbacks.remove(self._async_esp_disconnected)
|
|
self._async_ble_device_disconnected()
|
|
|
|
def _async_call_bleak_disconnected_callback(self) -> None:
|
|
"""Call the disconnected callback to inform the bleak consumer."""
|
|
if self._disconnected_callback:
|
|
self._disconnected_callback(self)
|
|
self._disconnected_callback = None
|
|
|
|
@api_error_as_bleak_error
|
|
async def connect(
|
|
self, dangerous_use_bleak_cache: bool = False, **kwargs: Any
|
|
) -> bool:
|
|
"""Connect to a specified Peripheral.
|
|
|
|
Keyword Args:
|
|
timeout (float): Timeout for required ``BleakScanner.find_device_by_address`` call. Defaults to 10.0.
|
|
Returns:
|
|
Boolean representing connection status.
|
|
"""
|
|
await self._wait_for_free_connection_slot(CONNECT_FREE_SLOT_TIMEOUT)
|
|
entry_data = self.entry_data
|
|
self._mtu = entry_data.get_gatt_mtu_cache(self._address_as_int)
|
|
has_cache = bool(
|
|
dangerous_use_bleak_cache
|
|
and self._connection_version >= MIN_BLUETOOTH_PROXY_VERSION_HAS_CACHE
|
|
and entry_data.get_gatt_services_cache(self._address_as_int)
|
|
and self._mtu
|
|
)
|
|
connected_future: asyncio.Future[bool] = asyncio.Future()
|
|
|
|
def _on_bluetooth_connection_state(
|
|
connected: bool, mtu: int, error: int
|
|
) -> None:
|
|
"""Handle a connect or disconnect."""
|
|
_LOGGER.debug(
|
|
"%s: %s - %s: Connection state changed to connected=%s mtu=%s error=%s",
|
|
self._source,
|
|
self._ble_device.name,
|
|
self._ble_device.address,
|
|
connected,
|
|
mtu,
|
|
error,
|
|
)
|
|
if connected:
|
|
self._is_connected = True
|
|
if not self._mtu:
|
|
self._mtu = mtu
|
|
entry_data.set_gatt_mtu_cache(self._address_as_int, mtu)
|
|
else:
|
|
self._async_ble_device_disconnected()
|
|
|
|
if connected_future.done():
|
|
return
|
|
|
|
if error:
|
|
try:
|
|
ble_connection_error = BLEConnectionError(error)
|
|
ble_connection_error_name = ble_connection_error.name
|
|
human_error = ESP_CONNECTION_ERROR_DESCRIPTION[ble_connection_error]
|
|
except (KeyError, ValueError):
|
|
ble_connection_error_name = str(error)
|
|
human_error = ESPHOME_GATT_ERRORS.get(
|
|
error, f"Unknown error code {error}"
|
|
)
|
|
connected_future.set_exception(
|
|
BleakError(
|
|
f"Error {ble_connection_error_name} while connecting: {human_error}"
|
|
)
|
|
)
|
|
return
|
|
|
|
if not connected:
|
|
connected_future.set_exception(BleakError("Disconnected"))
|
|
return
|
|
|
|
_LOGGER.debug(
|
|
"%s: %s - %s: connected, registering for disconnected callbacks",
|
|
self._source,
|
|
self._ble_device.name,
|
|
self._ble_device.address,
|
|
)
|
|
entry_data.disconnect_callbacks.append(self._async_esp_disconnected)
|
|
connected_future.set_result(connected)
|
|
|
|
timeout = kwargs.get("timeout", self._timeout)
|
|
if not (scanner := async_scanner_by_source(self._hass, self._source)):
|
|
raise BleakError("Scanner disappeared for {self._source}")
|
|
with scanner.connecting():
|
|
try:
|
|
self._cancel_connection_state = (
|
|
await self._client.bluetooth_device_connect(
|
|
self._address_as_int,
|
|
_on_bluetooth_connection_state,
|
|
timeout=timeout,
|
|
has_cache=has_cache,
|
|
version=self._connection_version,
|
|
address_type=self._address_type,
|
|
)
|
|
)
|
|
except Exception:
|
|
with contextlib.suppress(BleakError):
|
|
# If the connect call throws an exception,
|
|
# we need to make sure we await the future
|
|
# to avoid a warning about an un-retrieved
|
|
# exception since we prefer to raise the
|
|
# exception from the connect call as it
|
|
# will be more descriptive.
|
|
if connected_future.done():
|
|
await connected_future
|
|
connected_future.cancel()
|
|
raise
|
|
await connected_future
|
|
|
|
try:
|
|
await self.get_services(dangerous_use_bleak_cache=dangerous_use_bleak_cache)
|
|
except asyncio.CancelledError:
|
|
# On cancel we must still raise cancelled error
|
|
# to avoid blocking the cancellation even if the
|
|
# disconnect call fails.
|
|
with contextlib.suppress(Exception):
|
|
await self.disconnect()
|
|
raise
|
|
except Exception:
|
|
await self.disconnect()
|
|
raise
|
|
|
|
self._disconnected_event = asyncio.Event()
|
|
return True
|
|
|
|
@api_error_as_bleak_error
|
|
async def disconnect(self) -> bool:
|
|
"""Disconnect from the peripheral device."""
|
|
self._async_disconnected_cleanup()
|
|
await self._client.bluetooth_device_disconnect(self._address_as_int)
|
|
await self._wait_for_free_connection_slot(DISCONNECT_TIMEOUT)
|
|
return True
|
|
|
|
async def _wait_for_free_connection_slot(self, timeout: float) -> None:
|
|
"""Wait for a free connection slot."""
|
|
if self.entry_data.ble_connections_free:
|
|
return
|
|
_LOGGER.debug(
|
|
"%s: %s - %s: Out of connection slots, waiting for a free one",
|
|
self._source,
|
|
self._ble_device.name,
|
|
self._ble_device.address,
|
|
)
|
|
async with async_timeout.timeout(timeout):
|
|
await self.entry_data.wait_for_ble_connections_free()
|
|
|
|
@property
|
|
def is_connected(self) -> bool:
|
|
"""Is Connected."""
|
|
return self._is_connected
|
|
|
|
@property
|
|
def mtu_size(self) -> int:
|
|
"""Get ATT MTU size for active connection."""
|
|
return self._mtu or DEFAULT_MTU
|
|
|
|
@verify_connected
|
|
@api_error_as_bleak_error
|
|
async def pair(self, *args: Any, **kwargs: Any) -> bool:
|
|
"""Attempt to pair."""
|
|
raise NotImplementedError("Pairing is not available in ESPHome.")
|
|
|
|
@verify_connected
|
|
@api_error_as_bleak_error
|
|
async def unpair(self) -> bool:
|
|
"""Attempt to unpair."""
|
|
raise NotImplementedError("Pairing is not available in ESPHome.")
|
|
|
|
@api_error_as_bleak_error
|
|
async def get_services(
|
|
self, dangerous_use_bleak_cache: bool = False, **kwargs: Any
|
|
) -> BleakGATTServiceCollection:
|
|
"""Get all services registered for this GATT server.
|
|
|
|
Returns:
|
|
A :py:class:`bleak.backends.service.BleakGATTServiceCollection` with this device's services tree.
|
|
"""
|
|
address_as_int = self._address_as_int
|
|
entry_data = self.entry_data
|
|
# If the connection version >= 3, we must use the cache
|
|
# because the esp has already wiped the services list to
|
|
# save memory.
|
|
if (
|
|
self._connection_version >= MIN_BLUETOOTH_PROXY_VERSION_HAS_CACHE
|
|
or dangerous_use_bleak_cache
|
|
) and (cached_services := entry_data.get_gatt_services_cache(address_as_int)):
|
|
_LOGGER.debug(
|
|
"%s: %s - %s: Cached services hit",
|
|
self._source,
|
|
self._ble_device.name,
|
|
self._ble_device.address,
|
|
)
|
|
self.services = cached_services
|
|
return self.services
|
|
_LOGGER.debug(
|
|
"%s: %s - %s: Cached services miss",
|
|
self._source,
|
|
self._ble_device.name,
|
|
self._ble_device.address,
|
|
)
|
|
esphome_services = await self._client.bluetooth_gatt_get_services(
|
|
address_as_int
|
|
)
|
|
_LOGGER.debug(
|
|
"%s: %s - %s: Got services: %s",
|
|
self._source,
|
|
self._ble_device.name,
|
|
self._ble_device.address,
|
|
esphome_services,
|
|
)
|
|
max_write_without_response = self.mtu_size - GATT_HEADER_SIZE
|
|
services = BleakGATTServiceCollection() # type: ignore[no-untyped-call]
|
|
for service in esphome_services.services:
|
|
services.add_service(BleakGATTServiceESPHome(service))
|
|
for characteristic in service.characteristics:
|
|
services.add_characteristic(
|
|
BleakGATTCharacteristicESPHome(
|
|
characteristic,
|
|
max_write_without_response,
|
|
service.uuid,
|
|
service.handle,
|
|
)
|
|
)
|
|
for descriptor in characteristic.descriptors:
|
|
services.add_descriptor(
|
|
BleakGATTDescriptorESPHome(
|
|
descriptor,
|
|
characteristic.uuid,
|
|
characteristic.handle,
|
|
)
|
|
)
|
|
|
|
if not esphome_services.services:
|
|
# If we got no services, we must have disconnected
|
|
# or something went wrong on the ESP32's BLE stack.
|
|
raise BleakError("Failed to get services from remote esp")
|
|
|
|
self.services = services
|
|
_LOGGER.debug(
|
|
"%s: %s - %s: Cached services saved",
|
|
self._source,
|
|
self._ble_device.name,
|
|
self._ble_device.address,
|
|
)
|
|
entry_data.set_gatt_services_cache(address_as_int, services)
|
|
return services
|
|
|
|
def _resolve_characteristic(
|
|
self, char_specifier: BleakGATTCharacteristic | int | str | uuid.UUID
|
|
) -> BleakGATTCharacteristic:
|
|
"""Resolve a characteristic specifier to a BleakGATTCharacteristic object."""
|
|
if not isinstance(char_specifier, BleakGATTCharacteristic):
|
|
characteristic = self.services.get_characteristic(char_specifier)
|
|
else:
|
|
characteristic = char_specifier
|
|
if not characteristic:
|
|
raise BleakError(f"Characteristic {char_specifier} was not found!")
|
|
return characteristic
|
|
|
|
async def clear_cache(self) -> None:
|
|
"""Clear the GATT cache."""
|
|
self.entry_data.clear_gatt_services_cache(self._address_as_int)
|
|
self.entry_data.clear_gatt_mtu_cache(self._address_as_int)
|
|
|
|
@verify_connected
|
|
@api_error_as_bleak_error
|
|
async def read_gatt_char(
|
|
self,
|
|
char_specifier: BleakGATTCharacteristic | int | str | uuid.UUID,
|
|
**kwargs: Any,
|
|
) -> bytearray:
|
|
"""Perform read operation on the specified GATT characteristic.
|
|
|
|
Args:
|
|
char_specifier (BleakGATTCharacteristic, int, str or UUID): The characteristic to read from,
|
|
specified by either integer handle, UUID or directly by the
|
|
BleakGATTCharacteristic object representing it.
|
|
Returns:
|
|
(bytearray) The read data.
|
|
"""
|
|
characteristic = self._resolve_characteristic(char_specifier)
|
|
return await self._client.bluetooth_gatt_read(
|
|
self._address_as_int, characteristic.handle, GATT_READ_TIMEOUT
|
|
)
|
|
|
|
@verify_connected
|
|
@api_error_as_bleak_error
|
|
async def read_gatt_descriptor(self, handle: int, **kwargs: Any) -> bytearray:
|
|
"""Perform read operation on the specified GATT descriptor.
|
|
|
|
Args:
|
|
handle (int): The handle of the descriptor to read from.
|
|
Returns:
|
|
(bytearray) The read data.
|
|
"""
|
|
return await self._client.bluetooth_gatt_read_descriptor(
|
|
self._address_as_int, handle, GATT_READ_TIMEOUT
|
|
)
|
|
|
|
@verify_connected
|
|
@api_error_as_bleak_error
|
|
async def write_gatt_char(
|
|
self,
|
|
char_specifier: BleakGATTCharacteristic | int | str | uuid.UUID,
|
|
data: bytes | bytearray | memoryview,
|
|
response: bool = False,
|
|
) -> None:
|
|
"""Perform a write operation of the specified GATT characteristic.
|
|
|
|
Args:
|
|
char_specifier (BleakGATTCharacteristic, int, str or UUID): The characteristic to write
|
|
to, specified by either integer handle, UUID or directly by the
|
|
BleakGATTCharacteristic object representing it.
|
|
data (bytes or bytearray): The data to send.
|
|
response (bool): If write-with-response operation should be done. Defaults to `False`.
|
|
"""
|
|
characteristic = self._resolve_characteristic(char_specifier)
|
|
await self._client.bluetooth_gatt_write(
|
|
self._address_as_int, characteristic.handle, bytes(data), response
|
|
)
|
|
|
|
@verify_connected
|
|
@api_error_as_bleak_error
|
|
async def write_gatt_descriptor(
|
|
self, handle: int, data: bytes | bytearray | memoryview
|
|
) -> None:
|
|
"""Perform a write operation on the specified GATT descriptor.
|
|
|
|
Args:
|
|
handle (int): The handle of the descriptor to read from.
|
|
data (bytes or bytearray): The data to send.
|
|
"""
|
|
await self._client.bluetooth_gatt_write_descriptor(
|
|
self._address_as_int, handle, bytes(data)
|
|
)
|
|
|
|
@verify_connected
|
|
@api_error_as_bleak_error
|
|
async def start_notify(
|
|
self,
|
|
characteristic: BleakGATTCharacteristic,
|
|
callback: NotifyCallback,
|
|
**kwargs: Any,
|
|
) -> None:
|
|
"""Activate notifications/indications on a characteristic.
|
|
|
|
Callbacks must accept two inputs. The first will be a integer handle of the characteristic generating the
|
|
data and the second will be a ``bytearray`` containing the data sent from the connected server.
|
|
.. code-block:: python
|
|
def callback(sender: int, data: bytearray):
|
|
print(f"{sender}: {data}")
|
|
client.start_notify(char_uuid, callback)
|
|
Args:
|
|
char_specifier (BleakGATTCharacteristic, int, str or UUID): The characteristic to activate
|
|
notifications/indications on a characteristic, specified by either integer handle,
|
|
UUID or directly by the BleakGATTCharacteristic object representing it.
|
|
callback (function): The function to be called on notification.
|
|
"""
|
|
ble_handle = characteristic.handle
|
|
if ble_handle in self._notify_cancels:
|
|
raise BleakError(
|
|
"Notifications are already enabled on "
|
|
f"service:{characteristic.service_uuid} "
|
|
f"characteristic:{characteristic.uuid} "
|
|
f"handle:{ble_handle}"
|
|
)
|
|
if (
|
|
"notify" not in characteristic.properties
|
|
and "indicate" not in characteristic.properties
|
|
):
|
|
raise BleakError(
|
|
f"Characteristic {characteristic.uuid} does not have notify or indicate property set."
|
|
)
|
|
|
|
self._notify_cancels[
|
|
ble_handle
|
|
] = await self._client.bluetooth_gatt_start_notify(
|
|
self._address_as_int,
|
|
ble_handle,
|
|
lambda handle, data: callback(data),
|
|
)
|
|
|
|
if self._connection_version < MIN_BLUETOOTH_PROXY_VERSION_HAS_CACHE:
|
|
return
|
|
|
|
# For connection v3 we are responsible for enabling notifications
|
|
# on the cccd (characteristic client config descriptor) handle since
|
|
# the esp32 will not have resolved the characteristic descriptors to
|
|
# save memory since doing so can exhaust the memory and cause a soft
|
|
# reset
|
|
cccd_descriptor = characteristic.get_descriptor(CCCD_UUID)
|
|
if not cccd_descriptor:
|
|
raise BleakError(
|
|
f"Characteristic {characteristic.uuid} does not have a "
|
|
"characteristic client config descriptor."
|
|
)
|
|
|
|
_LOGGER.debug(
|
|
"%s: %s - %s: Writing to CCD descriptor %s for notifications with properties=%s",
|
|
self._source,
|
|
self._ble_device.name,
|
|
self._ble_device.address,
|
|
cccd_descriptor.handle,
|
|
characteristic.properties,
|
|
)
|
|
supports_notify = "notify" in characteristic.properties
|
|
await self._client.bluetooth_gatt_write_descriptor(
|
|
self._address_as_int,
|
|
cccd_descriptor.handle,
|
|
CCCD_NOTIFY_BYTES if supports_notify else CCCD_INDICATE_BYTES,
|
|
wait_for_response=False,
|
|
)
|
|
|
|
@api_error_as_bleak_error
|
|
async def stop_notify(
|
|
self,
|
|
char_specifier: BleakGATTCharacteristic | int | str | uuid.UUID,
|
|
) -> None:
|
|
"""Deactivate notification/indication on a specified characteristic.
|
|
|
|
Args:
|
|
char_specifier (BleakGATTCharacteristic, int, str or UUID): The characteristic to deactivate
|
|
notification/indication on, specified by either integer handle, UUID or
|
|
directly by the BleakGATTCharacteristic object representing it.
|
|
"""
|
|
characteristic = self._resolve_characteristic(char_specifier)
|
|
# Do not raise KeyError if notifications are not enabled on this characteristic
|
|
# to be consistent with the behavior of the BlueZ backend
|
|
if notify_cancel := self._notify_cancels.pop(characteristic.handle, None):
|
|
notify_stop, _ = notify_cancel
|
|
await notify_stop()
|
|
|
|
def __del__(self) -> None:
|
|
"""Destructor to make sure the connection state is unsubscribed."""
|
|
if self._cancel_connection_state:
|
|
_LOGGER.warning(
|
|
"%s: %s - %s: ESPHomeClient bleak client was not properly disconnected before destruction",
|
|
self._source,
|
|
self._ble_device.name,
|
|
self._ble_device.address,
|
|
)
|
|
if not self._hass.loop.is_closed():
|
|
self._hass.loop.call_soon_threadsafe(self._async_disconnected_cleanup)
|