Implement data coordinator for LastFM (#96942)

Co-authored-by: G Johansson <goran.johansson@shiftit.se>
pull/97164/head
Joost Lekkerkerker 2023-07-24 19:39:46 +02:00 committed by GitHub
parent e96bff1674
commit fe66c3414b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 158 additions and 62 deletions

View File

@ -4,12 +4,17 @@ from __future__ import annotations
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from .const import PLATFORMS from .const import DOMAIN, PLATFORMS
from .coordinator import LastFMDataUpdateCoordinator
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up lastfm from a config entry.""" """Set up lastfm from a config entry."""
coordinator = LastFMDataUpdateCoordinator(hass)
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(entry.add_update_listener(update_listener)) entry.async_on_unload(entry.add_update_listener(update_listener))

View File

@ -0,0 +1,89 @@
"""DataUpdateCoordinator for the LastFM integration."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import timedelta
from pylast import LastFMNetwork, PyLastError, Track
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_API_KEY
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
CONF_USERS,
DOMAIN,
LOGGER,
)
def format_track(track: Track | None) -> str | None:
"""Format the track."""
if track is None:
return None
return f"{track.artist} - {track.title}"
@dataclass
class LastFMUserData:
"""Data holder for LastFM data."""
play_count: int
image: str
now_playing: str | None
top_track: str | None
last_track: str | None
class LastFMDataUpdateCoordinator(DataUpdateCoordinator[dict[str, LastFMUserData]]):
"""A LastFM Data Update Coordinator."""
config_entry: ConfigEntry
_client: LastFMNetwork
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the LastFM data coordinator."""
super().__init__(
hass,
LOGGER,
name=DOMAIN,
update_interval=timedelta(seconds=30),
)
self._client = LastFMNetwork(api_key=self.config_entry.options[CONF_API_KEY])
async def _async_update_data(self) -> dict[str, LastFMUserData]:
res = {}
for username in self.config_entry.options[CONF_USERS]:
data = await self.hass.async_add_executor_job(self._get_user_data, username)
if data is not None:
res[username] = data
if not res:
raise UpdateFailed
return res
def _get_user_data(self, username: str) -> LastFMUserData | None:
user = self._client.get_user(username)
try:
play_count = user.get_playcount()
image = user.get_image()
now_playing = format_track(user.get_now_playing())
top_tracks = user.get_top_tracks(limit=1)
last_tracks = user.get_recent_tracks(limit=1)
except PyLastError as exc:
if self.last_update_success:
LOGGER.error("LastFM update for %s failed: %r", username, exc)
return None
top_track = None
if len(top_tracks) > 0:
top_track = format_track(top_tracks[0].item)
last_track = None
if len(last_tracks) > 0:
last_track = format_track(last_tracks[0].track)
return LastFMUserData(
play_count,
image,
now_playing,
top_track,
last_track,
)

View File

@ -2,8 +2,8 @@
from __future__ import annotations from __future__ import annotations
import hashlib import hashlib
from typing import Any
from pylast import LastFMNetwork, PyLastError, Track, User
import voluptuous as vol import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA, SensorEntity from homeassistant.components.sensor import PLATFORM_SCHEMA, SensorEntity
@ -16,6 +16,9 @@ from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
)
from .const import ( from .const import (
ATTR_LAST_PLAYED, ATTR_LAST_PLAYED,
@ -24,9 +27,9 @@ from .const import (
CONF_USERS, CONF_USERS,
DEFAULT_NAME, DEFAULT_NAME,
DOMAIN, DOMAIN,
LOGGER,
STATE_NOT_SCROBBLING, STATE_NOT_SCROBBLING,
) )
from .coordinator import LastFMDataUpdateCoordinator, LastFMUserData
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{ {
@ -36,11 +39,6 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
) )
def format_track(track: Track) -> str:
"""Format the track."""
return f"{track.artist} - {track.title}"
async def async_setup_platform( async def async_setup_platform(
hass: HomeAssistant, hass: HomeAssistant,
config: ConfigType, config: ConfigType,
@ -78,61 +76,76 @@ async def async_setup_entry(
) -> None: ) -> None:
"""Initialize the entries.""" """Initialize the entries."""
lastfm_api = LastFMNetwork(api_key=entry.options[CONF_API_KEY]) coordinator: LastFMDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]
async_add_entities( async_add_entities(
( (
LastFmSensor(lastfm_api.get_user(user), entry.entry_id) LastFmSensor(coordinator, username, entry.entry_id)
for user in entry.options[CONF_USERS] for username in entry.options[CONF_USERS]
), ),
True,
) )
class LastFmSensor(SensorEntity): class LastFmSensor(CoordinatorEntity[LastFMDataUpdateCoordinator], SensorEntity):
"""A class for the Last.fm account.""" """A class for the Last.fm account."""
_attr_attribution = "Data provided by Last.fm" _attr_attribution = "Data provided by Last.fm"
_attr_icon = "mdi:radio-fm" _attr_icon = "mdi:radio-fm"
def __init__(self, user: User, entry_id: str) -> None: def __init__(
self,
coordinator: LastFMDataUpdateCoordinator,
username: str,
entry_id: str,
) -> None:
"""Initialize the sensor.""" """Initialize the sensor."""
self._user = user super().__init__(coordinator)
self._attr_unique_id = hashlib.sha256(user.name.encode("utf-8")).hexdigest() self._username = username
self._attr_name = user.name self._attr_unique_id = hashlib.sha256(username.encode("utf-8")).hexdigest()
self._attr_name = username
self._attr_device_info = DeviceInfo( self._attr_device_info = DeviceInfo(
configuration_url="https://www.last.fm", configuration_url="https://www.last.fm",
entry_type=DeviceEntryType.SERVICE, entry_type=DeviceEntryType.SERVICE,
identifiers={(DOMAIN, f"{entry_id}_{self._attr_unique_id}")}, identifiers={(DOMAIN, f"{entry_id}_{self._attr_unique_id}")},
manufacturer=DEFAULT_NAME, manufacturer=DEFAULT_NAME,
name=f"{DEFAULT_NAME} {user.name}", name=f"{DEFAULT_NAME} {username}",
) )
def update(self) -> None: @property
"""Update device state.""" def user_data(self) -> LastFMUserData | None:
self._attr_native_value = STATE_NOT_SCROBBLING """Returns the user from the coordinator."""
try: return self.coordinator.data.get(self._username)
play_count = self._user.get_playcount()
self._attr_entity_picture = self._user.get_image() @property
now_playing = self._user.get_now_playing() def available(self) -> bool:
top_tracks = self._user.get_top_tracks(limit=1) """If user not found in coordinator, entity is unavailable."""
last_tracks = self._user.get_recent_tracks(limit=1) return super().available and self.user_data is not None
except PyLastError as exc:
self._attr_available = False @property
LOGGER.error("Failed to load LastFM user `%s`: %r", self._user.name, exc) def entity_picture(self) -> str | None:
return """Return user avatar."""
self._attr_available = True if self.user_data and self.user_data.image is not None:
if now_playing: return self.user_data.image
self._attr_native_value = format_track(now_playing) return None
self._attr_extra_state_attributes = {
@property
def native_value(self) -> str:
"""Return value of sensor."""
if self.user_data and self.user_data.now_playing is not None:
return self.user_data.now_playing
return STATE_NOT_SCROBBLING
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return state attributes."""
play_count = None
last_track = None
top_track = None
if self.user_data:
play_count = self.user_data.play_count
last_track = self.user_data.last_track
top_track = self.user_data.top_track
return {
ATTR_PLAY_COUNT: play_count, ATTR_PLAY_COUNT: play_count,
ATTR_LAST_PLAYED: None, ATTR_LAST_PLAYED: last_track,
ATTR_TOP_PLAYED: None, ATTR_TOP_PLAYED: top_track,
} }
if len(last_tracks) > 0:
self._attr_extra_state_attributes[ATTR_LAST_PLAYED] = format_track(
last_tracks[0].track
)
if len(top_tracks) > 0:
self._attr_extra_state_attributes[ATTR_TOP_PLAYED] = format_track(
top_tracks[0].item
)

View File

@ -75,7 +75,7 @@ class MockUser:
def get_image(self) -> str: def get_image(self) -> str:
"""Get mock image.""" """Get mock image."""
return "" return "image"
def get_recent_tracks(self, limit: int) -> list[MockLastTrack]: def get_recent_tracks(self, limit: int) -> list[MockLastTrack]:
"""Get mock recent tracks.""" """Get mock recent tracks."""

View File

@ -3,7 +3,7 @@
StateSnapshot({ StateSnapshot({
'attributes': ReadOnlyDict({ 'attributes': ReadOnlyDict({
'attribution': 'Data provided by Last.fm', 'attribution': 'Data provided by Last.fm',
'entity_picture': '', 'entity_picture': 'image',
'friendly_name': 'testaccount1', 'friendly_name': 'testaccount1',
'icon': 'mdi:radio-fm', 'icon': 'mdi:radio-fm',
'last_played': 'artist - title', 'last_played': 'artist - title',
@ -21,7 +21,7 @@
StateSnapshot({ StateSnapshot({
'attributes': ReadOnlyDict({ 'attributes': ReadOnlyDict({
'attribution': 'Data provided by Last.fm', 'attribution': 'Data provided by Last.fm',
'entity_picture': '', 'entity_picture': 'image',
'friendly_name': 'testaccount1', 'friendly_name': 'testaccount1',
'icon': 'mdi:radio-fm', 'icon': 'mdi:radio-fm',
'last_played': None, 'last_played': None,
@ -36,16 +36,5 @@
}) })
# --- # ---
# name: test_sensors[not_found_user] # name: test_sensors[not_found_user]
StateSnapshot({ None
'attributes': ReadOnlyDict({
'attribution': 'Data provided by Last.fm',
'friendly_name': 'testaccount1',
'icon': 'mdi:radio-fm',
}),
'context': <ANY>,
'entity_id': 'sensor.testaccount1',
'last_changed': <ANY>,
'last_updated': <ANY>,
'state': 'unavailable',
})
# --- # ---

View File

@ -14,7 +14,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers import issue_registry as ir from homeassistant.helpers import issue_registry as ir
from homeassistant.setup import async_setup_component from homeassistant.setup import async_setup_component
from . import API_KEY, USERNAME_1 from . import API_KEY, USERNAME_1, MockUser
from .conftest import ComponentSetup from .conftest import ComponentSetup
from tests.common import MockConfigEntry from tests.common import MockConfigEntry
@ -28,7 +28,7 @@ LEGACY_CONFIG = {
async def test_legacy_migration(hass: HomeAssistant) -> None: async def test_legacy_migration(hass: HomeAssistant) -> None:
"""Test migration from yaml to config flow.""" """Test migration from yaml to config flow."""
with patch("pylast.User", return_value=None): with patch("pylast.User", return_value=MockUser()):
assert await async_setup_component(hass, Platform.SENSOR, LEGACY_CONFIG) assert await async_setup_component(hass, Platform.SENSOR, LEGACY_CONFIG)
await hass.async_block_till_done() await hass.async_block_till_done()
entries = hass.config_entries.async_entries(DOMAIN) entries = hass.config_entries.async_entries(DOMAIN)