"""Support for Konnected devices.""" import copy import hmac import json import logging from aiohttp.hdrs import AUTHORIZATION from aiohttp.web import Request, Response import voluptuous as vol from homeassistant import config_entries from homeassistant.components.binary_sensor import DEVICE_CLASSES_SCHEMA from homeassistant.components.http import HomeAssistantView from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( ATTR_ENTITY_ID, CONF_ACCESS_TOKEN, CONF_BINARY_SENSORS, CONF_DEVICES, CONF_DISCOVERY, CONF_HOST, CONF_ID, CONF_NAME, CONF_PIN, CONF_PORT, CONF_REPEAT, CONF_SENSORS, CONF_SWITCHES, CONF_TYPE, CONF_ZONE, HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, STATE_OFF, STATE_ON, ) from homeassistant.core import HomeAssistant from homeassistant.helpers import config_validation as cv from .config_flow import ( # Loading the config flow file will register the flow CONF_DEFAULT_OPTIONS, CONF_IO, CONF_IO_BIN, CONF_IO_DIG, CONF_IO_SWI, OPTIONS_SCHEMA, ) from .const import ( CONF_ACTIVATION, CONF_API_HOST, CONF_BLINK, CONF_INVERSE, CONF_MOMENTARY, CONF_PAUSE, CONF_POLL_INTERVAL, DOMAIN, PIN_TO_ZONE, STATE_HIGH, STATE_LOW, UNDO_UPDATE_LISTENER, UPDATE_ENDPOINT, ZONE_TO_PIN, ZONES, ) from .handlers import HANDLERS from .panel import AlarmPanel _LOGGER = logging.getLogger(__name__) def ensure_pin(value): """Check if valid pin and coerce to string.""" if value is None: raise vol.Invalid("pin value is None") if PIN_TO_ZONE.get(str(value)) is None: raise vol.Invalid("pin not valid") return str(value) def ensure_zone(value): """Check if valid zone and coerce to string.""" if value is None: raise vol.Invalid("zone value is None") if str(value) not in ZONES is None: raise vol.Invalid("zone not valid") return str(value) def import_device_validator(config): """Validate zones and reformat for import.""" config = copy.deepcopy(config) io_cfgs = {} # Replace pins with zones for conf_platform, conf_io in ( (CONF_BINARY_SENSORS, CONF_IO_BIN), (CONF_SENSORS, CONF_IO_DIG), (CONF_SWITCHES, CONF_IO_SWI), ): for zone in config.get(conf_platform, []): if zone.get(CONF_PIN): zone[CONF_ZONE] = PIN_TO_ZONE[zone[CONF_PIN]] del zone[CONF_PIN] io_cfgs[zone[CONF_ZONE]] = conf_io # Migrate config_entry data into default_options structure config[CONF_IO] = io_cfgs config[CONF_DEFAULT_OPTIONS] = OPTIONS_SCHEMA(config) # clean up fields migrated to options config.pop(CONF_BINARY_SENSORS, None) config.pop(CONF_SENSORS, None) config.pop(CONF_SWITCHES, None) config.pop(CONF_BLINK, None) config.pop(CONF_DISCOVERY, None) config.pop(CONF_API_HOST, None) config.pop(CONF_IO, None) return config def import_validator(config): """Reformat for import.""" config = copy.deepcopy(config) # push api_host into device configs for device in config.get(CONF_DEVICES, []): device[CONF_API_HOST] = config.get(CONF_API_HOST, "") return config # configuration.yaml schemas (legacy) BINARY_SENSOR_SCHEMA_YAML = vol.All( vol.Schema( { vol.Exclusive(CONF_ZONE, "s_io"): ensure_zone, vol.Exclusive(CONF_PIN, "s_io"): ensure_pin, vol.Required(CONF_TYPE): DEVICE_CLASSES_SCHEMA, vol.Optional(CONF_NAME): cv.string, vol.Optional(CONF_INVERSE, default=False): cv.boolean, } ), cv.has_at_least_one_key(CONF_PIN, CONF_ZONE), ) SENSOR_SCHEMA_YAML = vol.All( vol.Schema( { vol.Exclusive(CONF_ZONE, "s_io"): ensure_zone, vol.Exclusive(CONF_PIN, "s_io"): ensure_pin, vol.Required(CONF_TYPE): vol.All(vol.Lower, vol.In(["dht", "ds18b20"])), vol.Optional(CONF_NAME): cv.string, vol.Optional(CONF_POLL_INTERVAL, default=3): vol.All( vol.Coerce(int), vol.Range(min=1) ), } ), cv.has_at_least_one_key(CONF_PIN, CONF_ZONE), ) SWITCH_SCHEMA_YAML = vol.All( vol.Schema( { vol.Exclusive(CONF_ZONE, "s_io"): ensure_zone, vol.Exclusive(CONF_PIN, "s_io"): ensure_pin, vol.Optional(CONF_NAME): cv.string, vol.Optional(CONF_ACTIVATION, default=STATE_HIGH): vol.All( vol.Lower, vol.Any(STATE_HIGH, STATE_LOW) ), vol.Optional(CONF_MOMENTARY): vol.All(vol.Coerce(int), vol.Range(min=10)), vol.Optional(CONF_PAUSE): vol.All(vol.Coerce(int), vol.Range(min=10)), vol.Optional(CONF_REPEAT): vol.All(vol.Coerce(int), vol.Range(min=-1)), } ), cv.has_at_least_one_key(CONF_PIN, CONF_ZONE), ) DEVICE_SCHEMA_YAML = vol.All( vol.Schema( { vol.Required(CONF_ID): cv.matches_regex("[0-9a-f]{12}"), vol.Optional(CONF_BINARY_SENSORS): vol.All( cv.ensure_list, [BINARY_SENSOR_SCHEMA_YAML] ), vol.Optional(CONF_SENSORS): vol.All(cv.ensure_list, [SENSOR_SCHEMA_YAML]), vol.Optional(CONF_SWITCHES): vol.All(cv.ensure_list, [SWITCH_SCHEMA_YAML]), vol.Inclusive(CONF_HOST, "host_info"): cv.string, vol.Inclusive(CONF_PORT, "host_info"): cv.port, vol.Optional(CONF_BLINK, default=True): cv.boolean, vol.Optional(CONF_API_HOST, default=""): vol.Any("", cv.url), vol.Optional(CONF_DISCOVERY, default=True): cv.boolean, } ), import_device_validator, ) # pylint: disable=no-value-for-parameter CONFIG_SCHEMA = vol.Schema( { DOMAIN: vol.All( import_validator, vol.Schema( { vol.Required(CONF_ACCESS_TOKEN): cv.string, vol.Optional(CONF_API_HOST): vol.Url(), vol.Optional(CONF_DEVICES): vol.All( cv.ensure_list, [DEVICE_SCHEMA_YAML] ), } ), ) }, extra=vol.ALLOW_EXTRA, ) YAML_CONFIGS = "yaml_configs" PLATFORMS = ["binary_sensor", "sensor", "switch"] async def async_setup(hass: HomeAssistant, config: dict): """Set up the Konnected platform.""" cfg = config.get(DOMAIN) if cfg is None: cfg = {} if DOMAIN not in hass.data: hass.data[DOMAIN] = { CONF_ACCESS_TOKEN: cfg.get(CONF_ACCESS_TOKEN), CONF_API_HOST: cfg.get(CONF_API_HOST), CONF_DEVICES: {}, } hass.http.register_view(KonnectedView) # Check if they have yaml configured devices if CONF_DEVICES not in cfg: return True for device in cfg.get(CONF_DEVICES, []): # Attempt to importing the cfg. Use # hass.async_add_job to avoid a deadlock. hass.async_create_task( hass.config_entries.flow.async_init( DOMAIN, context={"source": config_entries.SOURCE_IMPORT}, data=device ) ) return True async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry): """Set up panel from a config entry.""" client = AlarmPanel(hass, entry) # creates a panel data store in hass.data[DOMAIN][CONF_DEVICES] await client.async_save_data() # if the cfg entry was created we know we could connect to the panel at some point # async_connect will handle retries until it establishes a connection await client.async_connect() hass.config_entries.async_setup_platforms(entry, PLATFORMS) # config entry specific data to enable unload hass.data[DOMAIN][entry.entry_id] = { UNDO_UPDATE_LISTENER: entry.add_update_listener(async_entry_updated) } return True async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry): """Unload a config entry.""" unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) hass.data[DOMAIN][entry.entry_id][UNDO_UPDATE_LISTENER]() if unload_ok: hass.data[DOMAIN][CONF_DEVICES].pop(entry.data[CONF_ID]) hass.data[DOMAIN].pop(entry.entry_id) return unload_ok async def async_entry_updated(hass: HomeAssistant, entry: ConfigEntry): """Reload the config entry when options change.""" await hass.config_entries.async_reload(entry.entry_id) class KonnectedView(HomeAssistantView): """View creates an endpoint to receive push updates from the device.""" url = UPDATE_ENDPOINT name = "api:konnected" requires_auth = False # Uses access token from configuration def __init__(self): """Initialize the view.""" @staticmethod def binary_value(state, activation): """Return binary value for GPIO based on state and activation.""" if activation == STATE_HIGH: return 1 if state == STATE_ON else 0 return 0 if state == STATE_ON else 1 async def update_sensor(self, request: Request, device_id) -> Response: """Process a put or post.""" hass = request.app["hass"] data = hass.data[DOMAIN] auth = request.headers.get(AUTHORIZATION) tokens = [] if hass.data[DOMAIN].get(CONF_ACCESS_TOKEN): tokens.extend([hass.data[DOMAIN][CONF_ACCESS_TOKEN]]) tokens.extend( [ entry.data[CONF_ACCESS_TOKEN] for entry in hass.config_entries.async_entries(DOMAIN) if entry.data.get(CONF_ACCESS_TOKEN) ] ) if auth is None or not next( (True for token in tokens if hmac.compare_digest(f"Bearer {token}", auth)), False, ): return self.json_message("unauthorized", status_code=HTTP_UNAUTHORIZED) try: # Konnected 2.2.0 and above supports JSON payloads payload = await request.json() except json.decoder.JSONDecodeError: _LOGGER.error( "Your Konnected device software may be out of " "date. Visit https://help.konnected.io for " "updating instructions" ) device = data[CONF_DEVICES].get(device_id) if device is None: return self.json_message( "unregistered device", status_code=HTTP_BAD_REQUEST ) panel = device.get("panel") if panel is not None: # connect if we haven't already hass.async_create_task(panel.async_connect()) try: zone_num = str(payload.get(CONF_ZONE) or PIN_TO_ZONE[payload[CONF_PIN]]) payload[CONF_ZONE] = zone_num zone_data = ( device[CONF_BINARY_SENSORS].get(zone_num) or next( (s for s in device[CONF_SWITCHES] if s[CONF_ZONE] == zone_num), None ) or next( (s for s in device[CONF_SENSORS] if s[CONF_ZONE] == zone_num), None ) ) except KeyError: zone_data = None if zone_data is None: return self.json_message( "unregistered sensor/actuator", status_code=HTTP_BAD_REQUEST ) zone_data["device_id"] = device_id for attr in ["state", "temp", "humi", "addr"]: value = payload.get(attr) handler = HANDLERS.get(attr) if value is not None and handler: hass.async_create_task(handler(hass, zone_data, payload)) return self.json_message("ok") async def get(self, request: Request, device_id) -> Response: """Return the current binary state of a switch.""" hass = request.app["hass"] data = hass.data[DOMAIN] device = data[CONF_DEVICES].get(device_id) if not device: return self.json_message( f"Device {device_id} not configured", status_code=HTTP_NOT_FOUND ) panel = device.get("panel") if panel is not None: # connect if we haven't already hass.async_create_task(panel.async_connect()) # Our data model is based on zone ids but we convert from/to pin ids # based on whether they are specified in the request try: zone_num = str( request.query.get(CONF_ZONE) or PIN_TO_ZONE[request.query[CONF_PIN]] ) zone = next( switch for switch in device[CONF_SWITCHES] if switch[CONF_ZONE] == zone_num ) except StopIteration: zone = None except KeyError: zone = None zone_num = None if not zone: target = request.query.get( CONF_ZONE, request.query.get(CONF_PIN, "unknown") ) return self.json_message( f"Switch on zone or pin {target} not configured", status_code=HTTP_NOT_FOUND, ) resp = {} if request.query.get(CONF_ZONE): resp[CONF_ZONE] = zone_num else: resp[CONF_PIN] = ZONE_TO_PIN[zone_num] # Make sure entity is setup zone_entity_id = zone.get(ATTR_ENTITY_ID) if zone_entity_id: resp["state"] = self.binary_value( hass.states.get(zone_entity_id).state, zone[CONF_ACTIVATION] ) return self.json(resp) _LOGGER.warning("Konnected entity not yet setup, returning default") resp["state"] = self.binary_value(STATE_OFF, zone[CONF_ACTIVATION]) return self.json(resp) async def put(self, request: Request, device_id) -> Response: """Receive a sensor update via PUT request and async set state.""" return await self.update_sensor(request, device_id) async def post(self, request: Request, device_id) -> Response: """Receive a sensor update via POST request and async set state.""" return await self.update_sensor(request, device_id)