core/homeassistant/components/airzone_cloud/diagnostics.py

139 lines
3.6 KiB
Python

"""Support for the Airzone Cloud diagnostics."""
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from aioairzone_cloud.const import (
API_STAT_AP_MAC,
API_STAT_SSID,
AZD_WIFI_MAC,
RAW_DEVICES_STATUS,
RAW_INSTALLATIONS,
RAW_WEBSERVERS,
)
from homeassistant.components.diagnostics.util import async_redact_data
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import HomeAssistant
from .const import DOMAIN
from .coordinator import AirzoneUpdateCoordinator
TO_REDACT_API = [
"_id",
"city",
"group_id",
"location_id",
"pin",
"user_id",
API_STAT_AP_MAC,
API_STAT_SSID,
]
TO_REDACT_CONFIG = [
CONF_PASSWORD,
CONF_USERNAME,
]
TO_REDACT_COORD = [
AZD_WIFI_MAC,
]
def gather_ids(api_data: dict[str, Any]) -> dict[str, Any]:
"""Return dict with IDs."""
ids: dict[str, Any] = {}
dev_idx = 1
for dev_id in api_data[RAW_DEVICES_STATUS]:
if dev_id not in ids:
ids[dev_id] = f"device{dev_idx}"
dev_idx += 1
inst_idx = 1
for inst_id in api_data[RAW_INSTALLATIONS]:
if inst_id not in ids:
ids[inst_id] = f"installation{inst_idx}"
inst_idx += 1
ws_idx = 1
for ws_id in api_data[RAW_WEBSERVERS]:
if ws_id not in ids:
ids[ws_id] = f"webserver{ws_idx}"
ws_idx += 1
return ids
def redact_keys(data: Any, ids: dict[str, Any]) -> Any:
"""Redact sensitive keys in a dict."""
if not isinstance(data, (Mapping, list)):
return data
if isinstance(data, list):
return [redact_keys(val, ids) for val in data]
redacted = {**data}
keys = list(redacted)
for key in keys:
if key in ids:
redacted[ids[key]] = redacted.pop(key)
elif isinstance(redacted[key], Mapping):
redacted[key] = redact_keys(redacted[key], ids)
elif isinstance(redacted[key], list):
redacted[key] = [redact_keys(item, ids) for item in redacted[key]]
return redacted
def redact_values(data: Any, ids: dict[str, Any]) -> Any:
"""Redact sensitive values in a dict."""
if not isinstance(data, (Mapping, list)):
if data in ids:
return ids[data]
return data
if isinstance(data, list):
return [redact_values(val, ids) for val in data]
redacted = {**data}
for key, value in redacted.items():
if value is None:
continue
if isinstance(value, Mapping):
redacted[key] = redact_values(value, ids)
elif isinstance(value, list):
redacted[key] = [redact_values(item, ids) for item in value]
elif value in ids:
redacted[key] = ids[value]
return redacted
def redact_all(
data: dict[str, Any], ids: dict[str, Any], to_redact: list[str]
) -> dict[str, Any]:
"""Redact sensitive data."""
_data = redact_keys(data, ids)
_data = redact_values(_data, ids)
return async_redact_data(_data, to_redact)
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, config_entry: ConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
coordinator: AirzoneUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
raw_data = coordinator.airzone.raw_data()
ids = gather_ids(raw_data)
return {
"api_data": redact_all(raw_data, ids, TO_REDACT_API),
"config_entry": redact_all(config_entry.as_dict(), ids, TO_REDACT_CONFIG),
"coord_data": redact_all(coordinator.data, ids, TO_REDACT_COORD),
}