core/tests/components/plex/test_services.py

190 lines
6.3 KiB
Python
Raw Normal View History

"""Tests for various Plex services."""
from http import HTTPStatus
from unittest.mock import patch
import plexapi.audio
from plexapi.exceptions import NotFound
import plexapi.playqueue
import pytest
from homeassistant.components.media_player.const import MEDIA_TYPE_MUSIC
from homeassistant.components.plex.const import (
CONF_SERVER,
CONF_SERVER_IDENTIFIER,
DOMAIN,
PLEX_SERVER_CONFIG,
SERVICE_REFRESH_LIBRARY,
SERVICE_SCAN_CLIENTS,
)
from homeassistant.components.plex.services import lookup_plex_media
from homeassistant.const import CONF_URL
from homeassistant.exceptions import HomeAssistantError
from .const import DEFAULT_OPTIONS, SECONDARY_DATA
from tests.common import MockConfigEntry
async def test_refresh_library(
hass,
mock_plex_server,
setup_plex_server,
requests_mock,
empty_payload,
plex_server_accounts,
plex_server_base,
):
"""Test refresh_library service call."""
url = mock_plex_server.url_in_use
refresh = requests_mock.get(
f"{url}/library/sections/1/refresh", status_code=HTTPStatus.OK
)
# Test with non-existent server
with pytest.raises(HomeAssistantError):
assert await hass.services.async_call(
DOMAIN,
SERVICE_REFRESH_LIBRARY,
{"server_name": "Not a Server", "library_name": "Movies"},
True,
)
assert not refresh.called
# Test with non-existent library
assert await hass.services.async_call(
DOMAIN,
SERVICE_REFRESH_LIBRARY,
{"library_name": "Not a Library"},
True,
)
assert not refresh.called
# Test with valid library
assert await hass.services.async_call(
DOMAIN,
SERVICE_REFRESH_LIBRARY,
{"library_name": "Movies"},
True,
)
assert refresh.call_count == 1
# Add a second configured server
secondary_url = SECONDARY_DATA[PLEX_SERVER_CONFIG][CONF_URL]
secondary_name = SECONDARY_DATA[CONF_SERVER]
secondary_id = SECONDARY_DATA[CONF_SERVER_IDENTIFIER]
requests_mock.get(
secondary_url,
text=plex_server_base.format(
name=secondary_name, machine_identifier=secondary_id
),
)
requests_mock.get(f"{secondary_url}/accounts", text=plex_server_accounts)
requests_mock.get(f"{secondary_url}/clients", text=empty_payload)
requests_mock.get(f"{secondary_url}/status/sessions", text=empty_payload)
entry_2 = MockConfigEntry(
domain=DOMAIN,
data=SECONDARY_DATA,
options=DEFAULT_OPTIONS,
unique_id=SECONDARY_DATA["server_id"],
)
await setup_plex_server(config_entry=entry_2)
# Test multiple servers available but none specified
with pytest.raises(HomeAssistantError) as excinfo:
assert await hass.services.async_call(
DOMAIN,
SERVICE_REFRESH_LIBRARY,
{"library_name": "Movies"},
True,
)
assert "Multiple Plex servers configured" in str(excinfo.value)
assert refresh.call_count == 1
async def test_scan_clients(hass, mock_plex_server):
"""Test scan_for_clients service call."""
assert await hass.services.async_call(
DOMAIN,
SERVICE_SCAN_CLIENTS,
blocking=True,
)
async def test_lookup_media_for_other_integrations(
hass,
entry,
setup_plex_server,
requests_mock,
playqueue_1234,
playqueue_created,
):
"""Test media lookup for media_player.play_media calls from cast/sonos."""
CONTENT_ID = '{"library_name": "Music", "artist_name": "Artist"}'
CONTENT_ID_KEY = "100"
CONTENT_ID_BAD_MEDIA = '{"library_name": "Music", "artist_name": "Not an Artist"}'
CONTENT_ID_PLAYQUEUE = '{"playqueue_id": 1234}'
CONTENT_ID_BAD_PLAYQUEUE = '{"playqueue_id": 1235}'
CONTENT_ID_SERVER = '{"plex_server": "Plex Server 1", "library_name": "Music", "artist_name": "Artist"}'
CONTENT_ID_SHUFFLE = (
'{"library_name": "Music", "artist_name": "Artist", "shuffle": 1}'
)
# Test with no Plex integration available
with pytest.raises(HomeAssistantError) as excinfo:
lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID)
assert "Plex integration not configured" in str(excinfo.value)
with patch(
"homeassistant.components.plex.PlexServer.connect", side_effect=NotFound
):
# Initialize Plex integration without setting up a server
with pytest.raises(AssertionError):
await setup_plex_server()
# Test with no Plex servers available
with pytest.raises(HomeAssistantError) as excinfo:
lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID)
assert "No Plex servers available" in str(excinfo.value)
# Complete setup of a Plex server
await hass.config_entries.async_unload(entry.entry_id)
await setup_plex_server()
# Test lookup success
result = lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID)
assert isinstance(result, plexapi.audio.Artist)
# Test media key payload
result = lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_KEY)
assert isinstance(result, plexapi.audio.Track)
# Test with specified server
result = lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_SERVER)
assert isinstance(result, plexapi.audio.Artist)
# Test with media not found
with patch("plexapi.library.LibrarySection.search", return_value=None):
with pytest.raises(HomeAssistantError) as excinfo:
lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_BAD_MEDIA)
assert "Plex media not found" in str(excinfo.value)
# Test with playqueue
requests_mock.get("https://1.2.3.4:32400/playQueues/1234", text=playqueue_1234)
result = lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_PLAYQUEUE)
assert isinstance(result, plexapi.playqueue.PlayQueue)
# Test with invalid playqueue
requests_mock.get(
"https://1.2.3.4:32400/playQueues/1235", status_code=HTTPStatus.NOT_FOUND
)
with pytest.raises(HomeAssistantError) as excinfo:
lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_BAD_PLAYQUEUE)
assert "PlayQueue '1235' could not be found" in str(excinfo.value)
# Test playqueue is created with shuffle
requests_mock.post("/playqueues", text=playqueue_created)
result = lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_SHUFFLE)
assert isinstance(result, plexapi.playqueue.PlayQueue)