diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py
index 20931abf11e..5158ba185b1 100644
--- a/homeassistant/components/stream/core.py
+++ b/homeassistant/components/stream/core.py
@@ -117,9 +117,13 @@ class StreamOutput:
         self._cursor = segment.sequence
         return segment
 
-    @callback
     def put(self, segment: Segment) -> None:
         """Store output."""
+        self._stream.hass.loop.call_soon_threadsafe(self._async_put, segment)
+
+    @callback
+    def _async_put(self, segment: Segment) -> None:
+        """Store output from event loop."""
         # Start idle timeout when we start receiving data
         if self._unsub is None:
             self._unsub = async_call_later(
diff --git a/homeassistant/components/stream/hls.py b/homeassistant/components/stream/hls.py
index 92801c4807f..2b305442b80 100644
--- a/homeassistant/components/stream/hls.py
+++ b/homeassistant/components/stream/hls.py
@@ -52,7 +52,8 @@ class HlsMasterPlaylistView(StreamView):
         stream.start()
         # Wait for a segment to be ready
         if not track.segments:
-            await track.recv()
+            if not await track.recv():
+                return web.HTTPNotFound()
         headers = {"Content-Type": FORMAT_CONTENT_TYPE["hls"]}
         return web.Response(body=self.render(track).encode("utf-8"), headers=headers)
 
@@ -105,7 +106,8 @@ class HlsPlaylistView(StreamView):
         stream.start()
         # Wait for a segment to be ready
         if not track.segments:
-            await track.recv()
+            if not await track.recv():
+                return web.HTTPNotFound()
         headers = {"Content-Type": FORMAT_CONTENT_TYPE["hls"]}
         return web.Response(body=self.render(track).encode("utf-8"), headers=headers)
 
diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py
index 68cbbc79726..cccbfd1b48b 100644
--- a/homeassistant/components/stream/worker.py
+++ b/homeassistant/components/stream/worker.py
@@ -224,7 +224,7 @@ def _stream_worker_internal(hass, stream, quit_event):
         if not stream.keepalive:
             # End of stream, clear listeners and stop thread
             for fmt in stream.outputs:
-                hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None)
+                stream.outputs[fmt].put(None)
 
     if not peek_first_pts():
         container.close()
@@ -275,8 +275,7 @@ def _stream_worker_internal(hass, stream, quit_event):
                 for fmt, (buffer, _) in outputs.items():
                     buffer.output.close()
                     if stream.outputs.get(fmt):
-                        hass.loop.call_soon_threadsafe(
-                            stream.outputs[fmt].put,
+                        stream.outputs[fmt].put(
                             Segment(
                                 sequence,
                                 buffer.segment,
diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py
index 3af48cb580d..e19a3b96687 100644
--- a/tests/components/stream/test_hls.py
+++ b/tests/components/stream/test_hls.py
@@ -1,5 +1,16 @@
-"""The tests for hls streams."""
+"""The tests for hls streams.
+
+The tests encode stream (as an h264 video), then load the stream and verify
+that it is decoded properly. The background worker thread responsible for
+decoding will decode the stream as fast as possible, and when completed
+clears all output buffers. This can be a problem for the test that wishes
+to retrieve and verify decoded segments. If the worker finishes first, there is
+nothing for the test to verify. The solution is the WorkerSync class that
+allows the tests to pause the worker thread before finalizing the stream
+so that it can inspect the output.
+"""
 from datetime import timedelta
+import threading
 from unittest.mock import patch
 from urllib.parse import urlparse
 
@@ -7,6 +18,7 @@ import av
 import pytest
 
 from homeassistant.components.stream import request_stream
+from homeassistant.components.stream.core import Segment, StreamOutput
 from homeassistant.const import HTTP_NOT_FOUND
 from homeassistant.setup import async_setup_component
 import homeassistant.util.dt as dt_util
@@ -15,8 +27,47 @@ from tests.common import async_fire_time_changed
 from tests.components.stream.common import generate_h264_video, preload_stream
 
 
-@pytest.mark.skip("Flaky in CI")
-async def test_hls_stream(hass, hass_client):
+class WorkerSync:
+    """Test fixture that intercepts stream worker calls to StreamOutput."""
+
+    def __init__(self):
+        """Initialize WorkerSync."""
+        self._event = None
+        self._put_original = StreamOutput.put
+
+    def pause(self):
+        """Pause the worker before it finalizes the stream."""
+        self._event = threading.Event()
+
+    def resume(self):
+        """Allow the worker thread to finalize the stream."""
+        self._event.set()
+
+    def blocking_put(self, stream_output: StreamOutput, segment: Segment):
+        """Proxy StreamOutput.put, intercepted for test to pause worker."""
+        if segment is None and self._event:
+            # Worker is ending the stream, which clears all output buffers.
+            # Block the worker thread until the test has a chance to verify
+            # the segments under test.
+            self._event.wait()
+
+        # Forward to actual StreamOutput.put
+        self._put_original(stream_output, segment)
+
+
+@pytest.fixture()
+def worker_sync(hass):
+    """Patch StreamOutput to allow test to synchronize worker stream end."""
+    sync = WorkerSync()
+    with patch(
+        "homeassistant.components.stream.core.StreamOutput.put",
+        side_effect=sync.blocking_put,
+        autospec=True,
+    ):
+        yield sync
+
+
+async def test_hls_stream(hass, hass_client, worker_sync):
     """
     Test hls stream.
 
@@ -25,6 +76,8 @@ async def test_hls_stream(hass, hass_client):
     """
     await async_setup_component(hass, "stream", {"stream": {}})
 
+    worker_sync.pause()
+
     # Setup demo HLS track
     source = generate_h264_video()
     stream = preload_stream(hass, source)
@@ -50,10 +103,12 @@ async def test_hls_stream(hass, hass_client):
     # Fetch segment
     playlist = await playlist_response.text()
     playlist_url = "/".join(parsed_url.path.split("/")[:-1])
-    segment_url = playlist_url + playlist.splitlines()[-1][1:]
+    segment_url = playlist_url + "/" + playlist.splitlines()[-1]
     segment_response = await http_client.get(segment_url)
     assert segment_response.status == 200
 
+    worker_sync.resume()
+
     # Stop stream, if it hasn't quit already
     stream.stop()
 
@@ -62,11 +117,12 @@ async def test_hls_stream(hass, hass_client):
     assert fail_response.status == HTTP_NOT_FOUND
 
 
-@pytest.mark.skip("Flaky in CI")
-async def test_stream_timeout(hass, hass_client):
+async def test_stream_timeout(hass, hass_client, worker_sync):
     """Test hls stream timeout."""
     await async_setup_component(hass, "stream", {"stream": {}})
 
+    worker_sync.pause()
+
     # Setup demo HLS track
     source = generate_h264_video()
     stream = preload_stream(hass, source)
@@ -90,6 +146,8 @@ async def test_stream_timeout(hass, hass_client):
     playlist_response = await http_client.get(parsed_url.path)
     assert playlist_response.status == 200
 
+    worker_sync.resume()
+
     # Wait 5 minutes
     future = dt_util.utcnow() + timedelta(minutes=5)
     async_fire_time_changed(hass, future)
@@ -99,11 +157,12 @@ async def test_stream_timeout(hass, hass_client):
     assert fail_response.status == HTTP_NOT_FOUND
 
 
-@pytest.mark.skip("Flaky in CI")
-async def test_stream_ended(hass):
+async def test_stream_ended(hass, worker_sync):
     """Test hls stream packets ended."""
     await async_setup_component(hass, "stream", {"stream": {}})
 
+    worker_sync.pause()
+
     # Setup demo HLS track
     source = generate_h264_video()
     stream = preload_stream(hass, source)
@@ -118,6 +177,9 @@ async def test_stream_ended(hass):
         if segment is None:
             break
         segments = segment.sequence
+        # Allow worker to finalize once enough of the stream is been consumed
+        if segments > 1:
+            worker_sync.resume()
 
     assert segments > 1
     assert not track.get_segment()