This repository has been archived on 2023-11-27. You can view files and clone it, but cannot push or open issues/pull-requests.
openhab-mycroft/__init__.py

398 lines
14 KiB
Python

# -*- coding: iso-8859-15 -*-
#
# Copyright (c) 2010-2019 Contributors to the openHAB project
#
# See the NOTICE file(s) distributed with this work for additional
# information.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0
#
# SPDX-License-Identifier: EPL-2.0
#
from os.path import dirname
from adapt.intent import IntentBuilder
from mycroft.skills.core import MycroftSkill, intent_handler
from mycroft.util.log import getLogger
from rapidfuzz import fuzz
import requests
import json
# v 0.1 - just switch a fixed light on and off
# v 0.2 - code review
# v 0.3 - first working version on a fixed light item
# v 0.4 - getTaggedItems method in order to get all the tagged items from openHAB
# v 0.5 - refresh tagged items intent
# v 0.6 - added findItemName method and import of fuzzywuzzy
# v 0.7 - added intent for switchable items
# v 0.8 - merged lighting and switchable intents into the onoff intent
# v 0.9 - added support for dimmable items
# v 1.0 - added Thermostat tag support
# v 1.1 - added "what status" for the Switchable tag
# v 1.2 - support for Python 3
# v 1.3 - support for German
# v 1.4 - support for Spanish
# v 1.5 - support for 19
__author__ = 'mortommy'
LOGGER = getLogger(__name__)
class openHABSkill(MycroftSkill):
    """Mycroft skill that controls and queries openHAB items over its REST API.

    Items are discovered through their openHAB tags (``Lighting``,
    ``Switchable``, ...) and stored in one ``{item name: item label}``
    dictionary per tag. A spoken item is resolved to an item name by fuzzy
    comparison against the stored labels.
    """

    # Seconds to wait for openHAB before giving up, so an unreachable server
    # cannot block an intent handler forever.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        """Set up the HTTP headers and the (still empty) item dictionaries."""
        super(openHABSkill, self).__init__(name="openHABSkill")

        self.command_headers = {"Content-type": "text/plain"}
        self.polling_headers = {"Accept": "application/json"}

        # Base URL of the REST API; None until host and port are configured
        self.url = None

        # Every dictionary read by the intent handlers is created here, so a
        # handler that runs before the first successful item load finds an
        # empty dictionary instead of a missing attribute.
        self.lightingItemsDic = dict()
        self.switchableItemsDic = dict()
        self.currentTempItemsDic = dict()
        self.currentHumItemsDic = dict()
        self.currentThermostatItemsDic = dict()
        self.targetTemperatureItemsDic = dict()
        self.homekitHeatingCoolingModeDic = dict()

    def initialize(self):
        """Load the configuration and register all intents of the skill."""
        supported_languages = ["en-us", "it-it", "de-de", "es-es"]

        if self.lang not in supported_languages:
            self.log.warning("Unsupported language for " + self.name + ", shutting down skill.")
            # NOTE(review): initialization continues after shutdown(), as it
            # always did - confirm whether an early return is wanted here.
            self.shutdown()

        # Builds self.url and, when host and port are configured, already
        # loads the tagged items - no second fetch is needed here.
        self.handle_websettings_update()

        if self.url is None:
            self.speak_dialog('ConfigurationNeeded')

        refresh_tagged_items_intent = IntentBuilder("RefreshTaggedItemsIntent").require("RefreshTaggedItemsKeyword").build()
        self.register_intent(refresh_tagged_items_intent, self.handle_refresh_tagged_items_intent)

        onoff_status_intent = IntentBuilder("OnOff_StatusIntent").require("OnOffStatusKeyword").require("Command").require("Item").build()
        self.register_intent(onoff_status_intent, self.handle_onoff_status_intent)

        dimmer_status_intent = IntentBuilder("Dimmer_StatusIntent").require("DimmerStatusKeyword").require("Item").optionally("BrightPercentage").build()
        self.register_intent(dimmer_status_intent, self.handle_dimmer_status_intent)

        # The "what status" intent is registered from an intent file together
        # with its entity files instead of through IntentBuilder.
        self.register_entity_file('item.entity')
        self.register_entity_file('requesttype.entity')
        self.register_intent_file('what.status.intent', self.handle_what_status_intent)

        setTemp_status_intent = IntentBuilder("SetTemp_StatusIntent").require("ThermostatStatusKeyword").require("Item").require("TempValue").build()
        self.register_intent(setTemp_status_intent, self.handle_setTemp_status_intent)

        list_items_intent = IntentBuilder("ListItemsIntent").require("ListItemsKeyword").build()
        self.register_intent(list_items_intent, self.handle_list_items_intent)

        self.settings_change_callback = self.handle_websettings_update

    def get_config(self, key):
        """Return a setting from the skill settings, else from the core config.

        A falsy skill setting (missing, empty, 0) falls through to the
        'openHABSkill' section of the core configuration; the result is None
        when neither place provides a value.
        """
        return (self.settings.get(key) or self.config_core.get('openHABSkill', {}).get(key))

    def handle_websettings_update(self):
        """Rebuild the REST base URL from the settings and reload the items."""
        host = self.get_config('host')
        port = self.get_config('port')
        if host is not None and port is not None:
            self.url = "http://%s:%s/rest" % (host, port)
            self.getTaggedItems()
        else:
            self.url = None

    def getTaggedItems(self):
        """Fetch all items from openHAB and sort the tagged ones into the dictionaries.

        Every dictionary is emptied first. An item is stored in the dictionary
        of the first tag it carries, in the priority order listed below.
        Failures are logged and announced with a dialog; nothing is raised.
        """
        self.lightingItemsDic = {}
        self.switchableItemsDic = {}
        self.currentTempItemsDic = {}
        self.currentHumItemsDic = {}
        self.currentThermostatItemsDic = {}
        self.targetTemperatureItemsDic = {}
        self.homekitHeatingCoolingModeDic = {}

        if self.url is None:
            LOGGER.error("Configuration needed!")
            self.speak_dialog('ConfigurationNeeded')
            return

        # (tag, target dictionary) pairs; the order is the tag priority
        taggedDictionaries = (
            ("Lighting", self.lightingItemsDic),
            ("Switchable", self.switchableItemsDic),
            ("CurrentTemperature", self.currentTempItemsDic),
            ("CurrentHumidity", self.currentHumItemsDic),
            ("Thermostat", self.currentThermostatItemsDic),
            ("TargetTemperature", self.targetTemperatureItemsDic),
            ("homekit:HeatingCoolingMode", self.homekitHeatingCoolingModeDic),
        )

        requestUrl = self.url + "/items?recursive=false"
        try:
            req = requests.get(requestUrl, headers=self.polling_headers, timeout=self.REQUEST_TIMEOUT)
            if req.status_code != 200:
                LOGGER.error("Some issues with the command execution!")
                self.speak_dialog('GetItemsListError')
                return
            json_response = req.json()
        except Exception:
            # Boundary handler: network failures and invalid JSON end up here
            LOGGER.exception("Some issues with the command execution!")
            self.speak_dialog('GetItemsListError')
            return

        if not isinstance(json_response, list):
            LOGGER.error("Some issues with the command execution!")
            self.speak_dialog('GetItemsListError')
            return

        for item in json_response:
            if not isinstance(item, dict):
                continue
            itemName = item.get('name')
            itemLabel = item.get('label')
            if itemName is None or itemLabel is None:
                # An item without a label cannot be matched against a spoken
                # name; skip it instead of dropping every item after it.
                continue
            itemTags = item.get('tags') or ()
            for tag, dictionary in taggedDictionaries:
                if tag in itemTags:
                    dictionary[itemName] = itemLabel
                    break

    @staticmethod
    def _parseNumber(text):
        """Return text as an int, as a float when it has decimals, or None.

        None is returned for anything that is not a finite number
        (None, "NULL", "ON", "nan", ...).
        """
        try:
            number = float(text)
        except (TypeError, ValueError):
            return None
        if number != number or number in (float('inf'), float('-inf')):
            return None
        return int(number) if number.is_integer() else number

    def _sendToItem(self, method, requestUrl, command):
        """Send command with the given requests function and return the HTTP status.

        Returns 0 when openHAB could not be reached; callers treat every code
        other than 200 and 404 as a communication error.
        """
        try:
            req = method(requestUrl, data=command, headers=self.command_headers, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException:
            LOGGER.exception("Some issues with the command execution!")
            return 0
        return req.status_code

    def findItemName(self, itemDictionary, messageItem):
        """Return the name of the item whose label best matches messageItem.

        Returns None when no label scores above zero (for example when the
        dictionary is empty).
        """
        bestScore = 0
        bestItem = None

        for itemName, itemLabel in itemDictionary.items():
            # With the current best score as cutoff, a label that cannot beat
            # it scores 0 and is ignored by the comparison below.
            score = fuzz.ratio(messageItem, itemLabel, score_cutoff=bestScore)
            if score > bestScore:
                bestScore = score
                bestItem = itemName

        return bestItem

    def getItemsFromDict(self, typeStr, itemsDict):
        """Return "<typeStr>: <label>, <label>, ..." or "" for an empty dictionary."""
        if not itemsDict:
            return ""
        return "%s: %s" % (typeStr, ', '.join(itemsDict.values()))

    def handle_list_items_intent(self, message):
        """Speak the labels of all known items, grouped by item type."""
        sections = (
            self.getItemsFromDict("Lights", self.lightingItemsDic),
            self.getItemsFromDict("Switches", self.switchableItemsDic),
            self.getItemsFromDict("Current Temperature", self.currentTempItemsDic),
            self.getItemsFromDict("Current Humidity", self.currentHumItemsDic),
            self.getItemsFromDict("Thermostat", self.currentThermostatItemsDic),
            self.getItemsFromDict("Target Temperature", self.targetTemperatureItemsDic),
            self.getItemsFromDict("Homekit Heating and Cooling", self.homekitHeatingCoolingModeDic),
        )
        # Empty groups are left out; the remaining ones are separated by a space
        stripped = [section.strip() for section in sections]
        msg = ' '.join(section for section in stripped if section)
        self.speak_dialog('FoundItems', {'items': msg})

    def handle_refresh_tagged_items_intent(self, message):
        """Reload the tagged items from openHAB and speak how many were found."""
        self.getTaggedItems()
        itemCount = (
            len(self.lightingItemsDic)
            + len(self.switchableItemsDic)
            + len(self.currentTempItemsDic)
            + len(self.currentHumItemsDic)
            + len(self.currentThermostatItemsDic)
            + len(self.targetTemperatureItemsDic)
            + len(self.homekitHeatingCoolingModeDic)
        )
        self.speak_dialog('RefreshTaggedItems', {'number_item': str(itemCount)})

    def handle_onoff_status_intent(self, message):
        """Switch a lighting or switchable item on or off."""
        command = message.data.get('Command')
        messageItem = message.data.get('Item')

        # The item may be tagged either Lighting or Switchable
        self.lightingSwitchableItemsDic = dict()
        self.lightingSwitchableItemsDic.update(self.lightingItemsDic)
        self.lightingSwitchableItemsDic.update(self.switchableItemsDic)

        ohItem = self.findItemName(self.lightingSwitchableItemsDic, messageItem)
        if ohItem is None:
            LOGGER.error("Item not found!")
            self.speak_dialog('ItemNotFoundError')
            return

        if command not in ("on", "off"):
            self.speak_dialog('ErrorDialog')
            return

        statusCode = self.sendCommandToItem(ohItem, command.upper())
        if statusCode == 200:
            self.speak_dialog('StatusOnOff', {'command': command, 'item': messageItem})
        elif statusCode == 404:
            LOGGER.error("Some issues with the command execution!. Item not found")
            self.speak_dialog('ItemNotFoundError')
        else:
            LOGGER.error("Some issues with the command execution!")
            self.speak_dialog('CommunicationError')

    def handle_dimmer_status_intent(self, message):
        """Set, dim or brighten a lighting item.

        "Set" sends the spoken percentage (0-100) as is. Otherwise the current
        brightness is lowered ("Dim") or raised by the spoken amount, 10 when
        none was given, and clamped to the range 0-100.
        """
        command = message.data.get('DimmerStatusKeyword')
        messageItem = message.data.get('Item')
        brightValue = message.data.get('BrightPercentage', None)

        ohItem = self.findItemName(self.lightingItemsDic, messageItem)
        if ohItem is None:
            LOGGER.error("Item not found!")
            self.speak_dialog('ItemNotFoundError')
            return

        if self.voc_match(command, 'Set'):
            percentage = self._parseNumber(brightValue)
            if percentage is None or percentage < 0 or percentage > 100:
                # Nothing was sent, so there is no status code to report
                self.speak_dialog('ErrorDialog')
                return
            statusCode = self.sendCommandToItem(ohItem, brightValue)
        else:
            state = self.getCurrentItemStatus(ohItem)
            if state is None:
                # getCurrentItemStatus already announced the failure
                return

            # The last comma separated field is used as the brightness -
            # presumably to support colour items whose state is
            # "hue,saturation,brightness"; verify against the openHAB docs.
            curBright = self._parseNumber(state.split(',')[-1])
            step = self._parseNumber("10" if brightValue is None else brightValue)
            if curBright is None or step is None:
                LOGGER.error("Cannot change brightness, state: %s, amount: %s" % (state, brightValue))
                self.speak_dialog('ErrorDialog')
                return

            if self.voc_match(command, 'Dim'):
                newBrightValue = curBright - step
            else:
                newBrightValue = curBright + step
            newBrightValue = max(0, min(100, newBrightValue))

            statusCode = self.sendCommandToItem(ohItem, str(newBrightValue))

        if statusCode == 200:
            self.speak_dialog('StatusDimmer', {'item': messageItem})
        elif statusCode == 404:
            LOGGER.error("Some issues with the command execution!. Item not found")
            self.speak_dialog('ItemNotFoundError')
        else:
            LOGGER.error("Some issues with the command execution!")
            self.speak_dialog('CommunicationError')

    def handle_what_status_intent(self, message):
        """Speak the temperature, humidity, status or target temperature of an item."""
        messageItem = message.data.get('item')
        LOGGER.debug("Item: %s" % (messageItem))
        requestType = message.data.get('requesttype')
        LOGGER.debug("Request Type: %s" % (requestType))

        # Defaults describe a temperature; the branches below override them
        unitOfMeasure = self.translate('Degree')
        infoType = self.translate('Temperature')

        self.currStatusItemsDic = dict()

        if self.voc_match(requestType, 'Temperature'):
            self.currStatusItemsDic.update(self.currentTempItemsDic)
        elif self.voc_match(requestType, 'Humidity'):
            unitOfMeasure = self.translate('Percentage')
            infoType = self.translate('Humidity')
            self.currStatusItemsDic.update(self.currentHumItemsDic)
        elif self.voc_match(requestType, 'Status'):
            infoType = self.translate('Status')
            unitOfMeasure = ""
            self.currStatusItemsDic.update(self.switchableItemsDic)
        else:
            # Any other request type is answered from the target temperatures
            self.currStatusItemsDic.update(self.targetTemperatureItemsDic)

        ohItem = self.findItemName(self.currStatusItemsDic, messageItem)
        if ohItem is None:
            LOGGER.error("Item not found!")
            self.speak_dialog('ItemNotFoundError')
            return

        state = self.getCurrentItemStatus(ohItem)
        if state is None:
            # getCurrentItemStatus already announced the failure
            return
        self.speak_dialog('TempHumStatus', {'item': messageItem, 'temp_hum': infoType, 'temp_hum_val': state, 'units_of_measurement': unitOfMeasure})

    def handle_setTemp_status_intent(self, message):
        """Regulate, increase or decrease the target temperature of an item.

        "Regulate" sends the spoken value as is; otherwise the spoken value is
        added to ("Increase") or subtracted from the current state.
        """
        command = message.data.get('ThermostatStatusKeyword')
        messageItem = message.data.get('Item')
        tempVal = message.data.get('TempValue')
        statusCode = 0
        newTempValue = 0

        ohItem = self.findItemName(self.targetTemperatureItemsDic, messageItem)
        if ohItem is None:
            LOGGER.error("Item not found!")
            self.speak_dialog('ItemNotFoundError')
            return

        if self.voc_match(command, 'Regulate'):
            statusCode = self.sendCommandToItem(ohItem, tempVal)
            newTempValue = tempVal
        else:
            state = self.getCurrentItemStatus(ohItem)
            if state is None:
                # getCurrentItemStatus already announced the failure
                return

            # Parsed as numbers rather than checked with str.isdigit(), which
            # rejects decimal and negative temperatures such as "21.5" or "-3".
            currentTemp = self._parseNumber(state)
            delta = self._parseNumber(tempVal)
            if currentTemp is not None and delta is not None:
                if self.voc_match(command, 'Increase'):
                    newTempValue = currentTemp + delta
                else:
                    newTempValue = currentTemp - delta
                statusCode = self.sendCommandToItem(ohItem, str(newTempValue))
            # Otherwise statusCode stays 0 and the failure is reported below

        if statusCode == 200:
            self.speak_dialog('ThermostatStatus', {'item': messageItem, 'temp_val': str(newTempValue)})
        elif statusCode == 404:
            LOGGER.error("Some issues with the command execution! Item not found")
            self.speak_dialog('ItemNotFoundError')
        else:
            LOGGER.error("Some issues with the command execution!")
            self.speak_dialog('CommunicationError')

    def sendStatusToItem(self, ohItem, command):
        """PUT command as the new state of ohItem; return the HTTP status code.

        Returns 0 when openHAB could not be reached.
        """
        requestUrl = self.url + "/items/%s/state" % (ohItem)
        return self._sendToItem(requests.put, requestUrl, command)

    def sendCommandToItem(self, ohItem, command):
        """POST command to ohItem; return the HTTP status code.

        Returns 0 when openHAB could not be reached.
        """
        requestUrl = self.url + "/items/%s" % (ohItem)
        return self._sendToItem(requests.post, requestUrl, command)

    def getCurrentItemStatus(self, ohItem):
        """Return the state of ohItem as text, or None on failure.

        A failure (network error or a status code other than 200) is logged
        and announced with the 'CommunicationError' dialog.
        """
        requestUrl = self.url + "/items/%s/state" % (ohItem)

        try:
            req = requests.get(requestUrl, headers=self.command_headers, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException:
            LOGGER.exception("Some issues with the command execution!")
            self.speak_dialog('CommunicationError')
            return None

        if req.status_code == 200:
            return req.text

        LOGGER.error("Some issues with the command execution!")
        self.speak_dialog('CommunicationError')
        return None

    def stop(self):
        """Nothing to stop: the skill keeps no running activity."""
        pass
def create_skill():
    """Build and return a fresh openHABSkill instance.

    Presumably the module-level factory the Mycroft skill loader looks
    for - confirm against the Mycroft skill documentation.
    """
    skill = openHABSkill()
    return skill