Merge branch 'dev' into feature/setup_virtenvpath

pull/992/head
Åke 2017-08-17 10:31:48 +02:00 committed by GitHub
commit 5c369f1517
64 changed files with 693 additions and 103 deletions

View File

@ -10,13 +10,11 @@ cache: pocketsphinx-python
install:
- VIRTUALENV_ROOT=${VIRTUAL_ENV} ./dev_setup.sh
- pip install -r requirements.txt
- pip install nose2 cov-core
- pip install python-coveralls
# - pip install -r test-requirements.txt
- pip install -r test-requirements.txt
# command to run tests
script:
- pep8 mycroft test
- nose2 test --with-coverage --config=test/unittest.cfg
- nose2 -t ./ -s test/unittests/ --with-coverage --config=test/unittests/unittest.cfg
env:
- IS_TRAVIS=true

View File

@ -98,10 +98,12 @@ fi
# removing the pip2 explicit usage here for consistency with the above use.
pip install -r requirements.txt
if [[ $(free|awk '/^Mem:/{print $2}') -lt 1572864 ]] ; then
CORES=1
else
CORES=$(nproc)
SYSMEM=$(free|awk '/^Mem:/{print $2}')
MAXCORES=$(($SYSMEM / 512000))
CORES=$(nproc)
if [[ ${MAXCORES} -lt ${CORES} ]]; then
CORES=${MAXCORES}
fi
echo "Building with $CORES cores."
@ -113,10 +115,10 @@ cd "${TOP}"
if [[ "$build_mimic" == 'y' ]] || [[ "$build_mimic" == 'Y' ]]; then
echo "WARNING: The following can take a long time to run!"
"${TOP}/scripts/install-mimic.sh"
"${TOP}/scripts/install-mimic.sh" " ${CORES}"
else
echo "Skipping mimic build."
fi
# install pygtk for desktop_launcher skill
"${TOP}/scripts/install-pygtk.sh"
"${TOP}/scripts/install-pygtk.sh" " ${CORES}"

33
mycroft/skills/context.py Normal file
View File

@ -0,0 +1,33 @@
from functools import wraps
"""
Helper decorators for handling context from skills.
"""
def adds_context(context, words=''):
"""
Adds context to context manager.
"""
def context_add_decorator(func):
@wraps(func)
def func_wrapper(*args, **kwargs):
ret = func(*args, **kwargs)
args[0].set_context(context)
return ret
return func_wrapper
return context_add_decorator
def removes_context(context):
"""
Removes context from the context manager.
"""
def context_removes_decorator(func):
@wraps(func)
def func_wrapper(*args, **kwargs):
ret = func(*args, **kwargs)
args[0].remove_context(context)
return ret
return func_wrapper
return context_removes_decorator

View File

@ -314,6 +314,29 @@ class MycroftSkill(object):
logger.error('Could not enable ' + intent_name +
', it hasn\'t been registered.')
def set_context(self, context, word=''):
"""
Add context to intent service
Args:
context: Keyword
word: word connected to keyword
"""
if not isinstance(context, basestring):
raise ValueError('context should be a string')
if not isinstance(word, basestring):
raise ValueError('word should be a string')
self.emitter.emit(Message('add_context', {'context': context, 'word':
word}))
def remove_context(self, context):
"""
remove_context removes a keyword from from the context manager.
"""
if not isinstance(context, basestring):
raise ValueError('context should be a string')
self.emitter.emit(Message('remove_context', {'context': context}))
def register_vocabulary(self, entity, entity_type):
self.emitter.emit(Message('register_vocab', {
'start': entity, 'end': entity_type

View File

@ -22,21 +22,130 @@ from mycroft.messagebus.message import Message
from mycroft.skills.core import open_intent_envelope
from mycroft.util.log import getLogger
from mycroft.util.parse import normalize
from mycroft.configuration import ConfigurationManager
from adapt.context import ContextManagerFrame
import time
__author__ = 'seanfitz'
logger = getLogger(__name__)
class ContextManager(object):
"""
ContextManager
Use to track context throughout the course of a conversational session.
How to manage a session's lifecycle is not captured here.
"""
def __init__(self, timeout):
self.frame_stack = []
self.timeout = timeout * 60 # minutes to seconds
def clear_context(self):
self.frame_stack = []
def remove_context(self, context_id):
self.frame_stack = [(f, t) for (f, t) in self.frame_stack
if context_id in f.entities[0].get('data', [])]
def inject_context(self, entity, metadata={}):
"""
Args:
entity(object):
format {'data': 'Entity tag as <str>',
'key': 'entity proper name as <str>',
'confidence': <float>'
}
metadata(object): dict, arbitrary metadata about the entity being
added
"""
top_frame = self.frame_stack[0] if len(self.frame_stack) > 0 else None
if top_frame and top_frame[0].metadata_matches(metadata):
top_frame[0].merge_context(entity, metadata)
else:
frame = ContextManagerFrame(entities=[entity],
metadata=metadata.copy())
self.frame_stack.insert(0, (frame, time.time()))
def get_context(self, max_frames=None, missing_entities=[]):
"""
Constructs a list of entities from the context.
Args:
max_frames(int): maximum number of frames to look back
missing_entities(list of str): a list or set of tag names,
as strings
Returns:
list: a list of entities
"""
relevant_frames = [frame[0] for frame in self.frame_stack if
time.time() - frame[1] < self.timeout]
if not max_frames or max_frames > len(relevant_frames):
max_frames = len(relevant_frames)
missing_entities = list(missing_entities)
context = []
for i in xrange(max_frames):
frame_entities = [entity.copy() for entity in
relevant_frames[i].entities]
for entity in frame_entities:
entity['confidence'] = entity.get('confidence', 1.0) \
/ (2.0 + i)
context += frame_entities
result = []
if len(missing_entities) > 0:
for entity in context:
if entity.get('data') in missing_entities:
result.append(entity)
# NOTE: this implies that we will only ever get one
# of an entity kind from context, unless specified
# multiple times in missing_entities. Cannot get
# an arbitrary number of an entity kind.
missing_entities.remove(entity.get('data'))
else:
result = context
# Only use the latest instance of each keyword
stripped = []
processed = []
for f in result:
keyword = f['data'][0][1]
if keyword not in processed:
stripped.append(f)
processed.append(keyword)
result = stripped
return result
class IntentService(object):
def __init__(self, emitter):
self.config = ConfigurationManager.get().get('context', {})
self.engine = IntentDeterminationEngine()
self.context_keywords = self.config.get('keywords', ['Location'])
self.context_max_frames = self.config.get('max_frames', 3)
self.context_timeout = self.config.get('timeout', 2)
self.context_greedy = self.config.get('greedy', False)
self.context_manager = ContextManager(self.context_timeout)
self.emitter = emitter
self.emitter.on('register_vocab', self.handle_register_vocab)
self.emitter.on('register_intent', self.handle_register_intent)
self.emitter.on('recognizer_loop:utterance', self.handle_utterance)
self.emitter.on('detach_intent', self.handle_detach_intent)
self.emitter.on('detach_skill', self.handle_detach_skill)
# Context related handlers
self.emitter.on('add_context', self.handle_add_context)
self.emitter.on('remove_context', self.handle_remove_context)
self.emitter.on('clear_context', self.handle_clear_context)
def update_context(self, intent):
for tag in intent['__tags__']:
context_entity = tag.get('entities')[0]
if self.context_greedy:
self.context_manager.inject_context(context_entity)
elif context_entity['data'][0][1] in self.context_keywords:
self.context_manager.inject_context(context_entity)
def handle_utterance(self, message):
# Get language of the utterance
@ -51,8 +160,9 @@ class IntentService(object):
try:
# normalize() changes "it's a boy" to "it is boy", etc.
best_intent = next(self.engine.determine_intent(
normalize(utterance, lang), 100))
normalize(utterance, lang), 100,
include_tags=True,
context_manager=self.context_manager))
# TODO - Should Adapt handle this?
best_intent['utterance'] = utterance
except StopIteration, e:
@ -60,6 +170,7 @@ class IntentService(object):
continue
if best_intent and best_intent.get('confidence', 0.0) > 0.0:
self.update_context(best_intent)
reply = message.reply(
best_intent.get('intent_type'), best_intent)
self.emitter.emit(reply)
@ -96,3 +207,19 @@ class IntentService(object):
p for p in self.engine.intent_parsers if
not p.name.startswith(skill_name)]
self.engine.intent_parsers = new_parsers
def handle_add_context(self, message):
entity = {'confidence': 1.0}
context = message.data.get('context')
word = message.data.get('word') or ''
entity['data'] = [(word, context)]
entity['match'] = word
entity['key'] = word
self.context_manager.inject_context(entity)
def handle_remove_context(self, message):
context = message.data.get('context')
self.context_manager.remove_context(context)
def handle_clear_context(self, message):
self.context_manager.clear_context()

104
mycroft/util/download.py Normal file
View File

@ -0,0 +1,104 @@
from threading import Thread
import requests
import os
from os.path import exists
_running_downloads = {}
def _get_download_tmp(dest):
tmp_base = dest + '.part'
if not exists(tmp_base):
return tmp_base
else:
i = 1
while(True):
tmp = tmp_base + '.' + str(i)
if not exists(tmp):
return tmp
else:
i += 1
class Downloader(Thread):
"""
Downloader is a thread based downloader instance when instanciated
it will download the provided url to a file on disk.
When the download is complete or failed the `.done` property will
be set to true and the `.status` will indicate the status code.
200 = Success.
Args:
url: Url to download
dest: Path to save data to
complet_action: Function to run when download is complete.
`func(dest)`
"""
def __init__(self, url, dest, complete_action=None):
super(Downloader, self).__init__()
self.url = url
self.dest = dest
self.complete_action = complete_action
self.status = None
self.done = False
self._abort = False
# Start thread
self.start()
def run(self):
"""
Does the actual download.
"""
r = requests.get(self.url, stream=True)
tmp = _get_download_tmp(self.dest)
with open(tmp, 'w') as f:
for chunk in r.iter_content():
f.write(chunk)
if self._abort:
break
self.status = r.status_code
if not self._abort and self.status == 200:
self.finalize(tmp)
else:
self.cleanup(self, tmp)
self.done = True
arg_hash = hash(self.url + self.dest)
# Remove from list of currently running downloads
if arg_hash in _running_downloads:
_running_downloads.pop(arg_hash)
def finalize(self, tmp):
"""
Move the .part file to the final destination and perform any
actions that should be performed at completion.
"""
os.rename(tmp, self.dest)
if self.complete_action:
self.complete_action(self.dest)
def cleanup(tmp):
"""
Cleanup after download attempt
"""
if exists(tmp):
os.remove(self.dest + '.part')
if self.status == 200:
self.status = -1
def abort(self):
"""
Abort download process
"""
self._abort = True
def download(url, dest, complete_action=None):
global _running_downloads
arg_hash = hash(url + dest)
if arg_hash not in _running_downloads:
_running_downloads[arg_hash] = Downloader(url, dest, complete_action)
return _running_downloads[arg_hash]

View File

@ -33,7 +33,6 @@ pyric==0.1.6
inflection==0.3.1
pytz==2017.2
pillow==4.1.1
mock==2.0.0
python-dateutil==2.6.0
pychromecast==0.7.7
python-vlc==1.1.2

View File

@ -3,7 +3,7 @@
set -Ee
MIMIC_DIR=mimic
CORES=$(nproc)
CORES=$1
MIMIC_VERSION=1.2.0.2
# for ubuntu precise in travis, that does not provide pkg-config:
@ -16,7 +16,7 @@ if [ ! -d ${MIMIC_DIR} ]; then
cd ${MIMIC_DIR}
./autogen.sh
./configure --with-audio=alsa --enable-shared --prefix=$(pwd)
make -j$CORES
make -j${CORES}
make install
else
# ensure mimic is up to date
@ -28,6 +28,6 @@ else
./autogen.sh
./configure --with-audio=alsa --enable-shared --prefix=$(pwd)
make clean
make -j$CORES
make -j${CORES}
make install
fi

View File

@ -8,7 +8,7 @@ fi
# Setup variables.
CACHE="/tmp/install-pygtk-$$"
CORES=$(nproc)
CORES=$1
# Make temp directory.
mkdir -p $CACHE
@ -29,7 +29,7 @@ then
( cd py2cairo*
autoreconf -ivf
./configure --prefix=$VIRTUAL_ENV --disable-dependency-tracking
make -j$CORES
make -j${CORES}
make install
)
)
@ -50,7 +50,7 @@ then
tar -xvf pygobject.tar.bz2
( cd pygobject*
./configure --prefix=$VIRTUAL_ENV --disable-introspection
make -j$CORES
make -j${CORES}
make install
)
)
@ -71,7 +71,7 @@ then
tar -xvf pygtk.tar.bz2
( cd pygtk-*
./configure --prefix=$VIRTUAL_ENV PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$VIRTUAL_ENV/lib/pkgconfig
make -j$CORES
make -j${CORES}
make install
)
)

View File

@ -1,2 +1,5 @@
pep8
xmlrunner==1.7.7
pep8==1.7.0
nose2==0.6.5
cov-core==1.15.0
python-coveralls==2.9.1
mock==2.0.0

View File

@ -0,0 +1,157 @@
"""This is a unittest for the message buss
It's important to note that this requires this test to run mycroft service
to test the buss. It is not expected that the service be already running
when the tests are ran.
"""
import unittest
from mycroft.messagebus.message import Message
from mycroft.messagebus.client.ws import WebsocketClient
from subprocess import Popen, call
from threading import Thread
import time
class TestMessagebusMethods(unittest.TestCase):
"""This class is for testing the messsagebus.
It currently only tests send and receive. The tests could include
more.
"""
def setUp(self):
"""
This sets up for testing the message buss
This requires starting the mycroft service and creating two
WebsocketClient object to talk with eachother. Not this is
threaded and will require cleanup
"""
# start the mycroft service. and get the pid of the script.
self.pid = Popen(["python", "mycroft/messagebus/service/main.py"]).pid
# delay to allow the service to start up.
time.sleep(10)
# Create the two web clients
self.ws1 = WebsocketClient()
self.ws2 = WebsocketClient()
# init the flags for handler's
self.handle1 = False
self.handle2 = False
# Start threads to handle websockets
Thread(target=self.ws1.run_forever).start()
Thread(target=self.ws2.run_forever).start()
# Sleep to give the websockets to startup before adding handlers
time.sleep(10)
# Setup handlers for each of the messages.
self.ws1.on('ws1.message', self.onHandle1)
self.ws2.on('ws2.message', self.onHandle2)
def onHandle1(self, event):
"""This is the handler for ws1.message
This for now simply sets a flag to true when received.
Args:
event(Message): this is the message received
"""
self.handle1 = True
def onHandle2(self, event):
"""This is the handler for ws2.message
This for now simply sets a flag to true when received.
Args:
event(Message): this is the message received
"""
self.handle2 = True
def tearDown(self):
"""This is the clean up for the tests
This will close the websockets ending the threads then kill the
mycroft service that was started in setUp.
"""
self.ws1.close()
self.ws2.close()
retcode = call(["kill", "-9", str(self.pid)])
def test_ClientServer(self):
"""This is the test to send a message from each of the websockets
to the other.
"""
# Send the messages
self.ws2.emit(Message('ws1.message'))
self.ws1.emit(Message('ws2.message'))
# allow time for messages to be processed
time.sleep(10)
# Check that both of the handlers were called.
self.assertTrue(self.handle1)
self.assertTrue(self.handle2)
class TestMessageMethods(unittest.TestCase):
"""This tests the Message class functions
"""
def setUp(self):
"""This sets up some basic messages for testing.
"""
self.empty_message = Message("empty")
self.message1 = Message("enclosure.reset")
self.message2 = Message("enclosure.system.blink",
{'target': 4}, {'target': 5})
self.message3 = Message("status", "OK")
# serialized results of each of the messages
self.serialized = ['{"data": {}, "type": "empty", "context": null}',
'{"data": {}, "type": "enclosure.reset",\
"context": null}',
'{"data": { "target": 4}, \
"type": "enclosure.system.blink", \
"context": {"target": 5}}',
'{"data": "OK", "type": "status", \
"context": null}']
def test_serialize(self):
"""This test the serialize method
"""
self.assertEqual(self.empty_message.serialize(), self.serialized[0])
self.assertEqual(self.message1.serialize(), self.serialized[1])
self.assertEqual(self.message2.serialize(), self.serialized[2])
self.assertEqual(self.message3.serialize(), self.serialized[3])
def test_deserialize(self):
"""This test's the deserialize method
"""
messages = []
# create the messages from the serialized strings above
messages.append(Message.deserialize(self.serialized[0]))
messages.append(Message.deserialize(self.serialized[1]))
messages.append(Message.deserialize(self.serialized[2]))
# check the created messages match the strings
self.assertEqual(messages[0].serialize(), self.serialized[0])
self.assertEqual(messages[1].serialize(), self.serialized[1])
self.assertEqual(messages[2].serialize(), self.serialized[2])
def test_reply(self):
"""This tests the reply method
This is probably incomplete as the use of the reply message escapes me.
"""
message = self.empty_message.reply("status", "OK")
self.assertEqual(message.serialize(),
'{"data": "OK", "type": "status", "context": {}}')
message = self.message1.reply("status", "OK")
self.assertEqual(message.serialize(),
'{"data": "OK", "type": "status", "context": {}}')
message = self.message2.reply("status", "OK")
def test_publish(self):
"""This is for testing the publish method
TODO: Needs to be completed
"""
pass
if __name__ == '__main__':
"""This is to start the testing"""
unittest.main()

View File

@ -1,18 +1,24 @@
import os
import sys
import glob
import unittest
from test.skills.skill_tester import MockSkillsLoader, SkillTest
from test.integrationtests.skills.skill_tester import MockSkillsLoader
from test.integrationtests.skills.skill_tester import SkillTest
__author__ = 'seanfitz'
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
SKILL_PATH = '/opt/mycroft/skills'
def discover_tests():
global SKILL_PATH
if len(sys.argv) > 1:
SKILL_PATH = sys.argv.pop(1)
tests = {}
skills = [
skill for skill
in glob.glob(os.path.join(PROJECT_ROOT, 'mycroft/skills/*'))
in glob.glob(SKILL_PATH + '/*')
if os.path.isdir(skill)
]
@ -31,13 +37,14 @@ class IntentTestSequenceMeta(type):
def __new__(mcs, name, bases, d):
def gen_test(a, b):
def test(self):
SkillTest(a, b, self.emitter).run()
SkillTest(a, b, self.emitter).run(self.loader)
return test
tests = discover_tests()
for skill in tests.keys():
skill_name = os.path.basename(skill)
skill_name = os.path.basename(skill) # Path of the skill
for example in tests[skill]:
# Name of the intent
example_name = os.path.basename(
os.path.splitext(os.path.splitext(example)[0])[0])
test_name = "test_IntentValidation[%s:%s]" % (skill_name,
@ -49,14 +56,16 @@ class IntentTestSequenceMeta(type):
class IntentTestSequence(unittest.TestCase):
__metaclass__ = IntentTestSequenceMeta
def setUp(self):
self.loader = MockSkillsLoader(
os.path.join(PROJECT_ROOT, 'mycroft', 'skills'))
@classmethod
def setUpClass(self):
self.loader = MockSkillsLoader(SKILL_PATH)
self.emitter = self.loader.load_skills()
def tearDown(self):
@classmethod
def tearDownClass(self):
self.loader.unload_skills()
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,145 @@
import json
from os.path import dirname
import re
from time import sleep
from pyee import EventEmitter
from mycroft.messagebus.client.ws import WebsocketClient
from mycroft.messagebus.message import Message
from mycroft.skills.core import load_skills, unload_skills
__author__ = 'seanfitz'
class RegistrationOnlyEmitter(object):
def __init__(self):
self.emitter = EventEmitter()
def on(self, event, f):
allow_events_to_execute = True
if allow_events_to_execute:
# don't filter events, just run them all
print "Event: "+str(event)
self.emitter.on(event, f)
else:
# filter to just the registration events,
# preventing them from actually executing
if event in [
'register_intent',
'register_vocab',
'recognizer_loop:utterance'
]:
print "Event: " + str(event)
self.emitter.on(event, f)
def emit(self, event, *args, **kwargs):
event_name = event.type
self.emitter.emit(event_name, event, *args, **kwargs)
def once(self, event, f):
self.emitter.once(event, f)
def remove(self, event_name, func):
pass
class MockSkillsLoader(object):
def __init__(self, skills_root):
self.skills_root = skills_root
self.emitter = RegistrationOnlyEmitter()
from mycroft.skills.intent_service import IntentService
self.ih = IntentService(self.emitter)
def load_skills(self):
self.skills = load_skills(self.emitter, self.skills_root)
self.skills = [s for s in self.skills if s]
return self.emitter.emitter # kick out the underlying emitter
def unload_skills(self):
unload_skills(self.skills)
class SkillTest(object):
def __init__(self, skill, example, emitter):
self.skill = skill
self.example = example
self.emitter = emitter
self.dict = dict
self.output_file = None
self.returned_intent = False
def compare_intents(self, expected, actual):
for key in expected.keys():
if actual.get(key, "").lower() != expected.get(key, "").lower():
print(
"Expected %s: %s, Actual: %s" % (key, expected.get(key),
actual.get(key)))
assert False
def check_speech(self, message):
print "Spoken response: " + message.data['utterance']
# Comparing the expected output and actual spoken response
def run_test(output_file, utterance):
dialog_file = open(output_file, 'r')
dialog_line = [line.rstrip('\n') for line in dialog_file]
match_found = False
for i in range(len(dialog_line)):
if '{{' in dialog_line[i]:
replaced_dialog = re.sub('\{\{(\S+)\}\}',
'.*', dialog_line[i])
m = re.match(replaced_dialog, utterance)
if m is not None:
match_found = True
else:
if dialog_line[i] == utterance:
match_found = True
if match_found is True:
assert True
else:
assert False
dialog_file.close()
run_test(self.output_file, message.data['utterance'])
def run(self, loader):
for s in loader.skills:
if s and s._dir == self.skill:
name = s.name
break
print('file: ' + self.example)
example_json = json.load(open(self.example, 'r'))
event = {'utterances': [example_json.get('utterance')]}
# Extracting the expected output from json file
if "expected_output" in example_json:
output_file = str(example_json.get("expected_output"))
self.output_file = output_file
self.emitter.once('speak', self.check_speech)
else:
pass
def compare(intent):
self.compare_intents(example_json.get('intent'), intent.data)
self.returned_intent = True
self.emitter.once(name + ':' + example_json.get('intent_type'),
compare)
# Emit an utterance, just like the STT engine does. This sends the
# provided text to the skill engine for intent matching and it then
# invokes the skill.
self.emitter.emit(
'recognizer_loop:utterance',
Message('recognizer_loop:utterance', event))
sleep(0.2) # wait for 0.2 seconds
# remove the skill which is not responding
self.emitter.remove_all_listeners('speak')
if not self.returned_intent:
print("No intent handled")
assert False

View File

@ -0,0 +1,59 @@
import unittest
from mycroft.skills.intent_service import IntentService, ContextManager
class MockEmitter(object):
def __init__(self):
self.reset()
def emit(self, message):
self.types.append(message.type)
self.results.append(message.data)
def get_types(self):
return self.types
def get_results(self):
return self.results
def reset(self):
self.types = []
self.results = []
class ContextManagerTest(unittest.TestCase):
emitter = MockEmitter()
def setUp(self):
self.context_manager = ContextManager(3)
def test_add_context(self):
entity = {'confidence': 1.0}
context = 'TestContext'
word = 'TestWord'
print "Adding " + context
entity['data'] = [(word, context)]
entity['match'] = word
entity['key'] = word
self.assertEqual(len(self.context_manager.frame_stack), 0)
self.context_manager.inject_context(entity)
self.assertEqual(len(self.context_manager.frame_stack), 1)
def test_remove_context(self):
entity = {'confidence': 1.0}
context = 'TestContext'
word = 'TestWord'
print "Adding " + context
entity['data'] = [(word, context)]
entity['match'] = word
entity['key'] = word
self.context_manager.inject_context(entity)
self.assertEqual(len(self.context_manager.frame_stack), 1)
self.context_manager.remove_context('TestContext')
self.assertEqual(len(self.context_manager.frame_stack), 0)
if __name__ == '__main__':
unittest.main()

View File

@ -1 +0,0 @@
{"d": {"a": 1, "c": 3, "b": 2}, "int": 42, "float": 4.2, "list": ["batman", 2, true, "superman"], "l": ["a", "b", "c", "d"], "bool": true, "string": "Always carry a towel"}

View File

@ -1,70 +0,0 @@
import json
from pyee import EventEmitter
from mycroft.messagebus.message import Message
from mycroft.skills.core import load_skills, unload_skills
__author__ = 'seanfitz'
class RegistrationOnlyEmitter(object):
def __init__(self):
self.emitter = EventEmitter()
def on(self, event, f):
if event in [
'register_intent',
'register_vocab',
'recognizer_loop:utterance'
]:
self.emitter.on(event, f)
def emit(self, event, *args, **kwargs):
event_name = event.type
self.emitter.emit(event_name, event, *args, **kwargs)
class MockSkillsLoader(object):
def __init__(self, skills_root):
self.skills_root = skills_root
self.emitter = RegistrationOnlyEmitter()
def load_skills(self):
self.skills = load_skills(self.emitter, self.skills_root)
return self.emitter.emitter # kick out the underlying emitter
def unload_skills(self):
unload_skills(self.skills)
class SkillTest(object):
def __init__(self, skill, example, emitter):
self.skill = skill
self.example = example
self.emitter = emitter
self.returned_intent = False
def compare_intents(self, expected, actual):
for key in expected.keys():
if actual.get(key, "").lower() != expected.get(key, "").lower():
print(
"Expected %s: %s, Actual: %s" % (key, expected.get(key),
actual.get(key)))
assert False
def run(self):
example_json = json.load(open(self.example, 'r'))
event = {'utterances': [example_json.get('utterance')]}
def compare(intent):
self.compare_intents(example_json.get('intent'), intent.data)
self.returned_intent = True
self.emitter.once(example_json.get('intent_type'), compare)
self.emitter.emit(
'recognizer_loop:utterance',
Message('recognizer_loop:utterance', event))
if not self.returned_intent:
print("No intent handled")
assert False

View File

@ -66,7 +66,8 @@ class AudioConsumerTest(unittest.TestCase):
def __create_sample_from_test_file(self, sample_name):
root_dir = dirname(dirname(dirname(__file__)))
filename = join(
root_dir, 'test', 'client', 'data', sample_name + '.wav')
root_dir, 'unittests', 'client',
'data', sample_name + '.wav')
wavfile = WavFile(filename)
with wavfile as source:
return AudioData(

View File

@ -0,0 +1 @@
{"l": ["a", "b", "c", "d"]}

View File

View File