2023-01-18 16:33:15 +00:00
|
|
|
"""Test OTBR Websocket API."""
|
2023-06-01 10:32:14 +00:00
|
|
|
from unittest.mock import patch
|
2023-01-18 16:33:15 +00:00
|
|
|
|
|
|
|
import pytest
|
2023-02-27 12:59:16 +00:00
|
|
|
import python_otbr_api
|
2023-01-18 16:33:15 +00:00
|
|
|
|
2023-03-14 14:28:06 +00:00
|
|
|
from homeassistant.components import otbr, thread
|
2023-01-18 16:33:15 +00:00
|
|
|
from homeassistant.core import HomeAssistant
|
|
|
|
from homeassistant.setup import async_setup_component
|
|
|
|
|
2023-03-14 14:28:06 +00:00
|
|
|
from . import BASE_URL, DATASET_CH15, DATASET_CH16
|
2023-01-18 16:33:15 +00:00
|
|
|
|
|
|
|
from tests.test_util.aiohttp import AiohttpClientMocker
|
2023-02-15 15:19:46 +00:00
|
|
|
from tests.typing import WebSocketGenerator
|
2023-01-18 16:33:15 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
async def websocket_client(hass, hass_ws_client):
    """Return a websocket client obtained from the hass_ws_client factory."""
    client = await hass_ws_client(hass)
    return client
|
|
|
|
|
|
|
|
|
|
|
|
async def test_get_info(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/info returns the URL, active dataset TLVs and channel."""
    active_dataset = python_otbr_api.ActiveDataSet(channel=16)

    # Both OTBR lookups are patched so the handler reads a channel 16 dataset.
    with patch(
        "python_otbr_api.OTBR.get_active_dataset", return_value=active_dataset
    ), patch(
        "python_otbr_api.OTBR.get_active_dataset_tlvs", return_value=DATASET_CH16
    ):
        request = {"type": "otbr/info"}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert response["success"]
    assert response["result"] == {
        "url": BASE_URL,
        "active_dataset_tlvs": DATASET_CH16.hex().lower(),
        "channel": 16,
    }
|
|
|
|
|
|
|
|
|
|
|
|
async def test_get_info_no_entry(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    hass_ws_client: WebSocketGenerator,
) -> None:
    """Test otbr/info fails with not_loaded when otbr is set up with empty config."""
    await async_setup_component(hass, "otbr", {})
    client = await hass_ws_client(hass)

    request = {"type": "otbr/info"}
    await client.send_json_auto_id(request)
    response = await client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "not_loaded"
|
|
|
|
|
|
|
|
|
|
|
|
async def test_get_info_fetch_fails(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/info fails with get_dataset_failed when the dataset fetch raises."""
    failing_fetch = patch(
        "python_otbr_api.OTBR.get_active_dataset",
        side_effect=python_otbr_api.OTBRError,
    )
    with failing_fetch:
        request = {"type": "otbr/info"}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "get_dataset_failed"
|
2023-02-27 15:19:13 +00:00
|
|
|
|
|
|
|
|
|
|
|
async def test_create_network(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/create_network succeeds and drives the expected OTBR calls."""
    with patch(
        "python_otbr_api.OTBR.create_active_dataset"
    ) as mock_create_dataset, patch(
        "python_otbr_api.OTBR.factory_reset"
    ) as mock_factory_reset, patch(
        "python_otbr_api.OTBR.set_enabled"
    ) as mock_set_enabled, patch(
        "python_otbr_api.OTBR.get_active_dataset_tlvs", return_value=DATASET_CH16
    ) as mock_get_tlvs, patch(
        "homeassistant.components.thread.dataset_store.DatasetStore.async_add"
    ) as mock_store_add:
        request = {"type": "otbr/create_network"}
        await websocket_client.send_json_auto_id(request)

        response = await websocket_client.receive_json()
        assert response["success"]
        assert response["result"] is None

    expected_dataset = python_otbr_api.models.ActiveDataSet(
        channel=15, network_name="home-assistant"
    )
    mock_create_dataset.assert_called_once_with(expected_dataset)
    mock_factory_reset.assert_called_once_with()

    # set_enabled must be called exactly twice: first False, then True.
    assert len(mock_set_enabled.mock_calls) == 2
    first_call, second_call = mock_set_enabled.mock_calls
    assert first_call.args[0] is False
    assert second_call.args[0] is True

    mock_get_tlvs.assert_called_once()
    mock_store_add.assert_called_once_with(otbr.DOMAIN, DATASET_CH16.hex(), None)
|
2023-02-27 15:19:13 +00:00
|
|
|
|
|
|
|
|
|
|
|
async def test_create_network_no_entry(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    hass_ws_client: WebSocketGenerator,
) -> None:
    """Test otbr/create_network fails with not_loaded when otbr has empty config."""
    await async_setup_component(hass, "otbr", {})
    client = await hass_ws_client(hass)

    request = {"type": "otbr/create_network"}
    await client.send_json_auto_id(request)
    response = await client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "not_loaded"
|
|
|
|
|
|
|
|
|
2023-03-13 12:56:08 +00:00
|
|
|
async def test_create_network_fails_1(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/create_network fails when set_enabled raises OTBRError."""
    failing_set_enabled = patch(
        "python_otbr_api.OTBR.set_enabled",
        side_effect=python_otbr_api.OTBRError,
    )
    with failing_set_enabled:
        request = {"type": "otbr/create_network"}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "set_enabled_failed"
|
|
|
|
|
|
|
|
|
2023-03-13 12:56:08 +00:00
|
|
|
async def test_create_network_fails_2(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/create_network fails when create_active_dataset raises OTBRError."""
    with patch("python_otbr_api.OTBR.set_enabled"), patch(
        "python_otbr_api.OTBR.create_active_dataset",
        side_effect=python_otbr_api.OTBRError,
    ), patch("python_otbr_api.OTBR.factory_reset"):
        request = {"type": "otbr/create_network"}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "create_active_dataset_failed"
|
|
|
|
|
|
|
|
|
2023-03-13 12:56:08 +00:00
|
|
|
async def test_create_network_fails_3(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/create_network fails when the second set_enabled call raises."""
    # First set_enabled call returns None, the second one raises.
    set_enabled_outcomes = [None, python_otbr_api.OTBRError]

    with patch(
        "python_otbr_api.OTBR.set_enabled", side_effect=set_enabled_outcomes
    ), patch("python_otbr_api.OTBR.create_active_dataset"), patch(
        "python_otbr_api.OTBR.factory_reset"
    ):
        request = {"type": "otbr/create_network"}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "set_enabled_failed"
|
2023-03-08 20:52:53 +00:00
|
|
|
|
|
|
|
|
2023-03-13 18:09:09 +00:00
|
|
|
async def test_create_network_fails_4(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/create_network fails when get_active_dataset_tlvs raises."""
    with patch("python_otbr_api.OTBR.set_enabled"), patch(
        "python_otbr_api.OTBR.create_active_dataset"
    ), patch(
        "python_otbr_api.OTBR.get_active_dataset_tlvs",
        side_effect=python_otbr_api.OTBRError,
    ), patch("python_otbr_api.OTBR.factory_reset"):
        request = {"type": "otbr/create_network"}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "get_active_dataset_tlvs_failed"
|
|
|
|
|
|
|
|
|
|
|
|
async def test_create_network_fails_5(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/create_network fails when get_active_dataset_tlvs returns None."""
    with patch("python_otbr_api.OTBR.set_enabled"), patch(
        "python_otbr_api.OTBR.create_active_dataset"
    ), patch(
        "python_otbr_api.OTBR.get_active_dataset_tlvs", return_value=None
    ), patch("python_otbr_api.OTBR.factory_reset"):
        request = {"type": "otbr/create_network"}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "get_active_dataset_tlvs_empty"
|
|
|
|
|
|
|
|
|
2023-06-07 16:42:40 +00:00
|
|
|
async def test_create_network_fails_6(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/create_network fails when factory_reset raises OTBRError."""
    with patch("python_otbr_api.OTBR.set_enabled"), patch(
        "python_otbr_api.OTBR.create_active_dataset"
    ), patch(
        "python_otbr_api.OTBR.get_active_dataset_tlvs", return_value=None
    ), patch(
        "python_otbr_api.OTBR.factory_reset",
        side_effect=python_otbr_api.OTBRError,
    ):
        request = {"type": "otbr/create_network"}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "factory_reset_failed"
|
2023-06-07 16:42:40 +00:00
|
|
|
|
|
|
|
|
2023-03-14 14:28:06 +00:00
|
|
|
async def test_set_network(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/set_network applies the chosen dataset and toggles set_enabled."""

    await thread.async_add_dataset(hass, "test", DATASET_CH15.hex())
    store = await thread.dataset_store.async_get_store(hass)
    # The second stored dataset id is used; the asserted TLVs are DATASET_CH15.
    known_ids = list(store.datasets)
    dataset_id = known_ids[1]

    with patch(
        "python_otbr_api.OTBR.set_active_dataset_tlvs"
    ) as mock_set_tlvs, patch(
        "python_otbr_api.OTBR.set_enabled"
    ) as mock_set_enabled:
        request = {"type": "otbr/set_network", "dataset_id": dataset_id}
        await websocket_client.send_json_auto_id(request)

        response = await websocket_client.receive_json()
        assert response["success"]
        assert response["result"] is None

    mock_set_tlvs.assert_called_once_with(DATASET_CH15)

    # set_enabled must be called exactly twice: first False, then True.
    assert len(mock_set_enabled.mock_calls) == 2
    first_call, second_call = mock_set_enabled.mock_calls
    assert first_call.args[0] is False
    assert second_call.args[0] is True
|
|
|
|
|
|
|
|
|
|
|
|
async def test_set_network_no_entry(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    hass_ws_client: WebSocketGenerator,
) -> None:
    """Test otbr/set_network fails with not_loaded when otbr has empty config."""
    await async_setup_component(hass, "otbr", {})
    client = await hass_ws_client(hass)

    request = {"type": "otbr/set_network", "dataset_id": "abc"}
    await client.send_json_auto_id(request)
    response = await client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "not_loaded"
|
|
|
|
|
|
|
|
|
|
|
|
async def test_set_network_channel_conflict(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    multiprotocol_addon_manager_mock,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/set_network fails with channel_conflict."""

    store = await thread.dataset_store.async_get_store(hass)
    known_ids = list(store.datasets)
    dataset_id = known_ids[0]

    # The multiprotocol addon manager mock reports channel 15.
    # NOTE(review): presumably the first stored dataset uses another channel - confirm.
    multiprotocol_addon_manager_mock.async_get_channel.return_value = 15

    request = {"type": "otbr/set_network", "dataset_id": dataset_id}
    await websocket_client.send_json_auto_id(request)
    response = await websocket_client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "channel_conflict"
|
|
|
|
|
|
|
|
|
|
|
|
async def test_set_network_unknown_dataset(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/set_network fails with unknown_dataset for dataset id "abc"."""

    request = {"type": "otbr/set_network", "dataset_id": "abc"}
    await websocket_client.send_json_auto_id(request)
    response = await websocket_client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "unknown_dataset"
|
|
|
|
|
|
|
|
|
|
|
|
async def test_set_network_fails_1(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/set_network fails when set_enabled raises OTBRError."""
    await thread.async_add_dataset(hass, "test", DATASET_CH15.hex())
    store = await thread.dataset_store.async_get_store(hass)
    known_ids = list(store.datasets)
    dataset_id = known_ids[1]

    failing_set_enabled = patch(
        "python_otbr_api.OTBR.set_enabled",
        side_effect=python_otbr_api.OTBRError,
    )
    with failing_set_enabled:
        request = {"type": "otbr/set_network", "dataset_id": dataset_id}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "set_enabled_failed"
|
|
|
|
|
|
|
|
|
|
|
|
async def test_set_network_fails_2(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/set_network fails when set_active_dataset_tlvs raises OTBRError."""
    await thread.async_add_dataset(hass, "test", DATASET_CH15.hex())
    store = await thread.dataset_store.async_get_store(hass)
    known_ids = list(store.datasets)
    dataset_id = known_ids[1]

    with patch("python_otbr_api.OTBR.set_enabled"), patch(
        "python_otbr_api.OTBR.set_active_dataset_tlvs",
        side_effect=python_otbr_api.OTBRError,
    ):
        request = {"type": "otbr/set_network", "dataset_id": dataset_id}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "set_active_dataset_tlvs_failed"
|
|
|
|
|
|
|
|
|
|
|
|
async def test_set_network_fails_3(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/set_network fails when the second set_enabled call raises."""
    await thread.async_add_dataset(hass, "test", DATASET_CH15.hex())
    store = await thread.dataset_store.async_get_store(hass)
    known_ids = list(store.datasets)
    dataset_id = known_ids[1]

    # First set_enabled call returns None, the second one raises.
    set_enabled_outcomes = [None, python_otbr_api.OTBRError]

    with patch(
        "python_otbr_api.OTBR.set_enabled", side_effect=set_enabled_outcomes
    ), patch("python_otbr_api.OTBR.set_active_dataset_tlvs"):
        request = {"type": "otbr/set_network", "dataset_id": dataset_id}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "set_enabled_failed"
|
|
|
|
|
|
|
|
|
2023-03-08 20:52:53 +00:00
|
|
|
async def test_get_extended_address(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/get_extended_address returns the address as lowercase hex."""
    address_hex = "4EF6C4F3FF750626"

    with patch(
        "python_otbr_api.OTBR.get_extended_address",
        return_value=bytes.fromhex(address_hex),
    ):
        request = {"type": "otbr/get_extended_address"}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert response["success"]
    assert response["result"] == {"extended_address": address_hex.lower()}
|
|
|
|
|
|
|
|
|
|
|
|
async def test_get_extended_address_no_entry(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    hass_ws_client: WebSocketGenerator,
) -> None:
    """Test otbr/get_extended_address fails with not_loaded on empty otbr config."""
    await async_setup_component(hass, "otbr", {})
    client = await hass_ws_client(hass)

    request = {"type": "otbr/get_extended_address"}
    await client.send_json_auto_id(request)
    response = await client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "not_loaded"
|
|
|
|
|
|
|
|
|
|
|
|
async def test_get_extended_address_fetch_fails(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/get_extended_address fails when the OTBR call raises OTBRError."""
    failing_fetch = patch(
        "python_otbr_api.OTBR.get_extended_address",
        side_effect=python_otbr_api.OTBRError,
    )
    with failing_fetch:
        request = {"type": "otbr/get_extended_address"}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "get_extended_address_failed"
|
2023-07-19 08:48:32 +00:00
|
|
|
|
|
|
|
|
|
|
|
async def test_set_channel(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_thread,
    websocket_client,
) -> None:
    """Test otbr/set_channel succeeds and reports a delay of 300.0."""

    with patch("python_otbr_api.OTBR.set_channel"):
        request = {"type": "otbr/set_channel", "channel": 12}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert response["success"]
    assert response["result"] == {"delay": 300.0}
|
|
|
|
|
|
|
|
|
|
|
|
async def test_set_channel_multiprotocol(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_multipan,
    websocket_client,
) -> None:
    """Test otbr/set_channel fails with multiprotocol_enabled on the multipan entry."""

    with patch("python_otbr_api.OTBR.set_channel"):
        request = {"type": "otbr/set_channel", "channel": 12}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "multiprotocol_enabled"
|
|
|
|
|
|
|
|
|
|
|
|
async def test_set_channel_no_entry(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    hass_ws_client: WebSocketGenerator,
) -> None:
    """Test otbr/set_channel fails with not_loaded when otbr has empty config."""
    await async_setup_component(hass, "otbr", {})
    client = await hass_ws_client(hass)

    request = {"type": "otbr/set_channel", "channel": 12}
    await client.send_json_auto_id(request)
    response = await client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "not_loaded"
|
|
|
|
|
|
|
|
|
|
|
|
async def test_set_channel_fails(
    hass: HomeAssistant,
    aioclient_mock: AiohttpClientMocker,
    otbr_config_entry_thread,
    websocket_client,
) -> None:
    """Test otbr/set_channel fails with set_channel_failed when OTBR raises."""
    failing_set_channel = patch(
        "python_otbr_api.OTBR.set_channel",
        side_effect=python_otbr_api.OTBRError,
    )
    with failing_set_channel:
        request = {"type": "otbr/set_channel", "channel": 12}
        await websocket_client.send_json_auto_id(request)
        response = await websocket_client.receive_json()

    assert not response["success"]
    assert response["error"]["code"] == "set_channel_failed"
|