core/tests/components/config/test_config_entries.py

676 lines
21 KiB
Python
Raw Normal View History

"""Test config entries API."""
import asyncio
from collections import OrderedDict
from unittest.mock import patch
import pytest
import voluptuous as vol
from homeassistant import config_entries as core_ce, data_entry_flow
from homeassistant.components.config import config_entries
from homeassistant.config_entries import HANDLERS
from homeassistant.core import callback
from homeassistant.generated import config_flows
from homeassistant.setup import async_setup_component
from tests.common import (
2019-07-31 19:25:30 +00:00
MockConfigEntry,
MockModule,
mock_coro_func,
mock_entity_platform,
mock_integration,
2019-07-31 19:25:30 +00:00
)
@pytest.fixture(autouse=True)
def mock_test_component(hass):
"""Ensure a component called 'test' exists."""
2019-07-31 19:25:30 +00:00
mock_integration(hass, MockModule("test"))
@pytest.fixture
def client(hass, hass_client):
"""Fixture that can interact with the config manager API."""
2019-07-31 19:25:30 +00:00
hass.loop.run_until_complete(async_setup_component(hass, "http", {}))
hass.loop.run_until_complete(config_entries.async_setup(hass))
yield hass.loop.run_until_complete(hass_client())
async def test_get_entries(hass, client):
"""Test get entries."""
with patch.dict(HANDLERS, clear=True):
@HANDLERS.register("comp1")
class Comp1ConfigFlow:
"""Config flow with options flow."""
@staticmethod
@callback
def async_get_options_flow(config, options):
"""Get options flow."""
pass
hass.helpers.config_entry_flow.register_discovery_flow(
"comp2", "Comp 2", lambda: None, core_ce.CONN_CLASS_ASSUMED
)
MockConfigEntry(
domain="comp1",
title="Test 1",
source="bla",
connection_class=core_ce.CONN_CLASS_LOCAL_POLL,
).add_to_hass(hass)
MockConfigEntry(
domain="comp2",
title="Test 2",
source="bla2",
state=core_ce.ENTRY_STATE_LOADED,
connection_class=core_ce.CONN_CLASS_ASSUMED,
).add_to_hass(hass)
resp = await client.get("/api/config/config_entries/entry")
assert resp.status == 200
data = await resp.json()
for entry in data:
entry.pop("entry_id")
assert data == [
{
"domain": "comp1",
"title": "Test 1",
"source": "bla",
"state": "not_loaded",
"connection_class": "local_poll",
"supports_options": True,
},
{
"domain": "comp2",
"title": "Test 2",
"source": "bla2",
"state": "loaded",
"connection_class": "assumed",
"supports_options": False,
},
]
@asyncio.coroutine
def test_remove_entry(hass, client):
"""Test removing an entry via the API."""
2019-07-31 19:25:30 +00:00
entry = MockConfigEntry(domain="demo", state=core_ce.ENTRY_STATE_LOADED)
entry.add_to_hass(hass)
resp = yield from client.delete(
2019-07-31 19:25:30 +00:00
"/api/config/config_entries/entry/{}".format(entry.entry_id)
)
assert resp.status == 200
data = yield from resp.json()
2019-07-31 19:25:30 +00:00
assert data == {"require_restart": True}
assert len(hass.config_entries.async_entries()) == 0
async def test_remove_entry_unauth(hass, client, hass_admin_user):
"""Test removing an entry via the API."""
hass_admin_user.groups = []
2019-07-31 19:25:30 +00:00
entry = MockConfigEntry(domain="demo", state=core_ce.ENTRY_STATE_LOADED)
entry.add_to_hass(hass)
resp = await client.delete(
2019-07-31 19:25:30 +00:00
"/api/config/config_entries/entry/{}".format(entry.entry_id)
)
assert resp.status == 401
assert len(hass.config_entries.async_entries()) == 1
@asyncio.coroutine
def test_available_flows(hass, client):
"""Test querying the available flows."""
2019-07-31 19:25:30 +00:00
with patch.object(config_flows, "FLOWS", ["hello", "world"]):
resp = yield from client.get("/api/config/config_entries/flow_handlers")
assert resp.status == 200
data = yield from resp.json()
2019-07-31 19:25:30 +00:00
assert set(data) == set(["hello", "world"])
############################
# FLOW MANAGER API TESTS #
############################
@asyncio.coroutine
def test_initialize_flow(hass, client):
"""Test we can initialize a flow."""
2019-07-31 19:25:30 +00:00
mock_entity_platform(hass, "config_flow.test", None)
class TestFlow(core_ce.ConfigFlow):
@asyncio.coroutine
def async_step_user(self, user_input=None):
schema = OrderedDict()
2019-07-31 19:25:30 +00:00
schema[vol.Required("username")] = str
schema[vol.Required("password")] = str
return self.async_show_form(
2019-07-31 19:25:30 +00:00
step_id="user",
data_schema=schema,
2019-07-31 19:25:30 +00:00
description_placeholders={"url": "https://example.com"},
errors={"username": "Should be unique."},
)
2019-07-31 19:25:30 +00:00
with patch.dict(HANDLERS, {"test": TestFlow}):
resp = yield from client.post(
"/api/config/config_entries/flow", json={"handler": "test"}
)
assert resp.status == 200
data = yield from resp.json()
2019-07-31 19:25:30 +00:00
data.pop("flow_id")
assert data == {
2019-07-31 19:25:30 +00:00
"type": "form",
"handler": "test",
"step_id": "user",
"data_schema": [
{"name": "username", "required": True, "type": "string"},
{"name": "password", "required": True, "type": "string"},
],
2019-07-31 19:25:30 +00:00
"description_placeholders": {"url": "https://example.com"},
"errors": {"username": "Should be unique."},
}
async def test_initialize_flow_unauth(hass, client, hass_admin_user):
"""Test we can initialize a flow."""
hass_admin_user.groups = []
class TestFlow(core_ce.ConfigFlow):
@asyncio.coroutine
def async_step_user(self, user_input=None):
schema = OrderedDict()
2019-07-31 19:25:30 +00:00
schema[vol.Required("username")] = str
schema[vol.Required("password")] = str
return self.async_show_form(
2019-07-31 19:25:30 +00:00
step_id="user",
data_schema=schema,
2019-07-31 19:25:30 +00:00
description_placeholders={"url": "https://example.com"},
errors={"username": "Should be unique."},
)
2019-07-31 19:25:30 +00:00
with patch.dict(HANDLERS, {"test": TestFlow}):
resp = await client.post(
"/api/config/config_entries/flow", json={"handler": "test"}
)
assert resp.status == 401
@asyncio.coroutine
def test_abort(hass, client):
"""Test a flow that aborts."""
2019-07-31 19:25:30 +00:00
mock_entity_platform(hass, "config_flow.test", None)
class TestFlow(core_ce.ConfigFlow):
@asyncio.coroutine
def async_step_user(self, user_input=None):
2019-07-31 19:25:30 +00:00
return self.async_abort(reason="bla")
2019-07-31 19:25:30 +00:00
with patch.dict(HANDLERS, {"test": TestFlow}):
resp = yield from client.post(
"/api/config/config_entries/flow", json={"handler": "test"}
)
assert resp.status == 200
data = yield from resp.json()
2019-07-31 19:25:30 +00:00
data.pop("flow_id")
assert data == {
2019-07-31 19:25:30 +00:00
"description_placeholders": None,
"handler": "test",
"reason": "bla",
"type": "abort",
}
@asyncio.coroutine
def test_create_account(hass, client):
"""Test a flow that creates an account."""
2019-07-31 19:25:30 +00:00
mock_entity_platform(hass, "config_flow.test", None)
2019-07-31 19:25:30 +00:00
mock_integration(hass, MockModule("test", async_setup_entry=mock_coro_func(True)))
class TestFlow(core_ce.ConfigFlow):
VERSION = 1
@asyncio.coroutine
def async_step_user(self, user_input=None):
return self.async_create_entry(
2019-07-31 19:25:30 +00:00
title="Test Entry", data={"secret": "account_token"}
)
2019-07-31 19:25:30 +00:00
with patch.dict(HANDLERS, {"test": TestFlow}):
resp = yield from client.post(
"/api/config/config_entries/flow", json={"handler": "test"}
)
assert resp.status == 200
2019-07-31 19:25:30 +00:00
entries = hass.config_entries.async_entries("test")
assert len(entries) == 1
data = yield from resp.json()
2019-07-31 19:25:30 +00:00
data.pop("flow_id")
assert data == {
2019-07-31 19:25:30 +00:00
"handler": "test",
"title": "Test Entry",
"type": "create_entry",
"version": 1,
"result": entries[0].entry_id,
"description": None,
"description_placeholders": None,
}
@asyncio.coroutine
def test_two_step_flow(hass, client):
"""Test we can finish a two step flow."""
2019-07-31 19:25:30 +00:00
mock_integration(hass, MockModule("test", async_setup_entry=mock_coro_func(True)))
mock_entity_platform(hass, "config_flow.test", None)
class TestFlow(core_ce.ConfigFlow):
VERSION = 1
@asyncio.coroutine
def async_step_user(self, user_input=None):
return self.async_show_form(
2019-07-31 19:25:30 +00:00
step_id="account", data_schema=vol.Schema({"user_title": str})
)
@asyncio.coroutine
def async_step_account(self, user_input=None):
return self.async_create_entry(
2019-07-31 19:25:30 +00:00
title=user_input["user_title"], data={"secret": "account_token"}
)
2019-07-31 19:25:30 +00:00
with patch.dict(HANDLERS, {"test": TestFlow}):
resp = yield from client.post(
"/api/config/config_entries/flow", json={"handler": "test"}
)
assert resp.status == 200
data = yield from resp.json()
2019-07-31 19:25:30 +00:00
flow_id = data.pop("flow_id")
assert data == {
2019-07-31 19:25:30 +00:00
"type": "form",
"handler": "test",
"step_id": "account",
"data_schema": [{"name": "user_title", "type": "string"}],
"description_placeholders": None,
"errors": None,
}
2019-07-31 19:25:30 +00:00
with patch.dict(HANDLERS, {"test": TestFlow}):
resp = yield from client.post(
2019-07-31 19:25:30 +00:00
"/api/config/config_entries/flow/{}".format(flow_id),
json={"user_title": "user-title"},
)
assert resp.status == 200
2019-07-31 19:25:30 +00:00
entries = hass.config_entries.async_entries("test")
assert len(entries) == 1
data = yield from resp.json()
2019-07-31 19:25:30 +00:00
data.pop("flow_id")
assert data == {
2019-07-31 19:25:30 +00:00
"handler": "test",
"type": "create_entry",
"title": "user-title",
"version": 1,
"result": entries[0].entry_id,
"description": None,
"description_placeholders": None,
}
async def test_continue_flow_unauth(hass, client, hass_admin_user):
"""Test we can't finish a two step flow."""
2019-07-31 19:25:30 +00:00
mock_integration(hass, MockModule("test", async_setup_entry=mock_coro_func(True)))
mock_entity_platform(hass, "config_flow.test", None)
class TestFlow(core_ce.ConfigFlow):
VERSION = 1
@asyncio.coroutine
def async_step_user(self, user_input=None):
return self.async_show_form(
2019-07-31 19:25:30 +00:00
step_id="account", data_schema=vol.Schema({"user_title": str})
)
@asyncio.coroutine
def async_step_account(self, user_input=None):
return self.async_create_entry(
2019-07-31 19:25:30 +00:00
title=user_input["user_title"], data={"secret": "account_token"}
)
2019-07-31 19:25:30 +00:00
with patch.dict(HANDLERS, {"test": TestFlow}):
resp = await client.post(
"/api/config/config_entries/flow", json={"handler": "test"}
)
assert resp.status == 200
data = await resp.json()
2019-07-31 19:25:30 +00:00
flow_id = data.pop("flow_id")
assert data == {
2019-07-31 19:25:30 +00:00
"type": "form",
"handler": "test",
"step_id": "account",
"data_schema": [{"name": "user_title", "type": "string"}],
"description_placeholders": None,
"errors": None,
}
hass_admin_user.groups = []
resp = await client.post(
2019-07-31 19:25:30 +00:00
"/api/config/config_entries/flow/{}".format(flow_id),
json={"user_title": "user-title"},
)
assert resp.status == 401
2019-08-20 07:17:52 +00:00
async def test_get_progress_index(hass, hass_ws_client):
"""Test querying for the flows that are in progress."""
2019-08-20 07:17:52 +00:00
assert await async_setup_component(hass, "config", {})
2019-07-31 19:25:30 +00:00
mock_entity_platform(hass, "config_flow.test", None)
2019-08-20 07:17:52 +00:00
ws_client = await hass_ws_client(hass)
class TestFlow(core_ce.ConfigFlow):
VERSION = 5
2019-08-20 07:17:52 +00:00
async def async_step_hassio(self, info):
return await self.async_step_account()
2019-08-20 07:17:52 +00:00
async def async_step_account(self, user_input=None):
2019-07-31 19:25:30 +00:00
return self.async_show_form(step_id="account")
2019-07-31 19:25:30 +00:00
with patch.dict(HANDLERS, {"test": TestFlow}):
2019-08-20 07:17:52 +00:00
form = await hass.config_entries.flow.async_init(
2019-07-31 19:25:30 +00:00
"test", context={"source": "hassio"}
)
2019-08-20 07:17:52 +00:00
await ws_client.send_json({"id": 5, "type": "config_entries/flow/progress"})
response = await ws_client.receive_json()
assert response["success"]
assert response["result"] == [
2019-07-31 19:25:30 +00:00
{"flow_id": form["flow_id"], "handler": "test", "context": {"source": "hassio"}}
]
2019-08-20 07:17:52 +00:00
async def test_get_progress_index_unauth(hass, hass_ws_client, hass_admin_user):
"""Test we can't get flows that are in progress."""
2019-08-20 07:17:52 +00:00
assert await async_setup_component(hass, "config", {})
hass_admin_user.groups = []
2019-08-20 07:17:52 +00:00
ws_client = await hass_ws_client(hass)
await ws_client.send_json({"id": 5, "type": "config_entries/flow/progress"})
response = await ws_client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "unauthorized"
@asyncio.coroutine
def test_get_progress_flow(hass, client):
"""Test we can query the API for same result as we get from init a flow."""
2019-07-31 19:25:30 +00:00
mock_entity_platform(hass, "config_flow.test", None)
class TestFlow(core_ce.ConfigFlow):
@asyncio.coroutine
def async_step_user(self, user_input=None):
schema = OrderedDict()
2019-07-31 19:25:30 +00:00
schema[vol.Required("username")] = str
schema[vol.Required("password")] = str
return self.async_show_form(
2019-07-31 19:25:30 +00:00
step_id="user",
data_schema=schema,
2019-07-31 19:25:30 +00:00
errors={"username": "Should be unique."},
)
2019-07-31 19:25:30 +00:00
with patch.dict(HANDLERS, {"test": TestFlow}):
resp = yield from client.post(
"/api/config/config_entries/flow", json={"handler": "test"}
)
assert resp.status == 200
data = yield from resp.json()
resp2 = yield from client.get(
2019-07-31 19:25:30 +00:00
"/api/config/config_entries/flow/{}".format(data["flow_id"])
)
assert resp2.status == 200
data2 = yield from resp2.json()
assert data == data2
2018-12-13 19:08:31 +00:00
async def test_get_progress_flow_unauth(hass, client, hass_admin_user):
"""Test we can can't query the API for result of flow."""
2019-07-31 19:25:30 +00:00
mock_entity_platform(hass, "config_flow.test", None)
class TestFlow(core_ce.ConfigFlow):
async def async_step_user(self, user_input=None):
schema = OrderedDict()
2019-07-31 19:25:30 +00:00
schema[vol.Required("username")] = str
schema[vol.Required("password")] = str
return self.async_show_form(
2019-07-31 19:25:30 +00:00
step_id="user",
data_schema=schema,
2019-07-31 19:25:30 +00:00
errors={"username": "Should be unique."},
)
2019-07-31 19:25:30 +00:00
with patch.dict(HANDLERS, {"test": TestFlow}):
resp = await client.post(
"/api/config/config_entries/flow", json={"handler": "test"}
)
assert resp.status == 200
data = await resp.json()
hass_admin_user.groups = []
resp2 = await client.get(
2019-07-31 19:25:30 +00:00
"/api/config/config_entries/flow/{}".format(data["flow_id"])
)
assert resp2.status == 401
async def test_options_flow(hass, client):
"""Test we can change options."""
2019-07-31 19:25:30 +00:00
class TestFlow(core_ce.ConfigFlow):
@staticmethod
@callback
2019-08-15 21:11:55 +00:00
def async_get_options_flow(config_entry):
class OptionsFlowHandler(data_entry_flow.FlowHandler):
async def async_step_init(self, user_input=None):
schema = OrderedDict()
2019-07-31 19:25:30 +00:00
schema[vol.Required("enabled")] = bool
return self.async_show_form(
2019-07-31 19:25:30 +00:00
step_id="user",
data_schema=schema,
2019-07-31 19:25:30 +00:00
description_placeholders={"enabled": "Set to true to be true"},
)
2019-07-31 19:25:30 +00:00
2019-08-15 21:11:55 +00:00
return OptionsFlowHandler()
MockConfigEntry(
2019-07-31 19:25:30 +00:00
domain="test",
entry_id="test1",
source="bla",
connection_class=core_ce.CONN_CLASS_LOCAL_POLL,
).add_to_hass(hass)
entry = hass.config_entries._entries[0]
2019-07-31 19:25:30 +00:00
with patch.dict(HANDLERS, {"test": TestFlow}):
2019-08-15 21:11:55 +00:00
url = "/api/config/config_entries/options/flow"
2019-07-31 19:25:30 +00:00
resp = await client.post(url, json={"handler": entry.entry_id})
assert resp.status == 200
data = await resp.json()
2019-07-31 19:25:30 +00:00
data.pop("flow_id")
assert data == {
2019-07-31 19:25:30 +00:00
"type": "form",
"handler": "test1",
"step_id": "user",
"data_schema": [{"name": "enabled", "required": True, "type": "boolean"}],
"description_placeholders": {"enabled": "Set to true to be true"},
"errors": None,
}
async def test_two_step_options_flow(hass, client):
"""Test we can finish a two step options flow."""
2019-07-31 19:25:30 +00:00
mock_integration(hass, MockModule("test", async_setup_entry=mock_coro_func(True)))
class TestFlow(core_ce.ConfigFlow):
@staticmethod
@callback
2019-08-15 21:11:55 +00:00
def async_get_options_flow(config_entry):
class OptionsFlowHandler(data_entry_flow.FlowHandler):
async def async_step_init(self, user_input=None):
return self.async_show_form(
2019-07-31 19:25:30 +00:00
step_id="finish", data_schema=vol.Schema({"enabled": bool})
)
async def async_step_finish(self, user_input=None):
return self.async_create_entry(
2019-07-31 19:25:30 +00:00
title="Enable disable", data=user_input
)
2019-07-31 19:25:30 +00:00
2019-08-15 21:11:55 +00:00
return OptionsFlowHandler()
MockConfigEntry(
2019-07-31 19:25:30 +00:00
domain="test",
entry_id="test1",
source="bla",
connection_class=core_ce.CONN_CLASS_LOCAL_POLL,
).add_to_hass(hass)
entry = hass.config_entries._entries[0]
2019-07-31 19:25:30 +00:00
with patch.dict(HANDLERS, {"test": TestFlow}):
2019-08-15 21:11:55 +00:00
url = "/api/config/config_entries/options/flow"
2019-07-31 19:25:30 +00:00
resp = await client.post(url, json={"handler": entry.entry_id})
assert resp.status == 200
data = await resp.json()
2019-07-31 19:25:30 +00:00
flow_id = data.pop("flow_id")
assert data == {
2019-07-31 19:25:30 +00:00
"type": "form",
"handler": "test1",
"step_id": "finish",
"data_schema": [{"name": "enabled", "type": "boolean"}],
"description_placeholders": None,
"errors": None,
}
2019-07-31 19:25:30 +00:00
with patch.dict(HANDLERS, {"test": TestFlow}):
resp = await client.post(
2019-07-31 19:25:30 +00:00
"/api/config/config_entries/options/flow/{}".format(flow_id),
json={"enabled": True},
)
assert resp.status == 200
data = await resp.json()
2019-07-31 19:25:30 +00:00
data.pop("flow_id")
assert data == {
2019-07-31 19:25:30 +00:00
"handler": "test1",
"type": "create_entry",
"title": "Enable disable",
"version": 1,
"description": None,
"description_placeholders": None,
}
async def test_list_system_options(hass, hass_ws_client):
"""Test that we can list an entries system options."""
assert await async_setup_component(hass, "config", {})
ws_client = await hass_ws_client(hass)
entry = MockConfigEntry(domain="demo")
entry.add_to_hass(hass)
await ws_client.send_json(
{
"id": 5,
"type": "config_entries/system_options/list",
"entry_id": entry.entry_id,
}
)
response = await ws_client.receive_json()
assert response["success"]
assert response["result"] == {"disable_new_entities": False}
async def test_update_system_options(hass, hass_ws_client):
"""Test that we can update system options."""
assert await async_setup_component(hass, "config", {})
ws_client = await hass_ws_client(hass)
entry = MockConfigEntry(domain="demo")
entry.add_to_hass(hass)
await ws_client.send_json(
{
"id": 5,
"type": "config_entries/system_options/update",
"entry_id": entry.entry_id,
"disable_new_entities": True,
}
)
response = await ws_client.receive_json()
assert response["success"]
assert response["result"]["disable_new_entities"]
assert entry.system_options.disable_new_entities
2019-12-18 06:41:01 +00:00
async def test_ignore_flow(hass, hass_ws_client):
"""Test we can ignore a flow."""
assert await async_setup_component(hass, "config", {})
mock_integration(hass, MockModule("test", async_setup_entry=mock_coro_func(True)))
mock_entity_platform(hass, "config_flow.test", None)
class TestFlow(core_ce.ConfigFlow):
VERSION = 1
async def async_step_user(self, user_input=None):
await self.async_set_unique_id("mock-unique-id")
return self.async_show_form(step_id="account", data_schema=vol.Schema({}))
ws_client = await hass_ws_client(hass)
with patch.dict(HANDLERS, {"test": TestFlow}):
result = await hass.config_entries.flow.async_init(
"test", context={"source": "user"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
await ws_client.send_json(
{
"id": 5,
"type": "config_entries/ignore_flow",
"flow_id": result["flow_id"],
}
)
response = await ws_client.receive_json()
assert response["success"]
assert len(hass.config_entries.flow.async_progress()) == 0
entry = hass.config_entries.async_entries("test")[0]
assert entry.source == "ignore"
assert entry.unique_id == "mock-unique-id"