core/homeassistant/components/stream/core.py

181 lines
4.8 KiB
Python

"""Provides core stream functionality."""
import asyncio
from collections import deque
import io
from typing import List, Any
import attr
from aiohttp import web
from homeassistant.core import callback
from homeassistant.components.http import HomeAssistantView
from homeassistant.helpers.event import async_call_later
from homeassistant.util.decorator import Registry
from .const import DOMAIN, ATTR_STREAMS
PROVIDERS = Registry()
@attr.s
class StreamBuffer:
"""Represent a segment."""
segment = attr.ib(type=io.BytesIO)
output = attr.ib() # type=av.OutputContainer
vstream = attr.ib() # type=av.VideoStream
astream = attr.ib(default=None) # type=av.AudioStream
@attr.s
class Segment:
"""Represent a segment."""
sequence = attr.ib(type=int)
segment = attr.ib(type=io.BytesIO)
duration = attr.ib(type=float)
class StreamOutput:
"""Represents a stream output."""
num_segments = 3
def __init__(self, stream, timeout: int = 300) -> None:
"""Initialize a stream output."""
self.idle = False
self.timeout = timeout
self._stream = stream
self._cursor = None
self._event = asyncio.Event()
self._segments = deque(maxlen=self.num_segments)
self._unsub = None
@property
def name(self) -> str:
"""Return provider name."""
return None
@property
def format(self) -> str:
"""Return container format."""
return None
@property
def audio_codec(self) -> str:
"""Return desired audio codec."""
return None
@property
def video_codec(self) -> str:
"""Return desired video codec."""
return None
@property
def segments(self) -> List[int]:
"""Return current sequence from segments."""
return [s.sequence for s in self._segments]
@property
def target_duration(self) -> int:
"""Return the average duration of the segments in seconds."""
durations = [s.duration for s in self._segments]
return round(sum(durations) // len(self._segments)) or 1
def get_segment(self, sequence: int = None) -> Any:
"""Retrieve a specific segment, or the whole list."""
self.idle = False
# Reset idle timeout
if self._unsub is not None:
self._unsub()
self._unsub = async_call_later(
self._stream.hass, self.timeout, self._timeout)
if not sequence:
return self._segments
for segment in self._segments:
if segment.sequence == sequence:
return segment
return None
async def recv(self) -> Segment:
"""Wait for and retrieve the latest segment."""
last_segment = max(self.segments, default=0)
if self._cursor is None or self._cursor <= last_segment:
await self._event.wait()
if not self._segments:
return None
segment = self.get_segment()[-1]
self._cursor = segment.sequence
return segment
@callback
def put(self, segment: Segment) -> None:
"""Store output."""
# Start idle timeout when we start recieving data
if self._unsub is None:
self._unsub = async_call_later(
self._stream.hass, self.timeout, self._timeout)
if segment is None:
self._event.set()
# Cleanup provider
if self._unsub is not None:
self._unsub()
self.cleanup()
return
self._segments.append(segment)
self._event.set()
self._event.clear()
@callback
def _timeout(self, _now=None):
"""Handle stream timeout."""
self._unsub = None
if self._stream.keepalive:
self.idle = True
self._stream.check_idle()
else:
self.cleanup()
def cleanup(self):
"""Handle cleanup."""
self._segments = deque(maxlen=self.num_segments)
self._stream.remove_provider(self)
class StreamView(HomeAssistantView):
"""
Base StreamView.
For implementation of a new stream format, define `url` and `name`
attributes, and implement `handle` method in a child class.
"""
requires_auth = False
platform = None
async def get(self, request, token, sequence=None):
"""Start a GET request."""
hass = request.app['hass']
stream = next((
s for s in hass.data[DOMAIN][ATTR_STREAMS].values()
if s.access_token == token), None)
if not stream:
raise web.HTTPNotFound()
# Start worker if not already started
stream.start()
return await self.handle(request, stream, sequence)
async def handle(self, request, stream, sequence):
"""Handle the stream request."""
raise NotImplementedError()