Repair flaky and broken stream tests in test_hls.py, and turn back on (#45025)

* Unmark tests as flaky (though still flaky)

This put tests into the broken state where they are flaky and do not yet pass

* Fix bug in test_hls_stream with incorrect path

* Enable and de-flake HLS stream tests

Background: Tests encode a fake video them start a stream to be decoded. Test
assert on the decoded segments, however there is a race with the stream worker
which can finish decoding first, and end the stream which ereases all buffers.

Breadown of fixes:
- Fix the race conditions by adding synchronization points right before the
  stream is finalized.
- Refactor StreamOutput.put so that a patch() can block the worker
  thread.  Previously, the put call would happen in the event loop which was
  not safe to block. This is a bit of a hack, but it is the simplist possible
  code change to add this synchronization and arguably provides slightly better
  separation of responsibilities from the worker anyway.
- Fix bugs in the tests that make them not pass, likely due to changes
  introduced while the tests were disabled
- Fix case where the HLS stream view recv() call returns None, indicating
  the worker finished while the request was waiting.

The tests were previously failing anywhere from 2-5% of the time on a lightly
loaded machine doing 1k iterations.  Now, have 0% flake rate.  Tested with:
$ py.test --count=1000 tests/components/strema/test_hls.py
pull/45049/head
Allen Porter 2021-01-11 05:34:45 -08:00 committed by GitHub
parent ed4e8cdbc5
commit 65e3661f88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 81 additions and 14 deletions

View File

@ -117,9 +117,13 @@ class StreamOutput:
self._cursor = segment.sequence
return segment
@callback
def put(self, segment: Segment) -> None:
"""Store output."""
self._stream.hass.loop.call_soon_threadsafe(self._async_put, segment)
@callback
def _async_put(self, segment: Segment) -> None:
"""Store output from event loop."""
# Start idle timeout when we start receiving data
if self._unsub is None:
self._unsub = async_call_later(

View File

@ -52,7 +52,8 @@ class HlsMasterPlaylistView(StreamView):
stream.start()
# Wait for a segment to be ready
if not track.segments:
await track.recv()
if not await track.recv():
return web.HTTPNotFound()
headers = {"Content-Type": FORMAT_CONTENT_TYPE["hls"]}
return web.Response(body=self.render(track).encode("utf-8"), headers=headers)
@ -105,7 +106,8 @@ class HlsPlaylistView(StreamView):
stream.start()
# Wait for a segment to be ready
if not track.segments:
await track.recv()
if not await track.recv():
return web.HTTPNotFound()
headers = {"Content-Type": FORMAT_CONTENT_TYPE["hls"]}
return web.Response(body=self.render(track).encode("utf-8"), headers=headers)

View File

@ -224,7 +224,7 @@ def _stream_worker_internal(hass, stream, quit_event):
if not stream.keepalive:
# End of stream, clear listeners and stop thread
for fmt in stream.outputs:
hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None)
stream.outputs[fmt].put(None)
if not peek_first_pts():
container.close()
@ -275,8 +275,7 @@ def _stream_worker_internal(hass, stream, quit_event):
for fmt, (buffer, _) in outputs.items():
buffer.output.close()
if stream.outputs.get(fmt):
hass.loop.call_soon_threadsafe(
stream.outputs[fmt].put,
stream.outputs[fmt].put(
Segment(
sequence,
buffer.segment,

View File

@ -1,5 +1,16 @@
"""The tests for hls streams."""
"""The tests for hls streams.
The tests encode stream (as an h264 video), then load the stream and verify
that it is decoded properly. The background worker thread responsible for
decoding will decode the stream as fast as possible, and when completed
clears all output buffers. This can be a problem for the test that wishes
to retrieve and verify decoded segments. If the worker finishes first, there is
nothing for the test to verify. The solution is the WorkerSync class that
allows the tests to pause the worker thread before finalizing the stream
so that it can inspect the output.
"""
from datetime import timedelta
import threading
from unittest.mock import patch
from urllib.parse import urlparse
@ -7,6 +18,7 @@ import av
import pytest
from homeassistant.components.stream import request_stream
from homeassistant.components.stream.core import Segment, StreamOutput
from homeassistant.const import HTTP_NOT_FOUND
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
@ -15,8 +27,47 @@ from tests.common import async_fire_time_changed
from tests.components.stream.common import generate_h264_video, preload_stream
@pytest.mark.skip("Flaky in CI")
async def test_hls_stream(hass, hass_client):
class WorkerSync:
"""Test fixture that intercepts stream worker calls to StreamOutput."""
def __init__(self):
"""Initialize WorkerSync."""
self._event = None
self._put_original = StreamOutput.put
def pause(self):
"""Pause the worker before it finalizes the stream."""
self._event = threading.Event()
def resume(self):
"""Allow the worker thread to finalize the stream."""
self._event.set()
def blocking_put(self, stream_output: StreamOutput, segment: Segment):
"""Proxy StreamOutput.put, intercepted for test to pause worker."""
if segment is None and self._event:
# Worker is ending the stream, which clears all output buffers.
# Block the worker thread until the test has a chance to verify
# the segments under test.
self._event.wait()
# Forward to actual StreamOutput.put
self._put_original(stream_output, segment)
@pytest.fixture()
def worker_sync(hass):
"""Patch StreamOutput to allow test to synchronize worker stream end."""
sync = WorkerSync()
with patch(
"homeassistant.components.stream.core.StreamOutput.put",
side_effect=sync.blocking_put,
autospec=True,
):
yield sync
async def test_hls_stream(hass, hass_client, worker_sync):
"""
Test hls stream.
@ -25,6 +76,8 @@ async def test_hls_stream(hass, hass_client):
"""
await async_setup_component(hass, "stream", {"stream": {}})
worker_sync.pause()
# Setup demo HLS track
source = generate_h264_video()
stream = preload_stream(hass, source)
@ -50,10 +103,12 @@ async def test_hls_stream(hass, hass_client):
# Fetch segment
playlist = await playlist_response.text()
playlist_url = "/".join(parsed_url.path.split("/")[:-1])
segment_url = playlist_url + playlist.splitlines()[-1][1:]
segment_url = playlist_url + "/" + playlist.splitlines()[-1]
segment_response = await http_client.get(segment_url)
assert segment_response.status == 200
worker_sync.resume()
# Stop stream, if it hasn't quit already
stream.stop()
@ -62,11 +117,12 @@ async def test_hls_stream(hass, hass_client):
assert fail_response.status == HTTP_NOT_FOUND
@pytest.mark.skip("Flaky in CI")
async def test_stream_timeout(hass, hass_client):
async def test_stream_timeout(hass, hass_client, worker_sync):
"""Test hls stream timeout."""
await async_setup_component(hass, "stream", {"stream": {}})
worker_sync.pause()
# Setup demo HLS track
source = generate_h264_video()
stream = preload_stream(hass, source)
@ -90,6 +146,8 @@ async def test_stream_timeout(hass, hass_client):
playlist_response = await http_client.get(parsed_url.path)
assert playlist_response.status == 200
worker_sync.resume()
# Wait 5 minutes
future = dt_util.utcnow() + timedelta(minutes=5)
async_fire_time_changed(hass, future)
@ -99,11 +157,12 @@ async def test_stream_timeout(hass, hass_client):
assert fail_response.status == HTTP_NOT_FOUND
@pytest.mark.skip("Flaky in CI")
async def test_stream_ended(hass):
async def test_stream_ended(hass, worker_sync):
"""Test hls stream packets ended."""
await async_setup_component(hass, "stream", {"stream": {}})
worker_sync.pause()
# Setup demo HLS track
source = generate_h264_video()
stream = preload_stream(hass, source)
@ -118,6 +177,9 @@ async def test_stream_ended(hass):
if segment is None:
break
segments = segment.sequence
# Allow worker to finalize once enough of the stream is been consumed
if segments > 1:
worker_sync.resume()
assert segments > 1
assert not track.get_segment()