core/homeassistant/components/igloohome/lock.py

92 lines
2.9 KiB
Python

"""Implementation of the lock platform."""
from datetime import timedelta
from aiohttp import ClientError
from igloohome_api import (
BRIDGE_JOB_LOCK,
BRIDGE_JOB_UNLOCK,
DEVICE_TYPE_LOCK,
Api as IgloohomeApi,
ApiException,
GetDeviceInfoResponse,
)
from homeassistant.components.lock import LockEntity, LockEntityFeature
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from . import IgloohomeConfigEntry
from .entity import IgloohomeBaseEntity
from .utils import get_linked_bridge
# Scan interval set to allow Lock entity update the bridge linked to it.
SCAN_INTERVAL = timedelta(hours=1)
async def async_setup_entry(
hass: HomeAssistant,
entry: IgloohomeConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up lock entities."""
async_add_entities(
IgloohomeLockEntity(
api_device_info=device,
api=entry.runtime_data.api,
bridge_id=str(bridge),
)
for device in entry.runtime_data.devices
if device.type == DEVICE_TYPE_LOCK
and (bridge := get_linked_bridge(device.deviceId, entry.runtime_data.devices))
is not None
)
class IgloohomeLockEntity(IgloohomeBaseEntity, LockEntity):
"""Implementation of a device that has locking capabilities."""
# Operating on assumed state because there is no API to query the state.
_attr_assumed_state = True
_attr_supported_features = LockEntityFeature.OPEN
_attr_name = None
def __init__(
self, api_device_info: GetDeviceInfoResponse, api: IgloohomeApi, bridge_id: str
) -> None:
"""Initialize the class."""
super().__init__(
api_device_info=api_device_info,
api=api,
unique_key="lock",
)
self.bridge_id = bridge_id
async def async_lock(self, **kwargs):
"""Lock this lock."""
try:
await self.api.create_bridge_proxied_job(
self.api_device_info.deviceId, self.bridge_id, BRIDGE_JOB_LOCK
)
except (ApiException, ClientError) as err:
raise HomeAssistantError from err
async def async_unlock(self, **kwargs):
"""Unlock this lock."""
try:
await self.api.create_bridge_proxied_job(
self.api_device_info.deviceId, self.bridge_id, BRIDGE_JOB_UNLOCK
)
except (ApiException, ClientError) as err:
raise HomeAssistantError from err
async def async_open(self, **kwargs):
"""Open (unlatch) this lock."""
try:
await self.api.create_bridge_proxied_job(
self.api_device_info.deviceId, self.bridge_id, BRIDGE_JOB_UNLOCK
)
except (ApiException, ClientError) as err:
raise HomeAssistantError from err