Refactor zwave_js event handling (#77732)
* Refactor zwave_js event handling * Clean uppull/77773/head
parent
5d7e9a6695
commit
7ca7a28db9
|
@ -3,7 +3,7 @@ from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from collections.abc import Callable
|
from collections.abc import Coroutine
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from async_timeout import timeout
|
from async_timeout import timeout
|
||||||
|
@ -79,7 +79,6 @@ from .const import (
|
||||||
CONF_USB_PATH,
|
CONF_USB_PATH,
|
||||||
CONF_USE_ADDON,
|
CONF_USE_ADDON,
|
||||||
DATA_CLIENT,
|
DATA_CLIENT,
|
||||||
DATA_PLATFORM_SETUP,
|
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
EVENT_DEVICE_ADDED_TO_REGISTRY,
|
EVENT_DEVICE_ADDED_TO_REGISTRY,
|
||||||
LOGGER,
|
LOGGER,
|
||||||
|
@ -104,7 +103,8 @@ from .services import ZWaveServices
|
||||||
|
|
||||||
CONNECT_TIMEOUT = 10
|
CONNECT_TIMEOUT = 10
|
||||||
DATA_CLIENT_LISTEN_TASK = "client_listen_task"
|
DATA_CLIENT_LISTEN_TASK = "client_listen_task"
|
||||||
DATA_START_PLATFORM_TASK = "start_platform_task"
|
DATA_DRIVER_EVENTS = "driver_events"
|
||||||
|
DATA_START_CLIENT_TASK = "start_client_task"
|
||||||
|
|
||||||
|
|
||||||
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||||
|
@ -118,51 +118,6 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
@callback
|
|
||||||
def register_node_in_dev_reg(
|
|
||||||
hass: HomeAssistant,
|
|
||||||
entry: ConfigEntry,
|
|
||||||
dev_reg: device_registry.DeviceRegistry,
|
|
||||||
driver: Driver,
|
|
||||||
node: ZwaveNode,
|
|
||||||
remove_device_func: Callable[[device_registry.DeviceEntry], None],
|
|
||||||
) -> device_registry.DeviceEntry:
|
|
||||||
"""Register node in dev reg."""
|
|
||||||
device_id = get_device_id(driver, node)
|
|
||||||
device_id_ext = get_device_id_ext(driver, node)
|
|
||||||
device = dev_reg.async_get_device({device_id})
|
|
||||||
|
|
||||||
# Replace the device if it can be determined that this node is not the
|
|
||||||
# same product as it was previously.
|
|
||||||
if (
|
|
||||||
device_id_ext
|
|
||||||
and device
|
|
||||||
and len(device.identifiers) == 2
|
|
||||||
and device_id_ext not in device.identifiers
|
|
||||||
):
|
|
||||||
remove_device_func(device)
|
|
||||||
device = None
|
|
||||||
|
|
||||||
if device_id_ext:
|
|
||||||
ids = {device_id, device_id_ext}
|
|
||||||
else:
|
|
||||||
ids = {device_id}
|
|
||||||
|
|
||||||
device = dev_reg.async_get_or_create(
|
|
||||||
config_entry_id=entry.entry_id,
|
|
||||||
identifiers=ids,
|
|
||||||
sw_version=node.firmware_version,
|
|
||||||
name=node.name or node.device_config.description or f"Node {node.node_id}",
|
|
||||||
model=node.device_config.label,
|
|
||||||
manufacturer=node.device_config.manufacturer,
|
|
||||||
suggested_area=node.location if node.location else UNDEFINED,
|
|
||||||
)
|
|
||||||
|
|
||||||
async_dispatcher_send(hass, EVENT_DEVICE_ADDED_TO_REGISTRY, device)
|
|
||||||
|
|
||||||
return device
|
|
||||||
|
|
||||||
|
|
||||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
"""Set up Z-Wave JS from a config entry."""
|
"""Set up Z-Wave JS from a config entry."""
|
||||||
if use_addon := entry.data.get(CONF_USE_ADDON):
|
if use_addon := entry.data.get(CONF_USE_ADDON):
|
||||||
|
@ -191,37 +146,40 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
# Set up websocket API
|
# Set up websocket API
|
||||||
async_register_api(hass)
|
async_register_api(hass)
|
||||||
|
|
||||||
platform_task = hass.async_create_task(start_platforms(hass, entry, client))
|
# Create a task to allow the config entry to be unloaded before the driver is ready.
|
||||||
|
# Unloading the config entry is needed if the client listen task errors.
|
||||||
|
start_client_task = hass.async_create_task(start_client(hass, entry, client))
|
||||||
hass.data[DOMAIN].setdefault(entry.entry_id, {})[
|
hass.data[DOMAIN].setdefault(entry.entry_id, {})[
|
||||||
DATA_START_PLATFORM_TASK
|
DATA_START_CLIENT_TASK
|
||||||
] = platform_task
|
] = start_client_task
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
async def start_platforms(
|
async def start_client(
|
||||||
hass: HomeAssistant, entry: ConfigEntry, client: ZwaveClient
|
hass: HomeAssistant, entry: ConfigEntry, client: ZwaveClient
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Start platforms and perform discovery."""
|
"""Start listening with the client."""
|
||||||
entry_hass_data: dict = hass.data[DOMAIN].setdefault(entry.entry_id, {})
|
entry_hass_data: dict = hass.data[DOMAIN].setdefault(entry.entry_id, {})
|
||||||
entry_hass_data[DATA_CLIENT] = client
|
entry_hass_data[DATA_CLIENT] = client
|
||||||
entry_hass_data[DATA_PLATFORM_SETUP] = {}
|
driver_events = entry_hass_data[DATA_DRIVER_EVENTS] = DriverEvents(hass, entry)
|
||||||
driver_ready = asyncio.Event()
|
|
||||||
|
|
||||||
async def handle_ha_shutdown(event: Event) -> None:
|
async def handle_ha_shutdown(event: Event) -> None:
|
||||||
"""Handle HA shutdown."""
|
"""Handle HA shutdown."""
|
||||||
await disconnect_client(hass, entry)
|
await disconnect_client(hass, entry)
|
||||||
|
|
||||||
listen_task = asyncio.create_task(client_listen(hass, entry, client, driver_ready))
|
listen_task = asyncio.create_task(
|
||||||
|
client_listen(hass, entry, client, driver_events.ready)
|
||||||
|
)
|
||||||
entry_hass_data[DATA_CLIENT_LISTEN_TASK] = listen_task
|
entry_hass_data[DATA_CLIENT_LISTEN_TASK] = listen_task
|
||||||
entry.async_on_unload(
|
entry.async_on_unload(
|
||||||
hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, handle_ha_shutdown)
|
hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, handle_ha_shutdown)
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await driver_ready.wait()
|
await driver_events.ready.wait()
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
LOGGER.debug("Cancelling start platforms")
|
LOGGER.debug("Cancelling start client")
|
||||||
return
|
return
|
||||||
|
|
||||||
LOGGER.info("Connection to Zwave JS Server initialized")
|
LOGGER.info("Connection to Zwave JS Server initialized")
|
||||||
|
@ -229,37 +187,289 @@ async def start_platforms(
|
||||||
if client.driver is None:
|
if client.driver is None:
|
||||||
raise RuntimeError("Driver not ready.")
|
raise RuntimeError("Driver not ready.")
|
||||||
|
|
||||||
await setup_driver(hass, entry, client, client.driver)
|
await driver_events.setup(client.driver)
|
||||||
|
|
||||||
|
|
||||||
async def setup_driver( # noqa: C901
|
class DriverEvents:
|
||||||
hass: HomeAssistant, entry: ConfigEntry, client: ZwaveClient, driver: Driver
|
"""Represent driver events."""
|
||||||
) -> None:
|
|
||||||
"""Set up devices using the ready driver."""
|
|
||||||
dev_reg = device_registry.async_get(hass)
|
|
||||||
ent_reg = entity_registry.async_get(hass)
|
|
||||||
entry_hass_data: dict = hass.data[DOMAIN].setdefault(entry.entry_id, {})
|
|
||||||
platform_setup_tasks = entry_hass_data[DATA_PLATFORM_SETUP]
|
|
||||||
registered_unique_ids: dict[str, dict[str, set[str]]] = defaultdict(dict)
|
|
||||||
discovered_value_ids: dict[str, set[str]] = defaultdict(set)
|
|
||||||
|
|
||||||
async def async_setup_platform(platform: Platform) -> None:
|
driver: Driver
|
||||||
"""Set up platform if needed."""
|
|
||||||
if platform not in platform_setup_tasks:
|
def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
|
||||||
platform_setup_tasks[platform] = hass.async_create_task(
|
"""Set up the driver events instance."""
|
||||||
hass.config_entries.async_forward_entry_setup(entry, platform)
|
self.config_entry = entry
|
||||||
|
self.dev_reg = device_registry.async_get(hass)
|
||||||
|
self.hass = hass
|
||||||
|
self.platform_setup_tasks: dict[str, asyncio.Task] = {}
|
||||||
|
self.ready = asyncio.Event()
|
||||||
|
# Make sure to not pass self to ControllerEvents until all attributes are set.
|
||||||
|
self.controller_events = ControllerEvents(hass, self)
|
||||||
|
|
||||||
|
async def setup(self, driver: Driver) -> None:
|
||||||
|
"""Set up devices using the ready driver."""
|
||||||
|
self.driver = driver
|
||||||
|
|
||||||
|
# If opt in preference hasn't been specified yet, we do nothing, otherwise
|
||||||
|
# we apply the preference
|
||||||
|
if opted_in := self.config_entry.data.get(CONF_DATA_COLLECTION_OPTED_IN):
|
||||||
|
await async_enable_statistics(driver)
|
||||||
|
elif opted_in is False:
|
||||||
|
await driver.async_disable_statistics()
|
||||||
|
|
||||||
|
# Check for nodes that no longer exist and remove them
|
||||||
|
stored_devices = device_registry.async_entries_for_config_entry(
|
||||||
|
self.dev_reg, self.config_entry.entry_id
|
||||||
|
)
|
||||||
|
known_devices = [
|
||||||
|
self.dev_reg.async_get_device({get_device_id(driver, node)})
|
||||||
|
for node in driver.controller.nodes.values()
|
||||||
|
]
|
||||||
|
|
||||||
|
# Devices that are in the device registry that are not known by the controller can be removed
|
||||||
|
for device in stored_devices:
|
||||||
|
if device not in known_devices:
|
||||||
|
self.dev_reg.async_remove_device(device.id)
|
||||||
|
|
||||||
|
# run discovery on all ready nodes
|
||||||
|
await asyncio.gather(
|
||||||
|
*(
|
||||||
|
self.controller_events.async_on_node_added(node)
|
||||||
|
for node in driver.controller.nodes.values()
|
||||||
)
|
)
|
||||||
await platform_setup_tasks[platform]
|
)
|
||||||
|
|
||||||
|
# listen for new nodes being added to the mesh
|
||||||
|
self.config_entry.async_on_unload(
|
||||||
|
driver.controller.on(
|
||||||
|
"node added",
|
||||||
|
lambda event: self.hass.async_create_task(
|
||||||
|
self.controller_events.async_on_node_added(event["node"])
|
||||||
|
),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# listen for nodes being removed from the mesh
|
||||||
|
# NOTE: This will not remove nodes that were removed when HA was not running
|
||||||
|
self.config_entry.async_on_unload(
|
||||||
|
driver.controller.on(
|
||||||
|
"node removed", self.controller_events.async_on_node_removed
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
async def async_setup_platform(self, platform: Platform) -> None:
|
||||||
|
"""Set up platform if needed."""
|
||||||
|
if platform not in self.platform_setup_tasks:
|
||||||
|
self.platform_setup_tasks[platform] = self.hass.async_create_task(
|
||||||
|
self.hass.config_entries.async_forward_entry_setup(
|
||||||
|
self.config_entry, platform
|
||||||
|
)
|
||||||
|
)
|
||||||
|
await self.platform_setup_tasks[platform]
|
||||||
|
|
||||||
|
|
||||||
|
class ControllerEvents:
|
||||||
|
"""Represent controller events.
|
||||||
|
|
||||||
|
Handle the following events:
|
||||||
|
- node added
|
||||||
|
- node removed
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, hass: HomeAssistant, driver_events: DriverEvents) -> None:
|
||||||
|
"""Set up the controller events instance."""
|
||||||
|
self.hass = hass
|
||||||
|
self.config_entry = driver_events.config_entry
|
||||||
|
self.discovered_value_ids: dict[str, set[str]] = defaultdict(set)
|
||||||
|
self.driver_events = driver_events
|
||||||
|
self.dev_reg = driver_events.dev_reg
|
||||||
|
self.registered_unique_ids: dict[str, dict[str, set[str]]] = defaultdict(dict)
|
||||||
|
self.node_events = NodeEvents(hass, self)
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def remove_device(device: device_registry.DeviceEntry) -> None:
|
def remove_device(self, device: device_registry.DeviceEntry) -> None:
|
||||||
"""Remove device from registry."""
|
"""Remove device from registry."""
|
||||||
# note: removal of entity registry entry is handled by core
|
# note: removal of entity registry entry is handled by core
|
||||||
dev_reg.async_remove_device(device.id)
|
self.dev_reg.async_remove_device(device.id)
|
||||||
registered_unique_ids.pop(device.id, None)
|
self.registered_unique_ids.pop(device.id, None)
|
||||||
discovered_value_ids.pop(device.id, None)
|
self.discovered_value_ids.pop(device.id, None)
|
||||||
|
|
||||||
|
async def async_on_node_added(self, node: ZwaveNode) -> None:
|
||||||
|
"""Handle node added event."""
|
||||||
|
# No need for a ping button or node status sensor for controller nodes
|
||||||
|
if not node.is_controller_node:
|
||||||
|
# Create a node status sensor for each device
|
||||||
|
await self.driver_events.async_setup_platform(Platform.SENSOR)
|
||||||
|
async_dispatcher_send(
|
||||||
|
self.hass,
|
||||||
|
f"{DOMAIN}_{self.config_entry.entry_id}_add_node_status_sensor",
|
||||||
|
node,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a ping button for each device
|
||||||
|
await self.driver_events.async_setup_platform(Platform.BUTTON)
|
||||||
|
async_dispatcher_send(
|
||||||
|
self.hass,
|
||||||
|
f"{DOMAIN}_{self.config_entry.entry_id}_add_ping_button_entity",
|
||||||
|
node,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a firmware update entity for each device
|
||||||
|
await self.driver_events.async_setup_platform(Platform.UPDATE)
|
||||||
|
async_dispatcher_send(
|
||||||
|
self.hass,
|
||||||
|
f"{DOMAIN}_{self.config_entry.entry_id}_add_firmware_update_entity",
|
||||||
|
node,
|
||||||
|
)
|
||||||
|
|
||||||
|
# we only want to run discovery when the node has reached ready state,
|
||||||
|
# otherwise we'll have all kinds of missing info issues.
|
||||||
|
if node.ready:
|
||||||
|
await self.node_events.async_on_node_ready(node)
|
||||||
|
return
|
||||||
|
# if node is not yet ready, register one-time callback for ready state
|
||||||
|
LOGGER.debug("Node added: %s - waiting for it to become ready", node.node_id)
|
||||||
|
node.once(
|
||||||
|
"ready",
|
||||||
|
lambda event: self.hass.async_create_task(
|
||||||
|
self.node_events.async_on_node_ready(event["node"])
|
||||||
|
),
|
||||||
|
)
|
||||||
|
# we do submit the node to device registry so user has
|
||||||
|
# some visual feedback that something is (in the process of) being added
|
||||||
|
self.register_node_in_dev_reg(node)
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def async_on_node_removed(self, event: dict) -> None:
|
||||||
|
"""Handle node removed event."""
|
||||||
|
node: ZwaveNode = event["node"]
|
||||||
|
replaced: bool = event.get("replaced", False)
|
||||||
|
# grab device in device registry attached to this node
|
||||||
|
dev_id = get_device_id(self.driver_events.driver, node)
|
||||||
|
device = self.dev_reg.async_get_device({dev_id})
|
||||||
|
# We assert because we know the device exists
|
||||||
|
assert device
|
||||||
|
if replaced:
|
||||||
|
self.discovered_value_ids.pop(device.id, None)
|
||||||
|
|
||||||
|
async_dispatcher_send(
|
||||||
|
self.hass,
|
||||||
|
f"{DOMAIN}_{get_valueless_base_unique_id(self.driver_events.driver, node)}_remove_entity",
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.remove_device(device)
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def register_node_in_dev_reg(self, node: ZwaveNode) -> device_registry.DeviceEntry:
|
||||||
|
"""Register node in dev reg."""
|
||||||
|
driver = self.driver_events.driver
|
||||||
|
device_id = get_device_id(driver, node)
|
||||||
|
device_id_ext = get_device_id_ext(driver, node)
|
||||||
|
device = self.dev_reg.async_get_device({device_id})
|
||||||
|
|
||||||
|
# Replace the device if it can be determined that this node is not the
|
||||||
|
# same product as it was previously.
|
||||||
|
if (
|
||||||
|
device_id_ext
|
||||||
|
and device
|
||||||
|
and len(device.identifiers) == 2
|
||||||
|
and device_id_ext not in device.identifiers
|
||||||
|
):
|
||||||
|
self.remove_device(device)
|
||||||
|
device = None
|
||||||
|
|
||||||
|
if device_id_ext:
|
||||||
|
ids = {device_id, device_id_ext}
|
||||||
|
else:
|
||||||
|
ids = {device_id}
|
||||||
|
|
||||||
|
device = self.dev_reg.async_get_or_create(
|
||||||
|
config_entry_id=self.config_entry.entry_id,
|
||||||
|
identifiers=ids,
|
||||||
|
sw_version=node.firmware_version,
|
||||||
|
name=node.name or node.device_config.description or f"Node {node.node_id}",
|
||||||
|
model=node.device_config.label,
|
||||||
|
manufacturer=node.device_config.manufacturer,
|
||||||
|
suggested_area=node.location if node.location else UNDEFINED,
|
||||||
|
)
|
||||||
|
|
||||||
|
async_dispatcher_send(self.hass, EVENT_DEVICE_ADDED_TO_REGISTRY, device)
|
||||||
|
|
||||||
|
return device
|
||||||
|
|
||||||
|
|
||||||
|
class NodeEvents:
|
||||||
|
"""Represent node events.
|
||||||
|
|
||||||
|
Handle the following events:
|
||||||
|
- ready
|
||||||
|
- value added
|
||||||
|
- value updated
|
||||||
|
- metadata updated
|
||||||
|
- value notification
|
||||||
|
- notification
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self, hass: HomeAssistant, controller_events: ControllerEvents
|
||||||
|
) -> None:
|
||||||
|
"""Set up the node events instance."""
|
||||||
|
self.config_entry = controller_events.config_entry
|
||||||
|
self.controller_events = controller_events
|
||||||
|
self.dev_reg = controller_events.dev_reg
|
||||||
|
self.ent_reg = entity_registry.async_get(hass)
|
||||||
|
self.hass = hass
|
||||||
|
|
||||||
|
async def async_on_node_ready(self, node: ZwaveNode) -> None:
|
||||||
|
"""Handle node ready event."""
|
||||||
|
LOGGER.debug("Processing node %s", node)
|
||||||
|
# register (or update) node in device registry
|
||||||
|
device = self.controller_events.register_node_in_dev_reg(node)
|
||||||
|
# We only want to create the defaultdict once, even on reinterviews
|
||||||
|
if device.id not in self.controller_events.registered_unique_ids:
|
||||||
|
self.controller_events.registered_unique_ids[device.id] = defaultdict(set)
|
||||||
|
|
||||||
|
value_updates_disc_info: dict[str, ZwaveDiscoveryInfo] = {}
|
||||||
|
|
||||||
|
# run discovery on all node values and create/update entities
|
||||||
|
await asyncio.gather(
|
||||||
|
*(
|
||||||
|
self.async_handle_discovery_info(
|
||||||
|
device, disc_info, value_updates_disc_info
|
||||||
|
)
|
||||||
|
for disc_info in async_discover_node_values(
|
||||||
|
node, device, self.controller_events.discovered_value_ids
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# add listeners to handle new values that get added later
|
||||||
|
for event in ("value added", "value updated", "metadata updated"):
|
||||||
|
self.config_entry.async_on_unload(
|
||||||
|
node.on(
|
||||||
|
event,
|
||||||
|
lambda event: self.hass.async_create_task(
|
||||||
|
self.async_on_value_added(
|
||||||
|
value_updates_disc_info, event["value"]
|
||||||
|
)
|
||||||
|
),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# add listener for stateless node value notification events
|
||||||
|
self.config_entry.async_on_unload(
|
||||||
|
node.on(
|
||||||
|
"value notification",
|
||||||
|
lambda event: self.async_on_value_notification(
|
||||||
|
event["value_notification"]
|
||||||
|
),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# add listener for stateless node notification events
|
||||||
|
self.config_entry.async_on_unload(
|
||||||
|
node.on("notification", self.async_on_notification)
|
||||||
|
)
|
||||||
|
|
||||||
async def async_handle_discovery_info(
|
async def async_handle_discovery_info(
|
||||||
|
self,
|
||||||
device: device_registry.DeviceEntry,
|
device: device_registry.DeviceEntry,
|
||||||
disc_info: ZwaveDiscoveryInfo,
|
disc_info: ZwaveDiscoveryInfo,
|
||||||
value_updates_disc_info: dict[str, ZwaveDiscoveryInfo],
|
value_updates_disc_info: dict[str, ZwaveDiscoveryInfo],
|
||||||
|
@ -269,20 +479,22 @@ async def setup_driver( # noqa: C901
|
||||||
# the value_id format. Some time in the future, this call (as well as the
|
# the value_id format. Some time in the future, this call (as well as the
|
||||||
# helper functions) can be removed.
|
# helper functions) can be removed.
|
||||||
async_migrate_discovered_value(
|
async_migrate_discovered_value(
|
||||||
hass,
|
self.hass,
|
||||||
ent_reg,
|
self.ent_reg,
|
||||||
registered_unique_ids[device.id][disc_info.platform],
|
self.controller_events.registered_unique_ids[device.id][disc_info.platform],
|
||||||
device,
|
device,
|
||||||
driver,
|
self.controller_events.driver_events.driver,
|
||||||
disc_info,
|
disc_info,
|
||||||
)
|
)
|
||||||
|
|
||||||
platform = disc_info.platform
|
platform = disc_info.platform
|
||||||
await async_setup_platform(platform)
|
await self.controller_events.driver_events.async_setup_platform(platform)
|
||||||
|
|
||||||
LOGGER.debug("Discovered entity: %s", disc_info)
|
LOGGER.debug("Discovered entity: %s", disc_info)
|
||||||
async_dispatcher_send(
|
async_dispatcher_send(
|
||||||
hass, f"{DOMAIN}_{entry.entry_id}_add_{platform}", disc_info
|
self.hass,
|
||||||
|
f"{DOMAIN}_{self.config_entry.entry_id}_add_{platform}",
|
||||||
|
disc_info,
|
||||||
)
|
)
|
||||||
|
|
||||||
# If we don't need to watch for updates return early
|
# If we don't need to watch for updates return early
|
||||||
|
@ -294,151 +506,57 @@ async def setup_driver( # noqa: C901
|
||||||
if len(value_updates_disc_info) != 1:
|
if len(value_updates_disc_info) != 1:
|
||||||
return
|
return
|
||||||
# add listener for value updated events
|
# add listener for value updated events
|
||||||
entry.async_on_unload(
|
self.config_entry.async_on_unload(
|
||||||
disc_info.node.on(
|
disc_info.node.on(
|
||||||
"value updated",
|
"value updated",
|
||||||
lambda event: async_on_value_updated_fire_event(
|
lambda event: self.async_on_value_updated_fire_event(
|
||||||
value_updates_disc_info, event["value"]
|
value_updates_disc_info, event["value"]
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
async def async_on_node_ready(node: ZwaveNode) -> None:
|
|
||||||
"""Handle node ready event."""
|
|
||||||
LOGGER.debug("Processing node %s", node)
|
|
||||||
# register (or update) node in device registry
|
|
||||||
device = register_node_in_dev_reg(
|
|
||||||
hass, entry, dev_reg, driver, node, remove_device
|
|
||||||
)
|
|
||||||
# We only want to create the defaultdict once, even on reinterviews
|
|
||||||
if device.id not in registered_unique_ids:
|
|
||||||
registered_unique_ids[device.id] = defaultdict(set)
|
|
||||||
|
|
||||||
value_updates_disc_info: dict[str, ZwaveDiscoveryInfo] = {}
|
|
||||||
|
|
||||||
# run discovery on all node values and create/update entities
|
|
||||||
await asyncio.gather(
|
|
||||||
*(
|
|
||||||
async_handle_discovery_info(device, disc_info, value_updates_disc_info)
|
|
||||||
for disc_info in async_discover_node_values(
|
|
||||||
node, device, discovered_value_ids
|
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# add listeners to handle new values that get added later
|
|
||||||
for event in ("value added", "value updated", "metadata updated"):
|
|
||||||
entry.async_on_unload(
|
|
||||||
node.on(
|
|
||||||
event,
|
|
||||||
lambda event: hass.async_create_task(
|
|
||||||
async_on_value_added(value_updates_disc_info, event["value"])
|
|
||||||
),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# add listener for stateless node value notification events
|
|
||||||
entry.async_on_unload(
|
|
||||||
node.on(
|
|
||||||
"value notification",
|
|
||||||
lambda event: async_on_value_notification(event["value_notification"]),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
# add listener for stateless node notification events
|
|
||||||
entry.async_on_unload(node.on("notification", async_on_notification))
|
|
||||||
|
|
||||||
async def async_on_node_added(node: ZwaveNode) -> None:
|
|
||||||
"""Handle node added event."""
|
|
||||||
# No need for a ping button or node status sensor for controller nodes
|
|
||||||
if not node.is_controller_node:
|
|
||||||
# Create a node status sensor for each device
|
|
||||||
await async_setup_platform(Platform.SENSOR)
|
|
||||||
async_dispatcher_send(
|
|
||||||
hass, f"{DOMAIN}_{entry.entry_id}_add_node_status_sensor", node
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create a ping button for each device
|
|
||||||
await async_setup_platform(Platform.BUTTON)
|
|
||||||
async_dispatcher_send(
|
|
||||||
hass, f"{DOMAIN}_{entry.entry_id}_add_ping_button_entity", node
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create a firmware update entity for each device
|
|
||||||
await async_setup_platform(Platform.UPDATE)
|
|
||||||
async_dispatcher_send(
|
|
||||||
hass, f"{DOMAIN}_{entry.entry_id}_add_firmware_update_entity", node
|
|
||||||
)
|
|
||||||
|
|
||||||
# we only want to run discovery when the node has reached ready state,
|
|
||||||
# otherwise we'll have all kinds of missing info issues.
|
|
||||||
if node.ready:
|
|
||||||
await async_on_node_ready(node)
|
|
||||||
return
|
|
||||||
# if node is not yet ready, register one-time callback for ready state
|
|
||||||
LOGGER.debug("Node added: %s - waiting for it to become ready", node.node_id)
|
|
||||||
node.once(
|
|
||||||
"ready",
|
|
||||||
lambda event: hass.async_create_task(async_on_node_ready(event["node"])),
|
|
||||||
)
|
|
||||||
# we do submit the node to device registry so user has
|
|
||||||
# some visual feedback that something is (in the process of) being added
|
|
||||||
register_node_in_dev_reg(hass, entry, dev_reg, driver, node, remove_device)
|
|
||||||
|
|
||||||
async def async_on_value_added(
|
async def async_on_value_added(
|
||||||
value_updates_disc_info: dict[str, ZwaveDiscoveryInfo], value: Value
|
self, value_updates_disc_info: dict[str, ZwaveDiscoveryInfo], value: Value
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Fire value updated event."""
|
"""Fire value updated event."""
|
||||||
# If node isn't ready or a device for this node doesn't already exist, we can
|
# If node isn't ready or a device for this node doesn't already exist, we can
|
||||||
# let the node ready event handler perform discovery. If a value has already
|
# let the node ready event handler perform discovery. If a value has already
|
||||||
# been processed, we don't need to do it again
|
# been processed, we don't need to do it again
|
||||||
device_id = get_device_id(driver, value.node)
|
device_id = get_device_id(
|
||||||
|
self.controller_events.driver_events.driver, value.node
|
||||||
|
)
|
||||||
if (
|
if (
|
||||||
not value.node.ready
|
not value.node.ready
|
||||||
or not (device := dev_reg.async_get_device({device_id}))
|
or not (device := self.dev_reg.async_get_device({device_id}))
|
||||||
or value.value_id in discovered_value_ids[device.id]
|
or value.value_id in self.controller_events.discovered_value_ids[device.id]
|
||||||
):
|
):
|
||||||
return
|
return
|
||||||
|
|
||||||
LOGGER.debug("Processing node %s added value %s", value.node, value)
|
LOGGER.debug("Processing node %s added value %s", value.node, value)
|
||||||
await asyncio.gather(
|
await asyncio.gather(
|
||||||
*(
|
*(
|
||||||
async_handle_discovery_info(device, disc_info, value_updates_disc_info)
|
self.async_handle_discovery_info(
|
||||||
|
device, disc_info, value_updates_disc_info
|
||||||
|
)
|
||||||
for disc_info in async_discover_single_value(
|
for disc_info in async_discover_single_value(
|
||||||
value, device, discovered_value_ids
|
value, device, self.controller_events.discovered_value_ids
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def async_on_node_removed(event: dict) -> None:
|
def async_on_value_notification(self, notification: ValueNotification) -> None:
|
||||||
"""Handle node removed event."""
|
|
||||||
node: ZwaveNode = event["node"]
|
|
||||||
replaced: bool = event.get("replaced", False)
|
|
||||||
# grab device in device registry attached to this node
|
|
||||||
dev_id = get_device_id(driver, node)
|
|
||||||
device = dev_reg.async_get_device({dev_id})
|
|
||||||
# We assert because we know the device exists
|
|
||||||
assert device
|
|
||||||
if replaced:
|
|
||||||
discovered_value_ids.pop(device.id, None)
|
|
||||||
|
|
||||||
async_dispatcher_send(
|
|
||||||
hass,
|
|
||||||
f"{DOMAIN}_{get_valueless_base_unique_id(driver, node)}_remove_entity",
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
remove_device(device)
|
|
||||||
|
|
||||||
@callback
|
|
||||||
def async_on_value_notification(notification: ValueNotification) -> None:
|
|
||||||
"""Relay stateless value notification events from Z-Wave nodes to hass."""
|
"""Relay stateless value notification events from Z-Wave nodes to hass."""
|
||||||
device = dev_reg.async_get_device({get_device_id(driver, notification.node)})
|
driver = self.controller_events.driver_events.driver
|
||||||
|
device = self.dev_reg.async_get_device(
|
||||||
|
{get_device_id(driver, notification.node)}
|
||||||
|
)
|
||||||
# We assert because we know the device exists
|
# We assert because we know the device exists
|
||||||
assert device
|
assert device
|
||||||
raw_value = value = notification.value
|
raw_value = value = notification.value
|
||||||
if notification.metadata.states:
|
if notification.metadata.states:
|
||||||
value = notification.metadata.states.get(str(value), value)
|
value = notification.metadata.states.get(str(value), value)
|
||||||
hass.bus.async_fire(
|
self.hass.bus.async_fire(
|
||||||
ZWAVE_JS_VALUE_NOTIFICATION_EVENT,
|
ZWAVE_JS_VALUE_NOTIFICATION_EVENT,
|
||||||
{
|
{
|
||||||
ATTR_DOMAIN: DOMAIN,
|
ATTR_DOMAIN: DOMAIN,
|
||||||
|
@ -459,15 +577,19 @@ async def setup_driver( # noqa: C901
|
||||||
)
|
)
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def async_on_notification(event: dict[str, Any]) -> None:
|
def async_on_notification(self, event: dict[str, Any]) -> None:
|
||||||
"""Relay stateless notification events from Z-Wave nodes to hass."""
|
"""Relay stateless notification events from Z-Wave nodes to hass."""
|
||||||
if "notification" not in event:
|
if "notification" not in event:
|
||||||
LOGGER.info("Unknown notification: %s", event)
|
LOGGER.info("Unknown notification: %s", event)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
driver = self.controller_events.driver_events.driver
|
||||||
notification: EntryControlNotification | NotificationNotification | PowerLevelNotification | MultilevelSwitchNotification = event[
|
notification: EntryControlNotification | NotificationNotification | PowerLevelNotification | MultilevelSwitchNotification = event[
|
||||||
"notification"
|
"notification"
|
||||||
]
|
]
|
||||||
device = dev_reg.async_get_device({get_device_id(driver, notification.node)})
|
device = self.dev_reg.async_get_device(
|
||||||
|
{get_device_id(driver, notification.node)}
|
||||||
|
)
|
||||||
# We assert because we know the device exists
|
# We assert because we know the device exists
|
||||||
assert device
|
assert device
|
||||||
event_data = {
|
event_data = {
|
||||||
|
@ -521,31 +643,35 @@ async def setup_driver( # noqa: C901
|
||||||
else:
|
else:
|
||||||
raise TypeError(f"Unhandled notification type: {notification}")
|
raise TypeError(f"Unhandled notification type: {notification}")
|
||||||
|
|
||||||
hass.bus.async_fire(ZWAVE_JS_NOTIFICATION_EVENT, event_data)
|
self.hass.bus.async_fire(ZWAVE_JS_NOTIFICATION_EVENT, event_data)
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def async_on_value_updated_fire_event(
|
def async_on_value_updated_fire_event(
|
||||||
value_updates_disc_info: dict[str, ZwaveDiscoveryInfo], value: Value
|
self, value_updates_disc_info: dict[str, ZwaveDiscoveryInfo], value: Value
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Fire value updated event."""
|
"""Fire value updated event."""
|
||||||
# Get the discovery info for the value that was updated. If there is
|
# Get the discovery info for the value that was updated. If there is
|
||||||
# no discovery info for this value, we don't need to fire an event
|
# no discovery info for this value, we don't need to fire an event
|
||||||
if value.value_id not in value_updates_disc_info:
|
if value.value_id not in value_updates_disc_info:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
driver = self.controller_events.driver_events.driver
|
||||||
disc_info = value_updates_disc_info[value.value_id]
|
disc_info = value_updates_disc_info[value.value_id]
|
||||||
|
|
||||||
device = dev_reg.async_get_device({get_device_id(driver, value.node)})
|
device = self.dev_reg.async_get_device({get_device_id(driver, value.node)})
|
||||||
# We assert because we know the device exists
|
# We assert because we know the device exists
|
||||||
assert device
|
assert device
|
||||||
|
|
||||||
unique_id = get_unique_id(driver, disc_info.primary_value.value_id)
|
unique_id = get_unique_id(driver, disc_info.primary_value.value_id)
|
||||||
entity_id = ent_reg.async_get_entity_id(disc_info.platform, DOMAIN, unique_id)
|
entity_id = self.ent_reg.async_get_entity_id(
|
||||||
|
disc_info.platform, DOMAIN, unique_id
|
||||||
|
)
|
||||||
|
|
||||||
raw_value = value_ = value.value
|
raw_value = value_ = value.value
|
||||||
if value.metadata.states:
|
if value.metadata.states:
|
||||||
value_ = value.metadata.states.get(str(value), value_)
|
value_ = value.metadata.states.get(str(value), value_)
|
||||||
|
|
||||||
hass.bus.async_fire(
|
self.hass.bus.async_fire(
|
||||||
ZWAVE_JS_VALUE_UPDATED_EVENT,
|
ZWAVE_JS_VALUE_UPDATED_EVENT,
|
||||||
{
|
{
|
||||||
ATTR_NODE_ID: value.node.node_id,
|
ATTR_NODE_ID: value.node.node_id,
|
||||||
|
@ -564,43 +690,6 @@ async def setup_driver( # noqa: C901
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
# If opt in preference hasn't been specified yet, we do nothing, otherwise
|
|
||||||
# we apply the preference
|
|
||||||
if opted_in := entry.data.get(CONF_DATA_COLLECTION_OPTED_IN):
|
|
||||||
await async_enable_statistics(driver)
|
|
||||||
elif opted_in is False:
|
|
||||||
await driver.async_disable_statistics()
|
|
||||||
|
|
||||||
# Check for nodes that no longer exist and remove them
|
|
||||||
stored_devices = device_registry.async_entries_for_config_entry(
|
|
||||||
dev_reg, entry.entry_id
|
|
||||||
)
|
|
||||||
known_devices = [
|
|
||||||
dev_reg.async_get_device({get_device_id(driver, node)})
|
|
||||||
for node in driver.controller.nodes.values()
|
|
||||||
]
|
|
||||||
|
|
||||||
# Devices that are in the device registry that are not known by the controller can be removed
|
|
||||||
for device in stored_devices:
|
|
||||||
if device not in known_devices:
|
|
||||||
dev_reg.async_remove_device(device.id)
|
|
||||||
|
|
||||||
# run discovery on all ready nodes
|
|
||||||
await asyncio.gather(
|
|
||||||
*(async_on_node_added(node) for node in driver.controller.nodes.values())
|
|
||||||
)
|
|
||||||
|
|
||||||
# listen for new nodes being added to the mesh
|
|
||||||
entry.async_on_unload(
|
|
||||||
driver.controller.on(
|
|
||||||
"node added",
|
|
||||||
lambda event: hass.async_create_task(async_on_node_added(event["node"])),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
# listen for nodes being removed from the mesh
|
|
||||||
# NOTE: This will not remove nodes that were removed when HA was not running
|
|
||||||
entry.async_on_unload(driver.controller.on("node removed", async_on_node_removed))
|
|
||||||
|
|
||||||
|
|
||||||
async def client_listen(
|
async def client_listen(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
@ -633,14 +722,15 @@ async def disconnect_client(hass: HomeAssistant, entry: ConfigEntry) -> None:
|
||||||
data = hass.data[DOMAIN][entry.entry_id]
|
data = hass.data[DOMAIN][entry.entry_id]
|
||||||
client: ZwaveClient = data[DATA_CLIENT]
|
client: ZwaveClient = data[DATA_CLIENT]
|
||||||
listen_task: asyncio.Task = data[DATA_CLIENT_LISTEN_TASK]
|
listen_task: asyncio.Task = data[DATA_CLIENT_LISTEN_TASK]
|
||||||
platform_task: asyncio.Task = data[DATA_START_PLATFORM_TASK]
|
start_client_task: asyncio.Task = data[DATA_START_CLIENT_TASK]
|
||||||
|
driver_events: DriverEvents = data[DATA_DRIVER_EVENTS]
|
||||||
listen_task.cancel()
|
listen_task.cancel()
|
||||||
platform_task.cancel()
|
start_client_task.cancel()
|
||||||
platform_setup_tasks = data.get(DATA_PLATFORM_SETUP, {}).values()
|
platform_setup_tasks = driver_events.platform_setup_tasks.values()
|
||||||
for task in platform_setup_tasks:
|
for task in platform_setup_tasks:
|
||||||
task.cancel()
|
task.cancel()
|
||||||
|
|
||||||
await asyncio.gather(listen_task, platform_task, *platform_setup_tasks)
|
await asyncio.gather(listen_task, start_client_task, *platform_setup_tasks)
|
||||||
|
|
||||||
if client.connected:
|
if client.connected:
|
||||||
await client.disconnect()
|
await client.disconnect()
|
||||||
|
@ -650,9 +740,10 @@ async def disconnect_client(hass: HomeAssistant, entry: ConfigEntry) -> None:
|
||||||
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
"""Unload a config entry."""
|
"""Unload a config entry."""
|
||||||
info = hass.data[DOMAIN][entry.entry_id]
|
info = hass.data[DOMAIN][entry.entry_id]
|
||||||
|
driver_events: DriverEvents = info[DATA_DRIVER_EVENTS]
|
||||||
|
|
||||||
tasks = []
|
tasks: list[asyncio.Task | Coroutine] = []
|
||||||
for platform, task in info[DATA_PLATFORM_SETUP].items():
|
for platform, task in driver_events.platform_setup_tasks.items():
|
||||||
if task.done():
|
if task.done():
|
||||||
tasks.append(
|
tasks.append(
|
||||||
hass.config_entries.async_forward_entry_unload(entry, platform)
|
hass.config_entries.async_forward_entry_unload(entry, platform)
|
||||||
|
|
|
@ -21,7 +21,6 @@ CONF_DATA_COLLECTION_OPTED_IN = "data_collection_opted_in"
|
||||||
DOMAIN = "zwave_js"
|
DOMAIN = "zwave_js"
|
||||||
|
|
||||||
DATA_CLIENT = "client"
|
DATA_CLIENT = "client"
|
||||||
DATA_PLATFORM_SETUP = "platform_setup"
|
|
||||||
|
|
||||||
EVENT_DEVICE_ADDED_TO_REGISTRY = f"{DOMAIN}_device_added_to_registry"
|
EVENT_DEVICE_ADDED_TO_REGISTRY = f"{DOMAIN}_device_added_to_registry"
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue