"""KNX integration services.""" from __future__ import annotations from functools import partial import logging from typing import TYPE_CHECKING import voluptuous as vol from xknx.dpt import DPTArray, DPTBase, DPTBinary from xknx.telegram import Telegram from xknx.telegram.address import parse_device_group_address from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite from homeassistant.const import CONF_TYPE, SERVICE_RELOAD from homeassistant.core import HomeAssistant, ServiceCall, callback from homeassistant.exceptions import HomeAssistantError, ServiceValidationError import homeassistant.helpers.config_validation as cv from homeassistant.helpers.service import async_register_admin_service from .const import ( DOMAIN, KNX_ADDRESS, SERVICE_KNX_ATTR_PAYLOAD, SERVICE_KNX_ATTR_REMOVE, SERVICE_KNX_ATTR_RESPONSE, SERVICE_KNX_ATTR_TYPE, SERVICE_KNX_EVENT_REGISTER, SERVICE_KNX_EXPOSURE_REGISTER, SERVICE_KNX_READ, SERVICE_KNX_SEND, ) from .expose import create_knx_exposure from .schema import ExposeSchema, ga_validator, sensor_type_validator if TYPE_CHECKING: from . import KNXModule _LOGGER = logging.getLogger(__name__) @callback def register_knx_services(hass: HomeAssistant) -> None: """Register KNX integration services.""" hass.services.async_register( DOMAIN, SERVICE_KNX_SEND, partial(service_send_to_knx_bus, hass), schema=SERVICE_KNX_SEND_SCHEMA, ) hass.services.async_register( DOMAIN, SERVICE_KNX_READ, partial(service_read_to_knx_bus, hass), schema=SERVICE_KNX_READ_SCHEMA, ) async_register_admin_service( hass, DOMAIN, SERVICE_KNX_EVENT_REGISTER, partial(service_event_register_modify, hass), schema=SERVICE_KNX_EVENT_REGISTER_SCHEMA, ) async_register_admin_service( hass, DOMAIN, SERVICE_KNX_EXPOSURE_REGISTER, partial(service_exposure_register_modify, hass), schema=SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA, ) async_register_admin_service( hass, DOMAIN, SERVICE_RELOAD, partial(service_reload_integration, hass), ) @callback def get_knx_module(hass: HomeAssistant) -> KNXModule: """Return KNXModule instance.""" try: return hass.data[DOMAIN] # type: ignore[no-any-return] except KeyError as err: raise HomeAssistantError("KNX entry not loaded") from err SERVICE_KNX_EVENT_REGISTER_SCHEMA = vol.Schema( { vol.Required(KNX_ADDRESS): vol.All( cv.ensure_list, [ga_validator], ), vol.Optional(CONF_TYPE): sensor_type_validator, vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean, } ) async def service_event_register_modify(hass: HomeAssistant, call: ServiceCall) -> None: """Service for adding or removing a GroupAddress to the knx_event filter.""" knx_module = get_knx_module(hass) attr_address = call.data[KNX_ADDRESS] group_addresses = list(map(parse_device_group_address, attr_address)) if call.data.get(SERVICE_KNX_ATTR_REMOVE): for group_address in group_addresses: try: knx_module.knx_event_callback.group_addresses.remove(group_address) except ValueError: _LOGGER.warning( "Service event_register could not remove event for '%s'", str(group_address), ) if group_address in knx_module.group_address_transcoder: del knx_module.group_address_transcoder[group_address] return if (dpt := call.data.get(CONF_TYPE)) and ( transcoder := DPTBase.parse_transcoder(dpt) ): knx_module.group_address_transcoder.update( { _address: transcoder # type: ignore[type-abstract] for _address in group_addresses } ) for group_address in group_addresses: if group_address in knx_module.knx_event_callback.group_addresses: continue knx_module.knx_event_callback.group_addresses.append(group_address) _LOGGER.debug( "Service event_register registered event for '%s'", str(group_address), ) SERVICE_KNX_EXPOSURE_REGISTER_SCHEMA = vol.Any( ExposeSchema.EXPOSE_SENSOR_SCHEMA.extend( { vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean, } ), vol.Schema( # for removing only `address` is required { vol.Required(KNX_ADDRESS): ga_validator, vol.Required(SERVICE_KNX_ATTR_REMOVE): vol.All(cv.boolean, True), }, extra=vol.ALLOW_EXTRA, ), ) async def service_exposure_register_modify( hass: HomeAssistant, call: ServiceCall ) -> None: """Service for adding or removing an exposure to KNX bus.""" knx_module = get_knx_module(hass) group_address = call.data[KNX_ADDRESS] if call.data.get(SERVICE_KNX_ATTR_REMOVE): try: removed_exposure = knx_module.service_exposures.pop(group_address) except KeyError as err: raise ServiceValidationError( f"Could not find exposure for '{group_address}' to remove." ) from err removed_exposure.shutdown() return if group_address in knx_module.service_exposures: replaced_exposure = knx_module.service_exposures.pop(group_address) _LOGGER.warning( ( "Service exposure_register replacing already registered exposure" " for '%s' - %s" ), group_address, replaced_exposure.device.name, ) replaced_exposure.shutdown() exposure = create_knx_exposure(knx_module.hass, knx_module.xknx, call.data) knx_module.service_exposures[group_address] = exposure _LOGGER.debug( "Service exposure_register registered exposure for '%s' - %s", group_address, exposure.device.name, ) SERVICE_KNX_SEND_SCHEMA = vol.Any( vol.Schema( { vol.Required(KNX_ADDRESS): vol.All( cv.ensure_list, [ga_validator], ), vol.Required(SERVICE_KNX_ATTR_PAYLOAD): cv.match_all, vol.Required(SERVICE_KNX_ATTR_TYPE): sensor_type_validator, vol.Optional(SERVICE_KNX_ATTR_RESPONSE, default=False): cv.boolean, } ), vol.Schema( # without type given payload is treated as raw bytes { vol.Required(KNX_ADDRESS): vol.All( cv.ensure_list, [ga_validator], ), vol.Required(SERVICE_KNX_ATTR_PAYLOAD): vol.Any( cv.positive_int, [cv.positive_int] ), vol.Optional(SERVICE_KNX_ATTR_RESPONSE, default=False): cv.boolean, } ), ) async def service_send_to_knx_bus(hass: HomeAssistant, call: ServiceCall) -> None: """Service for sending an arbitrary KNX message to the KNX bus.""" knx_module = get_knx_module(hass) attr_address = call.data[KNX_ADDRESS] attr_payload = call.data[SERVICE_KNX_ATTR_PAYLOAD] attr_type = call.data.get(SERVICE_KNX_ATTR_TYPE) attr_response = call.data[SERVICE_KNX_ATTR_RESPONSE] payload: DPTBinary | DPTArray if attr_type is not None: transcoder = DPTBase.parse_transcoder(attr_type) if transcoder is None: raise ValueError(f"Invalid type for knx.send service: {attr_type}") payload = transcoder.to_knx(attr_payload) elif isinstance(attr_payload, int): payload = DPTBinary(attr_payload) else: payload = DPTArray(attr_payload) for address in attr_address: telegram = Telegram( destination_address=parse_device_group_address(address), payload=GroupValueResponse(payload) if attr_response else GroupValueWrite(payload), source_address=knx_module.xknx.current_address, ) await knx_module.xknx.telegrams.put(telegram) SERVICE_KNX_READ_SCHEMA = vol.Schema( { vol.Required(KNX_ADDRESS): vol.All( cv.ensure_list, [ga_validator], ) } ) async def service_read_to_knx_bus(hass: HomeAssistant, call: ServiceCall) -> None: """Service for sending a GroupValueRead telegram to the KNX bus.""" knx_module = get_knx_module(hass) for address in call.data[KNX_ADDRESS]: telegram = Telegram( destination_address=parse_device_group_address(address), payload=GroupValueRead(), source_address=knx_module.xknx.current_address, ) await knx_module.xknx.telegrams.put(telegram) async def service_reload_integration(hass: HomeAssistant, call: ServiceCall) -> None: """Reload the integration.""" knx_module = get_knx_module(hass) await hass.config_entries.async_reload(knx_module.entry.entry_id) hass.bus.async_fire(f"event_{DOMAIN}_reloaded", context=call.context)