Type check KNX integration __init__ and knx_entity (#48044)
parent
d2d78d6205
commit
8fa935234a
|
@ -24,16 +24,17 @@ from homeassistant.const import (
|
|||
EVENT_HOMEASSISTANT_STOP,
|
||||
SERVICE_RELOAD,
|
||||
)
|
||||
from homeassistant.core import ServiceCall
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers import discovery
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.helpers.entity_platform import async_get_platforms
|
||||
from homeassistant.helpers.reload import async_integration_yaml_config
|
||||
from homeassistant.helpers.service import async_register_admin_service
|
||||
from homeassistant.helpers.typing import ServiceCallType
|
||||
from homeassistant.helpers.typing import ConfigType, EventType, HomeAssistantType
|
||||
|
||||
from .const import DOMAIN, KNX_ADDRESS, SupportedPlatforms
|
||||
from .expose import create_knx_exposure
|
||||
from .expose import KNXExposeSensor, KNXExposeTime, create_knx_exposure
|
||||
from .factory import create_knx_device
|
||||
from .schema import (
|
||||
BinarySensorSchema,
|
||||
|
@ -211,8 +212,8 @@ SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA = vol.Any(
|
|||
)
|
||||
|
||||
|
||||
async def async_setup(hass, config):
|
||||
"""Set up the KNX component."""
|
||||
async def async_setup(hass: HomeAssistantType, config: ConfigType) -> bool:
|
||||
"""Set up the KNX integration."""
|
||||
try:
|
||||
knx_module = KNXModule(hass, config)
|
||||
hass.data[DOMAIN] = knx_module
|
||||
|
@ -270,7 +271,7 @@ async def async_setup(hass, config):
|
|||
schema=SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA,
|
||||
)
|
||||
|
||||
async def reload_service_handler(service_call: ServiceCallType) -> None:
|
||||
async def reload_service_handler(service_call: ServiceCall) -> None:
|
||||
"""Remove all KNX components and load new ones from config."""
|
||||
|
||||
# First check for config file. If for some reason it is no longer there
|
||||
|
@ -298,19 +299,19 @@ async def async_setup(hass, config):
|
|||
class KNXModule:
|
||||
"""Representation of KNX Object."""
|
||||
|
||||
def __init__(self, hass, config):
|
||||
"""Initialize of KNX module."""
|
||||
def __init__(self, hass: HomeAssistantType, config: ConfigType) -> None:
|
||||
"""Initialize KNX module."""
|
||||
self.hass = hass
|
||||
self.config = config
|
||||
self.connected = False
|
||||
self.exposures = []
|
||||
self.service_exposures = {}
|
||||
self.exposures: list[KNXExposeSensor | KNXExposeTime] = []
|
||||
self.service_exposures: dict[str, KNXExposeSensor | KNXExposeTime] = {}
|
||||
|
||||
self.init_xknx()
|
||||
self._knx_event_callback: TelegramQueue.Callback = self.register_callback()
|
||||
|
||||
def init_xknx(self):
|
||||
"""Initialize of KNX object."""
|
||||
def init_xknx(self) -> None:
|
||||
"""Initialize XKNX object."""
|
||||
self.xknx = XKNX(
|
||||
config=self.config_file(),
|
||||
own_address=self.config[DOMAIN][CONF_KNX_INDIVIDUAL_ADDRESS],
|
||||
|
@ -321,26 +322,26 @@ class KNXModule:
|
|||
state_updater=self.config[DOMAIN][CONF_KNX_STATE_UPDATER],
|
||||
)
|
||||
|
||||
async def start(self):
|
||||
"""Start KNX object. Connect to tunneling or Routing device."""
|
||||
async def start(self) -> None:
|
||||
"""Start XKNX object. Connect to tunneling or Routing device."""
|
||||
await self.xknx.start()
|
||||
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.stop)
|
||||
self.connected = True
|
||||
|
||||
async def stop(self, event):
|
||||
"""Stop KNX object. Disconnect from tunneling or Routing device."""
|
||||
async def stop(self, event: EventType) -> None:
|
||||
"""Stop XKNX object. Disconnect from tunneling or Routing device."""
|
||||
await self.xknx.stop()
|
||||
|
||||
def config_file(self):
|
||||
def config_file(self) -> str | None:
|
||||
"""Resolve and return the full path of xknx.yaml if configured."""
|
||||
config_file = self.config[DOMAIN].get(CONF_KNX_CONFIG)
|
||||
if not config_file:
|
||||
return None
|
||||
if not config_file.startswith("/"):
|
||||
return self.hass.config.path(config_file)
|
||||
return config_file
|
||||
return config_file # type: ignore
|
||||
|
||||
def connection_config(self):
|
||||
def connection_config(self) -> ConnectionConfig:
|
||||
"""Return the connection_config."""
|
||||
if CONF_KNX_TUNNELING in self.config[DOMAIN]:
|
||||
return self.connection_config_tunneling()
|
||||
|
@ -349,7 +350,7 @@ class KNXModule:
|
|||
# config from xknx.yaml always has priority later on
|
||||
return ConnectionConfig(auto_reconnect=True)
|
||||
|
||||
def connection_config_routing(self):
|
||||
def connection_config_routing(self) -> ConnectionConfig:
|
||||
"""Return the connection_config if routing is configured."""
|
||||
local_ip = None
|
||||
# all configuration values are optional
|
||||
|
@ -361,7 +362,7 @@ class KNXModule:
|
|||
connection_type=ConnectionType.ROUTING, local_ip=local_ip
|
||||
)
|
||||
|
||||
def connection_config_tunneling(self):
|
||||
def connection_config_tunneling(self) -> ConnectionConfig:
|
||||
"""Return the connection_config if tunneling is configured."""
|
||||
gateway_ip = self.config[DOMAIN][CONF_KNX_TUNNELING][CONF_HOST]
|
||||
gateway_port = self.config[DOMAIN][CONF_KNX_TUNNELING][CONF_PORT]
|
||||
|
@ -380,12 +381,14 @@ class KNXModule:
|
|||
auto_reconnect=True,
|
||||
)
|
||||
|
||||
async def telegram_received_cb(self, telegram):
|
||||
async def telegram_received_cb(self, telegram: Telegram) -> None:
|
||||
"""Call invoked after a KNX telegram was received."""
|
||||
data = None
|
||||
|
||||
# Not all telegrams have serializable data.
|
||||
if isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse)):
|
||||
if (
|
||||
isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse))
|
||||
and telegram.payload.value is not None
|
||||
):
|
||||
data = telegram.payload.value.value
|
||||
|
||||
self.hass.bus.async_fire(
|
||||
|
@ -404,16 +407,16 @@ class KNXModule:
|
|||
address_filters = list(
|
||||
map(AddressFilter, self.config[DOMAIN][CONF_KNX_EVENT_FILTER])
|
||||
)
|
||||
return self.xknx.telegram_queue.register_telegram_received_cb(
|
||||
return self.xknx.telegram_queue.register_telegram_received_cb( # type: ignore[no-any-return]
|
||||
self.telegram_received_cb,
|
||||
address_filters=address_filters,
|
||||
group_addresses=[],
|
||||
match_for_outgoing=True,
|
||||
)
|
||||
|
||||
async def service_event_register_modify(self, call):
|
||||
async def service_event_register_modify(self, call: ServiceCall) -> None:
|
||||
"""Service for adding or removing a GroupAddress to the knx_event filter."""
|
||||
attr_address = call.data.get(KNX_ADDRESS)
|
||||
attr_address = call.data[KNX_ADDRESS]
|
||||
group_addresses = map(GroupAddress, attr_address)
|
||||
|
||||
if call.data.get(SERVICE_KNX_ATTR_REMOVE):
|
||||
|
@ -434,9 +437,9 @@ class KNXModule:
|
|||
str(group_address),
|
||||
)
|
||||
|
||||
async def service_exposure_register_modify(self, call):
|
||||
async def service_exposure_register_modify(self, call: ServiceCall) -> None:
|
||||
"""Service for adding or removing an exposure to KNX bus."""
|
||||
group_address = call.data.get(KNX_ADDRESS)
|
||||
group_address = call.data[KNX_ADDRESS]
|
||||
|
||||
if call.data.get(SERVICE_KNX_ATTR_REMOVE):
|
||||
try:
|
||||
|
@ -451,13 +454,14 @@ class KNXModule:
|
|||
|
||||
if group_address in self.service_exposures:
|
||||
replaced_exposure = self.service_exposures.pop(group_address)
|
||||
assert replaced_exposure.device is not None
|
||||
_LOGGER.warning(
|
||||
"Service exposure_register replacing already registered exposure for '%s' - %s",
|
||||
group_address,
|
||||
replaced_exposure.device.name,
|
||||
)
|
||||
replaced_exposure.shutdown()
|
||||
exposure = create_knx_exposure(self.hass, self.xknx, call.data)
|
||||
exposure = create_knx_exposure(self.hass, self.xknx, call.data) # type: ignore[arg-type]
|
||||
self.service_exposures[group_address] = exposure
|
||||
_LOGGER.debug(
|
||||
"Service exposure_register registered exposure for '%s' - %s",
|
||||
|
@ -465,10 +469,10 @@ class KNXModule:
|
|||
exposure.device.name,
|
||||
)
|
||||
|
||||
async def service_send_to_knx_bus(self, call):
|
||||
async def service_send_to_knx_bus(self, call: ServiceCall) -> None:
|
||||
"""Service for sending an arbitrary KNX message to the KNX bus."""
|
||||
attr_address = call.data.get(KNX_ADDRESS)
|
||||
attr_payload = call.data.get(SERVICE_KNX_ATTR_PAYLOAD)
|
||||
attr_address = call.data[KNX_ADDRESS]
|
||||
attr_payload = call.data[SERVICE_KNX_ATTR_PAYLOAD]
|
||||
attr_type = call.data.get(SERVICE_KNX_ATTR_TYPE)
|
||||
|
||||
payload: DPTBinary | DPTArray
|
||||
|
@ -489,9 +493,9 @@ class KNXModule:
|
|||
)
|
||||
await self.xknx.telegrams.put(telegram)
|
||||
|
||||
async def service_read_to_knx_bus(self, call):
|
||||
async def service_read_to_knx_bus(self, call: ServiceCall) -> None:
|
||||
"""Service for sending a GroupValueRead telegram to the KNX bus."""
|
||||
for address in call.data.get(KNX_ADDRESS):
|
||||
for address in call.data[KNX_ADDRESS]:
|
||||
telegram = Telegram(
|
||||
destination_address=GroupAddress(address),
|
||||
payload=GroupValueRead(),
|
||||
|
|
|
@ -72,7 +72,7 @@ class KNXClimate(KnxEntity, ClimateEntity):
|
|||
@property
|
||||
def current_temperature(self) -> float | None:
|
||||
"""Return the current temperature."""
|
||||
return self._device.temperature.value # type: ignore[no-any-return]
|
||||
return self._device.temperature.value
|
||||
|
||||
@property
|
||||
def target_temperature_step(self) -> float:
|
||||
|
@ -82,7 +82,7 @@ class KNXClimate(KnxEntity, ClimateEntity):
|
|||
@property
|
||||
def target_temperature(self) -> float | None:
|
||||
"""Return the temperature we try to reach."""
|
||||
return self._device.target_temperature.value # type: ignore[no-any-return]
|
||||
return self._device.target_temperature.value
|
||||
|
||||
@property
|
||||
def min_temp(self) -> float:
|
||||
|
|
|
@ -1,38 +1,42 @@
|
|||
"""Base class for KNX devices."""
|
||||
from typing import cast
|
||||
|
||||
from xknx.devices import Climate as XknxClimate, Device as XknxDevice
|
||||
|
||||
from homeassistant.helpers.entity import Entity
|
||||
|
||||
from . import KNXModule
|
||||
from .const import DOMAIN
|
||||
|
||||
|
||||
class KnxEntity(Entity):
|
||||
"""Representation of a KNX entity."""
|
||||
|
||||
def __init__(self, device: XknxDevice):
|
||||
def __init__(self, device: XknxDevice) -> None:
|
||||
"""Set up device."""
|
||||
self._device = device
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
def name(self) -> str:
|
||||
"""Return the name of the KNX device."""
|
||||
return self._device.name
|
||||
|
||||
@property
|
||||
def available(self):
|
||||
def available(self) -> bool:
|
||||
"""Return True if entity is available."""
|
||||
return self.hass.data[DOMAIN].connected
|
||||
knx_module = cast(KNXModule, self.hass.data[DOMAIN])
|
||||
return knx_module.connected
|
||||
|
||||
@property
|
||||
def should_poll(self):
|
||||
def should_poll(self) -> bool:
|
||||
"""No polling needed within KNX."""
|
||||
return False
|
||||
|
||||
async def async_update(self):
|
||||
async def async_update(self) -> None:
|
||||
"""Request a state update from KNX bus."""
|
||||
await self._device.sync()
|
||||
|
||||
async def after_update_callback(self, device: XknxDevice):
|
||||
async def after_update_callback(self, device: XknxDevice) -> None:
|
||||
"""Call after device was updated."""
|
||||
self.async_write_ha_state()
|
||||
|
||||
|
|
|
@ -43,7 +43,7 @@ warn_redundant_casts = true
|
|||
warn_unused_configs = true
|
||||
|
||||
|
||||
[mypy-homeassistant.block_async_io,homeassistant.bootstrap,homeassistant.components,homeassistant.config_entries,homeassistant.config,homeassistant.const,homeassistant.core,homeassistant.data_entry_flow,homeassistant.exceptions,homeassistant.__init__,homeassistant.loader,homeassistant.__main__,homeassistant.requirements,homeassistant.runner,homeassistant.setup,homeassistant.util,homeassistant.auth.*,homeassistant.components.automation.*,homeassistant.components.binary_sensor.*,homeassistant.components.bond.*,homeassistant.components.calendar.*,homeassistant.components.cover.*,homeassistant.components.device_automation.*,homeassistant.components.frontend.*,homeassistant.components.geo_location.*,homeassistant.components.group.*,homeassistant.components.history.*,homeassistant.components.http.*,homeassistant.components.huawei_lte.*,homeassistant.components.hyperion.*,homeassistant.components.image_processing.*,homeassistant.components.integration.*,homeassistant.components.light.*,homeassistant.components.lock.*,homeassistant.components.mailbox.*,homeassistant.components.media_player.*,homeassistant.components.notify.*,homeassistant.components.number.*,homeassistant.components.persistent_notification.*,homeassistant.components.proximity.*,homeassistant.components.recorder.purge,homeassistant.components.recorder.repack,homeassistant.components.remote.*,homeassistant.components.scene.*,homeassistant.components.sensor.*,homeassistant.components.slack.*,homeassistant.components.sun.*,homeassistant.components.switch.*,homeassistant.components.systemmonitor.*,homeassistant.components.tts.*,homeassistant.components.vacuum.*,homeassistant.components.water_heater.*,homeassistant.components.weather.*,homeassistant.components.websocket_api.*,homeassistant.components.zone.*,homeassistant.components.zwave_js.*,homeassistant.helpers.*,homeassistant.scripts.*,homeassistant.util.*,tests.components.hyperion.*]
|
||||
[mypy-homeassistant.block_async_io,homeassistant.bootstrap,homeassistant.components,homeassistant.config_entries,homeassistant.config,homeassistant.const,homeassistant.core,homeassistant.data_entry_flow,homeassistant.exceptions,homeassistant.__init__,homeassistant.loader,homeassistant.__main__,homeassistant.requirements,homeassistant.runner,homeassistant.setup,homeassistant.util,homeassistant.auth.*,homeassistant.components.automation.*,homeassistant.components.binary_sensor.*,homeassistant.components.bond.*,homeassistant.components.calendar.*,homeassistant.components.cover.*,homeassistant.components.device_automation.*,homeassistant.components.frontend.*,homeassistant.components.geo_location.*,homeassistant.components.group.*,homeassistant.components.history.*,homeassistant.components.http.*,homeassistant.components.huawei_lte.*,homeassistant.components.hyperion.*,homeassistant.components.image_processing.*,homeassistant.components.integration.*,homeassistant.components.knx.*,homeassistant.components.light.*,homeassistant.components.lock.*,homeassistant.components.mailbox.*,homeassistant.components.media_player.*,homeassistant.components.notify.*,homeassistant.components.number.*,homeassistant.components.persistent_notification.*,homeassistant.components.proximity.*,homeassistant.components.recorder.purge,homeassistant.components.recorder.repack,homeassistant.components.remote.*,homeassistant.components.scene.*,homeassistant.components.sensor.*,homeassistant.components.slack.*,homeassistant.components.sun.*,homeassistant.components.switch.*,homeassistant.components.systemmonitor.*,homeassistant.components.tts.*,homeassistant.components.vacuum.*,homeassistant.components.water_heater.*,homeassistant.components.weather.*,homeassistant.components.websocket_api.*,homeassistant.components.zone.*,homeassistant.components.zwave_js.*,homeassistant.helpers.*,homeassistant.scripts.*,homeassistant.util.*,tests.components.hyperion.*]
|
||||
strict = true
|
||||
ignore_errors = false
|
||||
warn_unreachable = true
|
||||
|
|
|
@ -1 +1 @@
|
|||
"""The tests for KNX integration."""
|
||||
"""Tests for the KNX integration."""
|
||||
|
|
Loading…
Reference in New Issue