First cut at skill tester
Minimum viable solution for running existing test cases (JSON files). Additions currently under consideration, listed as the (line, column) positions of the TODO comments in the new file:

(16, 3)   # TODO: Make template for testing one skill only, for the skill developer to use
(138, 11) # TODO: Pass something to intent, that tells that this is a test run. The skill intent can then avoid side effects
(144, 11) # TODO: add optional timeout parameter to test_case
(155, 11) # TODO: Check that all intents are checked (what about context)
(169, 3)  # TODO: Add command line utility to test an event against a test_case, allow for debugging tests
(173, 11) # TODO: Add support for expected response, and others

pull/1527/head
parent 397435afcb
commit 7e9a05f4e0
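The test cases this runner consumes are small JSON files. For reference, here is a minimal, hypothetical test case built only from the fields the new code below reads ("utterance", "intent_type", "intent"; an "assert" entry is also supported). The intent and keyword names are invented for illustration:

import json

# Hypothetical test case contents; field names come from the code in this
# commit, the values are made up.
test_case = json.loads("""
{
    "utterance": "what time is it",
    "intent_type": "TimeIntent",
    "intent": {"TimeKeyword": "time"}
}
""")

assert test_case["intent_type"] == "TimeIntent"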
@@ -12,8 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import Queue
 import json
-from time import sleep
+import time
 
 import os
 import re
@@ -23,7 +24,6 @@ from pyee import EventEmitter
 from mycroft.messagebus.message import Message
 from mycroft.skills.core import create_skill_descriptor, load_skill
-
 
 MainModule = '__init__'
 
 
@@ -67,27 +67,17 @@ def unload_skills(skills):
 class RegistrationOnlyEmitter(object):
     def __init__(self):
         self.emitter = EventEmitter()
+        self.q = None
 
     def on(self, event, f):
-        allow_events_to_execute = True
-        if allow_events_to_execute:
-            # don't filter events, just run them all
-            print "Event: "+str(event)
-            self.emitter.on(event, f)
-        else:
-            # filter to just the registration events,
-            # preventing them from actually executing
-            if event in [
-                'register_intent',
-                'register_vocab',
-                'recognizer_loop:utterance'
-            ]:
-                print "Event: " + str(event)
-                self.emitter.on(event, f)
+        # run all events
+        print "Event: " + str(event)
+        self.emitter.on(event, f)
 
     def emit(self, event, *args, **kwargs):
         event_name = event.type
+        if self.q:
+            self.q.put(event)
         self.emitter.emit(event_name, event, *args, **kwargs)
 
     def once(self, event, f):
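A brief, hedged sketch of how the queue hook added above can be used from a test: every Message passed to emit() is mirrored onto the queue when one is attached, so a test can pull events off the bus at its own pace. This assumes Python 2's Queue module (matching the imports in this file) and that the snippet runs alongside the class defined above:

import Queue

from mycroft.messagebus.message import Message

emitter = RegistrationOnlyEmitter()
q = Queue.Queue()
emitter.q = q  # emit() also puts each Message on this queue

emitter.emit(Message('speak', {'utterance': 'hello'}))

captured = q.get(timeout=1)  # same Message object, ready for evaluation
assert captured.data['utterance'] == 'hello'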
@@ -103,6 +93,7 @@ class MockSkillsLoader(object):
         self.emitter = RegistrationOnlyEmitter()
         from mycroft.skills.intent_service import IntentService
         self.ih = IntentService(self.emitter)
+        self.skills = None
 
     def load_skills(self):
         self.skills = load_skills(self.emitter, self.skills_root)
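For orientation, a rough sketch of how a MockSkillsLoader and a SkillTest (whose new constructor takes the skill directory, a test case file and the emitter, as the next hunk shows) appear intended to fit together. The paths are invented, and passing the skills root to the MockSkillsLoader constructor is an assumption based on the self.skills_root attribute used above; unload_skills is the module-level function visible in the previous hunk header:

# All paths below are hypothetical; the MockSkillsLoader argument is assumed.
loader = MockSkillsLoader('/opt/mycroft/skills')
loader.load_skills()
try:
    SkillTest('/opt/mycroft/skills/skill-date-time',
              '/opt/mycroft/skills/skill-date-time/test/intent/sample1.json',
              loader.emitter).run(loader)
finally:
    unload_skills(loader.skills)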
@@ -114,84 +105,137 @@ class MockSkillsLoader(object):
 
 
 class SkillTest(object):
-    def __init__(self, skill, example, emitter):
+    def __init__(self, skill, test_case_file, emitter):
         self.skill = skill
-        self.example = example
+        self.test_case_file = test_case_file
         self.emitter = emitter
         self.dict = dict
         self.output_file = None
         self.returned_intent = False
 
-    def compare_intents(self, expected, actual):
-        for key in expected.keys():
-            if actual.get(key, "").lower() != expected.get(key, "").lower():
-                print(
-                    "Expected %s: %s, Actual: %s" % (key, expected.get(key),
-                                                     actual.get(key)))
-                assert False
-
-    def check_speech(self, message):
-        print "Spoken response: " + message.data['utterance']
-        # Comparing the expected output and actual spoken response
-
-        def run_test(output_file, utterance):
-            dialog_file = open(output_file, 'r')
-            dialog_line = [line.rstrip('\n') for line in dialog_file]
-            match_found = False
-            for i in range(len(dialog_line)):
-                if '{{' in dialog_line[i]:
-                    replaced_dialog = re.sub('\{\{(\S+)\}\}',
-                                             '.*', dialog_line[i])
-                    m = re.match(replaced_dialog, utterance)
-                    if m is not None:
-                        match_found = True
-                else:
-                    if dialog_line[i] == utterance:
-                        match_found = True
-
-            if match_found is True:
-                assert True
-
-            else:
-                assert False
-
-            dialog_file.close()
-        run_test(self.output_file, message.data['utterance'])
-
     def run(self, loader):
-        for s in loader.skills:
-            if s and s._dir == self.skill:
-                name = s.name
-                break
-        print('file: ' + self.example)
-        example_json = json.load(open(self.example, 'r'))
-        event = {'utterances': [example_json.get('utterance')]}
-        # Extracting the expected output from json file
-        if "expected_output" in example_json:
-            output_file = str(example_json.get("expected_output"))
-            self.output_file = output_file
-            self.emitter.once('speak', self.check_speech)
-        else:
-            pass
-
-        def compare(intent):
-            self.compare_intents(example_json.get('intent'), intent.data)
-            self.returned_intent = True
-
-        self.emitter.once(name + ':' + example_json.get('intent_type'),
-                          compare)
+        s = filter(lambda s: s and s._dir == self.skill, loader.skills)[0]
+        print('Test case file: ' + self.test_case_file)
+        test_case = json.load(open(self.test_case_file, 'r'))
+        print "Test case: " + str(test_case)
+        evaluation_rule = EvaluationRule(test_case)
+
+        # Set up queue for emitted events. Because
+        # the evaluation method expects events to be received in convoy,
+        # and be handled one by one. We cant make assumptions about threading
+        # in the core or the skill
+        q = Queue.Queue()
+        s.emitter.q = q
+
+        event = {'utterances': [test_case.get('utterance')]}
 
         # Emit an utterance, just like the STT engine does. This sends the
         # provided text to the skill engine for intent matching and it then
         # invokes the skill.
+        # TODO: Pass something to intent, that tells that this is a test run. The skill intent can then avoid side effects
         self.emitter.emit(
             'recognizer_loop:utterance',
             Message('recognizer_loop:utterance', event))
 
-        sleep(0.2)  # wait for 0.2 seconds
+        # Wait up to 30 seconds for the test_case to complete (
+        # TODO: add optional timeout parameter to test_case
+        timeout = time.time() + 30
+        while not evaluation_rule.all_succeeded():
+            try:
+                event = q.get(timeout=1)
+            except Queue.Empty:
+                pass
+            evaluation_rule.evaluate(event.data)
+            if time.time() > timeout:
+                break
+
+        # TODO: Check that all intents are checked (what about context)
+
+        # Stop emmiter from sending on queue
+        s.emitter.q = None
 
         # remove the skill which is not responding
         self.emitter.remove_all_listeners('speak')
-        if not self.returned_intent:
-            print("No intent handled")
+        if not evaluation_rule.all_succeeded():
+            print "Evaluation failed"
+            print "Rule status: " + str(evaluation_rule.rule)
             assert False
 
 
+# TODO: Add command line utility to test an event against a test_case, allow for debugging tests
+class EvaluationRule(object):
+    def __init__(self, test_case):
+        # Convert test case to internal rule format
+        # TODO: Add support for expected response, and others
+        self.rule = []
+
+        _x = ['and']
+        if test_case.get('utterance', None):
+            _x.append(['endsWith', 'intent_type', str(test_case['intent_type'])])
+
+        if test_case.get('intent', None):
+            for item in test_case['intent'].items():
+                _x.append(['equal', str(item[0]), str(item[1])])
+
+        if _x != ['and']:
+            self.rule.append(_x)
+
+        if test_case.get('assert', None):
+            for _x in eval(test_case['assert']):
+                self.rule.append(_x)
+
+        print "Rule created " + str(self.rule)
+
+    def get_field_value(self, rule, msg):
+        if isinstance(rule, list):
+            value = msg.get(rule[0], None)
+            if len(rule) > 1 and value:
+                for field in rule[1:]:
+                    value = value.get(field, None)
+                    if not value:
+                        break
+        else:
+            value = msg.get(rule, None)
+
+        return value
+
+    def evaluate(self, msg):
+        print "Evaluating message: " + str(msg)
+        for r in self.rule:
+            self.partial_evaluate(r, msg)
+
+    def partial_evaluate(self, rule, msg):
+        if rule[0] == 'equal':
+            if self.get_field_value(rule[1], msg) != rule[2]:
+                return False
+
+        if rule[0] == 'notEqual':
+            if self.get_field_value(rule[1], msg) == rule[2]:
+                return False
+
+        if rule[0] == 'endsWith':
+            if not (self.get_field_value(rule[1], msg) and
+                    self.get_field_value(rule[1], msg).endswith(rule[2])):
+                return False
+
+        if rule[0] == 'match':
+            if not (self.get_field_value(rule[1], msg) and
+                    re.match(rule[2], self.get_field_value(rule[1], msg))):
+                return False
+
+        if rule[0] == 'and':
+            for i in rule[1:]:
+                if not self.partial_evaluate(i, msg):
+                    return False
+
+        if rule[0] == 'or':
+            for i in rule[1:]:
+                if self.partial_evaluate(i, msg):
+                    rule.append('succeeded')
+                    return True
+            return False
+
+        rule.append('succeeded')
+        return True
+
+    def all_succeeded(self):
+        return len(filter(lambda x: x[-1] != 'succeeded', self.rule)) == 0
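To make the new evaluation logic concrete, a short, hedged walkthrough (Python 2, like the module itself; all values invented): EvaluationRule turns a test case into a nested rule list such as [['and', ['endsWith', 'intent_type', 'TimeIntent'], ['equal', 'TimeKeyword', 'time']]]. evaluate() is then called once per captured message, sub-rules that pass get 'succeeded' appended, and all_succeeded() reports whether every top-level rule has been marked:

# Hypothetical test case and message data, for illustration only.
rule = EvaluationRule({
    'utterance': 'what time is it',
    'intent_type': 'TimeIntent',
    'intent': {'TimeKeyword': 'time'},
})
rule.evaluate({'intent_type': 'TimeSkill:TimeIntent', 'TimeKeyword': 'time'})
assert rule.all_succeeded()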