Redact user codes from zwave_js diagnostics (#68515)

* Redact user codes from zwave_js diagnostics

* simplify test

* Remove unused logic

* revert change and make all inputs to ZwaveValueID optional

* revert change and make all inputs to ZwaveValueID optional

* Remove unused diagnostics data from fixture and test location redaction

* Add empty ZwaveValueID check

* Improve coverage

* Simplify post_init check

* Use dataclasses.astuple for checks instead
pull/68584/head
Raman Gupta 2022-03-23 16:13:27 -04:00 committed by GitHub
parent ccd8c7d5f8
commit 8293430e25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 2064 additions and 44 deletions

View File

@ -1,12 +1,16 @@
"""Provides diagnostics for Z-Wave JS."""
from __future__ import annotations
from dataclasses import astuple
from typing import Any
from zwave_js_server.client import Client
from zwave_js_server.const import CommandClass
from zwave_js_server.dump import dump_msgs
from zwave_js_server.model.node import Node, NodeDataType
from zwave_js_server.model.value import ValueDataType
from homeassistant.components.diagnostics.const import REDACTED
from homeassistant.components.diagnostics.util import async_redact_data
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_URL
@ -17,9 +21,43 @@ from homeassistant.helpers.device_registry import DeviceEntry
from homeassistant.helpers.entity_registry import async_entries_for_device, async_get
from .const import DATA_CLIENT, DOMAIN
from .helpers import get_home_and_node_id_from_device_entry
from .helpers import ZwaveValueID, get_home_and_node_id_from_device_entry
TO_REDACT = {"homeId", "location"}
KEYS_TO_REDACT = {"homeId", "location"}
VALUES_TO_REDACT = (
ZwaveValueID(property_="userCode", command_class=CommandClass.USER_CODE),
)
def redact_value_of_zwave_value(zwave_value: ValueDataType) -> ValueDataType:
"""Redact value of a Z-Wave value."""
for value_to_redact in VALUES_TO_REDACT:
zwave_value_id = ZwaveValueID(
property_=zwave_value["property"],
command_class=CommandClass(zwave_value["commandClass"]),
endpoint=zwave_value["endpoint"],
property_key=zwave_value.get("propertyKey"),
)
if all(
redacted_field_val is None or redacted_field_val == zwave_value_field_val
for redacted_field_val, zwave_value_field_val in zip(
astuple(value_to_redact), astuple(zwave_value_id)
)
):
return {**zwave_value, "value": REDACTED}
return zwave_value
def redact_node_state(node_state: NodeDataType) -> NodeDataType:
"""Redact node state."""
return {
**node_state,
"values": [
redact_value_of_zwave_value(zwave_value)
for zwave_value in node_state["values"]
],
}
def get_device_entities(
@ -79,10 +117,16 @@ async def async_get_config_entry_diagnostics(
hass: HomeAssistant, config_entry: ConfigEntry
) -> list[dict]:
"""Return diagnostics for a config entry."""
msgs: list[dict] = await dump_msgs(
config_entry.data[CONF_URL], async_get_clientsession(hass)
msgs: list[dict] = async_redact_data(
await dump_msgs(config_entry.data[CONF_URL], async_get_clientsession(hass)),
KEYS_TO_REDACT,
)
return async_redact_data(msgs, TO_REDACT)
handshake_msgs = msgs[:-1]
network_state = msgs[-1]
network_state["result"]["state"]["nodes"] = [
redact_node_state(node) for node in network_state["result"]["state"]["nodes"]
]
return [*handshake_msgs, network_state]
async def async_get_device_diagnostics(
@ -104,5 +148,5 @@ async def async_get_device_diagnostics(
"maxSchemaVersion": client.version.max_schema_version,
},
"entities": entities,
"state": async_redact_data(node.data, TO_REDACT),
"state": redact_node_state(async_redact_data(node.data, KEYS_TO_REDACT)),
}

View File

@ -55,8 +55,8 @@ from .discovery_data_template import (
FanValueMapping,
FixedFanValueMappingDataTemplate,
NumericSensorDataTemplate,
ZwaveValueID,
)
from .helpers import ZwaveValueID
class DataclassMustHaveAtLeastOne:
@ -307,7 +307,7 @@ DISCOVERY_SCHEMAS = [
primary_value=SWITCH_MULTILEVEL_CURRENT_VALUE_SCHEMA,
data_template=ConfigurableFanValueMappingDataTemplate(
configuration_option=ZwaveValueID(
5, CommandClass.CONFIGURATION, endpoint=0
property_=5, command_class=CommandClass.CONFIGURATION, endpoint=0
),
configuration_value_to_fan_value_mapping={
0: FanValueMapping(speeds=[(1, 33), (34, 66), (67, 99)]),
@ -325,8 +325,8 @@ DISCOVERY_SCHEMAS = [
primary_value=SWITCH_MULTILEVEL_CURRENT_VALUE_SCHEMA,
data_template=CoverTiltDataTemplate(
tilt_value_id=ZwaveValueID(
"fibaro",
CommandClass.MANUFACTURER_PROPRIETARY,
property_="fibaro",
command_class=CommandClass.MANUFACTURER_PROPRIETARY,
endpoint=0,
property_key="venetianBlindsTilt",
)
@ -391,34 +391,36 @@ DISCOVERY_SCHEMAS = [
lookup_table={
# Internal Sensor
"A": ZwaveValueID(
THERMOSTAT_CURRENT_TEMP_PROPERTY,
CommandClass.SENSOR_MULTILEVEL,
property_=THERMOSTAT_CURRENT_TEMP_PROPERTY,
command_class=CommandClass.SENSOR_MULTILEVEL,
endpoint=2,
),
"AF": ZwaveValueID(
THERMOSTAT_CURRENT_TEMP_PROPERTY,
CommandClass.SENSOR_MULTILEVEL,
property_=THERMOSTAT_CURRENT_TEMP_PROPERTY,
command_class=CommandClass.SENSOR_MULTILEVEL,
endpoint=2,
),
# External Sensor
"A2": ZwaveValueID(
THERMOSTAT_CURRENT_TEMP_PROPERTY,
CommandClass.SENSOR_MULTILEVEL,
property_=THERMOSTAT_CURRENT_TEMP_PROPERTY,
command_class=CommandClass.SENSOR_MULTILEVEL,
endpoint=3,
),
"A2F": ZwaveValueID(
THERMOSTAT_CURRENT_TEMP_PROPERTY,
CommandClass.SENSOR_MULTILEVEL,
property_=THERMOSTAT_CURRENT_TEMP_PROPERTY,
command_class=CommandClass.SENSOR_MULTILEVEL,
endpoint=3,
),
# Floor sensor
"F": ZwaveValueID(
THERMOSTAT_CURRENT_TEMP_PROPERTY,
CommandClass.SENSOR_MULTILEVEL,
property_=THERMOSTAT_CURRENT_TEMP_PROPERTY,
command_class=CommandClass.SENSOR_MULTILEVEL,
endpoint=4,
),
},
dependent_value=ZwaveValueID(2, CommandClass.CONFIGURATION, endpoint=0),
dependent_value=ZwaveValueID(
property_=2, command_class=CommandClass.CONFIGURATION, endpoint=0
),
),
),
# Heatit Z-TRM2fx
@ -438,23 +440,25 @@ DISCOVERY_SCHEMAS = [
lookup_table={
# External Sensor
"A2": ZwaveValueID(
THERMOSTAT_CURRENT_TEMP_PROPERTY,
CommandClass.SENSOR_MULTILEVEL,
property_=THERMOSTAT_CURRENT_TEMP_PROPERTY,
command_class=CommandClass.SENSOR_MULTILEVEL,
endpoint=2,
),
"A2F": ZwaveValueID(
THERMOSTAT_CURRENT_TEMP_PROPERTY,
CommandClass.SENSOR_MULTILEVEL,
property_=THERMOSTAT_CURRENT_TEMP_PROPERTY,
command_class=CommandClass.SENSOR_MULTILEVEL,
endpoint=2,
),
# Floor sensor
"F": ZwaveValueID(
THERMOSTAT_CURRENT_TEMP_PROPERTY,
CommandClass.SENSOR_MULTILEVEL,
property_=THERMOSTAT_CURRENT_TEMP_PROPERTY,
command_class=CommandClass.SENSOR_MULTILEVEL,
endpoint=3,
),
},
dependent_value=ZwaveValueID(2, CommandClass.CONFIGURATION, endpoint=0),
dependent_value=ZwaveValueID(
property_=2, command_class=CommandClass.CONFIGURATION, endpoint=0
),
),
),
# FortrezZ SSA1/SSA2/SSA3

View File

@ -148,6 +148,7 @@ from .const import (
ENTITY_DESC_KEY_TOTAL_INCREASING,
ENTITY_DESC_KEY_VOLTAGE,
)
from .helpers import ZwaveValueID
METER_DEVICE_CLASS_MAP: dict[str, set[MeterScaleType]] = {
ENTITY_DESC_KEY_CURRENT: CURRENT_METER_TYPES,
@ -226,16 +227,6 @@ MULTILEVEL_SENSOR_UNIT_MAP: dict[str, set[MultilevelSensorScaleType]] = {
_LOGGER = logging.getLogger(__name__)
@dataclass
class ZwaveValueID:
"""Class to represent a value ID."""
property_: str | int
command_class: int
endpoint: int | None = None
property_key: str | int | None = None
@dataclass
class BaseDiscoverySchemaDataTemplate:
"""Base class for discovery schema data templates."""
@ -486,7 +477,7 @@ class ConfigurableFanValueMappingDataTemplate(
...
data_template=ConfigurableFanValueMappingDataTemplate(
configuration_option=ZwaveValueID(
5, CommandClass.CONFIGURATION, endpoint=0
property_=5, command_class=CommandClass.CONFIGURATION, endpoint=0
),
configuration_value_to_fan_value_mapping={
0: FanValueMapping(speeds=[(1,33), (34,66), (67,99)]),

View File

@ -223,7 +223,7 @@ class ZWaveBaseEntity(Entity):
value_property: str | int,
command_class: int | None = None,
endpoint: int | None = None,
value_property_key: int | None = None,
value_property_key: int | str | None = None,
add_to_watched_value_ids: bool = True,
check_all_endpoints: bool = False,
) -> ZwaveValue | None:

View File

@ -2,6 +2,7 @@
from __future__ import annotations
from collections.abc import Callable
from dataclasses import astuple, dataclass
from typing import Any, cast
import voluptuous as vol
@ -41,6 +42,21 @@ from .const import (
)
@dataclass
class ZwaveValueID:
"""Class to represent a value ID."""
property_: str | int | None = None
command_class: int | None = None
endpoint: int | None = None
property_key: str | int | None = None
def __post_init__(self) -> None:
"""Post initialization check."""
if all(val is None for val in astuple(self)):
raise ValueError("At least one of the fields must be set.")
@callback
def get_value_of_zwave_value(value: ZwaveValue | None) -> Any | None:
"""Return the value of a ZwaveValue."""

View File

@ -210,6 +210,12 @@ def log_config_state_fixture():
}
@pytest.fixture(name="config_entry_diagnostics", scope="session")
def config_entry_diagnostics_fixture():
"""Load the config entry diagnostics fixture data."""
return json.loads(load_fixture("zwave_js/config_entry_diagnostics.json"))
@pytest.fixture(name="multisensor_6_state", scope="session")
def multisensor_6_state_fixture():
"""Load the multisensor 6 node state fixture data."""

File diff suppressed because it is too large Load Diff

View File

@ -4,6 +4,7 @@ from unittest.mock import patch
import pytest
from zwave_js_server.event import Event
from homeassistant.components.diagnostics.const import REDACTED
from homeassistant.components.zwave_js.diagnostics import async_get_device_diagnostics
from homeassistant.components.zwave_js.discovery import async_discover_node_values
from homeassistant.components.zwave_js.helpers import get_device_id
@ -17,15 +18,27 @@ from tests.components.diagnostics import (
)
async def test_config_entry_diagnostics(hass, hass_client, integration):
async def test_config_entry_diagnostics(
hass, hass_client, integration, config_entry_diagnostics
):
"""Test the config entry level diagnostics data dump."""
with patch(
"homeassistant.components.zwave_js.diagnostics.dump_msgs",
return_value=[{"hello": "world"}, {"second": "msg"}],
return_value=config_entry_diagnostics,
):
assert await get_diagnostics_for_config_entry(
diagnostics = await get_diagnostics_for_config_entry(
hass, hass_client, integration
) == [{"hello": "world"}, {"second": "msg"}]
)
assert len(diagnostics) == 3
assert diagnostics[0]["homeId"] == REDACTED
nodes = diagnostics[2]["result"]["state"]["nodes"]
for node in nodes:
assert "location" not in node or node["location"] == REDACTED
for value in node["values"]:
if value["commandClass"] == 99 and value["property"] == "userCode":
assert value["value"] == REDACTED
else:
assert value.get("value") != REDACTED
async def test_device_diagnostics(

View File

@ -0,0 +1,10 @@
"""Test Z-Wave JS helpers module."""
import pytest
from homeassistant.components.zwave_js.helpers import ZwaveValueID
async def test_empty_zwave_value_id():
"""Test empty ZwaveValueID is invalid."""
with pytest.raises(ValueError):
ZwaveValueID()