Move rainmachine coordinator to separate module (#117983)

* Move rainmachine coordinator to separate module

* Coverage
pull/117986/head
epenet 2024-05-23 16:41:50 +02:00 committed by GitHub
parent 162f5ccbde
commit 6b2ddcca5e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 103 additions and 92 deletions

View File

@ -1094,6 +1094,7 @@ omit =
homeassistant/components/rainmachine/__init__.py
homeassistant/components/rainmachine/binary_sensor.py
homeassistant/components/rainmachine/button.py
homeassistant/components/rainmachine/coordinator.py
homeassistant/components/rainmachine/select.py
homeassistant/components/rainmachine/sensor.py
homeassistant/components/rainmachine/switch.py

View File

@ -53,8 +53,8 @@ from .const import (
DOMAIN,
LOGGER,
)
from .coordinator import RainMachineDataUpdateCoordinator
from .model import RainMachineEntityDescription
from .util import RainMachineDataUpdateCoordinator
DEFAULT_SSL = True

View File

@ -0,0 +1,100 @@
"""Coordinator for the RainMachine integration."""
from __future__ import annotations
from collections.abc import Awaitable, Callable
from datetime import timedelta
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import LOGGER
SIGNAL_REBOOT_COMPLETED = "rainmachine_reboot_completed_{0}"
SIGNAL_REBOOT_REQUESTED = "rainmachine_reboot_requested_{0}"
class RainMachineDataUpdateCoordinator(DataUpdateCoordinator[dict]):
"""Define an extended DataUpdateCoordinator."""
config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
*,
entry: ConfigEntry,
name: str,
api_category: str,
update_interval: timedelta,
update_method: Callable[..., Awaitable],
) -> None:
"""Initialize."""
super().__init__(
hass,
LOGGER,
name=name,
update_interval=update_interval,
update_method=update_method,
always_update=False,
)
self._rebooting = False
self._signal_handler_unsubs: list[Callable[..., None]] = []
self.config_entry = entry
self.signal_reboot_completed = SIGNAL_REBOOT_COMPLETED.format(
self.config_entry.entry_id
)
self.signal_reboot_requested = SIGNAL_REBOOT_REQUESTED.format(
self.config_entry.entry_id
)
@callback
def async_initialize(self) -> None:
"""Initialize the coordinator."""
@callback
def async_reboot_completed() -> None:
"""Respond to a reboot completed notification."""
LOGGER.debug("%s responding to reboot complete", self.name)
self._rebooting = False
self.last_update_success = True
self.async_update_listeners()
@callback
def async_reboot_requested() -> None:
"""Respond to a reboot request."""
LOGGER.debug("%s responding to reboot request", self.name)
self._rebooting = True
self.last_update_success = False
self.async_update_listeners()
for signal, func in (
(self.signal_reboot_completed, async_reboot_completed),
(self.signal_reboot_requested, async_reboot_requested),
):
self._signal_handler_unsubs.append(
async_dispatcher_connect(self.hass, signal, func)
)
@callback
def async_check_reboot_complete() -> None:
"""Check whether an active reboot has been completed."""
if self._rebooting and self.last_update_success:
LOGGER.debug("%s discovered reboot complete", self.name)
async_dispatcher_send(self.hass, self.signal_reboot_completed)
self.async_add_listener(async_check_reboot_complete)
@callback
def async_teardown() -> None:
"""Tear the coordinator down appropriately."""
for unsub in self._signal_handler_unsubs:
unsub()
self.config_entry.async_on_unload(async_teardown)

View File

@ -2,26 +2,17 @@
from __future__ import annotations
from collections.abc import Awaitable, Callable, Iterable
from collections.abc import Iterable
from dataclasses import dataclass
from datetime import timedelta
from enum import StrEnum
from typing import Any
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import LOGGER
SIGNAL_REBOOT_COMPLETED = "rainmachine_reboot_completed_{0}"
SIGNAL_REBOOT_REQUESTED = "rainmachine_reboot_requested_{0}"
class RunStates(StrEnum):
"""Define an enum for program/zone run states."""
@ -84,84 +75,3 @@ def key_exists(data: dict[str, Any], search_key: str) -> bool:
if isinstance(value, dict):
return key_exists(value, search_key)
return False
class RainMachineDataUpdateCoordinator(DataUpdateCoordinator[dict]): # pylint: disable=hass-enforce-coordinator-module
"""Define an extended DataUpdateCoordinator."""
config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
*,
entry: ConfigEntry,
name: str,
api_category: str,
update_interval: timedelta,
update_method: Callable[..., Awaitable],
) -> None:
"""Initialize."""
super().__init__(
hass,
LOGGER,
name=name,
update_interval=update_interval,
update_method=update_method,
always_update=False,
)
self._rebooting = False
self._signal_handler_unsubs: list[Callable[..., None]] = []
self.config_entry = entry
self.signal_reboot_completed = SIGNAL_REBOOT_COMPLETED.format(
self.config_entry.entry_id
)
self.signal_reboot_requested = SIGNAL_REBOOT_REQUESTED.format(
self.config_entry.entry_id
)
@callback
def async_initialize(self) -> None:
"""Initialize the coordinator."""
@callback
def async_reboot_completed() -> None:
"""Respond to a reboot completed notification."""
LOGGER.debug("%s responding to reboot complete", self.name)
self._rebooting = False
self.last_update_success = True
self.async_update_listeners()
@callback
def async_reboot_requested() -> None:
"""Respond to a reboot request."""
LOGGER.debug("%s responding to reboot request", self.name)
self._rebooting = True
self.last_update_success = False
self.async_update_listeners()
for signal, func in (
(self.signal_reboot_completed, async_reboot_completed),
(self.signal_reboot_requested, async_reboot_requested),
):
self._signal_handler_unsubs.append(
async_dispatcher_connect(self.hass, signal, func)
)
@callback
def async_check_reboot_complete() -> None:
"""Check whether an active reboot has been completed."""
if self._rebooting and self.last_update_success:
LOGGER.debug("%s discovered reboot complete", self.name)
async_dispatcher_send(self.hass, self.signal_reboot_completed)
self.async_add_listener(async_check_reboot_complete)
@callback
def async_teardown() -> None:
"""Tear the coordinator down appropriately."""
for unsub in self._signal_handler_unsubs:
unsub()
self.config_entry.async_on_unload(async_teardown)