Move Data coordinator to separate module

pull/116204/head
tr4nt0r 2024-04-26 16:11:07 +00:00
parent 3c06ea2cc0
commit f5c8c3c886
3 changed files with 74 additions and 61 deletions

View File

@ -23,6 +23,7 @@ from .const import (
EVENT_API_CALL_SUCCESS,
SERVICE_API_CALL,
)
from .coordinator import HabitipyData
_LOGGER = logging.getLogger(__name__)
@ -107,7 +108,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
api = None
for entry in entries:
if entry.data[CONF_NAME] == name:
api = hass.data[DOMAIN].get(entry.entry_id)
coordinator = hass.data[DOMAIN].get(entry.entry_id)
api = coordinator.api
break
if api is None:
_LOGGER.error("API_CALL: User '%s' not configured", name)
@ -126,7 +128,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
EVENT_API_CALL_SUCCESS, {ATTR_NAME: name, ATTR_PATH: path, ATTR_DATA: data}
)
data = hass.data.setdefault(DOMAIN, {})
config = entry.data
websession = async_get_clientsession(hass)
url = config[CONF_URL]
@ -142,8 +143,9 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
entry,
data={**entry.data, CONF_NAME: name},
)
data[entry.entry_id] = api
coordinator = HabitipyData(api)
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
if not hass.services.has_service(DOMAIN, SERVICE_API_CALL):

View File

@ -0,0 +1,67 @@
"""Data Coordinator for Habitica."""
from datetime import timedelta
from http import HTTPStatus
import logging
from typing import Any
from aiohttp import ClientResponseError
from homeassistant.util import Throttle
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=1)
class HabitipyData:
"""Habitica API user data cache."""
tasks: dict[str, Any] = {}
def __init__(self, api) -> None:
"""Habitica API user data cache."""
self.api = api
self.data = None
@Throttle(MIN_TIME_BETWEEN_UPDATES)
async def update(self):
"""Get a new fix from Habitica servers."""
try:
self.data = await self.api.user.get()
except ClientResponseError as error:
if error.status == HTTPStatus.TOO_MANY_REQUESTS:
_LOGGER.warning(
(
"Sensor data update for %s has too many API requests;"
" Skipping the update"
),
DOMAIN,
)
else:
_LOGGER.error(
"Count not update sensor data for %s (%s)",
DOMAIN,
error,
)
for task_type in ("habits", "dailys", "todos", "rewards", "completedTodos"):
try:
self.tasks[task_type] = await self.api.tasks.user.get(type=task_type)
except ClientResponseError as error:
if error.status == HTTPStatus.TOO_MANY_REQUESTS:
_LOGGER.warning(
(
"Sensor data update for %s has too many API requests;"
" Skipping the update"
),
DOMAIN,
)
else:
_LOGGER.error(
"Count not update sensor data for %s (%s)",
DOMAIN,
error,
)

View File

@ -6,11 +6,8 @@ from collections import namedtuple
from dataclasses import dataclass
from datetime import timedelta
from enum import StrEnum
from http import HTTPStatus
import logging
from typing import TYPE_CHECKING, Any
from aiohttp import ClientResponseError
from typing import TYPE_CHECKING
from homeassistant.components.sensor import (
SensorDeviceClass,
@ -22,7 +19,6 @@ from homeassistant.const import CONF_NAME, CONF_URL
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.util import Throttle
from .const import DOMAIN, MANUFACTURER, NAME
@ -162,7 +158,7 @@ async def async_setup_entry(
"""Set up the habitica sensors."""
name = config_entry.data[CONF_NAME]
sensor_data = HabitipyData(hass.data[DOMAIN][config_entry.entry_id])
sensor_data = hass.data[DOMAIN][config_entry.entry_id]
await sensor_data.update()
entities: list[SensorEntity] = [
@ -175,58 +171,6 @@ async def async_setup_entry(
async_add_entities(entities, True)
class HabitipyData:
"""Habitica API user data cache."""
tasks: dict[str, Any]
def __init__(self, api) -> None:
"""Habitica API user data cache."""
self.api = api
self.data = None
self.tasks = {}
@Throttle(MIN_TIME_BETWEEN_UPDATES)
async def update(self):
"""Get a new fix from Habitica servers."""
try:
self.data = await self.api.user.get()
except ClientResponseError as error:
if error.status == HTTPStatus.TOO_MANY_REQUESTS:
_LOGGER.warning(
(
"Sensor data update for %s has too many API requests;"
" Skipping the update"
),
DOMAIN,
)
else:
_LOGGER.error(
"Count not update sensor data for %s (%s)",
DOMAIN,
error,
)
for task_type in TASKS_TYPES:
try:
self.tasks[task_type] = await self.api.tasks.user.get(type=task_type)
except ClientResponseError as error:
if error.status == HTTPStatus.TOO_MANY_REQUESTS:
_LOGGER.warning(
(
"Sensor data update for %s has too many API requests;"
" Skipping the update"
),
DOMAIN,
)
else:
_LOGGER.error(
"Count not update sensor data for %s (%s)",
DOMAIN,
error,
)
class HabitipySensor(SensorEntity):
"""A generic Habitica sensor."""