mycroft-core/mycroft/client/enclosure/base.py

561 lines
21 KiB
Python

# Copyright 2017 Mycroft AI Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
from collections import namedtuple
from threading import Lock
from mycroft.configuration import Configuration
from mycroft.messagebus.client import MessageBusClient
from mycroft.util import create_daemon
from mycroft.util.log import LOG
import json
import tornado.web as web
from tornado import ioloop
from tornado.websocket import WebSocketHandler
from mycroft.messagebus.message import Message
Namespace = namedtuple('Namespace', ['name', 'pages'])
write_lock = Lock()
namespace_lock = Lock()
RESERVED_KEYS = ['__from', '__idle']
def _get_page_data(message):
""" Extract page related data from a message.
Args:
message: messagebus message object
Returns:
tuple (page, namespace, index)
Raises:
ValueError if value is missing.
"""
data = message.data
# Note: 'page' can be either a string or a list of strings
if 'page' not in data:
raise ValueError("Page missing in data")
if 'index' in data:
index = data['index']
else:
index = 0
page = data.get("page", "")
namespace = data.get("__from", "")
return page, namespace, index
class Enclosure:
def __init__(self):
# Establish Enclosure's websocket connection to the messagebus
self.bus = MessageBusClient()
# Load full config
Configuration.set_config_update_handlers(self.bus)
config = Configuration.get()
self.lang = config['lang']
self.config = config.get("enclosure")
self.global_config = config
self.gui = create_gui_service(self, config['gui_websocket'])
# This datastore holds the data associated with the GUI provider. Data
# is stored in Namespaces, so you can have:
# self.datastore["namespace"]["name"] = value
# Typically the namespace is a meaningless identifier, but there is a
# special "SYSTEM" namespace.
self.datastore = {}
# self.loaded is a list, each element consists of a namespace named
# tuple.
# The namespace namedtuple has the properties "name" and "pages"
# The name contains the namespace name as a string and pages is a
# mutable list of loaded pages.
#
# [Namespace name, [List of loaded qml pages]]
# [
# ["SKILL_NAME", ["page1.qml, "page2.qml", ... , "pageN.qml"]
# [...]
# ]
self.loaded = [] # list of lists in order.
self.explicit_move = True # Set to true to send reorder commands
# Listen for new GUI clients to announce themselves on the main bus
self.active_namespaces = []
self.bus.on("mycroft.gui.connected", self.on_gui_client_connected)
self.register_gui_handlers()
# First send any data:
self.bus.on("gui.value.set", self.on_gui_set_value)
self.bus.on("gui.page.show", self.on_gui_show_page)
self.bus.on("gui.page.delete", self.on_gui_delete_page)
self.bus.on("gui.clear.namespace", self.on_gui_delete_namespace)
self.bus.on("gui.event.send", self.on_gui_send_event)
def run(self):
try:
self.bus.run_forever()
except Exception as e:
LOG.error("Error: {0}".format(e))
self.stop()
######################################################################
# GUI client API
def send(self, msg_dict):
""" Send to all registered GUIs. """
LOG.info('SENDING...')
for connection in GUIWebsocketHandler.clients:
try:
connection.send(msg_dict)
except Exception as e:
LOG.exception(repr(e))
def on_gui_send_event(self, message):
""" Send an event to the GUIs. """
try:
data = {'type': 'mycroft.events.triggered',
'namespace': message.data.get('__from'),
'event_name': message.data.get('event_name'),
'params': message.data.get('params')}
self.send(data)
except Exception as e:
LOG.error('Could not send event ({})'.format(repr(e)))
def on_gui_set_value(self, message):
data = message.data
namespace = data.get("__from", "")
# Pass these values on to the GUI renderers
for key in data:
if key not in RESERVED_KEYS:
try:
self.set(namespace, key, data[key])
except Exception as e:
LOG.exception(repr(e))
def set(self, namespace, name, value):
""" Perform the send of the values to the connected GUIs. """
if namespace not in self.datastore:
self.datastore[namespace] = {}
if self.datastore[namespace].get(name) != value:
self.datastore[namespace][name] = value
# If the namespace is loaded send data to GUI
if namespace in [l.name for l in self.loaded]:
msg = {"type": "mycroft.session.set",
"namespace": namespace,
"data": {name: value}}
self.send(msg)
def on_gui_delete_page(self, message):
""" Bus handler for removing pages. """
page, namespace, _ = _get_page_data(message)
try:
with namespace_lock:
self.remove_pages(namespace, page)
except Exception as e:
LOG.exception(repr(e))
def on_gui_delete_namespace(self, message):
""" Bus handler for removing namespace. """
try:
namespace = message.data['__from']
with namespace_lock:
self.remove_namespace(namespace)
except Exception as e:
LOG.exception(repr(e))
def on_gui_show_page(self, message):
try:
page, namespace, index = _get_page_data(message)
# Pass the request to the GUI(s) to pull up a page template
with namespace_lock:
self.show(namespace, page, index)
except Exception as e:
LOG.exception(repr(e))
def __find_namespace(self, namespace):
for i, skill in enumerate(self.loaded):
if skill[0] == namespace:
return i
return None
def __insert_pages(self, namespace, pages):
""" Insert pages into the namespace
Args:
namespace (str): Namespace to add to
pages (list): Pages (str) to insert
"""
LOG.debug("Inserting new pages")
if not isinstance(pages, list):
raise ValueError('Argument must be list of pages')
self.send({"type": "mycroft.gui.list.insert",
"namespace": namespace,
"position": len(self.loaded[0].pages),
"data": [{"url": p} for p in pages]
})
# Insert the pages into local reprensentation as well.
updated = Namespace(self.loaded[0].name, self.loaded[0].pages + pages)
self.loaded[0] = updated
def __remove_page(self, namespace, pos):
""" Delete page.
Args:
namespace (str): Namespace to remove from
pos (int): Page position to remove
"""
LOG.debug("Deleting {} from {}".format(pos, namespace))
self.send({"type": "mycroft.gui.list.remove",
"namespace": namespace,
"position": pos,
"items_number": 1
})
# Remove the page from the local reprensentation as well.
self.loaded[0].pages.pop(pos)
def __insert_new_namespace(self, namespace, pages):
""" Insert new namespace and pages.
This first sends a message adding a new namespace at the
highest priority (position 0 in the namespace stack)
Args:
namespace (str): The skill namespace to create
pages (str): Pages to insert (name matches QML)
"""
LOG.debug("Inserting new namespace")
self.send({"type": "mycroft.session.list.insert",
"namespace": "mycroft.system.active_skills",
"position": 0,
"data": [{"skill_id": namespace}]
})
# Load any already stored Data
data = self.datastore.get(namespace, {})
for key in data:
msg = {"type": "mycroft.session.set",
"namespace": namespace,
"data": {key: data[key]}}
self.send(msg)
LOG.debug("Inserting new page")
self.send({"type": "mycroft.gui.list.insert",
"namespace": namespace,
"position": 0,
"data": [{"url": p} for p in pages]
})
# Make sure the local copy is updated
self.loaded.insert(0, Namespace(namespace, pages))
def __move_namespace(self, from_pos, to_pos):
""" Move an existing namespace to a new position in the stack.
Args:
from_pos (int): Position in the stack to move from
to_pos (int): Position to move to
"""
LOG.debug("Activating existing namespace")
# Seems like the namespace is moved to the top automatically when
# a page change is done. Deactivating this for now.
if self.explicit_move:
LOG.debug("move {} to {}".format(from_pos, to_pos))
self.send({"type": "mycroft.session.list.move",
"namespace": "mycroft.system.active_skills",
"from": from_pos, "to": to_pos,
"items_number": 1})
# Move the local representation of the skill from current
# position to position 0.
self.loaded.insert(to_pos, self.loaded.pop(from_pos))
def __switch_page(self, namespace, pages):
""" Switch page to an already loaded page.
Args:
pages (list): pages (str) to switch to
namespace (str): skill namespace
"""
try:
num = self.loaded[0].pages.index(pages[0])
except Exception as e:
LOG.exception(repr(e))
num = 0
LOG.debug('Switching to already loaded page at '
'index {} in namespace {}'.format(num, namespace))
self.send({"type": "mycroft.events.triggered",
"namespace": namespace,
"event_name": "page_gained_focus",
"data": {"number": num}})
def show(self, namespace, page, index):
""" Show a page and load it as needed.
Args:
page (str or list): page(s) to show
namespace (str): skill namespace
index (int): ??? TODO: Unused in code ???
TODO: - Update sync to match.
- Separate into multiple functions/methods
"""
LOG.debug("GUIConnection activating: " + namespace)
pages = page if isinstance(page, list) else [page]
# find namespace among loaded namespaces
try:
index = self.__find_namespace(namespace)
if index is None:
# This namespace doesn't exist, insert them first so they're
# shown.
self.__insert_new_namespace(namespace, pages)
return
else: # Namespace exists
if index > 0:
# Namespace is inactive, activate it by moving it to
# position 0
self.__move_namespace(index, 0)
# Find if any new pages needs to be inserted
new_pages = [p for p in pages if p not in self.loaded[0].pages]
if new_pages:
self.__insert_pages(namespace, new_pages)
else:
# No new pages, just switch
self.__switch_page(namespace, pages)
except Exception as e:
LOG.exception(repr(e))
def remove_namespace(self, namespace):
""" Remove namespace.
Args:
namespace (str): namespace to remove
"""
index = self.__find_namespace(namespace)
if index is None:
return
else:
LOG.debug("Removing namespace {} at {}".format(namespace, index))
self.send({"type": "mycroft.session.list.remove",
"namespace": "mycroft.system.active_skills",
"position": index,
"items_number": 1
})
# Remove namespace from loaded namespaces
self.loaded.pop(index)
def remove_pages(self, namespace, pages):
""" Remove the listed pages from the provided namespace.
Args:
namespace (str): The namespace to modify
pages (list): List of page names (str) to delete
"""
try:
index = self.__find_namespace(namespace)
if index is None:
return
else:
# Remove any pages that doesn't exist in the namespace
pages = [p for p in pages if p in self.loaded[index].pages]
# Make sure to remove pages from the back
indexes = [self.loaded[index].pages.index(p) for p in pages]
indexes = sorted(indexes)
indexes.reverse()
for page_index in indexes:
self.__remove_page(namespace, page_index)
except Exception as e:
LOG.exception(repr(e))
######################################################################
# GUI client socket
#
# The basic mechanism is:
# 1) GUI client announces itself on the main messagebus
# 2) Mycroft prepares a port for a socket connection to this GUI
# 3) The port is announced over the messagebus
# 4) The GUI connects on the socket
# 5) Connection persists for graphical interaction indefinitely
#
# If the connection is lost, it must be renegotiated and restarted.
def on_gui_client_connected(self, message):
# GUI has announced presence
LOG.info('GUI HAS ANNOUNCED!')
port = self.global_config["gui_websocket"]["base_port"]
LOG.debug("on_gui_client_connected")
gui_id = message.data.get("gui_id")
LOG.debug("Heard announcement from gui_id: {}".format(gui_id))
# Announce connection, the GUI should connect on it soon
self.bus.emit(Message("mycroft.gui.port",
{"port": port,
"gui_id": gui_id}))
def register_gui_handlers(self):
# TODO: Register handlers for standard (Mark 1) events
# self.bus.on('enclosure.eyes.on', self.on)
# self.bus.on('enclosure.eyes.off', self.off)
# self.bus.on('enclosure.eyes.blink', self.blink)
# self.bus.on('enclosure.eyes.narrow', self.narrow)
# self.bus.on('enclosure.eyes.look', self.look)
# self.bus.on('enclosure.eyes.color', self.color)
# self.bus.on('enclosure.eyes.level', self.brightness)
# self.bus.on('enclosure.eyes.volume', self.volume)
# self.bus.on('enclosure.eyes.spin', self.spin)
# self.bus.on('enclosure.eyes.timedspin', self.timed_spin)
# self.bus.on('enclosure.eyes.reset', self.reset)
# self.bus.on('enclosure.eyes.setpixel', self.set_pixel)
# self.bus.on('enclosure.eyes.fill', self.fill)
# self.bus.on('enclosure.mouth.reset', self.reset)
# self.bus.on('enclosure.mouth.talk', self.talk)
# self.bus.on('enclosure.mouth.think', self.think)
# self.bus.on('enclosure.mouth.listen', self.listen)
# self.bus.on('enclosure.mouth.smile', self.smile)
# self.bus.on('enclosure.mouth.viseme', self.viseme)
# self.bus.on('enclosure.mouth.text', self.text)
# self.bus.on('enclosure.mouth.display', self.display)
# self.bus.on('enclosure.mouth.display_image', self.display_image)
# self.bus.on('enclosure.weather.display', self.display_weather)
# self.bus.on('recognizer_loop:record_begin', self.mouth.listen)
# self.bus.on('recognizer_loop:record_end', self.mouth.reset)
# self.bus.on('recognizer_loop:audio_output_start', self.mouth.talk)
# self.bus.on('recognizer_loop:audio_output_end', self.mouth.reset)
pass
##########################################################################
# GUIConnection
##########################################################################
gui_app_settings = {
'debug': True
}
def create_gui_service(enclosure, config):
import tornado.options
LOG.info('Starting message bus for GUI...')
# Disable all tornado logging so mycroft loglevel isn't overridden
tornado.options.parse_command_line(['--logging=None'])
routes = [(config['route'], GUIWebsocketHandler)]
application = web.Application(routes, debug=True)
application.enclosure = enclosure
application.listen(config['base_port'], config['host'])
create_daemon(ioloop.IOLoop.instance().start)
LOG.info('GUI Message bus started!')
return application
class GUIWebsocketHandler(WebSocketHandler):
"""The socket pipeline between the GUI and Mycroft."""
clients = []
def open(self):
GUIWebsocketHandler.clients.append(self)
LOG.info('New Connection opened!')
self.synchronize()
def on_close(self):
LOG.info('Closing {}'.format(id(self)))
GUIWebsocketHandler.clients.remove(self)
def synchronize(self):
""" Upload namespaces, pages and data to the last connected. """
namespace_pos = 0
enclosure = self.application.enclosure
for namespace, pages in enclosure.loaded:
LOG.info('Sync {}'.format(namespace))
# Insert namespace
self.send({"type": "mycroft.session.list.insert",
"namespace": "mycroft.system.active_skills",
"position": namespace_pos,
"data": [{"skill_id": namespace}]
})
# Insert pages
self.send({"type": "mycroft.gui.list.insert",
"namespace": namespace,
"position": 0,
"data": [{"url": p} for p in pages]
})
# Insert data
data = enclosure.datastore.get(namespace, {})
for key in data:
self.send({"type": "mycroft.session.set",
"namespace": namespace,
"data": {key: data[key]}
})
namespace_pos += 1
def on_message(self, message):
LOG.info("Received: {}".format(message))
msg = json.loads(message)
if (msg.get('type') == "mycroft.events.triggered" and
(msg.get('event_name') == 'page_gained_focus' or
msg.get('event_name') == 'system.gui.user.interaction')):
# System event, a page was changed
msg_type = 'gui.page_interaction'
msg_data = {'namespace': msg['namespace'],
'page_number': msg['parameters'].get('number')}
elif msg.get('type') == "mycroft.events.triggered":
# A normal event was triggered
msg_type = '{}.{}'.format(msg['namespace'], msg['event_name'])
msg_data = msg['parameters']
elif msg.get('type') == 'mycroft.session.set':
# A value was changed send it back to the skill
msg_type = '{}.{}'.format(msg['namespace'], 'set')
msg_data = msg['data']
message = Message(msg_type, msg_data)
LOG.info('Forwarding to bus...')
self.application.enclosure.bus.emit(message)
LOG.info('Done!')
def write_message(self, *arg, **kwarg):
"""Wraps WebSocketHandler.write_message() with a lock. """
try:
asyncio.get_event_loop()
except RuntimeError:
asyncio.set_event_loop(asyncio.new_event_loop())
with write_lock:
super().write_message(*arg, **kwarg)
def send(self, data):
"""Send the given data across the socket as JSON
Args:
data (dict): Data to transmit
"""
s = json.dumps(data)
LOG.info('Sending {}'.format(s))
self.write_message(s)
def check_origin(self, origin):
"""Disable origin check to make js connections work."""
return True