core/homeassistant/components/media_source/local_source.py

161 lines
4.8 KiB
Python

"""Local Media Source Implementation."""
import mimetypes
from pathlib import Path
from typing import Tuple
from aiohttp import web
from homeassistant.components.http import HomeAssistantView
from homeassistant.components.media_player.errors import BrowseError
from homeassistant.components.media_source.error import Unresolvable
from homeassistant.core import HomeAssistant, callback
from homeassistant.util import sanitize_path
from .const import DOMAIN, MEDIA_MIME_TYPES
from .models import BrowseMedia, MediaSource, MediaSourceItem, PlayMedia
@callback
def async_setup(hass: HomeAssistant):
    """Register the local media source and its HTTP view.

    Stores a LocalSource instance in hass.data[DOMAIN] under the DOMAIN key
    and registers a LocalMediaView with the HTTP component.
    """
    hass.data[DOMAIN][DOMAIN] = LocalSource(hass)
    media_view = LocalMediaView(hass)
    hass.http.register_view(media_view)
@callback
def async_parse_identifier(item: MediaSourceItem) -> Tuple[str, str]:
    """Split a media source identifier into (source_dir_id, location).

    An empty identifier refers to the root of the "media" source directory.
    Otherwise leading slashes are stripped and the identifier is read as
    "<source_dir_id>/<location>". An identifier that names only the source
    directory (no "/" separator) yields an empty location.

    Raises:
        Unresolvable: if the source directory is not "media", or if
            sanitize_path() would alter the location.
    """
    if item.identifier:
        # partition() always yields three parts, so an identifier without a
        # "/" (e.g. "media") no longer fails tuple unpacking with ValueError.
        source_dir_id, _, location = item.identifier.lstrip("/").partition("/")
    else:
        source_dir_id, location = "media", ""

    if source_dir_id != "media":
        raise Unresolvable("Unknown source directory.")

    if location != sanitize_path(location):
        raise Unresolvable("Invalid path.")

    return source_dir_id, location
class LocalSource(MediaSource):
    """Provide local directories as media sources."""

    name: str = "Local Media"

    def __init__(self, hass: HomeAssistant):
        """Initialize local source."""
        super().__init__(DOMAIN)
        self.hass = hass

    @callback
    def async_full_path(self, source_dir_id, location) -> Path:
        """Return the full path of a location inside the media directory.

        source_dir_id is accepted for symmetry with async_parse_identifier
        but is not used: the path is always resolved under "media".
        """
        # Wrap in Path so the returned value matches the declared return type.
        return Path(self.hass.config.path("media", location))

    async def async_resolve_media(self, item: MediaSourceItem) -> PlayMedia:
        """Resolve a media item to a PlayMedia with its guessed MIME type.

        Raises Unresolvable (from async_parse_identifier) when the identifier
        cannot be parsed.
        """
        source_dir_id, location = async_parse_identifier(item)
        # The MIME type is guessed from the file name only.
        mime_type, _ = mimetypes.guess_type(
            str(self.async_full_path(source_dir_id, location))
        )
        return PlayMedia(item.identifier, mime_type)

    async def async_browse_media(
        self, item: MediaSourceItem, media_types: Tuple[str, ...] = MEDIA_MIME_TYPES
    ) -> BrowseMedia:
        """Return the browse tree for the directory named by the item.

        media_types is accepted but not used here; filtering is done against
        MEDIA_MIME_TYPES in _build_item_response.

        Raises BrowseError when the identifier cannot be parsed or the
        target is missing or not a directory.
        """
        try:
            source_dir_id, location = async_parse_identifier(item)
        except Unresolvable as err:
            raise BrowseError(str(err)) from err

        # Filesystem access is blocking, so run it in the executor.
        return await self.hass.async_add_executor_job(
            self._browse_media, source_dir_id, location
        )

    def _browse_media(self, source_dir_id, location):
        """Browse a directory under the media directory (blocking)."""
        full_path = Path(self.hass.config.path("media", location))

        if not full_path.exists():
            raise BrowseError("Path does not exist.")

        if not full_path.is_dir():
            raise BrowseError("Path is not a directory.")

        return self._build_item_response(source_dir_id, full_path)

    def _build_item_response(self, source_dir_id: str, path: Path, is_child=False):
        """Build a BrowseMedia entry for a path.

        Returns None when the path is neither a file nor a directory, or when
        it is a file whose guessed top-level MIME type is not listed in
        MEDIA_MIME_TYPES. Directories get a trailing "/" appended to their
        name and, unless is_child is true, their first-level children.
        """
        mime_type, _ = mimetypes.guess_type(str(path))
        media = BrowseMedia(
            DOMAIN,
            f"{source_dir_id}/{path.relative_to(self.hass.config.path('media'))}",
            path.name,
            path.is_file(),
            path.is_dir(),
            mime_type,
        )

        # Make sure it's a file or directory
        if not media.can_play and not media.can_expand:
            return None

        # Check that it's a media file
        if media.can_play and (
            not mime_type or mime_type.split("/")[0] not in MEDIA_MIME_TYPES
        ):
            return None

        if not media.can_expand:
            return media

        media.name += "/"

        # Append first level children
        if not is_child:
            media.children = []
            # iterdir() yields entries in arbitrary order; sort so the listing
            # is deterministic.
            for child_path in sorted(path.iterdir()):
                child = self._build_item_response(source_dir_id, child_path, True)
                if child:
                    media.children.append(child)

        return media
class LocalMediaView(HomeAssistantView):
    """
    Local Media Finder View.

    Returns media files in config/media.
    """

    url = "/media/{location:.*}"
    name = "media"

    def __init__(self, hass: HomeAssistant):
        """Initialize the media view."""
        self.hass = hass

    async def get(self, request: web.Request, location: str) -> web.FileResponse:
        """Serve a media file located under the media directory.

        Raises web.HTTPNotFound when the location is altered by
        sanitize_path(), does not point at an existing file, or has a guessed
        top-level MIME type that is not listed in MEDIA_MIME_TYPES.
        """
        # Raise (rather than return) the 404 so every rejection path in this
        # handler signals the error the same way.
        if location != sanitize_path(location):
            raise web.HTTPNotFound()

        media_path = Path(self.hass.config.path("media", location))

        # Check that the file exists
        if not media_path.is_file():
            raise web.HTTPNotFound()

        # Check that it's a media file
        mime_type, _ = mimetypes.guess_type(str(media_path))
        if not mime_type or mime_type.split("/")[0] not in MEDIA_MIME_TYPES:
            raise web.HTTPNotFound()

        return web.FileResponse(media_path)