215 lines
6.7 KiB
Python
215 lines
6.7 KiB
Python
|
"""
|
||
|
A component which allows you to send data to the IBM Watson IoT Platform.
|
||
|
|
||
|
For more details about this component, please refer to the documentation at
|
||
|
https://home-assistant.io/components/watson_iot/
|
||
|
"""
|
||
|
|
||
|
import logging
|
||
|
import queue
|
||
|
import threading
|
||
|
import time
|
||
|
|
||
|
import voluptuous as vol
|
||
|
|
||
|
from homeassistant.const import (
|
||
|
CONF_DOMAINS, CONF_ENTITIES, CONF_EXCLUDE, CONF_INCLUDE,
|
||
|
CONF_TOKEN, CONF_TYPE, EVENT_STATE_CHANGED, EVENT_HOMEASSISTANT_STOP,
|
||
|
STATE_UNAVAILABLE, STATE_UNKNOWN)
|
||
|
from homeassistant.helpers import state as state_helper
|
||
|
import homeassistant.helpers.config_validation as cv
|
||
|
|
||
|
REQUIREMENTS = ['ibmiotf==0.3.4']
|
||
|
|
||
|
_LOGGER = logging.getLogger(__name__)
|
||
|
|
||
|
CONF_ORG = 'organization'
|
||
|
CONF_ID = 'id'
|
||
|
|
||
|
DOMAIN = 'watson_iot'
|
||
|
|
||
|
RETRY_DELAY = 20
|
||
|
MAX_TRIES = 3
|
||
|
|
||
|
CONFIG_SCHEMA = vol.Schema({
|
||
|
DOMAIN: vol.All(vol.Schema({
|
||
|
vol.Required(CONF_ORG): cv.string,
|
||
|
vol.Required(CONF_TYPE): cv.string,
|
||
|
vol.Required(CONF_ID): cv.string,
|
||
|
vol.Required(CONF_TOKEN): cv.string,
|
||
|
vol.Optional(CONF_EXCLUDE, default={}): vol.Schema({
|
||
|
vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids,
|
||
|
vol.Optional(CONF_DOMAINS, default=[]):
|
||
|
vol.All(cv.ensure_list, [cv.string])
|
||
|
}),
|
||
|
vol.Optional(CONF_INCLUDE, default={}): vol.Schema({
|
||
|
vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids,
|
||
|
vol.Optional(CONF_DOMAINS, default=[]):
|
||
|
vol.All(cv.ensure_list, [cv.string])
|
||
|
}),
|
||
|
})),
|
||
|
}, extra=vol.ALLOW_EXTRA)
|
||
|
|
||
|
|
||
|
def setup(hass, config):
|
||
|
"""Set up the Watson IoT Platform component."""
|
||
|
from ibmiotf import gateway
|
||
|
|
||
|
conf = config[DOMAIN]
|
||
|
|
||
|
include = conf[CONF_INCLUDE]
|
||
|
exclude = conf[CONF_EXCLUDE]
|
||
|
whitelist_e = set(include[CONF_ENTITIES])
|
||
|
whitelist_d = set(include[CONF_DOMAINS])
|
||
|
blacklist_e = set(exclude[CONF_ENTITIES])
|
||
|
blacklist_d = set(exclude[CONF_DOMAINS])
|
||
|
|
||
|
client_args = {
|
||
|
'org': conf[CONF_ORG],
|
||
|
'type': conf[CONF_TYPE],
|
||
|
'id': conf[CONF_ID],
|
||
|
'auth-method': 'token',
|
||
|
'auth-token': conf[CONF_TOKEN],
|
||
|
}
|
||
|
watson_gateway = gateway.Client(client_args)
|
||
|
|
||
|
def event_to_json(event):
|
||
|
"""Add an event to the outgoing list."""
|
||
|
state = event.data.get('new_state')
|
||
|
if state is None or state.state in (
|
||
|
STATE_UNKNOWN, '', STATE_UNAVAILABLE) or \
|
||
|
state.entity_id in blacklist_e or state.domain in blacklist_d:
|
||
|
return
|
||
|
|
||
|
if (whitelist_e and state.entity_id not in whitelist_e) or \
|
||
|
(whitelist_d and state.domain not in whitelist_d):
|
||
|
return
|
||
|
|
||
|
try:
|
||
|
_state_as_value = float(state.state)
|
||
|
except ValueError:
|
||
|
_state_as_value = None
|
||
|
|
||
|
if _state_as_value is None:
|
||
|
try:
|
||
|
_state_as_value = float(state_helper.state_as_number(state))
|
||
|
except ValueError:
|
||
|
_state_as_value = None
|
||
|
|
||
|
out_event = {
|
||
|
'tags': {
|
||
|
'domain': state.domain,
|
||
|
'entity_id': state.object_id,
|
||
|
},
|
||
|
'time': event.time_fired.isoformat(),
|
||
|
'fields': {
|
||
|
'state': state.state
|
||
|
}
|
||
|
}
|
||
|
if _state_as_value is not None:
|
||
|
out_event['fields']['state_value'] = _state_as_value
|
||
|
|
||
|
for key, value in state.attributes.items():
|
||
|
if key != 'unit_of_measurement':
|
||
|
# If the key is already in fields
|
||
|
if key in out_event['fields']:
|
||
|
key = key + "_"
|
||
|
# For each value we try to cast it as float
|
||
|
# But if we can not do it we store the value
|
||
|
# as string
|
||
|
try:
|
||
|
out_event['fields'][key] = float(value)
|
||
|
except (ValueError, TypeError):
|
||
|
out_event['fields'][key] = str(value)
|
||
|
|
||
|
return out_event
|
||
|
|
||
|
instance = hass.data[DOMAIN] = WatsonIOTThread(
|
||
|
hass, watson_gateway, event_to_json)
|
||
|
instance.start()
|
||
|
|
||
|
def shutdown(event):
|
||
|
"""Shut down the thread."""
|
||
|
instance.queue.put(None)
|
||
|
instance.join()
|
||
|
|
||
|
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
|
||
|
|
||
|
return True
|
||
|
|
||
|
|
||
|
class WatsonIOTThread(threading.Thread):
|
||
|
"""A threaded event handler class."""
|
||
|
|
||
|
def __init__(self, hass, gateway, event_to_json):
|
||
|
"""Initialize the listener."""
|
||
|
threading.Thread.__init__(self, name='WatsonIOT')
|
||
|
self.queue = queue.Queue()
|
||
|
self.gateway = gateway
|
||
|
self.gateway.connect()
|
||
|
self.event_to_json = event_to_json
|
||
|
self.write_errors = 0
|
||
|
self.shutdown = False
|
||
|
hass.bus.listen(EVENT_STATE_CHANGED, self._event_listener)
|
||
|
|
||
|
def _event_listener(self, event):
|
||
|
"""Listen for new messages on the bus and queue them for Watson IOT."""
|
||
|
item = (time.monotonic(), event)
|
||
|
self.queue.put(item)
|
||
|
|
||
|
def get_events_json(self):
|
||
|
"""Return an event formatted for writing."""
|
||
|
events = []
|
||
|
|
||
|
try:
|
||
|
item = self.queue.get()
|
||
|
|
||
|
if item is None:
|
||
|
self.shutdown = True
|
||
|
else:
|
||
|
event_json = self.event_to_json(item[1])
|
||
|
if event_json:
|
||
|
events.append(event_json)
|
||
|
|
||
|
except queue.Empty:
|
||
|
pass
|
||
|
|
||
|
return events
|
||
|
|
||
|
def write_to_watson(self, events):
|
||
|
"""Write preprocessed events to watson."""
|
||
|
import ibmiotf
|
||
|
|
||
|
for event in events:
|
||
|
for retry in range(MAX_TRIES + 1):
|
||
|
try:
|
||
|
for field in event['fields']:
|
||
|
value = event['fields'][field]
|
||
|
device_success = self.gateway.publishDeviceEvent(
|
||
|
event['tags']['domain'],
|
||
|
event['tags']['entity_id'],
|
||
|
field, 'json', value)
|
||
|
if not device_success:
|
||
|
_LOGGER.error(
|
||
|
"Failed to publish message to watson iot")
|
||
|
continue
|
||
|
break
|
||
|
except (ibmiotf.MissingMessageEncoderException, IOError):
|
||
|
if retry < MAX_TRIES:
|
||
|
time.sleep(RETRY_DELAY)
|
||
|
else:
|
||
|
_LOGGER.exception(
|
||
|
"Failed to publish message to watson iot")
|
||
|
|
||
|
def run(self):
|
||
|
"""Process incoming events."""
|
||
|
while not self.shutdown:
|
||
|
event = self.get_events_json()
|
||
|
if event:
|
||
|
self.write_to_watson(event)
|
||
|
self.queue.task_done()
|
||
|
|
||
|
def block_till_done(self):
|
||
|
"""Block till all events processed."""
|
||
|
self.queue.join()
|