398 lines
14 KiB
Python
398 lines
14 KiB
Python
# -*- coding: iso-8859-15 -*-
|
|
|
|
#
|
|
# Copyright (c) 2010-2019 Contributors to the openHAB project
|
|
#
|
|
# See the NOTICE file(s) distributed with this work for additional
|
|
# information.
|
|
#
|
|
# This program and the accompanying materials are made available under the
|
|
# terms of the Eclipse Public License 2.0 which is available at
|
|
# http://www.eclipse.org/legal/epl-2.0
|
|
#
|
|
# SPDX-License-Identifier: EPL-2.0
|
|
#
|
|
|
|
from os.path import dirname
|
|
|
|
from adapt.intent import IntentBuilder
|
|
from mycroft.skills.core import MycroftSkill, intent_handler
|
|
from mycroft.util.log import getLogger
|
|
from rapidfuzz import fuzz
|
|
|
|
import requests
|
|
import json
|
|
|
|
# v 0.1 - just switch a fixed light on and off
|
|
# v 0.2 - code review
|
|
# v 0.3 - first working version on fixed light item
|
|
# v 0.4 - getTaggedItems method in order to get all the tagged items from openHAB
|
|
# v 0.5 - refresh tagged item intent
|
|
# v 0.6 - add findItemName method and import fuzzywuzzy
|
|
# v 0.7 - add intent for switchable items
|
|
# v 0.8 - merged lighting and switchable intent in onoff intent
|
|
# v 0.9 - added support for dimmable items
|
|
# v 1.0 - added Thermostat tag support
|
|
# v 1.1 - added what status Switchable tag
|
|
# v 1.2 - support for Python 3.0
|
|
# v 1.3 - support for German
|
|
# v 1.4 - support for Spanish
|
|
# v 1.5 - support to 19
|
|
|
|
|
|
__author__ = 'mortommy'
|
|
|
|
LOGGER = getLogger(__name__)
|
|
|
|
class openHABSkill(MycroftSkill):
    """Mycroft skill that controls and queries openHAB items over its REST API.

    Items are discovered by tag (see ``getTaggedItems``) and cached in one
    dictionary per category, each mapping the openHAB item name to its label.
    The item spoken by the user is fuzzy-matched against those labels
    (see ``findItemName``).
    """

    # Seconds to wait for openHAB on every REST call. Without a timeout
    # ``requests`` can block forever on an unreachable server.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        super(openHABSkill, self).__init__(name="openHABSkill")

        self.command_headers = {"Content-type": "text/plain"}
        self.polling_headers = {"Accept": "application/json"}

        self.url = None
        # Create every cache up front so the intent handlers can read them
        # even when getTaggedItems() has never run (skill not configured).
        self._resetTaggedItems()

    def _resetTaggedItems(self):
        """Replace every per-category item cache with an empty dictionary."""
        self.lightingItemsDic = {}
        self.switchableItemsDic = {}
        self.currentTempItemsDic = {}
        self.currentHumItemsDic = {}
        self.currentThermostatItemsDic = {}
        self.targetTemperatureItemsDic = {}
        self.homekitHeatingCoolingModeDic = {}

    @staticmethod
    def _parseInt(text):
        """Return ``text`` as an int, or None when it is not numeric.

        Decimal strings such as "45.0" are accepted and truncated toward zero.
        """
        try:
            return int(text)
        except (TypeError, ValueError):
            pass
        try:
            return int(float(text))
        except (TypeError, ValueError, OverflowError):
            return None

    def initialize(self):
        """Load the configuration, fetch the tagged items and register intents."""
        supported_languages = ["en-us", "it-it", "de-de", "es-es"]

        if self.lang not in supported_languages:
            self.log.warning("Unsupported language for " + self.name + ", shutting down skill.")
            # NOTE(review): there is no return after shutdown(), so the setup
            # below still runs for unsupported languages - confirm intended.
            self.shutdown()

        # Builds self.url from the settings and, when host and port are set,
        # already fetches the tagged items - so no second fetch is done here.
        self.handle_websettings_update()

        if self.url is None:
            self.speak_dialog('ConfigurationNeeded')

        refresh_tagged_items_intent = (
            IntentBuilder("RefreshTaggedItemsIntent")
            .require("RefreshTaggedItemsKeyword")
            .build()
        )
        self.register_intent(refresh_tagged_items_intent, self.handle_refresh_tagged_items_intent)

        onoff_status_intent = (
            IntentBuilder("OnOff_StatusIntent")
            .require("OnOffStatusKeyword")
            .require("Command")
            .require("Item")
            .build()
        )
        self.register_intent(onoff_status_intent, self.handle_onoff_status_intent)

        dimmer_status_intent = (
            IntentBuilder("Dimmer_StatusIntent")
            .require("DimmerStatusKeyword")
            .require("Item")
            .optionally("BrightPercentage")
            .build()
        )
        self.register_intent(dimmer_status_intent, self.handle_dimmer_status_intent)

        # The "what is the status" intent is registered from an intent file
        # plus two entity files instead of an IntentBuilder.
        self.register_entity_file('item.entity')
        self.register_entity_file('requesttype.entity')
        self.register_intent_file('what.status.intent', self.handle_what_status_intent)

        setTemp_status_intent = (
            IntentBuilder("SetTemp_StatusIntent")
            .require("ThermostatStatusKeyword")
            .require("Item")
            .require("TempValue")
            .build()
        )
        self.register_intent(setTemp_status_intent, self.handle_setTemp_status_intent)

        list_items_intent = IntentBuilder("ListItemsIntent").require("ListItemsKeyword").build()
        self.register_intent(list_items_intent, self.handle_list_items_intent)

        self.settings_change_callback = self.handle_websettings_update

    def get_config(self, key):
        """Return ``key`` from the skill settings, else from the core config.

        The fallback is the 'openHABSkill' section of ``self.config_core``;
        None is returned when neither source has a truthy value.
        """
        return (self.settings.get(key) or self.config_core.get('openHABSkill', {}).get(key))

    def handle_websettings_update(self):
        """Rebuild the REST base URL from 'host' and 'port' and reload the items.

        When either setting is missing the URL is cleared, and so are the
        cached items, so handlers cannot match an item they can no longer reach.
        """
        host = self.get_config('host')
        port = self.get_config('port')

        if host is not None and port is not None:
            self.url = "http://%s:%s/rest" % (host, port)
            self.getTaggedItems()
        else:
            self.url = None
            self._resetTaggedItems()

    def getTaggedItems(self):
        """Fetch all items from openHAB and cache the tagged ones by category.

        Every cache is emptied first. An item is stored as name -> label in
        the dictionary of the first tag it matches, in the order listed below.
        Failures are logged and reported with the 'GetItemsListError' dialog.
        """
        self._resetTaggedItems()

        if self.url is None:
            LOGGER.error("Configuration needed!")
            self.speak_dialog('ConfigurationNeeded')
            return

        # Order matters: an item with several tags lands in the first match.
        tagged_dicts = (
            ("Lighting", self.lightingItemsDic),
            ("Switchable", self.switchableItemsDic),
            ("CurrentTemperature", self.currentTempItemsDic),
            ("CurrentHumidity", self.currentHumItemsDic),
            ("Thermostat", self.currentThermostatItemsDic),
            ("TargetTemperature", self.targetTemperatureItemsDic),
            ("homekit:HeatingCoolingMode", self.homekitHeatingCoolingModeDic),
        )

        requestUrl = self.url + "/items?recursive=false"

        try:
            req = requests.get(requestUrl, headers=self.polling_headers,
                               timeout=self.REQUEST_TIMEOUT)

            if req.status_code != 200:
                LOGGER.error("Some issues with the command execution!")
                self.speak_dialog('GetItemsListError')
                return

            for item in req.json():
                tags = item.get('tags') or ()
                for tag, target in tagged_dicts:
                    if tag not in tags:
                        continue
                    name = item.get('name')
                    label = item.get('label')
                    # An item without a name or label cannot be matched by
                    # voice; skip only that item instead of aborting the
                    # whole scan.
                    if name is not None and label is not None:
                        target[name] = label
                    break
        except Exception:
            # Boundary handler: network errors, invalid JSON or an unexpected
            # payload shape all end in the same spoken error.
            LOGGER.error("Some issues with the command execution!")
            self.speak_dialog('GetItemsListError')

    def findItemName(self, itemDictionary, messageItem):
        """Return the item name whose label best matches ``messageItem``.

        ``itemDictionary`` maps item names to labels. Labels are scored with
        ``fuzz.ratio``; the first label reaching the highest score above zero
        wins. Returns None when no label scores above zero.
        """
        bestScore = 0
        bestItem = None

        for itemName, itemLabel in itemDictionary.items():
            # score_cutoff is passed so candidates below the current best
            # can be discarded by the scorer itself.
            score = fuzz.ratio(messageItem, itemLabel, score_cutoff=bestScore)
            if score > bestScore:
                bestScore = score
                bestItem = itemName

        return bestItem

    def getItemsFromDict(self, typeStr, itemsDict):
        """Return "<typeStr>: label1, label2, ..." or "" for an empty dict."""
        if not itemsDict:
            return ""
        return "%s: %s" % (typeStr, ', '.join(itemsDict.values()))

    def handle_list_items_intent(self, message):
        """Speak the labels of all cached items, grouped by category."""
        sections = (
            ("Lights", self.lightingItemsDic),
            ("Switches", self.switchableItemsDic),
            ("Current Temperature", self.currentTempItemsDic),
            ("Current Humidity", self.currentHumItemsDic),
            ("Thermostat", self.currentThermostatItemsDic),
            ("Target Temperature", self.targetTemperatureItemsDic),
            ("Homekit Heating and Cooling", self.homekitHeatingCoolingModeDic),
        )

        parts = [self.getItemsFromDict(title, items).strip() for title, items in sections]
        # Empty categories are left out; the rest is joined by single spaces.
        msg = ' '.join(part for part in parts if part)

        self.speak_dialog('FoundItems', {'items': msg})

    def handle_refresh_tagged_items_intent(self, message):
        """Reload the tagged items from openHAB and speak how many were found."""
        self.getTaggedItems()

        itemCount = (
            len(self.lightingItemsDic)
            + len(self.switchableItemsDic)
            + len(self.currentTempItemsDic)
            + len(self.currentHumItemsDic)
            + len(self.currentThermostatItemsDic)
            + len(self.targetTemperatureItemsDic)
            + len(self.homekitHeatingCoolingModeDic)
        )
        self.speak_dialog('RefreshTaggedItems', {'number_item': str(itemCount)})

    def handle_onoff_status_intent(self, message):
        """Switch a lighting or switchable item on or off."""
        command = message.data.get('Command')
        messageItem = message.data.get('Item')

        # Search lights and switches together; on a name clash the
        # switchable entry overrides the lighting one.
        candidates = dict(self.lightingItemsDic)
        candidates.update(self.switchableItemsDic)

        ohItem = self.findItemName(candidates, messageItem)

        if ohItem is None:
            LOGGER.error("Item not found!")
            self.speak_dialog('ItemNotFoundError')
            return

        if command not in ("on", "off"):
            self.speak_dialog('ErrorDialog')
            return

        statusCode = self.sendCommandToItem(ohItem, command.upper())

        if statusCode == 200:
            self.speak_dialog('StatusOnOff', {'command': command, 'item': messageItem})
        elif statusCode == 404:
            LOGGER.error("Some issues with the command execution!. Item not found")
            self.speak_dialog('ItemNotFoundError')
        else:
            LOGGER.error("Some issues with the command execution!")
            self.speak_dialog('CommunicationError')

    def handle_dimmer_status_intent(self, message):
        """Set, dim or brighten a lighting item.

        With the 'Set' vocabulary the given percentage (0-100) is sent as is.
        Otherwise the current brightness is read and changed by the given
        amount (10 when none was spoken), clamped to 0-100.
        """
        command = message.data.get('DimmerStatusKeyword')
        messageItem = message.data.get('Item')
        brightValue = message.data.get('BrightPercentage', None)

        ohItem = self.findItemName(self.lightingItemsDic, messageItem)

        if ohItem is None:
            LOGGER.error("Item not found!")
            self.speak_dialog('ItemNotFoundError')
            return

        if self.voc_match(command, 'Set'):
            level = self._parseInt(brightValue)
            if level is None or level < 0 or level > 100:
                # Invalid input was reported; do not follow it with a
                # misleading communication error.
                self.speak_dialog('ErrorDialog')
                return
            statusCode = self.sendCommandToItem(ohItem, brightValue)
        else:
            state = self.getCurrentItemStatus(ohItem)
            if state is None:
                # getCurrentItemStatus() has already logged and spoken the error.
                return

            # The brightness is the last comma-separated field of the state.
            curBright = self._parseInt(state.split(',')[-1])
            step = 10 if brightValue is None else self._parseInt(brightValue)

            if curBright is None or step is None:
                LOGGER.error("Unexpected brightness value, state: %s, step: %s" % (state, brightValue))
                self.speak_dialog('ErrorDialog')
                return

            if self.voc_match(command, 'Dim'):
                newBrightValue = curBright - step
            else:
                newBrightValue = curBright + step

            newBrightValue = max(0, min(100, newBrightValue))

            statusCode = self.sendCommandToItem(ohItem, str(newBrightValue))

        if statusCode == 200:
            self.speak_dialog('StatusDimmer', {'item': messageItem})
        elif statusCode == 404:
            LOGGER.error("Some issues with the command execution!. Item not found")
            self.speak_dialog('ItemNotFoundError')
        else:
            LOGGER.error("Some issues with the command execution!")
            self.speak_dialog('CommunicationError')

    def handle_what_status_intent(self, message):
        """Speak the current temperature, humidity or status of an item.

        The request type selects which cache is searched; any other request
        type falls back to the target temperature items.
        """
        messageItem = message.data.get('item')
        LOGGER.debug("Item: %s" % (messageItem))
        requestType = message.data.get('requesttype')
        LOGGER.debug("Request Type: %s" % (requestType))

        unitOfMeasure = self.translate('Degree')
        infoType = self.translate('Temperature')

        if self.voc_match(requestType, 'Temperature'):
            candidates = self.currentTempItemsDic
        elif self.voc_match(requestType, 'Humidity'):
            unitOfMeasure = self.translate('Percentage')
            infoType = self.translate('Humidity')
            candidates = self.currentHumItemsDic
        elif self.voc_match(requestType, 'Status'):
            infoType = self.translate('Status')
            unitOfMeasure = ""
            candidates = self.switchableItemsDic
        else:
            candidates = self.targetTemperatureItemsDic

        ohItem = self.findItemName(candidates, messageItem)

        if ohItem is None:
            LOGGER.error("Item not found!")
            self.speak_dialog('ItemNotFoundError')
            return

        state = self.getCurrentItemStatus(ohItem)
        if state is None:
            # The lookup failed and was already reported; do not speak "None".
            return

        self.speak_dialog('TempHumStatus', {'item': messageItem, 'temp_hum': infoType, 'temp_hum_val': state, 'units_of_measurement': unitOfMeasure})

    def handle_setTemp_status_intent(self, message):
        """Set, increase or decrease the target temperature of an item.

        With the 'Regulate' vocabulary the spoken value is sent as is.
        Otherwise the current state is read and changed by the spoken amount.
        """
        command = message.data.get('ThermostatStatusKeyword')
        messageItem = message.data.get('Item')
        tempVal = message.data.get('TempValue')

        statusCode = 0
        newTempValue = 0

        ohItem = self.findItemName(self.targetTemperatureItemsDic, messageItem)

        if ohItem is None:
            LOGGER.error("Item not found!")
            self.speak_dialog('ItemNotFoundError')
            return

        if self.voc_match(command, 'Regulate'):
            statusCode = self.sendCommandToItem(ohItem, tempVal)
            newTempValue = tempVal
        else:
            state = self.getCurrentItemStatus(ohItem)
            if state is None:
                # getCurrentItemStatus() has already logged and spoken the error.
                return

            # NOTE(review): isdigit() rejects decimal or negative states such
            # as "21.5"; those fall through to the CommunicationError branch.
            if state.isdigit():
                if self.voc_match(command, 'Increase'):
                    newTempValue = int(state) + int(tempVal)
                else:
                    newTempValue = int(state) - int(tempVal)

                statusCode = self.sendCommandToItem(ohItem, str(newTempValue))

        if statusCode == 200:
            self.speak_dialog('ThermostatStatus', {'item': messageItem, 'temp_val': str(newTempValue)})
        elif statusCode == 404:
            LOGGER.error("Some issues with the command execution! Item not found")
            self.speak_dialog('ItemNotFoundError')
        else:
            LOGGER.error("Some issues with the command execution!")
            self.speak_dialog('CommunicationError')

    def sendStatusToItem(self, ohItem, command):
        """PUT ``command`` as the new state of ``ohItem``.

        Returns the HTTP status code, or 0 when the request itself failed.
        """
        requestUrl = self.url + "/items/%s/state" % (ohItem)

        try:
            req = requests.put(requestUrl, data=command, headers=self.command_headers,
                               timeout=self.REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as err:
            LOGGER.error("Request to openHAB failed: %s" % err)
            return 0

        return req.status_code

    def sendCommandToItem(self, ohItem, command):
        """POST ``command`` to ``ohItem``.

        Returns the HTTP status code, or 0 when the request itself failed, so
        callers report it through their generic communication-error branch.
        """
        requestUrl = self.url + "/items/%s" % (ohItem)

        try:
            req = requests.post(requestUrl, data=command, headers=self.command_headers,
                                timeout=self.REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as err:
            LOGGER.error("Request to openHAB failed: %s" % err)
            return 0

        return req.status_code

    def getCurrentItemStatus(self, ohItem):
        """Return the state of ``ohItem`` as text, or None on failure.

        A failure (request error or non-200 response) is logged and reported
        with the 'CommunicationError' dialog before None is returned, so
        callers must not report it again.
        """
        requestUrl = self.url + "/items/%s/state" % (ohItem)

        try:
            req = requests.get(requestUrl, headers=self.command_headers,
                               timeout=self.REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as err:
            LOGGER.error("Request to openHAB failed: %s" % err)
            self.speak_dialog('CommunicationError')
            return None

        if req.status_code == 200:
            return req.text

        LOGGER.error("Some issues with the command execution!")
        self.speak_dialog('CommunicationError')
        return None

    def stop(self):
        """Do nothing; this skill has no ongoing action to stop."""
        pass
|
|
|
|
def create_skill():
    """Build and return a new openHABSkill instance."""
    skill = openHABSkill()
    return skill
|