core/homeassistant/components/inkbird/coordinator.py

137 lines
4.7 KiB
Python

"""The INKBIRD Bluetooth integration."""
from __future__ import annotations
from datetime import datetime, timedelta
import logging
from typing import Any
from inkbird_ble import INKBIRDBluetoothDeviceData, SensorUpdate
from homeassistant.components.bluetooth import (
BluetoothScanningMode,
BluetoothServiceInfo,
BluetoothServiceInfoBleak,
async_ble_device_from_address,
async_last_service_info,
)
from homeassistant.components.bluetooth.active_update_processor import (
ActiveBluetoothProcessorCoordinator,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers.event import async_track_time_interval
from .const import CONF_DEVICE_DATA, CONF_DEVICE_TYPE, DOMAIN
_LOGGER = logging.getLogger(__name__)
FALLBACK_POLL_INTERVAL = timedelta(seconds=180)
class INKBIRDActiveBluetoothProcessorCoordinator(
ActiveBluetoothProcessorCoordinator[SensorUpdate]
):
"""Coordinator for INKBIRD Bluetooth devices."""
_data: INKBIRDBluetoothDeviceData
def __init__(
self,
hass: HomeAssistant,
entry: ConfigEntry,
device_type: str | None,
device_data: dict[str, Any] | None,
) -> None:
"""Initialize the INKBIRD Bluetooth processor coordinator."""
self._entry = entry
self._device_type = device_type
self._device_data = device_data
address = entry.unique_id
assert address is not None
super().__init__(
hass=hass,
logger=_LOGGER,
address=address,
mode=BluetoothScanningMode.ACTIVE,
update_method=self._async_on_update,
needs_poll_method=self._async_needs_poll,
poll_method=self._async_poll_data,
connectable=False, # Polling only happens if active scanning is disabled
)
async def async_init(self) -> None:
"""Initialize the coordinator."""
self._data = INKBIRDBluetoothDeviceData(
self._device_type,
self._device_data,
self.async_set_updated_data,
self._async_device_data_changed,
)
if not self._data.uses_notify:
self._entry.async_on_unload(
async_track_time_interval(
self.hass, self._async_schedule_poll, FALLBACK_POLL_INTERVAL
)
)
return
if not (service_info := async_last_service_info(self.hass, self.address)):
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="no_advertisement",
translation_placeholders={"address": self.address},
)
await self._data.async_start(service_info, service_info.device)
self._entry.async_on_unload(self._data.async_stop)
async def _async_poll_data(
self, last_service_info: BluetoothServiceInfoBleak
) -> SensorUpdate:
"""Poll the device."""
return await self._data.async_poll(last_service_info.device)
@callback
def _async_needs_poll(
self, service_info: BluetoothServiceInfoBleak, last_poll: float | None
) -> bool:
return (
not self.hass.is_stopping
and self._data.poll_needed(service_info, last_poll)
and bool(
async_ble_device_from_address(
self.hass, service_info.device.address, connectable=True
)
)
)
@callback
def _async_device_data_changed(self, new_device_data: dict[str, Any]) -> None:
"""Handle device data changed."""
self.hass.config_entries.async_update_entry(
self._entry, data={**self._entry.data, CONF_DEVICE_DATA: new_device_data}
)
@callback
def _async_on_update(self, service_info: BluetoothServiceInfo) -> SensorUpdate:
"""Handle update callback from the passive BLE processor."""
update = self._data.update(service_info)
if (
self._entry.data.get(CONF_DEVICE_TYPE) is None
and self._data.device_type is not None
):
device_type_str = str(self._data.device_type)
self.hass.config_entries.async_update_entry(
self._entry,
data={**self._entry.data, CONF_DEVICE_TYPE: device_type_str},
)
return update
@callback
def _async_schedule_poll(self, _: datetime) -> None:
"""Schedule a poll of the device."""
if self._last_service_info and self._async_needs_poll(
self._last_service_info, self._last_poll
):
self._debounced_poll.async_schedule_call()