core/homeassistant/components/binary_sensor/mystrom.py

96 lines
3.0 KiB
Python

"""
Support for the myStrom buttons.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/binary_sensor.mystrom/
"""
import asyncio
import logging
from homeassistant.components.binary_sensor import (BinarySensorDevice, DOMAIN)
from homeassistant.components.http import HomeAssistantView
from homeassistant.const import HTTP_UNPROCESSABLE_ENTITY
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# The platform registers its view through ``hass.http`` below; presumably this
# list makes Home Assistant set up the http component first -- TODO confirm.
DEPENDENCIES = ['http']
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
    """Register the HTTP view that receives myStrom button requests.

    The view is handed ``async_add_devices`` so it can add a sensor the
    first time a button/action pair shows up. Always returns True.
    """
    view = MyStromView(async_add_devices)
    hass.http.register_view(view)
    return True
class MyStromView(HomeAssistantView):
    """View to handle requests from myStrom buttons.

    Every distinct button/action pair seen on the endpoint gets its own
    binary sensor; further requests for a known pair toggle that sensor.
    """

    url = '/api/mystrom'
    name = 'api:mystrom'
    # Query parameter names that are accepted as button actions.
    supported_actions = ('single', 'double', 'long', 'touch')

    def __init__(self, add_devices):
        """Initialize the myStrom URL endpoint.

        ``add_devices`` is the callback used to register newly seen buttons.
        """
        # Maps entity_id -> MyStromBinarySensor for every pair seen so far.
        self.buttons = {}
        self.add_devices = add_devices

    @asyncio.coroutine
    def get(self, request):
        """Handle a GET request received from a myStrom button."""
        res = yield from self._handle(request.app['hass'], request.query)
        return res

    @asyncio.coroutine
    def _handle(self, hass, data):
        """Handle requests to the myStrom endpoint.

        ``data`` is the request's query mapping. Its first key is taken as
        the button action and the matching value as the button id.

        Returns a ``(message, HTTP_UNPROCESSABLE_ENTITY)`` tuple when the
        query is empty or its first key is not a supported action;
        otherwise returns None after adding or toggling the sensor.
        """
        # An empty query has no first key; reject it like any other
        # unidentified message instead of failing with an IndexError.
        button_action = next(iter(data), None)
        if button_action not in self.supported_actions:
            _LOGGER.error(
                "Received unidentified message from myStrom button: %s", data)
            return ("Received unidentified message: {}".format(data),
                    HTTP_UNPROCESSABLE_ENTITY)

        # Only build the identifiers once the action is known to be valid.
        button_id = data[button_action]
        entity_id = '{}.{}_{}'.format(DOMAIN, button_id, button_action)

        if entity_id not in self.buttons:
            _LOGGER.info("New myStrom button/action detected: %s/%s",
                         button_id, button_action)
            self.buttons[entity_id] = MyStromBinarySensor(
                '{}_{}'.format(button_id, button_action))
            hass.async_add_job(self.add_devices, [self.buttons[entity_id]])
        else:
            # Each further request flips the sensor: 'off' -> on, else off.
            new_state = self.buttons[entity_id].state == 'off'
            self.buttons[entity_id].async_on_update(new_state)
class MyStromBinarySensor(BinarySensorDevice):
    """Binary sensor entity backing one myStrom button/action pair."""

    def __init__(self, button_id):
        """Remember the button identifier; the state starts out as None."""
        self._state = None
        self._button_id = button_id

    @property
    def should_poll(self):
        """Return False: state changes are pushed in, never polled."""
        return False

    @property
    def name(self):
        """Return the sensor name, which is the button identifier."""
        return self._button_id

    @property
    def is_on(self):
        """Return the stored state (None until the first update)."""
        return self._state

    def async_on_update(self, value):
        """Store the new state and schedule a Home Assistant state update."""
        self._state = value
        pending_update = self.async_update_ha_state()
        self.hass.async_add_job(pending_update)