core/homeassistant/components/media_source/local_source.py

222 lines
7.2 KiB
Python

"""Local Media Source Implementation."""
from __future__ import annotations
import mimetypes
from pathlib import Path
from aiohttp import web
from homeassistant.components.http import HomeAssistantView
from homeassistant.components.media_player.const import MEDIA_CLASS_DIRECTORY
from homeassistant.components.media_player.errors import BrowseError
from homeassistant.components.media_source.error import Unresolvable
from homeassistant.core import HomeAssistant, callback
from homeassistant.util import raise_if_invalid_path
from .const import DOMAIN, MEDIA_CLASS_MAP, MEDIA_MIME_TYPES
from .models import BrowseMediaSource, MediaSource, MediaSourceItem, PlayMedia
@callback
def async_setup(hass: HomeAssistant) -> None:
"""Set up local media source."""
source = LocalSource(hass)
hass.data[DOMAIN][DOMAIN] = source
hass.http.register_view(LocalMediaView(hass, source))
class LocalSource(MediaSource):
"""Provide local directories as media sources."""
name: str = "Local Media"
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize local source."""
super().__init__(DOMAIN)
self.hass = hass
@callback
def async_full_path(self, source_dir_id: str, location: str) -> Path:
"""Return full path."""
return Path(self.hass.config.media_dirs[source_dir_id], location)
@callback
def async_parse_identifier(self, item: MediaSourceItem) -> tuple[str, str]:
"""Parse identifier."""
if not item.identifier:
# Empty source_dir_id and location
return "", ""
source_dir_id, location = item.identifier.split("/", 1)
if source_dir_id not in self.hass.config.media_dirs:
raise Unresolvable("Unknown source directory.")
try:
raise_if_invalid_path(location)
except ValueError as err:
raise Unresolvable("Invalid path.") from err
return source_dir_id, location
async def async_resolve_media(self, item: MediaSourceItem) -> PlayMedia:
"""Resolve media to a url."""
source_dir_id, location = self.async_parse_identifier(item)
if source_dir_id == "" or source_dir_id not in self.hass.config.media_dirs:
raise Unresolvable("Unknown source directory.")
mime_type, _ = mimetypes.guess_type(
str(self.async_full_path(source_dir_id, location))
)
assert isinstance(mime_type, str)
return PlayMedia(f"/media/{item.identifier}", mime_type)
async def async_browse_media(self, item: MediaSourceItem) -> BrowseMediaSource:
"""Return media."""
try:
source_dir_id, location = self.async_parse_identifier(item)
except Unresolvable as err:
raise BrowseError(str(err)) from err
result = await self.hass.async_add_executor_job(
self._browse_media, source_dir_id, location
)
return result
def _browse_media(self, source_dir_id: str, location: str) -> BrowseMediaSource:
"""Browse media."""
# If only one media dir is configured, use that as the local media root
if source_dir_id == "" and len(self.hass.config.media_dirs) == 1:
source_dir_id = list(self.hass.config.media_dirs)[0]
# Multiple folder, root is requested
if source_dir_id == "":
if location:
raise BrowseError("Folder not found.")
base = BrowseMediaSource(
domain=DOMAIN,
identifier="",
media_class=MEDIA_CLASS_DIRECTORY,
media_content_type=None,
title=self.name,
can_play=False,
can_expand=True,
children_media_class=MEDIA_CLASS_DIRECTORY,
)
base.children = [
self._browse_media(source_dir_id, "")
for source_dir_id in self.hass.config.media_dirs
]
return base
full_path = Path(self.hass.config.media_dirs[source_dir_id], location)
if not full_path.exists():
if location == "":
raise BrowseError("Media directory does not exist.")
raise BrowseError("Path does not exist.")
if not full_path.is_dir():
raise BrowseError("Path is not a directory.")
result = self._build_item_response(source_dir_id, full_path)
if not result:
raise BrowseError("Unknown source directory.")
return result
def _build_item_response(
self, source_dir_id: str, path: Path, is_child: bool = False
) -> BrowseMediaSource | None:
mime_type, _ = mimetypes.guess_type(str(path))
is_file = path.is_file()
is_dir = path.is_dir()
# Make sure it's a file or directory
if not is_file and not is_dir:
return None
# Check that it's a media file
if is_file and (
not mime_type or mime_type.split("/")[0] not in MEDIA_MIME_TYPES
):
return None
title = path.name
if is_dir:
title += "/"
media_class = MEDIA_CLASS_DIRECTORY
if mime_type:
media_class = MEDIA_CLASS_MAP.get(
mime_type.split("/")[0], MEDIA_CLASS_DIRECTORY
)
media = BrowseMediaSource(
domain=DOMAIN,
identifier=f"{source_dir_id}/{path.relative_to(self.hass.config.media_dirs[source_dir_id])}",
media_class=media_class,
media_content_type=mime_type or "",
title=title,
can_play=is_file,
can_expand=is_dir,
)
if is_file or is_child:
return media
# Append first level children
media.children = []
for child_path in path.iterdir():
child = self._build_item_response(source_dir_id, child_path, True)
if child:
media.children.append(child)
# Sort children showing directories first, then by name
media.children.sort(key=lambda child: (child.can_play, child.title))
return media
class LocalMediaView(HomeAssistantView):
"""
Local Media Finder View.
Returns media files in config/media.
"""
url = "/media/{source_dir_id}/{location:.*}"
name = "media"
def __init__(self, hass: HomeAssistant, source: LocalSource) -> None:
"""Initialize the media view."""
self.hass = hass
self.source = source
async def get(
self, request: web.Request, source_dir_id: str, location: str
) -> web.FileResponse:
"""Start a GET request."""
try:
raise_if_invalid_path(location)
except ValueError as err:
raise web.HTTPBadRequest() from err
if source_dir_id not in self.hass.config.media_dirs:
raise web.HTTPNotFound()
media_path = self.source.async_full_path(source_dir_id, location)
# Check that the file exists
if not media_path.is_file():
raise web.HTTPNotFound()
# Check that it's a media file
mime_type, _ = mimetypes.guess_type(str(media_path))
if not mime_type or mime_type.split("/")[0] not in MEDIA_MIME_TYPES:
raise web.HTTPNotFound()
return web.FileResponse(media_path)