1827 lines
62 KiB
Python
1827 lines
62 KiB
Python
"""The tests for the Cast Media player platform."""
|
|
# pylint: disable=protected-access
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, patch
|
|
from uuid import UUID
|
|
|
|
import attr
|
|
import pychromecast
|
|
from pychromecast.const import CAST_TYPE_CHROMECAST, CAST_TYPE_GROUP
|
|
import pytest
|
|
|
|
from homeassistant.components import media_player, tts
|
|
from homeassistant.components.cast import media_player as cast
|
|
from homeassistant.components.cast.media_player import ChromecastInfo
|
|
from homeassistant.components.media_player import BrowseMedia
|
|
from homeassistant.components.media_player.const import (
|
|
MEDIA_CLASS_APP,
|
|
MEDIA_CLASS_PLAYLIST,
|
|
SUPPORT_NEXT_TRACK,
|
|
SUPPORT_PAUSE,
|
|
SUPPORT_PLAY,
|
|
SUPPORT_PLAY_MEDIA,
|
|
SUPPORT_PREVIOUS_TRACK,
|
|
SUPPORT_SEEK,
|
|
SUPPORT_STOP,
|
|
SUPPORT_TURN_OFF,
|
|
SUPPORT_TURN_ON,
|
|
SUPPORT_VOLUME_MUTE,
|
|
SUPPORT_VOLUME_SET,
|
|
)
|
|
from homeassistant.config import async_process_ha_core_config
|
|
from homeassistant.const import (
|
|
ATTR_ENTITY_ID,
|
|
CAST_APP_ID_HOMEASSISTANT_LOVELACE,
|
|
EVENT_HOMEASSISTANT_STOP,
|
|
)
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers import entity_registry as er
|
|
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
|
from homeassistant.setup import async_setup_component
|
|
|
|
from tests.common import MockConfigEntry, assert_setup_component, mock_platform
|
|
from tests.components.media_player import common
|
|
|
|
# pylint: disable=invalid-name
|
|
FakeUUID = UUID("57355bce-9364-4aa6-ac1e-eb849dccf9e2")
|
|
FakeUUID2 = UUID("57355bce-9364-4aa6-ac1e-eb849dccf9e4")
|
|
FakeGroupUUID = UUID("57355bce-9364-4aa6-ac1e-eb849dccf9e3")
|
|
|
|
FAKE_HOST_SERVICE = pychromecast.discovery.ServiceInfo(
|
|
pychromecast.const.SERVICE_TYPE_HOST, ("127.0.0.1", 8009)
|
|
)
|
|
FAKE_MDNS_SERVICE = pychromecast.discovery.ServiceInfo(
|
|
pychromecast.const.SERVICE_TYPE_MDNS, "the-service"
|
|
)
|
|
|
|
|
|
def get_fake_chromecast(info: ChromecastInfo):
|
|
"""Generate a Fake Chromecast object with the specified arguments."""
|
|
mock = MagicMock(uuid=info.uuid)
|
|
mock.app_id = None
|
|
mock.media_controller.status = None
|
|
return mock
|
|
|
|
|
|
def get_fake_chromecast_info(
|
|
host="192.168.178.42", port=8009, service=None, uuid: UUID | None = FakeUUID
|
|
):
|
|
"""Generate a Fake ChromecastInfo with the specified arguments."""
|
|
|
|
if service is None:
|
|
service = pychromecast.discovery.ServiceInfo(
|
|
pychromecast.const.SERVICE_TYPE_HOST, (host, port)
|
|
)
|
|
return ChromecastInfo(
|
|
cast_info=pychromecast.models.CastInfo(
|
|
services={service},
|
|
uuid=uuid,
|
|
model_name="Chromecast",
|
|
friendly_name="Speaker",
|
|
host=host,
|
|
port=port,
|
|
cast_type=CAST_TYPE_GROUP if port != 8009 else CAST_TYPE_CHROMECAST,
|
|
manufacturer="Nabu Casa",
|
|
)
|
|
)
|
|
|
|
|
|
def get_fake_zconf(host="192.168.178.42", port=8009):
|
|
"""Generate a Fake Zeroconf object with the specified arguments."""
|
|
parsed_addresses = MagicMock()
|
|
parsed_addresses.return_value = [host]
|
|
service_info = MagicMock(parsed_addresses=parsed_addresses, port=port)
|
|
zconf = MagicMock()
|
|
zconf.get_service_info.return_value = service_info
|
|
return zconf
|
|
|
|
|
|
async def async_setup_cast(hass, config=None):
|
|
"""Set up the cast platform."""
|
|
if config is None:
|
|
config = {}
|
|
data = {**{"ignore_cec": [], "known_hosts": [], "uuid": []}, **config}
|
|
with patch(
|
|
"homeassistant.helpers.entity_platform.EntityPlatform._async_schedule_add_entities"
|
|
) as add_entities:
|
|
entry = MockConfigEntry(data=data, domain="cast")
|
|
entry.add_to_hass(hass)
|
|
assert await hass.config_entries.async_setup(entry.entry_id)
|
|
await hass.async_block_till_done()
|
|
|
|
return add_entities
|
|
|
|
|
|
async def async_setup_cast_internal_discovery(hass, config=None):
|
|
"""Set up the cast platform and the discovery."""
|
|
browser = MagicMock(devices={}, zc={})
|
|
|
|
with patch(
|
|
"homeassistant.components.cast.discovery.pychromecast.discovery.CastBrowser",
|
|
return_value=browser,
|
|
) as cast_browser:
|
|
add_entities = await async_setup_cast(hass, config)
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done()
|
|
|
|
assert browser.start_discovery.call_count == 1
|
|
|
|
discovery_callback = cast_browser.call_args[0][0].add_cast
|
|
remove_callback = cast_browser.call_args[0][0].remove_cast
|
|
|
|
def discover_chromecast(
|
|
service: pychromecast.discovery.ServiceInfo, info: ChromecastInfo
|
|
) -> None:
|
|
"""Discover a chromecast device."""
|
|
browser.devices[info.uuid] = pychromecast.discovery.CastInfo(
|
|
{service},
|
|
info.uuid,
|
|
info.cast_info.model_name,
|
|
info.friendly_name,
|
|
info.cast_info.host,
|
|
info.cast_info.port,
|
|
info.cast_info.cast_type,
|
|
info.cast_info.manufacturer,
|
|
)
|
|
discovery_callback(info.uuid, "")
|
|
|
|
def remove_chromecast(service_name: str, info: ChromecastInfo) -> None:
|
|
"""Remove a chromecast device."""
|
|
remove_callback(
|
|
info.uuid,
|
|
service_name,
|
|
pychromecast.models.CastInfo(
|
|
set(),
|
|
info.uuid,
|
|
info.cast_info.model_name,
|
|
info.cast_info.friendly_name,
|
|
info.cast_info.host,
|
|
info.cast_info.port,
|
|
info.cast_info.cast_type,
|
|
info.cast_info.manufacturer,
|
|
),
|
|
)
|
|
|
|
return discover_chromecast, remove_chromecast, add_entities
|
|
|
|
|
|
async def async_setup_media_player_cast(hass: HomeAssistant, info: ChromecastInfo):
|
|
"""Set up the cast platform with async_setup_component."""
|
|
browser = MagicMock(devices={}, zc={})
|
|
chromecast = get_fake_chromecast(info)
|
|
zconf = get_fake_zconf(host=info.cast_info.host, port=info.cast_info.port)
|
|
|
|
with patch(
|
|
"homeassistant.components.cast.discovery.pychromecast.get_chromecast_from_cast_info",
|
|
return_value=chromecast,
|
|
) as get_chromecast, patch(
|
|
"homeassistant.components.cast.discovery.pychromecast.discovery.CastBrowser",
|
|
return_value=browser,
|
|
) as cast_browser, patch(
|
|
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
|
|
return_value=zconf,
|
|
):
|
|
await async_setup_component(
|
|
hass, "cast", {"cast": {"media_player": {"uuid": info.uuid}}}
|
|
)
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done()
|
|
|
|
discovery_callback = cast_browser.call_args[0][0].add_cast
|
|
|
|
browser.devices[info.uuid] = pychromecast.discovery.CastInfo(
|
|
{FAKE_MDNS_SERVICE},
|
|
info.uuid,
|
|
info.cast_info.model_name,
|
|
info.friendly_name,
|
|
info.cast_info.host,
|
|
info.cast_info.port,
|
|
info.cast_info.cast_type,
|
|
info.cast_info.manufacturer,
|
|
)
|
|
discovery_callback(info.uuid, FAKE_MDNS_SERVICE[1])
|
|
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done()
|
|
assert get_chromecast.call_count == 1
|
|
|
|
def discover_chromecast(service_name: str, info: ChromecastInfo) -> None:
|
|
"""Discover a chromecast device."""
|
|
browser.devices[info.uuid] = pychromecast.discovery.CastInfo(
|
|
{FAKE_MDNS_SERVICE},
|
|
info.uuid,
|
|
info.cast_info.model_name,
|
|
info.friendly_name,
|
|
info.cast_info.host,
|
|
info.cast_info.port,
|
|
info.cast_info.cast_type,
|
|
info.cast_info.manufacturer,
|
|
)
|
|
discovery_callback(info.uuid, FAKE_MDNS_SERVICE[1])
|
|
|
|
return chromecast, discover_chromecast
|
|
|
|
|
|
def get_status_callbacks(chromecast_mock, mz_mock=None):
|
|
"""Get registered status callbacks from the chromecast mock."""
|
|
status_listener = chromecast_mock.register_status_listener.call_args[0][0]
|
|
cast_status_cb = status_listener.new_cast_status
|
|
|
|
connection_listener = chromecast_mock.register_connection_listener.call_args[0][0]
|
|
conn_status_cb = connection_listener.new_connection_status
|
|
|
|
mc = chromecast_mock.socket_client.media_controller
|
|
media_status_cb = mc.register_status_listener.call_args[0][0].new_media_status
|
|
|
|
if not mz_mock:
|
|
return cast_status_cb, conn_status_cb, media_status_cb
|
|
|
|
mz_listener = mz_mock.register_listener.call_args[0][1]
|
|
group_media_status_cb = mz_listener.multizone_new_media_status
|
|
return cast_status_cb, conn_status_cb, media_status_cb, group_media_status_cb
|
|
|
|
|
|
async def test_start_discovery_called_once(hass, castbrowser_mock):
|
|
"""Test pychromecast.start_discovery called exactly once."""
|
|
await async_setup_cast(hass)
|
|
assert castbrowser_mock.return_value.start_discovery.call_count == 1
|
|
|
|
await async_setup_cast(hass)
|
|
assert castbrowser_mock.return_value.start_discovery.call_count == 1
|
|
|
|
|
|
async def test_internal_discovery_callback_fill_out_group_fail(
|
|
hass, get_multizone_status_mock
|
|
):
|
|
"""Test internal discovery automatically filling out information."""
|
|
discover_cast, _, _ = await async_setup_cast_internal_discovery(hass)
|
|
info = get_fake_chromecast_info(host="host1", port=12345, service=FAKE_MDNS_SERVICE)
|
|
zconf = get_fake_zconf(host="host1", port=12345)
|
|
full_info = attr.evolve(
|
|
info,
|
|
cast_info=pychromecast.discovery.CastInfo(
|
|
services=info.cast_info.services,
|
|
uuid=FakeUUID,
|
|
model_name="Chromecast",
|
|
friendly_name="Speaker",
|
|
host=info.cast_info.host,
|
|
port=info.cast_info.port,
|
|
cast_type=info.cast_info.cast_type,
|
|
manufacturer=info.cast_info.manufacturer,
|
|
),
|
|
is_dynamic_group=False,
|
|
)
|
|
|
|
get_multizone_status_mock.assert_not_called()
|
|
get_multizone_status_mock.return_value = None
|
|
|
|
with patch(
|
|
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
|
|
return_value=zconf,
|
|
):
|
|
signal = MagicMock()
|
|
|
|
async_dispatcher_connect(hass, "cast_discovered", signal)
|
|
discover_cast(FAKE_MDNS_SERVICE, info)
|
|
await hass.async_block_till_done()
|
|
|
|
# when called with incomplete info, it should use HTTP to get missing
|
|
discover = signal.mock_calls[0][1][0]
|
|
assert discover == full_info
|
|
get_multizone_status_mock.assert_called_once()
|
|
|
|
|
|
async def test_internal_discovery_callback_fill_out_group(
|
|
hass, get_multizone_status_mock
|
|
):
|
|
"""Test internal discovery automatically filling out information."""
|
|
discover_cast, _, _ = await async_setup_cast_internal_discovery(hass)
|
|
info = get_fake_chromecast_info(host="host1", port=12345, service=FAKE_MDNS_SERVICE)
|
|
zconf = get_fake_zconf(host="host1", port=12345)
|
|
full_info = attr.evolve(
|
|
info,
|
|
cast_info=pychromecast.discovery.CastInfo(
|
|
services=info.cast_info.services,
|
|
uuid=FakeUUID,
|
|
model_name="Chromecast",
|
|
friendly_name="Speaker",
|
|
host=info.cast_info.host,
|
|
port=info.cast_info.port,
|
|
cast_type=info.cast_info.cast_type,
|
|
manufacturer=info.cast_info.manufacturer,
|
|
),
|
|
is_dynamic_group=False,
|
|
)
|
|
|
|
get_multizone_status_mock.assert_not_called()
|
|
get_multizone_status_mock.return_value = None
|
|
|
|
with patch(
|
|
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
|
|
return_value=zconf,
|
|
):
|
|
signal = MagicMock()
|
|
|
|
async_dispatcher_connect(hass, "cast_discovered", signal)
|
|
discover_cast(FAKE_MDNS_SERVICE, info)
|
|
await hass.async_block_till_done()
|
|
|
|
# when called with incomplete info, it should use HTTP to get missing
|
|
discover = signal.mock_calls[0][1][0]
|
|
assert discover == full_info
|
|
get_multizone_status_mock.assert_called_once()
|
|
|
|
|
|
async def test_stop_discovery_called_on_stop(hass, castbrowser_mock):
|
|
"""Test pychromecast.stop_discovery called on shutdown."""
|
|
# start_discovery should be called with empty config
|
|
await async_setup_cast(hass, {})
|
|
assert castbrowser_mock.return_value.start_discovery.call_count == 1
|
|
|
|
# stop discovery should be called on shutdown
|
|
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
|
|
await hass.async_block_till_done()
|
|
assert castbrowser_mock.return_value.stop_discovery.call_count == 1
|
|
|
|
|
|
async def test_create_cast_device_without_uuid(hass):
|
|
"""Test create a cast device with no UUId does not create an entity."""
|
|
info = get_fake_chromecast_info(uuid=None)
|
|
cast_device = cast._async_create_cast_device(hass, info)
|
|
assert cast_device is None
|
|
|
|
|
|
async def test_create_cast_device_with_uuid(hass):
|
|
"""Test create cast devices with UUID creates entities."""
|
|
added_casts = hass.data[cast.ADDED_CAST_DEVICES_KEY] = set()
|
|
info = get_fake_chromecast_info()
|
|
|
|
cast_device = cast._async_create_cast_device(hass, info)
|
|
assert cast_device is not None
|
|
assert info.uuid in added_casts
|
|
|
|
# Sending second time should not create new entity
|
|
cast_device = cast._async_create_cast_device(hass, info)
|
|
assert cast_device is None
|
|
|
|
|
|
async def test_manual_cast_chromecasts_uuid(hass):
|
|
"""Test only wanted casts are added for manual configuration."""
|
|
cast_1 = get_fake_chromecast_info(host="host_1", uuid=FakeUUID)
|
|
cast_2 = get_fake_chromecast_info(host="host_2", uuid=FakeUUID2)
|
|
zconf_1 = get_fake_zconf(host="host_1")
|
|
zconf_2 = get_fake_zconf(host="host_2")
|
|
|
|
# Manual configuration of media player with host "configured_host"
|
|
discover_cast, _, add_dev1 = await async_setup_cast_internal_discovery(
|
|
hass, config={"uuid": str(FakeUUID)}
|
|
)
|
|
with patch(
|
|
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
|
|
return_value=zconf_2,
|
|
):
|
|
discover_cast(
|
|
pychromecast.discovery.ServiceInfo(
|
|
pychromecast.const.SERVICE_TYPE_MDNS, "service2"
|
|
),
|
|
cast_2,
|
|
)
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done() # having tasks that add jobs
|
|
assert add_dev1.call_count == 0
|
|
|
|
with patch(
|
|
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
|
|
return_value=zconf_1,
|
|
):
|
|
discover_cast(
|
|
pychromecast.discovery.ServiceInfo(
|
|
pychromecast.const.SERVICE_TYPE_MDNS, "service1"
|
|
),
|
|
cast_1,
|
|
)
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done() # having tasks that add jobs
|
|
assert add_dev1.call_count == 1
|
|
|
|
|
|
async def test_auto_cast_chromecasts(hass):
|
|
"""Test all discovered casts are added for default configuration."""
|
|
cast_1 = get_fake_chromecast_info(host="some_host")
|
|
cast_2 = get_fake_chromecast_info(host="other_host", uuid=FakeUUID2)
|
|
zconf_1 = get_fake_zconf(host="some_host")
|
|
zconf_2 = get_fake_zconf(host="other_host")
|
|
|
|
# Manual configuration of media player with host "configured_host"
|
|
discover_cast, _, add_dev1 = await async_setup_cast_internal_discovery(hass)
|
|
with patch(
|
|
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
|
|
return_value=zconf_1,
|
|
):
|
|
discover_cast(
|
|
pychromecast.discovery.ServiceInfo(
|
|
pychromecast.const.SERVICE_TYPE_MDNS, "service2"
|
|
),
|
|
cast_2,
|
|
)
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done() # having tasks that add jobs
|
|
assert add_dev1.call_count == 1
|
|
|
|
with patch(
|
|
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
|
|
return_value=zconf_2,
|
|
):
|
|
discover_cast(
|
|
pychromecast.discovery.ServiceInfo(
|
|
pychromecast.const.SERVICE_TYPE_MDNS, "service1"
|
|
),
|
|
cast_1,
|
|
)
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done() # having tasks that add jobs
|
|
assert add_dev1.call_count == 2
|
|
|
|
|
|
async def test_discover_dynamic_group(
|
|
hass, get_multizone_status_mock, get_chromecast_mock, caplog
|
|
):
|
|
"""Test dynamic group does not create device or entity."""
|
|
cast_1 = get_fake_chromecast_info(host="host_1", port=23456, uuid=FakeUUID)
|
|
cast_2 = get_fake_chromecast_info(host="host_2", port=34567, uuid=FakeUUID2)
|
|
zconf_1 = get_fake_zconf(host="host_1", port=23456)
|
|
zconf_2 = get_fake_zconf(host="host_2", port=34567)
|
|
|
|
reg = er.async_get(hass)
|
|
|
|
# Fake dynamic group info
|
|
tmp1 = MagicMock()
|
|
tmp1.uuid = FakeUUID
|
|
tmp2 = MagicMock()
|
|
tmp2.uuid = FakeUUID2
|
|
get_multizone_status_mock.return_value.dynamic_groups = [tmp1, tmp2]
|
|
|
|
get_chromecast_mock.assert_not_called()
|
|
discover_cast, remove_cast, add_dev1 = await async_setup_cast_internal_discovery(
|
|
hass
|
|
)
|
|
|
|
# Discover cast service
|
|
with patch(
|
|
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
|
|
return_value=zconf_1,
|
|
):
|
|
discover_cast(
|
|
pychromecast.discovery.ServiceInfo(
|
|
pychromecast.const.SERVICE_TYPE_MDNS, "service"
|
|
),
|
|
cast_1,
|
|
)
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done() # having tasks that add jobs
|
|
get_chromecast_mock.assert_called()
|
|
get_chromecast_mock.reset_mock()
|
|
assert add_dev1.call_count == 0
|
|
assert reg.async_get_entity_id("media_player", "cast", cast_1.uuid) is None
|
|
|
|
# Discover other dynamic group cast service
|
|
with patch(
|
|
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
|
|
return_value=zconf_2,
|
|
):
|
|
discover_cast(
|
|
pychromecast.discovery.ServiceInfo(
|
|
pychromecast.const.SERVICE_TYPE_MDNS, "service"
|
|
),
|
|
cast_2,
|
|
)
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done() # having tasks that add jobs
|
|
get_chromecast_mock.assert_called()
|
|
get_chromecast_mock.reset_mock()
|
|
assert add_dev1.call_count == 0
|
|
assert reg.async_get_entity_id("media_player", "cast", cast_2.uuid) is None
|
|
|
|
# Get update for cast service
|
|
with patch(
|
|
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
|
|
return_value=zconf_1,
|
|
):
|
|
discover_cast(
|
|
pychromecast.discovery.ServiceInfo(
|
|
pychromecast.const.SERVICE_TYPE_MDNS, "service"
|
|
),
|
|
cast_1,
|
|
)
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done() # having tasks that add jobs
|
|
get_chromecast_mock.assert_not_called()
|
|
assert add_dev1.call_count == 0
|
|
assert reg.async_get_entity_id("media_player", "cast", cast_1.uuid) is None
|
|
|
|
# Remove cast service
|
|
assert "Disconnecting from chromecast" not in caplog.text
|
|
|
|
with patch(
|
|
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
|
|
return_value=zconf_1,
|
|
):
|
|
remove_cast(
|
|
pychromecast.discovery.ServiceInfo(
|
|
pychromecast.const.SERVICE_TYPE_MDNS, "service"
|
|
),
|
|
cast_1,
|
|
)
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done() # having tasks that add jobs
|
|
|
|
assert "Disconnecting from chromecast" in caplog.text
|
|
|
|
|
|
async def test_update_cast_chromecasts(hass):
|
|
"""Test discovery of same UUID twice only adds one cast."""
|
|
cast_1 = get_fake_chromecast_info(host="old_host")
|
|
cast_2 = get_fake_chromecast_info(host="new_host")
|
|
zconf_1 = get_fake_zconf(host="old_host")
|
|
zconf_2 = get_fake_zconf(host="new_host")
|
|
|
|
# Manual configuration of media player with host "configured_host"
|
|
discover_cast, _, add_dev1 = await async_setup_cast_internal_discovery(hass)
|
|
|
|
with patch(
|
|
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
|
|
return_value=zconf_1,
|
|
):
|
|
discover_cast(
|
|
pychromecast.discovery.ServiceInfo(
|
|
pychromecast.const.SERVICE_TYPE_MDNS, "service1"
|
|
),
|
|
cast_1,
|
|
)
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done() # having tasks that add jobs
|
|
assert add_dev1.call_count == 1
|
|
|
|
with patch(
|
|
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
|
|
return_value=zconf_2,
|
|
):
|
|
discover_cast(
|
|
pychromecast.discovery.ServiceInfo(
|
|
pychromecast.const.SERVICE_TYPE_MDNS, "service2"
|
|
),
|
|
cast_2,
|
|
)
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done() # having tasks that add jobs
|
|
assert add_dev1.call_count == 1
|
|
|
|
|
|
async def test_entity_availability(hass: HomeAssistant):
|
|
"""Test handling of connection status."""
|
|
entity_id = "media_player.speaker"
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
_, conn_status_cb, _ = get_status_callbacks(chromecast)
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "unavailable"
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "idle"
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "DISCONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "unavailable"
|
|
|
|
|
|
async def test_entity_cast_status(hass: HomeAssistant):
|
|
"""Test handling of cast status."""
|
|
entity_id = "media_player.speaker"
|
|
reg = er.async_get(hass)
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
chromecast.cast_type = pychromecast.const.CAST_TYPE_CHROMECAST
|
|
cast_status_cb, conn_status_cb, _ = get_status_callbacks(chromecast)
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state is not None
|
|
assert state.name == "Speaker"
|
|
assert state.state == "idle"
|
|
assert entity_id == reg.async_get_entity_id("media_player", "cast", str(info.uuid))
|
|
|
|
# No media status, pause, play, stop not supported
|
|
assert state.attributes.get("supported_features") == (
|
|
SUPPORT_PLAY_MEDIA
|
|
| SUPPORT_TURN_OFF
|
|
| SUPPORT_TURN_ON
|
|
| SUPPORT_VOLUME_MUTE
|
|
| SUPPORT_VOLUME_SET
|
|
)
|
|
|
|
cast_status = MagicMock()
|
|
cast_status.volume_level = 0.5
|
|
cast_status.volume_muted = False
|
|
cast_status_cb(cast_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
# Volume not hidden even if no app is active
|
|
assert state.attributes.get("volume_level") == 0.5
|
|
assert not state.attributes.get("is_volume_muted")
|
|
|
|
chromecast.app_id = "1234"
|
|
cast_status = MagicMock()
|
|
cast_status.volume_level = 0.5
|
|
cast_status.volume_muted = False
|
|
cast_status_cb(cast_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.attributes.get("volume_level") == 0.5
|
|
assert not state.attributes.get("is_volume_muted")
|
|
|
|
cast_status = MagicMock()
|
|
cast_status.volume_level = 0.2
|
|
cast_status.volume_muted = True
|
|
cast_status_cb(cast_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.attributes.get("volume_level") == 0.2
|
|
assert state.attributes.get("is_volume_muted")
|
|
|
|
# Disable support for volume control
|
|
cast_status = MagicMock()
|
|
cast_status.volume_control_type = "fixed"
|
|
cast_status_cb(cast_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.attributes.get("supported_features") == (
|
|
SUPPORT_PLAY_MEDIA | SUPPORT_TURN_OFF | SUPPORT_TURN_ON
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"cast_type,supported_features,supported_features_no_media",
|
|
[
|
|
(
|
|
pychromecast.const.CAST_TYPE_AUDIO,
|
|
SUPPORT_PAUSE
|
|
| SUPPORT_PLAY
|
|
| SUPPORT_PLAY_MEDIA
|
|
| SUPPORT_STOP
|
|
| SUPPORT_TURN_OFF
|
|
| SUPPORT_TURN_ON
|
|
| SUPPORT_VOLUME_MUTE
|
|
| SUPPORT_VOLUME_SET,
|
|
SUPPORT_PLAY_MEDIA
|
|
| SUPPORT_TURN_OFF
|
|
| SUPPORT_TURN_ON
|
|
| SUPPORT_VOLUME_MUTE
|
|
| SUPPORT_VOLUME_SET,
|
|
),
|
|
(
|
|
pychromecast.const.CAST_TYPE_CHROMECAST,
|
|
SUPPORT_PAUSE
|
|
| SUPPORT_PLAY
|
|
| SUPPORT_PLAY_MEDIA
|
|
| SUPPORT_STOP
|
|
| SUPPORT_TURN_OFF
|
|
| SUPPORT_TURN_ON
|
|
| SUPPORT_VOLUME_MUTE
|
|
| SUPPORT_VOLUME_SET,
|
|
SUPPORT_PLAY_MEDIA
|
|
| SUPPORT_TURN_OFF
|
|
| SUPPORT_TURN_ON
|
|
| SUPPORT_VOLUME_MUTE
|
|
| SUPPORT_VOLUME_SET,
|
|
),
|
|
(
|
|
pychromecast.const.CAST_TYPE_GROUP,
|
|
SUPPORT_PAUSE
|
|
| SUPPORT_PLAY
|
|
| SUPPORT_PLAY_MEDIA
|
|
| SUPPORT_STOP
|
|
| SUPPORT_TURN_OFF
|
|
| SUPPORT_VOLUME_MUTE
|
|
| SUPPORT_VOLUME_SET,
|
|
SUPPORT_PLAY_MEDIA
|
|
| SUPPORT_TURN_OFF
|
|
| SUPPORT_VOLUME_MUTE
|
|
| SUPPORT_VOLUME_SET,
|
|
),
|
|
],
|
|
)
|
|
async def test_supported_features(
|
|
hass: HomeAssistant, cast_type, supported_features, supported_features_no_media
|
|
):
|
|
"""Test supported features."""
|
|
entity_id = "media_player.speaker"
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
chromecast.cast_type = cast_type
|
|
_, conn_status_cb, media_status_cb = get_status_callbacks(chromecast)
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state is not None
|
|
assert state.name == "Speaker"
|
|
assert state.state == "idle"
|
|
assert state.attributes.get("supported_features") == supported_features_no_media
|
|
|
|
media_status = MagicMock(images=None)
|
|
media_status.supports_queue_next = False
|
|
media_status.supports_seek = False
|
|
media_status_cb(media_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.attributes.get("supported_features") == supported_features
|
|
|
|
|
|
async def test_entity_browse_media(hass: HomeAssistant, hass_ws_client):
|
|
"""Test we can browse media."""
|
|
await async_setup_component(hass, "media_source", {"media_source": {}})
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
_, conn_status_cb, _ = get_status_callbacks(chromecast)
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
client = await hass_ws_client()
|
|
await client.send_json(
|
|
{
|
|
"id": 1,
|
|
"type": "media_player/browse_media",
|
|
"entity_id": "media_player.speaker",
|
|
}
|
|
)
|
|
response = await client.receive_json()
|
|
assert response["success"]
|
|
expected_child_1 = {
|
|
"title": "Epic Sax Guy 10 Hours.mp4",
|
|
"media_class": "video",
|
|
"media_content_type": "video/mp4",
|
|
"media_content_id": "media-source://media_source/local/Epic Sax Guy 10 Hours.mp4",
|
|
"can_play": True,
|
|
"can_expand": False,
|
|
"children_media_class": None,
|
|
"thumbnail": None,
|
|
}
|
|
assert expected_child_1 in response["result"]["children"]
|
|
|
|
expected_child_2 = {
|
|
"title": "test.mp3",
|
|
"media_class": "music",
|
|
"media_content_type": "audio/mpeg",
|
|
"media_content_id": "media-source://media_source/local/test.mp3",
|
|
"can_play": True,
|
|
"can_expand": False,
|
|
"children_media_class": None,
|
|
"thumbnail": None,
|
|
}
|
|
assert expected_child_2 in response["result"]["children"]
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"cast_type",
|
|
[pychromecast.const.CAST_TYPE_AUDIO, pychromecast.const.CAST_TYPE_GROUP],
|
|
)
|
|
async def test_entity_browse_media_audio_only(
|
|
hass: HomeAssistant, hass_ws_client, cast_type
|
|
):
|
|
"""Test we can browse media."""
|
|
await async_setup_component(hass, "media_source", {"media_source": {}})
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
chromecast.cast_type = cast_type
|
|
_, conn_status_cb, _ = get_status_callbacks(chromecast)
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
client = await hass_ws_client()
|
|
await client.send_json(
|
|
{
|
|
"id": 1,
|
|
"type": "media_player/browse_media",
|
|
"entity_id": "media_player.speaker",
|
|
}
|
|
)
|
|
response = await client.receive_json()
|
|
assert response["success"]
|
|
expected_child_1 = {
|
|
"title": "Epic Sax Guy 10 Hours.mp4",
|
|
"media_class": "video",
|
|
"media_content_type": "video/mp4",
|
|
"media_content_id": "media-source://media_source/local/Epic Sax Guy 10 Hours.mp4",
|
|
"can_play": True,
|
|
"can_expand": False,
|
|
"children_media_class": None,
|
|
"thumbnail": None,
|
|
}
|
|
assert expected_child_1 not in response["result"]["children"]
|
|
|
|
expected_child_2 = {
|
|
"title": "test.mp3",
|
|
"media_class": "music",
|
|
"media_content_type": "audio/mpeg",
|
|
"media_content_id": "media-source://media_source/local/test.mp3",
|
|
"can_play": True,
|
|
"can_expand": False,
|
|
"children_media_class": None,
|
|
"thumbnail": None,
|
|
}
|
|
assert expected_child_2 in response["result"]["children"]
|
|
|
|
|
|
async def test_entity_play_media(hass: HomeAssistant, quick_play_mock):
|
|
"""Test playing media."""
|
|
entity_id = "media_player.speaker"
|
|
reg = er.async_get(hass)
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
_, conn_status_cb, _ = get_status_callbacks(chromecast)
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state is not None
|
|
assert state.name == "Speaker"
|
|
assert state.state == "idle"
|
|
assert entity_id == reg.async_get_entity_id("media_player", "cast", str(info.uuid))
|
|
|
|
# Play_media
|
|
await hass.services.async_call(
|
|
media_player.DOMAIN,
|
|
media_player.SERVICE_PLAY_MEDIA,
|
|
{
|
|
ATTR_ENTITY_ID: entity_id,
|
|
media_player.ATTR_MEDIA_CONTENT_TYPE: "audio",
|
|
media_player.ATTR_MEDIA_CONTENT_ID: "best.mp3",
|
|
media_player.ATTR_MEDIA_EXTRA: {"metadata": {"metadatatype": 3}},
|
|
},
|
|
blocking=True,
|
|
)
|
|
|
|
chromecast.media_controller.play_media.assert_not_called()
|
|
quick_play_mock.assert_called_once_with(
|
|
chromecast,
|
|
"default_media_receiver",
|
|
{
|
|
"media_id": "best.mp3",
|
|
"media_type": "audio",
|
|
"metadata": {"metadatatype": 3},
|
|
},
|
|
)
|
|
|
|
|
|
async def test_entity_play_media_cast(hass: HomeAssistant, quick_play_mock):
|
|
"""Test playing media with cast special features."""
|
|
entity_id = "media_player.speaker"
|
|
reg = er.async_get(hass)
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
_, conn_status_cb, _ = get_status_callbacks(chromecast)
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state is not None
|
|
assert state.name == "Speaker"
|
|
assert state.state == "idle"
|
|
assert entity_id == reg.async_get_entity_id("media_player", "cast", str(info.uuid))
|
|
|
|
# Play_media - cast with app ID
|
|
await common.async_play_media(hass, "cast", '{"app_id": "abc123"}', entity_id)
|
|
chromecast.start_app.assert_called_once_with("abc123")
|
|
|
|
# Play_media - cast with app name (quick play)
|
|
await hass.services.async_call(
|
|
media_player.DOMAIN,
|
|
media_player.SERVICE_PLAY_MEDIA,
|
|
{
|
|
ATTR_ENTITY_ID: entity_id,
|
|
media_player.ATTR_MEDIA_CONTENT_TYPE: "cast",
|
|
media_player.ATTR_MEDIA_CONTENT_ID: '{"app_name":"youtube"}',
|
|
media_player.ATTR_MEDIA_EXTRA: {"metadata": {"metadatatype": 3}},
|
|
},
|
|
blocking=True,
|
|
)
|
|
quick_play_mock.assert_called_once_with(
|
|
ANY, "youtube", {"metadata": {"metadatatype": 3}}
|
|
)
|
|
|
|
|
|
async def test_entity_play_media_cast_invalid(hass, caplog, quick_play_mock):
|
|
"""Test playing media."""
|
|
entity_id = "media_player.speaker"
|
|
reg = er.async_get(hass)
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
_, conn_status_cb, _ = get_status_callbacks(chromecast)
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state is not None
|
|
assert state.name == "Speaker"
|
|
assert state.state == "idle"
|
|
assert entity_id == reg.async_get_entity_id("media_player", "cast", str(info.uuid))
|
|
|
|
# play_media - media_type cast with invalid JSON
|
|
with pytest.raises(json.decoder.JSONDecodeError):
|
|
await common.async_play_media(hass, "cast", '{"app_id": "abc123"', entity_id)
|
|
assert "Invalid JSON in media_content_id" in caplog.text
|
|
chromecast.start_app.assert_not_called()
|
|
quick_play_mock.assert_not_called()
|
|
|
|
# Play_media - media_type cast with extra keys
|
|
await common.async_play_media(
|
|
hass, "cast", '{"app_id": "abc123", "extra": "data"}', entity_id
|
|
)
|
|
assert "Extra keys dict_keys(['extra']) were ignored" in caplog.text
|
|
chromecast.start_app.assert_called_once_with("abc123")
|
|
quick_play_mock.assert_not_called()
|
|
|
|
# Play_media - media_type cast with unsupported app
|
|
quick_play_mock.side_effect = NotImplementedError()
|
|
await common.async_play_media(hass, "cast", '{"app_name": "unknown"}', entity_id)
|
|
quick_play_mock.assert_called_once_with(ANY, "unknown", {})
|
|
assert "App unknown not supported" in caplog.text
|
|
|
|
|
|
async def test_entity_play_media_sign_URL(hass: HomeAssistant, quick_play_mock):
|
|
"""Test playing media."""
|
|
entity_id = "media_player.speaker"
|
|
|
|
await async_process_ha_core_config(
|
|
hass,
|
|
{"external_url": "http://example.com:8123"},
|
|
)
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
_, conn_status_cb, _ = get_status_callbacks(chromecast)
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
# Play_media
|
|
await common.async_play_media(hass, "audio", "/best.mp3", entity_id)
|
|
quick_play_mock.assert_called_once_with(
|
|
chromecast, "default_media_receiver", {"media_id": ANY, "media_type": "audio"}
|
|
)
|
|
assert quick_play_mock.call_args[0][2]["media_id"].startswith(
|
|
"http://example.com:8123/best.mp3?authSig="
|
|
)
|
|
|
|
|
|
async def test_entity_media_content_type(hass: HomeAssistant):
|
|
"""Test various content types."""
|
|
entity_id = "media_player.speaker"
|
|
reg = er.async_get(hass)
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
_, conn_status_cb, media_status_cb = get_status_callbacks(chromecast)
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state is not None
|
|
assert state.name == "Speaker"
|
|
assert state.state == "idle"
|
|
assert entity_id == reg.async_get_entity_id("media_player", "cast", str(info.uuid))
|
|
|
|
media_status = MagicMock(images=None)
|
|
media_status.media_is_movie = False
|
|
media_status.media_is_musictrack = False
|
|
media_status.media_is_tvshow = False
|
|
media_status_cb(media_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.attributes.get("media_content_type") is None
|
|
|
|
media_status.media_is_tvshow = True
|
|
media_status_cb(media_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.attributes.get("media_content_type") == "tvshow"
|
|
|
|
media_status.media_is_tvshow = False
|
|
media_status.media_is_musictrack = True
|
|
media_status_cb(media_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.attributes.get("media_content_type") == "music"
|
|
|
|
media_status.media_is_musictrack = True
|
|
media_status.media_is_movie = True
|
|
media_status_cb(media_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.attributes.get("media_content_type") == "movie"
|
|
|
|
|
|
async def test_entity_control(hass: HomeAssistant):
|
|
"""Test various device and media controls."""
|
|
entity_id = "media_player.speaker"
|
|
reg = er.async_get(hass)
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
chromecast.cast_type = pychromecast.const.CAST_TYPE_CHROMECAST
|
|
_, conn_status_cb, media_status_cb = get_status_callbacks(chromecast)
|
|
|
|
# Fake connection status
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
# Fake media status
|
|
media_status = MagicMock(images=None)
|
|
media_status.supports_queue_next = False
|
|
media_status.supports_seek = False
|
|
media_status_cb(media_status)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state is not None
|
|
assert state.name == "Speaker"
|
|
assert state.state == "playing"
|
|
assert entity_id == reg.async_get_entity_id("media_player", "cast", str(info.uuid))
|
|
|
|
assert state.attributes.get("supported_features") == (
|
|
SUPPORT_PAUSE
|
|
| SUPPORT_PLAY
|
|
| SUPPORT_PLAY_MEDIA
|
|
| SUPPORT_STOP
|
|
| SUPPORT_TURN_OFF
|
|
| SUPPORT_TURN_ON
|
|
| SUPPORT_VOLUME_MUTE
|
|
| SUPPORT_VOLUME_SET
|
|
)
|
|
|
|
# Turn on
|
|
await common.async_turn_on(hass, entity_id)
|
|
chromecast.play_media.assert_called_once_with(
|
|
"https://www.home-assistant.io/images/cast/splash.png", ANY
|
|
)
|
|
chromecast.quit_app.reset_mock()
|
|
|
|
# Turn off
|
|
await common.async_turn_off(hass, entity_id)
|
|
chromecast.quit_app.assert_called_once_with()
|
|
|
|
# Mute
|
|
await common.async_mute_volume(hass, True, entity_id)
|
|
chromecast.set_volume_muted.assert_called_once_with(True)
|
|
|
|
# Volume
|
|
await common.async_set_volume_level(hass, 0.33, entity_id)
|
|
chromecast.set_volume.assert_called_once_with(0.33)
|
|
|
|
# Media play
|
|
await common.async_media_play(hass, entity_id)
|
|
chromecast.media_controller.play.assert_called_once_with()
|
|
|
|
# Media pause
|
|
await common.async_media_pause(hass, entity_id)
|
|
chromecast.media_controller.pause.assert_called_once_with()
|
|
|
|
# Media previous
|
|
await common.async_media_previous_track(hass, entity_id)
|
|
chromecast.media_controller.queue_prev.assert_not_called()
|
|
|
|
# Media next
|
|
await common.async_media_next_track(hass, entity_id)
|
|
chromecast.media_controller.queue_next.assert_not_called()
|
|
|
|
# Media seek
|
|
await common.async_media_seek(hass, 123, entity_id)
|
|
chromecast.media_controller.seek.assert_not_called()
|
|
|
|
# Enable support for queue and seek
|
|
media_status = MagicMock(images=None)
|
|
media_status.supports_queue_next = True
|
|
media_status.supports_seek = True
|
|
media_status_cb(media_status)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state.attributes.get("supported_features") == (
|
|
SUPPORT_PAUSE
|
|
| SUPPORT_PLAY
|
|
| SUPPORT_PLAY_MEDIA
|
|
| SUPPORT_STOP
|
|
| SUPPORT_TURN_OFF
|
|
| SUPPORT_TURN_ON
|
|
| SUPPORT_PREVIOUS_TRACK
|
|
| SUPPORT_NEXT_TRACK
|
|
| SUPPORT_SEEK
|
|
| SUPPORT_VOLUME_MUTE
|
|
| SUPPORT_VOLUME_SET
|
|
)
|
|
|
|
# Media previous
|
|
await common.async_media_previous_track(hass, entity_id)
|
|
chromecast.media_controller.queue_prev.assert_called_once_with()
|
|
|
|
# Media next
|
|
await common.async_media_next_track(hass, entity_id)
|
|
chromecast.media_controller.queue_next.assert_called_once_with()
|
|
|
|
# Media seek
|
|
await common.async_media_seek(hass, 123, entity_id)
|
|
chromecast.media_controller.seek.assert_called_once_with(123)
|
|
|
|
|
|
# Some smart TV's with Google TV report "Netflix", not the Netflix app's ID
|
|
@pytest.mark.parametrize(
|
|
"app_id, state_no_media",
|
|
[(pychromecast.APP_YOUTUBE, "idle"), ("Netflix", "playing")],
|
|
)
|
|
async def test_entity_media_states(hass: HomeAssistant, app_id, state_no_media):
|
|
"""Test various entity media states."""
|
|
entity_id = "media_player.speaker"
|
|
reg = er.async_get(hass)
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
cast_status_cb, conn_status_cb, media_status_cb = get_status_callbacks(chromecast)
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state is not None
|
|
assert state.name == "Speaker"
|
|
assert state.state == "idle"
|
|
assert entity_id == reg.async_get_entity_id("media_player", "cast", str(info.uuid))
|
|
|
|
# App id updated, but no media status
|
|
chromecast.app_id = app_id
|
|
cast_status = MagicMock()
|
|
cast_status_cb(cast_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == state_no_media
|
|
|
|
# Got media status
|
|
media_status = MagicMock(images=None)
|
|
media_status.player_is_playing = True
|
|
media_status_cb(media_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "playing"
|
|
|
|
media_status.player_is_playing = False
|
|
media_status.player_is_paused = True
|
|
media_status_cb(media_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "paused"
|
|
|
|
media_status.player_is_paused = False
|
|
media_status.player_is_idle = True
|
|
media_status_cb(media_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "idle"
|
|
|
|
# No media status, app is still running
|
|
media_status_cb(None)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == state_no_media
|
|
|
|
# App no longer running
|
|
chromecast.app_id = pychromecast.IDLE_APP_ID
|
|
cast_status = MagicMock()
|
|
cast_status_cb(cast_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "idle"
|
|
|
|
# No cast status
|
|
chromecast.is_idle = False
|
|
cast_status_cb(None)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "unknown"
|
|
|
|
|
|
async def test_entity_media_states_lovelace_app(hass: HomeAssistant):
|
|
"""Test various entity media states when the lovelace app is active."""
|
|
entity_id = "media_player.speaker"
|
|
reg = er.async_get(hass)
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
cast_status_cb, conn_status_cb, media_status_cb = get_status_callbacks(chromecast)
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state is not None
|
|
assert state.name == "Speaker"
|
|
assert state.state == "idle"
|
|
assert entity_id == reg.async_get_entity_id("media_player", "cast", str(info.uuid))
|
|
|
|
chromecast.app_id = CAST_APP_ID_HOMEASSISTANT_LOVELACE
|
|
cast_status = MagicMock()
|
|
cast_status_cb(cast_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "playing"
|
|
assert state.attributes.get("supported_features") == (
|
|
SUPPORT_PLAY_MEDIA | SUPPORT_TURN_OFF | SUPPORT_VOLUME_MUTE | SUPPORT_VOLUME_SET
|
|
)
|
|
|
|
media_status = MagicMock(images=None)
|
|
media_status.player_is_playing = True
|
|
media_status_cb(media_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "playing"
|
|
|
|
media_status.player_is_playing = False
|
|
media_status.player_is_paused = True
|
|
media_status_cb(media_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "playing"
|
|
|
|
media_status.player_is_paused = False
|
|
media_status.player_is_idle = True
|
|
media_status_cb(media_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "playing"
|
|
|
|
chromecast.app_id = pychromecast.IDLE_APP_ID
|
|
media_status.player_is_idle = False
|
|
chromecast.is_idle = True
|
|
media_status_cb(media_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "idle"
|
|
|
|
chromecast.is_idle = False
|
|
media_status_cb(media_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "unknown"
|
|
|
|
|
|
async def test_group_media_states(hass, mz_mock):
|
|
"""Test media states are read from group if entity has no state."""
|
|
entity_id = "media_player.speaker"
|
|
reg = er.async_get(hass)
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
_, conn_status_cb, media_status_cb, group_media_status_cb = get_status_callbacks(
|
|
chromecast, mz_mock
|
|
)
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state is not None
|
|
assert state.name == "Speaker"
|
|
assert state.state == "idle"
|
|
assert entity_id == reg.async_get_entity_id("media_player", "cast", str(info.uuid))
|
|
|
|
group_media_status = MagicMock(images=None)
|
|
player_media_status = MagicMock(images=None)
|
|
|
|
# Player has no state, group is playing -> Should report 'playing'
|
|
group_media_status.player_is_playing = True
|
|
group_media_status_cb(str(FakeGroupUUID), group_media_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "playing"
|
|
|
|
# Player is paused, group is playing -> Should report 'paused'
|
|
player_media_status.player_is_playing = False
|
|
player_media_status.player_is_paused = True
|
|
media_status_cb(player_media_status)
|
|
await hass.async_block_till_done()
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "paused"
|
|
|
|
# Player is in unknown state, group is playing -> Should report 'playing'
|
|
player_media_status.player_state = "UNKNOWN"
|
|
media_status_cb(player_media_status)
|
|
await hass.async_block_till_done()
|
|
state = hass.states.get(entity_id)
|
|
assert state.state == "playing"
|
|
|
|
|
|
async def test_group_media_control(hass, mz_mock, quick_play_mock):
|
|
"""Test media controls are handled by group if entity has no state."""
|
|
entity_id = "media_player.speaker"
|
|
reg = er.async_get(hass)
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
|
|
_, conn_status_cb, media_status_cb, group_media_status_cb = get_status_callbacks(
|
|
chromecast, mz_mock
|
|
)
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
state = hass.states.get(entity_id)
|
|
assert state is not None
|
|
assert state.name == "Speaker"
|
|
assert state.state == "idle"
|
|
assert entity_id == reg.async_get_entity_id("media_player", "cast", str(info.uuid))
|
|
|
|
group_media_status = MagicMock(images=None)
|
|
player_media_status = MagicMock(images=None)
|
|
|
|
# Player has no state, group is playing -> Should forward calls to group
|
|
group_media_status.player_is_playing = True
|
|
group_media_status_cb(str(FakeGroupUUID), group_media_status)
|
|
await common.async_media_play(hass, entity_id)
|
|
grp_media = mz_mock.get_multizone_mediacontroller(str(FakeGroupUUID))
|
|
assert grp_media.play.called
|
|
assert not chromecast.media_controller.play.called
|
|
|
|
# Player is paused, group is playing -> Should not forward
|
|
player_media_status.player_is_playing = False
|
|
player_media_status.player_is_paused = True
|
|
media_status_cb(player_media_status)
|
|
await common.async_media_pause(hass, entity_id)
|
|
grp_media = mz_mock.get_multizone_mediacontroller(str(FakeGroupUUID))
|
|
assert not grp_media.pause.called
|
|
assert chromecast.media_controller.pause.called
|
|
|
|
# Player is in unknown state, group is playing -> Should forward to group
|
|
player_media_status.player_state = "UNKNOWN"
|
|
media_status_cb(player_media_status)
|
|
await common.async_media_stop(hass, entity_id)
|
|
grp_media = mz_mock.get_multizone_mediacontroller(str(FakeGroupUUID))
|
|
assert grp_media.stop.called
|
|
assert not chromecast.media_controller.stop.called
|
|
|
|
# Verify play_media is not forwarded
|
|
await common.async_play_media(hass, "music", "best.mp3", entity_id)
|
|
assert not grp_media.play_media.called
|
|
assert not chromecast.media_controller.play_media.called
|
|
quick_play_mock.assert_called_once_with(
|
|
chromecast,
|
|
"default_media_receiver",
|
|
{"media_id": "best.mp3", "media_type": "music"},
|
|
)
|
|
|
|
|
|
async def test_failed_cast_on_idle(hass, caplog):
|
|
"""Test no warning when unless player went idle with reason "ERROR"."""
|
|
info = get_fake_chromecast_info()
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
_, _, media_status_cb = get_status_callbacks(chromecast)
|
|
|
|
media_status = MagicMock(images=None)
|
|
media_status.player_is_idle = False
|
|
media_status.idle_reason = "ERROR"
|
|
media_status.content_id = "http://example.com:8123/tts.mp3"
|
|
media_status_cb(media_status)
|
|
assert "Failed to cast media" not in caplog.text
|
|
|
|
media_status = MagicMock(images=None)
|
|
media_status.player_is_idle = True
|
|
media_status.idle_reason = "Other"
|
|
media_status.content_id = "http://example.com:8123/tts.mp3"
|
|
media_status_cb(media_status)
|
|
assert "Failed to cast media" not in caplog.text
|
|
|
|
media_status = MagicMock(images=None)
|
|
media_status.player_is_idle = True
|
|
media_status.idle_reason = "ERROR"
|
|
media_status.content_id = "http://example.com:8123/tts.mp3"
|
|
media_status_cb(media_status)
|
|
assert "Failed to cast media http://example.com:8123/tts.mp3." in caplog.text
|
|
|
|
|
|
async def test_failed_cast_other_url(hass, caplog):
|
|
"""Test warning when casting from internal_url fails."""
|
|
with assert_setup_component(1, tts.DOMAIN):
|
|
assert await async_setup_component(
|
|
hass,
|
|
tts.DOMAIN,
|
|
{tts.DOMAIN: {"platform": "demo", "base_url": "http://example.local:8123"}},
|
|
)
|
|
|
|
info = get_fake_chromecast_info()
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
_, _, media_status_cb = get_status_callbacks(chromecast)
|
|
|
|
media_status = MagicMock(images=None)
|
|
media_status.player_is_idle = True
|
|
media_status.idle_reason = "ERROR"
|
|
media_status.content_id = "http://example.com:8123/tts.mp3"
|
|
media_status_cb(media_status)
|
|
assert "Failed to cast media http://example.com:8123/tts.mp3." in caplog.text
|
|
|
|
|
|
async def test_failed_cast_internal_url(hass, caplog):
|
|
"""Test warning when casting from internal_url fails."""
|
|
await async_process_ha_core_config(
|
|
hass,
|
|
{"internal_url": "http://example.local:8123"},
|
|
)
|
|
with assert_setup_component(1, tts.DOMAIN):
|
|
assert await async_setup_component(
|
|
hass, tts.DOMAIN, {tts.DOMAIN: {"platform": "demo"}}
|
|
)
|
|
|
|
info = get_fake_chromecast_info()
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
_, _, media_status_cb = get_status_callbacks(chromecast)
|
|
|
|
media_status = MagicMock(images=None)
|
|
media_status.player_is_idle = True
|
|
media_status.idle_reason = "ERROR"
|
|
media_status.content_id = "http://example.local:8123/tts.mp3"
|
|
media_status_cb(media_status)
|
|
assert (
|
|
"Failed to cast media http://example.local:8123/tts.mp3 from internal_url"
|
|
in caplog.text
|
|
)
|
|
|
|
|
|
async def test_failed_cast_external_url(hass, caplog):
|
|
"""Test warning when casting from external_url fails."""
|
|
await async_process_ha_core_config(
|
|
hass,
|
|
{"external_url": "http://example.com:8123"},
|
|
)
|
|
with assert_setup_component(1, tts.DOMAIN):
|
|
assert await async_setup_component(
|
|
hass,
|
|
tts.DOMAIN,
|
|
{tts.DOMAIN: {"platform": "demo", "base_url": "http://example.com:8123"}},
|
|
)
|
|
|
|
info = get_fake_chromecast_info()
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
_, _, media_status_cb = get_status_callbacks(chromecast)
|
|
|
|
media_status = MagicMock(images=None)
|
|
media_status.player_is_idle = True
|
|
media_status.idle_reason = "ERROR"
|
|
media_status.content_id = "http://example.com:8123/tts.mp3"
|
|
media_status_cb(media_status)
|
|
assert (
|
|
"Failed to cast media http://example.com:8123/tts.mp3 from external_url"
|
|
in caplog.text
|
|
)
|
|
|
|
|
|
async def test_failed_cast_tts_base_url(hass, caplog):
|
|
"""Test warning when casting from tts.base_url fails."""
|
|
with assert_setup_component(1, tts.DOMAIN):
|
|
assert await async_setup_component(
|
|
hass,
|
|
tts.DOMAIN,
|
|
{tts.DOMAIN: {"platform": "demo", "base_url": "http://example.local:8123"}},
|
|
)
|
|
|
|
info = get_fake_chromecast_info()
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
_, _, media_status_cb = get_status_callbacks(chromecast)
|
|
|
|
media_status = MagicMock(images=None)
|
|
media_status.player_is_idle = True
|
|
media_status.idle_reason = "ERROR"
|
|
media_status.content_id = "http://example.local:8123/tts.mp3"
|
|
media_status_cb(media_status)
|
|
assert (
|
|
"Failed to cast media http://example.local:8123/tts.mp3 from tts.base_url"
|
|
in caplog.text
|
|
)
|
|
|
|
|
|
async def test_disconnect_on_stop(hass: HomeAssistant):
|
|
"""Test cast device disconnects socket on stop."""
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
|
|
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
|
|
await hass.async_block_till_done()
|
|
assert chromecast.disconnect.call_count == 1
|
|
|
|
|
|
async def test_entry_setup_no_config(hass: HomeAssistant):
|
|
"""Test deprecated empty yaml config.."""
|
|
await async_setup_component(hass, "cast", {})
|
|
await hass.async_block_till_done()
|
|
|
|
assert not hass.config_entries.async_entries("cast")
|
|
|
|
|
|
async def test_entry_setup_empty_config(hass: HomeAssistant):
|
|
"""Test deprecated empty yaml config.."""
|
|
await async_setup_component(hass, "cast", {"cast": {}})
|
|
await hass.async_block_till_done()
|
|
|
|
config_entry = hass.config_entries.async_entries("cast")[0]
|
|
assert config_entry.data["uuid"] == []
|
|
assert config_entry.data["ignore_cec"] == []
|
|
|
|
|
|
async def test_entry_setup_single_config(hass: HomeAssistant):
|
|
"""Test deprecated yaml config with a single config media_player."""
|
|
await async_setup_component(
|
|
hass, "cast", {"cast": {"media_player": {"uuid": "bla", "ignore_cec": "cast1"}}}
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
config_entry = hass.config_entries.async_entries("cast")[0]
|
|
assert config_entry.data["uuid"] == ["bla"]
|
|
assert config_entry.data["ignore_cec"] == ["cast1"]
|
|
|
|
assert pychromecast.IGNORE_CEC == ["cast1"]
|
|
|
|
|
|
async def test_entry_setup_list_config(hass: HomeAssistant):
|
|
"""Test deprecated yaml config with multiple media_players."""
|
|
await async_setup_component(
|
|
hass,
|
|
"cast",
|
|
{
|
|
"cast": {
|
|
"media_player": [
|
|
{"uuid": "bla", "ignore_cec": "cast1"},
|
|
{"uuid": "blu", "ignore_cec": ["cast2", "cast3"]},
|
|
]
|
|
}
|
|
},
|
|
)
|
|
await hass.async_block_till_done()
|
|
|
|
config_entry = hass.config_entries.async_entries("cast")[0]
|
|
assert set(config_entry.data["uuid"]) == {"bla", "blu"}
|
|
assert set(config_entry.data["ignore_cec"]) == {"cast1", "cast2", "cast3"}
|
|
assert set(pychromecast.IGNORE_CEC) == {"cast1", "cast2", "cast3"}
|
|
|
|
|
|
async def test_invalid_cast_platform(hass: HomeAssistant, caplog):
|
|
"""Test we can play media through a cast platform."""
|
|
cast_platform_mock = Mock()
|
|
del cast_platform_mock.async_get_media_browser_root_object
|
|
del cast_platform_mock.async_browse_media
|
|
del cast_platform_mock.async_play_media
|
|
mock_platform(hass, "test.cast", cast_platform_mock)
|
|
|
|
await async_setup_component(hass, "test", {"test": {}})
|
|
await hass.async_block_till_done()
|
|
|
|
info = get_fake_chromecast_info()
|
|
await async_setup_media_player_cast(hass, info)
|
|
|
|
assert "Invalid cast platform <Mock id" in caplog.text
|
|
|
|
|
|
async def test_cast_platform_play_media(hass: HomeAssistant, quick_play_mock, caplog):
|
|
"""Test we can play media through a cast platform."""
|
|
entity_id = "media_player.speaker"
|
|
|
|
_can_play = True
|
|
|
|
def can_play(*args):
|
|
return _can_play
|
|
|
|
cast_platform_mock = Mock(
|
|
async_get_media_browser_root_object=AsyncMock(return_value=[]),
|
|
async_browse_media=AsyncMock(return_value=None),
|
|
async_play_media=AsyncMock(side_effect=can_play),
|
|
)
|
|
mock_platform(hass, "test.cast", cast_platform_mock)
|
|
|
|
await async_setup_component(hass, "test", {"test": {}})
|
|
await hass.async_block_till_done()
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
assert "Invalid cast platform <Mock id" not in caplog.text
|
|
_, conn_status_cb, _ = get_status_callbacks(chromecast)
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
# This will play using the cast platform
|
|
await hass.services.async_call(
|
|
media_player.DOMAIN,
|
|
media_player.SERVICE_PLAY_MEDIA,
|
|
{
|
|
ATTR_ENTITY_ID: entity_id,
|
|
media_player.ATTR_MEDIA_CONTENT_TYPE: "audio",
|
|
media_player.ATTR_MEDIA_CONTENT_ID: "best.mp3",
|
|
media_player.ATTR_MEDIA_EXTRA: {"metadata": {"metadatatype": 3}},
|
|
},
|
|
blocking=True,
|
|
)
|
|
|
|
# Assert the media player attempt to play media through the cast platform
|
|
cast_platform_mock.async_play_media.assert_called_once_with(
|
|
hass, entity_id, chromecast, "audio", "best.mp3"
|
|
)
|
|
|
|
# Assert pychromecast is not used to play media
|
|
chromecast.media_controller.play_media.assert_not_called()
|
|
quick_play_mock.assert_not_called()
|
|
|
|
# This will not play using the cast platform
|
|
_can_play = False
|
|
cast_platform_mock.async_play_media.reset_mock()
|
|
await hass.services.async_call(
|
|
media_player.DOMAIN,
|
|
media_player.SERVICE_PLAY_MEDIA,
|
|
{
|
|
ATTR_ENTITY_ID: entity_id,
|
|
media_player.ATTR_MEDIA_CONTENT_TYPE: "audio",
|
|
media_player.ATTR_MEDIA_CONTENT_ID: "best.mp3",
|
|
media_player.ATTR_MEDIA_EXTRA: {"metadata": {"metadatatype": 3}},
|
|
},
|
|
blocking=True,
|
|
)
|
|
|
|
# Assert the media player attempt to play media through the cast platform
|
|
cast_platform_mock.async_play_media.assert_called_once_with(
|
|
hass, entity_id, chromecast, "audio", "best.mp3"
|
|
)
|
|
|
|
# Assert pychromecast is used to play media
|
|
chromecast.media_controller.play_media.assert_not_called()
|
|
quick_play_mock.assert_called()
|
|
|
|
|
|
async def test_cast_platform_browse_media(hass: HomeAssistant, hass_ws_client):
|
|
"""Test we can play media through a cast platform."""
|
|
cast_platform_mock = Mock(
|
|
async_get_media_browser_root_object=AsyncMock(
|
|
return_value=[
|
|
BrowseMedia(
|
|
title="Spotify",
|
|
media_class=MEDIA_CLASS_APP,
|
|
media_content_id="",
|
|
media_content_type="spotify",
|
|
thumbnail="https://brands.home-assistant.io/_/spotify/logo.png",
|
|
can_play=False,
|
|
can_expand=True,
|
|
)
|
|
]
|
|
),
|
|
async_browse_media=AsyncMock(
|
|
return_value=BrowseMedia(
|
|
title="Spotify Favourites",
|
|
media_class=MEDIA_CLASS_PLAYLIST,
|
|
media_content_id="",
|
|
media_content_type="spotify",
|
|
can_play=True,
|
|
can_expand=False,
|
|
)
|
|
),
|
|
async_play_media=AsyncMock(return_value=False),
|
|
)
|
|
mock_platform(hass, "test.cast", cast_platform_mock)
|
|
|
|
await async_setup_component(hass, "test", {"test": {}})
|
|
await async_setup_component(hass, "media_source", {"media_source": {}})
|
|
await hass.async_block_till_done()
|
|
|
|
info = get_fake_chromecast_info()
|
|
|
|
chromecast, _ = await async_setup_media_player_cast(hass, info)
|
|
_, conn_status_cb, _ = get_status_callbacks(chromecast)
|
|
|
|
connection_status = MagicMock()
|
|
connection_status.status = "CONNECTED"
|
|
conn_status_cb(connection_status)
|
|
await hass.async_block_till_done()
|
|
|
|
client = await hass_ws_client()
|
|
await client.send_json(
|
|
{
|
|
"id": 1,
|
|
"type": "media_player/browse_media",
|
|
"entity_id": "media_player.speaker",
|
|
}
|
|
)
|
|
response = await client.receive_json()
|
|
assert response["success"]
|
|
expected_child = {
|
|
"title": "Spotify",
|
|
"media_class": "app",
|
|
"media_content_type": "spotify",
|
|
"media_content_id": "",
|
|
"can_play": False,
|
|
"can_expand": True,
|
|
"children_media_class": None,
|
|
"thumbnail": "https://brands.home-assistant.io/_/spotify/logo.png",
|
|
}
|
|
assert expected_child in response["result"]["children"]
|
|
|
|
client = await hass_ws_client()
|
|
await client.send_json(
|
|
{
|
|
"id": 2,
|
|
"type": "media_player/browse_media",
|
|
"entity_id": "media_player.speaker",
|
|
"media_content_id": "",
|
|
"media_content_type": "spotify",
|
|
}
|
|
)
|
|
response = await client.receive_json()
|
|
assert response["success"]
|
|
expected_response = {
|
|
"title": "Spotify Favourites",
|
|
"media_class": "playlist",
|
|
"media_content_type": "spotify",
|
|
"media_content_id": "",
|
|
"can_play": True,
|
|
"can_expand": False,
|
|
"children_media_class": None,
|
|
"thumbnail": None,
|
|
"children": [],
|
|
}
|
|
assert response["result"] == expected_response
|