302 lines
12 KiB
Python
302 lines
12 KiB
Python
"""Base module for the KNX integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
from xknx import XKNX
|
|
from xknx.core import XknxConnectionState
|
|
from xknx.core.state_updater import StateTrackerType, TrackerOptions
|
|
from xknx.core.telegram_queue import TelegramQueue
|
|
from xknx.dpt import DPTBase
|
|
from xknx.exceptions import ConversionError, CouldNotParseTelegram
|
|
from xknx.io import ConnectionConfig, ConnectionType, SecureConfig
|
|
from xknx.telegram import AddressFilter, Telegram
|
|
from xknx.telegram.address import DeviceGroupAddress, GroupAddress, InternalGroupAddress
|
|
from xknx.telegram.apci import GroupValueResponse, GroupValueWrite
|
|
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import (
|
|
CONF_EVENT,
|
|
CONF_HOST,
|
|
CONF_PORT,
|
|
CONF_TYPE,
|
|
EVENT_HOMEASSISTANT_STOP,
|
|
)
|
|
from homeassistant.core import Event, HomeAssistant
|
|
from homeassistant.helpers.storage import STORAGE_DIR
|
|
from homeassistant.helpers.typing import ConfigType
|
|
|
|
from .const import (
|
|
CONF_KNX_CONNECTION_TYPE,
|
|
CONF_KNX_INDIVIDUAL_ADDRESS,
|
|
CONF_KNX_KNXKEY_FILENAME,
|
|
CONF_KNX_KNXKEY_PASSWORD,
|
|
CONF_KNX_LOCAL_IP,
|
|
CONF_KNX_MCAST_GRP,
|
|
CONF_KNX_MCAST_PORT,
|
|
CONF_KNX_RATE_LIMIT,
|
|
CONF_KNX_ROUTE_BACK,
|
|
CONF_KNX_ROUTING,
|
|
CONF_KNX_ROUTING_BACKBONE_KEY,
|
|
CONF_KNX_ROUTING_SECURE,
|
|
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE,
|
|
CONF_KNX_SECURE_DEVICE_AUTHENTICATION,
|
|
CONF_KNX_SECURE_USER_ID,
|
|
CONF_KNX_SECURE_USER_PASSWORD,
|
|
CONF_KNX_STATE_UPDATER,
|
|
CONF_KNX_TELEGRAM_LOG_SIZE,
|
|
CONF_KNX_TUNNEL_ENDPOINT_IA,
|
|
CONF_KNX_TUNNELING,
|
|
CONF_KNX_TUNNELING_TCP,
|
|
CONF_KNX_TUNNELING_TCP_SECURE,
|
|
KNX_ADDRESS,
|
|
TELEGRAM_LOG_DEFAULT,
|
|
)
|
|
from .device import KNXInterfaceDevice
|
|
from .expose import KNXExposeSensor, KNXExposeTime
|
|
from .project import KNXProject
|
|
from .storage.config_store import KNXConfigStore
|
|
from .telegrams import Telegrams
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class KNXModule:
|
|
"""Representation of KNX Object."""
|
|
|
|
def __init__(
|
|
self, hass: HomeAssistant, config: ConfigType, entry: ConfigEntry
|
|
) -> None:
|
|
"""Initialize KNX module."""
|
|
self.hass = hass
|
|
self.config_yaml = config
|
|
self.connected = False
|
|
self.exposures: list[KNXExposeSensor | KNXExposeTime] = []
|
|
self.service_exposures: dict[str, KNXExposeSensor | KNXExposeTime] = {}
|
|
self.entry = entry
|
|
|
|
self.project = KNXProject(hass=hass, entry=entry)
|
|
self.config_store = KNXConfigStore(hass=hass, config_entry=entry)
|
|
|
|
default_state_updater = (
|
|
TrackerOptions(tracker_type=StateTrackerType.EXPIRE, update_interval_min=60)
|
|
if self.entry.data[CONF_KNX_STATE_UPDATER]
|
|
else TrackerOptions(
|
|
tracker_type=StateTrackerType.INIT, update_interval_min=60
|
|
)
|
|
)
|
|
self.xknx = XKNX(
|
|
address_format=self.project.get_address_format(),
|
|
connection_config=self.connection_config(),
|
|
rate_limit=self.entry.data[CONF_KNX_RATE_LIMIT],
|
|
state_updater=default_state_updater,
|
|
)
|
|
self.xknx.connection_manager.register_connection_state_changed_cb(
|
|
self.connection_state_changed_cb
|
|
)
|
|
self.telegrams = Telegrams(
|
|
hass=hass,
|
|
xknx=self.xknx,
|
|
project=self.project,
|
|
log_size=entry.data.get(CONF_KNX_TELEGRAM_LOG_SIZE, TELEGRAM_LOG_DEFAULT),
|
|
)
|
|
self.interface_device = KNXInterfaceDevice(
|
|
hass=hass, entry=entry, xknx=self.xknx
|
|
)
|
|
|
|
self._address_filter_transcoder: dict[AddressFilter, type[DPTBase]] = {}
|
|
self.group_address_transcoder: dict[DeviceGroupAddress, type[DPTBase]] = {}
|
|
self.knx_event_callback: TelegramQueue.Callback = self.register_event_callback()
|
|
|
|
self.entry.async_on_unload(
|
|
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.stop)
|
|
)
|
|
|
|
async def start(self) -> None:
|
|
"""Start XKNX object. Connect to tunneling or Routing device."""
|
|
await self.project.load_project(self.xknx)
|
|
await self.config_store.load_data()
|
|
await self.telegrams.load_history()
|
|
await self.xknx.start()
|
|
|
|
async def stop(self, event: Event | None = None) -> None:
|
|
"""Stop XKNX object. Disconnect from tunneling or Routing device."""
|
|
await self.xknx.stop()
|
|
await self.telegrams.save_history()
|
|
|
|
def connection_config(self) -> ConnectionConfig:
|
|
"""Return the connection_config."""
|
|
_conn_type: str = self.entry.data[CONF_KNX_CONNECTION_TYPE]
|
|
_knxkeys_file: str | None = (
|
|
self.hass.config.path(
|
|
STORAGE_DIR,
|
|
self.entry.data[CONF_KNX_KNXKEY_FILENAME],
|
|
)
|
|
if self.entry.data.get(CONF_KNX_KNXKEY_FILENAME) is not None
|
|
else None
|
|
)
|
|
if _conn_type == CONF_KNX_ROUTING:
|
|
return ConnectionConfig(
|
|
connection_type=ConnectionType.ROUTING,
|
|
individual_address=self.entry.data[CONF_KNX_INDIVIDUAL_ADDRESS],
|
|
multicast_group=self.entry.data[CONF_KNX_MCAST_GRP],
|
|
multicast_port=self.entry.data[CONF_KNX_MCAST_PORT],
|
|
local_ip=self.entry.data.get(CONF_KNX_LOCAL_IP),
|
|
auto_reconnect=True,
|
|
secure_config=SecureConfig(
|
|
knxkeys_password=self.entry.data.get(CONF_KNX_KNXKEY_PASSWORD),
|
|
knxkeys_file_path=_knxkeys_file,
|
|
),
|
|
threaded=True,
|
|
)
|
|
if _conn_type == CONF_KNX_TUNNELING:
|
|
return ConnectionConfig(
|
|
connection_type=ConnectionType.TUNNELING,
|
|
gateway_ip=self.entry.data[CONF_HOST],
|
|
gateway_port=self.entry.data[CONF_PORT],
|
|
local_ip=self.entry.data.get(CONF_KNX_LOCAL_IP),
|
|
route_back=self.entry.data.get(CONF_KNX_ROUTE_BACK, False),
|
|
auto_reconnect=True,
|
|
secure_config=SecureConfig(
|
|
knxkeys_password=self.entry.data.get(CONF_KNX_KNXKEY_PASSWORD),
|
|
knxkeys_file_path=_knxkeys_file,
|
|
),
|
|
threaded=True,
|
|
)
|
|
if _conn_type == CONF_KNX_TUNNELING_TCP:
|
|
return ConnectionConfig(
|
|
connection_type=ConnectionType.TUNNELING_TCP,
|
|
individual_address=self.entry.data.get(CONF_KNX_TUNNEL_ENDPOINT_IA),
|
|
gateway_ip=self.entry.data[CONF_HOST],
|
|
gateway_port=self.entry.data[CONF_PORT],
|
|
auto_reconnect=True,
|
|
secure_config=SecureConfig(
|
|
knxkeys_password=self.entry.data.get(CONF_KNX_KNXKEY_PASSWORD),
|
|
knxkeys_file_path=_knxkeys_file,
|
|
),
|
|
threaded=True,
|
|
)
|
|
if _conn_type == CONF_KNX_TUNNELING_TCP_SECURE:
|
|
return ConnectionConfig(
|
|
connection_type=ConnectionType.TUNNELING_TCP_SECURE,
|
|
individual_address=self.entry.data.get(CONF_KNX_TUNNEL_ENDPOINT_IA),
|
|
gateway_ip=self.entry.data[CONF_HOST],
|
|
gateway_port=self.entry.data[CONF_PORT],
|
|
secure_config=SecureConfig(
|
|
user_id=self.entry.data.get(CONF_KNX_SECURE_USER_ID),
|
|
user_password=self.entry.data.get(CONF_KNX_SECURE_USER_PASSWORD),
|
|
device_authentication_password=self.entry.data.get(
|
|
CONF_KNX_SECURE_DEVICE_AUTHENTICATION
|
|
),
|
|
knxkeys_password=self.entry.data.get(CONF_KNX_KNXKEY_PASSWORD),
|
|
knxkeys_file_path=_knxkeys_file,
|
|
),
|
|
auto_reconnect=True,
|
|
threaded=True,
|
|
)
|
|
if _conn_type == CONF_KNX_ROUTING_SECURE:
|
|
return ConnectionConfig(
|
|
connection_type=ConnectionType.ROUTING_SECURE,
|
|
individual_address=self.entry.data[CONF_KNX_INDIVIDUAL_ADDRESS],
|
|
multicast_group=self.entry.data[CONF_KNX_MCAST_GRP],
|
|
multicast_port=self.entry.data[CONF_KNX_MCAST_PORT],
|
|
local_ip=self.entry.data.get(CONF_KNX_LOCAL_IP),
|
|
secure_config=SecureConfig(
|
|
backbone_key=self.entry.data.get(CONF_KNX_ROUTING_BACKBONE_KEY),
|
|
latency_ms=self.entry.data.get(
|
|
CONF_KNX_ROUTING_SYNC_LATENCY_TOLERANCE
|
|
),
|
|
knxkeys_password=self.entry.data.get(CONF_KNX_KNXKEY_PASSWORD),
|
|
knxkeys_file_path=_knxkeys_file,
|
|
),
|
|
auto_reconnect=True,
|
|
threaded=True,
|
|
)
|
|
return ConnectionConfig(
|
|
auto_reconnect=True,
|
|
individual_address=self.entry.data.get(
|
|
CONF_KNX_TUNNEL_ENDPOINT_IA, # may be configured at knxkey upload
|
|
),
|
|
secure_config=SecureConfig(
|
|
knxkeys_password=self.entry.data.get(CONF_KNX_KNXKEY_PASSWORD),
|
|
knxkeys_file_path=_knxkeys_file,
|
|
),
|
|
threaded=True,
|
|
)
|
|
|
|
def connection_state_changed_cb(self, state: XknxConnectionState) -> None:
|
|
"""Call invoked after a KNX connection state change was received."""
|
|
self.connected = state == XknxConnectionState.CONNECTED
|
|
for device in self.xknx.devices:
|
|
device.after_update()
|
|
|
|
def telegram_received_cb(self, telegram: Telegram) -> None:
|
|
"""Call invoked after a KNX telegram was received."""
|
|
# Not all telegrams have serializable data.
|
|
data: int | tuple[int, ...] | None = None
|
|
value = None
|
|
if (
|
|
isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse))
|
|
and telegram.payload.value is not None
|
|
and isinstance(
|
|
telegram.destination_address, (GroupAddress, InternalGroupAddress)
|
|
)
|
|
):
|
|
data = telegram.payload.value.value
|
|
if transcoder := (
|
|
self.group_address_transcoder.get(telegram.destination_address)
|
|
or next(
|
|
(
|
|
_transcoder
|
|
for _filter, _transcoder in self._address_filter_transcoder.items()
|
|
if _filter.match(telegram.destination_address)
|
|
),
|
|
None,
|
|
)
|
|
):
|
|
try:
|
|
value = transcoder.from_knx(telegram.payload.value)
|
|
except (ConversionError, CouldNotParseTelegram) as err:
|
|
_LOGGER.warning(
|
|
(
|
|
"Error in `knx_event` at decoding type '%s' from"
|
|
" telegram %s\n%s"
|
|
),
|
|
transcoder.__name__,
|
|
telegram,
|
|
err,
|
|
)
|
|
|
|
self.hass.bus.async_fire(
|
|
"knx_event",
|
|
{
|
|
"data": data,
|
|
"destination": str(telegram.destination_address),
|
|
"direction": telegram.direction.value,
|
|
"value": value,
|
|
"source": str(telegram.source_address),
|
|
"telegramtype": telegram.payload.__class__.__name__,
|
|
},
|
|
)
|
|
|
|
def register_event_callback(self) -> TelegramQueue.Callback:
|
|
"""Register callback for knx_event within XKNX TelegramQueue."""
|
|
address_filters = []
|
|
for filter_set in self.config_yaml[CONF_EVENT]:
|
|
_filters = list(map(AddressFilter, filter_set[KNX_ADDRESS]))
|
|
address_filters.extend(_filters)
|
|
if (dpt := filter_set.get(CONF_TYPE)) and (
|
|
transcoder := DPTBase.parse_transcoder(dpt)
|
|
):
|
|
self._address_filter_transcoder.update(
|
|
dict.fromkeys(_filters, transcoder)
|
|
)
|
|
|
|
return self.xknx.telegram_queue.register_telegram_received_cb(
|
|
self.telegram_received_cb,
|
|
address_filters=address_filters,
|
|
group_addresses=[],
|
|
match_for_outgoing=True,
|
|
)
|