Combine some stream test mocks (#53600)

Combine MockFlushPart with the FakePyAvContainer since the container is
effectively a mock AvOutput. This simulates the behavior of the call to
mux and close that actually write to the memory file.
pull/53609/head
Allen Porter 2021-07-28 01:20:51 -07:00 committed by GitHub
parent 10bfc78365
commit d58151034c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 7 additions and 23 deletions

View File

@ -149,6 +149,7 @@ class FakePyAvBuffer:
self.segments = []
self.audio_packets = []
self.video_packets = []
self.memory_file: io.BytesIO | None = None
def add_stream(self, template=None):
"""Create an output buffer that captures packets for test to examine."""
@ -171,10 +172,13 @@ class FakePyAvBuffer:
"""Capture a packet for tests to examine."""
# Forward to appropriate FakeStream
packet.stream.mux(packet)
# Make new init/part data available to the worker
self.memory_file.write(b"0")
def close(self):
"""Close the buffer."""
return
# Make the final segment data available to the worker
self.memory_file.write(b"0")
def capture_output_segment(self, segment):
"""Capture the output segment for tests to inspect."""
@ -201,23 +205,11 @@ class MockPyAv:
def open(self, stream_source, *args, **kwargs):
"""Return a stream or buffer depending on args."""
if isinstance(stream_source, io.BytesIO):
self.capture_buffer.memory_file = stream_source
return self.capture_buffer
return self.container
class MockFlushPart:
"""Class to hold a wrapper function for check_flush_part."""
# Wrap this method with a preceding write so the BytesIO pointer moves
check_flush_part = SegmentBuffer.check_flush_part
@classmethod
def wrapped_check_flush_part(cls, segment_buffer, packet):
"""Wrap check_flush_part to also advance the memory_file pointer."""
segment_buffer._memory_file.write(b"0")
return cls.check_flush_part(segment_buffer, packet)
async def async_decode_stream(hass, packets, py_av=None):
"""Start a stream worker that decodes incoming stream packets into output segments."""
stream = Stream(hass, STREAM_SOURCE, {})
@ -230,10 +222,6 @@ async def async_decode_stream(hass, packets, py_av=None):
with patch("av.open", new=py_av.open), patch(
"homeassistant.components.stream.core.StreamOutput.put",
side_effect=py_av.capture_buffer.capture_output_segment,
), patch(
"homeassistant.components.stream.worker.SegmentBuffer.check_flush_part",
side_effect=MockFlushPart.wrapped_check_flush_part,
autospec=True,
):
segment_buffer = SegmentBuffer(stream.outputs)
stream_worker(STREAM_SOURCE, {}, segment_buffer, threading.Event())
@ -612,11 +600,7 @@ async def test_update_stream_source(hass):
worker_wake.wait()
return py_av.open(stream_source, args, kwargs)
with patch("av.open", new=blocking_open), patch(
"homeassistant.components.stream.worker.SegmentBuffer.check_flush_part",
side_effect=MockFlushPart.wrapped_check_flush_part,
autospec=True,
):
with patch("av.open", new=blocking_open):
stream.start()
assert worker_open.wait(TIMEOUT)
assert last_stream_source == STREAM_SOURCE