Add get_image method to Stream (#61918)

* Add get_image method to Stream

* Add KeyFrameConverter class
pull/62622/head
uvjustin 2021-12-23 00:24:53 +08:00 committed by GitHub
parent eda9291ca1
commit 6e13605cad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 153 additions and 6 deletions

View File

@ -50,7 +50,7 @@ from .const import (
STREAM_RESTART_RESET_TIME,
TARGET_SEGMENT_DURATION_NON_LL_HLS,
)
from .core import PROVIDERS, IdleTimer, StreamOutput, StreamSettings
from .core import PROVIDERS, IdleTimer, KeyFrameConverter, StreamOutput, StreamSettings
from .hls import HlsStreamOutput, async_setup_hls
_LOGGER = logging.getLogger(__name__)
@ -137,6 +137,8 @@ def filter_libav_logging() -> None:
# Set log level to error for libav.mp4
logging.getLogger("libav.mp4").setLevel(logging.ERROR)
# Suppress "deprecated pixel format" WARNING
logging.getLogger("libav.swscaler").setLevel(logging.ERROR)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
@ -214,6 +216,7 @@ class Stream:
self._thread_quit = threading.Event()
self._outputs: dict[str, StreamOutput] = {}
self._fast_restart_once = False
self._keyframe_converter = KeyFrameConverter(hass)
self._available: bool = True
self._update_callback: Callable[[], None] | None = None
self._logger = (
@ -327,6 +330,7 @@ class Stream:
self.source,
self.options,
stream_state,
self._keyframe_converter,
self._thread_quit,
)
except StreamWorkerError as err:
@ -419,3 +423,12 @@ class Stream:
# Wait for latest segment, then add the lookback
await hls.recv()
recorder.prepend(list(hls.get_segments())[-num_segments:])
async def get_image(
    self,
    width: int | None = None,
    height: int | None = None,
) -> bytes | None:
    """Return the latest keyframe as jpeg bytes, delegating to KeyFrameConverter.

    Width/height are optional resize targets passed straight through; returns
    None when no keyframe has been converted yet.
    """
    converter = self._keyframe_converter
    return await converter.get_image(width=width, height=height)

View File

@ -19,6 +19,8 @@ from homeassistant.util.decorator import Registry
from .const import ATTR_STREAMS, DOMAIN
if TYPE_CHECKING:
from av import CodecContext, Packet
from . import Stream
PROVIDERS = Registry()
@ -356,3 +358,86 @@ class StreamView(HomeAssistantView):
) -> web.StreamResponse:
"""Handle the stream request."""
raise NotImplementedError()
class KeyFrameConverter:
    """Generate and hold the keyframe as a jpeg.

    An overview of the thread and state interaction:
        the worker thread sets a packet
        at any time, main loop can run a get_image call
        _generate_image will try to create an image from the packet
        Running _generate_image will clear the packet, so there will only
        be one attempt per packet
    If successful, _image will be updated and returned by get_image
    If unsuccessful, get_image will return the previous image
    """

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize."""
        # Keep import here so that we can import stream integration without installing reqs
        # pylint: disable=import-outside-toplevel
        from homeassistant.components.camera.img_util import TurboJPEGSingleton

        # Latest keyframe packet, written by the worker thread; cleared by
        # _generate_image so each packet is decoded at most once
        self.packet: Packet | None = None
        self._hass = hass
        # Most recently generated jpeg; retained when a later attempt fails
        self._image: bytes | None = None
        # NOTE(review): may be falsy if libjpeg-turbo is unavailable —
        # _generate_image then does nothing and get_image returns None
        self._turbojpeg = TurboJPEGSingleton.instance()
        # Serializes get_image callers so only one decode runs at a time
        self._lock = asyncio.Lock()
        self._codec_context: CodecContext | None = None

    def create_codec_context(self, codec_context: CodecContext) -> None:
        """Create a codec context to be used for decoding the keyframes.

        This is run by the worker thread and will only be called once per worker.
        """
        # Keep import here so that we can import stream integration without installing reqs
        # pylint: disable=import-outside-toplevel
        from av import CodecContext

        # Build a dedicated decoder matching the stream's codec and extradata
        self._codec_context = CodecContext.create(codec_context.name, "r")
        self._codec_context.extradata = codec_context.extradata
        # Have the decoder skip everything except keyframes
        self._codec_context.skip_frame = "NONKEY"
        # NOTE(review): disables codec-level threading — presumably so a single
        # packet decode completes synchronously; confirm against av docs
        self._codec_context.thread_type = "NONE"

    def _generate_image(self, width: int | None, height: int | None) -> None:
        """Generate the keyframe image.

        This is run in an executor thread, but since it is called while
        holding the asyncio lock from the main thread, there will only be
        one entry at a time per instance.
        """
        # Need a jpeg encoder, a pending packet, and a decoder to do anything
        if not (self._turbojpeg and self.packet and self._codec_context):
            return
        packet = self.packet
        # Clear the packet so there is only one decode attempt per packet
        self.packet = None
        # decode packet (flush afterwards)
        frames = self._codec_context.decode(packet)
        # The decoder may buffer the frame; flush (decode(None)) up to twice
        # until a frame comes out
        for _i in range(2):
            if frames:
                break
            frames = self._codec_context.decode(None)
        if frames:
            frame = frames[0]
            # Only resize when both dimensions were requested
            if width and height:
                frame = frame.reformat(width=width, height=height)
            # Encode the frame to jpeg from a BGR ndarray via TurboJPEG
            bgr_array = frame.to_ndarray(format="bgr24")
            self._image = bytes(self._turbojpeg.encode(bgr_array))

    async def get_image(
        self,
        width: int | None = None,
        height: int | None = None,
    ) -> bytes | None:
        """Fetch an image from the Stream and return it as a jpeg in bytes."""
        # Use a lock to ensure only one thread is working on the keyframe at a time
        async with self._lock:
            await self._hass.async_add_executor_job(self._generate_image, width, height)
        return self._image

View File

@ -2,7 +2,7 @@
"domain": "stream",
"name": "Stream",
"documentation": "https://www.home-assistant.io/integrations/stream",
"requirements": ["ha-av==8.0.4-rc.1"],
"requirements": ["ha-av==8.0.4-rc.1", "PyTurboJPEG==1.6.3"],
"dependencies": ["http"],
"codeowners": ["@hunterjm", "@uvjustin", "@allenporter"],
"quality_scale": "internal",

View File

@ -14,7 +14,7 @@ import av
from homeassistant.core import HomeAssistant
from . import redact_credentials
from . import KeyFrameConverter, redact_credentials
from .const import (
ATTR_SETTINGS,
AUDIO_CODECS,
@ -439,6 +439,7 @@ def stream_worker(
source: str,
options: dict[str, str],
stream_state: StreamState,
keyframe_converter: KeyFrameConverter,
quit_event: Event,
) -> None:
"""Handle consuming streams."""
@ -453,6 +454,7 @@ def stream_worker(
video_stream = container.streams.video[0]
except (KeyError, IndexError) as ex:
raise StreamWorkerError("Stream has no video") from ex
keyframe_converter.create_codec_context(codec_context=video_stream.codec_context)
try:
audio_stream = container.streams.audio[0]
except (KeyError, IndexError):
@ -474,7 +476,7 @@ def stream_worker(
def is_video(packet: av.Packet) -> Any:
    """Return true if the packet is for the video stream.

    Checks the packet's stream type rather than comparing against the
    video_stream object, which also matches after a stream reconnect.
    """
    # The diff residue contained both the old identity comparison and the new
    # type check; keep only the updated type-based check.
    return packet.stream.type == "video"
# Have to work around two problems with RTSP feeds in ffmpeg
# 1 - first frame has bad pts/dts https://trac.ffmpeg.org/ticket/5018
@ -535,3 +537,6 @@ def stream_worker(
raise StreamWorkerError("Error demuxing stream: %s" % str(ex)) from ex
muxer.mux_packet(packet)
if packet.is_keyframe and is_video(packet):
keyframe_converter.packet = packet

View File

@ -55,6 +55,7 @@ PySocks==1.7.1
PyTransportNSW==0.1.1
# homeassistant.components.camera
# homeassistant.components.stream
PyTurboJPEG==1.6.3
# homeassistant.components.vicare

View File

@ -33,6 +33,7 @@ PyRMVtransport==0.3.3
PyTransportNSW==0.1.1
# homeassistant.components.camera
# homeassistant.components.stream
PyTurboJPEG==1.6.3
# homeassistant.components.vicare

View File

@ -29,4 +29,5 @@ def mock_turbo_jpeg(
(second_width, second_height, 0, 0),
]
mocked_turbo_jpeg.scale_with_quality.return_value = EMPTY_8_6_JPEG
mocked_turbo_jpeg.encode.return_value = EMPTY_8_6_JPEG
return mocked_turbo_jpeg

View File

@ -23,7 +23,7 @@ from unittest.mock import patch
import av
import pytest
from homeassistant.components.stream import Stream, create_stream
from homeassistant.components.stream import KeyFrameConverter, Stream, create_stream
from homeassistant.components.stream.const import (
ATTR_SETTINGS,
CONF_LL_HLS,
@ -45,6 +45,7 @@ from homeassistant.components.stream.worker import (
)
from homeassistant.setup import async_setup_component
from tests.components.camera.common import EMPTY_8_6_JPEG, mock_turbo_jpeg
from tests.components.stream.common import generate_h264_video, generate_h265_video
from tests.components.stream.test_ll_hls import TEST_PART_DURATION
@ -97,6 +98,17 @@ class FakeAvInputStream:
self.codec = FakeCodec()
class FakeCodecContext:
name = "h264"
extradata = None
self.codec_context = FakeCodecContext()
@property
def type(self):
    """Return "video" when this fake stream uses the video format, else "audio"."""
    if self.name == VIDEO_STREAM_FORMAT:
        return "video"
    return "audio"
def __str__(self) -> str:
    """Render a debugging-friendly description of this fake stream."""
    return "FakePyAvStream<{}, {}>".format(self.name, self.time_base)
@ -195,6 +207,7 @@ class FakePyAvBuffer:
class FakeAvOutputStream:
def __init__(self, capture_packets):
self.capture_packets = capture_packets
self.type = "ignored-type"
def close(self):
return
@ -258,7 +271,9 @@ class MockPyAv:
def run_worker(hass, stream, stream_source):
    """Run the stream worker under test.

    Builds a fresh StreamState and KeyFrameConverter and invokes stream_worker
    synchronously with an un-set quit event.
    """
    stream_state = StreamState(hass, stream.outputs)
    # The diff residue retained the stale 4-argument call alongside the new
    # 5-argument one; keep only the updated call with the KeyFrameConverter.
    stream_worker(
        stream_source, {}, stream_state, KeyFrameConverter(hass), threading.Event()
    )
async def async_decode_stream(hass, packets, py_av=None):
@ -854,3 +869,29 @@ async def test_h265_video_is_hvc1(hass, record_worker_sync):
await record_worker_sync.join()
stream.stop()
async def test_get_image(hass, record_worker_sync):
    """Test that Stream.get_image returns the latest keyframe as a jpeg."""
    await async_setup_component(hass, "stream", {"stream": {}})

    source = generate_h264_video()

    # Since libjpeg-turbo is not installed on the CI runner, we use a mock
    with patch(
        "homeassistant.components.camera.img_util.TurboJPEGSingleton"
    ) as mock_turbo_jpeg_singleton:
        mock_turbo_jpeg_singleton.instance.return_value = mock_turbo_jpeg()
        stream = create_stream(hass, source, {})

        # use record_worker_sync to grab output segments
        with patch.object(hass.config, "is_allowed_path", return_value=True):
            await stream.async_record("/example/path")

        # No image is available before the worker has processed a keyframe
        assert stream._keyframe_converter._image is None

        await record_worker_sync.join()

        # The mocked TurboJPEG encoder always produces EMPTY_8_6_JPEG
        assert await stream.get_image() == EMPTY_8_6_JPEG

    stream.stop()