diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py index 91c4018d899..b986cddaf68 100644 --- a/homeassistant/components/stream/const.py +++ b/homeassistant/components/stream/const.py @@ -19,3 +19,4 @@ MAX_SEGMENTS = 3 # Max number of segments to keep around MIN_SEGMENT_DURATION = 1.5 # Each segment is at least this many seconds PACKETS_TO_WAIT_FOR_AUDIO = 20 # Some streams have an audio stream with no audio +MAX_TIMESTAMP_GAP = 10000 # seconds - anything from 10 to 50000 is probably reasonable diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index 9e036a764f8..22f67432a1b 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -6,7 +6,7 @@ import time import av -from .const import MIN_SEGMENT_DURATION, PACKETS_TO_WAIT_FOR_AUDIO +from .const import MAX_TIMESTAMP_GAP, MIN_SEGMENT_DURATION, PACKETS_TO_WAIT_FOR_AUDIO from .core import Segment, StreamBuffer _LOGGER = logging.getLogger(__name__) @@ -200,6 +200,12 @@ def _stream_worker_internal(hass, stream, quit_event): packet.stream = output_streams[audio_stream] buffer.output.mux(packet) + def finalize_stream(): + if not stream.keepalive: + # End of stream, clear listeners and stop thread + for fmt, _ in outputs.items(): + hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None) + if not peek_first_pts(): container.close() return @@ -222,15 +228,26 @@ def _stream_worker_internal(hass, stream, quit_event): continue last_packet_was_without_dts = False except (av.AVError, StopIteration) as ex: - if not stream.keepalive: - # End of stream, clear listeners and stop thread - for fmt, _ in outputs.items(): - hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None) _LOGGER.error("Error demuxing stream: %s", str(ex)) + finalize_stream() break # Discard packet if dts is not monotonic if packet.dts <= last_dts[packet.stream]: + if (last_dts[packet.stream] - packet.dts) > ( + packet.time_base * MAX_TIMESTAMP_GAP + ): + _LOGGER.warning( + "Timestamp overflow detected: dts = %s, resetting stream", + packet.dts, + ) + finalize_stream() + break + _LOGGER.warning( + "Dropping out of order packet: %s <= %s", + packet.dts, + last_dts[packet.stream], + ) continue # Check for end of segment