"""Helpers to deal with Cast devices.""" from __future__ import annotations import asyncio import configparser from dataclasses import dataclass import logging from typing import Optional from urllib.parse import urlparse import aiohttp import attr import pychromecast from pychromecast import dial from pychromecast.const import CAST_TYPE_GROUP from pychromecast.models import CastInfo from homeassistant.core import HomeAssistant from homeassistant.helpers import aiohttp_client from .const import DOMAIN _LOGGER = logging.getLogger(__name__) _PLS_SECTION_PLAYLIST = "playlist" @attr.s(slots=True, frozen=True) class ChromecastInfo: """Class to hold all data about a chromecast for creating connections. This also has the same attributes as the mDNS fields by zeroconf. """ cast_info: CastInfo = attr.ib() is_dynamic_group = attr.ib(type=Optional[bool], default=None) @property def friendly_name(self) -> str: """Return the Friendly Name.""" return self.cast_info.friendly_name @property def is_audio_group(self) -> bool: """Return if the cast is an audio group.""" return self.cast_info.cast_type == CAST_TYPE_GROUP @property def uuid(self) -> bool: """Return the UUID.""" return self.cast_info.uuid def fill_out_missing_chromecast_info(self, hass: HomeAssistant) -> ChromecastInfo: """Return a new ChromecastInfo object with missing attributes filled in. Uses blocking HTTP / HTTPS. """ cast_info = self.cast_info if self.cast_info.cast_type is None or self.cast_info.manufacturer is None: unknown_models = hass.data[DOMAIN]["unknown_models"] if self.cast_info.model_name not in unknown_models: # Manufacturer and cast type is not available in mDNS data, get it over http cast_info = dial.get_cast_type( cast_info, zconf=ChromeCastZeroconf.get_zeroconf(), ) unknown_models[self.cast_info.model_name] = ( cast_info.cast_type, cast_info.manufacturer, ) report_issue = ( "create a bug report at " "https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue" "+label%3A%22integration%3A+cast%22" ) _LOGGER.info( "Fetched cast details for unknown model '%s' manufacturer: '%s', type: '%s'. Please %s", cast_info.model_name, cast_info.manufacturer, cast_info.cast_type, report_issue, ) else: cast_type, manufacturer = unknown_models[self.cast_info.model_name] cast_info = CastInfo( cast_info.services, cast_info.uuid, cast_info.model_name, cast_info.friendly_name, cast_info.host, cast_info.port, cast_type, manufacturer, ) if not self.is_audio_group or self.is_dynamic_group is not None: # We have all information, no need to check HTTP API. return ChromecastInfo(cast_info=cast_info) # Fill out missing group information via HTTP API. is_dynamic_group = False http_group_status = None http_group_status = dial.get_multizone_status( None, services=self.cast_info.services, zconf=ChromeCastZeroconf.get_zeroconf(), ) if http_group_status is not None: is_dynamic_group = any( g.uuid == self.cast_info.uuid for g in http_group_status.dynamic_groups ) return ChromecastInfo( cast_info=cast_info, is_dynamic_group=is_dynamic_group, ) class ChromeCastZeroconf: """Class to hold a zeroconf instance.""" __zconf = None @classmethod def set_zeroconf(cls, zconf): """Set zeroconf.""" cls.__zconf = zconf @classmethod def get_zeroconf(cls): """Get zeroconf.""" return cls.__zconf class CastStatusListener( pychromecast.controllers.media.MediaStatusListener, pychromecast.controllers.multizone.MultiZoneManagerListener, pychromecast.controllers.receiver.CastStatusListener, pychromecast.socket_client.ConnectionStatusListener, ): """Helper class to handle pychromecast status callbacks. Necessary because a CastDevice entity or dynamic group can create a new socket client and therefore callbacks from multiple chromecast connections can potentially arrive. This class allows invalidating past chromecast objects. """ def __init__(self, cast_device, chromecast, mz_mgr, mz_only=False): """Initialize the status listener.""" self._cast_device = cast_device self._uuid = chromecast.uuid self._valid = True self._mz_mgr = mz_mgr if cast_device._cast_info.is_audio_group: self._mz_mgr.add_multizone(chromecast) if mz_only: return chromecast.register_status_listener(self) chromecast.socket_client.media_controller.register_status_listener(self) chromecast.register_connection_listener(self) if not cast_device._cast_info.is_audio_group: self._mz_mgr.register_listener(chromecast.uuid, self) def new_cast_status(self, status): """Handle reception of a new CastStatus.""" if self._valid: self._cast_device.new_cast_status(status) def new_media_status(self, status): """Handle reception of a new MediaStatus.""" if self._valid: self._cast_device.new_media_status(status) def load_media_failed(self, item, error_code): """Handle reception of a new MediaStatus.""" if self._valid: self._cast_device.load_media_failed(item, error_code) def new_connection_status(self, status): """Handle reception of a new ConnectionStatus.""" if self._valid: self._cast_device.new_connection_status(status) def added_to_multizone(self, group_uuid): """Handle the cast added to a group.""" def removed_from_multizone(self, group_uuid): """Handle the cast removed from a group.""" if self._valid: self._cast_device.multizone_new_media_status(group_uuid, None) def multizone_new_cast_status(self, group_uuid, cast_status): """Handle reception of a new CastStatus for a group.""" def multizone_new_media_status(self, group_uuid, media_status): """Handle reception of a new MediaStatus for a group.""" if self._valid: self._cast_device.multizone_new_media_status(group_uuid, media_status) def invalidate(self): """Invalidate this status listener. All following callbacks won't be forwarded. """ # pylint: disable=protected-access if self._cast_device._cast_info.is_audio_group: self._mz_mgr.remove_multizone(self._uuid) else: self._mz_mgr.deregister_listener(self._uuid, self) self._valid = False class PlaylistError(Exception): """Exception wrapper for pls and m3u helpers.""" class PlaylistSupported(PlaylistError): """The playlist is supported by cast devices and should not be parsed.""" @dataclass class PlaylistItem: """Playlist item.""" length: str | None title: str | None url: str def _is_url(url): """Validate the URL can be parsed and at least has scheme + netloc.""" result = urlparse(url) return all([result.scheme, result.netloc]) async def _fetch_playlist(hass, url, supported_content_types): """Fetch a playlist from the given url.""" try: session = aiohttp_client.async_get_clientsession(hass, verify_ssl=False) async with session.get(url, timeout=5) as resp: charset = resp.charset or "utf-8" if resp.content_type in supported_content_types: raise PlaylistSupported try: playlist_data = (await resp.content.read(64 * 1024)).decode(charset) except ValueError as err: raise PlaylistError(f"Could not decode playlist {url}") from err except asyncio.TimeoutError as err: raise PlaylistError(f"Timeout while fetching playlist {url}") from err except aiohttp.client_exceptions.ClientError as err: raise PlaylistError(f"Error while fetching playlist {url}") from err return playlist_data async def parse_m3u(hass, url): """Very simple m3u parser. Based on https://github.com/dvndrsn/M3uParser/blob/master/m3uparser.py """ # From Mozilla gecko source: https://github.com/mozilla/gecko-dev/blob/c4c1adbae87bf2d128c39832d72498550ee1b4b8/dom/media/DecoderTraits.cpp#L47-L52 hls_content_types = ( # https://tools.ietf.org/html/draft-pantos-http-live-streaming-19#section-10 "application/vnd.apple.mpegurl", # Additional informal types used by Mozilla gecko not included as they # don't reliably indicate HLS streams ) m3u_data = await _fetch_playlist(hass, url, hls_content_types) m3u_lines = m3u_data.splitlines() playlist = [] length = None title = None for line in m3u_lines: line = line.strip() if line.startswith("#EXTINF:"): # Get length and title from #EXTINF line info = line.split("#EXTINF:")[1].split(",", 1) if len(info) != 2: _LOGGER.warning("Ignoring invalid extinf %s in playlist %s", line, url) continue length = info[0].split(" ", 1) title = info[1].strip() elif line.startswith("#EXT-X-VERSION:"): # HLS stream, supported by cast devices raise PlaylistSupported("HLS") elif line.startswith("#EXT-X-STREAM-INF:"): # HLS stream, supported by cast devices raise PlaylistSupported("HLS") elif line.startswith("#"): # Ignore other extensions continue elif len(line) != 0: # Get song path from all other, non-blank lines if not _is_url(line): raise PlaylistError(f"Invalid item {line} in playlist {url}") playlist.append(PlaylistItem(length=length, title=title, url=line)) # reset the song variables so it doesn't use the same EXTINF more than once length = None title = None return playlist async def parse_pls(hass, url): """Very simple pls parser. Based on https://github.com/mariob/plsparser/blob/master/src/plsparser.py """ pls_data = await _fetch_playlist(hass, url, ()) pls_parser = configparser.ConfigParser() try: pls_parser.read_string(pls_data, url) except configparser.Error as err: raise PlaylistError(f"Can't parse playlist {url}") from err if ( _PLS_SECTION_PLAYLIST not in pls_parser or pls_parser[_PLS_SECTION_PLAYLIST].getint("Version") != 2 ): raise PlaylistError(f"Invalid playlist {url}") try: num_entries = pls_parser.getint(_PLS_SECTION_PLAYLIST, "NumberOfEntries") except (configparser.NoOptionError, ValueError) as err: raise PlaylistError(f"Invalid NumberOfEntries in playlist {url}") from err playlist_section = pls_parser[_PLS_SECTION_PLAYLIST] playlist = [] for entry in range(1, num_entries + 1): file_option = f"File{entry}" if file_option not in playlist_section: _LOGGER.warning("Missing %s in pls from %s", file_option, url) continue item_url = playlist_section[file_option] if not _is_url(item_url): raise PlaylistError(f"Invalid item {item_url} in playlist {url}") playlist.append( PlaylistItem( length=playlist_section.get(f"Length{entry}"), title=playlist_section.get(f"Title{entry}"), url=item_url, ) ) return playlist async def parse_playlist(hass, url): """Parse an m3u or pls playlist.""" if url.endswith(".m3u") or url.endswith(".m3u8"): playlist = await parse_m3u(hass, url) else: playlist = await parse_pls(hass, url) if not playlist: raise PlaylistError(f"Empty playlist {url}") return playlist