"""Component to embed TP-Link smart home devices.""" from __future__ import annotations from datetime import timedelta import logging import time from typing import Any from pyHS100.smartdevice import SmartDevice, SmartDeviceException from pyHS100.smartplug import SmartPlug import voluptuous as vol from homeassistant import config_entries from homeassistant.components.switch import ATTR_CURRENT_POWER_W, ATTR_TODAY_ENERGY_KWH from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( ATTR_VOLTAGE, CONF_ALIAS, CONF_DEVICE_ID, CONF_HOST, CONF_MAC, CONF_STATE, ) from homeassistant.core import HomeAssistant from homeassistant.helpers import device_registry as dr import homeassistant.helpers.config_validation as cv from homeassistant.helpers.event import async_track_time_interval from homeassistant.helpers.typing import ConfigType from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from .common import SmartDevices, async_discover_devices, get_static_devices from .const import ( ATTR_CONFIG, ATTR_CURRENT_A, ATTR_TOTAL_ENERGY_KWH, CONF_DIMMER, CONF_DISCOVERY, CONF_EMETER_PARAMS, CONF_LIGHT, CONF_MODEL, CONF_STRIP, CONF_SW_VERSION, CONF_SWITCH, COORDINATORS, PLATFORMS, UNAVAILABLE_DEVICES, UNAVAILABLE_RETRY_DELAY, ) _LOGGER = logging.getLogger(__name__) DOMAIN = "tplink" TPLINK_HOST_SCHEMA = vol.Schema({vol.Required(CONF_HOST): cv.string}) CONFIG_SCHEMA = vol.Schema( { DOMAIN: vol.Schema( { vol.Optional(CONF_LIGHT, default=[]): vol.All( cv.ensure_list, [TPLINK_HOST_SCHEMA] ), vol.Optional(CONF_SWITCH, default=[]): vol.All( cv.ensure_list, [TPLINK_HOST_SCHEMA] ), vol.Optional(CONF_STRIP, default=[]): vol.All( cv.ensure_list, [TPLINK_HOST_SCHEMA] ), vol.Optional(CONF_DIMMER, default=[]): vol.All( cv.ensure_list, [TPLINK_HOST_SCHEMA] ), vol.Optional(CONF_DISCOVERY, default=True): cv.boolean, } ) }, extra=vol.ALLOW_EXTRA, ) async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the TP-Link component.""" conf = config.get(DOMAIN) hass.data[DOMAIN] = {} hass.data[DOMAIN][ATTR_CONFIG] = conf if conf is not None: hass.async_create_task( hass.config_entries.flow.async_init( DOMAIN, context={"source": config_entries.SOURCE_IMPORT} ) ) return True async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up TPLink from a config entry.""" config_data = hass.data[DOMAIN].get(ATTR_CONFIG) if config_data is None and entry.data: config_data = entry.data elif config_data is not None: hass.config_entries.async_update_entry(entry, data=config_data) device_registry = dr.async_get(hass) tplink_devices = dr.async_entries_for_config_entry(device_registry, entry.entry_id) device_count = len(tplink_devices) hass_data: dict[str, Any] = hass.data[DOMAIN] # These will contain the initialized devices hass_data[CONF_LIGHT] = [] hass_data[CONF_SWITCH] = [] hass_data[UNAVAILABLE_DEVICES] = [] lights: list[SmartDevice] = hass_data[CONF_LIGHT] switches: list[SmartPlug] = hass_data[CONF_SWITCH] unavailable_devices: list[SmartDevice] = hass_data[UNAVAILABLE_DEVICES] # Add static devices static_devices = SmartDevices() if config_data is not None: static_devices = get_static_devices(config_data) lights.extend(static_devices.lights) switches.extend(static_devices.switches) # Add discovered devices if config_data is None or config_data[CONF_DISCOVERY]: discovered_devices = await async_discover_devices( hass, static_devices, device_count ) lights.extend(discovered_devices.lights) switches.extend(discovered_devices.switches) if lights: _LOGGER.debug( "Got %s lights: %s", len(lights), ", ".join(d.host for d in lights) ) if switches: _LOGGER.debug( "Got %s switches: %s", len(switches), ", ".join(d.host for d in switches), ) async def async_retry_devices(self) -> None: """Retry unavailable devices.""" unavailable_devices: list[SmartDevice] = hass_data[UNAVAILABLE_DEVICES] _LOGGER.debug( "retry during setup unavailable devices: %s", [d.host for d in unavailable_devices], ) for device in unavailable_devices: try: await hass.async_add_executor_job(device.get_sysinfo) except SmartDeviceException: continue _LOGGER.debug( "at least one device is available again, so reload integration" ) await hass.config_entries.async_reload(entry.entry_id) break # prepare DataUpdateCoordinators hass_data[COORDINATORS] = {} for switch in switches: try: info = await hass.async_add_executor_job(switch.get_sysinfo) except SmartDeviceException: _LOGGER.warning( "Device at '%s' not reachable during setup, will retry later", switch.host, ) unavailable_devices.append(switch) continue hass_data[COORDINATORS][ switch.context or switch.mac ] = coordinator = SmartPlugDataUpdateCoordinator(hass, switch, info["alias"]) await coordinator.async_config_entry_first_refresh() if unavailable_devices: entry.async_on_unload( async_track_time_interval( hass, async_retry_devices, UNAVAILABLE_RETRY_DELAY ) ) unavailable_devices_hosts = [d.host for d in unavailable_devices] hass_data[CONF_SWITCH] = [ s for s in switches if s.host not in unavailable_devices_hosts ] hass.config_entries.async_setup_platforms(entry, PLATFORMS) return True async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unload a config entry.""" unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) hass_data: dict[str, Any] = hass.data[DOMAIN] if unload_ok: hass_data.clear() return unload_ok class SmartPlugDataUpdateCoordinator(DataUpdateCoordinator): """DataUpdateCoordinator to gather data for specific SmartPlug.""" def __init__( self, hass: HomeAssistant, smartplug: SmartPlug, alias: str, ) -> None: """Initialize DataUpdateCoordinator to gather data for specific SmartPlug.""" self.smartplug = smartplug update_interval = timedelta(seconds=30) super().__init__( hass, _LOGGER, name=alias, update_interval=update_interval, ) def _update_data(self) -> dict: """Fetch all device and sensor data from api.""" try: info = self.smartplug.sys_info data = { CONF_HOST: self.smartplug.host, CONF_MAC: info["mac"], CONF_MODEL: info["model"], CONF_SW_VERSION: info["sw_ver"], } if self.smartplug.context is None: data[CONF_ALIAS] = info["alias"] data[CONF_DEVICE_ID] = info["mac"] data[CONF_STATE] = bool(info["relay_state"]) else: plug_from_context = next( c for c in self.smartplug.sys_info["children"] if c["id"] == self.smartplug.context ) data[CONF_ALIAS] = plug_from_context["alias"] data[CONF_DEVICE_ID] = self.smartplug.context data[CONF_STATE] = plug_from_context["state"] == 1 # Check if the device has emeter if "ENE" in info["feature"]: emeter_readings = self.smartplug.get_emeter_realtime() data[CONF_EMETER_PARAMS] = { ATTR_CURRENT_POWER_W: round(float(emeter_readings["power"]), 2), ATTR_TOTAL_ENERGY_KWH: round(float(emeter_readings["total"]), 3), ATTR_VOLTAGE: round(float(emeter_readings["voltage"]), 1), ATTR_CURRENT_A: round(float(emeter_readings["current"]), 2), } emeter_statics = self.smartplug.get_emeter_daily() if emeter_statics.get(int(time.strftime("%e"))): data[CONF_EMETER_PARAMS][ATTR_TODAY_ENERGY_KWH] = round( float(emeter_statics[int(time.strftime("%e"))]), 3 ) else: # today's consumption not available, when device was off all the day data[CONF_EMETER_PARAMS][ATTR_TODAY_ENERGY_KWH] = 0.0 except SmartDeviceException as ex: raise UpdateFailed(ex) from ex self.name = data[CONF_ALIAS] return data async def _async_update_data(self) -> dict: """Fetch all device and sensor data from api.""" return await self.hass.async_add_executor_job(self._update_data)