core/homeassistant/components/tuya/__init__.py

196 lines
6.3 KiB
Python

"""Support for Tuya Smart devices."""
from __future__ import annotations
from typing import NamedTuple
import requests
from tuya_iot import (
AuthType,
TuyaDevice,
TuyaDeviceListener,
TuyaDeviceManager,
TuyaHomeManager,
TuyaOpenAPI,
TuyaOpenMQ,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_COUNTRY_CODE, CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.dispatcher import dispatcher_send
from .const import (
CONF_ACCESS_ID,
CONF_ACCESS_SECRET,
CONF_APP_TYPE,
CONF_AUTH_TYPE,
CONF_ENDPOINT,
DOMAIN,
LOGGER,
PLATFORMS,
TUYA_DISCOVERY_NEW,
TUYA_HA_SIGNAL_UPDATE_ENTITY,
)
class HomeAssistantTuyaData(NamedTuple):
"""Tuya data stored in the Home Assistant data object."""
device_listener: TuyaDeviceListener
device_manager: TuyaDeviceManager
home_manager: TuyaHomeManager
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Async setup hass config entry."""
hass.data.setdefault(DOMAIN, {})
auth_type = AuthType(entry.data[CONF_AUTH_TYPE])
api = TuyaOpenAPI(
endpoint=entry.data[CONF_ENDPOINT],
access_id=entry.data[CONF_ACCESS_ID],
access_secret=entry.data[CONF_ACCESS_SECRET],
auth_type=auth_type,
)
api.set_dev_channel("hass")
try:
if auth_type == AuthType.CUSTOM:
response = await hass.async_add_executor_job(
api.connect, entry.data[CONF_USERNAME], entry.data[CONF_PASSWORD]
)
else:
response = await hass.async_add_executor_job(
api.connect,
entry.data[CONF_USERNAME],
entry.data[CONF_PASSWORD],
entry.data[CONF_COUNTRY_CODE],
entry.data[CONF_APP_TYPE],
)
except requests.exceptions.RequestException as err:
raise ConfigEntryNotReady(err) from err
if response.get("success", False) is False:
raise ConfigEntryNotReady(response)
tuya_mq = TuyaOpenMQ(api)
tuya_mq.start()
device_ids: set[str] = set()
device_manager = TuyaDeviceManager(api, tuya_mq)
home_manager = TuyaHomeManager(api, tuya_mq, device_manager)
listener = DeviceListener(hass, device_manager, device_ids)
device_manager.add_device_listener(listener)
hass.data[DOMAIN][entry.entry_id] = HomeAssistantTuyaData(
device_listener=listener,
device_manager=device_manager,
home_manager=home_manager,
)
# Get devices & clean up device entities
await hass.async_add_executor_job(home_manager.update_device_cache)
await cleanup_device_registry(hass, device_manager)
# Register known device IDs
device_registry = dr.async_get(hass)
for device in device_manager.device_map.values():
device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
identifiers={(DOMAIN, device.id)},
manufacturer="Tuya",
name=device.name,
model=f"{device.product_name} (unsupported)",
)
device_ids.add(device.id)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def cleanup_device_registry(
hass: HomeAssistant, device_manager: TuyaDeviceManager
) -> None:
"""Remove deleted device registry entry if there are no remaining entities."""
device_registry = dr.async_get(hass)
for dev_id, device_entry in list(device_registry.devices.items()):
for item in device_entry.identifiers:
if item[0] == DOMAIN and item[1] not in device_manager.device_map:
device_registry.async_remove_device(dev_id)
break
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unloading the Tuya platforms."""
unload = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload:
hass_data: HomeAssistantTuyaData = hass.data[DOMAIN][entry.entry_id]
hass_data.device_manager.mq.stop()
hass_data.device_manager.remove_device_listener(hass_data.device_listener)
hass.data[DOMAIN].pop(entry.entry_id)
if not hass.data[DOMAIN]:
hass.data.pop(DOMAIN)
return unload
class DeviceListener(TuyaDeviceListener):
"""Device Update Listener."""
def __init__(
self,
hass: HomeAssistant,
device_manager: TuyaDeviceManager,
device_ids: set[str],
) -> None:
"""Init DeviceListener."""
self.hass = hass
self.device_manager = device_manager
self.device_ids = device_ids
def update_device(self, device: TuyaDevice) -> None:
"""Update device status."""
if device.id in self.device_ids:
LOGGER.debug(
"Received update for device %s: %s",
device.id,
self.device_manager.device_map[device.id].status,
)
dispatcher_send(self.hass, f"{TUYA_HA_SIGNAL_UPDATE_ENTITY}_{device.id}")
def add_device(self, device: TuyaDevice) -> None:
"""Add device added listener."""
# Ensure the device isn't present stale
self.hass.add_job(self.async_remove_device, device.id)
self.device_ids.add(device.id)
dispatcher_send(self.hass, TUYA_DISCOVERY_NEW, [device.id])
device_manager = self.device_manager
device_manager.mq.stop()
tuya_mq = TuyaOpenMQ(device_manager.api)
tuya_mq.start()
device_manager.mq = tuya_mq
tuya_mq.add_message_listener(device_manager.on_message)
def remove_device(self, device_id: str) -> None:
"""Add device removed listener."""
self.hass.add_job(self.async_remove_device, device_id)
@callback
def async_remove_device(self, device_id: str) -> None:
"""Remove device from Home Assistant."""
LOGGER.debug("Remove device: %s", device_id)
device_registry = dr.async_get(self.hass)
device_entry = device_registry.async_get_device(
identifiers={(DOMAIN, device_id)}
)
if device_entry is not None:
device_registry.async_remove_device(device_entry.id)
self.device_ids.discard(device_id)