Bump pychromecast to 14.0.0 (#108374)

* Pass unregister callback to cast HomeAssisstantController

* Update tests

* Bump pychromecast to 14.0.0

* Fix lint warning, adjust tests

* Improve test coverage
pull/110474/head
Erik Montnemery 2024-02-15 20:14:01 +01:00 committed by GitHub
parent dc09633cc2
commit 06a21d4ed9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 152 additions and 66 deletions

View File

@ -1,9 +1,7 @@
"""Consts for Cast integration."""
from __future__ import annotations
from typing import TYPE_CHECKING
from pychromecast.controllers.homeassistant import HomeAssistantController
from typing import TYPE_CHECKING, TypedDict
from homeassistant.helpers.dispatcher import SignalType
@ -33,8 +31,17 @@ SIGNAL_CAST_REMOVED: SignalType[ChromecastInfo] = SignalType("cast_removed")
# Dispatcher signal fired when a Chromecast should show a Home Assistant Cast view.
SIGNAL_HASS_CAST_SHOW_VIEW: SignalType[
HomeAssistantController, str, str, str | None
HomeAssistantControllerData, str, str, str | None
] = SignalType("cast_show_view")
CONF_IGNORE_CEC = "ignore_cec"
CONF_KNOWN_HOSTS = "known_hosts"
class HomeAssistantControllerData(TypedDict):
"""Data for creating a HomeAssistantController."""
hass_url: str
hass_uuid: str
client_id: str | None
refresh_token: str

View File

@ -182,10 +182,10 @@ class CastStatusListener(
if self._valid:
self._cast_device.new_media_status(status)
def load_media_failed(self, item, error_code):
def load_media_failed(self, queue_item_id, error_code):
"""Handle reception of a new MediaStatus."""
if self._valid:
self._cast_device.load_media_failed(item, error_code)
self._cast_device.load_media_failed(queue_item_id, error_code)
def new_connection_status(self, status):
"""Handle reception of a new ConnectionStatus."""

View File

@ -1,7 +1,6 @@
"""Home Assistant Cast integration for Cast."""
from __future__ import annotations
from pychromecast.controllers.homeassistant import HomeAssistantController
import voluptuous as vol
from homeassistant import auth, config_entries, core
@ -11,7 +10,7 @@ from homeassistant.helpers import config_validation as cv, dispatcher, instance_
from homeassistant.helpers.network import NoURLAvailableError, get_url
from homeassistant.helpers.service import async_register_admin_service
from .const import DOMAIN, SIGNAL_HASS_CAST_SHOW_VIEW
from .const import DOMAIN, SIGNAL_HASS_CAST_SHOW_VIEW, HomeAssistantControllerData
SERVICE_SHOW_VIEW = "show_lovelace_view"
ATTR_VIEW_PATH = "view_path"
@ -55,7 +54,7 @@ async def async_setup_ha_cast(
hass_uuid = await instance_id.async_get(hass)
controller = HomeAssistantController(
controller_data = HomeAssistantControllerData(
# If you are developing Home Assistant Cast, uncomment and set to
# your dev app id.
# app_id="5FE44367",
@ -68,7 +67,7 @@ async def async_setup_ha_cast(
dispatcher.async_dispatcher_send(
hass,
SIGNAL_HASS_CAST_SHOW_VIEW,
controller,
controller_data,
call.data[ATTR_ENTITY_ID],
call.data[ATTR_VIEW_PATH],
call.data.get(ATTR_URL_PATH),

View File

@ -14,6 +14,6 @@
"documentation": "https://www.home-assistant.io/integrations/cast",
"iot_class": "local_polling",
"loggers": ["casttube", "pychromecast"],
"requirements": ["PyChromecast==13.1.0"],
"requirements": ["PyChromecast==14.0.0"],
"zeroconf": ["_googlecast._tcp.local."]
}

View File

@ -61,6 +61,7 @@ from .const import (
SIGNAL_CAST_DISCOVERED,
SIGNAL_CAST_REMOVED,
SIGNAL_HASS_CAST_SHOW_VIEW,
HomeAssistantControllerData,
)
from .discovery import setup_internal_discovery
from .helpers import (
@ -389,15 +390,15 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
self.media_status_received = dt_util.utcnow()
self.schedule_update_ha_state()
def load_media_failed(self, item, error_code):
def load_media_failed(self, queue_item_id, error_code):
"""Handle load media failed."""
_LOGGER.debug(
"[%s %s] Load media failed with code %s(%s) for item %s",
"[%s %s] Load media failed with code %s(%s) for queue_item_id %s",
self.entity_id,
self._cast_info.friendly_name,
error_code,
MEDIA_PLAYER_ERROR_CODES.get(error_code, "unknown code"),
item,
queue_item_id,
)
def new_connection_status(self, connection_status):
@ -951,7 +952,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
def _handle_signal_show_view(
self,
controller: HomeAssistantController,
controller_data: HomeAssistantControllerData,
entity_id: str,
view_path: str,
url_path: str | None,
@ -961,6 +962,23 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
return
if self._hass_cast_controller is None:
def unregister() -> None:
"""Handle request to unregister the handler."""
if not self._hass_cast_controller or not self._chromecast:
return
_LOGGER.debug(
"[%s %s] Unregistering HomeAssistantController",
self.entity_id,
self._cast_info.friendly_name,
)
self._chromecast.unregister_handler(self._hass_cast_controller)
self._hass_cast_controller = None
controller = HomeAssistantController(
**controller_data, unregister=unregister
)
self._hass_cast_controller = controller
self._chromecast.register_handler(controller)

View File

@ -54,7 +54,7 @@ ProgettiHWSW==0.1.3
# PyBluez==0.22
# homeassistant.components.cast
PyChromecast==13.1.0
PyChromecast==14.0.0
# homeassistant.components.flick_electric
PyFlick==0.0.2

View File

@ -45,7 +45,7 @@ PlexAPI==4.15.9
ProgettiHWSW==0.1.3
# homeassistant.components.cast
PyChromecast==13.1.0
PyChromecast==14.0.0
# homeassistant.components.flick_electric
PyFlick==0.0.2

View File

@ -46,6 +46,16 @@ def get_chromecast_mock():
return MagicMock()
@pytest.fixture
def ha_controller_mock():
"""Mock HomeAssistantController."""
with patch(
"homeassistant.components.cast.media_player.HomeAssistantController",
MagicMock(),
) as ha_controller_mock:
yield ha_controller_mock
@pytest.fixture(autouse=True)
def cast_mock(
mz_mock,

View File

@ -40,11 +40,11 @@ async def test_service_show_view(hass: HomeAssistant, mock_zeroconf: None) -> No
)
assert len(calls) == 1
controller, entity_id, view_path, url_path = calls[0]
assert controller.hass_url == "https://example.com"
assert controller.client_id is None
controller_data, entity_id, view_path, url_path = calls[0]
assert controller_data["hass_url"] == "https://example.com"
assert controller_data["client_id"] is None
# Verify user did not accidentally submit their dev app id
assert controller.supporting_app_id == "A078F6B0"
assert "supporting_app_id" not in controller_data
assert entity_id == "media_player.kitchen"
assert view_path == "mock_path"
assert url_path is None
@ -75,7 +75,7 @@ async def test_service_show_view_dashboard(
)
assert len(calls) == 1
_controller, entity_id, view_path, url_path = calls[0]
_controller_data, entity_id, view_path, url_path = calls[0]
assert entity_id == "media_player.kitchen"
assert view_path == "mock_path"
assert url_path == "mock-dashboard"
@ -106,8 +106,8 @@ async def test_use_cloud_url(hass: HomeAssistant, mock_zeroconf: None) -> None:
)
assert len(calls) == 1
controller = calls[0][0]
assert controller.hass_url == "https://something.nabu.casa"
controller_data = calls[0][0]
assert controller_data["hass_url"] == "https://something.nabu.casa"
async def test_remove_entry(hass: HomeAssistant, mock_zeroconf: None) -> None:

View File

@ -14,6 +14,10 @@ import yarl
from homeassistant.components import media_player, tts
from homeassistant.components.cast import media_player as cast
from homeassistant.components.cast.const import (
SIGNAL_HASS_CAST_SHOW_VIEW,
HomeAssistantControllerData,
)
from homeassistant.components.cast.media_player import ChromecastInfo
from homeassistant.components.media_player import (
BrowseMedia,
@ -29,7 +33,10 @@ from homeassistant.const import (
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import device_registry as dr, entity_registry as er, network
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.setup import async_setup_component
from tests.common import (
@ -46,12 +53,8 @@ FakeUUID = UUID("57355bce-9364-4aa6-ac1e-eb849dccf9e2")
FakeUUID2 = UUID("57355bce-9364-4aa6-ac1e-eb849dccf9e4")
FakeGroupUUID = UUID("57355bce-9364-4aa6-ac1e-eb849dccf9e3")
FAKE_HOST_SERVICE = pychromecast.discovery.ServiceInfo(
pychromecast.const.SERVICE_TYPE_HOST, ("127.0.0.1", 8009)
)
FAKE_MDNS_SERVICE = pychromecast.discovery.ServiceInfo(
pychromecast.const.SERVICE_TYPE_MDNS, "the-service"
)
FAKE_HOST_SERVICE = pychromecast.discovery.HostServiceInfo("127.0.0.1", 8009)
FAKE_MDNS_SERVICE = pychromecast.discovery.MDNSServiceInfo("the-service")
UNDEFINED = object()
@ -77,9 +80,7 @@ def get_fake_chromecast_info(
"""Generate a Fake ChromecastInfo with the specified arguments."""
if service is None:
service = pychromecast.discovery.ServiceInfo(
pychromecast.const.SERVICE_TYPE_HOST, (host, port)
)
service = pychromecast.discovery.HostServiceInfo(host, port)
if cast_type is UNDEFINED:
cast_type = CAST_TYPE_GROUP if port != 8009 else CAST_TYPE_CHROMECAST
if manufacturer is UNDEFINED:
@ -144,7 +145,11 @@ async def async_setup_cast_internal_discovery(hass, config=None):
remove_callback = cast_browser.call_args[0][0].remove_cast
def discover_chromecast(
service: pychromecast.discovery.ServiceInfo, info: ChromecastInfo
service: (
pychromecast.discovery.HostServiceInfo
| pychromecast.discovery.MDNSServiceInfo
),
info: ChromecastInfo,
) -> None:
"""Discover a chromecast device."""
browser.devices[info.uuid] = pychromecast.discovery.CastInfo(
@ -214,7 +219,7 @@ async def async_setup_media_player_cast(hass: HomeAssistant, info: ChromecastInf
info.cast_info.cast_type,
info.cast_info.manufacturer,
)
discovery_callback(info.uuid, FAKE_MDNS_SERVICE[1])
discovery_callback(info.uuid, FAKE_MDNS_SERVICE.name)
await hass.async_block_till_done()
await hass.async_block_till_done()
@ -488,9 +493,7 @@ async def test_manual_cast_chromecasts_uuid(hass: HomeAssistant) -> None:
return_value=zconf_2,
):
discover_cast(
pychromecast.discovery.ServiceInfo(
pychromecast.const.SERVICE_TYPE_MDNS, "service2"
),
pychromecast.discovery.MDNSServiceInfo("service2"),
cast_2,
)
await hass.async_block_till_done()
@ -502,9 +505,7 @@ async def test_manual_cast_chromecasts_uuid(hass: HomeAssistant) -> None:
return_value=zconf_1,
):
discover_cast(
pychromecast.discovery.ServiceInfo(
pychromecast.const.SERVICE_TYPE_MDNS, "service1"
),
pychromecast.discovery.MDNSServiceInfo("service1"),
cast_1,
)
await hass.async_block_till_done()
@ -526,9 +527,7 @@ async def test_auto_cast_chromecasts(hass: HomeAssistant) -> None:
return_value=zconf_1,
):
discover_cast(
pychromecast.discovery.ServiceInfo(
pychromecast.const.SERVICE_TYPE_MDNS, "service2"
),
pychromecast.discovery.MDNSServiceInfo("service2"),
cast_2,
)
await hass.async_block_till_done()
@ -540,9 +539,7 @@ async def test_auto_cast_chromecasts(hass: HomeAssistant) -> None:
return_value=zconf_2,
):
discover_cast(
pychromecast.discovery.ServiceInfo(
pychromecast.const.SERVICE_TYPE_MDNS, "service1"
),
pychromecast.discovery.MDNSServiceInfo("service1"),
cast_1,
)
await hass.async_block_till_done()
@ -591,9 +588,7 @@ async def test_discover_dynamic_group(
wraps=create_task,
):
discover_cast(
pychromecast.discovery.ServiceInfo(
pychromecast.const.SERVICE_TYPE_MDNS, "service"
),
pychromecast.discovery.MDNSServiceInfo("service"),
cast_1,
)
await hass.async_block_till_done()
@ -619,9 +614,7 @@ async def test_discover_dynamic_group(
wraps=create_task,
):
discover_cast(
pychromecast.discovery.ServiceInfo(
pychromecast.const.SERVICE_TYPE_MDNS, "service"
),
pychromecast.discovery.MDNSServiceInfo("service"),
cast_2,
)
await hass.async_block_till_done()
@ -647,9 +640,7 @@ async def test_discover_dynamic_group(
wraps=create_task,
):
discover_cast(
pychromecast.discovery.ServiceInfo(
pychromecast.const.SERVICE_TYPE_MDNS, "service"
),
pychromecast.discovery.MDNSServiceInfo("service"),
cast_1,
)
await hass.async_block_till_done()
@ -670,9 +661,7 @@ async def test_discover_dynamic_group(
return_value=zconf_1,
):
remove_cast(
pychromecast.discovery.ServiceInfo(
pychromecast.const.SERVICE_TYPE_MDNS, "service"
),
pychromecast.discovery.MDNSServiceInfo("service"),
cast_1,
)
await hass.async_block_till_done()
@ -696,9 +685,7 @@ async def test_update_cast_chromecasts(hass: HomeAssistant) -> None:
return_value=zconf_1,
):
discover_cast(
pychromecast.discovery.ServiceInfo(
pychromecast.const.SERVICE_TYPE_MDNS, "service1"
),
pychromecast.discovery.MDNSServiceInfo("service1"),
cast_1,
)
await hass.async_block_till_done()
@ -710,9 +697,7 @@ async def test_update_cast_chromecasts(hass: HomeAssistant) -> None:
return_value=zconf_2,
):
discover_cast(
pychromecast.discovery.ServiceInfo(
pychromecast.const.SERVICE_TYPE_MDNS, "service2"
),
pychromecast.discovery.MDNSServiceInfo("service2"),
cast_2,
)
await hass.async_block_till_done()
@ -2289,3 +2274,70 @@ async def test_cast_platform_play_media_local_media(
app_data["media_id"]
== f"{network.get_url(hass)}/api/hls/bla/master_playlist.m3u8?token=bla"
)
async def test_ha_cast(hass: HomeAssistant, ha_controller_mock) -> None:
"""Test Home Assistant cast."""
entity_id = "media_player.speaker"
info = get_fake_chromecast_info()
chromecast, _ = await async_setup_media_player_cast(hass, info)
chromecast.cast_type = pychromecast.const.CAST_TYPE_CHROMECAST
ha_controller = MagicMock()
ha_controller_mock.return_value = ha_controller
# Test show view signal for other entity is ignored
controller_data = HomeAssistantControllerData(
hass_url="url",
hass_uuid="12341234",
client_id="client_id_1234",
refresh_token="refresh_token_1234",
)
async_dispatcher_send(
hass,
SIGNAL_HASS_CAST_SHOW_VIEW,
controller_data,
"media_player.other",
"view_path",
"url_path",
)
await hass.async_block_till_done()
ha_controller_mock.assert_not_called()
# Test show view signal is handled
controller_data = HomeAssistantControllerData(
hass_url="url",
hass_uuid="12341234",
client_id="client_id_1234",
refresh_token="refresh_token_1234",
)
async_dispatcher_send(
hass,
SIGNAL_HASS_CAST_SHOW_VIEW,
controller_data,
entity_id,
"view_path",
"url_path",
)
await hass.async_block_till_done()
ha_controller_mock.assert_called_once_with(
client_id="client_id_1234",
hass_url="url",
hass_uuid="12341234",
refresh_token="refresh_token_1234",
unregister=ANY,
)
ha_controller.show_lovelace_view.assert_called_once_with("view_path", "url_path")
chromecast.unregister_handler.assert_not_called()
# Call unregister callback
unregister_cb = ha_controller_mock.mock_calls[0][2]["unregister"]
unregister_cb()
chromecast.unregister_handler.assert_called_once_with(ha_controller)
# Test unregister callback called again
chromecast.unregister_handler.reset_mock()
unregister_cb()
chromecast.unregister_handler.assert_not_called()