From c609236a6346ed54091992221a0c1a261aae5f86 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sat, 21 Aug 2021 07:24:21 -0500 Subject: [PATCH] Move get_serial_by_id and human_readable_device_name to usb (#54968) --- homeassistant/components/usb/__init__.py | 32 ++++++++ homeassistant/components/zha/config_flow.py | 40 ++-------- homeassistant/components/zha/manifest.json | 2 +- tests/components/usb/test_init.py | 83 ++++++++++++++++++++- tests/components/zha/test_config_flow.py | 50 +------------ 5 files changed, 121 insertions(+), 86 deletions(-) diff --git a/homeassistant/components/usb/__init__.py b/homeassistant/components/usb/__init__.py index ff8bb5fae88..7fd27b32b9a 100644 --- a/homeassistant/components/usb/__init__.py +++ b/homeassistant/components/usb/__init__.py @@ -4,6 +4,7 @@ from __future__ import annotations import dataclasses import datetime import logging +import os import sys from serial.tools.list_ports import comports @@ -26,6 +27,37 @@ _LOGGER = logging.getLogger(__name__) SCAN_INTERVAL = datetime.timedelta(minutes=60) +def human_readable_device_name( + device: str, + serial_number: str | None, + manufacturer: str | None, + description: str | None, + vid: str | None, + pid: str | None, +) -> str: + """Return a human readable name from USBDevice attributes.""" + device_details = f"{device}, s/n: {serial_number or 'n/a'}" + manufacturer_details = f" - {manufacturer}" if manufacturer else "" + vendor_details = f" - {vid}:{pid}" if vid else "" + full_details = f"{device_details}{manufacturer_details}{vendor_details}" + + if not description: + return full_details + return f"{description[:26]} - {full_details}" + + +def get_serial_by_id(dev_path: str) -> str: + """Return a /dev/serial/by-id match for given device if available.""" + by_id = "/dev/serial/by-id" + if not os.path.isdir(by_id): + return dev_path + + for path in (entry.path for entry in os.scandir(by_id) if entry.is_symlink()): + if os.path.realpath(path) == dev_path: + return path + return dev_path + + async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the USB Discovery integration.""" usb = await async_get_usb(hass) diff --git a/homeassistant/components/zha/config_flow.py b/homeassistant/components/zha/config_flow.py index a305c97d436..2d8443642e7 100644 --- a/homeassistant/components/zha/config_flow.py +++ b/homeassistant/components/zha/config_flow.py @@ -1,7 +1,6 @@ """Config flow for ZHA.""" from __future__ import annotations -import os from typing import Any import serial.tools.list_ports @@ -9,6 +8,7 @@ import voluptuous as vol from zigpy.config import CONF_DEVICE, CONF_DEVICE_PATH from homeassistant import config_entries +from homeassistant.components import usb from homeassistant.const import CONF_HOST, CONF_NAME from homeassistant.helpers.typing import DiscoveryInfoType @@ -27,24 +27,6 @@ SUPPORTED_PORT_SETTINGS = ( ) -def _format_port_human_readable( - device: str, - serial_number: str | None, - manufacturer: str | None, - description: str | None, - vid: str | None, - pid: str | None, -) -> str: - device_details = f"{device}, s/n: {serial_number or 'n/a'}" - manufacturer_details = f" - {manufacturer}" if manufacturer else "" - vendor_details = f" - {vid}:{pid}" if vid else "" - full_details = f"{device_details}{manufacturer_details}{vendor_details}" - - if not description: - return full_details - return f"{description[:26]} - {full_details}" - - class ZhaFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): """Handle a config flow.""" @@ -81,7 +63,7 @@ class ZhaFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): port = ports[list_of_ports.index(user_selection)] dev_path = await self.hass.async_add_executor_job( - get_serial_by_id, port.device + usb.get_serial_by_id, port.device ) auto_detected_data = await detect_radios(dev_path) if auto_detected_data is not None: @@ -145,12 +127,12 @@ class ZhaFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): if vid == "10C4" and pid == "8A2A" and "ZigBee" not in description: return self.async_abort(reason="not_zha_device") - dev_path = await self.hass.async_add_executor_job(get_serial_by_id, device) + dev_path = await self.hass.async_add_executor_job(usb.get_serial_by_id, device) self._auto_detected_data = await detect_radios(dev_path) if self._auto_detected_data is None: return self.async_abort(reason="not_zha_device") self._device_path = dev_path - self._title = _format_port_human_readable( + self._title = usb.human_readable_device_name( dev_path, serial_number, manufacturer, @@ -215,7 +197,7 @@ class ZhaFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): self._device_path = user_input.get(CONF_DEVICE_PATH) if await app_cls.probe(user_input): serial_by_id = await self.hass.async_add_executor_job( - get_serial_by_id, user_input[CONF_DEVICE_PATH] + usb.get_serial_by_id, user_input[CONF_DEVICE_PATH] ) user_input[CONF_DEVICE_PATH] = serial_by_id return self.async_create_entry( @@ -255,15 +237,3 @@ async def detect_radios(dev_path: str) -> dict[str, Any] | None: return {CONF_RADIO_TYPE: radio.name, CONF_DEVICE: dev_config} return None - - -def get_serial_by_id(dev_path: str) -> str: - """Return a /dev/serial/by-id match for given device if available.""" - by_id = "/dev/serial/by-id" - if not os.path.isdir(by_id): - return dev_path - - for path in (entry.path for entry in os.scandir(by_id) if entry.is_symlink()): - if os.path.realpath(path) == dev_path: - return path - return dev_path diff --git a/homeassistant/components/zha/manifest.json b/homeassistant/components/zha/manifest.json index 57d926042db..5df8cddc167 100644 --- a/homeassistant/components/zha/manifest.json +++ b/homeassistant/components/zha/manifest.json @@ -28,6 +28,6 @@ "name": "tube*" } ], - "after_dependencies": ["zeroconf"], + "after_dependencies": ["usb", "zeroconf"], "iot_class": "local_polling" } diff --git a/tests/components/usb/test_init.py b/tests/components/usb/test_init.py index cb547edc939..e511fac061e 100644 --- a/tests/components/usb/test_init.py +++ b/tests/components/usb/test_init.py @@ -1,10 +1,12 @@ """Tests for the USB Discovery integration.""" import datetime +import os import sys -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, patch, sentinel import pytest +from homeassistant.components import usb from homeassistant.const import EVENT_HOMEASSISTANT_STARTED from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util @@ -271,3 +273,82 @@ async def test_non_matching_discovered_by_scanner_after_started(hass): await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 0 + + +def test_get_serial_by_id_no_dir(): + """Test serial by id conversion if there's no /dev/serial/by-id.""" + p1 = patch("os.path.isdir", MagicMock(return_value=False)) + p2 = patch("os.scandir") + with p1 as is_dir_mock, p2 as scan_mock: + res = usb.get_serial_by_id(sentinel.path) + assert res is sentinel.path + assert is_dir_mock.call_count == 1 + assert scan_mock.call_count == 0 + + +def test_get_serial_by_id(): + """Test serial by id conversion.""" + p1 = patch("os.path.isdir", MagicMock(return_value=True)) + p2 = patch("os.scandir") + + def _realpath(path): + if path is sentinel.matched_link: + return sentinel.path + return sentinel.serial_link_path + + p3 = patch("os.path.realpath", side_effect=_realpath) + with p1 as is_dir_mock, p2 as scan_mock, p3: + res = usb.get_serial_by_id(sentinel.path) + assert res is sentinel.path + assert is_dir_mock.call_count == 1 + assert scan_mock.call_count == 1 + + entry1 = MagicMock(spec_set=os.DirEntry) + entry1.is_symlink.return_value = True + entry1.path = sentinel.some_path + + entry2 = MagicMock(spec_set=os.DirEntry) + entry2.is_symlink.return_value = False + entry2.path = sentinel.other_path + + entry3 = MagicMock(spec_set=os.DirEntry) + entry3.is_symlink.return_value = True + entry3.path = sentinel.matched_link + + scan_mock.return_value = [entry1, entry2, entry3] + res = usb.get_serial_by_id(sentinel.path) + assert res is sentinel.matched_link + assert is_dir_mock.call_count == 2 + assert scan_mock.call_count == 2 + + +def test_human_readable_device_name(): + """Test human readable device name includes the passed data.""" + name = usb.human_readable_device_name( + "/dev/null", + "612020FD", + "Silicon Labs", + "HubZ Smart Home Controller - HubZ Z-Wave Com Port", + "10C4", + "8A2A", + ) + assert "/dev/null" in name + assert "612020FD" in name + assert "Silicon Labs" in name + assert "HubZ Smart Home Controller - HubZ Z-Wave Com Port"[:26] in name + assert "10C4" in name + assert "8A2A" in name + + name = usb.human_readable_device_name( + "/dev/null", + "612020FD", + "Silicon Labs", + None, + "10C4", + "8A2A", + ) + assert "/dev/null" in name + assert "612020FD" in name + assert "Silicon Labs" in name + assert "10C4" in name + assert "8A2A" in name diff --git a/tests/components/zha/test_config_flow.py b/tests/components/zha/test_config_flow.py index c603e665912..ed975f77eae 100644 --- a/tests/components/zha/test_config_flow.py +++ b/tests/components/zha/test_config_flow.py @@ -1,7 +1,6 @@ """Tests for ZHA config flow.""" -import os -from unittest.mock import AsyncMock, MagicMock, patch, sentinel +from unittest.mock import AsyncMock, MagicMock, patch import pytest import serial.tools.list_ports @@ -400,50 +399,3 @@ async def test_user_port_config(probe_mock, hass): ) assert result["data"][CONF_RADIO_TYPE] == "ezsp" assert probe_mock.await_count == 1 - - -def test_get_serial_by_id_no_dir(): - """Test serial by id conversion if there's no /dev/serial/by-id.""" - p1 = patch("os.path.isdir", MagicMock(return_value=False)) - p2 = patch("os.scandir") - with p1 as is_dir_mock, p2 as scan_mock: - res = config_flow.get_serial_by_id(sentinel.path) - assert res is sentinel.path - assert is_dir_mock.call_count == 1 - assert scan_mock.call_count == 0 - - -def test_get_serial_by_id(): - """Test serial by id conversion.""" - p1 = patch("os.path.isdir", MagicMock(return_value=True)) - p2 = patch("os.scandir") - - def _realpath(path): - if path is sentinel.matched_link: - return sentinel.path - return sentinel.serial_link_path - - p3 = patch("os.path.realpath", side_effect=_realpath) - with p1 as is_dir_mock, p2 as scan_mock, p3: - res = config_flow.get_serial_by_id(sentinel.path) - assert res is sentinel.path - assert is_dir_mock.call_count == 1 - assert scan_mock.call_count == 1 - - entry1 = MagicMock(spec_set=os.DirEntry) - entry1.is_symlink.return_value = True - entry1.path = sentinel.some_path - - entry2 = MagicMock(spec_set=os.DirEntry) - entry2.is_symlink.return_value = False - entry2.path = sentinel.other_path - - entry3 = MagicMock(spec_set=os.DirEntry) - entry3.is_symlink.return_value = True - entry3.path = sentinel.matched_link - - scan_mock.return_value = [entry1, entry2, entry3] - res = config_flow.get_serial_by_id(sentinel.path) - assert res is sentinel.matched_link - assert is_dir_mock.call_count == 2 - assert scan_mock.call_count == 2