"""Provides the worker thread needed for processing streams."""
from __future__ import annotations

from collections import deque
from collections.abc import Generator, Iterator, Mapping
from io import BytesIO
import logging
from threading import Event
from typing import Any, Callable, cast

import av

from . import redact_credentials
from .const import (
    AUDIO_CODECS,
    MAX_MISSING_DTS,
    MAX_TIMESTAMP_GAP,
    MIN_SEGMENT_DURATION,
    PACKETS_TO_WAIT_FOR_AUDIO,
    SEGMENT_CONTAINER_FORMAT,
    SOURCE_TIMEOUT,
    TARGET_PART_DURATION,
)
from .core import Part, Segment, StreamOutput

_LOGGER = logging.getLogger(__name__)


class SegmentBuffer:
    """Buffer for writing a sequence of packets to the output as a segment."""

    def __init__(
        self, outputs_callback: Callable[[], Mapping[str, StreamOutput]]
    ) -> None:
        """Initialize SegmentBuffer."""
        self._stream_id: int = 0
        self._outputs_callback: Callable[
            [], Mapping[str, StreamOutput]
        ] = outputs_callback
        # sequence gets incremented before the first segment so the first segment
        # has a sequence number of 0.
        self._sequence = -1
        self._segment_start_dts: int = cast(int, None)
        self._memory_file: BytesIO = cast(BytesIO, None)
        self._av_output: av.container.OutputContainer = None
        self._input_video_stream: av.video.VideoStream = None
        self._input_audio_stream: av.audio.stream.AudioStream | None = None
        self._output_video_stream: av.video.VideoStream = None
        self._output_audio_stream: av.audio.stream.AudioStream | None = None
        self._segment: Segment | None = None
        # the following 3 member variables are used for Part formation
        self._memory_file_pos: int = cast(int, None)
        self._part_start_dts: int = cast(int, None)
        self._part_has_keyframe = False

    @staticmethod
    def make_new_av(
        memory_file: BytesIO, sequence: int, input_vstream: av.video.VideoStream
    ) -> av.container.OutputContainer:
        """Make a new av OutputContainer."""
        return av.open(
            memory_file,
            mode="w",
            format=SEGMENT_CONTAINER_FORMAT,
            container_options={
                # Removed skip_sidx - see https://github.com/home-assistant/core/pull/39970
                # "cmaf" flag replaces several of the movflags used, but too recent to use for now
                "movflags": "empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer",
                # Sometimes the first segment begins with negative timestamps, and this setting just
                # adjusts the timestamps in the output from that segment to start from 0. This saves
                # having to make some adjustments in test_durations
                "avoid_negative_ts": "make_non_negative",
                "fragment_index": str(sequence + 1),
                "video_track_timescale": str(int(1 / input_vstream.time_base)),
                # Create a fragment every TARGET_PART_DURATION. The data from each fragment is stored in
                # a "Part" that can be combined with the data from all the other "Part"s, plus an init
                # section, to reconstitute the data in a "Segment".
                "frag_duration": str(int(TARGET_PART_DURATION * 1e6)),
            },
        )

    def set_streams(
        self,
        video_stream: av.video.VideoStream,
        audio_stream: Any,
        # no type hint for audio_stream until https://github.com/PyAV-Org/PyAV/pull/775 is merged
    ) -> None:
        """Initialize output buffer with streams from container."""
        self._input_video_stream = video_stream
        self._input_audio_stream = audio_stream

    def reset(self, video_dts: int) -> None:
        """Initialize a new stream segment."""
        # Keep track of the number of segments we've processed
        self._sequence += 1
        self._segment_start_dts = video_dts
        self._segment = None
        self._memory_file = BytesIO()
        self._memory_file_pos = 0
        self._av_output = self.make_new_av(
            memory_file=self._memory_file,
            sequence=self._sequence,
            input_vstream=self._input_video_stream,
        )
        self._output_video_stream = self._av_output.add_stream(
            template=self._input_video_stream
        )
        # Check if audio is requested
        self._output_audio_stream = None
        if self._input_audio_stream and self._input_audio_stream.name in AUDIO_CODECS:
            self._output_audio_stream = self._av_output.add_stream(
                template=self._input_audio_stream
            )

    def mux_packet(self, packet: av.Packet) -> None:
        """Mux a packet to the appropriate output stream."""

        # Check for end of segment
        if packet.stream == self._input_video_stream:

            if (
                packet.is_keyframe
                and (packet.dts - self._segment_start_dts) * packet.time_base
                >= MIN_SEGMENT_DURATION
            ):
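                # The product above converts the dts delta into seconds; for
                # illustration, with a hypothetical time_base of 1/90000 a delta
                # of 180000 ticks corresponds to 2 seconds.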
                # Flush segment (also flushes the stub part segment)
                self.flush(packet, last_part=True)
                # Reinitialize
                self.reset(packet.dts)

            # Mux the packet
            packet.stream = self._output_video_stream
            self._av_output.mux(packet)
            self.check_flush_part(packet)
            self._part_has_keyframe |= packet.is_keyframe

        elif packet.stream == self._input_audio_stream:
            packet.stream = self._output_audio_stream
            self._av_output.mux(packet)

    def check_flush_part(self, packet: av.Packet) -> None:
        """Check for and mark a part segment boundary and record its duration."""
        if self._memory_file_pos == self._memory_file.tell():
            return
        if self._segment is None:
            # We have our first non-zero byte position. This means the init has just
            # been written. Create a Segment and put it to the queue of each output.
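            # (At this point the memory_file holds only the init section, typically
            # the ftyp/moov boxes; each moof/mdat fragment written afterwards is
            # emitted as a Part by flush().)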
            self._segment = Segment(
                sequence=self._sequence,
                stream_id=self._stream_id,
                init=self._memory_file.getvalue(),
            )
            self._memory_file_pos = self._memory_file.tell()
            self._part_start_dts = self._segment_start_dts
            # Fetch the latest StreamOutputs, which may have changed since the
            # worker started.
            for stream_output in self._outputs_callback().values():
                stream_output.put(self._segment)
        else:  # These are the ends of the part segments
            self.flush(packet, last_part=False)

    def flush(self, packet: av.Packet, last_part: bool) -> None:
        """Output a part from the most recent bytes in the memory_file.

        If last_part is True, also close the segment, give it a duration,
        and clean up the av_output and memory_file.
        """
        if last_part:
            # Closing the av_output will write the remaining buffered data to the
            # memory_file as a new moof/mdat.
            self._av_output.close()
        assert self._segment
        self._memory_file.seek(self._memory_file_pos)
        self._segment.parts.append(
            Part(
                duration=float((packet.dts - self._part_start_dts) * packet.time_base),
                has_keyframe=self._part_has_keyframe,
                data=self._memory_file.read(),
            )
        )
        if last_part:
            self._segment.duration = float(
                (packet.dts - self._segment_start_dts) * packet.time_base
            )
            self._memory_file.close()  # We don't need the BytesIO object anymore
        else:
            self._memory_file_pos = self._memory_file.tell()
            self._part_start_dts = packet.dts
            self._part_has_keyframe = False

    def discontinuity(self) -> None:
        """Mark the stream as having been restarted."""
        # Preserving sequence and stream_id here keeps the HLS playlist logic
        # simple: it can check for a discontinuity at output time and determine
        # the discontinuity sequence number.
        self._stream_id += 1

    def close(self) -> None:
        """Close stream buffer."""
        self._av_output.close()
        self._memory_file.close()
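

# Example (illustrative sketch): a SegmentBuffer is driven by feeding it demuxed
# packets, mirroring what stream_worker does below; `outputs`, `video_stream`,
# `audio_stream`, `start_dts` and `packets` are hypothetical placeholders:
#
#     segment_buffer = SegmentBuffer(lambda: outputs)
#     segment_buffer.set_streams(video_stream, audio_stream)
#     segment_buffer.reset(start_dts)
#     for packet in packets:
#         segment_buffer.mux_packet(packet)
#     segment_buffer.close()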


class PeekIterator(Iterator):
    """An Iterator that may allow multiple passes.

    This may be consumed like a normal Iterator, but it also supports a
    peek() method that buffers consumed items from the iterator.
    """

    def __init__(self, iterator: Iterator[av.Packet]) -> None:
        """Initialize PeekIterator."""
        self._iterator = iterator
        self._buffer: deque[av.Packet] = deque()
        # A pointer to either _iterator or _buffer
        self._next = self._iterator.__next__

    def __iter__(self) -> Iterator:
        """Return an iterator."""
        return self

    def __next__(self) -> av.Packet:
        """Return and consume the next item available."""
        return self._next()

    def _pop_buffer(self) -> av.Packet:
        """Consume items from the buffer until exhausted."""
        if self._buffer:
            return self._buffer.popleft()
        # The buffer is empty, so change to consume from the iterator
        self._next = self._iterator.__next__
        return self._next()

    def peek(self) -> Generator[av.Packet, None, None]:
        """Return items without consuming from the iterator."""
        # Items consumed are added to a buffer for future calls to __next__
        # or peek. First iterate over the buffer from previous calls to peek.
        self._next = self._pop_buffer
        for packet in self._buffer:
            yield packet
        for packet in self._iterator:
            self._buffer.append(packet)
            yield packet
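

# Example (illustrative sketch): items yielded by peek() are buffered and then
# returned again by normal iteration; `p1`, `p2` and `p3` are hypothetical packets:
#
#     it = PeekIterator(iter([p1, p2, p3]))
#     peeked = list(it.peek())  # [p1, p2, p3], nothing consumed
#     next(it)                  # still returns p1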


class TimestampValidator:
    """Validate ordering of timestamps for packets in a stream."""

    def __init__(self) -> None:
        """Initialize the TimestampValidator."""
        # Decompression timestamp of last packet in each stream
        self._last_dts: dict[av.stream.Stream, float] = {}
        # Number of consecutive missing decompression timestamps
        self._missing_dts = 0

    def is_valid(self, packet: av.Packet) -> bool:
        """Validate the packet timestamp based on ordering within the stream."""
        # Discard packets missing DTS. Terminate if too many are missing.
        if packet.dts is None:
            if self._missing_dts >= MAX_MISSING_DTS:
                raise StopIteration(
                    f"No dts in {MAX_MISSING_DTS+1} consecutive packets"
                )
            self._missing_dts += 1
            return False
        self._missing_dts = 0
        # Discard when dts is not monotonic. Terminate if gap is too wide.
        prev_dts = self._last_dts.get(packet.stream, float("-inf"))
        if packet.dts <= prev_dts:
            gap = packet.time_base * (prev_dts - packet.dts)
            if gap > MAX_TIMESTAMP_GAP:
                raise StopIteration(
                    f"Timestamp overflow detected: last dts = {prev_dts}, dts = {packet.dts}"
                )
            return False
        self._last_dts[packet.stream] = packet.dts
        return True
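

# Example (illustrative sketch): the validator is used as a filter predicate on
# demuxed packets, as stream_worker does below; packets with a missing or
# non-monotonic dts are dropped, and a backwards jump larger than
# MAX_TIMESTAMP_GAP seconds stops iteration via StopIteration:
#
#     validator = TimestampValidator()
#     packets = filter(validator.is_valid, container.demux((video_stream, audio_stream)))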


def is_keyframe(packet: av.Packet) -> Any:
    """Return true if the packet is a keyframe."""
    return packet.is_keyframe


def unsupported_audio(packets: Iterator[av.Packet], audio_stream: Any) -> bool:
    """Detect ADTS AAC, which is not supported by pyav."""
    if not audio_stream:
        return False
    for count, packet in enumerate(packets):
        if count >= PACKETS_TO_WAIT_FOR_AUDIO:
            # Some streams declare an audio stream and never send any packets
            _LOGGER.warning("Audio stream not found")
            break
        if packet.stream == audio_stream:
            # detect ADTS AAC and disable audio
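            # An ADTS frame begins with a 12-bit syncword of all ones, so the
            # first byte is 0xFF and the high nibble of the second byte is 0xF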
            if audio_stream.codec.name == "aac" and packet.size > 2:
                with memoryview(packet) as packet_view:
                    if packet_view[0] == 0xFF and packet_view[1] & 0xF0 == 0xF0:
                        _LOGGER.warning("ADTS AAC detected - disabling audio stream")
                        return True
            break
    return False


def stream_worker(
    source: str,
    options: dict[str, str],
    segment_buffer: SegmentBuffer,
    quit_event: Event,
) -> None:
    """Handle consuming streams."""

    try:
        container = av.open(source, options=options, timeout=SOURCE_TIMEOUT)
    except av.AVError:
        _LOGGER.error("Error opening stream %s", redact_credentials(str(source)))
        return
    try:
        video_stream = container.streams.video[0]
    except (KeyError, IndexError):
        _LOGGER.error("Stream has no video")
        container.close()
        return
    try:
        audio_stream = container.streams.audio[0]
    except (KeyError, IndexError):
        audio_stream = None
    # These formats need aac_adtstoasc bitstream filter, but auto_bsf not
    # compatible with empty_moov and manual bitstream filters not in PyAV
    if container.format.name in {"hls", "mpegts"}:
        audio_stream = None
    # Some audio streams do not have a profile and throw errors when remuxing
    if audio_stream and audio_stream.profile is None:
        audio_stream = None

    dts_validator = TimestampValidator()
    container_packets = PeekIterator(
        filter(dts_validator.is_valid, container.demux((video_stream, audio_stream)))
    )

    def is_video(packet: av.Packet) -> Any:
        """Return true if the packet is for the video stream."""
        return packet.stream == video_stream

    # Have to work around two problems with RTSP feeds in ffmpeg
    # 1 - first frame has bad pts/dts https://trac.ffmpeg.org/ticket/5018
    # 2 - seeking can be problematic https://trac.ffmpeg.org/ticket/7815
    #
    # Use a peeking iterator to peek into the start of the stream, ensuring
    # everything looks good, then go back to the start when muxing below.
    try:
        if audio_stream and unsupported_audio(container_packets.peek(), audio_stream):
            audio_stream = None
            container_packets = PeekIterator(
                filter(dts_validator.is_valid, container.demux(video_stream))
            )

        # Advance to the first keyframe for muxing, then rewind so the muxing
        # loop below can consume.
        first_keyframe = next(filter(is_keyframe, filter(is_video, container_packets)))
        # Deal with problem #1 above (bad first packet pts/dts) by recalculating
        # using pts/dts from second packet. Use the peek iterator to advance
        # without consuming from container_packets. Skip over the first keyframe
        # then use the duration from the second video packet to adjust dts.
        next_video_packet = next(filter(is_video, container_packets.peek()))
        start_dts = next_video_packet.dts - next_video_packet.duration
        first_keyframe.dts = first_keyframe.pts = start_dts
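        # For illustration: if the second video packet had dts 3000 and
        # duration 1500, the first keyframe would be assigned dts = pts = 1500.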
    except (av.AVError, StopIteration) as ex:
        _LOGGER.error("Error demuxing stream while finding first packet: %s", str(ex))
        container.close()
        return

    segment_buffer.set_streams(video_stream, audio_stream)
    segment_buffer.reset(start_dts)

    # Mux the first keyframe, then proceed through the rest of the packets
    segment_buffer.mux_packet(first_keyframe)

    while not quit_event.is_set():
        try:
            packet = next(container_packets)
        except (av.AVError, StopIteration) as ex:
            _LOGGER.error("Error demuxing stream: %s", str(ex))
            break
        segment_buffer.mux_packet(packet)

    # Close stream
    segment_buffer.close()
    container.close()
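

# Example (illustrative sketch): one way a caller might run the worker on a
# background thread; `outputs` is a hypothetical mapping of StreamOutput
# instances and the RTSP URL is a placeholder:
#
#     from threading import Thread
#
#     quit_event = Event()
#     segment_buffer = SegmentBuffer(lambda: outputs)
#     thread = Thread(
#         target=stream_worker,
#         args=("rtsp://camera.local/stream", {}, segment_buffer, quit_event),
#     )
#     thread.start()
#     # ... later, to stop the worker:
#     quit_event.set()
#     thread.join()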