# mycroft-core/test/integrationtests/voight_kampff/tools.py
#
# (Repository viewer listing: 336 lines, 11 KiB, Python)

# Copyright 2020 Mycroft AI Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Common tools to use when creating step files for behave tests."""
from threading import Event
import time
from mycroft.audio.utils import wait_while_speaking
from mycroft.messagebus import Message
DEFAULT_TIMEOUT = 10
class CriteriaWaiter:
    """Wait for a message to meet a certain criteria.

    Args:
        msg_type: message type to watch
        criteria_func: Function to determine if a message fulfilling the
                       test case has been found. Called with a message and
                       must return a tuple (bool, str) of match status and
                       debug output.
        context: behave context
    """
    def __init__(self, msg_type, criteria_func, context):
        self.msg_type = msg_type
        self.criteria_func = criteria_func
        self.context = context
        # Set once a message satisfying criteria_func has been found.
        self.result = Event()

    def reset(self):
        """Reset the wait state."""
        self.result.clear()

    # TODO: Remove in 21.08
    def wait_unspecific(self, timeout):
        """Wait for a specified time for criteria to be fulfilled by any message.

        This use case is deprecated and only for backward compatibility.

        Args:
            timeout: Time allowance for a message fulfilling the criteria, if
                     provided will override the normal step timeout.

        Returns:
            tuple (bool, str) test status and debug output
        """
        timeout = timeout or self.context.step_timeout
        start_time = time.monotonic()
        debug = ''
        while time.monotonic() < start_time + timeout:
            for message in self.context.bus.get_messages(None):
                status, test_dbg = self.criteria_func(message)
                debug += test_dbg
                if status:
                    self.context.matched_message = message
                    self.context.bus.remove_message(message)
                    return True, debug
            # Pause for up to half a second on the bus' new-message signal
            # before scanning the collected messages again.
            self.context.bus.new_message_available.wait(0.5)
        # Timed out return debug from test
        return False, debug

    def _check_historical_messages(self):
        """Search through the already received messages for a match.

        On a match the message is stored as context.matched_message, removed
        from the bus and the result event is set.

        Returns:
            (str) debug output collected from the criteria function
        """
        debug = ''
        for message in self.context.bus.get_messages(self.msg_type):
            status, test_dbg = self.criteria_func(message)
            debug += test_dbg
            if status:
                self.context.matched_message = message
                self.context.bus.remove_message(message)
                self.result.set()
                break
        return debug

    def wait_specific(self, timeout=None):
        """Wait for a specific message type to fulfil a criteria.

        Uses an event-handler to not repeatedly loop.

        Args:
            timeout: Time allowance for a message fulfilling the criteria, if
                     provided will override the normal step timeout.

        Returns:
            tuple (bool, str) test status and debug output
        """
        timeout = timeout or self.context.step_timeout
        debug = ''

        def on_message(message):
            nonlocal debug
            status, test_dbg = self.criteria_func(message)
            debug += test_dbg
            if status:
                self.context.matched_message = message
                self.result.set()

        # Register the handler before scanning the history so a message
        # arriving in between is not missed.
        self.context.bus.on(self.msg_type, on_message)
        try:
            # Check historical messages
            historical_debug = self._check_historical_messages()

            # If no matching message was already caught, wait for it
            if not self.result.is_set():
                self.result.wait(timeout=timeout)
        finally:
            # Always unregister, even if criteria_func raised, so the
            # handler does not linger on the bus after this call.
            self.context.bus.remove(self.msg_type, on_message)
        return self.result.is_set(), historical_debug + debug

    def wait(self, timeout=None):
        """Wait for a message to fulfil the criteria.

        Dispatches to wait_unspecific() when no message type was given
        (msg_type is None), otherwise to wait_specific().

        Args:
            timeout: Time allowance for a message fulfilling the criteria, if
                     provided will override the normal step timeout.

        Returns:
            (result (bool), debug (str)) Result containing status and debug
            message.
        """
        if self.msg_type is None:
            return self.wait_unspecific(timeout)
        else:
            return self.wait_specific(timeout)
def then_wait(msg_type, criteria_func, context, timeout=None):
    """Wait for a message of the given type to fulfil a criteria.

    Convenience wrapper that builds a CriteriaWaiter and waits on it.

    Args:
        msg_type: message type to watch
        criteria_func: Function to determine if a message fulfilling the
                       test case has been found.
        context: behave context
        timeout: Time allowance for a message fulfilling the criteria, if
                 provided will override the normal step timeout.

    Returns:
        (result (bool), debug (str)) Result containing status and debug
        message.
    """
    return CriteriaWaiter(msg_type, criteria_func, context).wait(timeout)
def then_wait_fail(msg_type, criteria_func, context, timeout=None):
    """Wait for a specified time, failing if criteria is fulfilled.

    The status returned by then_wait() is inverted: the result is True when
    no message fulfilled the criteria.

    Args:
        msg_type: message type to watch
        criteria_func: Function to determine if a message fulfilling the
                       test case has been found.
        context: behave context
        timeout: Time allowance for a message fulfilling the criteria

    Returns:
        tuple (bool, str) test status and debug output
    """
    matched, debug = then_wait(msg_type, criteria_func, context, timeout)
    nothing_matched = not matched
    return nothing_matched, debug
def mycroft_responses(context):
    """Collect and format mycroft responses from context.

    Each "speak" message on the context's bus is rendered as the dialog
    file name (when present in the message's meta data), the skill in
    parentheses and the quoted utterance.

    Args:
        context: behave context to extract messages from.

    Returns: (str) Mycroft responses including skill and dialog file, or an
             empty string if there are no "speak" messages.
    """
    messages = context.bus.get_messages('speak')
    if not messages:
        return ''

    parts = ['Mycroft responded with:\n']
    for m in messages:
        # A speak message is not guaranteed to carry meta data; fall back
        # to an empty dict so the skill lookup below can't raise KeyError.
        meta = m.data.get('meta') or {}
        parts.append('Mycroft: ')
        if 'dialog' in meta:
            parts.append('{}.dialog'.format(meta['dialog']))
        parts.append('({})\n'.format(meta.get('skill')))
        parts.append('"{}"\n'.format(m.data['utterance']))
    return ''.join(parts)
def print_mycroft_responses(context):
    """Print the formatted Mycroft responses collected from context.

    Args:
        context: behave context to extract messages from.
    """
    summary = mycroft_responses(context)
    print(summary)
def format_dialog_match_error(potential_matches, speak_messages):
    """Format error message to be displayed when an expected dialog is missing.

    This is similar to the mycroft_responses function above.  The difference
    is that here the expected responses are passed in instead of making
    a second loop through message bus messages.

    Args:
        potential_matches (list): one of the dialog files in this list were
            expected to be spoken
        speak_messages (list): "speak" event messages from the message bus
            that don't match the list of potential matches.

    Returns: (str) Message detailing the error to the user
    """
    lines = [
        'Expected Mycroft to respond with one of:\n',
        f"\t{', '.join(potential_matches)}\n",
        "Actual response(s):\n",
    ]
    if not speak_messages:
        lines.append("\tMycroft didn't respond")
        return ''.join(lines)

    for message in speak_messages:
        meta = message.data.get("meta") or {}
        origin = []
        if 'dialog' in meta:
            origin.append(f"Dialog: {meta['dialog']}")
        if 'skill' in meta:
            origin.append(f"(from {meta['skill']} skill)")
        if origin:
            # Always terminate the origin line, even when only one of
            # dialog/skill is known, so it never runs into the utterance.
            lines.append("\t" + " ".join(origin) + "\n")
        lines.append(f"\t\tUtterance: {message.data['utterance']}\n")
    return ''.join(lines)
def emit_utterance(bus, utterance):
    """Emit an utterance event on the message bus.

    The utterance is sent as a 'recognizer_loop:utterance' message with
    'mycroft_listener' given as the client name.

    Args:
        bus (InterceptAllBusClient): Bus instance to emit the message on
        utterance (str): the utterance text to send
    """
    payload = {
        'utterances': [utterance],
        'lang': 'en-us',
        'session': '',
        # Current wall-clock time is used as the message identifier.
        'ident': time.time(),
    }
    origin = {'client_name': 'mycroft_listener'}
    message = Message('recognizer_loop:utterance',
                      data=payload,
                      context=origin)
    bus.emit(message)
def wait_for_dialog(bus, dialogs, context=None, timeout=None):
    """Wait for one of the dialogs given as argument.

    Args:
        bus (InterceptAllBusClient): Bus instance to listen on
        dialogs (list): list of acceptable dialogs
        context (behave Context): optional context providing scenario timeout
        timeout (int): how long to wait for the message, defaults to timeout
                       provided by context or 10 seconds
    """
    # An explicit timeout wins, then the context's step timeout, then the
    # module default.
    if timeout:
        timeout_duration = timeout
    elif context:
        timeout_duration = context.step_timeout
    else:
        timeout_duration = DEFAULT_TIMEOUT
    wait_for_dialog_match(bus, dialogs, timeout_duration)
def wait_for_dialog_match(bus, dialogs, timeout=DEFAULT_TIMEOUT):
    """Match dialogs spoken to the specified list of expected dialogs.

    Only one of the dialogs in the provided list need to match for this
    check to be successful.

    Args:
        bus (InterceptAllBusClient): Bus instance to listen on
        dialogs (list): list of acceptable dialogs
        timeout (int): how long to wait for the message, defaults to
                       DEFAULT_TIMEOUT

    Returns:
        A boolean indicating if a match was found and the list of "speak"
        events found on the message bus during the matching process.
    """
    collected = []
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        matched = False
        for message in bus.get_messages('speak'):
            collected.append(message)
            meta = message.data.get('meta', {})
            if meta.get('dialog') in dialogs:
                # Let the matching dialog finish before reporting success.
                wait_while_speaking()
                matched = True
                break
        # The bus is cleared after every scan, matched or not.
        bus.clear_messages()
        if matched:
            return True, collected
        time.sleep(1)
    return False, collected
def wait_for_audio_service(context, message_type):
    """Wait for audio.service message that matches type provided.

    May be play, stop, or pause messages. Raises AssertionError if no such
    message is seen.

    Args:
        context (behave Context): context providing the bus and timeout
        message_type (string): final component of bus message in form
                               `mycroft.audio.service.{type}`
    """
    msg_type = 'mycroft.audio.service.{}'.format(message_type)

    def check_for_msg(message):
        # No debug output is produced, only the type comparison.
        return message.msg_type == msg_type, ''

    passed, debug = then_wait(msg_type, check_for_msg, context)

    if not passed:
        debug += mycroft_responses(context)
    if not debug:
        # Nothing was spoken either; fall back to a generic description.
        action = 'start' if message_type == 'play' else message_type
        debug = "Mycroft didn't {} playback".format(action)

    assert passed, debug