Fix playback of hls cameras in stream (#75166)
parent
b5e24048db
commit
326ffdcd49
|
@ -57,9 +57,15 @@ from .const import (
|
|||
SOURCE_TIMEOUT,
|
||||
STREAM_RESTART_INCREMENT,
|
||||
STREAM_RESTART_RESET_TIME,
|
||||
TARGET_SEGMENT_DURATION_NON_LL_HLS,
|
||||
)
|
||||
from .core import PROVIDERS, IdleTimer, KeyFrameConverter, StreamOutput, StreamSettings
|
||||
from .core import (
|
||||
PROVIDERS,
|
||||
STREAM_SETTINGS_NON_LL_HLS,
|
||||
IdleTimer,
|
||||
KeyFrameConverter,
|
||||
StreamOutput,
|
||||
StreamSettings,
|
||||
)
|
||||
from .diagnostics import Diagnostics
|
||||
from .hls import HlsStreamOutput, async_setup_hls
|
||||
|
||||
|
@ -224,14 +230,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
|||
hls_part_timeout=2 * conf[CONF_PART_DURATION],
|
||||
)
|
||||
else:
|
||||
hass.data[DOMAIN][ATTR_SETTINGS] = StreamSettings(
|
||||
ll_hls=False,
|
||||
min_segment_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS
|
||||
- SEGMENT_DURATION_ADJUSTER,
|
||||
part_target_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS,
|
||||
hls_advance_part_limit=3,
|
||||
hls_part_timeout=TARGET_SEGMENT_DURATION_NON_LL_HLS,
|
||||
)
|
||||
hass.data[DOMAIN][ATTR_SETTINGS] = STREAM_SETTINGS_NON_LL_HLS
|
||||
|
||||
# Setup HLS
|
||||
hls_endpoint = async_setup_hls(hass)
|
||||
|
|
|
@ -5,6 +5,7 @@ import asyncio
|
|||
from collections import deque
|
||||
from collections.abc import Callable, Coroutine, Iterable
|
||||
import datetime
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from aiohttp import web
|
||||
|
@ -16,13 +17,20 @@ from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
|
|||
from homeassistant.helpers.event import async_call_later
|
||||
from homeassistant.util.decorator import Registry
|
||||
|
||||
from .const import ATTR_STREAMS, DOMAIN
|
||||
from .const import (
|
||||
ATTR_STREAMS,
|
||||
DOMAIN,
|
||||
SEGMENT_DURATION_ADJUSTER,
|
||||
TARGET_SEGMENT_DURATION_NON_LL_HLS,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from av import CodecContext, Packet
|
||||
|
||||
from . import Stream
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
PROVIDERS: Registry[str, type[StreamOutput]] = Registry()
|
||||
|
||||
|
||||
|
@ -37,6 +45,15 @@ class StreamSettings:
|
|||
hls_part_timeout: float = attr.ib()
|
||||
|
||||
|
||||
STREAM_SETTINGS_NON_LL_HLS = StreamSettings(
|
||||
ll_hls=False,
|
||||
min_segment_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS - SEGMENT_DURATION_ADJUSTER,
|
||||
part_target_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS,
|
||||
hls_advance_part_limit=3,
|
||||
hls_part_timeout=TARGET_SEGMENT_DURATION_NON_LL_HLS,
|
||||
)
|
||||
|
||||
|
||||
@attr.s(slots=True)
|
||||
class Part:
|
||||
"""Represent a segment part."""
|
||||
|
@ -426,12 +443,22 @@ class KeyFrameConverter:
|
|||
return
|
||||
packet = self.packet
|
||||
self.packet = None
|
||||
# decode packet (flush afterwards)
|
||||
frames = self._codec_context.decode(packet)
|
||||
for _i in range(2):
|
||||
if frames:
|
||||
for _ in range(2): # Retry once if codec context needs to be flushed
|
||||
try:
|
||||
# decode packet (flush afterwards)
|
||||
frames = self._codec_context.decode(packet)
|
||||
for _i in range(2):
|
||||
if frames:
|
||||
break
|
||||
frames = self._codec_context.decode(None)
|
||||
break
|
||||
frames = self._codec_context.decode(None)
|
||||
except EOFError:
|
||||
_LOGGER.debug("Codec context needs flushing, attempting to reopen")
|
||||
self._codec_context.close()
|
||||
self._codec_context.open()
|
||||
else:
|
||||
_LOGGER.debug("Unable to decode keyframe")
|
||||
return
|
||||
if frames:
|
||||
frame = frames[0]
|
||||
if width and height:
|
||||
|
|
|
@ -2,6 +2,10 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Generator
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from io import BytesIO
|
||||
|
||||
|
||||
def find_box(
|
||||
|
@ -135,3 +139,11 @@ def get_codec_string(mp4_bytes: bytes) -> str:
|
|||
codecs.append(codec)
|
||||
|
||||
return ",".join(codecs)
|
||||
|
||||
|
||||
def read_init(bytes_io: BytesIO) -> bytes:
|
||||
"""Read the init from a mp4 file."""
|
||||
bytes_io.seek(24)
|
||||
moov_len = int.from_bytes(bytes_io.read(4), byteorder="big")
|
||||
bytes_io.seek(0)
|
||||
return bytes_io.read(24 + moov_len)
|
||||
|
|
|
@ -5,11 +5,12 @@ from collections import defaultdict, deque
|
|||
from collections.abc import Callable, Generator, Iterator, Mapping
|
||||
import contextlib
|
||||
import datetime
|
||||
from io import BytesIO
|
||||
from io import SEEK_END, BytesIO
|
||||
import logging
|
||||
from threading import Event
|
||||
from typing import Any, cast
|
||||
|
||||
import attr
|
||||
import av
|
||||
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
@ -24,8 +25,16 @@ from .const import (
|
|||
SEGMENT_CONTAINER_FORMAT,
|
||||
SOURCE_TIMEOUT,
|
||||
)
|
||||
from .core import KeyFrameConverter, Part, Segment, StreamOutput, StreamSettings
|
||||
from .core import (
|
||||
STREAM_SETTINGS_NON_LL_HLS,
|
||||
KeyFrameConverter,
|
||||
Part,
|
||||
Segment,
|
||||
StreamOutput,
|
||||
StreamSettings,
|
||||
)
|
||||
from .diagnostics import Diagnostics
|
||||
from .fmp4utils import read_init
|
||||
from .hls import HlsStreamOutput
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
@ -108,7 +117,7 @@ class StreamMuxer:
|
|||
hass: HomeAssistant,
|
||||
video_stream: av.video.VideoStream,
|
||||
audio_stream: av.audio.stream.AudioStream | None,
|
||||
audio_bsf: av.BitStreamFilterContext | None,
|
||||
audio_bsf: av.BitStreamFilter | None,
|
||||
stream_state: StreamState,
|
||||
stream_settings: StreamSettings,
|
||||
) -> None:
|
||||
|
@ -120,6 +129,7 @@ class StreamMuxer:
|
|||
self._input_video_stream: av.video.VideoStream = video_stream
|
||||
self._input_audio_stream: av.audio.stream.AudioStream | None = audio_stream
|
||||
self._audio_bsf = audio_bsf
|
||||
self._audio_bsf_context: av.BitStreamFilterContext = None
|
||||
self._output_video_stream: av.video.VideoStream = None
|
||||
self._output_audio_stream: av.audio.stream.AudioStream | None = None
|
||||
self._segment: Segment | None = None
|
||||
|
@ -151,7 +161,7 @@ class StreamMuxer:
|
|||
**{
|
||||
# Removed skip_sidx - see https://github.com/home-assistant/core/pull/39970
|
||||
# "cmaf" flag replaces several of the movflags used, but too recent to use for now
|
||||
"movflags": "frag_custom+empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer",
|
||||
"movflags": "frag_custom+empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer+delay_moov",
|
||||
# Sometimes the first segment begins with negative timestamps, and this setting just
|
||||
# adjusts the timestamps in the output from that segment to start from 0. Helps from
|
||||
# having to make some adjustments in test_durations
|
||||
|
@ -164,7 +174,7 @@ class StreamMuxer:
|
|||
# Fragment durations may exceed the 15% allowed variance but it seems ok
|
||||
**(
|
||||
{
|
||||
"movflags": "empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer",
|
||||
"movflags": "empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer+delay_moov",
|
||||
# Create a fragment every TARGET_PART_DURATION. The data from each fragment is stored in
|
||||
# a "Part" that can be combined with the data from all the other "Part"s, plus an init
|
||||
# section, to reconstitute the data in a "Segment".
|
||||
|
@ -194,8 +204,11 @@ class StreamMuxer:
|
|||
# Check if audio is requested
|
||||
output_astream = None
|
||||
if input_astream:
|
||||
if self._audio_bsf:
|
||||
self._audio_bsf_context = self._audio_bsf.create()
|
||||
self._audio_bsf_context.set_input_stream(input_astream)
|
||||
output_astream = container.add_stream(
|
||||
template=self._audio_bsf or input_astream
|
||||
template=self._audio_bsf_context or input_astream
|
||||
)
|
||||
return container, output_vstream, output_astream
|
||||
|
||||
|
@ -238,15 +251,29 @@ class StreamMuxer:
|
|||
self._part_has_keyframe |= packet.is_keyframe
|
||||
|
||||
elif packet.stream == self._input_audio_stream:
|
||||
if self._audio_bsf:
|
||||
self._audio_bsf.send(packet)
|
||||
while packet := self._audio_bsf.recv():
|
||||
if self._audio_bsf_context:
|
||||
self._audio_bsf_context.send(packet)
|
||||
while packet := self._audio_bsf_context.recv():
|
||||
packet.stream = self._output_audio_stream
|
||||
self._av_output.mux(packet)
|
||||
return
|
||||
packet.stream = self._output_audio_stream
|
||||
self._av_output.mux(packet)
|
||||
|
||||
def create_segment(self) -> None:
|
||||
"""Create a segment when the moov is ready."""
|
||||
self._segment = Segment(
|
||||
sequence=self._stream_state.sequence,
|
||||
stream_id=self._stream_state.stream_id,
|
||||
init=read_init(self._memory_file),
|
||||
# Fetch the latest StreamOutputs, which may have changed since the
|
||||
# worker started.
|
||||
stream_outputs=self._stream_state.outputs,
|
||||
start_time=self._start_time,
|
||||
)
|
||||
self._memory_file_pos = self._memory_file.tell()
|
||||
self._memory_file.seek(0, SEEK_END)
|
||||
|
||||
def check_flush_part(self, packet: av.Packet) -> None:
|
||||
"""Check for and mark a part segment boundary and record its duration."""
|
||||
if self._memory_file_pos == self._memory_file.tell():
|
||||
|
@ -254,16 +281,10 @@ class StreamMuxer:
|
|||
if self._segment is None:
|
||||
# We have our first non-zero byte position. This means the init has just
|
||||
# been written. Create a Segment and put it to the queue of each output.
|
||||
self._segment = Segment(
|
||||
sequence=self._stream_state.sequence,
|
||||
stream_id=self._stream_state.stream_id,
|
||||
init=self._memory_file.getvalue(),
|
||||
# Fetch the latest StreamOutputs, which may have changed since the
|
||||
# worker started.
|
||||
stream_outputs=self._stream_state.outputs,
|
||||
start_time=self._start_time,
|
||||
)
|
||||
self._memory_file_pos = self._memory_file.tell()
|
||||
self.create_segment()
|
||||
# When using delay_moov, the moov is not written until a moof is also ready
|
||||
# Flush the moof
|
||||
self.flush(packet, last_part=False)
|
||||
else: # These are the ends of the part segments
|
||||
self.flush(packet, last_part=False)
|
||||
|
||||
|
@ -297,6 +318,10 @@ class StreamMuxer:
|
|||
# Closing the av_output will write the remaining buffered data to the
|
||||
# memory_file as a new moof/mdat.
|
||||
self._av_output.close()
|
||||
# With delay_moov, this may be the first time the file pointer has
|
||||
# moved, so the segment may not yet have been created
|
||||
if not self._segment:
|
||||
self.create_segment()
|
||||
elif not self._part_has_keyframe:
|
||||
# Parts which are not the last part or an independent part should
|
||||
# not have durations below 0.85 of the part target duration.
|
||||
|
@ -305,6 +330,9 @@ class StreamMuxer:
|
|||
self._part_start_dts
|
||||
+ 0.85 * self._stream_settings.part_target_duration / packet.time_base,
|
||||
)
|
||||
# Undo dts adjustments if we don't have ll_hls
|
||||
if not self._stream_settings.ll_hls:
|
||||
adjusted_dts = packet.dts
|
||||
assert self._segment
|
||||
self._memory_file.seek(self._memory_file_pos)
|
||||
self._hass.loop.call_soon_threadsafe(
|
||||
|
@ -445,10 +473,7 @@ def get_audio_bitstream_filter(
|
|||
_LOGGER.debug(
|
||||
"ADTS AAC detected. Adding aac_adtstoaac bitstream filter"
|
||||
)
|
||||
bsf = av.BitStreamFilter("aac_adtstoasc")
|
||||
bsf_context = bsf.create()
|
||||
bsf_context.set_input_stream(audio_stream)
|
||||
return bsf_context
|
||||
return av.BitStreamFilter("aac_adtstoasc")
|
||||
break
|
||||
return None
|
||||
|
||||
|
@ -489,7 +514,12 @@ def stream_worker(
|
|||
audio_stream = None
|
||||
# Disable ll-hls for hls inputs
|
||||
if container.format.name == "hls":
|
||||
stream_settings.ll_hls = False
|
||||
for field in attr.fields(StreamSettings):
|
||||
setattr(
|
||||
stream_settings,
|
||||
field.name,
|
||||
getattr(STREAM_SETTINGS_NON_LL_HLS, field.name),
|
||||
)
|
||||
stream_state.diagnostics.set_value("container_format", container.format.name)
|
||||
stream_state.diagnostics.set_value("video_codec", video_stream.name)
|
||||
if audio_stream:
|
||||
|
|
|
@ -749,7 +749,9 @@ async def test_durations(hass, worker_finished_stream):
|
|||
},
|
||||
)
|
||||
|
||||
source = generate_h264_video(duration=SEGMENT_DURATION + 1)
|
||||
source = generate_h264_video(
|
||||
duration=round(SEGMENT_DURATION + target_part_duration + 1)
|
||||
)
|
||||
worker_finished, mock_stream = worker_finished_stream
|
||||
|
||||
with patch("homeassistant.components.stream.Stream", wraps=mock_stream):
|
||||
|
|
Loading…
Reference in New Issue