core/homeassistant/components/braviatv/coordinator.py

365 lines
13 KiB
Python

"""Update coordinator for Bravia TV integration."""
from __future__ import annotations
from collections.abc import Awaitable, Callable, Coroutine, Iterable
from datetime import datetime, timedelta
from functools import wraps
import logging
from typing import Any, Concatenate, Final
from pybravia import (
BraviaAuthError,
BraviaClient,
BraviaConnectionError,
BraviaConnectionTimeout,
BraviaError,
BraviaNotFound,
BraviaTurnedOff,
)
from homeassistant.components.media_player import MediaType
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_CLIENT_ID, CONF_PIN
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
CONF_NICKNAME,
CONF_USE_PSK,
DOMAIN,
LEGACY_CLIENT_ID,
NICKNAME_PREFIX,
SourceType,
)
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL: Final = timedelta(seconds=10)
type BraviaTVConfigEntry = ConfigEntry["BraviaTVCoordinator"]
def catch_braviatv_errors[_BraviaTVCoordinatorT: BraviaTVCoordinator, **_P](
func: Callable[Concatenate[_BraviaTVCoordinatorT, _P], Awaitable[None]],
) -> Callable[Concatenate[_BraviaTVCoordinatorT, _P], Coroutine[Any, Any, None]]:
"""Catch Bravia errors."""
@wraps(func)
async def wrapper(
self: _BraviaTVCoordinatorT,
*args: _P.args,
**kwargs: _P.kwargs,
) -> None:
"""Catch Bravia errors and log message."""
try:
await func(self, *args, **kwargs)
except BraviaError as err:
_LOGGER.error("Command error: %s", err)
await self.async_request_refresh()
return wrapper
class BraviaTVCoordinator(DataUpdateCoordinator[None]):
"""Representation of a Bravia TV Coordinator."""
config_entry: BraviaTVConfigEntry
def __init__(
self,
hass: HomeAssistant,
config_entry: BraviaTVConfigEntry,
client: BraviaClient,
) -> None:
"""Initialize Bravia TV Client."""
self.client = client
self.pin = config_entry.data[CONF_PIN]
self.use_psk = config_entry.data.get(CONF_USE_PSK, False)
self.client_id = config_entry.data.get(CONF_CLIENT_ID, LEGACY_CLIENT_ID)
self.nickname = config_entry.data.get(CONF_NICKNAME, NICKNAME_PREFIX)
self.source: str | None = None
self.source_list: list[str] = []
self.source_map: dict[str, dict] = {}
self.media_title: str | None = None
self.media_channel: str | None = None
self.media_content_id: str | None = None
self.media_content_type: MediaType | None = None
self.media_uri: str | None = None
self.media_duration: int | None = None
self.media_position: int | None = None
self.media_position_updated_at: datetime | None = None
self.volume_level: float | None = None
self.volume_target: str | None = None
self.volume_muted = False
self.is_on = False
self.connected = False
self.skipped_updates = 0
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name=DOMAIN,
update_interval=SCAN_INTERVAL,
request_refresh_debouncer=Debouncer(
hass, _LOGGER, cooldown=1.0, immediate=False
),
)
def _sources_extend(
self,
sources: list[dict],
source_type: SourceType,
add_to_list: bool = False,
sort_by: str | None = None,
) -> None:
"""Extend source map and source list."""
if sort_by:
sources = sorted(sources, key=lambda d: d.get(sort_by, ""))
for item in sources:
title = item.get("title")
uri = item.get("uri")
if not title or not uri:
continue
self.source_map[uri] = {**item, "type": source_type}
if add_to_list and title not in self.source_list:
self.source_list.append(title)
async def _async_update_data(self) -> None:
"""Connect and fetch data."""
try:
if not self.connected:
try:
if self.use_psk:
await self.client.connect(psk=self.pin)
else:
await self.client.connect(
pin=self.pin,
clientid=self.client_id,
nickname=self.nickname,
)
self.connected = True
except BraviaAuthError as err:
raise ConfigEntryAuthFailed from err
power_status = await self.client.get_power_status()
self.is_on = power_status == "active"
self.skipped_updates = 0
if self.is_on is False:
return
if not self.source_map:
await self.async_update_sources()
await self.async_update_volume()
await self.async_update_playing()
except BraviaNotFound as err:
if self.skipped_updates < 10:
self.connected = False
self.skipped_updates += 1
_LOGGER.debug("Update skipped, Bravia API service is reloading")
return
raise UpdateFailed("Error communicating with device") from err
except (BraviaConnectionError, BraviaConnectionTimeout, BraviaTurnedOff):
self.is_on = False
self.connected = False
_LOGGER.debug("Update skipped, Bravia TV is off")
except BraviaError as err:
self.is_on = False
self.connected = False
raise UpdateFailed("Error communicating with device") from err
async def async_update_volume(self) -> None:
"""Update volume information."""
volume_info = await self.client.get_volume_info()
if (volume_level := volume_info.get("volume")) is not None:
self.volume_level = volume_level / 100
self.volume_muted = volume_info.get("mute", False)
self.volume_target = volume_info.get("target")
async def async_update_playing(self) -> None:
"""Update current playing information."""
playing_info = await self.client.get_playing_info()
self.media_title = playing_info.get("title")
self.media_uri = playing_info.get("uri")
self.media_duration = playing_info.get("durationSec")
self.media_channel = None
self.media_content_id = None
self.media_content_type = None
self.source = None
if start_datetime := playing_info.get("startDateTime"):
start_datetime = datetime.fromisoformat(start_datetime)
current_datetime = datetime.now().replace(tzinfo=start_datetime.tzinfo)
self.media_position = int(
(current_datetime - start_datetime).total_seconds()
)
self.media_position_updated_at = datetime.now()
else:
self.media_position = None
self.media_position_updated_at = None
if self.media_uri:
self.media_content_id = self.media_uri
if self.media_uri[:8] == "extInput":
self.source = playing_info.get("title")
if self.media_uri[:2] == "tv":
self.media_content_id = playing_info.get("dispNum")
self.media_title = (
playing_info.get("programTitle") or self.media_content_id
)
self.media_channel = playing_info.get("title") or self.media_content_id
self.media_content_type = MediaType.CHANNEL
if not playing_info:
self.media_title = "Smart TV"
self.media_content_type = MediaType.APP
async def async_update_sources(self) -> None:
"""Update all sources."""
self.source_list = []
self.source_map = {}
inputs = await self.client.get_external_status()
self._sources_extend(inputs, SourceType.INPUT, add_to_list=True)
apps = await self.client.get_app_list()
self._sources_extend(apps, SourceType.APP, sort_by="title")
channels = await self.client.get_content_list_all("tv")
self._sources_extend(channels, SourceType.CHANNEL)
async def async_source_start(self, uri: str, source_type: SourceType | str) -> None:
"""Select source by uri."""
if source_type == SourceType.APP:
await self.client.set_active_app(uri)
else:
await self.client.set_play_content(uri)
async def async_source_find(
self, query: str, source_type: SourceType | str
) -> None:
"""Find and select source by query."""
if query.startswith(("extInput:", "tv:", "com.sony.dtv.")):
return await self.async_source_start(query, source_type)
coarse_uri = None
is_numeric_search = source_type == SourceType.CHANNEL and query.isnumeric()
for uri, item in self.source_map.items():
if item["type"] == source_type:
if is_numeric_search:
num = item.get("dispNum")
if num and int(query) == int(num):
return await self.async_source_start(uri, source_type)
else:
title: str = item["title"]
if query.lower() == title.lower():
return await self.async_source_start(uri, source_type)
if query.lower() in title.lower():
coarse_uri = uri
if coarse_uri:
return await self.async_source_start(coarse_uri, source_type)
raise ValueError(f"Not found {source_type}: {query}")
@catch_braviatv_errors
async def async_turn_on(self) -> None:
"""Turn the device on."""
await self.client.turn_on()
@catch_braviatv_errors
async def async_turn_off(self) -> None:
"""Turn off device."""
await self.client.turn_off()
@catch_braviatv_errors
async def async_set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1."""
await self.client.volume_level(round(volume * 100))
@catch_braviatv_errors
async def async_volume_up(self) -> None:
"""Send volume up command to device."""
await self.client.volume_up()
@catch_braviatv_errors
async def async_volume_down(self) -> None:
"""Send volume down command to device."""
await self.client.volume_down()
@catch_braviatv_errors
async def async_volume_mute(self, mute: bool) -> None:
"""Send mute command to device."""
await self.client.volume_mute()
@catch_braviatv_errors
async def async_media_play(self) -> None:
"""Send play command to device."""
await self.client.play()
@catch_braviatv_errors
async def async_media_pause(self) -> None:
"""Send pause command to device."""
await self.client.pause()
@catch_braviatv_errors
async def async_media_stop(self) -> None:
"""Send stop command to device."""
await self.client.stop()
@catch_braviatv_errors
async def async_media_next_track(self) -> None:
"""Send next track command."""
if self.media_content_type == MediaType.CHANNEL:
await self.client.channel_up()
else:
await self.client.next_track()
@catch_braviatv_errors
async def async_media_previous_track(self) -> None:
"""Send previous track command."""
if self.media_content_type == MediaType.CHANNEL:
await self.client.channel_down()
else:
await self.client.previous_track()
@catch_braviatv_errors
async def async_play_media(
self, media_type: MediaType | str, media_id: str, **kwargs: Any
) -> None:
"""Play a piece of media."""
if media_type not in (MediaType.APP, MediaType.CHANNEL):
raise ValueError(f"Invalid media type: {media_type}")
await self.async_source_find(media_id, media_type)
@catch_braviatv_errors
async def async_select_source(self, source: str) -> None:
"""Set the input source."""
await self.async_source_find(source, SourceType.INPUT)
@catch_braviatv_errors
async def async_send_command(self, command: Iterable[str], repeats: int) -> None:
"""Send command to device."""
for _ in range(repeats):
for cmd in command:
response = await self.client.send_command(cmd)
if not response:
commands = await self.client.get_command_list()
commands_keys = ", ".join(commands.keys())
# Logging an error instead of raising a ValueError
# https://github.com/home-assistant/core/pull/77329#discussion_r955768245
_LOGGER.error(
"Unsupported command: %s, list of available commands: %s",
cmd,
commands_keys,
)
@catch_braviatv_errors
async def async_reboot_device(self) -> None:
"""Send command to reboot the device."""
await self.client.reboot()
@catch_braviatv_errors
async def async_terminate_apps(self) -> None:
"""Send command to terminate all applications."""
await self.client.terminate_apps()