"""Provides the worker thread needed for processing streams.""" from fractions import Fraction import io import logging from .const import AUDIO_SAMPLE_RATE from .core import Segment, StreamBuffer _LOGGER = logging.getLogger(__name__) def generate_audio_frame(): """Generate a blank audio frame.""" from av import AudioFrame audio_frame = AudioFrame(format='dbl', layout='mono', samples=1024) # audio_bytes = b''.join(b'\x00\x00\x00\x00\x00\x00\x00\x00' # for i in range(0, 1024)) audio_bytes = b'\x00\x00\x00\x00\x00\x00\x00\x00' * 1024 audio_frame.planes[0].update(audio_bytes) audio_frame.sample_rate = AUDIO_SAMPLE_RATE audio_frame.time_base = Fraction(1, AUDIO_SAMPLE_RATE) return audio_frame def create_stream_buffer(stream_output, video_stream, audio_frame): """Create a new StreamBuffer.""" import av a_packet = None segment = io.BytesIO() output = av.open( segment, mode='w', format=stream_output.format) vstream = output.add_stream( stream_output.video_codec, video_stream.rate) # Fix format vstream.codec_context.format = \ video_stream.codec_context.format # Check if audio is requested astream = None if stream_output.audio_codec: astream = output.add_stream( stream_output.audio_codec, AUDIO_SAMPLE_RATE) # Need to do it multiple times for some reason while not a_packet: a_packets = astream.encode(audio_frame) if a_packets: a_packet = a_packets[0] return (a_packet, StreamBuffer(segment, output, vstream, astream)) def stream_worker(hass, stream, quit_event): """Handle consuming streams.""" import av container = av.open(stream.source, options=stream.options) try: video_stream = container.streams.video[0] except (KeyError, IndexError): _LOGGER.error("Stream has no video") return audio_frame = generate_audio_frame() outputs = {} first_packet = True sequence = 1 audio_packets = {} last_dts = None while not quit_event.is_set(): try: packet = next(container.demux(video_stream)) if packet.dts is None: if first_packet: continue # If we get a "flushing" packet, the stream is done raise StopIteration("No dts in packet") except (av.AVError, StopIteration) as ex: # End of stream, clear listeners and stop thread for fmt, _ in outputs.items(): hass.loop.call_soon_threadsafe( stream.outputs[fmt].put, None) _LOGGER.error("Error demuxing stream: %s", str(ex)) break # Skip non monotonically increasing dts in feed if not first_packet and last_dts >= packet.dts: continue last_dts = packet.dts # Reset segment on every keyframe if packet.is_keyframe: # Save segment to outputs segment_duration = (packet.pts * packet.time_base) / sequence for fmt, buffer in outputs.items(): buffer.output.close() del audio_packets[buffer.astream] if stream.outputs.get(fmt): hass.loop.call_soon_threadsafe( stream.outputs[fmt].put, Segment( sequence, buffer.segment, segment_duration )) # Clear outputs and increment sequence outputs = {} if not first_packet: sequence += 1 # Initialize outputs for stream_output in stream.outputs.values(): if video_stream.name != stream_output.video_codec: continue a_packet, buffer = create_stream_buffer( stream_output, video_stream, audio_frame) audio_packets[buffer.astream] = a_packet outputs[stream_output.format] = buffer # First video packet tends to have a weird dts/pts if first_packet: packet.dts = 0 packet.pts = 0 first_packet = False # Store packets on each output for buffer in outputs.values(): # Check if the format requires audio if audio_packets.get(buffer.astream): a_packet = audio_packets[buffer.astream] a_time_base = a_packet.time_base # Determine video start timestamp and duration video_start = packet.pts * packet.time_base video_duration = packet.duration * packet.time_base if packet.is_keyframe: # Set first audio packet in sequence to equal video pts a_packet.pts = int(video_start / a_time_base) a_packet.dts = int(video_start / a_time_base) # Determine target end timestamp for audio target_pts = int((video_start + video_duration) / a_time_base) while a_packet.pts < target_pts: # Mux audio packet and adjust points until target hit buffer.output.mux(a_packet) a_packet.pts += a_packet.duration a_packet.dts += a_packet.duration audio_packets[buffer.astream] = a_packet # Assign the video packet to the new stream & mux packet.stream = buffer.vstream buffer.output.mux(packet)