Decouple media lookup from Plex play_media service (#35663)

* Decouple media lookup from play_media service

* More explicit input/search validation, cleanup, more tests

* Minor cleanup

* Normalize media_type string in lookup call

* Move key lookup, add tests via service calls

* Always allow play_media service calls

* No need to pass arguments to nested functions
pull/36179/head
jjlawren 2020-05-26 14:39:56 -05:00 committed by GitHub
parent 6507951bb1
commit 59c112a3f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 495 additions and 104 deletions

View File

@ -7,12 +7,9 @@ import requests.exceptions
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN, MediaPlayerEntity
from homeassistant.components.media_player.const import (
MEDIA_TYPE_EPISODE,
MEDIA_TYPE_MOVIE,
MEDIA_TYPE_MUSIC,
MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_TVSHOW,
MEDIA_TYPE_VIDEO,
SUPPORT_NEXT_TRACK,
SUPPORT_PAUSE,
SUPPORT_PLAY,
@ -487,7 +484,7 @@ class PlexMediaPlayer(MediaPlayerEntity):
| SUPPORT_VOLUME_MUTE
)
return 0
return SUPPORT_PLAY_MEDIA
def set_volume_level(self, volume):
"""Set volume level, range 0..1."""
@ -561,32 +558,12 @@ class PlexMediaPlayer(MediaPlayerEntity):
)
return
media_type = media_type.lower()
src = json.loads(media_id)
if media_type == PLEX_DOMAIN and isinstance(src, int):
try:
media = self.plex_server.fetch_item(src)
except plexapi.exceptions.NotFound:
_LOGGER.error("Media for key %s not found", src)
return
shuffle = 0
else:
library = src.get("library_name")
shuffle = src.get("shuffle", 0)
media = None
if isinstance(src, int):
src = {"plex_key": src}
try:
if media_type == MEDIA_TYPE_MUSIC:
media = self._get_music_media(library, src)
elif media_type == MEDIA_TYPE_EPISODE:
media = self._get_tv_media(library, src)
elif media_type == MEDIA_TYPE_PLAYLIST:
media = self.plex_server.playlist(src["playlist_name"])
elif media_type == MEDIA_TYPE_VIDEO:
media = self.plex_server.library.section(library).get(src["video_name"])
except plexapi.exceptions.NotFound:
_LOGGER.error("Media could not be found: %s", media_id)
return
shuffle = src.pop("shuffle", 0)
media = self.plex_server.lookup_media(media_type, **src)
if media is None:
_LOGGER.error("Media could not be found: %s", media_id)
@ -600,79 +577,6 @@ class PlexMediaPlayer(MediaPlayerEntity):
except requests.exceptions.ConnectTimeout:
_LOGGER.error("Timed out playing on %s", self.name)
def _get_music_media(self, library_name, src):
"""Find music media and return a Plex media object."""
artist_name = src["artist_name"]
album_name = src.get("album_name")
track_name = src.get("track_name")
track_number = src.get("track_number")
artist = self.plex_server.library.section(library_name).get(artist_name)
if album_name:
album = artist.album(album_name)
if track_name:
return album.track(track_name)
if track_number:
for track in album.tracks():
if int(track.index) == int(track_number):
return track
return None
return album
if track_name:
return artist.searchTracks(track_name, maxresults=1)
return artist
def _get_tv_media(self, library_name, src):
"""Find TV media and return a Plex media object."""
show_name = src["show_name"]
season_number = src.get("season_number")
episode_number = src.get("episode_number")
target_season = None
target_episode = None
show = self.plex_server.library.section(library_name).get(show_name)
if not season_number:
return show
for season in show.seasons():
if int(season.seasonNumber) == int(season_number):
target_season = season
break
if target_season is None:
_LOGGER.error(
"Season not found: %s\\%s - S%sE%s",
library_name,
show_name,
str(season_number).zfill(2),
str(episode_number).zfill(2),
)
else:
if not episode_number:
return target_season
for episode in target_season.episodes():
if int(episode.index) == int(episode_number):
target_episode = episode
break
if target_episode is None:
_LOGGER.error(
"Episode not found: %s\\%s - S%sE%s",
library_name,
show_name,
str(season_number).zfill(2),
str(episode_number).zfill(2),
)
return target_episode
@property
def device_state_attributes(self):
"""Return the scene state attributes."""

View File

@ -3,7 +3,7 @@ import logging
import ssl
from urllib.parse import urlparse
from plexapi.exceptions import Unauthorized
from plexapi.exceptions import NotFound, Unauthorized
import plexapi.myplex
import plexapi.playqueue
import plexapi.server
@ -11,6 +11,12 @@ from requests import Session
import requests.exceptions
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
from homeassistant.components.media_player.const import (
MEDIA_TYPE_EPISODE,
MEDIA_TYPE_MUSIC,
MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_VIDEO,
)
from homeassistant.const import CONF_TOKEN, CONF_URL, CONF_VERIFY_SSL
from homeassistant.core import callback
from homeassistant.helpers.debounce import Debouncer
@ -25,6 +31,7 @@ from .const import (
CONF_USE_EPISODE_ART,
DEBOUNCE_TIMEOUT,
DEFAULT_VERIFY_SSL,
DOMAIN,
PLEX_NEW_MP_SIGNAL,
PLEX_UPDATE_MEDIA_PLAYER_SIGNAL,
PLEX_UPDATE_SENSOR_SIGNAL,
@ -367,3 +374,157 @@ class PlexServer:
def fetch_item(self, item):
"""Fetch item from Plex server."""
return self._plex_server.fetchItem(item)
def lookup_media(self, media_type, **kwargs):
"""Lookup a piece of media."""
media_type = media_type.lower()
if media_type == DOMAIN:
key = kwargs["plex_key"]
try:
return self.fetch_item(key)
except plexapi.exceptions.NotFound:
_LOGGER.error("Media for key %s not found", key)
return None
if media_type == MEDIA_TYPE_PLAYLIST:
try:
playlist_name = kwargs["playlist_name"]
return self.playlist(playlist_name)
except KeyError:
_LOGGER.error("Must specify 'playlist_name' for this search")
return None
except NotFound:
_LOGGER.error(
"Playlist '%s' not found", playlist_name,
)
return None
try:
library_name = kwargs["library_name"]
library_section = self.library.section(library_name)
except KeyError:
_LOGGER.error("Must specify 'library_name' for this search")
return None
except NotFound:
_LOGGER.error("Library '%s' not found", library_name)
return None
def lookup_music():
"""Search for music and return a Plex media object."""
album_name = kwargs.get("album_name")
track_name = kwargs.get("track_name")
track_number = kwargs.get("track_number")
try:
artist_name = kwargs["artist_name"]
artist = library_section.get(artist_name)
except KeyError:
_LOGGER.error("Must specify 'artist_name' for this search")
return None
except NotFound:
_LOGGER.error(
"Artist '%s' not found in '%s'", artist_name, library_name
)
return None
if album_name:
try:
album = artist.album(album_name)
except NotFound:
_LOGGER.error(
"Album '%s' by '%s' not found", album_name, artist_name
)
return None
if track_name:
try:
return album.track(track_name)
except NotFound:
_LOGGER.error(
"Track '%s' on '%s' by '%s' not found",
track_name,
album_name,
artist_name,
)
return None
if track_number:
for track in album.tracks():
if int(track.index) == int(track_number):
return track
_LOGGER.error(
"Track %d on '%s' by '%s' not found",
track_number,
album_name,
artist_name,
)
return None
return album
if track_name:
try:
return artist.get(track_name)
except NotFound:
_LOGGER.error(
"Track '%s' by '%s' not found", track_name, artist_name
)
return None
return artist
def lookup_tv():
"""Find TV media and return a Plex media object."""
season_number = kwargs.get("season_number")
episode_number = kwargs.get("episode_number")
try:
show_name = kwargs["show_name"]
show = library_section.get(show_name)
except KeyError:
_LOGGER.error("Must specify 'show_name' for this search")
return None
except NotFound:
_LOGGER.error("Show '%s' not found in '%s'", show_name, library_name)
return None
if not season_number:
return show
try:
season = show.season(int(season_number))
except NotFound:
_LOGGER.error(
"Season %d of '%s' not found", season_number, show_name,
)
return None
if not episode_number:
return season
try:
return season.episode(episode=int(episode_number))
except NotFound:
_LOGGER.error(
"Episode not found: %s - S%sE%s",
show_name,
str(season_number).zfill(2),
str(episode_number).zfill(2),
)
return None
if media_type == MEDIA_TYPE_MUSIC:
return lookup_music()
if media_type == MEDIA_TYPE_EPISODE:
return lookup_tv()
if media_type == MEDIA_TYPE_VIDEO:
try:
video_name = kwargs["video_name"]
return library_section.get(video_name)
except KeyError:
_LOGGER.error("Must specify 'video_name' for this search")
except NotFound:
_LOGGER.error(
"Movie '%s' not found in '%s'", video_name, library_name,
)

View File

@ -152,6 +152,19 @@ class MockPlexServer:
"""Mock version of PlexServer."""
return "1.0"
@property
def library(self):
"""Mock library object of PlexServer."""
return MockPlexLibrary()
def playlist(self, playlist):
"""Mock the playlist lookup method."""
return MockPlexMediaItem(playlist, mediatype="playlist")
def fetchItem(self, item):
"""Mock the fetchItem method."""
return MockPlexMediaItem("Item Name")
class MockPlexClient:
"""Mock a PlexClient instance."""
@ -186,7 +199,7 @@ class MockPlexClient:
@property
def protocolCapabilities(self):
"""Mock the protocolCapabilities attribute."""
return ["player"]
return ["playback"]
@property
def state(self):
@ -203,6 +216,10 @@ class MockPlexClient:
"""Mock the version attribute."""
return "1.0"
def playMedia(self, item):
"""Mock the playMedia method."""
pass
class MockPlexSession:
"""Mock a PlexServer.sessions() instance."""
@ -259,9 +276,78 @@ class MockPlexSession:
return 2020
class MockPlexLibrary:
"""Mock a Plex Library instance."""
def __init__(self):
"""Initialize the object."""
def section(self, library_name):
"""Mock the LibrarySection lookup."""
return MockPlexLibrarySection(library_name)
class MockPlexLibrarySection:
"""Mock a Plex LibrarySection instance."""
def __init__(self, library="Movies"):
"""Initialize the object."""
self.title = library
def get(self, query):
"""Mock the get lookup method."""
if self.title == "Music":
return MockPlexArtist(query)
return MockPlexMediaItem(query)
class MockPlexMediaItem:
"""Mock a Plex Media instance."""
def __init__(self, title, mediatype="video"):
"""Initialize the object."""
self.title = str(title)
self.type = mediatype
def album(self, album):
"""Mock the album lookup method."""
return MockPlexMediaItem(album, mediatype="album")
def track(self, track):
"""Mock the track lookup method."""
return MockPlexMediaTrack()
def tracks(self):
"""Mock the tracks lookup method."""
for index in range(1, 10):
yield MockPlexMediaTrack(index)
def episode(self, episode):
"""Mock the episode lookup method."""
return MockPlexMediaItem(episode, mediatype="episode")
def season(self, season):
"""Mock the season lookup method."""
return MockPlexMediaItem(season, mediatype="season")
class MockPlexArtist(MockPlexMediaItem):
"""Mock a Plex Artist instance."""
def __init__(self, artist):
"""Initialize the object."""
super().__init__(artist)
self.type = "artist"
def get(self, track):
"""Mock the track lookup method."""
return MockPlexMediaTrack()
class MockPlexMediaTrack(MockPlexMediaItem):
"""Mock a Plex Track instance."""
def __init__(self, index=1):
"""Initialize the object."""
super().__init__(f"Track {index}", "track")
self.index = index

View File

@ -1,7 +1,18 @@
"""Tests for Plex server."""
import copy
from plexapi.exceptions import NotFound
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
from homeassistant.components.media_player.const import (
ATTR_MEDIA_CONTENT_ID,
ATTR_MEDIA_CONTENT_TYPE,
MEDIA_TYPE_EPISODE,
MEDIA_TYPE_MUSIC,
MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_VIDEO,
SERVICE_PLAY_MEDIA,
)
from homeassistant.components.plex.const import (
CONF_IGNORE_NEW_SHARED_USERS,
CONF_IGNORE_PLEX_WEB_CLIENTS,
@ -10,10 +21,17 @@ from homeassistant.components.plex.const import (
PLEX_UPDATE_PLATFORMS_SIGNAL,
SERVERS,
)
from homeassistant.const import ATTR_ENTITY_ID
from homeassistant.helpers.dispatcher import async_dispatcher_send
from .const import DEFAULT_DATA, DEFAULT_OPTIONS
from .mock_classes import MockPlexServer
from .mock_classes import (
MockPlexArtist,
MockPlexLibrary,
MockPlexLibrarySection,
MockPlexMediaItem,
MockPlexServer,
)
from tests.async_mock import patch
from tests.common import MockConfigEntry
@ -244,3 +262,225 @@ async def test_ignore_plex_web_client(hass):
media_players = hass.states.async_entity_ids("media_player")
assert len(media_players) == int(sensor.state) - 1
async def test_media_lookups(hass):
"""Test media lookups to Plex server."""
entry = MockConfigEntry(
domain=DOMAIN,
data=DEFAULT_DATA,
options=DEFAULT_OPTIONS,
unique_id=DEFAULT_DATA["server_id"],
)
mock_plex_server = MockPlexServer(config_entry=entry)
with patch("plexapi.server.PlexServer", return_value=mock_plex_server), patch(
"homeassistant.components.plex.PlexWebsocket.listen"
):
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
server_id = mock_plex_server.machineIdentifier
loaded_server = hass.data[DOMAIN][SERVERS][server_id]
# Plex Key searches
async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
await hass.async_block_till_done()
media_player_id = hass.states.async_entity_ids("media_player")[0]
with patch("homeassistant.components.plex.PlexServer.create_playqueue"):
assert await hass.services.async_call(
MP_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: media_player_id,
ATTR_MEDIA_CONTENT_TYPE: DOMAIN,
ATTR_MEDIA_CONTENT_ID: 123,
},
True,
)
with patch.object(MockPlexServer, "fetchItem", side_effect=NotFound):
assert await hass.services.async_call(
MP_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: media_player_id,
ATTR_MEDIA_CONTENT_TYPE: DOMAIN,
ATTR_MEDIA_CONTENT_ID: 123,
},
True,
)
# TV show searches
with patch.object(MockPlexLibrary, "section", side_effect=NotFound):
assert (
loaded_server.lookup_media(
MEDIA_TYPE_EPISODE, library_name="Not a Library", show_name="A TV Show"
)
is None
)
with patch.object(MockPlexLibrarySection, "get", side_effect=NotFound):
assert (
loaded_server.lookup_media(
MEDIA_TYPE_EPISODE, library_name="TV Shows", show_name="Not a TV Show"
)
is None
)
assert (
loaded_server.lookup_media(
MEDIA_TYPE_EPISODE, library_name="TV Shows", episode_name="An Episode"
)
is None
)
assert loaded_server.lookup_media(
MEDIA_TYPE_EPISODE, library_name="TV Shows", show_name="A TV Show"
)
assert loaded_server.lookup_media(
MEDIA_TYPE_EPISODE,
library_name="TV Shows",
show_name="A TV Show",
season_number=2,
)
assert loaded_server.lookup_media(
MEDIA_TYPE_EPISODE,
library_name="TV Shows",
show_name="A TV Show",
season_number=2,
episode_number=3,
)
with patch.object(MockPlexMediaItem, "season", side_effect=NotFound):
assert (
loaded_server.lookup_media(
MEDIA_TYPE_EPISODE,
library_name="TV Shows",
show_name="A TV Show",
season_number=2,
)
is None
)
with patch.object(MockPlexMediaItem, "episode", side_effect=NotFound):
assert (
loaded_server.lookup_media(
MEDIA_TYPE_EPISODE,
library_name="TV Shows",
show_name="A TV Show",
season_number=2,
episode_number=1,
)
is None
)
# Music searches
assert (
loaded_server.lookup_media(
MEDIA_TYPE_MUSIC, library_name="Music", album_name="An Album"
)
is None
)
assert loaded_server.lookup_media(
MEDIA_TYPE_MUSIC, library_name="Music", artist_name="An Artist"
)
assert loaded_server.lookup_media(
MEDIA_TYPE_MUSIC,
library_name="Music",
artist_name="An Artist",
track_name="A Track",
)
assert loaded_server.lookup_media(
MEDIA_TYPE_MUSIC,
library_name="Music",
artist_name="An Artist",
album_name="An Album",
)
with patch.object(MockPlexLibrarySection, "get", side_effect=NotFound):
assert (
loaded_server.lookup_media(
MEDIA_TYPE_MUSIC,
library_name="Music",
artist_name="Not an Artist",
album_name="An Album",
)
is None
)
with patch.object(MockPlexArtist, "album", side_effect=NotFound):
assert (
loaded_server.lookup_media(
MEDIA_TYPE_MUSIC,
library_name="Music",
artist_name="An Artist",
album_name="Not an Album",
)
is None
)
with patch.object(MockPlexMediaItem, "track", side_effect=NotFound):
assert (
loaded_server.lookup_media(
MEDIA_TYPE_MUSIC,
library_name="Music",
artist_name="An Artist",
album_name="An Album",
track_name="Not a Track",
)
is None
)
with patch.object(MockPlexArtist, "get", side_effect=NotFound):
assert (
loaded_server.lookup_media(
MEDIA_TYPE_MUSIC,
library_name="Music",
artist_name="An Artist",
track_name="Not a Track",
)
is None
)
assert loaded_server.lookup_media(
MEDIA_TYPE_MUSIC,
library_name="Music",
artist_name="An Artist",
album_name="An Album",
track_number=3,
)
assert (
loaded_server.lookup_media(
MEDIA_TYPE_MUSIC,
library_name="Music",
artist_name="An Artist",
album_name="An Album",
track_number=30,
)
is None
)
assert loaded_server.lookup_media(
MEDIA_TYPE_MUSIC,
library_name="Music",
artist_name="An Artist",
album_name="An Album",
track_name="A Track",
)
# Playlist searches
assert loaded_server.lookup_media(MEDIA_TYPE_PLAYLIST, playlist_name="A Playlist")
assert loaded_server.lookup_media(MEDIA_TYPE_PLAYLIST) is None
with patch.object(MockPlexServer, "playlist", side_effect=NotFound):
assert (
loaded_server.lookup_media(
MEDIA_TYPE_PLAYLIST, playlist_name="Not a Playlist"
)
is None
)
# Movie searches
assert loaded_server.lookup_media(MEDIA_TYPE_VIDEO, video_name="A Movie") is None
assert loaded_server.lookup_media(MEDIA_TYPE_VIDEO, library_name="Movies") is None
assert loaded_server.lookup_media(
MEDIA_TYPE_VIDEO, library_name="Movies", video_name="A Movie"
)
with patch.object(MockPlexLibrarySection, "get", side_effect=NotFound):
assert (
loaded_server.lookup_media(
MEDIA_TYPE_VIDEO, library_name="Movies", video_name="Not a Movie"
)
is None
)