core/homeassistant/components/stream/worker.py

"""Provides the worker thread needed for processing streams."""
from fractions import Fraction
import io
import logging
import av
from .const import AUDIO_SAMPLE_RATE
from .core import Segment, StreamBuffer
_LOGGER = logging.getLogger(__name__)


def generate_audio_frame():
    """Generate a blank audio frame."""
    audio_frame = av.AudioFrame(format="dbl", layout="mono", samples=1024)
    # audio_bytes = b''.join(b'\x00\x00\x00\x00\x00\x00\x00\x00'
    #                        for i in range(0, 1024))
audio_bytes = b"\x00\x00\x00\x00\x00\x00\x00\x00" * 1024
    audio_frame.planes[0].update(audio_bytes)
    audio_frame.sample_rate = AUDIO_SAMPLE_RATE
    audio_frame.time_base = Fraction(1, AUDIO_SAMPLE_RATE)
    return audio_frame


def create_stream_buffer(stream_output, video_stream, audio_frame):
    """Create a new StreamBuffer."""
    a_packet = None
    segment = io.BytesIO()
    output = av.open(segment, mode="w", format=stream_output.format)
    vstream = output.add_stream(template=video_stream)
    # Check if audio is requested
    astream = None
    if stream_output.audio_codec:
        astream = output.add_stream(stream_output.audio_codec, AUDIO_SAMPLE_RATE)
        # The encoder may buffer a few frames before it emits a packet,
        # so keep feeding silence until one comes out
        while not a_packet:
            a_packets = astream.encode(audio_frame)
            if a_packets:
                a_packet = a_packets[0]
    return (a_packet, StreamBuffer(segment, output, vstream, astream))


def stream_worker(hass, stream, quit_event):
    """Handle consuming streams."""
    container = av.open(stream.source, options=stream.options)
    try:
        video_stream = container.streams.video[0]
    except (KeyError, IndexError):
        _LOGGER.error("Stream has no video")
        return

    audio_frame = generate_audio_frame()

    first_packet = True
    # Holds the buffers for each stream provider
    outputs = {}
    # Keep track of the number of segments we've processed
    sequence = 1
    # Holds the generated silence that needs to be muxed into the output
    audio_packets = {}
    # The presentation timestamp of the first video packet we receive
    first_pts = 0
    # The decoder timestamp of the latest packet we processed
    last_dts = None

    while not quit_event.is_set():
        try:
            packet = next(container.demux(video_stream))
            if packet.dts is None:
                if first_packet:
                    continue
                # If we get a "flushing" packet, the stream is done
                raise StopIteration("No dts in packet")
        except (av.AVError, StopIteration) as ex:
            # End of stream, clear listeners and stop thread
            for fmt, _ in outputs.items():
                hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None)
            _LOGGER.error("Error demuxing stream: %s", str(ex))
            break

        # Skip packets whose dts is not monotonically increasing in the feed
        if not first_packet and last_dts >= packet.dts:
            continue
        last_dts = packet.dts

        # Reset timestamps to a 0 time base for this stream
        packet.dts -= first_pts
        packet.pts -= first_pts

        # Reset segment on every keyframe
        if packet.is_keyframe:
            # Calculate the segment duration by multiplying the presentation
            # timestamp by the time base, which gets us total seconds.
            # By then dividing by the sequence, we can calculate how long
            # each segment is, assuming the stream starts from 0.
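            # For example, with a 1/90000 time base and a keyframe every
            # 2 seconds, the keyframe that closes the third segment arrives
            # with pts 540000: 540000 * (1/90000) = 6 s, and 6 s / 3 = 2 s.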
            segment_duration = (packet.pts * packet.time_base) / sequence
            # Save segment to outputs
            for fmt, buffer in outputs.items():
                buffer.output.close()
                del audio_packets[buffer.astream]
                if stream.outputs.get(fmt):
                    hass.loop.call_soon_threadsafe(
                        stream.outputs[fmt].put,
                        Segment(sequence, buffer.segment, segment_duration),
                    )

            # Clear outputs and increment sequence
            outputs = {}
            if not first_packet:
                sequence += 1

            # Initialize outputs
            for stream_output in stream.outputs.values():
                if video_stream.name != stream_output.video_codec:
                    continue

                a_packet, buffer = create_stream_buffer(
                    stream_output, video_stream, audio_frame
                )
                audio_packets[buffer.astream] = a_packet
                outputs[stream_output.name] = buffer

        # First video packet tends to have a weird dts/pts
        if first_packet:
            # If we are attaching to a live stream that does not reset
            # timestamps for us, we need to do it ourselves by recording
            # the first presentation timestamp and subtracting it from
            # subsequent packets we receive.
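            # For example, a live feed whose first packet carries pts 2700000
            # at a 1/90000 time base is 30 s into its own clock, so 2700000
            # becomes the zero point for every packet that follows.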
            if (packet.pts * packet.time_base) > 1:
                first_pts = packet.pts
            packet.dts = 0
            packet.pts = 0
            first_packet = False
        # Store packets on each output
        for buffer in outputs.values():
            # Check if the format requires audio
            if audio_packets.get(buffer.astream):
                a_packet = audio_packets[buffer.astream]
                a_time_base = a_packet.time_base

                # Determine video start timestamp and duration
                video_start = packet.pts * packet.time_base
                video_duration = packet.duration * packet.time_base

                if packet.is_keyframe:
                    # Set first audio packet in sequence to equal video pts
                    a_packet.pts = int(video_start / a_time_base)
                    a_packet.dts = int(video_start / a_time_base)

                # Determine target end timestamp for audio
                target_pts = int((video_start + video_duration) / a_time_base)
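                # For example, if AUDIO_SAMPLE_RATE were 44100, the silence
                # packets carry a 1/44100 time base, so a 0.5 s video packet
                # puts target_pts 22050 ticks past the packet's start; the
                # loop below covers that span with ~22 packets of
                # 1024-sample silence.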
                while a_packet.pts < target_pts:
                    # Mux the silence packet and advance its timestamps
                    # until the target is hit
                    buffer.output.mux(a_packet)
                    a_packet.pts += a_packet.duration
                    a_packet.dts += a_packet.duration

                audio_packets[buffer.astream] = a_packet
            # Assign the video packet to the new stream & mux
            packet.stream = buffer.vstream
            buffer.output.mux(packet)
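

# A minimal sketch of how this worker is typically driven (assumed for
# illustration; the real caller lives elsewhere in the stream component):
# the caller owns a threading.Event passed in as quit_event and runs the
# worker on its own thread:
#
#     import threading
#
#     quit_event = threading.Event()
#     thread = threading.Thread(
#         target=stream_worker,
#         args=(hass, stream, quit_event),
#     )
#     thread.start()
#     ...
#     quit_event.set()  # the loop condition is re-checked before each demux
#     thread.join()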