Improve stream playback on high latency cameras (#72547)

* Disable LL-HLS for HLS sources
* Add extra wait for Nest cameras
pull/73604/head
uvjustin 2022-06-17 01:48:52 +10:00 committed by GitHub
parent 9687aab802
commit 01a4a83bab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 145 additions and 69 deletions

View File

@ -454,7 +454,7 @@ class Camera(Entity):
def __init__(self) -> None:
"""Initialize a camera."""
self.stream: Stream | None = None
self.stream_options: dict[str, str | bool] = {}
self.stream_options: dict[str, str | bool | float] = {}
self.content_type: str = DEFAULT_CONTENT_TYPE
self.access_tokens: collections.deque = collections.deque([], 2)
self._warned_old_signature = False

View File

@ -209,7 +209,7 @@ async def async_test_stream(hass, info) -> dict[str, str]:
except TemplateError as err:
_LOGGER.warning("Problem rendering template %s: %s", stream_source, err)
return {CONF_STREAM_SOURCE: "template_error"}
stream_options: dict[str, bool | str] = {}
stream_options: dict[str, str | bool | float] = {}
if rtsp_transport := info.get(CONF_RTSP_TRANSPORT):
stream_options[CONF_RTSP_TRANSPORT] = rtsp_transport
if info.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS):

View File

@ -20,6 +20,7 @@ from google_nest_sdm.exceptions import ApiException
from homeassistant.components.camera import Camera, CameraEntityFeature
from homeassistant.components.camera.const import StreamType
from homeassistant.components.stream import CONF_EXTRA_PART_WAIT_TIME
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
@ -67,6 +68,7 @@ class NestCamera(Camera):
self._create_stream_url_lock = asyncio.Lock()
self._stream_refresh_unsub: Callable[[], None] | None = None
self._attr_is_streaming = CameraLiveStreamTrait.NAME in self._device.traits
self.stream_options[CONF_EXTRA_PART_WAIT_TIME] = 3
@property
def should_poll(self) -> bool:

View File

@ -18,6 +18,7 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable, Mapping
import copy
import logging
import re
import secrets
@ -38,6 +39,7 @@ from .const import (
ATTR_ENDPOINTS,
ATTR_SETTINGS,
ATTR_STREAMS,
CONF_EXTRA_PART_WAIT_TIME,
CONF_LL_HLS,
CONF_PART_DURATION,
CONF_RTSP_TRANSPORT,
@ -62,8 +64,11 @@ from .diagnostics import Diagnostics
from .hls import HlsStreamOutput, async_setup_hls
__all__ = [
"ATTR_SETTINGS",
"CONF_EXTRA_PART_WAIT_TIME",
"CONF_RTSP_TRANSPORT",
"CONF_USE_WALLCLOCK_AS_TIMESTAMPS",
"DOMAIN",
"FORMAT_CONTENT_TYPE",
"HLS_PROVIDER",
"OUTPUT_FORMATS",
@ -91,7 +96,7 @@ def redact_credentials(data: str) -> str:
def create_stream(
hass: HomeAssistant,
stream_source: str,
options: dict[str, str | bool],
options: Mapping[str, str | bool | float],
stream_label: str | None = None,
) -> Stream:
"""Create a stream with the specified identfier based on the source url.
@ -101,11 +106,35 @@ def create_stream(
The stream_label is a string used as an additional message in logging.
"""
def convert_stream_options(
hass: HomeAssistant, stream_options: Mapping[str, str | bool | float]
) -> tuple[dict[str, str], StreamSettings]:
"""Convert options from stream options into PyAV options and stream settings."""
stream_settings = copy.copy(hass.data[DOMAIN][ATTR_SETTINGS])
pyav_options: dict[str, str] = {}
try:
STREAM_OPTIONS_SCHEMA(stream_options)
except vol.Invalid as exc:
raise HomeAssistantError("Invalid stream options") from exc
if extra_wait_time := stream_options.get(CONF_EXTRA_PART_WAIT_TIME):
stream_settings.hls_part_timeout += extra_wait_time
if rtsp_transport := stream_options.get(CONF_RTSP_TRANSPORT):
assert isinstance(rtsp_transport, str)
# The PyAV options currently match the stream CONF constants, but this
# will not necessarily always be the case, so they are hard coded here
pyav_options["rtsp_transport"] = rtsp_transport
if stream_options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS):
pyav_options["use_wallclock_as_timestamps"] = "1"
return pyav_options, stream_settings
if DOMAIN not in hass.config.components:
raise HomeAssistantError("Stream integration is not set up.")
# Convert extra stream options into PyAV options
pyav_options = convert_stream_options(options)
# Convert extra stream options into PyAV options and stream settings
pyav_options, stream_settings = convert_stream_options(hass, options)
# For RTSP streams, prefer TCP
if isinstance(stream_source, str) and stream_source[:7] == "rtsp://":
pyav_options = {
@ -115,7 +144,11 @@ def create_stream(
}
stream = Stream(
hass, stream_source, options=pyav_options, stream_label=stream_label
hass,
stream_source,
pyav_options=pyav_options,
stream_settings=stream_settings,
stream_label=stream_label,
)
hass.data[DOMAIN][ATTR_STREAMS].append(stream)
return stream
@ -230,13 +263,15 @@ class Stream:
self,
hass: HomeAssistant,
source: str,
options: dict[str, str],
pyav_options: dict[str, str],
stream_settings: StreamSettings,
stream_label: str | None = None,
) -> None:
"""Initialize a stream."""
self.hass = hass
self.source = source
self.options = options
self.pyav_options = pyav_options
self._stream_settings = stream_settings
self._stream_label = stream_label
self.keepalive = False
self.access_token: str | None = None
@ -284,7 +319,9 @@ class Stream:
self.check_idle()
provider = PROVIDERS[fmt](
self.hass, IdleTimer(self.hass, timeout, idle_callback)
self.hass,
IdleTimer(self.hass, timeout, idle_callback),
self._stream_settings,
)
self._outputs[fmt] = provider
@ -368,7 +405,8 @@ class Stream:
try:
stream_worker(
self.source,
self.options,
self.pyav_options,
self._stream_settings,
stream_state,
self._keyframe_converter,
self._thread_quit,
@ -507,22 +545,6 @@ STREAM_OPTIONS_SCHEMA: Final = vol.Schema(
{
vol.Optional(CONF_RTSP_TRANSPORT): vol.In(RTSP_TRANSPORTS),
vol.Optional(CONF_USE_WALLCLOCK_AS_TIMESTAMPS): bool,
vol.Optional(CONF_EXTRA_PART_WAIT_TIME): cv.positive_float,
}
)
def convert_stream_options(stream_options: dict[str, str | bool]) -> dict[str, str]:
"""Convert options from stream options into PyAV options."""
pyav_options: dict[str, str] = {}
try:
STREAM_OPTIONS_SCHEMA(stream_options)
except vol.Invalid as exc:
raise HomeAssistantError("Invalid stream options") from exc
if rtsp_transport := stream_options.get(CONF_RTSP_TRANSPORT):
assert isinstance(rtsp_transport, str)
pyav_options["rtsp_transport"] = rtsp_transport
if stream_options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS):
pyav_options["use_wallclock_as_timestamps"] = "1"
return pyav_options

View File

@ -53,3 +53,4 @@ RTSP_TRANSPORTS = {
"http": "HTTP",
}
CONF_USE_WALLCLOCK_AS_TIMESTAMPS = "use_wallclock_as_timestamps"
CONF_EXTRA_PART_WAIT_TIME = "extra_part_wait_time"

View File

@ -118,6 +118,10 @@ class Segment:
if self.hls_playlist_complete:
return self.hls_playlist_template[0]
if not self.hls_playlist_template:
# Logically EXT-X-DISCONTINUITY makes sense above the parts, but Apple's
# media stream validator seems to only want it before the segment
if last_stream_id != self.stream_id:
self.hls_playlist_template.append("#EXT-X-DISCONTINUITY")
# This is a placeholder where the rendered parts will be inserted
self.hls_playlist_template.append("{}")
if render_parts:
@ -133,22 +137,19 @@ class Segment:
# the first element to avoid an extra newline when we don't render any parts.
# Append an empty string to create a trailing newline when we do render parts
self.hls_playlist_parts.append("")
self.hls_playlist_template = []
# Logically EXT-X-DISCONTINUITY would make sense above the parts, but Apple's
# media stream validator seems to only want it before the segment
if last_stream_id != self.stream_id:
self.hls_playlist_template.append("#EXT-X-DISCONTINUITY")
self.hls_playlist_template = (
[] if last_stream_id == self.stream_id else ["#EXT-X-DISCONTINUITY"]
)
# Add the remaining segment metadata
# The placeholder goes on the same line as the next element
self.hls_playlist_template.extend(
[
"#EXT-X-PROGRAM-DATE-TIME:"
"{}#EXT-X-PROGRAM-DATE-TIME:"
+ self.start_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
+ "Z",
f"#EXTINF:{self.duration:.3f},\n./segment/{self.sequence}.m4s",
]
)
# The placeholder now goes on the same line as the first element
self.hls_playlist_template[0] = "{}" + self.hls_playlist_template[0]
# Store intermediate playlist data in member variables for reuse
self.hls_playlist_template = ["\n".join(self.hls_playlist_template)]
@ -237,11 +238,13 @@ class StreamOutput:
self,
hass: HomeAssistant,
idle_timer: IdleTimer,
stream_settings: StreamSettings,
deque_maxlen: int | None = None,
) -> None:
"""Initialize a stream output."""
self._hass = hass
self.idle_timer = idle_timer
self.stream_settings = stream_settings
self._event = asyncio.Event()
self._part_event = asyncio.Event()
self._segments: deque[Segment] = deque(maxlen=deque_maxlen)

View File

@ -9,8 +9,6 @@ from aiohttp import web
from homeassistant.core import HomeAssistant, callback
from .const import (
ATTR_SETTINGS,
DOMAIN,
EXT_X_START_LL_HLS,
EXT_X_START_NON_LL_HLS,
FORMAT_CONTENT_TYPE,
@ -47,11 +45,15 @@ def async_setup_hls(hass: HomeAssistant) -> str:
class HlsStreamOutput(StreamOutput):
"""Represents HLS Output formats."""
def __init__(self, hass: HomeAssistant, idle_timer: IdleTimer) -> None:
def __init__(
self,
hass: HomeAssistant,
idle_timer: IdleTimer,
stream_settings: StreamSettings,
) -> None:
"""Initialize HLS output."""
super().__init__(hass, idle_timer, deque_maxlen=MAX_SEGMENTS)
self.stream_settings: StreamSettings = hass.data[DOMAIN][ATTR_SETTINGS]
self._target_duration = self.stream_settings.min_segment_duration
super().__init__(hass, idle_timer, stream_settings, deque_maxlen=MAX_SEGMENTS)
self._target_duration = stream_settings.min_segment_duration
@property
def name(self) -> str:
@ -78,14 +80,20 @@ class HlsStreamOutput(StreamOutput):
)
def discontinuity(self) -> None:
"""Remove incomplete segment from deque."""
"""Fix incomplete segment at end of deque."""
self._hass.loop.call_soon_threadsafe(self._async_discontinuity)
@callback
def _async_discontinuity(self) -> None:
"""Remove incomplete segment from deque in event loop."""
if self._segments and not self._segments[-1].complete:
self._segments.pop()
"""Fix incomplete segment at end of deque in event loop."""
# Fill in the segment duration or delete the segment if empty
if self._segments:
if (last_segment := self._segments[-1]).parts:
last_segment.duration = sum(
part.duration for part in last_segment.parts
)
else:
self._segments.pop()
class HlsMasterPlaylistView(StreamView):

View File

@ -17,7 +17,7 @@ from .const import (
RECORDER_PROVIDER,
SEGMENT_CONTAINER_FORMAT,
)
from .core import PROVIDERS, IdleTimer, Segment, StreamOutput
from .core import PROVIDERS, IdleTimer, Segment, StreamOutput, StreamSettings
_LOGGER = logging.getLogger(__name__)
@ -121,9 +121,14 @@ def recorder_save_worker(file_out: str, segments: deque[Segment]) -> None:
class RecorderOutput(StreamOutput):
"""Represents HLS Output formats."""
def __init__(self, hass: HomeAssistant, idle_timer: IdleTimer) -> None:
def __init__(
self,
hass: HomeAssistant,
idle_timer: IdleTimer,
stream_settings: StreamSettings,
) -> None:
"""Initialize recorder output."""
super().__init__(hass, idle_timer)
super().__init__(hass, idle_timer, stream_settings)
self.video_path: str
@property

View File

@ -16,9 +16,7 @@ from homeassistant.core import HomeAssistant
from . import redact_credentials
from .const import (
ATTR_SETTINGS,
AUDIO_CODECS,
DOMAIN,
HLS_PROVIDER,
MAX_MISSING_DTS,
MAX_TIMESTAMP_GAP,
@ -87,7 +85,7 @@ class StreamState:
# simple to check for discontinuity at output time, and to determine
# the discontinuity sequence number.
self._stream_id += 1
# Call discontinuity to remove incomplete segment from the HLS output
# Call discontinuity to fix incomplete segment in HLS output
if hls_output := self._outputs_callback().get(HLS_PROVIDER):
cast(HlsStreamOutput, hls_output).discontinuity()
@ -111,6 +109,7 @@ class StreamMuxer:
video_stream: av.video.VideoStream,
audio_stream: av.audio.stream.AudioStream | None,
stream_state: StreamState,
stream_settings: StreamSettings,
) -> None:
"""Initialize StreamMuxer."""
self._hass = hass
@ -126,7 +125,7 @@ class StreamMuxer:
self._memory_file_pos: int = cast(int, None)
self._part_start_dts: int = cast(int, None)
self._part_has_keyframe = False
self._stream_settings: StreamSettings = hass.data[DOMAIN][ATTR_SETTINGS]
self._stream_settings = stream_settings
self._stream_state = stream_state
self._start_time = datetime.datetime.utcnow()
@ -445,19 +444,20 @@ def unsupported_audio(packets: Iterator[av.Packet], audio_stream: Any) -> bool:
def stream_worker(
source: str,
options: dict[str, str],
pyav_options: dict[str, str],
stream_settings: StreamSettings,
stream_state: StreamState,
keyframe_converter: KeyFrameConverter,
quit_event: Event,
) -> None:
"""Handle consuming streams."""
if av.library_versions["libavformat"][0] >= 59 and "stimeout" in options:
if av.library_versions["libavformat"][0] >= 59 and "stimeout" in pyav_options:
# the stimeout option was renamed to timeout as of ffmpeg 5.0
options["timeout"] = options["stimeout"]
del options["stimeout"]
pyav_options["timeout"] = pyav_options["stimeout"]
del pyav_options["stimeout"]
try:
container = av.open(source, options=options, timeout=SOURCE_TIMEOUT)
container = av.open(source, options=pyav_options, timeout=SOURCE_TIMEOUT)
except av.AVError as err:
raise StreamWorkerError(
f"Error opening stream ({err.type}, {err.strerror}) {redact_credentials(str(source))}"
@ -480,6 +480,9 @@ def stream_worker(
# Some audio streams do not have a profile and throw errors when remuxing
if audio_stream and audio_stream.profile is None:
audio_stream = None
# Disable ll-hls for hls inputs
if container.format.name == "hls":
stream_settings.ll_hls = False
stream_state.diagnostics.set_value("container_format", container.format.name)
stream_state.diagnostics.set_value("video_codec", video_stream.name)
if audio_stream:
@ -535,7 +538,9 @@ def stream_worker(
"Error demuxing stream while finding first packet: %s" % str(ex)
) from ex
muxer = StreamMuxer(stream_state.hass, video_stream, audio_stream, stream_state)
muxer = StreamMuxer(
stream_state.hass, video_stream, audio_stream, stream_state, stream_settings
)
muxer.reset(start_dts)
# Mux the first keyframe, then proceed through the rest of the packets

View File

@ -91,12 +91,12 @@ def make_segment_with_parts(
):
"""Create a playlist response for a segment including part segments."""
response = []
if discontinuity:
response.append("#EXT-X-DISCONTINUITY")
for i in range(num_parts):
response.append(
f'#EXT-X-PART:DURATION={TEST_PART_DURATION:.3f},URI="./segment/{segment}.{i}.m4s"{",INDEPENDENT=YES" if i%independent_period==0 else ""}'
)
if discontinuity:
response.append("#EXT-X-DISCONTINUITY")
response.extend(
[
"#EXT-X-PROGRAM-DATE-TIME:"

View File

@ -268,17 +268,24 @@ class MockPyAv:
return self.container
def run_worker(hass, stream, stream_source):
def run_worker(hass, stream, stream_source, stream_settings=None):
"""Run the stream worker under test."""
stream_state = StreamState(hass, stream.outputs, stream._diagnostics)
stream_worker(
stream_source, {}, stream_state, KeyFrameConverter(hass), threading.Event()
stream_source,
{},
stream_settings or hass.data[DOMAIN][ATTR_SETTINGS],
stream_state,
KeyFrameConverter(hass),
threading.Event(),
)
async def async_decode_stream(hass, packets, py_av=None):
async def async_decode_stream(hass, packets, py_av=None, stream_settings=None):
"""Start a stream worker that decodes incoming stream packets into output segments."""
stream = Stream(hass, STREAM_SOURCE, {})
stream = Stream(
hass, STREAM_SOURCE, {}, stream_settings or hass.data[DOMAIN][ATTR_SETTINGS]
)
stream.add_provider(HLS_PROVIDER)
if not py_av:
@ -290,7 +297,7 @@ async def async_decode_stream(hass, packets, py_av=None):
side_effect=py_av.capture_buffer.capture_output_segment,
):
try:
run_worker(hass, stream, STREAM_SOURCE)
run_worker(hass, stream, STREAM_SOURCE, stream_settings)
except StreamEndedError:
# Tests only use a limited number of packets, then the worker exits as expected. In
# production, stream ending would be unexpected.
@ -304,7 +311,7 @@ async def async_decode_stream(hass, packets, py_av=None):
async def test_stream_open_fails(hass):
"""Test failure on stream open."""
stream = Stream(hass, STREAM_SOURCE, {})
stream = Stream(hass, STREAM_SOURCE, {}, hass.data[DOMAIN][ATTR_SETTINGS])
stream.add_provider(HLS_PROVIDER)
with patch("av.open") as av_open, pytest.raises(StreamWorkerError):
av_open.side_effect = av.error.InvalidDataError(-2, "error")
@ -637,7 +644,7 @@ async def test_stream_stopped_while_decoding(hass):
worker_open = threading.Event()
worker_wake = threading.Event()
stream = Stream(hass, STREAM_SOURCE, {})
stream = Stream(hass, STREAM_SOURCE, {}, hass.data[DOMAIN][ATTR_SETTINGS])
stream.add_provider(HLS_PROVIDER)
py_av = MockPyAv()
@ -667,7 +674,7 @@ async def test_update_stream_source(hass):
worker_open = threading.Event()
worker_wake = threading.Event()
stream = Stream(hass, STREAM_SOURCE, {})
stream = Stream(hass, STREAM_SOURCE, {}, hass.data[DOMAIN][ATTR_SETTINGS])
stream.add_provider(HLS_PROVIDER)
# Note that retries are disabled by default in tests, however the stream is "restarted" when
# the stream source is updated.
@ -709,7 +716,9 @@ async def test_update_stream_source(hass):
async def test_worker_log(hass, caplog):
"""Test that the worker logs the url without username and password."""
stream = Stream(hass, "https://abcd:efgh@foo.bar", {})
stream = Stream(
hass, "https://abcd:efgh@foo.bar", {}, hass.data[DOMAIN][ATTR_SETTINGS]
)
stream.add_provider(HLS_PROVIDER)
with patch("av.open") as av_open, pytest.raises(StreamWorkerError) as err:
@ -906,3 +915,24 @@ async def test_get_image(hass, record_worker_sync):
assert await stream.async_get_image() == EMPTY_8_6_JPEG
await stream.stop()
async def test_worker_disable_ll_hls(hass):
"""Test that the worker disables ll-hls for hls inputs."""
stream_settings = StreamSettings(
ll_hls=True,
min_segment_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS
- SEGMENT_DURATION_ADJUSTER,
part_target_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS,
hls_advance_part_limit=3,
hls_part_timeout=TARGET_SEGMENT_DURATION_NON_LL_HLS,
)
py_av = MockPyAv()
py_av.container.format.name = "hls"
await async_decode_stream(
hass,
PacketSequence(TEST_SEQUENCE_LENGTH),
py_av=py_av,
stream_settings=stream_settings,
)
assert stream_settings.ll_hls is False