2022-01-18 04:42:18 +00:00
|
|
|
"""The Diagnostics integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
|
2022-01-20 04:48:32 +00:00
|
|
|
from http import HTTPStatus
|
2022-01-18 04:42:18 +00:00
|
|
|
import json
|
|
|
|
import logging
|
|
|
|
from typing import Protocol
|
|
|
|
|
|
|
|
from aiohttp import web
|
|
|
|
import voluptuous as vol
|
|
|
|
|
|
|
|
from homeassistant.components import http, websocket_api
|
|
|
|
from homeassistant.config_entries import ConfigEntry
|
|
|
|
from homeassistant.core import HomeAssistant, callback
|
|
|
|
from homeassistant.helpers import integration_platform
|
2022-01-20 04:48:32 +00:00
|
|
|
from homeassistant.helpers.device_registry import DeviceEntry, async_get
|
2022-01-18 04:42:18 +00:00
|
|
|
from homeassistant.helpers.json import ExtendedJSONEncoder
|
2022-01-19 17:00:34 +00:00
|
|
|
from homeassistant.helpers.typing import ConfigType
|
2022-01-18 04:42:18 +00:00
|
|
|
from homeassistant.util.json import (
|
|
|
|
find_paths_unserializable_data,
|
|
|
|
format_unserializable_data,
|
|
|
|
)
|
|
|
|
|
2022-01-20 04:48:32 +00:00
|
|
|
from .const import DOMAIN, REDACTED, DiagnosticsSubType, DiagnosticsType
|
2022-01-19 21:51:03 +00:00
|
|
|
|
|
|
|
# Public names of this module: REDACTED is imported from .const above and
# re-exported here.
__all__ = ["REDACTED"]
|
2022-01-18 04:42:18 +00:00
|
|
|
|
|
|
|
# Module-level logger, named after this module's import path.
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
2022-01-19 17:00:34 +00:00
|
|
|
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the Diagnostics integration.

    Creates the per-domain handler registry in ``hass.data``, passes
    ``_register_diagnostics_platform`` to the integration platform helper so
    platforms can be recorded in that registry, then registers the websocket
    commands and the download view. Always returns True.
    """
    # The registry has to exist before platforms are processed: the
    # registration callback writes into hass.data[DOMAIN].
    hass.data[DOMAIN] = {}

    await integration_platform.async_process_integration_platforms(
        hass, DOMAIN, _register_diagnostics_platform
    )

    for command in (handle_info, handle_get):
        websocket_api.async_register_command(hass, command)
    hass.http.register_view(DownloadDiagnosticsView)

    return True
|
|
|
|
|
|
|
|
|
|
|
|
class DiagnosticsProtocol(Protocol):
    """Structural type describing the hooks a diagnostics platform may provide."""

    async def async_get_config_entry_diagnostics(
        self, hass: HomeAssistant, config_entry: ConfigEntry
    ) -> dict:
        """Collect and return the diagnostics data for one config entry."""
        ...

    async def async_get_device_diagnostics(
        self, hass: HomeAssistant, config_entry: ConfigEntry, device: DeviceEntry
    ) -> dict:
        """Collect and return the diagnostics data for one device of an entry."""
        ...
|
|
|
|
|
2022-01-18 04:42:18 +00:00
|
|
|
|
|
|
|
async def _register_diagnostics_platform(
    hass: HomeAssistant, integration_domain: str, platform: DiagnosticsProtocol
):
    """Record which diagnostics hooks an integration's platform implements.

    Each hook is looked up by name on ``platform``; a hook the platform does
    not define is stored as None so callers can tell it is unsupported.
    """
    config_entry_hook = getattr(platform, "async_get_config_entry_diagnostics", None)
    device_hook = getattr(platform, "async_get_device_diagnostics", None)

    hass.data[DOMAIN][integration_domain] = {
        DiagnosticsType.CONFIG_ENTRY.value: config_entry_hook,
        DiagnosticsSubType.DEVICE.value: device_hook,
    }
|
|
|
|
|
|
|
|
|
|
|
|
@websocket_api.require_admin
@websocket_api.websocket_command({vol.Required("type"): "diagnostics/list"})
@callback
def handle_info(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
):
    """Reply with every registered domain and the hooks each one supports.

    The result is a list with one item per domain; ``handlers`` maps each hook
    key to True when a callable is registered for it and False otherwise.
    """
    request_id = msg["id"]

    listing = []
    for integration, hooks in hass.data[DOMAIN].items():
        supported = {name: hook is not None for name, hook in hooks.items()}
        listing.append({"domain": integration, "handlers": supported})

    connection.send_result(request_id, listing)
|
|
|
|
|
|
|
|
|
2022-01-20 04:48:32 +00:00
|
|
|
@websocket_api.require_admin
@websocket_api.websocket_command(
    {
        vol.Required("type"): "diagnostics/get",
        vol.Required("domain"): str,
    }
)
@callback
def handle_get(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
):
    """Reply with the hooks supported by a single integration domain.

    Sends a not-found error when the requested domain has no entry in the
    diagnostics registry.
    """
    requested = msg["domain"]
    hooks = hass.data[DOMAIN].get(requested)

    if hooks is not None:
        connection.send_result(
            msg["id"],
            {
                "domain": requested,
                "handlers": {name: hook is not None for name, hook in hooks.items()},
            },
        )
        return

    connection.send_error(
        msg["id"], websocket_api.ERR_NOT_FOUND, "Domain not supported"
    )
|
|
|
|
|
|
|
|
|
|
|
|
def _get_json_file_response(
    data: dict | list,
    filename: str,
    d_type: DiagnosticsType,
    d_id: str,
    sub_type: DiagnosticsSubType | None = None,
    sub_id: str | None = None,
) -> web.Response:
    """Serialize diagnostics data and wrap it in a JSON attachment response.

    ``data`` is dumped with ExtendedJSONEncoder. If that raises TypeError, the
    offending paths are logged together with the diagnostics identifiers and
    an empty 500 response is returned. Otherwise the JSON is returned as an
    attachment named ``<filename>.json``.
    """
    try:
        payload = json.dumps(data, indent=2, cls=ExtendedJSONEncoder)
    except TypeError:
        type_name = d_type.value
        # Only sub-resource requests carry the extra "/<sub_type>/<sub_id>" part.
        sub_path = "" if sub_type is None else f"/{sub_type.value}/{sub_id}"
        bad_paths = format_unserializable_data(find_paths_unserializable_data(data))
        _LOGGER.error(
            "Failed to serialize to JSON: %s/%s%s. Bad data at %s",
            type_name,
            d_id,
            sub_path,
            bad_paths,
        )
        return web.Response(status=HTTPStatus.INTERNAL_SERVER_ERROR)

    disposition = f'attachment; filename="{filename}.json"'
    return web.Response(
        body=payload,
        content_type="application/json",
        headers={"Content-Disposition": disposition},
    )
|
|
|
|
|
|
|
|
|
2022-01-18 04:42:18 +00:00
|
|
|
class DownloadDiagnosticsView(http.HomeAssistantView):
    """HTTP view that serves diagnostics as a downloadable JSON file.

    ``url`` serves diagnostics for a config entry; ``extra_urls`` serves
    diagnostics for a sub-resource of a config entry (a device).
    """

    url = "/api/diagnostics/{d_type}/{d_id}"
    extra_urls = ["/api/diagnostics/{d_type}/{d_id}/{sub_type}/{sub_id}"]
    name = "api:diagnostics"

    async def get(  # pylint: disable=no-self-use
        self,
        request: web.Request,
        d_type: str,
        d_id: str,
        sub_type: str | None = None,
        sub_id: str | None = None,
    ) -> web.Response:
        """Download diagnostics for a config entry or one of its devices.

        ``d_type``/``d_id`` select the config entry; the optional
        ``sub_type``/``sub_id`` pair selects a device belonging to it.

        Responds with 400 when ``d_type`` or ``sub_type`` is not a known type
        (or ``sub_id`` is missing), and with 404 when the config entry, the
        domain's registry entry, the requested handler or the device cannot be
        found. Otherwise the handler's data is passed to
        ``_get_json_file_response`` and its response is returned.
        """
        # d_type handling
        try:
            diagnostics_type = DiagnosticsType(d_type)
        except ValueError:
            return web.Response(status=HTTPStatus.BAD_REQUEST)

        hass = request.app["hass"]
        config_entry = hass.config_entries.async_get_entry(d_id)

        if config_entry is None:
            return web.Response(status=HTTPStatus.NOT_FOUND)

        info = hass.data[DOMAIN].get(config_entry.domain)

        if info is None:
            return web.Response(status=HTTPStatus.NOT_FOUND)

        filename = f"{config_entry.domain}-{config_entry.entry_id}"

        if sub_type is None:
            handler = info[diagnostics_type.value]
            if handler is None:
                return web.Response(status=HTTPStatus.NOT_FOUND)
            data = await handler(hass, config_entry)
            filename = f"{diagnostics_type}-{filename}"
            # Pass the enum member, not its value: the response helper reads
            # ``d_type.value`` when it logs a serialization failure, which
            # would raise AttributeError on a plain string.
            return _get_json_file_response(data, filename, diagnostics_type, d_id)

        # sub_type handling
        try:
            diagnostics_sub_type = DiagnosticsSubType(sub_type)
        except ValueError:
            return web.Response(status=HTTPStatus.BAD_REQUEST)

        # The sub-resource URL always supplies sub_id together with sub_type;
        # guard explicitly instead of asserting, since asserts are stripped
        # when Python runs with -O.
        if not sub_id:
            return web.Response(status=HTTPStatus.BAD_REQUEST)

        dev_reg = async_get(hass)
        device = dev_reg.async_get(sub_id)

        if device is None:
            return web.Response(status=HTTPStatus.NOT_FOUND)

        filename += f"-{device.name}-{device.id}"

        handler = info[diagnostics_sub_type.value]
        if handler is None:
            return web.Response(status=HTTPStatus.NOT_FOUND)

        data = await handler(hass, config_entry, device)
        return _get_json_file_response(
            data, filename, diagnostics_type, d_id, diagnostics_sub_type, sub_id
        )
|