core/homeassistant/components/zha/switch.py

169 lines
5.9 KiB
Python
Raw Normal View History

"""Switches on Zigbee Home Automation networks."""
import functools
import logging
from typing import Any, List, Optional
from zigpy.zcl.clusters.general import OnOff
from zigpy.zcl.foundation import Status
from homeassistant.components.switch import DOMAIN, SwitchDevice
from homeassistant.const import STATE_ON, STATE_UNAVAILABLE
from homeassistant.core import CALLBACK_TYPE, State, callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.event import async_track_state_change
ZHA device channel refactoring (#31971) * Add ZHA core typing helper. * Add aux_channels to ZHA rule matching. * Add match rule claim_channels() method. * Expose underlying zigpy device. * Not sure we need this one. * Move "base" channels. * Framework for channel discovery. * Make DEVICE_CLASS and REMOTE_DEVICE_TYPE default dicts. * Remove attribute reporting configuration registry. * Refactor channels. - Refactor zha events - Use compound IDs and unique_ids - Refactor signal dispatching on attribute updates * Use unique id compatible with entities unique ids. * Refactor ZHA Entity registry. Let match rule to check for the match. * Refactor discovery to use new channels. * Cleanup ZDO channel. Remove unused zha store call. * Handle channel configuration and initialization. * Refactor ZHA Device to use new channels. * Refactor ZHA Gateway to use new discovery framework. Use hass.data for entity info intermediate store. * Don't keep entities in hass.data. * ZHA gateway new discovery framework. * Refactor ZHA platform loading. * Don't update ZHA entities, when restoring from zigpy. * ZHA entity discover tests. * Add AnalogInput sensor. * Remove 0xFC02 based entity from Keen smart vents. * Clean up IAS channels. * Refactor entity restoration. * Fix lumi.router entities name. * Rename EndpointsChannel to ChannelPool. * Make Channels.pools a list. * Fix cover test. * Fix FakeDevice class. * Fix device actions. * Fix channels typing. * Revert update_before_add=False * Refactor channel class matching. * Use a helper function for adding entities. * Make Pylint happy. * Rebase cleanup. * Update coverage for ZHA device type overrides. * Use cluster_id for single output cluster registry. * Remove ZHA typing from coverage. * Fix tests. * Address comments. * Address comments.
2020-02-21 23:06:57 +00:00
from .core import discovery
from .core.const import (
2019-08-02 14:37:21 +00:00
CHANNEL_ON_OFF,
2019-07-31 19:25:30 +00:00
DATA_ZHA,
DATA_ZHA_DISPATCHERS,
ZHA device channel refactoring (#31971) * Add ZHA core typing helper. * Add aux_channels to ZHA rule matching. * Add match rule claim_channels() method. * Expose underlying zigpy device. * Not sure we need this one. * Move "base" channels. * Framework for channel discovery. * Make DEVICE_CLASS and REMOTE_DEVICE_TYPE default dicts. * Remove attribute reporting configuration registry. * Refactor channels. - Refactor zha events - Use compound IDs and unique_ids - Refactor signal dispatching on attribute updates * Use unique id compatible with entities unique ids. * Refactor ZHA Entity registry. Let match rule to check for the match. * Refactor discovery to use new channels. * Cleanup ZDO channel. Remove unused zha store call. * Handle channel configuration and initialization. * Refactor ZHA Device to use new channels. * Refactor ZHA Gateway to use new discovery framework. Use hass.data for entity info intermediate store. * Don't keep entities in hass.data. * ZHA gateway new discovery framework. * Refactor ZHA platform loading. * Don't update ZHA entities, when restoring from zigpy. * ZHA entity discover tests. * Add AnalogInput sensor. * Remove 0xFC02 based entity from Keen smart vents. * Clean up IAS channels. * Refactor entity restoration. * Fix lumi.router entities name. * Rename EndpointsChannel to ChannelPool. * Make Channels.pools a list. * Fix cover test. * Fix FakeDevice class. * Fix device actions. * Fix channels typing. * Revert update_before_add=False * Refactor channel class matching. * Use a helper function for adding entities. * Make Pylint happy. * Rebase cleanup. * Update coverage for ZHA device type overrides. * Use cluster_id for single output cluster registry. * Remove ZHA typing from coverage. * Fix tests. * Address comments. * Address comments.
2020-02-21 23:06:57 +00:00
SIGNAL_ADD_ENTITIES,
2019-07-31 19:25:30 +00:00
SIGNAL_ATTR_UPDATED,
SIGNAL_REMOVE_GROUP,
)
from .core.registries import ZHA_ENTITIES
from .entity import BaseZhaEntity, ZhaEntity
_LOGGER = logging.getLogger(__name__)
STRICT_MATCH = functools.partial(ZHA_ENTITIES.strict_match, DOMAIN)
GROUP_MATCH = functools.partial(ZHA_ENTITIES.group_match, DOMAIN)
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up the Zigbee Home Automation switch from config entry."""
entities_to_create = hass.data[DATA_ZHA][DOMAIN]
unsub = async_dispatcher_connect(
ZHA device channel refactoring (#31971) * Add ZHA core typing helper. * Add aux_channels to ZHA rule matching. * Add match rule claim_channels() method. * Expose underlying zigpy device. * Not sure we need this one. * Move "base" channels. * Framework for channel discovery. * Make DEVICE_CLASS and REMOTE_DEVICE_TYPE default dicts. * Remove attribute reporting configuration registry. * Refactor channels. - Refactor zha events - Use compound IDs and unique_ids - Refactor signal dispatching on attribute updates * Use unique id compatible with entities unique ids. * Refactor ZHA Entity registry. Let match rule to check for the match. * Refactor discovery to use new channels. * Cleanup ZDO channel. Remove unused zha store call. * Handle channel configuration and initialization. * Refactor ZHA Device to use new channels. * Refactor ZHA Gateway to use new discovery framework. Use hass.data for entity info intermediate store. * Don't keep entities in hass.data. * ZHA gateway new discovery framework. * Refactor ZHA platform loading. * Don't update ZHA entities, when restoring from zigpy. * ZHA entity discover tests. * Add AnalogInput sensor. * Remove 0xFC02 based entity from Keen smart vents. * Clean up IAS channels. * Refactor entity restoration. * Fix lumi.router entities name. * Rename EndpointsChannel to ChannelPool. * Make Channels.pools a list. * Fix cover test. * Fix FakeDevice class. * Fix device actions. * Fix channels typing. * Revert update_before_add=False * Refactor channel class matching. * Use a helper function for adding entities. * Make Pylint happy. * Rebase cleanup. * Update coverage for ZHA device type overrides. * Use cluster_id for single output cluster registry. * Remove ZHA typing from coverage. * Fix tests. * Address comments. * Address comments.
2020-02-21 23:06:57 +00:00
hass,
SIGNAL_ADD_ENTITIES,
functools.partial(
discovery.async_add_entities, async_add_entities, entities_to_create
),
2019-07-31 19:25:30 +00:00
)
hass.data[DATA_ZHA][DATA_ZHA_DISPATCHERS].append(unsub)
class BaseSwitch(BaseZhaEntity, SwitchDevice):
"""Common base class for zha switches."""
def __init__(self, *args, **kwargs):
"""Initialize the ZHA switch."""
self._on_off_channel = None
self._state = None
super().__init__(*args, **kwargs)
@property
def is_on(self) -> bool:
"""Return if the switch is on based on the statemachine."""
if self._state is None:
return False
return self._state
async def async_turn_on(self, **kwargs) -> None:
"""Turn the entity on."""
result = await self._on_off_channel.on()
if not isinstance(result, list) or result[1] is not Status.SUCCESS:
return
self._state = True
self.async_write_ha_state()
async def async_turn_off(self, **kwargs) -> None:
"""Turn the entity off."""
result = await self._on_off_channel.off()
if not isinstance(result, list) or result[1] is not Status.SUCCESS:
return
self._state = False
self.async_write_ha_state()
@STRICT_MATCH(channel_names=CHANNEL_ON_OFF)
class Switch(ZhaEntity, BaseSwitch):
"""ZHA switch."""
def __init__(self, unique_id, zha_device, channels, **kwargs):
"""Initialize the ZHA switch."""
super().__init__(unique_id, zha_device, channels, **kwargs)
self._on_off_channel = self.cluster_channels.get(CHANNEL_ON_OFF)
@callback
def async_set_state(self, attr_id: int, attr_name: str, value: Any):
"""Handle state update from channel."""
self._state = bool(value)
self.async_write_ha_state()
async def async_added_to_hass(self) -> None:
"""Run when about to be added to hass."""
await super().async_added_to_hass()
await self.async_accept_signal(
2019-07-31 19:25:30 +00:00
self._on_off_channel, SIGNAL_ATTR_UPDATED, self.async_set_state
)
@callback
def async_restore_last_state(self, last_state) -> None:
"""Restore previous state."""
self._state = last_state.state == STATE_ON
async def async_update(self) -> None:
"""Attempt to retrieve on off state from the switch."""
await super().async_update()
if self._on_off_channel:
state = await self._on_off_channel.get_attribute_value("on_off")
if state is not None:
self._state = state
@GROUP_MATCH()
class SwitchGroup(BaseSwitch):
"""Representation of a switch group."""
def __init__(
self, entity_ids: List[str], unique_id: str, group_id: int, zha_device, **kwargs
) -> None:
"""Initialize a switch group."""
super().__init__(unique_id, zha_device, **kwargs)
self._name: str = f"{zha_device.gateway.groups.get(group_id).name}_group_{group_id}"
self._group_id: int = group_id
self._available: bool = False
self._entity_ids: List[str] = entity_ids
group = self.zha_device.gateway.get_group(self._group_id)
self._on_off_channel = group.endpoint[OnOff.cluster_id]
self._async_unsub_state_changed: Optional[CALLBACK_TYPE] = None
async def async_added_to_hass(self) -> None:
"""Register callbacks."""
await super().async_added_to_hass()
await self.async_accept_signal(
None,
f"{SIGNAL_REMOVE_GROUP}_{self._group_id}",
self.async_remove,
signal_override=True,
)
@callback
def async_state_changed_listener(
entity_id: str, old_state: State, new_state: State
):
"""Handle child updates."""
self.async_schedule_update_ha_state(True)
self._async_unsub_state_changed = async_track_state_change(
self.hass, self._entity_ids, async_state_changed_listener
)
await self.async_update()
async def async_will_remove_from_hass(self) -> None:
"""Handle removal from Home Assistant."""
await super().async_will_remove_from_hass()
if self._async_unsub_state_changed is not None:
self._async_unsub_state_changed()
self._async_unsub_state_changed = None
async def async_update(self) -> None:
"""Query all members and determine the light group state."""
all_states = [self.hass.states.get(x) for x in self._entity_ids]
states: List[State] = list(filter(None, all_states))
on_states = [state for state in states if state.state == STATE_ON]
self._state = len(on_states) > 0
self._available = any(state.state != STATE_UNAVAILABLE for state in states)