410 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			410 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
"""Service calls related dependencies for LCN component."""
 | 
						|
 | 
						|
import pypck
 | 
						|
import voluptuous as vol
 | 
						|
 | 
						|
from homeassistant.const import (
 | 
						|
    CONF_ADDRESS,
 | 
						|
    CONF_BRIGHTNESS,
 | 
						|
    CONF_HOST,
 | 
						|
    CONF_STATE,
 | 
						|
    CONF_UNIT_OF_MEASUREMENT,
 | 
						|
)
 | 
						|
from homeassistant.core import HomeAssistant, ServiceCall
 | 
						|
import homeassistant.helpers.config_validation as cv
 | 
						|
 | 
						|
from .const import (
 | 
						|
    CONF_KEYS,
 | 
						|
    CONF_LED,
 | 
						|
    CONF_OUTPUT,
 | 
						|
    CONF_PCK,
 | 
						|
    CONF_RELVARREF,
 | 
						|
    CONF_ROW,
 | 
						|
    CONF_SETPOINT,
 | 
						|
    CONF_TABLE,
 | 
						|
    CONF_TEXT,
 | 
						|
    CONF_TIME,
 | 
						|
    CONF_TIME_UNIT,
 | 
						|
    CONF_TRANSITION,
 | 
						|
    CONF_VALUE,
 | 
						|
    CONF_VARIABLE,
 | 
						|
    DOMAIN,
 | 
						|
    LED_PORTS,
 | 
						|
    LED_STATUS,
 | 
						|
    OUTPUT_PORTS,
 | 
						|
    RELVARREF,
 | 
						|
    SENDKEYCOMMANDS,
 | 
						|
    SETPOINTS,
 | 
						|
    THRESHOLDS,
 | 
						|
    TIME_UNITS,
 | 
						|
    VAR_UNITS,
 | 
						|
    VARIABLES,
 | 
						|
)
 | 
						|
from .helpers import (
 | 
						|
    DeviceConnectionType,
 | 
						|
    get_device_connection,
 | 
						|
    is_address,
 | 
						|
    is_states_string,
 | 
						|
)
 | 
						|
 | 
						|
 | 
						|
class LcnServiceCall:
 | 
						|
    """Parent class for all LCN service calls."""
 | 
						|
 | 
						|
    schema = vol.Schema({vol.Required(CONF_ADDRESS): is_address})
 | 
						|
 | 
						|
    def __init__(self, hass: HomeAssistant) -> None:
 | 
						|
        """Initialize service call."""
 | 
						|
        self.hass = hass
 | 
						|
 | 
						|
    def get_device_connection(self, service: ServiceCall) -> DeviceConnectionType:
 | 
						|
        """Get address connection object."""
 | 
						|
        address, host_name = service.data[CONF_ADDRESS]
 | 
						|
 | 
						|
        for config_entry in self.hass.config_entries.async_entries(DOMAIN):
 | 
						|
            if config_entry.data[CONF_HOST] == host_name:
 | 
						|
                device_connection = get_device_connection(
 | 
						|
                    self.hass, address, config_entry
 | 
						|
                )
 | 
						|
                if device_connection is None:
 | 
						|
                    raise ValueError("Wrong address.")
 | 
						|
                return device_connection
 | 
						|
        raise ValueError("Invalid host name.")
 | 
						|
 | 
						|
    async def async_call_service(self, service: ServiceCall) -> None:
 | 
						|
        """Execute service call."""
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
 | 
						|
class OutputAbs(LcnServiceCall):
 | 
						|
    """Set absolute brightness of output port in percent."""
 | 
						|
 | 
						|
    schema = LcnServiceCall.schema.extend(
 | 
						|
        {
 | 
						|
            vol.Required(CONF_OUTPUT): vol.All(vol.Upper, vol.In(OUTPUT_PORTS)),
 | 
						|
            vol.Required(CONF_BRIGHTNESS): vol.All(
 | 
						|
                vol.Coerce(int), vol.Range(min=0, max=100)
 | 
						|
            ),
 | 
						|
            vol.Optional(CONF_TRANSITION, default=0): vol.All(
 | 
						|
                vol.Coerce(float), vol.Range(min=0.0, max=486.0)
 | 
						|
            ),
 | 
						|
        }
 | 
						|
    )
 | 
						|
 | 
						|
    async def async_call_service(self, service: ServiceCall) -> None:
 | 
						|
        """Execute service call."""
 | 
						|
        output = pypck.lcn_defs.OutputPort[service.data[CONF_OUTPUT]]
 | 
						|
        brightness = service.data[CONF_BRIGHTNESS]
 | 
						|
        transition = pypck.lcn_defs.time_to_ramp_value(
 | 
						|
            service.data[CONF_TRANSITION] * 1000
 | 
						|
        )
 | 
						|
 | 
						|
        device_connection = self.get_device_connection(service)
 | 
						|
        await device_connection.dim_output(output.value, brightness, transition)
 | 
						|
 | 
						|
 | 
						|
class OutputRel(LcnServiceCall):
 | 
						|
    """Set relative brightness of output port in percent."""
 | 
						|
 | 
						|
    schema = LcnServiceCall.schema.extend(
 | 
						|
        {
 | 
						|
            vol.Required(CONF_OUTPUT): vol.All(vol.Upper, vol.In(OUTPUT_PORTS)),
 | 
						|
            vol.Required(CONF_BRIGHTNESS): vol.All(
 | 
						|
                vol.Coerce(int), vol.Range(min=-100, max=100)
 | 
						|
            ),
 | 
						|
        }
 | 
						|
    )
 | 
						|
 | 
						|
    async def async_call_service(self, service: ServiceCall) -> None:
 | 
						|
        """Execute service call."""
 | 
						|
        output = pypck.lcn_defs.OutputPort[service.data[CONF_OUTPUT]]
 | 
						|
        brightness = service.data[CONF_BRIGHTNESS]
 | 
						|
 | 
						|
        device_connection = self.get_device_connection(service)
 | 
						|
        await device_connection.rel_output(output.value, brightness)
 | 
						|
 | 
						|
 | 
						|
class OutputToggle(LcnServiceCall):
 | 
						|
    """Toggle output port."""
 | 
						|
 | 
						|
    schema = LcnServiceCall.schema.extend(
 | 
						|
        {
 | 
						|
            vol.Required(CONF_OUTPUT): vol.All(vol.Upper, vol.In(OUTPUT_PORTS)),
 | 
						|
            vol.Optional(CONF_TRANSITION, default=0): vol.All(
 | 
						|
                vol.Coerce(float), vol.Range(min=0.0, max=486.0)
 | 
						|
            ),
 | 
						|
        }
 | 
						|
    )
 | 
						|
 | 
						|
    async def async_call_service(self, service: ServiceCall) -> None:
 | 
						|
        """Execute service call."""
 | 
						|
        output = pypck.lcn_defs.OutputPort[service.data[CONF_OUTPUT]]
 | 
						|
        transition = pypck.lcn_defs.time_to_ramp_value(
 | 
						|
            service.data[CONF_TRANSITION] * 1000
 | 
						|
        )
 | 
						|
 | 
						|
        device_connection = self.get_device_connection(service)
 | 
						|
        await device_connection.toggle_output(output.value, transition)
 | 
						|
 | 
						|
 | 
						|
class Relays(LcnServiceCall):
 | 
						|
    """Set the relays status."""
 | 
						|
 | 
						|
    schema = LcnServiceCall.schema.extend({vol.Required(CONF_STATE): is_states_string})
 | 
						|
 | 
						|
    async def async_call_service(self, service: ServiceCall) -> None:
 | 
						|
        """Execute service call."""
 | 
						|
        states = [
 | 
						|
            pypck.lcn_defs.RelayStateModifier[state]
 | 
						|
            for state in service.data[CONF_STATE]
 | 
						|
        ]
 | 
						|
 | 
						|
        device_connection = self.get_device_connection(service)
 | 
						|
        await device_connection.control_relays(states)
 | 
						|
 | 
						|
 | 
						|
class Led(LcnServiceCall):
 | 
						|
    """Set the led state."""
 | 
						|
 | 
						|
    schema = LcnServiceCall.schema.extend(
 | 
						|
        {
 | 
						|
            vol.Required(CONF_LED): vol.All(vol.Upper, vol.In(LED_PORTS)),
 | 
						|
            vol.Required(CONF_STATE): vol.All(vol.Upper, vol.In(LED_STATUS)),
 | 
						|
        }
 | 
						|
    )
 | 
						|
 | 
						|
    async def async_call_service(self, service: ServiceCall) -> None:
 | 
						|
        """Execute service call."""
 | 
						|
        led = pypck.lcn_defs.LedPort[service.data[CONF_LED]]
 | 
						|
        led_state = pypck.lcn_defs.LedStatus[service.data[CONF_STATE]]
 | 
						|
 | 
						|
        device_connection = self.get_device_connection(service)
 | 
						|
        await device_connection.control_led(led, led_state)
 | 
						|
 | 
						|
 | 
						|
class VarAbs(LcnServiceCall):
 | 
						|
    """Set absolute value of a variable or setpoint.
 | 
						|
 | 
						|
    Variable has to be set as counter!
 | 
						|
    Regulator setpoints can also be set using R1VARSETPOINT, R2VARSETPOINT.
 | 
						|
    """
 | 
						|
 | 
						|
    schema = LcnServiceCall.schema.extend(
 | 
						|
        {
 | 
						|
            vol.Required(CONF_VARIABLE): vol.All(
 | 
						|
                vol.Upper, vol.In(VARIABLES + SETPOINTS)
 | 
						|
            ),
 | 
						|
            vol.Optional(CONF_VALUE, default=0): vol.Coerce(float),
 | 
						|
            vol.Optional(CONF_UNIT_OF_MEASUREMENT, default="native"): vol.All(
 | 
						|
                vol.Upper, vol.In(VAR_UNITS)
 | 
						|
            ),
 | 
						|
        }
 | 
						|
    )
 | 
						|
 | 
						|
    async def async_call_service(self, service: ServiceCall) -> None:
 | 
						|
        """Execute service call."""
 | 
						|
        var = pypck.lcn_defs.Var[service.data[CONF_VARIABLE]]
 | 
						|
        value = service.data[CONF_VALUE]
 | 
						|
        unit = pypck.lcn_defs.VarUnit.parse(service.data[CONF_UNIT_OF_MEASUREMENT])
 | 
						|
 | 
						|
        device_connection = self.get_device_connection(service)
 | 
						|
        await device_connection.var_abs(var, value, unit)
 | 
						|
 | 
						|
 | 
						|
class VarReset(LcnServiceCall):
 | 
						|
    """Reset value of variable or setpoint."""
 | 
						|
 | 
						|
    schema = LcnServiceCall.schema.extend(
 | 
						|
        {vol.Required(CONF_VARIABLE): vol.All(vol.Upper, vol.In(VARIABLES + SETPOINTS))}
 | 
						|
    )
 | 
						|
 | 
						|
    async def async_call_service(self, service: ServiceCall) -> None:
 | 
						|
        """Execute service call."""
 | 
						|
        var = pypck.lcn_defs.Var[service.data[CONF_VARIABLE]]
 | 
						|
 | 
						|
        device_connection = self.get_device_connection(service)
 | 
						|
        await device_connection.var_reset(var)
 | 
						|
 | 
						|
 | 
						|
class VarRel(LcnServiceCall):
 | 
						|
    """Shift value of a variable, setpoint or threshold."""
 | 
						|
 | 
						|
    schema = LcnServiceCall.schema.extend(
 | 
						|
        {
 | 
						|
            vol.Required(CONF_VARIABLE): vol.All(
 | 
						|
                vol.Upper, vol.In(VARIABLES + SETPOINTS + THRESHOLDS)
 | 
						|
            ),
 | 
						|
            vol.Optional(CONF_VALUE, default=0): vol.Coerce(float),
 | 
						|
            vol.Optional(CONF_UNIT_OF_MEASUREMENT, default="native"): vol.All(
 | 
						|
                vol.Upper, vol.In(VAR_UNITS)
 | 
						|
            ),
 | 
						|
            vol.Optional(CONF_RELVARREF, default="current"): vol.All(
 | 
						|
                vol.Upper, vol.In(RELVARREF)
 | 
						|
            ),
 | 
						|
        }
 | 
						|
    )
 | 
						|
 | 
						|
    async def async_call_service(self, service: ServiceCall) -> None:
 | 
						|
        """Execute service call."""
 | 
						|
        var = pypck.lcn_defs.Var[service.data[CONF_VARIABLE]]
 | 
						|
        value = service.data[CONF_VALUE]
 | 
						|
        unit = pypck.lcn_defs.VarUnit.parse(service.data[CONF_UNIT_OF_MEASUREMENT])
 | 
						|
        value_ref = pypck.lcn_defs.RelVarRef[service.data[CONF_RELVARREF]]
 | 
						|
 | 
						|
        device_connection = self.get_device_connection(service)
 | 
						|
        await device_connection.var_rel(var, value, unit, value_ref)
 | 
						|
 | 
						|
 | 
						|
class LockRegulator(LcnServiceCall):
 | 
						|
    """Locks a regulator setpoint."""
 | 
						|
 | 
						|
    schema = LcnServiceCall.schema.extend(
 | 
						|
        {
 | 
						|
            vol.Required(CONF_SETPOINT): vol.All(vol.Upper, vol.In(SETPOINTS)),
 | 
						|
            vol.Optional(CONF_STATE, default=False): bool,
 | 
						|
        }
 | 
						|
    )
 | 
						|
 | 
						|
    async def async_call_service(self, service: ServiceCall) -> None:
 | 
						|
        """Execute service call."""
 | 
						|
        setpoint = pypck.lcn_defs.Var[service.data[CONF_SETPOINT]]
 | 
						|
        state = service.data[CONF_STATE]
 | 
						|
 | 
						|
        reg_id = pypck.lcn_defs.Var.to_set_point_id(setpoint)
 | 
						|
        device_connection = self.get_device_connection(service)
 | 
						|
        await device_connection.lock_regulator(reg_id, state)
 | 
						|
 | 
						|
 | 
						|
class SendKeys(LcnServiceCall):
 | 
						|
    """Sends keys (which executes bound commands)."""
 | 
						|
 | 
						|
    schema = LcnServiceCall.schema.extend(
 | 
						|
        {
 | 
						|
            vol.Required(CONF_KEYS): vol.All(
 | 
						|
                vol.Upper, cv.matches_regex(r"^([A-D][1-8])+$")
 | 
						|
            ),
 | 
						|
            vol.Optional(CONF_STATE, default="hit"): vol.All(
 | 
						|
                vol.Upper, vol.In(SENDKEYCOMMANDS)
 | 
						|
            ),
 | 
						|
            vol.Optional(CONF_TIME, default=0): cv.positive_int,
 | 
						|
            vol.Optional(CONF_TIME_UNIT, default="S"): vol.All(
 | 
						|
                vol.Upper, vol.In(TIME_UNITS)
 | 
						|
            ),
 | 
						|
        }
 | 
						|
    )
 | 
						|
 | 
						|
    async def async_call_service(self, service: ServiceCall) -> None:
 | 
						|
        """Execute service call."""
 | 
						|
        device_connection = self.get_device_connection(service)
 | 
						|
 | 
						|
        keys = [[False] * 8 for i in range(4)]
 | 
						|
 | 
						|
        key_strings = zip(service.data[CONF_KEYS][::2], service.data[CONF_KEYS][1::2])
 | 
						|
 | 
						|
        for table, key in key_strings:
 | 
						|
            table_id = ord(table) - 65
 | 
						|
            key_id = int(key) - 1
 | 
						|
            keys[table_id][key_id] = True
 | 
						|
 | 
						|
        if (delay_time := service.data[CONF_TIME]) != 0:
 | 
						|
            hit = pypck.lcn_defs.SendKeyCommand.HIT
 | 
						|
            if pypck.lcn_defs.SendKeyCommand[service.data[CONF_STATE]] != hit:
 | 
						|
                raise ValueError(
 | 
						|
                    "Only hit command is allowed when sending deferred keys."
 | 
						|
                )
 | 
						|
            delay_unit = pypck.lcn_defs.TimeUnit.parse(service.data[CONF_TIME_UNIT])
 | 
						|
            await device_connection.send_keys_hit_deferred(keys, delay_time, delay_unit)
 | 
						|
        else:
 | 
						|
            state = pypck.lcn_defs.SendKeyCommand[service.data[CONF_STATE]]
 | 
						|
            await device_connection.send_keys(keys, state)
 | 
						|
 | 
						|
 | 
						|
class LockKeys(LcnServiceCall):
 | 
						|
    """Lock keys."""
 | 
						|
 | 
						|
    schema = LcnServiceCall.schema.extend(
 | 
						|
        {
 | 
						|
            vol.Optional(CONF_TABLE, default="a"): vol.All(
 | 
						|
                vol.Upper, cv.matches_regex(r"^[A-D]$")
 | 
						|
            ),
 | 
						|
            vol.Required(CONF_STATE): is_states_string,
 | 
						|
            vol.Optional(CONF_TIME, default=0): cv.positive_int,
 | 
						|
            vol.Optional(CONF_TIME_UNIT, default="S"): vol.All(
 | 
						|
                vol.Upper, vol.In(TIME_UNITS)
 | 
						|
            ),
 | 
						|
        }
 | 
						|
    )
 | 
						|
 | 
						|
    async def async_call_service(self, service: ServiceCall) -> None:
 | 
						|
        """Execute service call."""
 | 
						|
        device_connection = self.get_device_connection(service)
 | 
						|
 | 
						|
        states = [
 | 
						|
            pypck.lcn_defs.KeyLockStateModifier[state]
 | 
						|
            for state in service.data[CONF_STATE]
 | 
						|
        ]
 | 
						|
        table_id = ord(service.data[CONF_TABLE]) - 65
 | 
						|
 | 
						|
        if (delay_time := service.data[CONF_TIME]) != 0:
 | 
						|
            if table_id != 0:
 | 
						|
                raise ValueError(
 | 
						|
                    "Only table A is allowed when locking keys for a specific time."
 | 
						|
                )
 | 
						|
            delay_unit = pypck.lcn_defs.TimeUnit.parse(service.data[CONF_TIME_UNIT])
 | 
						|
            await device_connection.lock_keys_tab_a_temporary(
 | 
						|
                delay_time, delay_unit, states
 | 
						|
            )
 | 
						|
        else:
 | 
						|
            await device_connection.lock_keys(table_id, states)
 | 
						|
 | 
						|
        handler = device_connection.status_requests_handler
 | 
						|
        await handler.request_status_locked_keys_timeout()
 | 
						|
 | 
						|
 | 
						|
class DynText(LcnServiceCall):
 | 
						|
    """Send dynamic text to LCN-GTxD displays."""
 | 
						|
 | 
						|
    schema = LcnServiceCall.schema.extend(
 | 
						|
        {
 | 
						|
            vol.Required(CONF_ROW): vol.All(int, vol.Range(min=1, max=4)),
 | 
						|
            vol.Required(CONF_TEXT): vol.All(str, vol.Length(max=60)),
 | 
						|
        }
 | 
						|
    )
 | 
						|
 | 
						|
    async def async_call_service(self, service: ServiceCall) -> None:
 | 
						|
        """Execute service call."""
 | 
						|
        row_id = service.data[CONF_ROW] - 1
 | 
						|
        text = service.data[CONF_TEXT]
 | 
						|
 | 
						|
        device_connection = self.get_device_connection(service)
 | 
						|
        await device_connection.dyn_text(row_id, text)
 | 
						|
 | 
						|
 | 
						|
class Pck(LcnServiceCall):
 | 
						|
    """Send arbitrary PCK command."""
 | 
						|
 | 
						|
    schema = LcnServiceCall.schema.extend({vol.Required(CONF_PCK): str})
 | 
						|
 | 
						|
    async def async_call_service(self, service: ServiceCall) -> None:
 | 
						|
        """Execute service call."""
 | 
						|
        pck = service.data[CONF_PCK]
 | 
						|
        device_connection = self.get_device_connection(service)
 | 
						|
        await device_connection.pck(pck)
 | 
						|
 | 
						|
 | 
						|
SERVICES = (
 | 
						|
    ("output_abs", OutputAbs),
 | 
						|
    ("output_rel", OutputRel),
 | 
						|
    ("output_toggle", OutputToggle),
 | 
						|
    ("relays", Relays),
 | 
						|
    ("var_abs", VarAbs),
 | 
						|
    ("var_reset", VarReset),
 | 
						|
    ("var_rel", VarRel),
 | 
						|
    ("lock_regulator", LockRegulator),
 | 
						|
    ("led", Led),
 | 
						|
    ("send_keys", SendKeys),
 | 
						|
    ("lock_keys", LockKeys),
 | 
						|
    ("dyn_text", DynText),
 | 
						|
    ("pck", Pck),
 | 
						|
)
 |