Add data update coordinator to the Tautulli integration (#54706)

* Add data update coordinator to the Tautulli integration

* update .coveragerc

* Add guard for UpdateFailed

* Apply suggestions from code review

Co-authored-by: Chris Talkington <chris@talkingtontech.com>

* ignore issues

Co-authored-by: Chris Talkington <chris@talkingtontech.com>
pull/55432/head
Joakim Sørensen 2021-08-28 03:39:12 +02:00 committed by GitHub
parent 61a7ce173c
commit 470aa7e871
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 120 additions and 95 deletions

View File

@ -1032,6 +1032,8 @@ omit =
homeassistant/components/tank_utility/sensor.py
homeassistant/components/tankerkoenig/*
homeassistant/components/tapsaff/binary_sensor.py
homeassistant/components/tautulli/const.py
homeassistant/components/tautulli/coordinator.py
homeassistant/components/tautulli/sensor.py
homeassistant/components/ted5000/sensor.py
homeassistant/components/telegram/notify.py

View File

@ -0,0 +1,5 @@
"""Constants for the Tautulli integration."""
from logging import Logger, getLogger
DOMAIN = "tautulli"
LOGGER: Logger = getLogger(__package__)

View File

@ -0,0 +1,52 @@
"""Data update coordinator for the Tautulli integration."""
from __future__ import annotations
import asyncio
from datetime import timedelta
from pytautulli import (
PyTautulli,
PyTautulliApiActivity,
PyTautulliApiHomeStats,
PyTautulliApiUser,
PyTautulliException,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN, LOGGER
class TautulliDataUpdateCoordinator(DataUpdateCoordinator):
"""Data update coordinator for the Tautulli integration."""
def __init__(
self,
hass: HomeAssistant,
api_client: PyTautulli,
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass=hass,
logger=LOGGER,
name=DOMAIN,
update_interval=timedelta(seconds=10),
)
self.api_client = api_client
self.activity: PyTautulliApiActivity | None = None
self.home_stats: list[PyTautulliApiHomeStats] | None = None
self.users: list[PyTautulliApiUser] | None = None
async def _async_update_data(self) -> None:
"""Get the latest data from Tautulli."""
try:
[self.activity, self.home_stats, self.users] = await asyncio.gather(
*[
self.api_client.async_get_activity(),
self.api_client.async_get_home_stats(),
self.api_client.async_get_users(),
]
)
except PyTautulliException as exception:
raise UpdateFailed(exception) from exception

View File

@ -1,6 +1,4 @@
"""A platform which allows you to get information from Tautulli."""
from datetime import timedelta
from pytautulli import PyTautulli
import voluptuous as vol
@ -15,10 +13,11 @@ from homeassistant.const import (
CONF_SSL,
CONF_VERIFY_SSL,
)
from homeassistant.exceptions import PlatformNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .coordinator import TautulliDataUpdateCoordinator
CONF_MONITORED_USERS = "monitored_users"
@ -28,8 +27,6 @@ DEFAULT_PATH = ""
DEFAULT_SSL = False
DEFAULT_VERIFY_SSL = True
TIME_BETWEEN_UPDATES = timedelta(seconds=10)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_API_KEY): cv.string,
@ -59,90 +56,34 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
verify_ssl = config.get(CONF_VERIFY_SSL)
session = async_get_clientsession(hass, verify_ssl)
tautulli = TautulliData(
PyTautulli(
api_token=api_key,
hostname=host,
session=session,
verify_ssl=verify_ssl,
port=port,
ssl=use_ssl,
base_api_path=path,
)
api_client = PyTautulli(
api_token=api_key,
hostname=host,
session=session,
verify_ssl=verify_ssl,
port=port,
ssl=use_ssl,
base_api_path=path,
)
await tautulli.async_update()
if not tautulli.activity or not tautulli.home_stats or not tautulli.users:
raise PlatformNotReady
coordinator = TautulliDataUpdateCoordinator(hass=hass, api_client=api_client)
sensor = [TautulliSensor(tautulli, name, monitored_conditions, user)]
entities = [TautulliSensor(coordinator, name, monitored_conditions, user)]
async_add_entities(sensor, True)
async_add_entities(entities, True)
class TautulliSensor(SensorEntity):
class TautulliSensor(CoordinatorEntity, SensorEntity):
"""Representation of a Tautulli sensor."""
def __init__(self, tautulli, name, monitored_conditions, users):
coordinator: TautulliDataUpdateCoordinator
def __init__(self, coordinator, name, monitored_conditions, users):
"""Initialize the Tautulli sensor."""
self.tautulli = tautulli
super().__init__(coordinator)
self.monitored_conditions = monitored_conditions
self.usernames = users
self.sessions = {}
self.home = {}
self._attributes = {}
self._name = name
self._state = None
async def async_update(self):
"""Get the latest data from the Tautulli API."""
await self.tautulli.async_update()
if (
not self.tautulli.activity
or not self.tautulli.home_stats
or not self.tautulli.users
):
return
self._attributes = {
"stream_count": self.tautulli.activity.stream_count,
"stream_count_direct_play": self.tautulli.activity.stream_count_direct_play,
"stream_count_direct_stream": self.tautulli.activity.stream_count_direct_stream,
"stream_count_transcode": self.tautulli.activity.stream_count_transcode,
"total_bandwidth": self.tautulli.activity.total_bandwidth,
"lan_bandwidth": self.tautulli.activity.lan_bandwidth,
"wan_bandwidth": self.tautulli.activity.wan_bandwidth,
}
for stat in self.tautulli.home_stats:
if stat.stat_id == "top_movies":
self._attributes["Top Movie"] = (
stat.rows[0].title if stat.rows else None
)
elif stat.stat_id == "top_tv":
self._attributes["Top TV Show"] = (
stat.rows[0].title if stat.rows else None
)
elif stat.stat_id == "top_users":
self._attributes["Top User"] = stat.rows[0].user if stat.rows else None
for user in self.tautulli.users:
if (
self.usernames
and user.username not in self.usernames
or user.username == "Local"
):
continue
self._attributes.setdefault(user.username, {})["Activity"] = None
for session in self.tautulli.activity.sessions:
if not self._attributes.get(session.username):
continue
self._attributes[session.username]["Activity"] = session.state
if self.monitored_conditions:
for key in self.monitored_conditions:
self._attributes[session.username][key] = getattr(session, key)
@property
def name(self):
@ -152,9 +93,9 @@ class TautulliSensor(SensorEntity):
@property
def native_value(self):
"""Return the state of the sensor."""
if not self.tautulli.activity:
if not self.coordinator.activity:
return 0
return self.tautulli.activity.stream_count
return self.coordinator.activity.stream_count
@property
def icon(self):
@ -169,22 +110,47 @@ class TautulliSensor(SensorEntity):
@property
def extra_state_attributes(self):
"""Return attributes for the sensor."""
return self._attributes
if (
not self.coordinator.activity
or not self.coordinator.home_stats
or not self.coordinator.users
):
return None
_attributes = {
"stream_count": self.coordinator.activity.stream_count,
"stream_count_direct_play": self.coordinator.activity.stream_count_direct_play,
"stream_count_direct_stream": self.coordinator.activity.stream_count_direct_stream,
"stream_count_transcode": self.coordinator.activity.stream_count_transcode,
"total_bandwidth": self.coordinator.activity.total_bandwidth,
"lan_bandwidth": self.coordinator.activity.lan_bandwidth,
"wan_bandwidth": self.coordinator.activity.wan_bandwidth,
}
class TautulliData:
"""Get the latest data and update the states."""
for stat in self.coordinator.home_stats:
if stat.stat_id == "top_movies":
_attributes["Top Movie"] = stat.rows[0].title if stat.rows else None
elif stat.stat_id == "top_tv":
_attributes["Top TV Show"] = stat.rows[0].title if stat.rows else None
elif stat.stat_id == "top_users":
_attributes["Top User"] = stat.rows[0].user if stat.rows else None
def __init__(self, api):
"""Initialize the data object."""
self.api = api
self.activity = None
self.home_stats = None
self.users = None
for user in self.coordinator.users:
if (
self.usernames
and user.username not in self.usernames
or user.username == "Local"
):
continue
_attributes.setdefault(user.username, {})["Activity"] = None
@Throttle(TIME_BETWEEN_UPDATES)
async def async_update(self):
"""Get the latest data from Tautulli."""
self.activity = await self.api.async_get_activity()
self.home_stats = await self.api.async_get_home_stats()
self.users = await self.api.async_get_users()
for session in self.coordinator.activity.sessions:
if not _attributes.get(session.username):
continue
_attributes[session.username]["Activity"] = session.state
if self.monitored_conditions:
for key in self.monitored_conditions:
_attributes[session.username][key] = getattr(session, key)
return _attributes