Stream Record Service (#22456)

* Initial commit of record service for live streams

* fix lint

* update service descriptions

* add tests

* fix lint
pull/22493/head
Jason Hunter 2019-03-28 00:47:07 -04:00 committed by Paulus Schoutsen
parent 9d21afa444
commit 26726af689
11 changed files with 466 additions and 19 deletions

View File

@ -20,7 +20,7 @@ import voluptuous as vol
from homeassistant.core import callback
from homeassistant.const import ATTR_ENTITY_ID, SERVICE_TURN_OFF, \
SERVICE_TURN_ON, EVENT_HOMEASSISTANT_START
SERVICE_TURN_ON, EVENT_HOMEASSISTANT_START, CONF_FILENAME
from homeassistant.exceptions import HomeAssistantError
from homeassistant.loader import bind_hass
from homeassistant.helpers.entity import Entity
@ -33,7 +33,8 @@ from homeassistant.components.media_player.const import (
SERVICE_PLAY_MEDIA, DOMAIN as DOMAIN_MP)
from homeassistant.components.stream import request_stream
from homeassistant.components.stream.const import (
OUTPUT_FORMATS, FORMAT_CONTENT_TYPE)
OUTPUT_FORMATS, FORMAT_CONTENT_TYPE, CONF_STREAM_SOURCE, CONF_LOOKBACK,
CONF_DURATION, SERVICE_RECORD, DOMAIN as DOMAIN_STREAM)
from homeassistant.components import websocket_api
import homeassistant.helpers.config_validation as cv
@ -85,6 +86,12 @@ CAMERA_SERVICE_PLAY_STREAM = CAMERA_SERVICE_SCHEMA.extend({
vol.Optional(ATTR_FORMAT, default='hls'): vol.In(OUTPUT_FORMATS),
})
CAMERA_SERVICE_RECORD = CAMERA_SERVICE_SCHEMA.extend({
vol.Required(CONF_FILENAME): cv.template,
vol.Optional(CONF_DURATION, default=30): int,
vol.Optional(CONF_LOOKBACK, default=0): int,
})
WS_TYPE_CAMERA_THUMBNAIL = 'camera_thumbnail'
SCHEMA_WS_CAMERA_THUMBNAIL = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend({
vol.Required('type'): WS_TYPE_CAMERA_THUMBNAIL,
@ -260,6 +267,10 @@ async def async_setup(hass, config):
SERVICE_PLAY_STREAM, CAMERA_SERVICE_PLAY_STREAM,
async_handle_play_stream_service
)
component.async_register_entity_service(
SERVICE_RECORD, CAMERA_SERVICE_RECORD,
async_handle_record_service
)
return True
@ -640,3 +651,27 @@ async def async_handle_play_stream_service(camera, service_call):
await hass.services.async_call(
DOMAIN_MP, SERVICE_PLAY_MEDIA, data,
blocking=True, context=service_call.context)
async def async_handle_record_service(camera, call):
"""Handle stream recording service calls."""
if not camera.stream_source:
raise HomeAssistantError("{} does not support record service"
.format(camera.entity_id))
hass = camera.hass
filename = call.data[CONF_FILENAME]
filename.hass = hass
video_path = filename.async_render(
variables={ATTR_ENTITY_ID: camera})
data = {
CONF_STREAM_SOURCE: camera.stream_source,
CONF_FILENAME: video_path,
CONF_DURATION: call.data[CONF_DURATION],
CONF_LOOKBACK: call.data[CONF_LOOKBACK],
}
await hass.services.async_call(
DOMAIN_STREAM, SERVICE_RECORD, data,
blocking=True, context=call.context)

View File

@ -51,6 +51,22 @@ play_stream:
description: (Optional) Stream format supported by media player.
example: 'hls'
record:
description: Record live camera feed.
fields:
entity_id:
description: Name of entities to record.
example: 'camera.living_room_camera'
filename:
description: Template of a Filename. Variable is entity_id. Must be mp4.
example: '/tmp/snapshot_{{ entity_id }}.mp4'
duration:
description: (Optional) Target recording length (in seconds). Default: 30
example: 30
lookback:
description: (Optional) Target lookback period (in seconds) to include in addition to duration. Only available if there is currently an active HLS stream.
example: 4
local_file_update_file_path:
description: Update the file_path for a local_file camera.
fields:

View File

@ -10,15 +10,19 @@ import threading
import voluptuous as vol
from homeassistant.auth.util import generate_secret
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
import homeassistant.helpers.config_validation as cv
from homeassistant.const import EVENT_HOMEASSISTANT_STOP, CONF_FILENAME
from homeassistant.core import callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.loader import bind_hass
from .const import DOMAIN, ATTR_STREAMS, ATTR_ENDPOINTS
from .const import (
DOMAIN, ATTR_STREAMS, ATTR_ENDPOINTS, CONF_STREAM_SOURCE,
CONF_DURATION, CONF_LOOKBACK, SERVICE_RECORD)
from .core import PROVIDERS
from .worker import stream_worker
from .hls import async_setup_hls
from .recorder import async_setup_recorder
REQUIREMENTS = ['av==6.1.2']
@ -30,6 +34,16 @@ CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({}),
}, extra=vol.ALLOW_EXTRA)
STREAM_SERVICE_SCHEMA = vol.Schema({
vol.Required(CONF_STREAM_SOURCE): cv.string,
})
SERVICE_RECORD_SCHEMA = STREAM_SERVICE_SCHEMA.extend({
vol.Required(CONF_FILENAME): cv.string,
vol.Optional(CONF_DURATION, default=30): int,
vol.Optional(CONF_LOOKBACK, default=0): int,
})
# Set log level to error for libav
logging.getLogger('libav').setLevel(logging.ERROR)
@ -82,6 +96,9 @@ async def async_setup(hass, config):
hls_endpoint = async_setup_hls(hass)
hass.data[DOMAIN][ATTR_ENDPOINTS]['hls'] = hls_endpoint
# Setup Recorder
async_setup_recorder(hass)
@callback
def shutdown(event):
"""Stop all stream workers."""
@ -92,6 +109,13 @@ async def async_setup(hass, config):
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
async def async_record(call):
"""Call record stream service handler."""
await async_handle_record_service(hass, call)
hass.services.async_register(DOMAIN, SERVICE_RECORD,
async_record, schema=SERVICE_RECORD_SCHEMA)
return True
@ -119,15 +143,15 @@ class Stream:
def add_provider(self, fmt):
"""Add provider output stream."""
provider = PROVIDERS[fmt](self)
if not self._outputs.get(provider.format):
self._outputs[provider.format] = provider
return self._outputs[provider.format]
if not self._outputs.get(fmt):
provider = PROVIDERS[fmt](self)
self._outputs[fmt] = provider
return self._outputs[fmt]
def remove_provider(self, provider):
"""Remove provider output stream."""
if provider.format in self._outputs:
del self._outputs[provider.format]
if provider.name in self._outputs:
del self._outputs[provider.name]
self.check_idle()
if not self._outputs:
@ -165,3 +189,44 @@ class Stream:
self._thread.join()
self._thread = None
_LOGGER.info("Stopped stream: %s", self.source)
async def async_handle_record_service(hass, call):
"""Handle save video service calls."""
stream_source = call.data[CONF_STREAM_SOURCE]
video_path = call.data[CONF_FILENAME]
duration = call.data[CONF_DURATION]
lookback = call.data[CONF_LOOKBACK]
# Check for file access
if not hass.config.is_allowed_path(video_path):
raise HomeAssistantError("Can't write {}, no access to path!"
.format(video_path))
# Check for active stream
streams = hass.data[DOMAIN][ATTR_STREAMS]
stream = streams.get(stream_source)
if not stream:
stream = Stream(hass, stream_source)
streams[stream_source] = stream
# Add recorder
recorder = stream.outputs.get('recorder')
if recorder:
raise HomeAssistantError("Stream already recording to {}!"
.format(recorder.video_path))
recorder = stream.add_provider('recorder')
recorder.video_path = video_path
recorder.timeout = duration
stream.start()
# Take advantage of lookback
hls = stream.outputs.get('hls')
if lookback > 0 and hls:
num_segments = min(int(lookback // hls.target_duration),
hls.num_segments)
# Wait for latest segment, then add the lookback
await hls.recv()
recorder.prepend(list(hls.get_segment())[-num_segments:])

View File

@ -1,10 +1,16 @@
"""Constants for Stream component."""
DOMAIN = 'stream'
CONF_STREAM_SOURCE = 'stream_source'
CONF_LOOKBACK = 'lookback'
CONF_DURATION = 'duration'
ATTR_ENDPOINTS = 'endpoints'
ATTR_STREAMS = 'streams'
ATTR_KEEPALIVE = 'keepalive'
SERVICE_RECORD = 'record'
OUTPUT_FORMATS = ['hls']
FORMAT_CONTENT_TYPE = {

View File

@ -41,15 +41,21 @@ class StreamOutput:
num_segments = 3
def __init__(self, stream) -> None:
def __init__(self, stream, timeout: int = 300) -> None:
"""Initialize a stream output."""
self.idle = False
self.timeout = timeout
self._stream = stream
self._cursor = None
self._event = asyncio.Event()
self._segments = deque(maxlen=self.num_segments)
self._unsub = None
@property
def name(self) -> str:
"""Return provider name."""
return None
@property
def format(self) -> str:
"""Return container format."""
@ -82,7 +88,8 @@ class StreamOutput:
# Reset idle timeout
if self._unsub is not None:
self._unsub()
self._unsub = async_call_later(self._stream.hass, 300, self._timeout)
self._unsub = async_call_later(
self._stream.hass, self.timeout, self._timeout)
if not sequence:
return self._segments
@ -111,14 +118,14 @@ class StreamOutput:
# Start idle timeout when we start recieving data
if self._unsub is None:
self._unsub = async_call_later(
self._stream.hass, 300, self._timeout)
self._stream.hass, self.timeout, self._timeout)
if segment is None:
self._event.set()
# Cleanup provider
if self._unsub is not None:
self._unsub()
self._cleanup()
self.cleanup()
return
self._segments.append(segment)
@ -133,11 +140,11 @@ class StreamOutput:
self.idle = True
self._stream.check_idle()
else:
self._cleanup()
self.cleanup()
def _cleanup(self):
"""Remove provider."""
self._segments = []
def cleanup(self):
"""Handle cleanup."""
self._segments = deque(maxlen=self.num_segments)
self._stream.remove_provider(self)

View File

@ -110,6 +110,11 @@ class M3U8Renderer:
class HlsStreamOutput(StreamOutput):
"""Represents HLS Output formats."""
@property
def name(self) -> str:
"""Return provider name."""
return 'hls'
@property
def format(self) -> str:
"""Return container format."""

View File

@ -0,0 +1,92 @@
"""Provide functionality to record stream."""
import threading
from typing import List
from homeassistant.core import callback
from .core import Segment, StreamOutput, PROVIDERS
@callback
def async_setup_recorder(hass):
"""Only here so Provider Registry works."""
def recorder_save_worker(file_out: str, segments: List[Segment]):
"""Handle saving stream."""
import av
output = av.open(file_out, 'w', options={'movflags': 'frag_keyframe'})
output_v = None
for segment in segments:
# Seek to beginning and open segment
segment.segment.seek(0)
source = av.open(segment.segment, 'r', format='mpegts')
source_v = source.streams.video[0]
# Add output streams
if not output_v:
output_v = output.add_stream(template=source_v)
# Remux video
for packet in source.demux(source_v):
if packet is not None and packet.dts is not None:
packet.stream = output_v
output.mux(packet)
output.close()
@PROVIDERS.register('recorder')
class RecorderOutput(StreamOutput):
"""Represents HLS Output formats."""
def __init__(self, stream, timeout: int = 30) -> None:
"""Initialize recorder output."""
super().__init__(stream, timeout)
self.video_path = None
self._segments = []
@property
def name(self) -> str:
"""Return provider name."""
return 'recorder'
@property
def format(self) -> str:
"""Return container format."""
return 'mpegts'
@property
def audio_codec(self) -> str:
"""Return desired audio codec."""
return 'aac'
@property
def video_codec(self) -> str:
"""Return desired video codec."""
return 'h264'
def prepend(self, segments: List[Segment]) -> None:
"""Prepend segments to existing list."""
own_segments = self.segments
segments = [s for s in segments if s.sequence not in own_segments]
self._segments = segments + self._segments
@callback
def _timeout(self, _now=None):
"""Handle recorder timeout."""
self._unsub = None
self.cleanup()
def cleanup(self):
"""Write recording and clean up."""
thread = threading.Thread(
name='recorder_save_worker',
target=recorder_save_worker,
args=(self.video_path, self._segments))
thread.start()
self._segments = []
self._stream.remove_provider(self)

View File

@ -112,7 +112,7 @@ def stream_worker(hass, stream, quit_event):
a_packet, buffer = create_stream_buffer(
stream_output, video_stream, audio_frame)
audio_packets[buffer.astream] = a_packet
outputs[stream_output.format] = buffer
outputs[stream_output.name] = buffer
# First video packet tends to have a weird dts/pts
if first_packet:

View File

@ -341,3 +341,38 @@ async def test_preload_stream(hass, mock_stream):
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
assert mock_request_stream.called
async def test_record_service_invalid_path(hass, mock_camera):
"""Test record service with invalid path."""
data = {
ATTR_ENTITY_ID: 'camera.demo_camera',
camera.CONF_FILENAME: '/my/invalid/path'
}
with patch.object(hass.config, 'is_allowed_path', return_value=False), \
pytest.raises(HomeAssistantError):
# Call service
await hass.services.async_call(
camera.DOMAIN, camera.SERVICE_RECORD, data, blocking=True)
async def test_record_service(hass, mock_camera, mock_stream):
"""Test record service."""
data = {
ATTR_ENTITY_ID: 'camera.demo_camera',
camera.CONF_FILENAME: '/my/path'
}
with patch('homeassistant.components.demo.camera.DemoCamera.stream_source',
new_callable=PropertyMock) as mock_stream_source, \
patch(
'homeassistant.components.stream.async_handle_record_service',
return_value=mock_coro()) as mock_record_service, \
patch.object(hass.config, 'is_allowed_path', return_value=True):
mock_stream_source.return_value = io.BytesIO()
# Call service
await hass.services.async_call(
camera.DOMAIN, camera.SERVICE_RECORD, data, blocking=True)
# So long as we call stream.record, the rest should be covered
# by those tests.
assert mock_record_service.called

View File

@ -0,0 +1,103 @@
"""The tests for stream."""
from unittest.mock import patch, MagicMock
import pytest
from homeassistant.const import CONF_FILENAME
from homeassistant.components.stream.const import (
DOMAIN, SERVICE_RECORD, CONF_STREAM_SOURCE, CONF_LOOKBACK, ATTR_STREAMS)
from homeassistant.exceptions import HomeAssistantError
from homeassistant.setup import async_setup_component
from tests.common import mock_coro
async def test_record_service_invalid_file(hass):
"""Test record service call with invalid file."""
await async_setup_component(hass, 'stream', {
'stream': {}
})
data = {
CONF_STREAM_SOURCE: 'rtsp://my.video',
CONF_FILENAME: '/my/invalid/path'
}
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
DOMAIN, SERVICE_RECORD, data, blocking=True)
async def test_record_service_init_stream(hass):
"""Test record service call with invalid file."""
await async_setup_component(hass, 'stream', {
'stream': {}
})
data = {
CONF_STREAM_SOURCE: 'rtsp://my.video',
CONF_FILENAME: '/my/invalid/path'
}
with patch('homeassistant.components.stream.Stream') as stream_mock, \
patch.object(hass.config, 'is_allowed_path', return_value=True):
# Setup stubs
stream_mock.return_value.outputs = {}
# Call Service
await hass.services.async_call(
DOMAIN, SERVICE_RECORD, data, blocking=True)
# Assert
assert stream_mock.called
async def test_record_service_existing_record_session(hass):
"""Test record service call with invalid file."""
await async_setup_component(hass, 'stream', {
'stream': {}
})
source = 'rtsp://my.video'
data = {
CONF_STREAM_SOURCE: source,
CONF_FILENAME: '/my/invalid/path'
}
# Setup stubs
stream_mock = MagicMock()
stream_mock.return_value.outputs = {'recorder': MagicMock()}
hass.data[DOMAIN][ATTR_STREAMS][source] = stream_mock
with patch.object(hass.config, 'is_allowed_path', return_value=True), \
pytest.raises(HomeAssistantError):
# Call Service
await hass.services.async_call(
DOMAIN, SERVICE_RECORD, data, blocking=True)
async def test_record_service_lookback(hass):
"""Test record service call with invalid file."""
await async_setup_component(hass, 'stream', {
'stream': {}
})
data = {
CONF_STREAM_SOURCE: 'rtsp://my.video',
CONF_FILENAME: '/my/invalid/path',
CONF_LOOKBACK: 4
}
with patch('homeassistant.components.stream.Stream') as stream_mock, \
patch.object(hass.config, 'is_allowed_path', return_value=True):
# Setup stubs
hls_mock = MagicMock()
hls_mock.num_segments = 3
hls_mock.target_duration = 2
hls_mock.recv.return_value = mock_coro()
stream_mock.return_value.outputs = {
'hls': hls_mock
}
# Call Service
await hass.services.async_call(
DOMAIN, SERVICE_RECORD, data, blocking=True)
assert stream_mock.called
stream_mock.return_value.add_provider.assert_called_once_with(
'recorder')
assert hls_mock.recv.called

View File

@ -0,0 +1,83 @@
"""The tests for hls streams."""
from datetime import timedelta
from io import BytesIO
from unittest.mock import patch
from homeassistant.setup import async_setup_component
from homeassistant.components.stream.core import Segment
from homeassistant.components.stream.recorder import recorder_save_worker
import homeassistant.util.dt as dt_util
from tests.common import async_fire_time_changed
from tests.components.stream.common import (
generate_h264_video, preload_stream)
async def test_record_stream(hass, hass_client):
"""
Test record stream.
Purposefully not mocking anything here to test full
integration with the stream component.
"""
await async_setup_component(hass, 'stream', {
'stream': {}
})
with patch(
'homeassistant.components.stream.recorder.recorder_save_worker'):
# Setup demo track
source = generate_h264_video()
stream = preload_stream(hass, source)
recorder = stream.add_provider('recorder')
stream.start()
segments = 0
while True:
segment = await recorder.recv()
if not segment:
break
segments += 1
stream.stop()
assert segments == 3
async def test_recorder_timeout(hass, hass_client):
"""Test recorder timeout."""
await async_setup_component(hass, 'stream', {
'stream': {}
})
with patch(
'homeassistant.components.stream.recorder.RecorderOutput.cleanup'
) as mock_cleanup:
# Setup demo track
source = generate_h264_video()
stream = preload_stream(hass, source)
recorder = stream.add_provider('recorder')
stream.start()
await recorder.recv()
# Wait a minute
future = dt_util.utcnow() + timedelta(minutes=1)
async_fire_time_changed(hass, future)
await hass.async_block_till_done()
assert mock_cleanup.called
async def test_recorder_save():
"""Test recorder save."""
# Setup
source = generate_h264_video()
output = BytesIO()
output.name = 'test.mp4'
# Run
recorder_save_worker(output, [Segment(1, source, 4)])
# Assert
assert output.getvalue()