Create empty Z-Wave JS device on smart start provisioning (#140872)

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
pull/143160/head
Petar Petrov 2025-04-17 14:18:48 +03:00 committed by GitHub
parent 4ed81fb03f
commit 0aaa4fa79b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 667 additions and 66 deletions

View File

@ -363,11 +363,17 @@ class DriverEvents:
self.dev_reg.async_get_device(identifiers={get_device_id(driver, node)})
for node in controller.nodes.values()
]
provisioned_devices = [
self.dev_reg.async_get(entry.additional_properties["device_id"])
for entry in await controller.async_get_provisioning_entries()
if entry.additional_properties
and "device_id" in entry.additional_properties
]
# Devices that are in the device registry that are not known by the controller
# can be removed
for device in stored_devices:
if device not in known_devices:
if device not in known_devices and device not in provisioned_devices:
self.dev_reg.async_remove_device(device.id)
# run discovery on controller node
@ -448,6 +454,8 @@ class ControllerEvents:
)
)
await self.async_check_preprovisioned_device(node)
if node.is_controller_node:
# Create a controller status sensor for each device
async_dispatcher_send(
@ -497,7 +505,7 @@ class ControllerEvents:
# we do submit the node to device registry so user has
# some visual feedback that something is (in the process of) being added
self.register_node_in_dev_reg(node)
await self.async_register_node_in_dev_reg(node)
@callback
def async_on_node_removed(self, event: dict) -> None:
@ -574,18 +582,52 @@ class ControllerEvents:
f"{DOMAIN}.identify_controller.{dev_id[1]}",
)
@callback
def register_node_in_dev_reg(self, node: ZwaveNode) -> dr.DeviceEntry:
async def async_check_preprovisioned_device(self, node: ZwaveNode) -> None:
"""Check if the node was preprovisioned and update the device registry."""
provisioning_entry = (
await self.driver_events.driver.controller.async_get_provisioning_entry(
node.node_id
)
)
if (
provisioning_entry
and provisioning_entry.additional_properties
and "device_id" in provisioning_entry.additional_properties
):
preprovisioned_device = self.dev_reg.async_get(
provisioning_entry.additional_properties["device_id"]
)
if preprovisioned_device:
dsk = provisioning_entry.dsk
dsk_identifier = (DOMAIN, f"provision_{dsk}")
# If the pre-provisioned device has the DSK identifier, remove it
if dsk_identifier in preprovisioned_device.identifiers:
driver = self.driver_events.driver
device_id = get_device_id(driver, node)
device_id_ext = get_device_id_ext(driver, node)
new_identifiers = preprovisioned_device.identifiers.copy()
new_identifiers.remove(dsk_identifier)
new_identifiers.add(device_id)
if device_id_ext:
new_identifiers.add(device_id_ext)
self.dev_reg.async_update_device(
preprovisioned_device.id,
new_identifiers=new_identifiers,
)
async def async_register_node_in_dev_reg(self, node: ZwaveNode) -> dr.DeviceEntry:
"""Register node in dev reg."""
driver = self.driver_events.driver
device_id = get_device_id(driver, node)
device_id_ext = get_device_id_ext(driver, node)
node_id_device = self.dev_reg.async_get_device(identifiers={device_id})
via_device_id = None
via_identifier = None
controller = driver.controller
# Get the controller node device ID if this node is not the controller
if controller.own_node and controller.own_node != node:
via_device_id = get_device_id(driver, controller.own_node)
via_identifier = get_device_id(driver, controller.own_node)
if device_id_ext:
# If there is a device with this node ID but with a different hardware
@ -632,7 +674,7 @@ class ControllerEvents:
model=node.device_config.label,
manufacturer=node.device_config.manufacturer,
suggested_area=node.location if node.location else UNDEFINED,
via_device=via_device_id,
via_device=via_identifier,
)
async_dispatcher_send(self.hass, EVENT_DEVICE_ADDED_TO_REGISTRY, device)
@ -666,7 +708,7 @@ class NodeEvents:
"""Handle node ready event."""
LOGGER.debug("Processing node %s", node)
# register (or update) node in device registry
device = self.controller_events.register_node_in_dev_reg(node)
device = await self.controller_events.async_register_node_in_dev_reg(node)
# Remove any old value ids if this is a reinterview.
self.controller_events.discovered_value_ids.pop(device.id, None)

View File

@ -91,6 +91,7 @@ from .const import (
from .helpers import (
async_enable_statistics,
async_get_node_from_device_id,
async_get_provisioning_entry_from_device_id,
get_device_id,
)
@ -171,6 +172,10 @@ ADDITIONAL_PROPERTIES = "additional_properties"
STATUS = "status"
REQUESTED_SECURITY_CLASSES = "requestedSecurityClasses"
PROTOCOL = "protocol"
DEVICE_NAME = "device_name"
AREA_ID = "area_id"
FEATURE = "feature"
STRATEGY = "strategy"
@ -398,6 +403,7 @@ def async_register_api(hass: HomeAssistant) -> None:
websocket_api.async_register_command(hass, websocket_subscribe_s2_inclusion)
websocket_api.async_register_command(hass, websocket_grant_security_classes)
websocket_api.async_register_command(hass, websocket_validate_dsk_and_enter_pin)
websocket_api.async_register_command(hass, websocket_subscribe_new_devices)
websocket_api.async_register_command(hass, websocket_provision_smart_start_node)
websocket_api.async_register_command(hass, websocket_unprovision_smart_start_node)
websocket_api.async_register_command(hass, websocket_get_provisioning_entries)
@ -631,14 +637,38 @@ async def websocket_node_metadata(
}
)
@websocket_api.async_response
@async_get_node
async def websocket_node_alerts(
hass: HomeAssistant,
connection: ActiveConnection,
msg: dict[str, Any],
node: Node,
) -> None:
"""Get the alerts for a Z-Wave JS node."""
try:
node = async_get_node_from_device_id(hass, msg[DEVICE_ID])
except ValueError as err:
if "can't be found" in err.args[0]:
provisioning_entry = await async_get_provisioning_entry_from_device_id(
hass, msg[DEVICE_ID]
)
if provisioning_entry:
connection.send_result(
msg[ID],
{
"comments": [
{
"level": "info",
"text": "This device has been provisioned but is not yet included in the "
"network.",
}
],
},
)
else:
connection.send_error(msg[ID], ERR_NOT_FOUND, str(err))
else:
connection.send_error(msg[ID], ERR_NOT_LOADED, str(err))
return
connection.send_result(
msg[ID],
{
@ -971,12 +1001,58 @@ async def websocket_validate_dsk_and_enter_pin(
connection.send_result(msg[ID])
@websocket_api.require_admin
@websocket_api.websocket_command(
{
vol.Required(TYPE): "zwave_js/subscribe_new_devices",
vol.Required(ENTRY_ID): str,
}
)
@websocket_api.async_response
async def websocket_subscribe_new_devices(
hass: HomeAssistant,
connection: ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Subscribe to new devices."""
@callback
def async_cleanup() -> None:
for unsub in unsubs:
unsub()
@callback
def device_registered(device: dr.DeviceEntry) -> None:
device_details = {
"name": device.name,
"id": device.id,
"manufacturer": device.manufacturer,
"model": device.model,
}
connection.send_message(
websocket_api.event_message(
msg[ID], {"event": "device registered", "device": device_details}
)
)
connection.subscriptions[msg["id"]] = async_cleanup
msg[DATA_UNSUBSCRIBE] = unsubs = [
async_dispatcher_connect(
hass, EVENT_DEVICE_ADDED_TO_REGISTRY, device_registered
),
]
connection.send_result(msg[ID])
@websocket_api.require_admin
@websocket_api.websocket_command(
{
vol.Required(TYPE): "zwave_js/provision_smart_start_node",
vol.Required(ENTRY_ID): str,
vol.Required(QR_PROVISIONING_INFORMATION): QR_PROVISIONING_INFORMATION_SCHEMA,
vol.Optional(PROTOCOL): vol.Coerce(Protocols),
vol.Optional(DEVICE_NAME): str,
vol.Optional(AREA_ID): str,
}
)
@websocket_api.async_response
@ -991,18 +1067,68 @@ async def websocket_provision_smart_start_node(
driver: Driver,
) -> None:
"""Pre-provision a smart start node."""
qr_info = msg[QR_PROVISIONING_INFORMATION]
provisioning_info = msg[QR_PROVISIONING_INFORMATION]
if provisioning_info.version == QRCodeVersion.S2:
if qr_info.version == QRCodeVersion.S2:
connection.send_error(
msg[ID],
ERR_INVALID_FORMAT,
"QR code version S2 is not supported for this command",
)
return
provisioning_info = ProvisioningEntry(
dsk=qr_info.dsk,
security_classes=qr_info.security_classes,
requested_security_classes=qr_info.requested_security_classes,
protocol=msg.get(PROTOCOL),
additional_properties=qr_info.additional_properties,
)
device = None
# Create an empty device if device_name is provided
if device_name := msg.get(DEVICE_NAME):
dev_reg = dr.async_get(hass)
# Create a unique device identifier using the DSK
device_identifier = (DOMAIN, f"provision_{qr_info.dsk}")
manufacturer = None
model = None
device_info = await driver.config_manager.lookup_device(
qr_info.manufacturer_id,
qr_info.product_type,
qr_info.product_id,
)
if device_info:
manufacturer = device_info.manufacturer
model = device_info.label
# Create an empty device
device = dev_reg.async_get_or_create(
config_entry_id=entry.entry_id,
identifiers={device_identifier},
name=device_name,
manufacturer=manufacturer,
model=model,
via_device=get_device_id(driver, driver.controller.own_node)
if driver.controller.own_node
else None,
)
dev_reg.async_update_device(
device.id, area_id=msg.get(AREA_ID), name_by_user=device_name
)
if provisioning_info.additional_properties is None:
provisioning_info.additional_properties = {}
provisioning_info.additional_properties["device_id"] = device.id
await driver.controller.async_provision_smart_start_node(provisioning_info)
connection.send_result(msg[ID])
if device:
connection.send_result(msg[ID], device.id)
else:
connection.send_result(msg[ID])
@websocket_api.require_admin
@ -1036,7 +1162,24 @@ async def websocket_unprovision_smart_start_node(
)
return
dsk_or_node_id = msg.get(DSK) or msg[NODE_ID]
provisioning_entry = await driver.controller.async_get_provisioning_entry(
dsk_or_node_id
)
if (
provisioning_entry
and provisioning_entry.additional_properties
and "device_id" in provisioning_entry.additional_properties
):
device_identifier = (DOMAIN, f"provision_{provisioning_entry.dsk}")
device_id = provisioning_entry.additional_properties["device_id"]
dev_reg = dr.async_get(hass)
device = dev_reg.async_get(device_id)
if device and device.identifiers == {device_identifier}:
# Only remove the device if nothing else has claimed it
dev_reg.async_remove_device(device_id)
await driver.controller.async_unprovision_smart_start_node(dsk_or_node_id)
connection.send_result(msg[ID])

View File

@ -15,7 +15,7 @@ from zwave_js_server.const import (
ConfigurationValueType,
LogLevel,
)
from zwave_js_server.model.controller import Controller
from zwave_js_server.model.controller import Controller, ProvisioningEntry
from zwave_js_server.model.driver import Driver
from zwave_js_server.model.log_config import LogConfig
from zwave_js_server.model.node import Node as ZwaveNode
@ -233,7 +233,7 @@ def get_home_and_node_id_from_device_entry(
),
None,
)
if device_id is None:
if device_id is None or device_id.startswith("provision_"):
return None
id_ = device_id.split("-")
return (id_[0], int(id_[1]))
@ -264,12 +264,12 @@ def async_get_node_from_device_id(
),
None,
)
if entry and entry.state != ConfigEntryState.LOADED:
raise ValueError(f"Device {device_id} config entry is not loaded")
if entry is None:
raise ValueError(
f"Device {device_id} is not from an existing zwave_js config entry"
)
if entry.state != ConfigEntryState.LOADED:
raise ValueError(f"Device {device_id} config entry is not loaded")
client: ZwaveClient = entry.runtime_data[DATA_CLIENT]
driver = client.driver
@ -289,6 +289,53 @@ def async_get_node_from_device_id(
return driver.controller.nodes[node_id]
async def async_get_provisioning_entry_from_device_id(
hass: HomeAssistant, device_id: str
) -> ProvisioningEntry | None:
"""Get provisioning entry from a device ID.
Raises ValueError if device is invalid
"""
dev_reg = dr.async_get(hass)
if not (device_entry := dev_reg.async_get(device_id)):
raise ValueError(f"Device ID {device_id} is not valid")
# Use device config entry ID's to validate that this is a valid zwave_js device
# and to get the client
config_entry_ids = device_entry.config_entries
entry = next(
(
entry
for entry in hass.config_entries.async_entries(DOMAIN)
if entry.entry_id in config_entry_ids
),
None,
)
if entry is None:
raise ValueError(
f"Device {device_id} is not from an existing zwave_js config entry"
)
if entry.state != ConfigEntryState.LOADED:
raise ValueError(f"Device {device_id} config entry is not loaded")
client: ZwaveClient = entry.runtime_data[DATA_CLIENT]
driver = client.driver
if driver is None:
raise ValueError("Driver is not ready.")
provisioning_entries = await driver.controller.async_get_provisioning_entries()
for provisioning_entry in provisioning_entries:
if (
provisioning_entry.additional_properties
and provisioning_entry.additional_properties.get("device_id") == device_id
):
return provisioning_entry
return None
@callback
def async_get_node_from_entity_id(
hass: HomeAssistant,

View File

@ -39,10 +39,12 @@ from zwave_js_server.model.value import ConfigurationValue, get_value_id_str
from homeassistant.components.websocket_api import ERR_INVALID_FORMAT, ERR_NOT_FOUND
from homeassistant.components.zwave_js.api import (
APPLICATION_VERSION,
AREA_ID,
CLIENT_SIDE_AUTH,
COMMAND_CLASS_ID,
CONFIG,
DEVICE_ID,
DEVICE_NAME,
DSK,
ENABLED,
ENDPOINT,
@ -67,6 +69,7 @@ from homeassistant.components.zwave_js.api import (
PRODUCT_TYPE,
PROPERTY,
PROPERTY_KEY,
PROTOCOL,
QR_CODE_STRING,
QR_PROVISIONING_INFORMATION,
REQUESTED_SECURITY_CLASSES,
@ -485,14 +488,14 @@ async def test_node_alerts(
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test the node comments websocket command."""
entry = integration
ws_client = await hass_ws_client(hass)
device = device_registry.async_get_device(identifiers={(DOMAIN, "3245146787-35")})
assert device
await ws_client.send_json(
await ws_client.send_json_auto_id(
{
ID: 3,
TYPE: "zwave_js/node_alerts",
DEVICE_ID: device.id,
}
@ -502,6 +505,83 @@ async def test_node_alerts(
assert result["comments"] == [{"level": "info", "text": "test"}]
assert result["is_embedded"]
# Test with provisioned device
valid_qr_info = {
VERSION: 1,
SECURITY_CLASSES: [0],
DSK: "test",
GENERIC_DEVICE_CLASS: 1,
SPECIFIC_DEVICE_CLASS: 1,
INSTALLER_ICON_TYPE: 1,
MANUFACTURER_ID: 1,
PRODUCT_TYPE: 1,
PRODUCT_ID: 1,
APPLICATION_VERSION: "test",
}
# Test QR provisioning information
await ws_client.send_json_auto_id(
{
TYPE: "zwave_js/provision_smart_start_node",
ENTRY_ID: entry.entry_id,
QR_PROVISIONING_INFORMATION: valid_qr_info,
DEVICE_NAME: "test",
}
)
msg = await ws_client.receive_json()
assert msg["success"]
with patch(
f"{CONTROLLER_PATCH_PREFIX}.async_get_provisioning_entries",
return_value=[
ProvisioningEntry.from_dict({**valid_qr_info, "device_id": msg["result"]})
],
):
await ws_client.send_json_auto_id(
{
TYPE: "zwave_js/node_alerts",
DEVICE_ID: msg["result"],
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert msg["result"]["comments"] == [
{
"level": "info",
"text": "This device has been provisioned but is not yet included in the network.",
}
]
# Test missing node with no provisioning entry
device = device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
identifiers={(DOMAIN, "3245146787-12")},
)
assert device
await ws_client.send_json_auto_id(
{
TYPE: "zwave_js/node_alerts",
DEVICE_ID: device.id,
}
)
msg = await ws_client.receive_json()
assert not msg["success"]
assert msg["error"]["code"] == ERR_NOT_FOUND
# Test integration not loaded error - need to unload the integration
await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
await ws_client.send_json_auto_id(
{
TYPE: "zwave_js/node_alerts",
DEVICE_ID: device.id,
}
)
msg = await ws_client.receive_json()
assert not msg["success"]
assert msg["error"]["code"] == ERR_NOT_LOADED
async def test_add_node(
hass: HomeAssistant,
@ -1093,7 +1173,11 @@ async def test_validate_dsk_and_enter_pin(
async def test_provision_smart_start_node(
hass: HomeAssistant, integration, client, hass_ws_client: WebSocketGenerator
hass: HomeAssistant,
device_registry: dr.DeviceRegistry,
integration,
client,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test provision_smart_start_node websocket command."""
entry = integration
@ -1131,20 +1215,9 @@ async def test_provision_smart_start_node(
assert len(client.async_send_command.call_args_list) == 1
assert client.async_send_command.call_args[0][0] == {
"command": "controller.provision_smart_start_node",
"entry": QRProvisioningInformation(
version=QRCodeVersion.SMART_START,
security_classes=[SecurityClass.S2_UNAUTHENTICATED],
"entry": ProvisioningEntry(
dsk="test",
generic_device_class=1,
specific_device_class=1,
installer_icon_type=1,
manufacturer_id=1,
product_type=1,
product_id=1,
application_version="test",
max_inclusion_request_interval=None,
uuid=None,
supported_protocols=None,
security_classes=[SecurityClass.S2_UNAUTHENTICATED],
additional_properties={"name": "test"},
).to_dict(),
}
@ -1152,6 +1225,51 @@ async def test_provision_smart_start_node(
client.async_send_command.reset_mock()
client.async_send_command.return_value = {"success": True}
# Test QR provisioning information with device name and area
await ws_client.send_json(
{
ID: 4,
TYPE: "zwave_js/provision_smart_start_node",
ENTRY_ID: entry.entry_id,
QR_PROVISIONING_INFORMATION: {
**valid_qr_info,
},
PROTOCOL: Protocols.ZWAVE_LONG_RANGE,
DEVICE_NAME: "test_name",
AREA_ID: "test_area",
}
)
msg = await ws_client.receive_json()
assert msg["success"]
# verify a device was created
device = device_registry.async_get_device(
identifiers={(DOMAIN, "provision_test")},
)
assert device is not None
assert device.name == "test_name"
assert device.area_id == "test_area"
assert len(client.async_send_command.call_args_list) == 2
assert client.async_send_command.call_args_list[0][0][0] == {
"command": "config_manager.lookup_device",
"manufacturerId": 1,
"productType": 1,
"productId": 1,
}
assert client.async_send_command.call_args_list[1][0][0] == {
"command": "controller.provision_smart_start_node",
"entry": ProvisioningEntry(
dsk="test",
security_classes=[SecurityClass.S2_UNAUTHENTICATED],
protocol=Protocols.ZWAVE_LONG_RANGE,
additional_properties={
"name": "test",
"device_id": device.id,
},
).to_dict(),
}
# Test QR provisioning information with S2 version throws error
await ws_client.send_json(
{
@ -1230,7 +1348,11 @@ async def test_provision_smart_start_node(
async def test_unprovision_smart_start_node(
hass: HomeAssistant, integration, client, hass_ws_client: WebSocketGenerator
hass: HomeAssistant,
device_registry: dr.DeviceRegistry,
integration,
client,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test unprovision_smart_start_node websocket command."""
entry = integration
@ -1239,9 +1361,8 @@ async def test_unprovision_smart_start_node(
client.async_send_command.return_value = {}
# Test node ID as input
await ws_client.send_json(
await ws_client.send_json_auto_id(
{
ID: 1,
TYPE: "zwave_js/unprovision_smart_start_node",
ENTRY_ID: entry.entry_id,
NODE_ID: 1,
@ -1251,8 +1372,12 @@ async def test_unprovision_smart_start_node(
msg = await ws_client.receive_json()
assert msg["success"]
assert len(client.async_send_command.call_args_list) == 1
assert client.async_send_command.call_args[0][0] == {
assert len(client.async_send_command.call_args_list) == 2
assert client.async_send_command.call_args_list[0][0][0] == {
"command": "controller.get_provisioning_entry",
"dskOrNodeId": 1,
}
assert client.async_send_command.call_args_list[1][0][0] == {
"command": "controller.unprovision_smart_start_node",
"dskOrNodeId": 1,
}
@ -1261,9 +1386,8 @@ async def test_unprovision_smart_start_node(
client.async_send_command.return_value = {}
# Test DSK as input
await ws_client.send_json(
await ws_client.send_json_auto_id(
{
ID: 2,
TYPE: "zwave_js/unprovision_smart_start_node",
ENTRY_ID: entry.entry_id,
DSK: "test",
@ -1273,8 +1397,12 @@ async def test_unprovision_smart_start_node(
msg = await ws_client.receive_json()
assert msg["success"]
assert len(client.async_send_command.call_args_list) == 1
assert client.async_send_command.call_args[0][0] == {
assert len(client.async_send_command.call_args_list) == 2
assert client.async_send_command.call_args_list[0][0][0] == {
"command": "controller.get_provisioning_entry",
"dskOrNodeId": "test",
}
assert client.async_send_command.call_args_list[1][0][0] == {
"command": "controller.unprovision_smart_start_node",
"dskOrNodeId": "test",
}
@ -1283,9 +1411,8 @@ async def test_unprovision_smart_start_node(
client.async_send_command.return_value = {}
# Test not including DSK or node ID as input fails
await ws_client.send_json(
await ws_client.send_json_auto_id(
{
ID: 3,
TYPE: "zwave_js/unprovision_smart_start_node",
ENTRY_ID: entry.entry_id,
}
@ -1296,14 +1423,78 @@ async def test_unprovision_smart_start_node(
assert len(client.async_send_command.call_args_list) == 0
# Test with pre provisioned device
# Create device registry entry for mock node
device = device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
identifiers={(DOMAIN, "provision_test"), ("other_domain", "test")},
name="Node 67",
)
provisioning_entry = ProvisioningEntry.from_dict(
{
"dsk": "test",
"securityClasses": [SecurityClass.S2_UNAUTHENTICATED],
"device_id": device.id,
}
)
with patch.object(
client.driver.controller,
"async_get_provisioning_entry",
return_value=provisioning_entry,
):
# Don't remove the device if it has additional identifiers
await ws_client.send_json_auto_id(
{
TYPE: "zwave_js/unprovision_smart_start_node",
ENTRY_ID: entry.entry_id,
DSK: "test",
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert len(client.async_send_command.call_args_list) == 1
assert client.async_send_command.call_args[0][0] == {
"command": "controller.unprovision_smart_start_node",
"dskOrNodeId": "test",
}
device = device_registry.async_get(device.id)
assert device is not None
client.async_send_command.reset_mock()
# Remove the device if it doesn't have additional identifiers
device_registry.async_update_device(
device.id, new_identifiers={(DOMAIN, "provision_test")}
)
await ws_client.send_json_auto_id(
{
TYPE: "zwave_js/unprovision_smart_start_node",
ENTRY_ID: entry.entry_id,
DSK: "test",
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert len(client.async_send_command.call_args_list) == 1
assert client.async_send_command.call_args[0][0] == {
"command": "controller.unprovision_smart_start_node",
"dskOrNodeId": "test",
}
# Verify device was removed from device registry
device = device_registry.async_get(device.id)
assert device is None
# Test FailedZWaveCommand is caught
with patch(
f"{CONTROLLER_PATCH_PREFIX}.async_unprovision_smart_start_node",
side_effect=FailedZWaveCommand("failed_command", 1, "error message"),
):
await ws_client.send_json(
await ws_client.send_json_auto_id(
{
ID: 6,
TYPE: "zwave_js/unprovision_smart_start_node",
ENTRY_ID: entry.entry_id,
DSK: "test",
@ -1319,9 +1510,8 @@ async def test_unprovision_smart_start_node(
await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
await ws_client.send_json(
await ws_client.send_json_auto_id(
{
ID: 7,
TYPE: "zwave_js/unprovision_smart_start_node",
ENTRY_ID: entry.entry_id,
DSK: "test",
@ -5658,3 +5848,39 @@ async def test_lookup_device(
assert not msg["success"]
assert msg["error"]["code"] == error_message
assert msg["error"]["message"] == f"Command failed: {error_message}"
async def test_subscribe_new_devices(
hass: HomeAssistant,
integration,
client,
hass_ws_client: WebSocketGenerator,
multisensor_6_state,
) -> None:
"""Test the subscribe_new_devices websocket command."""
entry = integration
ws_client = await hass_ws_client(hass)
await ws_client.send_json_auto_id(
{
TYPE: "zwave_js/subscribe_new_devices",
ENTRY_ID: entry.entry_id,
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert msg["result"] is None
# Simulate a device being registered
node = Node(client, deepcopy(multisensor_6_state))
client.driver.controller.emit("node added", {"node": node})
await hass.async_block_till_done()
# Verify we receive the expected message
msg = await ws_client.receive_json()
assert msg["type"] == "event"
assert msg["event"]["event"] == "device registered"
assert msg["event"]["device"]["name"] == node.device_config.description
assert msg["event"]["device"]["manufacturer"] == node.device_config.manufacturer
assert msg["event"]["device"]["model"] == node.device_config.label

View File

@ -1,17 +1,27 @@
"""Test the Z-Wave JS helpers module."""
import voluptuous as vol
from unittest.mock import patch
import pytest
import voluptuous as vol
from zwave_js_server.const import SecurityClass
from zwave_js_server.model.controller import ProvisioningEntry
from homeassistant.components.zwave_js.const import DOMAIN
from homeassistant.components.zwave_js.helpers import (
async_get_node_status_sensor_entity_id,
async_get_nodes_from_area_id,
async_get_provisioning_entry_from_device_id,
get_value_state_schema,
)
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from homeassistant.helpers import area_registry as ar, device_registry as dr
from tests.common import MockConfigEntry
CONTROLLER_PATCH_PREFIX = "zwave_js_server.model.controller.Controller"
async def test_async_get_node_status_sensor_entity_id(
hass: HomeAssistant, device_registry: dr.DeviceRegistry
@ -43,3 +53,82 @@ async def test_get_value_state_schema_boolean_config_value(
)
assert isinstance(schema_validator, vol.Coerce)
assert schema_validator.type is bool
async def test_async_get_provisioning_entry_from_device_id(
hass: HomeAssistant, client, device_registry: dr.DeviceRegistry, integration
) -> None:
"""Test async_get_provisioning_entry_from_device_id function."""
device = device_registry.async_get_or_create(
config_entry_id=integration.entry_id,
identifiers={(DOMAIN, "test-device")},
)
provisioning_entry = ProvisioningEntry.from_dict(
{
"dsk": "test",
"securityClasses": [SecurityClass.S2_UNAUTHENTICATED],
"device_id": device.id,
}
)
with patch(
f"{CONTROLLER_PATCH_PREFIX}.async_get_provisioning_entries",
return_value=[provisioning_entry],
):
result = await async_get_provisioning_entry_from_device_id(hass, device.id)
assert result == provisioning_entry
# Test invalid device
with pytest.raises(ValueError, match="Device ID not-a-real-device is not valid"):
await async_get_provisioning_entry_from_device_id(hass, "not-a-real-device")
# Test device exists but is not from a zwave_js config entry
non_zwave_config_entry = MockConfigEntry(domain="not_zwave_js")
non_zwave_config_entry.add_to_hass(hass)
non_zwave_device = device_registry.async_get_or_create(
config_entry_id=non_zwave_config_entry.entry_id,
identifiers={("not_zwave_js", "test-device")},
)
with pytest.raises(
ValueError,
match=f"Device {non_zwave_device.id} is not from an existing zwave_js config entry",
):
await async_get_provisioning_entry_from_device_id(hass, non_zwave_device.id)
# Test device exists but config entry is not loaded
not_loaded_config_entry = MockConfigEntry(
domain=DOMAIN, state=ConfigEntryState.NOT_LOADED
)
not_loaded_config_entry.add_to_hass(hass)
not_loaded_device = device_registry.async_get_or_create(
config_entry_id=not_loaded_config_entry.entry_id,
identifiers={(DOMAIN, "not-loaded-device")},
)
with pytest.raises(
ValueError, match=f"Device {not_loaded_device.id} config entry is not loaded"
):
await async_get_provisioning_entry_from_device_id(hass, not_loaded_device.id)
# Test no matching provisioning entry
with patch(
f"{CONTROLLER_PATCH_PREFIX}.async_get_provisioning_entries",
return_value=[],
):
result = await async_get_provisioning_entry_from_device_id(hass, device.id)
assert result is None
# Test multiple provisioning entries but only one matches
other_provisioning_entry = ProvisioningEntry.from_dict(
{
"dsk": "other",
"securityClasses": [SecurityClass.S2_UNAUTHENTICATED],
"device_id": "other-id",
}
)
with patch(
f"{CONTROLLER_PATCH_PREFIX}.async_get_provisioning_entries",
return_value=[other_provisioning_entry, provisioning_entry],
):
result = await async_get_provisioning_entry_from_device_id(hass, device.id)
assert result == provisioning_entry

View File

@ -11,12 +11,14 @@ from aiohasupervisor import SupervisorError
from aiohasupervisor.models import AddonsOptions
import pytest
from zwave_js_server.client import Client
from zwave_js_server.const import SecurityClass
from zwave_js_server.event import Event
from zwave_js_server.exceptions import (
BaseZwaveJSServerError,
InvalidServerVersion,
NotConnected,
)
from zwave_js_server.model.controller import ProvisioningEntry
from zwave_js_server.model.node import Node, NodeDataType
from zwave_js_server.model.version import VersionInfo
@ -24,7 +26,7 @@ from homeassistant.components.hassio import HassioAPIError
from homeassistant.components.logger import DOMAIN as LOGGER_DOMAIN, SERVICE_SET_LEVEL
from homeassistant.components.persistent_notification import async_dismiss
from homeassistant.components.zwave_js import DOMAIN
from homeassistant.components.zwave_js.helpers import get_device_id
from homeassistant.components.zwave_js.helpers import get_device_id, get_device_id_ext
from homeassistant.config_entries import ConfigEntryDisabler, ConfigEntryState
from homeassistant.const import STATE_UNAVAILABLE
from homeassistant.core import CoreState, HomeAssistant
@ -45,6 +47,8 @@ from tests.common import (
)
from tests.typing import WebSocketGenerator
CONTROLLER_PATCH_PREFIX = "zwave_js_server.model.controller.Controller"
@pytest.fixture(name="connect_timeout")
def connect_timeout_fixture() -> Generator[int]:
@ -277,10 +281,13 @@ async def test_listen_done_during_setup_after_forward_entry(
"""Test listen task finishing during setup after forward entry."""
assert hass.state is CoreState.running
original_send_command_side_effect = client.async_send_command.side_effect
async def send_command_side_effect(*args: Any, **kwargs: Any) -> None:
"""Mock send command."""
listen_block.set()
getattr(listen_result, listen_future_result_method)(listen_future_result)
client.async_send_command.side_effect = original_send_command_side_effect
# Yield to allow the listen task to run
await asyncio.sleep(0)
@ -427,6 +434,46 @@ async def test_on_node_added_ready(
)
async def test_on_node_added_preprovisioned(
hass: HomeAssistant,
device_registry: dr.DeviceRegistry,
multisensor_6_state,
client,
integration,
) -> None:
"""Test node added event with a preprovisioned device."""
dsk = "test"
node = Node(client, deepcopy(multisensor_6_state))
device = device_registry.async_get_or_create(
config_entry_id=integration.entry_id,
identifiers={(DOMAIN, f"provision_{dsk}")},
)
provisioning_entry = ProvisioningEntry.from_dict(
{
"dsk": dsk,
"securityClasses": [SecurityClass.S2_UNAUTHENTICATED],
"device_id": device.id,
}
)
with patch(
f"{CONTROLLER_PATCH_PREFIX}.async_get_provisioning_entry",
side_effect=lambda id: provisioning_entry if id == node.node_id else None,
):
event = {"node": node}
client.driver.controller.emit("node added", event)
await hass.async_block_till_done()
device = device_registry.async_get(device.id)
assert device
assert device.identifiers == {
get_device_id(client.driver, node),
get_device_id_ext(client.driver, node),
}
assert device.sw_version == node.firmware_version
# There should only be the controller and the preprovisioned device
assert len(device_registry.devices) == 2
@pytest.mark.usefixtures("integration")
async def test_on_node_added_not_ready(
hass: HomeAssistant,
@ -2045,7 +2092,14 @@ async def test_server_logging(hass: HomeAssistant, client: MagicMock) -> None:
# is enabled
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert len(client.async_send_command.call_args_list) == 0
assert len(client.async_send_command.call_args_list) == 2
assert client.async_send_command.call_args_list[0][0][0] == {
"command": "controller.get_provisioning_entries",
}
assert client.async_send_command.call_args_list[1][0][0] == {
"command": "controller.get_provisioning_entry",
"dskOrNodeId": 1,
}
assert not client.enable_server_logging.called
assert not client.disable_server_logging.called

View File

@ -123,7 +123,7 @@ async def test_number_writeable(
blocking=True,
)
assert len(client.async_send_command.call_args_list) == 2
assert len(client.async_send_command.call_args_list) == 5
args = client.async_send_command.call_args[0][0]
assert args["command"] == "node.set_value"
assert args["nodeId"] == 4

View File

@ -324,12 +324,12 @@ async def test_update_entity_ha_not_running(
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert len(client.async_send_command.call_args_list) == 1
assert len(client.async_send_command.call_args_list) == 4
await hass.async_start()
await hass.async_block_till_done()
assert len(client.async_send_command.call_args_list) == 1
assert len(client.async_send_command.call_args_list) == 4
# Update should be delayed by a day because HA is not running
hass.set_state(CoreState.starting)
@ -337,15 +337,15 @@ async def test_update_entity_ha_not_running(
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(minutes=5))
await hass.async_block_till_done()
assert len(client.async_send_command.call_args_list) == 1
assert len(client.async_send_command.call_args_list) == 4
hass.set_state(CoreState.running)
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(minutes=5, days=1))
await hass.async_block_till_done()
assert len(client.async_send_command.call_args_list) == 2
args = client.async_send_command.call_args_list[1][0][0]
assert len(client.async_send_command.call_args_list) == 5
args = client.async_send_command.call_args_list[4][0][0]
assert args["command"] == "controller.get_available_firmware_updates"
assert args["nodeId"] == zen_31.node_id
@ -651,12 +651,12 @@ async def test_update_entity_delay(
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert len(client.async_send_command.call_args_list) == 2
assert len(client.async_send_command.call_args_list) == 6
await hass.async_start()
await hass.async_block_till_done()
assert len(client.async_send_command.call_args_list) == 2
assert len(client.async_send_command.call_args_list) == 6
update_interval = timedelta(minutes=5)
freezer.tick(update_interval)
@ -665,8 +665,8 @@ async def test_update_entity_delay(
nodes: set[int] = set()
assert len(client.async_send_command.call_args_list) == 3
args = client.async_send_command.call_args_list[2][0][0]
assert len(client.async_send_command.call_args_list) == 7
args = client.async_send_command.call_args_list[6][0][0]
assert args["command"] == "controller.get_available_firmware_updates"
nodes.add(args["nodeId"])
@ -674,8 +674,8 @@ async def test_update_entity_delay(
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(client.async_send_command.call_args_list) == 4
args = client.async_send_command.call_args_list[3][0][0]
assert len(client.async_send_command.call_args_list) == 8
args = client.async_send_command.call_args_list[7][0][0]
assert args["command"] == "controller.get_available_firmware_updates"
nodes.add(args["nodeId"])
@ -846,8 +846,8 @@ async def test_update_entity_full_restore_data_update_available(
assert attrs[ATTR_IN_PROGRESS] is True
assert attrs[ATTR_UPDATE_PERCENTAGE] is None
assert len(client.async_send_command.call_args_list) == 2
assert client.async_send_command.call_args_list[1][0][0] == {
assert len(client.async_send_command.call_args_list) == 5
assert client.async_send_command.call_args_list[4][0][0] == {
"command": "controller.firmware_update_ota",
"nodeId": climate_radio_thermostat_ct100_plus_different_endpoints.node_id,
"updateInfo": {