Merge pull request #2792 from MycroftAI/feature/common-bus-connection

Refactor - common bus connection method
pull/2814/head
Kris Gesling 2021-01-18 15:58:37 +09:30 committed by GitHub
commit 303f6a3b95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 66 additions and 116 deletions

View File

@ -16,10 +16,12 @@
This handles playback of audio and speech This handles playback of audio and speech
""" """
from mycroft.configuration import Configuration from mycroft.util import (
from mycroft.messagebus.client import MessageBusClient check_for_signal,
from mycroft.util import reset_sigint_handler, wait_for_exit_signal, \ reset_sigint_handler,
create_daemon, create_echo_function, check_for_signal start_message_bus_client,
wait_for_exit_signal
)
from mycroft.util.log import LOG from mycroft.util.log import LOG
import mycroft.audio.speech as speech import mycroft.audio.speech as speech
@ -39,24 +41,20 @@ def on_stopping():
def main(ready_hook=on_ready, error_hook=on_error, stopping_hook=on_stopping): def main(ready_hook=on_ready, error_hook=on_error, stopping_hook=on_stopping):
""" Main function. Run when file is invoked. """ """Start the Audio Service and connect to the Message Bus"""
LOG.info("Starting Audio Service")
try: try:
reset_sigint_handler() reset_sigint_handler()
check_for_signal("isSpeaking") check_for_signal("isSpeaking")
bus = MessageBusClient() # Connect to the Mycroft Messagebus whitelist = ['mycroft.audio.service']
Configuration.set_config_update_handlers(bus) bus = start_message_bus_client("AUDIO", whitelist=whitelist)
speech.init(bus) speech.init(bus)
LOG.info("Starting Audio Services")
bus.on('message', create_echo_function('AUDIO',
['mycroft.audio.service']))
# Connect audio service instance to message bus # Connect audio service instance to message bus
audio = AudioService(bus) audio = AudioService(bus)
except Exception as e: except Exception as e:
error_hook(e) error_hook(e)
else: else:
create_daemon(bus.run_forever)
if audio.wait_for_load() and len(audio.service) > 0: if audio.wait_for_load() and len(audio.service) > 0:
# If at least one service exists, report ready # If at least one service exists, report ready
ready_hook() ready_hook()

View File

@ -208,13 +208,13 @@ class AudioService:
self.volume_is_low = False self.volume_is_low = False
self._loaded = MonotonicEvent() self._loaded = MonotonicEvent()
bus.once('open', self.load_services_callback) self.load_services()
def load_services_callback(self): def load_services(self):
""" """Method for loading services.
Main callback function for loading services. Sets up the globals
service and default and registers the event handlers for the Sets up the global service, default and registers the event handlers
subsystem. for the subsystem.
""" """
services = load_services(self.config, self.bus) services = load_services(self.config, self.bus)
# Sort services so local services are checked first # Sort services so local services are checked first

View File

@ -19,8 +19,7 @@ from threading import Lock
from mycroft.configuration import Configuration from mycroft.configuration import Configuration
from mycroft.messagebus.client import MessageBusClient from mycroft.messagebus.client import MessageBusClient
from mycroft.services import start_message_bus_client from mycroft.util import create_daemon, start_message_bus_client
from mycroft.util import create_daemon
from mycroft.util.log import LOG from mycroft.util.log import LOG
import json import json

View File

@ -20,10 +20,13 @@ from mycroft.client.speech.listener import RecognizerLoop
from mycroft.configuration import Configuration from mycroft.configuration import Configuration
from mycroft.identity import IdentityManager from mycroft.identity import IdentityManager
from mycroft.lock import Lock as PIDLock # Create/Support PID locking file from mycroft.lock import Lock as PIDLock # Create/Support PID locking file
from mycroft.messagebus.client import MessageBusClient
from mycroft.messagebus.message import Message from mycroft.messagebus.message import Message
from mycroft.util import create_daemon, wait_for_exit_signal, \ from mycroft.util import (
reset_sigint_handler, create_echo_function create_daemon,
reset_sigint_handler,
start_message_bus_client,
wait_for_exit_signal
)
from mycroft.util.log import LOG from mycroft.util.log import LOG
bus = None # Mycroft messagebus connection bus = None # Mycroft messagebus connection
@ -208,7 +211,6 @@ def connect_bus_events(bus):
bus.on('recognizer_loop:audio_output_start', handle_audio_start) bus.on('recognizer_loop:audio_output_start', handle_audio_start)
bus.on('recognizer_loop:audio_output_end', handle_audio_end) bus.on('recognizer_loop:audio_output_end', handle_audio_end)
bus.on('mycroft.stop', handle_stop) bus.on('mycroft.stop', handle_stop)
bus.on('message', create_echo_function('VOICE'))
def main(ready_hook=on_ready, error_hook=on_error, stopping_hook=on_stopping, def main(ready_hook=on_ready, error_hook=on_error, stopping_hook=on_stopping,
@ -219,15 +221,13 @@ def main(ready_hook=on_ready, error_hook=on_error, stopping_hook=on_stopping,
try: try:
reset_sigint_handler() reset_sigint_handler()
PIDLock("voice") PIDLock("voice")
bus = MessageBusClient() # Mycroft messagebus, see mycroft.messagebus
Configuration.set_config_update_handlers(bus)
config = Configuration.get() config = Configuration.get()
bus = start_message_bus_client("VOICE")
connect_bus_events(bus)
# Register handlers on internal RecognizerLoop bus # Register handlers on internal RecognizerLoop bus
loop = RecognizerLoop(watchdog) loop = RecognizerLoop(watchdog)
connect_loop_events(loop) connect_loop_events(loop)
connect_bus_events(bus)
create_daemon(bus.run_forever)
create_daemon(loop.run) create_daemon(loop.run)
except Exception as e: except Exception as e:
error_hook(e) error_hook(e)

View File

@ -1,15 +0,0 @@
# Copyright 2020 Mycroft AI Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mycroft.services.util import start_message_bus_client

View File

@ -1,45 +0,0 @@
# Copyright 2020 Mycroft AI Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from threading import Event
from mycroft.messagebus.client import MessageBusClient
from mycroft.configuration import Configuration
from mycroft.util import create_daemon, create_echo_function
from mycroft.util.log import LOG
def start_message_bus_client(service, bus=None):
"""Start the bus client daemon and wait for connection.
Arguments:
service (str): name of the service starting the connection
bus (MessageBusClient): an instance of the Mycroft MessageBusClient
Returns:
A connected instance of the MessageBusClient
"""
# Create a client if one was not provided
if bus is None:
bus = MessageBusClient()
# Configuration.set_config_update_handlers(bus)
bus_connected = Event()
bus.on('message', create_echo_function(service))
# Set the bus connected event when connection is established
bus.once('open', bus_connected.set)
create_daemon(bus.run_forever)
# Wait for connection
bus_connected.wait()
LOG.info('Connected to messagebus')
return bus

View File

@ -29,13 +29,11 @@ from mycroft.api import is_paired, BackendDown, DeviceApi
from mycroft.audio import wait_while_speaking from mycroft.audio import wait_while_speaking
from mycroft.enclosure.api import EnclosureAPI from mycroft.enclosure.api import EnclosureAPI
from mycroft.configuration import Configuration from mycroft.configuration import Configuration
from mycroft.messagebus.client import MessageBusClient
from mycroft.messagebus.message import Message from mycroft.messagebus.message import Message
from mycroft.util import ( from mycroft.util import (
connected, connected,
create_echo_function,
create_daemon,
reset_sigint_handler, reset_sigint_handler,
start_message_bus_client,
wait_for_exit_signal wait_for_exit_signal
) )
from mycroft.util.lang import set_active_lang from mycroft.util.lang import set_active_lang
@ -194,7 +192,7 @@ def main(ready_hook=on_ready, error_hook=on_error, stopping_hook=on_stopping,
set_active_lang(config.get('lang', 'en-us')) set_active_lang(config.get('lang', 'en-us'))
# Connect this process to the Mycroft message bus # Connect this process to the Mycroft message bus
bus = _start_message_bus_client() bus = start_message_bus_client("SKILLS")
_register_intent_services(bus) _register_intent_services(bus)
event_scheduler = EventScheduler(bus) event_scheduler = EventScheduler(bus)
skill_manager = _initialize_skill_manager(bus, watchdog) skill_manager = _initialize_skill_manager(bus, watchdog)
@ -215,23 +213,6 @@ def main(ready_hook=on_ready, error_hook=on_error, stopping_hook=on_stopping,
shutdown(skill_manager, event_scheduler) shutdown(skill_manager, event_scheduler)
def _start_message_bus_client():
"""Start the bus client daemon and wait for connection."""
bus = MessageBusClient()
Configuration.set_config_update_handlers(bus)
bus_connected = Event()
bus.on('message', create_echo_function('SKILLS'))
# Set the bus connected event when connection is established
bus.once('open', bus_connected.set)
create_daemon(bus.run_forever)
# Wait for connection
bus_connected.wait()
LOG.info('Connected to messagebus')
return bus
def _register_intent_services(bus): def _register_intent_services(bus):
"""Start up the all intent services and connect them as needed. """Start up the all intent services and connect them as needed.

View File

@ -30,7 +30,8 @@ from .file_utils import (resolve_resource_file, read_stripped_lines, read_dict,
curate_cache, get_cache_directory) curate_cache, get_cache_directory)
from .network_utils import connected from .network_utils import connected
from .process_utils import (reset_sigint_handler, create_daemon, from .process_utils import (reset_sigint_handler, create_daemon,
wait_for_exit_signal, create_echo_function) wait_for_exit_signal, create_echo_function,
start_message_bus_client)
from .log import LOG from .log import LOG
from .parse import extract_datetime, extract_number, normalize from .parse import extract_datetime, extract_number, normalize
from .signal import check_for_signal, create_signal, get_ipc_directory from .signal import check_for_signal, create_signal, get_ipc_directory

View File

@ -1,7 +1,7 @@
import json import json
import logging import logging
import signal as sig import signal as sig
from threading import Thread from threading import Event, Thread
from time import sleep from time import sleep
from .log import LOG from .log import LOG
@ -121,3 +121,34 @@ def create_echo_function(name, whitelist=None):
# Listen for messages and echo them for logging # Listen for messages and echo them for logging
LOG(name).info("BUS: {}".format(message)) LOG(name).info("BUS: {}".format(message))
return echo return echo
def start_message_bus_client(service, bus=None, whitelist=None):
"""Start the bus client daemon and wait for connection.
Arguments:
service (str): name of the service starting the connection
bus (MessageBusClient): an instance of the Mycroft MessageBusClient
whitelist (list, optional): List of "type" strings. If defined, only
messages in this list will be logged.
Returns:
A connected instance of the MessageBusClient
"""
# Local imports to avoid circular importing
from mycroft.messagebus.client import MessageBusClient
from mycroft.configuration import Configuration
# Create a client if one was not provided
if bus is None:
bus = MessageBusClient()
Configuration.set_config_update_handlers(bus)
bus_connected = Event()
bus.on('message', create_echo_function(service, whitelist))
# Set the bus connected event when connection is established
bus.once('open', bus_connected.set)
create_daemon(bus.run_forever)
# Wait for connection
bus_connected.wait()
LOG.info('Connected to messagebus')
return bus

View File

@ -75,7 +75,7 @@ class TestService(unittest.TestCase):
backend, second_backend = setup_mock_backends(mock_load_services, backend, second_backend = setup_mock_backends(mock_load_services,
self.emitter) self.emitter)
service = audio_service.AudioService(self.emitter) service = audio_service.AudioService(self.emitter)
service.load_services_callback() service.load_services()
service.default = backend service.default = backend
@ -91,7 +91,7 @@ class TestService(unittest.TestCase):
backend, second_backend = setup_mock_backends(mock_load_services, backend, second_backend = setup_mock_backends(mock_load_services,
self.emitter) self.emitter)
service = audio_service.AudioService(self.emitter) service = audio_service.AudioService(self.emitter)
service.load_services_callback() service.load_services()
service.default = backend service.default = backend
self.emitter.reset() self.emitter.reset()
@ -112,7 +112,7 @@ class TestService(unittest.TestCase):
mock_load_services.return_value = [backend, second_backend] mock_load_services.return_value = [backend, second_backend]
service = audio_service.AudioService(self.emitter) service = audio_service.AudioService(self.emitter)
service.load_services_callback() service.load_services()
service.default = backend service.default = backend
@ -145,7 +145,7 @@ class TestService(unittest.TestCase):
mock_load_services.return_value = [backend, second_backend] mock_load_services.return_value = [backend, second_backend]
service = audio_service.AudioService(self.emitter) service = audio_service.AudioService(self.emitter)
service.load_services_callback() service.load_services()
service.default = backend service.default = backend
@ -207,7 +207,7 @@ class TestService(unittest.TestCase):
mock_load_services.return_value = [backend, second_backend] mock_load_services.return_value = [backend, second_backend]
service = audio_service.AudioService(self.emitter) service = audio_service.AudioService(self.emitter)
service.load_services_callback() service.load_services()
service.default = backend service.default = backend