336 lines
11 KiB
Python
336 lines
11 KiB
Python
# Copyright 2020 Mycroft AI Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
"""Common tools to use when creating step files for behave tests."""
|
|
|
|
from threading import Event
|
|
import time
|
|
|
|
from mycroft.audio.utils import wait_while_speaking
|
|
from mycroft.messagebus import Message
|
|
|
|
|
|
DEFAULT_TIMEOUT = 10
|
|
|
|
|
|
class CriteriaWaiter:
|
|
"""Wait for a message to meet a certain criteria.
|
|
|
|
Args:
|
|
msg_type: message type to watch
|
|
criteria_func: Function to determine if a message fulfilling the
|
|
test case has been found.
|
|
context: behave context
|
|
"""
|
|
def __init__(self, msg_type, criteria_func, context):
|
|
self.msg_type = msg_type
|
|
self.criteria_func = criteria_func
|
|
self.context = context
|
|
self.result = Event()
|
|
|
|
def reset(self):
|
|
"""Reset the wait state."""
|
|
self.result.clear()
|
|
|
|
# TODO: Remove in 21.08
|
|
def wait_unspecific(self, timeout):
|
|
"""
|
|
Wait for a specified time for criteria to be fulfilled by any message.
|
|
|
|
This use case is deprecated and only for backward compatibility
|
|
|
|
Args:
|
|
timeout: Time allowance for a message fulfilling the criteria, if
|
|
provided will override the normal normal step timeout.
|
|
|
|
Returns:
|
|
tuple (bool, str) test status and debug output
|
|
"""
|
|
timeout = timeout or self.context.step_timeout
|
|
start_time = time.monotonic()
|
|
debug = ''
|
|
while time.monotonic() < start_time + timeout:
|
|
for message in self.context.bus.get_messages(None):
|
|
status, test_dbg = self.criteria_func(message)
|
|
debug += test_dbg
|
|
if status:
|
|
self.context.matched_message = message
|
|
self.context.bus.remove_message(message)
|
|
return True, debug
|
|
self.context.bus.new_message_available.wait(0.5)
|
|
# Timed out return debug from test
|
|
return False, debug
|
|
|
|
def _check_historical_messages(self):
|
|
"""Search through the already received messages for a match.
|
|
|
|
Returns:
|
|
tuple (bool, str) test status and debug output
|
|
|
|
"""
|
|
debug = ''
|
|
for message in self.context.bus.get_messages(self.msg_type):
|
|
status, test_dbg = self.criteria_func(message)
|
|
debug += test_dbg
|
|
if status:
|
|
self.context.matched_message = message
|
|
self.context.bus.remove_message(message)
|
|
self.result.set()
|
|
break
|
|
return debug
|
|
|
|
def wait_specific(self, timeout=None):
|
|
"""Wait for a specific message type to fullfil a criteria.
|
|
|
|
Uses an event-handler to not repeatedly loop.
|
|
|
|
Args:
|
|
timeout: Time allowance for a message fulfilling the criteria, if
|
|
provided will override the normal normal step timeout.
|
|
|
|
Returns:
|
|
tuple (bool, str) test status and debug output
|
|
"""
|
|
timeout = timeout or self.context.step_timeout
|
|
|
|
debug = ''
|
|
|
|
def on_message(message):
|
|
nonlocal debug
|
|
status, test_dbg = self.criteria_func(message)
|
|
debug += test_dbg
|
|
if status:
|
|
self.context.matched_message = message
|
|
self.result.set()
|
|
|
|
self.context.bus.on(self.msg_type, on_message)
|
|
# Check historical messages
|
|
historical_debug = self._check_historical_messages()
|
|
|
|
# If no matching message was already caught, wait for it
|
|
if not self.result.is_set():
|
|
self.result.wait(timeout=timeout)
|
|
self.context.bus.remove(self.msg_type, on_message)
|
|
return self.result.is_set(), historical_debug + debug
|
|
|
|
def wait(self, timeout=None):
|
|
"""Wait for a specific message type to fullfil a criteria.
|
|
|
|
Uses an event-handler to not repeatedly loop.
|
|
|
|
Args:
|
|
timeout: Time allowance for a message fulfilling the criteria, if
|
|
provided will override the normal normal step timeout.
|
|
|
|
Returns:
|
|
(result (bool), debug (str)) Result containing status and debug
|
|
message.
|
|
"""
|
|
if self.msg_type is None:
|
|
return self.wait_unspecific(timeout)
|
|
else:
|
|
return self.wait_specific(timeout)
|
|
|
|
|
|
def then_wait(msg_type, criteria_func, context, timeout=None):
|
|
"""Wait for a specific message type to fullfil a criteria.
|
|
|
|
Uses an event-handler to not repeatedly loop.
|
|
|
|
Args:
|
|
msg_type: message type to watch
|
|
criteria_func: Function to determine if a message fulfilling the
|
|
test case has been found.
|
|
context: behave context
|
|
timeout: Time allowance for a message fulfilling the criteria, if
|
|
provided will override the normal normal step timeout.
|
|
|
|
Returns:
|
|
(result (bool), debug (str)) Result containing status and debug
|
|
message.
|
|
"""
|
|
waiter = CriteriaWaiter(msg_type, criteria_func, context)
|
|
return waiter.wait(timeout)
|
|
|
|
|
|
def then_wait_fail(msg_type, criteria_func, context, timeout=None):
|
|
"""Wait for a specified time, failing if criteria is fulfilled.
|
|
|
|
Args:
|
|
msg_type: message type to watch
|
|
criteria_func: Function to determine if a message fulfilling the
|
|
test case has been found.
|
|
context: behave context
|
|
timeout: Time allowance for a message fulfilling the criteria
|
|
|
|
Returns:
|
|
tuple (bool, str) test status and debug output
|
|
"""
|
|
status, debug = then_wait(msg_type, criteria_func, context, timeout)
|
|
return not status, debug
|
|
|
|
|
|
def mycroft_responses(context):
|
|
"""Collect and format mycroft responses from context.
|
|
|
|
Args:
|
|
context: behave context to extract messages from.
|
|
|
|
Returns: (str) Mycroft responses including skill and dialog file
|
|
"""
|
|
responses = ''
|
|
messages = context.bus.get_messages('speak')
|
|
if len(messages) > 0:
|
|
responses = 'Mycroft responded with:\n'
|
|
for m in messages:
|
|
responses += 'Mycroft: '
|
|
if 'meta' in m.data and 'dialog' in m.data['meta']:
|
|
responses += '{}.dialog'.format(m.data['meta']['dialog'])
|
|
responses += '({})\n'.format(m.data['meta'].get('skill'))
|
|
responses += '"{}"\n'.format(m.data['utterance'])
|
|
return responses
|
|
|
|
|
|
def print_mycroft_responses(context):
|
|
print(mycroft_responses(context))
|
|
|
|
|
|
def format_dialog_match_error(potential_matches, speak_messages):
|
|
"""Format error message to be displayed when an expected
|
|
|
|
This is similar to the mycroft_responses function above. The difference
|
|
is that here the expected responses are passed in instead of making
|
|
a second loop through message bus messages.
|
|
|
|
Args:
|
|
potential_matches (list): one of the dialog files in this list were
|
|
expected to be spoken
|
|
speak_messages (list): "speak" event messages from the message bus
|
|
that don't match the list of potential matches.
|
|
|
|
Returns: (str) Message detailing the error to the user
|
|
"""
|
|
error_message = (
|
|
'Expected Mycroft to respond with one of:\n'
|
|
f"\t{', '.join(potential_matches)}\n"
|
|
"Actual response(s):\n"
|
|
)
|
|
if speak_messages:
|
|
for message in speak_messages:
|
|
meta = message.data.get("meta")
|
|
if meta is not None:
|
|
if 'dialog' in meta:
|
|
error_message += f"\tDialog: {meta['dialog']}"
|
|
if 'skill' in meta:
|
|
error_message += f" (from {meta['skill']} skill)\n"
|
|
error_message += f"\t\tUtterance: {message.data['utterance']}\n"
|
|
else:
|
|
error_message += "\tMycroft didn't respond"
|
|
|
|
return error_message
|
|
|
|
|
|
def emit_utterance(bus, utterance):
|
|
"""Emit an utterance event on the message bus.
|
|
|
|
Args:
|
|
bus (InterceptAllBusClient): Bus instance to listen on
|
|
utterance (str): list of acceptable dialogs
|
|
"""
|
|
bus.emit(Message('recognizer_loop:utterance',
|
|
data={'utterances': [utterance],
|
|
'lang': 'en-us',
|
|
'session': '',
|
|
'ident': time.time()},
|
|
context={'client_name': 'mycroft_listener'}))
|
|
|
|
|
|
def wait_for_dialog(bus, dialogs, context=None, timeout=None):
|
|
"""Wait for one of the dialogs given as argument.
|
|
|
|
Args:
|
|
bus (InterceptAllBusClient): Bus instance to listen on
|
|
dialogs (list): list of acceptable dialogs
|
|
context (behave Context): optional context providing scenario timeout
|
|
timeout (int): how long to wait for the message, defaults to timeout
|
|
provided by context or 10 seconds
|
|
"""
|
|
if context:
|
|
timeout_duration = timeout or context.step_timeout
|
|
else:
|
|
timeout_duration = timeout or DEFAULT_TIMEOUT
|
|
wait_for_dialog_match(bus, dialogs, timeout_duration)
|
|
|
|
|
|
def wait_for_dialog_match(bus, dialogs, timeout=DEFAULT_TIMEOUT):
|
|
"""Match dialogs spoken to the specified list of expected dialogs.
|
|
|
|
Only one of the dialogs in the provided list need to match for this
|
|
check to be successful.
|
|
|
|
Args:
|
|
bus (InterceptAllBusClient): Bus instance to listen on
|
|
dialogs (list): list of acceptable dialogs
|
|
timeout (int): how long to wait for the message, defaults to timeout
|
|
provided by context or 10 seconds
|
|
|
|
Returns:
|
|
A boolean indicating if a match was found and the list of "speak"
|
|
events found on the message bus during the matching process.
|
|
"""
|
|
match_found = False
|
|
speak_messages = list()
|
|
timeout_time = time.monotonic() + timeout
|
|
while time.monotonic() < timeout_time:
|
|
for message in bus.get_messages('speak'):
|
|
speak_messages.append(message)
|
|
dialog = message.data.get('meta', {}).get('dialog')
|
|
if dialog in dialogs:
|
|
wait_while_speaking()
|
|
match_found = True
|
|
break
|
|
bus.clear_messages()
|
|
if match_found:
|
|
break
|
|
time.sleep(1)
|
|
|
|
return match_found, speak_messages
|
|
|
|
|
|
def wait_for_audio_service(context, message_type):
|
|
"""Wait for audio.service message that matches type provided.
|
|
|
|
May be play, stop, or pause messages
|
|
|
|
Args:
|
|
context (behave Context): optional context providing scenario timeout
|
|
message_type (string): final component of bus message in form
|
|
`mycroft.audio.service.{type}
|
|
"""
|
|
msg_type = 'mycroft.audio.service.{}'.format(message_type)
|
|
|
|
def check_for_msg(message):
|
|
return message.msg_type == msg_type, ''
|
|
|
|
passed, debug = then_wait(msg_type, check_for_msg, context)
|
|
|
|
if not passed:
|
|
debug += mycroft_responses(context)
|
|
if not debug:
|
|
if message_type == 'play':
|
|
message_type = 'start'
|
|
debug = "Mycroft didn't {} playback".format(message_type)
|
|
|
|
assert passed, debug
|