core/homeassistant/components/knx/config_flow.py

694 lines
27 KiB
Python

"""Config flow for KNX."""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator
from typing import Any, Final
import voluptuous as vol
from xknx import XKNX
from xknx.exceptions.exception import CommunicationError, InvalidSecureConfiguration
from xknx.io import DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT
from xknx.io.gateway_scanner import GatewayDescriptor, GatewayScanner
from xknx.io.self_description import request_description
from xknx.secure import load_keyring
from homeassistant.config_entries import ConfigEntry, ConfigFlow, OptionsFlow
from homeassistant.const import CONF_HOST, CONF_PORT
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowHandler, FlowResult
from homeassistant.helpers import selector
from homeassistant.helpers.storage import STORAGE_DIR
from homeassistant.helpers.typing import UNDEFINED
from .const import (
CONF_KNX_AUTOMATIC,
CONF_KNX_CONNECTION_TYPE,
CONF_KNX_DEFAULT_RATE_LIMIT,
CONF_KNX_DEFAULT_STATE_UPDATER,
CONF_KNX_INDIVIDUAL_ADDRESS,
CONF_KNX_KNXKEY_FILENAME,
CONF_KNX_KNXKEY_PASSWORD,
CONF_KNX_LOCAL_IP,
CONF_KNX_MCAST_GRP,
CONF_KNX_MCAST_PORT,
CONF_KNX_RATE_LIMIT,
CONF_KNX_ROUTE_BACK,
CONF_KNX_ROUTING,
CONF_KNX_ROUTING_BACKBONE_KEY,
CONF_KNX_ROUTING_SECURE,
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE,
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
CONF_KNX_SECURE_USER_ID,
CONF_KNX_SECURE_USER_PASSWORD,
CONF_KNX_STATE_UPDATER,
CONF_KNX_TUNNELING,
CONF_KNX_TUNNELING_TCP,
CONF_KNX_TUNNELING_TCP_SECURE,
CONST_KNX_STORAGE_KEY,
DEFAULT_ROUTING_IA,
DOMAIN,
KNXConfigEntryData,
)
from .schema import ia_validator, ip_v4_validator
# Form field key under which the tunnel picked in the "tunnel" step is submitted.
CONF_KNX_GATEWAY: Final = "gateway"
# Upper bound of the rate limit number selector in the communication settings.
CONF_MAX_RATE_LIMIT: Final = 60

# Baseline entry data; the values collected by a flow are merged on top of it
# when the flow finishes.
DEFAULT_ENTRY_DATA = KNXConfigEntryData(
    individual_address=DEFAULT_ROUTING_IA,
    local_ip=None,
    multicast_group=DEFAULT_MCAST_GRP,
    multicast_port=DEFAULT_MCAST_PORT,
    rate_limit=CONF_KNX_DEFAULT_RATE_LIMIT,
    route_back=False,
    state_updater=CONF_KNX_DEFAULT_STATE_UPDATER,
)

# Form field key of the tunnelling type selection in the manual tunnel step.
CONF_KNX_TUNNELING_TYPE: Final = "tunneling_type"
# Maps each tunnelling connection type to the label shown for it. Membership in
# this mapping is also used to tell whether an entry is a tunnelling entry.
CONF_KNX_TUNNELING_TYPE_LABELS: Final = {
    CONF_KNX_TUNNELING: "UDP (Tunnelling v1)",
    CONF_KNX_TUNNELING_TCP: "TCP (Tunnelling v2)",
    CONF_KNX_TUNNELING_TCP_SECURE: "Secure Tunnelling (TCP)",
}

# Extra choice appended to the list of discovered tunnels.
OPTION_MANUAL_TUNNEL: Final = "Manual"

# Shared free-text selectors for individual addresses and IP addresses.
_IA_SELECTOR = selector.TextSelector()
_IP_SELECTOR = selector.TextSelector()
# Port input: a number box limited to 1..65535 whose result is coerced to int.
_PORT_SELECTOR = vol.All(
    selector.NumberSelector(
        selector.NumberSelectorConfig(
            min=1,
            max=65535,
            mode=selector.NumberSelectorMode.BOX,
        ),
    ),
    vol.Coerce(int),
)
class KNXCommonFlow(ABC, FlowHandler):
    """Base class for KNX flows.

    Holds the steps shared by the config flow and the options flow. Subclasses
    implement ``finish_flow`` to decide what happens with the collected data.
    """

    def __init__(self, initial_data: KNXConfigEntryData) -> None:
        """Initialize KNXCommonFlow.

        ``initial_data`` supplies the default values shown in the forms;
        ``new_entry_data`` collects the values chosen during the flow.
        """
        self.initial_data = initial_data
        self.new_entry_data = KNXConfigEntryData()
        self._found_gateways: list[GatewayDescriptor] = []
        self._found_tunnels: list[GatewayDescriptor] = []
        self._selected_tunnel: GatewayDescriptor | None = None

        self._gatewayscanner: GatewayScanner | None = None
        self._async_scan_gen: AsyncGenerator[GatewayDescriptor, None] | None = None

    @abstractmethod
    def finish_flow(self, title: str) -> FlowResult:
        """Finish the flow."""

    async def async_step_connection_type(
        self, user_input: dict | None = None
    ) -> FlowResult:
        """Handle connection type configuration.

        Without ``user_input`` a gateway scan is started and the selection form
        is shown. With ``user_input`` the scan is stopped and the flow continues
        with the routing or tunnel step, or finishes directly for the automatic
        connection type.
        """
        if user_input is not None:
            if self._async_scan_gen:
                await self._async_scan_gen.aclose()  # stop the scan
                self._async_scan_gen = None
            if self._gatewayscanner:
                # snapshot whatever the scanner collected up to this point
                self._found_gateways = list(
                    self._gatewayscanner.found_gateways.values()
                )
            connection_type = user_input[CONF_KNX_CONNECTION_TYPE]
            if connection_type == CONF_KNX_ROUTING:
                return await self.async_step_routing()

            if connection_type == CONF_KNX_TUNNELING:
                self._found_tunnels = [
                    gateway
                    for gateway in self._found_gateways
                    if gateway.supports_tunnelling
                ]
                # gateways without an individual address sort first (key 0)
                self._found_tunnels.sort(
                    key=lambda tunnel: tunnel.individual_address.raw
                    if tunnel.individual_address
                    else 0
                )
                return await self.async_step_tunnel()

            # Automatic connection type
            self.new_entry_data = KNXConfigEntryData(connection_type=CONF_KNX_AUTOMATIC)
            return self.finish_flow(title=CONF_KNX_AUTOMATIC.capitalize())

        supported_connection_types = {
            CONF_KNX_TUNNELING: CONF_KNX_TUNNELING.capitalize(),
            CONF_KNX_ROUTING: CONF_KNX_ROUTING.capitalize(),
        }

        # An options flow reuses the XKNX instance stored under hass.data when
        # there is one; otherwise a fresh instance is created for the scan.
        if isinstance(self, OptionsFlow) and (knx_module := self.hass.data.get(DOMAIN)):
            xknx = knx_module.xknx
        else:
            xknx = XKNX()
        self._gatewayscanner = GatewayScanner(
            xknx, stop_on_found=0, timeout_in_seconds=2
        )
        # keep a reference to the generator to scan in background until user selects a connection type
        self._async_scan_gen = self._gatewayscanner.async_scan()
        try:
            await self._async_scan_gen.__anext__()
        except StopAsyncIteration:
            pass  # scan finished, no interfaces discovered
        else:
            # add automatic at first position only if a gateway responded
            supported_connection_types = {
                CONF_KNX_AUTOMATIC: CONF_KNX_AUTOMATIC.capitalize()
            } | supported_connection_types

        fields = {
            vol.Required(CONF_KNX_CONNECTION_TYPE): vol.In(supported_connection_types)
        }
        return self.async_show_form(
            step_id="connection_type", data_schema=vol.Schema(fields)
        )

    async def async_step_tunnel(self, user_input: dict | None = None) -> FlowResult:
        """Select a tunnel from the list of discovered tunnels.

        When no tunnel was discovered the flow continues with the manual
        tunnel step instead of showing the list.
        """
        if user_input is not None:
            if user_input[CONF_KNX_GATEWAY] == OPTION_MANUAL_TUNNEL:
                if self._found_tunnels:
                    # preselect the first tunnel so the manual form has defaults
                    self._selected_tunnel = self._found_tunnels[0]
                return await self.async_step_manual_tunnel()

            # options are keyed by str(tunnel), so map the choice back
            self._selected_tunnel = next(
                tunnel
                for tunnel in self._found_tunnels
                if user_input[CONF_KNX_GATEWAY] == str(tunnel)
            )
            connection_type = (
                CONF_KNX_TUNNELING_TCP_SECURE
                if self._selected_tunnel.tunnelling_requires_secure
                else CONF_KNX_TUNNELING_TCP
                if self._selected_tunnel.supports_tunnelling_tcp
                else CONF_KNX_TUNNELING
            )
            self.new_entry_data = KNXConfigEntryData(
                host=self._selected_tunnel.ip_addr,
                port=self._selected_tunnel.port,
                route_back=False,
                connection_type=connection_type,
            )
            if connection_type == CONF_KNX_TUNNELING_TCP_SECURE:
                return self.async_show_menu(
                    step_id="secure_key_source",
                    menu_options=["secure_knxkeys", "secure_tunnel_manual"],
                )
            return self.finish_flow(title=f"Tunneling @ {self._selected_tunnel}")

        if not self._found_tunnels:
            return await self.async_step_manual_tunnel()

        errors: dict = {}
        tunnel_options = {
            str(tunnel): f"{tunnel}{' 🔐' if tunnel.tunnelling_requires_secure else ''}"
            for tunnel in self._found_tunnels
        }
        tunnel_options |= {OPTION_MANUAL_TUNNEL: OPTION_MANUAL_TUNNEL}
        fields = {vol.Required(CONF_KNX_GATEWAY): vol.In(tunnel_options)}

        return self.async_show_form(
            step_id="tunnel", data_schema=vol.Schema(fields), errors=errors
        )

    async def async_step_manual_tunnel(
        self, user_input: dict | None = None
    ) -> FlowResult:
        """Manually configure tunnel connection parameters.

        Fields default to the preselected gateway if one was found. Submitted
        values are validated, the gateway is asked for its description, and
        the requested tunnelling type is checked against what it reports.
        """
        errors: dict = {}

        if user_input is not None:
            try:
                _host = ip_v4_validator(user_input[CONF_HOST], multicast=False)
            except vol.Invalid:
                errors[CONF_HOST] = "invalid_ip_address"
            if _local_ip := user_input.get(CONF_KNX_LOCAL_IP):
                try:
                    _local_ip = ip_v4_validator(_local_ip, multicast=False)
                except vol.Invalid:
                    errors[CONF_KNX_LOCAL_IP] = "invalid_ip_address"

            selected_tunnelling_type = user_input[CONF_KNX_TUNNELING_TYPE]
            # _host is only bound when validation succeeded, hence the guard
            if not errors:
                try:
                    self._selected_tunnel = await request_description(
                        gateway_ip=_host,
                        gateway_port=user_input[CONF_PORT],
                        local_ip=_local_ip,
                        route_back=user_input[CONF_KNX_ROUTE_BACK],
                    )
                except CommunicationError:
                    errors["base"] = "cannot_connect"
                else:
                    # secure tunnelling must be selected exactly when the
                    # gateway requires it
                    if bool(self._selected_tunnel.tunnelling_requires_secure) is not (
                        selected_tunnelling_type == CONF_KNX_TUNNELING_TCP_SECURE
                    ):
                        errors[CONF_KNX_TUNNELING_TYPE] = "unsupported_tunnel_type"
                    elif (
                        selected_tunnelling_type == CONF_KNX_TUNNELING_TCP
                        and not self._selected_tunnel.supports_tunnelling_tcp
                    ):
                        errors[CONF_KNX_TUNNELING_TYPE] = "unsupported_tunnel_type"

            if not errors:
                self.new_entry_data = KNXConfigEntryData(
                    connection_type=selected_tunnelling_type,
                    host=_host,
                    port=user_input[CONF_PORT],
                    route_back=user_input[CONF_KNX_ROUTE_BACK],
                    local_ip=_local_ip,
                )

                if selected_tunnelling_type == CONF_KNX_TUNNELING_TCP_SECURE:
                    return self.async_show_menu(
                        step_id="secure_key_source",
                        menu_options=["secure_knxkeys", "secure_tunnel_manual"],
                    )
                return self.finish_flow(title=f"Tunneling @ {_host}")

        _reconfiguring_existing_tunnel = (
            self.initial_data.get(CONF_KNX_CONNECTION_TYPE)
            in CONF_KNX_TUNNELING_TYPE_LABELS
        )
        if (  # initial attempt on ConfigFlow or coming from automatic / routing
            (isinstance(self, ConfigFlow) or not _reconfiguring_existing_tunnel)
            and not user_input
            and self._selected_tunnel is not None
        ):  # default to first found tunnel
            ip_address = self._selected_tunnel.ip_addr
            port = self._selected_tunnel.port
            if self._selected_tunnel.tunnelling_requires_secure:
                default_type = CONF_KNX_TUNNELING_TCP_SECURE
            elif self._selected_tunnel.supports_tunnelling_tcp:
                default_type = CONF_KNX_TUNNELING_TCP
            else:
                default_type = CONF_KNX_TUNNELING
        else:  # OptionFlow, no tunnel discovered or user input
            ip_address = (
                user_input[CONF_HOST]
                if user_input
                else self.initial_data.get(CONF_HOST)
            )
            port = (
                user_input[CONF_PORT]
                if user_input
                else self.initial_data.get(CONF_PORT, DEFAULT_MCAST_PORT)
            )
            default_type = (
                user_input[CONF_KNX_TUNNELING_TYPE]
                if user_input
                else self.initial_data[CONF_KNX_CONNECTION_TYPE]
                if _reconfiguring_existing_tunnel
                else CONF_KNX_TUNNELING
            )
        # without a stored value, route_back defaults to on only when no
        # tunnel is selected
        _route_back: bool = self.initial_data.get(
            CONF_KNX_ROUTE_BACK, not bool(self._selected_tunnel)
        )

        fields = {
            vol.Required(CONF_KNX_TUNNELING_TYPE, default=default_type): vol.In(
                CONF_KNX_TUNNELING_TYPE_LABELS
            ),
            vol.Required(CONF_HOST, default=ip_address): _IP_SELECTOR,
            vol.Required(CONF_PORT, default=port): _PORT_SELECTOR,
            vol.Required(
                CONF_KNX_ROUTE_BACK, default=_route_back
            ): selector.BooleanSelector(),
        }
        if self.show_advanced_options:
            fields[vol.Optional(CONF_KNX_LOCAL_IP)] = _IP_SELECTOR

        # do not overwrite a more specific "base" error such as cannot_connect
        if not self._found_tunnels and not errors.get("base"):
            errors["base"] = "no_tunnel_discovered"
        return self.async_show_form(
            step_id="manual_tunnel", data_schema=vol.Schema(fields), errors=errors
        )

    async def async_step_secure_tunnel_manual(
        self, user_input: dict | None = None
    ) -> FlowResult:
        """Configure ip secure tunnelling manually.

        Adds the user id, user password and device authentication to the
        entry data collected so far and finishes the flow.
        """
        errors: dict = {}

        if user_input is not None:
            self.new_entry_data |= KNXConfigEntryData(
                device_authentication=user_input[CONF_KNX_SECURE_DEVICE_AUTHENTICATION],
                user_id=user_input[CONF_KNX_SECURE_USER_ID],
                user_password=user_input[CONF_KNX_SECURE_USER_PASSWORD],
            )
            return self.finish_flow(
                title=f"Secure Tunneling @ {self.new_entry_data[CONF_HOST]}"
            )

        fields = {
            vol.Required(
                CONF_KNX_SECURE_USER_ID,
                default=self.initial_data.get(CONF_KNX_SECURE_USER_ID, 2),
            ): vol.All(
                selector.NumberSelector(
                    selector.NumberSelectorConfig(
                        min=1, max=127, mode=selector.NumberSelectorMode.BOX
                    ),
                ),
                vol.Coerce(int),
            ),
            vol.Required(
                CONF_KNX_SECURE_USER_PASSWORD,
                default=self.initial_data.get(CONF_KNX_SECURE_USER_PASSWORD),
            ): selector.TextSelector(
                selector.TextSelectorConfig(type=selector.TextSelectorType.PASSWORD),
            ),
            vol.Required(
                CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
                default=self.initial_data.get(CONF_KNX_SECURE_DEVICE_AUTHENTICATION),
            ): selector.TextSelector(
                selector.TextSelectorConfig(type=selector.TextSelectorType.PASSWORD),
            ),
        }

        return self.async_show_form(
            step_id="secure_tunnel_manual",
            data_schema=vol.Schema(fields),
            errors=errors,
        )

    async def async_step_secure_routing_manual(
        self, user_input: dict | None = None
    ) -> FlowResult:
        """Configure ip secure routing manually.

        The backbone key has to be a hex string that decodes to exactly
        16 bytes; otherwise the form is shown again with an error.
        """
        errors: dict = {}

        if user_input is not None:
            try:
                key_bytes = bytes.fromhex(user_input[CONF_KNX_ROUTING_BACKBONE_KEY])
                if len(key_bytes) != 16:
                    raise ValueError
            except ValueError:
                # covers both non-hex input and a key of the wrong length
                errors[CONF_KNX_ROUTING_BACKBONE_KEY] = "invalid_backbone_key"
            if not errors:
                self.new_entry_data |= KNXConfigEntryData(
                    backbone_key=user_input[CONF_KNX_ROUTING_BACKBONE_KEY],
                    sync_latency_tolerance=user_input[
                        CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE
                    ],
                )
                return self.finish_flow(
                    title=(
                        "Secure Routing as"
                        f" {self.new_entry_data[CONF_KNX_INDIVIDUAL_ADDRESS]}"
                    )
                )

        fields = {
            vol.Required(
                CONF_KNX_ROUTING_BACKBONE_KEY,
                default=self.initial_data.get(CONF_KNX_ROUTING_BACKBONE_KEY),
            ): selector.TextSelector(
                selector.TextSelectorConfig(type=selector.TextSelectorType.PASSWORD),
            ),
            vol.Required(
                CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE,
                default=self.initial_data.get(CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE)
                or 1000,
            ): vol.All(
                selector.NumberSelector(
                    selector.NumberSelectorConfig(
                        min=400,
                        max=4000,
                        unit_of_measurement="ms",
                        mode=selector.NumberSelectorMode.BOX,
                    ),
                ),
                vol.Coerce(int),
            ),
        }

        return self.async_show_form(
            step_id="secure_routing_manual",
            data_schema=vol.Schema(fields),
            errors=errors,
        )

    async def async_step_secure_knxkeys(
        self, user_input: dict | None = None
    ) -> FlowResult:
        """Configure secure knxkeys used to authenticate.

        Loads the keyring file named by the user and checks that it holds
        what the chosen connection type needs: a tunnel interface for the
        selected gateway, or a backbone key for secure routing.
        """
        errors: dict = {}
        description_placeholders: dict = {}

        if user_input is not None:
            connection_type = self.new_entry_data[CONF_KNX_CONNECTION_TYPE]
            storage_key = CONST_KNX_STORAGE_KEY + user_input[CONF_KNX_KNXKEY_FILENAME]
            try:
                keyring = await load_keyring(
                    path=self.hass.config.path(STORAGE_DIR, storage_key),
                    password=user_input[CONF_KNX_KNXKEY_PASSWORD],
                )
            except FileNotFoundError:
                errors[CONF_KNX_KNXKEY_FILENAME] = "keyfile_not_found"
            except InvalidSecureConfiguration:
                errors[CONF_KNX_KNXKEY_PASSWORD] = "keyfile_invalid_signature"
            else:
                if (
                    connection_type == CONF_KNX_TUNNELING_TCP_SECURE
                    and self._selected_tunnel is not None
                ):
                    tunnel_endpoints = []
                    if host_ia := self._selected_tunnel.individual_address:
                        tunnel_endpoints = keyring.get_tunnel_interfaces_by_host(
                            host=host_ia
                        )
                    if not tunnel_endpoints:
                        errors["base"] = "keyfile_no_tunnel_for_host"
                        description_placeholders = {CONF_HOST: str(host_ia)}

                if connection_type == CONF_KNX_ROUTING_SECURE:
                    if not (keyring.backbone is not None and keyring.backbone.key):
                        errors["base"] = "keyfile_no_backbone_key"

            if not errors:
                # the keyfile replaces any manually entered secure credentials
                self.new_entry_data |= KNXConfigEntryData(
                    knxkeys_filename=storage_key,
                    knxkeys_password=user_input[CONF_KNX_KNXKEY_PASSWORD],
                    backbone_key=None,
                    sync_latency_tolerance=None,
                    device_authentication=None,
                    user_id=None,
                    user_password=None,
                )
                if connection_type == CONF_KNX_ROUTING_SECURE:
                    title = (
                        "Secure Routing as"
                        f" {self.new_entry_data[CONF_KNX_INDIVIDUAL_ADDRESS]}"
                    )
                else:
                    title = f"Secure Tunneling @ {self.new_entry_data[CONF_HOST]}"
                return self.finish_flow(title=title)

        if _default_filename := self.initial_data.get(CONF_KNX_KNXKEY_FILENAME):
            # Remove the storage prefix that was prepended when the entry was
            # saved. removeprefix() drops it as a whole; lstrip() would treat
            # it as a set of characters and could eat into the filename.
            _default_filename = _default_filename.removeprefix(CONST_KNX_STORAGE_KEY)
        fields = {
            vol.Required(
                CONF_KNX_KNXKEY_FILENAME, default=_default_filename
            ): selector.TextSelector(),
            vol.Required(
                CONF_KNX_KNXKEY_PASSWORD,
                default=self.initial_data.get(CONF_KNX_KNXKEY_PASSWORD),
            ): selector.TextSelector(),
        }

        return self.async_show_form(
            step_id="secure_knxkeys",
            data_schema=vol.Schema(fields),
            errors=errors,
            description_placeholders=description_placeholders,
        )

    async def async_step_routing(self, user_input: dict | None = None) -> FlowResult:
        """Routing setup.

        Validates the individual address, multicast group and optional local
        ip, then either finishes the flow or continues with the secure key
        source menu when secure routing was requested.
        """
        errors: dict = {}
        # submitted values take precedence so the form is re-shown with them
        _individual_address = (
            user_input[CONF_KNX_INDIVIDUAL_ADDRESS]
            if user_input
            else self.initial_data[CONF_KNX_INDIVIDUAL_ADDRESS]
        )
        _multicast_group = (
            user_input[CONF_KNX_MCAST_GRP]
            if user_input
            else self.initial_data[CONF_KNX_MCAST_GRP]
        )
        _multicast_port = (
            user_input[CONF_KNX_MCAST_PORT]
            if user_input
            else self.initial_data[CONF_KNX_MCAST_PORT]
        )

        if user_input is not None:
            try:
                ia_validator(_individual_address)
            except vol.Invalid:
                errors[CONF_KNX_INDIVIDUAL_ADDRESS] = "invalid_individual_address"
            try:
                ip_v4_validator(_multicast_group, multicast=True)
            except vol.Invalid:
                errors[CONF_KNX_MCAST_GRP] = "invalid_ip_address"
            if _local_ip := user_input.get(CONF_KNX_LOCAL_IP):
                try:
                    ip_v4_validator(_local_ip, multicast=False)
                except vol.Invalid:
                    errors[CONF_KNX_LOCAL_IP] = "invalid_ip_address"

            if not errors:
                connection_type = (
                    CONF_KNX_ROUTING_SECURE
                    if user_input[CONF_KNX_ROUTING_SECURE]
                    else CONF_KNX_ROUTING
                )
                self.new_entry_data = KNXConfigEntryData(
                    connection_type=connection_type,
                    individual_address=_individual_address,
                    multicast_group=_multicast_group,
                    multicast_port=_multicast_port,
                    local_ip=_local_ip,
                )
                if connection_type == CONF_KNX_ROUTING_SECURE:
                    return self.async_show_menu(
                        step_id="secure_key_source",
                        menu_options=["secure_knxkeys", "secure_routing_manual"],
                    )
                return self.finish_flow(title=f"Routing as {_individual_address}")

        routers = [router for router in self._found_gateways if router.supports_routing]
        if not routers:
            errors["base"] = "no_router_discovered"
        # preselect secure routing as soon as one discovered router requires it
        default_secure_routing_enable = any(
            router.routing_requires_secure for router in routers
        )

        fields = {
            vol.Required(
                CONF_KNX_INDIVIDUAL_ADDRESS, default=_individual_address
            ): _IA_SELECTOR,
            vol.Required(
                CONF_KNX_ROUTING_SECURE, default=default_secure_routing_enable
            ): selector.BooleanSelector(),
            vol.Required(CONF_KNX_MCAST_GRP, default=_multicast_group): _IP_SELECTOR,
            vol.Required(CONF_KNX_MCAST_PORT, default=_multicast_port): _PORT_SELECTOR,
        }
        if self.show_advanced_options:
            # Optional with default doesn't work properly in flow UI
            fields[vol.Optional(CONF_KNX_LOCAL_IP)] = _IP_SELECTOR

        return self.async_show_form(
            step_id="routing", data_schema=vol.Schema(fields), errors=errors
        )
class KNXConfigFlow(KNXCommonFlow, ConfigFlow, domain=DOMAIN):
    """Config flow that sets up the KNX integration."""

    VERSION = 1

    def __init__(self) -> None:
        """Initialize the KNX config flow with the default entry data."""
        super().__init__(initial_data=DEFAULT_ENTRY_DATA)

    @staticmethod
    @callback
    def async_get_options_flow(config_entry: ConfigEntry) -> KNXOptionsFlow:
        """Return the options flow handler for the given config entry."""
        options_flow = KNXOptionsFlow(config_entry)
        return options_flow

    @callback
    def finish_flow(self, title: str) -> FlowResult:
        """Create the ConfigEntry from the defaults overlaid with the collected data."""
        entry_data = DEFAULT_ENTRY_DATA | self.new_entry_data
        return self.async_create_entry(title=title, data=entry_data)

    async def async_step_user(self, user_input: dict | None = None) -> FlowResult:
        """Handle a flow started by the user.

        Aborts when an entry already exists; otherwise continues with the
        connection type step.
        """
        if not self._async_current_entries():
            return await self.async_step_connection_type()
        return self.async_abort(reason="single_instance_allowed")
class KNXOptionsFlow(KNXCommonFlow, OptionsFlow):
    """Options flow for changing an existing KNX config entry."""

    general_settings: dict

    def __init__(self, config_entry: ConfigEntry) -> None:
        """Initialize the KNX options flow from the data of ``config_entry``."""
        self.config_entry = config_entry
        super().__init__(initial_data=config_entry.data)  # type: ignore[arg-type]

    @callback
    def finish_flow(self, title: str | None) -> FlowResult:
        """Update the ConfigEntry and finish the flow.

        Defaults, the previous entry data and the newly collected data are
        merged in that order, so later sources win. A falsy ``title`` is passed
        on as ``UNDEFINED``.
        """
        merged_data = DEFAULT_ENTRY_DATA | self.initial_data | self.new_entry_data
        entry_title = title or UNDEFINED
        self.hass.config_entries.async_update_entry(
            self.config_entry,
            data=merged_data,
            title=entry_title,
        )
        # the options entry itself carries no data
        return self.async_create_entry(title="", data={})

    async def async_step_init(
        self, user_input: dict[str, Any] | None = None
    ) -> FlowResult:
        """Show the menu that leads to connection or communication settings."""
        menu_entries = ["connection_type", "communication_settings"]
        return self.async_show_menu(
            step_id="options_init",
            menu_options=menu_entries,
        )

    async def async_step_communication_settings(
        self, user_input: dict[str, Any] | None = None
    ) -> FlowResult:
        """Manage the state updater and rate limit settings."""
        if user_input is not None:
            self.new_entry_data = KNXConfigEntryData(
                state_updater=user_input[CONF_KNX_STATE_UPDATER],
                rate_limit=user_input[CONF_KNX_RATE_LIMIT],
            )
            return self.finish_flow(title=None)

        # current values, falling back to the integration defaults
        current_state_updater = self.initial_data.get(
            CONF_KNX_STATE_UPDATER,
            CONF_KNX_DEFAULT_STATE_UPDATER,
        )
        current_rate_limit = self.initial_data.get(
            CONF_KNX_RATE_LIMIT,
            CONF_KNX_DEFAULT_RATE_LIMIT,
        )
        # number box limited to 0..CONF_MAX_RATE_LIMIT, result coerced to int
        rate_limit_validator = vol.All(
            selector.NumberSelector(
                selector.NumberSelectorConfig(
                    min=0,
                    max=CONF_MAX_RATE_LIMIT,
                    mode=selector.NumberSelectorMode.BOX,
                ),
            ),
            vol.Coerce(int),
        )
        schema = vol.Schema(
            {
                vol.Required(
                    CONF_KNX_STATE_UPDATER, default=current_state_updater
                ): selector.BooleanSelector(),
                vol.Required(
                    CONF_KNX_RATE_LIMIT, default=current_rate_limit
                ): rate_limit_validator,
            }
        )
        return self.async_show_form(
            step_id="communication_settings",
            data_schema=schema,
            last_step=True,
        )