Add wait_for_message() method to messagebus client

- Refactor message waiting into a MessageWaiter class to be able to use the
  same code in both wait_for_message and wait_for_response.
- Add some basic unittests
pull/2628/head
Åke Forslund 2020-07-03 08:29:12 +02:00
parent 29f60e6d66
commit b7d709c3c8
3 changed files with 87 additions and 28 deletions

View File

@ -11,4 +11,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .client import MessageBusClient
from .client import MessageBusClient, MessageWaiter

View File

@ -30,6 +30,53 @@ from mycroft.util.log import LOG
from .threaded_event_emitter import ThreadedEventEmitter
class MessageWaiter:
"""Wait for a single message.
Encapsulate the wait for a message logic separating the setup from
the actual waiting act so the waiting can be setuo, actions can be
performed and _then_ the message can be waited for.
Argunments:
bus: Bus to check for messages on
message_type: message type to wait for
"""
def __init__(self, bus, message_type):
self.bus = bus
self.msg_type = message_type
self.received_msg = None
# Setup response handler
self.bus.once(message_type, self._handler)
def _handler(self, message):
"""Receive response data."""
self.received_msg = message
def wait(self, timeout=3.0):
"""Wait for message.
Arguments:
timeout (int or float): seconds to wait for message
Returns:
Message or None
"""
start_time = time.monotonic()
while self.received_msg is None:
time.sleep(0.2)
if time.monotonic() - start_time > timeout:
try:
self.bus.remove(self.msg_type, self._handler)
except (ValueError, KeyError):
# ValueError occurs on pyee 5.0.1 removing handlers
# registered with once.
# KeyError may theoretically occur if the event occurs as
# the handler is removed
pass
break
return self.received_msg
class MessageBusClient:
def __init__(self, host=None, port=None, route=None, ssl=None):
config_overrides = dict(host=host, port=port, route=route, ssl=ssl)
@ -120,6 +167,19 @@ class MessageBusClient:
LOG.warning('Could not send {} message because connection '
'has been closed'.format(message.msg_type))
def wait_for_message(self, message_type, timeout=3.0):
"""Wait for a message of a specific type.
Arguments:
message_type (str): the message type of the expected message
timeout: seconds to wait before timeout, defaults to 3
Returns:
The received message or None if the response timed out
"""
return MessageWaiter(self, message_type).wait(timeout)
def wait_for_response(self, message, reply_type=None, timeout=3.0):
"""Send a message and wait for a response.
@ -132,32 +192,11 @@ class MessageBusClient:
Returns:
The received message or None if the response timed out
"""
response = None
def handler(message):
"""Receive response data."""
nonlocal response
response = message
# Setup response handler
self.once(reply_type or message.msg_type + '.response', handler)
# Send request
message_type = reply_type or message.msg_type + '.response'
waiter = MessageWaiter(self, message_type) # Setup response handler
# Send message and wait for it's response
self.emit(message)
# Wait for response
start_time = time.monotonic()
while response is None:
time.sleep(0.2)
if time.monotonic() - start_time > timeout:
try:
self.remove(reply_type, handler)
except (ValueError, KeyError):
# ValueError occurs on pyee 1.0.1 removing handlers
# registered with once.
# KeyError may theoretically occur if the event occurs as
# the handler is removed
pass
return None
return response
return waiter.wait()
def on(self, event_name, func):
self.emitter.on(event_name, func)

View File

@ -12,9 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from unittest.mock import patch
from unittest import TestCase
from unittest.mock import patch, Mock
from mycroft.messagebus.client import MessageBusClient
from mycroft.messagebus.client import MessageBusClient, MessageWaiter
WS_CONF = {
'websocket': {
@ -37,3 +38,22 @@ class TestMessageBusClient:
def test_create_client(self, mock_conf):
mc = MessageBusClient()
assert mc.client.url == 'ws://testhost:1337/core'
class TestMessageWaiter(TestCase):
def test_message_wait_success(self):
bus = Mock()
waiter = MessageWaiter(bus, 'delayed.message')
bus.once.assert_called_with('delayed.message', waiter._handler)
test_msg = Mock(name='test_msg')
waiter._handler(test_msg) # Inject response
self.assertEqual(waiter.wait(), test_msg)
def test_message_wait_timeout(self):
bus = Mock()
waiter = MessageWaiter(bus, 'delayed.message')
bus.once.assert_called_with('delayed.message', waiter._handler)
self.assertEqual(waiter.wait(0.3), None)