Listen to out of band coil updates in Nibe Heat Pumps (#78976)

Listen to callbacks
pull/79139/head
Joakim Plate 2022-09-27 08:24:58 +02:00 committed by GitHub
parent dc82ae4f69
commit fb32e745fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 75 additions and 17 deletions

View File

@ -2,7 +2,10 @@
from __future__ import annotations
from collections import defaultdict
from collections.abc import Callable, Iterable
from datetime import timedelta
from functools import cached_property
from typing import Any, Generic, TypeVar
from nibe.coil import Coil
from nibe.connection import Connection
@ -18,7 +21,7 @@ from homeassistant.const import (
EVENT_HOMEASSISTANT_STOP,
Platform,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.entity import DeviceInfo, async_generate_entity_id
@ -105,7 +108,52 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return unload_ok
class Coordinator(DataUpdateCoordinator[dict[int, Coil]]):
_DataTypeT = TypeVar("_DataTypeT")
_ContextTypeT = TypeVar("_ContextTypeT")
class ContextCoordinator(
Generic[_DataTypeT, _ContextTypeT], DataUpdateCoordinator[_DataTypeT]
):
"""Update coordinator with context adjustments."""
@cached_property
def context_callbacks(self) -> dict[_ContextTypeT, list[CALLBACK_TYPE]]:
"""Return a dict of all callbacks registered for a given context."""
callbacks: dict[_ContextTypeT, list[CALLBACK_TYPE]] = defaultdict(list)
for update_callback, context in list(self._listeners.values()):
assert isinstance(context, set)
for address in context:
callbacks[address].append(update_callback)
return callbacks
@callback
def async_update_context_listeners(self, contexts: Iterable[_ContextTypeT]) -> None:
"""Update all listeners given a set of contexts."""
update_callbacks: set[CALLBACK_TYPE] = set()
for context in contexts:
update_callbacks.update(self.context_callbacks.get(context, []))
for update_callback in update_callbacks:
update_callback()
@callback
def async_add_listener(
self, update_callback: CALLBACK_TYPE, context: Any = None
) -> Callable[[], None]:
"""Wrap standard function to prune cached callback database."""
release = super().async_add_listener(update_callback, context)
self.__dict__.pop("context_callbacks", None)
@callback
def release_update():
release()
self.__dict__.pop("context_callbacks", None)
return release_update
class Coordinator(ContextCoordinator[dict[int, Coil], int]):
"""Update coordinator for nibe heat pumps."""
config_entry: ConfigEntry
@ -122,9 +170,18 @@ class Coordinator(DataUpdateCoordinator[dict[int, Coil]]):
)
self.data = {}
self.seed: dict[int, Coil] = {}
self.connection = connection
self.heatpump = heatpump
heatpump.subscribe(heatpump.COIL_UPDATE_EVENT, self._on_coil_update)
def _on_coil_update(self, coil: Coil):
"""Handle callback on coil updates."""
self.data[coil.address] = coil
self.seed[coil.address] = coil
self.async_update_context_listeners([coil.address])
@property
def coils(self) -> list[Coil]:
"""Return the full coil database."""
@ -157,9 +214,9 @@ class Coordinator(DataUpdateCoordinator[dict[int, Coil]]):
coil.value = value
coil = await self.connection.write_coil(coil)
if self.data:
self.data[coil.address] = coil
self.async_update_listeners()
self.data[coil.address] = coil
self.async_update_context_listeners([coil.address])
async def _async_update_data(self) -> dict[int, Coil]:
@retry(
@ -169,25 +226,26 @@ class Coordinator(DataUpdateCoordinator[dict[int, Coil]]):
async def read_coil(coil: Coil):
return await self.connection.read_coil(coil)
callbacks: dict[int, list[CALLBACK_TYPE]] = defaultdict(list)
for update_callback, context in list(self._listeners.values()):
assert isinstance(context, set)
for address in context:
callbacks[address].append(update_callback)
result: dict[int, Coil] = {}
for address, callback_list in callbacks.items():
for address in self.context_callbacks.keys():
if seed := self.seed.pop(address, None):
self.logger.debug("Skipping seeded coil: %d", address)
result[address] = seed
continue
try:
coil = self.heatpump.get_coil_by_address(address)
self.data[coil.address] = result[coil.address] = await read_coil(coil)
except (CoilReadException, RetryError) as exception:
raise UpdateFailed(f"Failed to update: {exception}") from exception
except CoilNotFoundException as exception:
self.logger.debug("Skipping missing coil: %s", exception)
continue
for update_callback in callback_list:
update_callback()
try:
result[coil.address] = await read_coil(coil)
except (CoilReadException, RetryError) as exception:
raise UpdateFailed(f"Failed to update: {exception}") from exception
self.seed.pop(coil.address, None)
return result