From 414f167508eaf64b8e5c8cc3c1ba397ae423d4f7 Mon Sep 17 00:00:00 2001 From: uvjustin <46082645+uvjustin@users.noreply.github.com> Date: Tue, 17 Nov 2020 04:13:33 +0800 Subject: [PATCH] Remove pts adjustments in stream (#42399) * Remove unnecessary pts adjustments * Add comments * Use -inf for initial last_dts to be more clear * Use video first_pts as common adjuster in recorder * Remove seek(0) before av.open --- homeassistant/components/stream/recorder.py | 30 ++++++--- homeassistant/components/stream/worker.py | 72 +++++++++------------ 2 files changed, 50 insertions(+), 52 deletions(-) diff --git a/homeassistant/components/stream/recorder.py b/homeassistant/components/stream/recorder.py index d0b8789f602..420e7c654c5 100644 --- a/homeassistant/components/stream/recorder.py +++ b/homeassistant/components/stream/recorder.py @@ -25,12 +25,23 @@ def recorder_save_worker(file_out: str, segments: List[Segment], container_forma output_v = None output_a = None - for segment in segments: - # Seek to beginning and open segment - segment.segment.seek(0) + # Get first_pts values from first segment + if len(segments) > 0: + segment = segments[0] source = av.open(segment.segment, "r", format=container_format) source_v = source.streams.video[0] + first_pts["video"] = source_v.start_time + if len(source.streams.audio) > 0: + source_a = source.streams.audio[0] + first_pts["audio"] = int( + source_v.start_time * source_v.time_base / source_a.time_base + ) + source.close() + for segment in segments: + # Open segment + source = av.open(segment.segment, "r", format=container_format) + source_v = source.streams.video[0] # Add output streams if not output_v: output_v = output.add_stream(template=source_v) @@ -42,13 +53,12 @@ def recorder_save_worker(file_out: str, segments: List[Segment], container_forma # Remux video for packet in source.demux(): - if packet is not None and packet.dts is not None: - if first_pts[packet.stream.type] is None: - first_pts[packet.stream.type] = packet.pts - packet.pts -= first_pts[packet.stream.type] - packet.dts -= first_pts[packet.stream.type] - packet.stream = output_v if packet.stream.type == "video" else output_a - output.mux(packet) + if packet.dts is None: + continue + packet.pts -= first_pts[packet.stream.type] + packet.dts -= first_pts[packet.stream.type] + packet.stream = output_v if packet.stream.type == "video" else output_a + output.mux(packet) source.close() diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index aa6a5d350a9..68cbbc79726 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -102,11 +102,8 @@ def _stream_worker_internal(hass, stream, quit_event): # Iterator for demuxing container_packets = None - # The presentation timestamps of the first packet in each stream we receive - # Use to adjust before muxing or outputting, but we don't adjust internally - first_pts = {} # The decoder timestamps of the latest packet in each stream we processed - last_dts = None + last_dts = {video_stream: float("-inf"), audio_stream: float("-inf")} # Keep track of consecutive packets without a dts to detect end of stream. missing_dts = 0 # Holds the buffers for each stream provider @@ -123,21 +120,19 @@ def _stream_worker_internal(hass, stream, quit_event): # 2 - seeking can be problematic https://trac.ffmpeg.org/ticket/7815 def peek_first_pts(): - nonlocal first_pts, audio_stream, container_packets + """Initialize by peeking into the first few packets of the stream. + + Deal with problem #1 above (bad first packet pts/dts) by recalculating using pts/dts from second packet. + Also load the first video keyframe pts into segment_start_pts and check if the audio stream really exists. + """ + nonlocal segment_start_pts, audio_stream, container_packets missing_dts = 0 - - def empty_stream_dict(): - return { - video_stream: None, - **({audio_stream: None} if audio_stream else {}), - } - + found_audio = False try: container_packets = container.demux((video_stream, audio_stream)) - first_packet = empty_stream_dict() - first_pts = empty_stream_dict() + first_packet = None # Get to first video keyframe - while first_packet[video_stream] is None: + while first_packet is None: packet = next(container_packets) if ( packet.dts is None @@ -148,13 +143,17 @@ def _stream_worker_internal(hass, stream, quit_event): ) missing_dts += 1 continue - if packet.stream == video_stream and packet.is_keyframe: - first_packet[video_stream] = packet + if packet.stream == audio_stream: + found_audio = True + elif packet.is_keyframe: # video_keyframe + first_packet = packet initial_packets.append(packet) # Get first_pts from subsequent frame to first keyframe - while any( - [pts is None for pts in {**first_packet, **first_pts}.values()] - ) and (len(initial_packets) < PACKETS_TO_WAIT_FOR_AUDIO): + while segment_start_pts is None or ( + audio_stream + and not found_audio + and len(initial_packets) < PACKETS_TO_WAIT_FOR_AUDIO + ): packet = next(container_packets) if ( packet.dts is None @@ -165,24 +164,19 @@ def _stream_worker_internal(hass, stream, quit_event): ) missing_dts += 1 continue - if ( - first_packet[packet.stream] is None - ): # actually video already found above so only for audio - if packet.is_keyframe: - first_packet[packet.stream] = packet - else: # Discard leading non-keyframes - continue - else: # This is the second frame to calculate first_pts from - if first_pts[packet.stream] is None: - first_pts[packet.stream] = packet.dts - packet.duration - first_packet[packet.stream].pts = first_pts[packet.stream] - first_packet[packet.stream].dts = first_pts[packet.stream] + if packet.stream == audio_stream: + found_audio = True + elif ( + segment_start_pts is None + ): # This is the second video frame to calculate first_pts from + segment_start_pts = packet.dts - packet.duration + first_packet.pts = segment_start_pts + first_packet.dts = segment_start_pts initial_packets.append(packet) - if audio_stream and first_packet[audio_stream] is None: + if audio_stream and not found_audio: _LOGGER.warning( "Audio stream not found" ) # Some streams declare an audio stream and never send any packets - del first_pts[audio_stream] audio_stream = None except (av.AVError, StopIteration) as ex: @@ -212,9 +206,6 @@ def _stream_worker_internal(hass, stream, quit_event): ) def mux_video_packet(packet): - # adjust pts and dts before muxing - packet.pts -= first_pts[video_stream] - packet.dts -= first_pts[video_stream] # mux packets to each buffer for buffer, output_streams in outputs.values(): # Assign the packet to the new stream & mux @@ -223,9 +214,6 @@ def _stream_worker_internal(hass, stream, quit_event): def mux_audio_packet(packet): # almost the same as muxing video but add extra check - # adjust pts and dts before muxing - packet.pts -= first_pts[audio_stream] - packet.dts -= first_pts[audio_stream] for buffer, output_streams in outputs.values(): # Assign the packet to the new stream & mux if output_streams.get(audio_stream): @@ -241,8 +229,8 @@ def _stream_worker_internal(hass, stream, quit_event): if not peek_first_pts(): container.close() return - last_dts = {k: v - 1 for k, v in first_pts.items()} - initialize_segment(first_pts[video_stream]) + + initialize_segment(segment_start_pts) while not quit_event.is_set(): try: