"""Support for Tuya Smart devices.""" from __future__ import annotations import logging from typing import Any, NamedTuple from tuya_sharing import ( CustomerDevice, Manager, SharingDeviceListener, SharingTokenListener, ) from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant, callback from homeassistant.exceptions import ConfigEntryAuthFailed from homeassistant.helpers import device_registry as dr from homeassistant.helpers.dispatcher import dispatcher_send from .const import ( CONF_APP_TYPE, CONF_ENDPOINT, CONF_TERMINAL_ID, CONF_TOKEN_INFO, CONF_USER_CODE, DOMAIN, LOGGER, PLATFORMS, TUYA_CLIENT_ID, TUYA_DISCOVERY_NEW, TUYA_HA_SIGNAL_UPDATE_ENTITY, ) # Suppress logs from the library, it logs unneeded on error logging.getLogger("tuya_sharing").setLevel(logging.CRITICAL) class HomeAssistantTuyaData(NamedTuple): """Tuya data stored in the Home Assistant data object.""" manager: Manager listener: SharingDeviceListener async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Async setup hass config entry.""" if CONF_APP_TYPE in entry.data: raise ConfigEntryAuthFailed("Authentication failed. Please re-authenticate.") token_listener = TokenListener(hass, entry) manager = Manager( TUYA_CLIENT_ID, entry.data[CONF_USER_CODE], entry.data[CONF_TERMINAL_ID], entry.data[CONF_ENDPOINT], entry.data[CONF_TOKEN_INFO], token_listener, ) listener = DeviceListener(hass, manager) manager.add_device_listener(listener) # Get all devices from Tuya try: await hass.async_add_executor_job(manager.update_device_cache) except Exception as exc: # While in general, we should avoid catching broad exceptions, # we have no other way of detecting this case. if "sign invalid" in str(exc): msg = "Authentication failed. Please re-authenticate" raise ConfigEntryAuthFailed(msg) from exc raise # Connection is successful, store the manager & listener hass.data.setdefault(DOMAIN, {})[entry.entry_id] = HomeAssistantTuyaData( manager=manager, listener=listener ) # Cleanup device registry await cleanup_device_registry(hass, manager) # Register known device IDs device_registry = dr.async_get(hass) for device in manager.device_map.values(): device_registry.async_get_or_create( config_entry_id=entry.entry_id, identifiers={(DOMAIN, device.id)}, manufacturer="Tuya", name=device.name, model=f"{device.product_name} (unsupported)", ) await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) # If the device does not register any entities, the device does not need to subscribe # So the subscription is here await hass.async_add_executor_job(manager.refresh_mq) return True async def cleanup_device_registry(hass: HomeAssistant, device_manager: Manager) -> None: """Remove deleted device registry entry if there are no remaining entities.""" device_registry = dr.async_get(hass) for dev_id, device_entry in list(device_registry.devices.items()): for item in device_entry.identifiers: if item[0] == DOMAIN and item[1] not in device_manager.device_map: device_registry.async_remove_device(dev_id) break async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unloading the Tuya platforms.""" if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS): tuya: HomeAssistantTuyaData = hass.data[DOMAIN][entry.entry_id] if tuya.manager.mq is not None: tuya.manager.mq.stop() tuya.manager.remove_device_listener(tuya.listener) del hass.data[DOMAIN][entry.entry_id] return unload_ok async def async_remove_entry(hass: HomeAssistant, entry: ConfigEntry) -> None: """Remove a config entry. This will revoke the credentials from Tuya. """ manager = Manager( TUYA_CLIENT_ID, entry.data[CONF_USER_CODE], entry.data[CONF_TERMINAL_ID], entry.data[CONF_ENDPOINT], entry.data[CONF_TOKEN_INFO], ) await hass.async_add_executor_job(manager.unload) class DeviceListener(SharingDeviceListener): """Device Update Listener.""" def __init__( self, hass: HomeAssistant, manager: Manager, ) -> None: """Init DeviceListener.""" self.hass = hass self.manager = manager def update_device(self, device: CustomerDevice) -> None: """Update device status.""" LOGGER.debug( "Received update for device %s: %s", device.id, self.manager.device_map[device.id].status, ) dispatcher_send(self.hass, f"{TUYA_HA_SIGNAL_UPDATE_ENTITY}_{device.id}") def add_device(self, device: CustomerDevice) -> None: """Add device added listener.""" # Ensure the device isn't present stale self.hass.add_job(self.async_remove_device, device.id) dispatcher_send(self.hass, TUYA_DISCOVERY_NEW, [device.id]) def remove_device(self, device_id: str) -> None: """Add device removed listener.""" self.hass.add_job(self.async_remove_device, device_id) @callback def async_remove_device(self, device_id: str) -> None: """Remove device from Home Assistant.""" LOGGER.debug("Remove device: %s", device_id) device_registry = dr.async_get(self.hass) device_entry = device_registry.async_get_device( identifiers={(DOMAIN, device_id)} ) if device_entry is not None: device_registry.async_remove_device(device_entry.id) class TokenListener(SharingTokenListener): """Token listener for upstream token updates.""" def __init__( self, hass: HomeAssistant, entry: ConfigEntry, ) -> None: """Init TokenListener.""" self.hass = hass self.entry = entry def update_token(self, token_info: dict[str, Any]) -> None: """Update token info in config entry.""" data = { **self.entry.data, CONF_TOKEN_INFO: { "t": token_info["t"], "uid": token_info["uid"], "expire_time": token_info["expire_time"], "access_token": token_info["access_token"], "refresh_token": token_info["refresh_token"], }, } @callback def async_update_entry() -> None: """Update config entry.""" self.hass.config_entries.async_update_entry(self.entry, data=data) self.hass.add_job(async_update_entry)